greenplumn CPhysicalAgg 源码
greenplumn CPhysicalAgg 代码
文件路径:/src/backend/gporca/libgpopt/src/operators/CPhysicalAgg.cpp
//---------------------------------------------------------------------------
// Greenplum Database
// Copyright (C) 2011 EMC Corp.
//
// @filename:
// CPhysicalAgg.cpp
//
// @doc:
// Implementation of basic aggregate operator
//---------------------------------------------------------------------------
#include "gpopt/operators/CPhysicalAgg.h"
#include "gpos/base.h"
#include "gpopt/base/CDistributionSpecAny.h"
#include "gpopt/base/CDistributionSpecHashed.h"
#include "gpopt/base/CDistributionSpecRandom.h"
#include "gpopt/base/CDistributionSpecReplicated.h"
#include "gpopt/base/CDistributionSpecSingleton.h"
#include "gpopt/base/CDistributionSpecStrictSingleton.h"
#include "gpopt/base/CUtils.h"
#include "gpopt/operators/CExpressionHandle.h"
#include "gpopt/xforms/CXformUtils.h"
using namespace gpopt;
//---------------------------------------------------------------------------
// @function:
// CPhysicalAgg::CPhysicalAgg
//
// @doc:
// Ctor
//
//---------------------------------------------------------------------------
CPhysicalAgg::CPhysicalAgg(
	CMemoryPool *mp, CColRefArray *colref_array,
	CColRefArray *pdrgpcrMinimal,  // minimal grouping columns based on FD's
	COperator::EGbAggType egbaggtype, BOOL fGeneratesDuplicates,
	CColRefArray *pdrgpcrArgDQA, BOOL fMultiStage, BOOL isAggFromSplitDQA,
	CLogicalGbAgg::EAggStage aggStage, BOOL should_enforce_distribution)
	: CPhysical(mp),
	  m_pdrgpcr(colref_array),
	  m_egbaggtype(egbaggtype),
	  m_isAggFromSplitDQA(isAggFromSplitDQA),
	  m_aggStage(aggStage),
	  m_pdrgpcrMinimal(nullptr),
	  m_fGeneratesDuplicates(fGeneratesDuplicates),
	  m_pdrgpcrArgDQA(pdrgpcrArgDQA),
	  m_fMultiStage(fMultiStage),
	  m_should_enforce_distribution(should_enforce_distribution)
{
	// Reference counting as visible in this file: colref_array and
	// pdrgpcrArgDQA are stored without an AddRef() and released by the
	// destructor, so the reference handed in by the caller is adopted;
	// the minimal grouping columns receive their own AddRef() below.
	GPOS_ASSERT(nullptr != colref_array);
	GPOS_ASSERT(COperator::EgbaggtypeSentinel > egbaggtype);
	GPOS_ASSERT_IMP(EgbaggtypeGlobal != egbaggtype, fMultiStage);

	// number of distribution requests generated for the child; one unless
	// the aggregate type says otherwise
	ULONG ulRequests = 1;

	// without explicit minimal grouping columns, the full set of grouping
	// columns doubles as the minimal set
	CColRefArray *pdrgpcrEffectiveMinimal =
		(nullptr == pdrgpcrMinimal || 0 == pdrgpcrMinimal->Size())
			? colref_array
			: pdrgpcrMinimal;
	pdrgpcrEffectiveMinimal->AddRef();
	m_pdrgpcrMinimal = pdrgpcrEffectiveMinimal;

	if (COperator::EgbaggtypeLocal == egbaggtype)
	{
		// Local aggregate without distinct columns: two requests,
		//   (1) Any distribution
		//   (2) Random distribution, to alleviate possible data skew
		//
		// Local aggregate with distinct columns: two requests,
		//   (1) hash distribution on the distinct columns only
		//   (2) hash distribution on the grouping and distinct columns;
		//       this one is dropped when there are no grouping columns
		const BOOL fHasDistinctCols =
			nullptr != pdrgpcrArgDQA && 0 != pdrgpcrArgDQA->Size();
		ulRequests = (fHasDistinctCols && 0 == m_pdrgpcr->Size()) ? 1 : 2;
	}
	else if (COperator::EgbaggtypeIntermediate == egbaggtype)
	{
		GPOS_ASSERT(nullptr != pdrgpcrArgDQA);
		GPOS_ASSERT(pdrgpcrArgDQA->Size() <= colref_array->Size());

		// Intermediate aggregate: two requests,
		//   (1) hash distribution on the group by columns + distinct column
		//   (2) hash distribution on the group by columns
		// Only the first is requested for scalar aggregates (all grouping
		// columns are distinct columns) or when the
		// EopttraceForceAggSkewAvoidance trace flag is set.
		const BOOL fFirstRequestOnly =
			pdrgpcrArgDQA->Size() == colref_array->Size() ||
			GPOS_FTRACE(EopttraceForceAggSkewAvoidance);
		ulRequests = fFirstRequestOnly ? 1 : 2;
	}
	else if (COperator::EgbaggtypeGlobal == egbaggtype)
	{
		// Global aggregate: two requests,
		//   (1) Singleton distribution, if child has volatile functions
		//   (2) hash distribution on the group by columns
		ulRequests = 2;
	}

	// Split DQA generates a 2-stage aggregate to handle the case where a
	// hash aggregate has a distinct agg func. For such an aggregate with
	// grouping columns the flag is cleared, so that distribution property
	// enforcement is not prohibited (see EpetDistribution).
	const BOOL fSplitDQATwoStageWithGrpCols =
		isAggFromSplitDQA && aggStage == CLogicalGbAgg::EasTwoStageScalarDQA &&
		colref_array->Size() > 0;
	m_should_enforce_distribution &= !fSplitDQATwoStageWithGrpCols;

	SetDistrRequests(ulRequests);
}
//---------------------------------------------------------------------------
// @function:
// CPhysicalAgg::~CPhysicalAgg
//
// @doc:
// Dtor
//
//---------------------------------------------------------------------------
CPhysicalAgg::~CPhysicalAgg()
{
	// the distinct-argument columns are optional, hence the null-safe release
	CRefCount::SafeRelease(m_pdrgpcrArgDQA);

	// the constructor always sets the minimal grouping columns (with their
	// own AddRef) and asserts that the grouping columns are non-null
	m_pdrgpcrMinimal->Release();
	m_pdrgpcr->Release();
}
//---------------------------------------------------------------------------
// @function:
// CPhysicalAgg::PcrsRequired
//
// @doc:
// Compute required columns of the n-th child;
// we only compute required columns for the relational child;
//
//---------------------------------------------------------------------------
CColRefSet *
CPhysicalAgg::PcrsRequired(CMemoryPool *mp, CExpressionHandle &exprhdl,
						   CColRefSet *pcrsRequired, ULONG child_index,
						   CDrvdPropArray *,  // pdrgpdpCtxt
						   ULONG			  // ulOptReq
)
{
	// the derived-property context and the optimization request index are
	// ignored; the work is done by PcrsRequiredAgg() using this operator's
	// own grouping columns
	CColRefArray *pdrgpcrGrouping = m_pdrgpcr;

	return PcrsRequiredAgg(mp, exprhdl, pcrsRequired, child_index,
						   pdrgpcrGrouping);
}
//---------------------------------------------------------------------------
// @function:
// CPhysicalAgg::PcrsRequiredAgg
//
// @doc:
// Compute required columns of the n-th child;
// we only compute required columns for the relational child;
//
//---------------------------------------------------------------------------
CColRefSet *
CPhysicalAgg::PcrsRequiredAgg(CMemoryPool *mp, CExpressionHandle &exprhdl,
							  CColRefSet *pcrsRequired, ULONG child_index,
							  CColRefArray *pdrgpcrGrp)
{
	GPOS_ASSERT(nullptr != pdrgpcrGrp);
	GPOS_ASSERT(
		0 == child_index &&
		"Required properties can only be computed on the relational child");

	// temporary set: the grouping columns plus whatever is required from
	// this operator
	CColRefSet *pcrsGrpAndRequired = GPOS_NEW(mp) CColRefSet(mp);
	pcrsGrpAndRequired->Include(pdrgpcrGrp);
	pcrsGrpAndRequired->Union(pcrsRequired);

	// translate into the child's requirement; child 1 is passed as the
	// scalar child index
	CColRefSet *pcrsChildRequired = PcrsChildReqd(
		mp, exprhdl, pcrsGrpAndRequired, child_index, 1 /*ulScalarIndex*/);

	// the temporary set is no longer needed once the result is computed
	pcrsGrpAndRequired->Release();

	return pcrsChildRequired;
}
//---------------------------------------------------------------------------
// @function:
// CPhysicalAgg::PdsRequiredAgg
//
// @doc:
// Compute required distribution of the n-th child
//
//---------------------------------------------------------------------------
CDistributionSpec *
CPhysicalAgg::PdsRequiredAgg(CMemoryPool *mp, CExpressionHandle &exprhdl,
							 CDistributionSpec *pdsInput, ULONG child_index,
							 ULONG ulOptReq, CColRefArray *pdrgpcgGrp,
							 CColRefArray *pdrgpcrGrpMinimal) const
{
	GPOS_ASSERT(0 == child_index);

	// global and intermediate aggregates have dedicated request logic
	if (FGlobal())
	{
		return PdsRequiredGlobalAgg(mp, exprhdl, pdsInput, child_index,
									pdrgpcgGrp, pdrgpcrGrpMinimal, ulOptReq);
	}

	if (COperator::EgbaggtypeIntermediate == m_egbaggtype)
	{
		return PdsRequiredIntermediateAgg(mp, ulOptReq);
	}

	// an expression that has to execute on a single host needs a gather
	if (exprhdl.NeedsSingletonExecution())
	{
		return PdsRequireSingleton(mp, exprhdl, pdsInput, child_index);
	}

	const BOOL fLocalWithDistinctCols =
		COperator::EgbaggtypeLocal == m_egbaggtype &&
		nullptr != m_pdrgpcrArgDQA && 0 != m_pdrgpcrArgDQA->Size();
	if (fLocalWithDistinctCols)
	{
		// request 0: hash on the distinct columns only
		if (0 == ulOptReq)
		{
			return PdsMaximalHashed(mp, m_pdrgpcrArgDQA);
		}

		// request 1: hash on grouping columns followed by distinct columns;
		// the constructor only generates this request when grouping
		// columns exist
		GPOS_ASSERT(1 == ulOptReq);
		GPOS_ASSERT(0 < m_pdrgpcr->Size());

		CColRefArray *pdrgpcrHashCols = GPOS_NEW(mp) CColRefArray(mp);
		pdrgpcrHashCols->AppendArray(m_pdrgpcr);
		pdrgpcrHashCols->AppendArray(m_pdrgpcrArgDQA);

		CDistributionSpec *pdsHashed = PdsMaximalHashed(mp, pdrgpcrHashCols);
		pdrgpcrHashCols->Release();

		return pdsHashed;
	}

	GPOS_ASSERT(0 == ulOptReq || 1 == ulOptReq);

	if (0 != ulOptReq)
	{
		// randomly distribute the input for skew-elimination
		return GPOS_NEW(mp) CDistributionSpecRandom();
	}

	// first request: no particular distribution is needed
	return GPOS_NEW(mp) CDistributionSpecAny(this->Eopid());
}
//---------------------------------------------------------------------------
// @function:
// CPhysicalAgg::PdsMaximalHashed
//
// @doc:
// Compute a maximal hashed distribution using the given columns,
// if no such distribution can be created, return a Singleton distribution
//
//---------------------------------------------------------------------------
CDistributionSpec *
CPhysicalAgg::PdsMaximalHashed(CMemoryPool *mp, CColRefArray *colref_array)
{
	GPOS_ASSERT(nullptr != colref_array);

	// ask for a maximal hashed spec over the given columns, with nulls
	// colocated
	CDistributionSpecHashed *pdshashed =
		CDistributionSpecHashed::PdshashedMaximal(mp, colref_array,
												  true /*fNullsColocated*/
		);

	if (nullptr == pdshashed)
	{
		// no hashed spec could be built; require a singleton explicitly
		return GPOS_NEW(mp) CDistributionSpecSingleton();
	}

	return pdshashed;
}
//---------------------------------------------------------------------------
// @function:
// CPhysicalAgg::PdsRequiredGlobalAgg
//
// @doc:
// Compute required distribution of the n-th child of a global agg
//
//---------------------------------------------------------------------------
CDistributionSpec *
CPhysicalAgg::PdsRequiredGlobalAgg(CMemoryPool *mp, CExpressionHandle &exprhdl,
								   CDistributionSpec *pdsInput,
								   ULONG child_index, CColRefArray *pdrgpcrGrp,
								   CColRefArray *pdrgpcrGrpMinimal,
								   ULONG ulOptReq) const
{
	GPOS_ASSERT(FGlobal());
	GPOS_ASSERT(2 > ulOptReq);

	// TODO: - Mar 19, 2012; Cleanup: move this check to the caller
	if (exprhdl.HasOuterRefs())
	{
		return PdsPassThru(mp, exprhdl, pdsInput, child_index);
	}

	// no grouping columns: the child has to be a singleton
	if (0 == pdrgpcrGrp->Size())
	{
		if (CDistributionSpec::EdtSingleton != pdsInput->Edt())
		{
			// incoming requirement is not a singleton; require one explicitly
			return GPOS_NEW(mp) CDistributionSpecSingleton();
		}

		// incoming requirement already is a singleton; pass it through
		pdsInput->AddRef();
		return pdsInput;
	}

	// first request: a singleton, but only if the child has volatile
	// functions
	if (0 == ulOptReq)
	{
		if (IMDFunction::EfsVolatile ==
			exprhdl.DeriveFunctionProperties(0)->Efs())
		{
			return GPOS_NEW(mp) CDistributionSpecSingleton();
		}
	}

	// grouping columns exist: require a hash distribution on the minimal
	// grouping columns
	return PdsMaximalHashed(mp, pdrgpcrGrpMinimal);
}
//---------------------------------------------------------------------------
// @function:
// CPhysicalAgg::PdsRequiredIntermediateAgg
//
// @doc:
// Compute required distribution of the n-th child of an intermediate
// aggregate operator
//
//---------------------------------------------------------------------------
CDistributionSpec *
CPhysicalAgg::PdsRequiredIntermediateAgg(CMemoryPool *mp, ULONG ulOptReq) const
{
	GPOS_ASSERT(COperator::EgbaggtypeIntermediate == m_egbaggtype);

	// request 0: hash distribution on all grouping columns of this operator
	if (0 == ulOptReq)
	{
		return PdsMaximalHashed(mp, m_pdrgpcr);
	}

	// The prefix length below is an unsigned subtraction and dereferences
	// the distinct-argument array. The constructor asserts both invariants
	// for intermediate aggregates; re-assert them at the point of use so a
	// violation is caught here in debug builds rather than surfacing as a
	// wrapped-around loop bound.
	GPOS_ASSERT(nullptr != m_pdrgpcrArgDQA);
	GPOS_ASSERT(m_pdrgpcrArgDQA->Size() <= m_pdrgpcr->Size());

	// any other request: hash distribution on the leading grouping columns
	// only, leaving out as many trailing entries as there are distinct
	// argument columns
	// NOTE(review): this presumes the distinct columns are stored at the
	// tail of m_pdrgpcr -- confirm against the code that builds the operator
	CColRefArray *pdrgpcrPrefix = GPOS_NEW(mp) CColRefArray(mp);
	const ULONG ulPrefixLen = m_pdrgpcr->Size() - m_pdrgpcrArgDQA->Size();
	for (ULONG ul = 0; ul < ulPrefixLen; ul++)
	{
		pdrgpcrPrefix->Append((*m_pdrgpcr)[ul]);
	}

	CDistributionSpec *pds = PdsMaximalHashed(mp, pdrgpcrPrefix);
	pdrgpcrPrefix->Release();

	return pds;
}
//---------------------------------------------------------------------------
// @function:
// CPhysicalAgg::PrsRequired
//
// @doc:
// Compute required rewindability of the n-th child
//
//---------------------------------------------------------------------------
CRewindabilitySpec *
CPhysicalAgg::PrsRequired(CMemoryPool *mp, CExpressionHandle &exprhdl,
						  CRewindabilitySpec *prsRequired, ULONG child_index,
						  CDrvdPropArray *,	 // pdrgpdpCtxt
						  ULONG				 // ulOptReq
) const
{
	GPOS_ASSERT(0 == child_index);

	// start from the pass-through requirement for the child
	CRewindabilitySpec *prsFromPassThru =
		PrsPassThru(mp, exprhdl, prsRequired, child_index);

	if (!prsRequired->IsOriginNLJoin())
	{
		return prsFromPassThru;
	}

	// the requirement originates from an NL join: ask the child for no
	// rewindability, keeping only the motion hazard of the pass-through spec
	CRewindabilitySpec *prsNoRewind = GPOS_NEW(mp) CRewindabilitySpec(
		CRewindabilitySpec::ErtNone, prsFromPassThru->Emht());
	prsFromPassThru->Release();

	return prsNoRewind;
}
//---------------------------------------------------------------------------
// @function:
// CPhysicalAgg::PcteRequired
//
// @doc:
// Compute required CTE map of the n-th child
//
//---------------------------------------------------------------------------
CCTEReq *
CPhysicalAgg::PcteRequired(CMemoryPool *,		 //mp,
						   CExpressionHandle &,	 //exprhdl,
						   CCTEReq *pcter,
						   ULONG
#ifdef GPOS_DEBUG
							   child_index
#endif
						   ,
						   CDrvdPropArray *,  //pdrgpdpCtxt,
						   ULONG			  //ulOptReq
) const
{
	// child_index is only named in debug builds, where it is needed by the
	// assertion below
	GPOS_ASSERT(0 == child_index);

	// the incoming CTE requirement is handed to PcterPushThru() as is
	CCTEReq *pcterChild = PcterPushThru(pcter);

	return pcterChild;
}
//---------------------------------------------------------------------------
// @function:
// CPhysicalAgg::FProvidesReqdCols
//
// @doc:
// Check if required columns are included in output columns
//
//---------------------------------------------------------------------------
BOOL
CPhysicalAgg::FProvidesReqdCols(CExpressionHandle &exprhdl,
								CColRefSet *pcrsRequired,
								ULONG  // ulOptReq
) const
{
	GPOS_ASSERT(nullptr != pcrsRequired);
	GPOS_ASSERT(2 == exprhdl.Arity());

	// columns checked against the requirement: the grouping columns plus
	// the columns defined by the scalar child (child 1)
	CColRefSet *pcrsProvided = GPOS_NEW(m_mp) CColRefSet(m_mp);
	pcrsProvided->Include(PdrgpcrGroupingCols());
	pcrsProvided->Union(exprhdl.DeriveDefinedColumns(1));

	const BOOL fCoversRequired = pcrsProvided->ContainsAll(pcrsRequired);

	// the temporary set is only needed for the containment test
	pcrsProvided->Release();

	return fCoversRequired;
}
//---------------------------------------------------------------------------
// @function:
// CPhysicalAgg::PdsDerive
//
// @doc:
// Derive distribution
//
//---------------------------------------------------------------------------
CDistributionSpec *
CPhysicalAgg::PdsDerive(CMemoryPool *mp, CExpressionHandle &exprhdl) const
{
	// distribution delivered by the relational child
	CDistributionSpec *pdsChild = exprhdl.Pdpplan(0 /*child_index*/)->Pds();

	if (CDistributionSpec::EdtUniversal == pdsChild->Edt())
	{
		// universal child combined with a volatile scalar child: derive a
		// strict singleton on the master
		if (IMDFunction::EfsVolatile ==
			exprhdl.DeriveScalarFunctionProperties(1)->Efs())
		{
			return GPOS_NEW(mp) CDistributionSpecStrictSingleton(
				CDistributionSpecSingleton::EstMaster);
		}
	}

	if (CDistributionSpec::EdtStrictReplicated == pdsChild->Edt())
	{
		// Aggregate functions which are not trivial and which are sensitive
		// to the order of their input cannot guarantee replicated data. If
		// the child was replicated, we can no longer guarantee that
		// property. Therefore we must now derive tainted replicated.
		return GPOS_NEW(mp) CDistributionSpecReplicated(
			CDistributionSpec::EdtTaintedReplicated);
	}

	// otherwise the child's distribution is passed on unchanged
	pdsChild->AddRef();
	return pdsChild;
}
//---------------------------------------------------------------------------
// @function:
// CPhysicalAgg::PrsDerive
//
// @doc:
// Derive rewindability
//
//---------------------------------------------------------------------------
CRewindabilitySpec *
CPhysicalAgg::PrsDerive(CMemoryPool *mp, CExpressionHandle &exprhdl) const
{
	// rewindability derivation is delegated entirely to
	// PrsDerivePassThruOuter()
	CRewindabilitySpec *prsDerived = PrsDerivePassThruOuter(mp, exprhdl);

	return prsDerived;
}
//---------------------------------------------------------------------------
// @function:
// CPhysicalAgg::HashValue
//
// @doc:
// Operator specific hash function
//
//---------------------------------------------------------------------------
ULONG
CPhysicalAgg::HashValue() const
{
	// start from the generic operator hash
	ULONG ulResult = COperator::HashValue();

	// fold in the identity (pointer) of every grouping column, in order
	const ULONG ulGrpCols = m_pdrgpcr->Size();
	for (ULONG ulIdx = 0; ulIdx < ulGrpCols; ++ulIdx)
	{
		ulResult = gpos::CombineHashes(
			ulResult, gpos::HashPtr<CColRef>((*m_pdrgpcr)[ulIdx]));
	}

	// fold in the aggregate type, then the aggregation stage; both are
	// copied into ULONG locals because the hash helper takes a pointer
	ULONG ulAggType = static_cast<ULONG>(m_egbaggtype);
	ulResult =
		gpos::CombineHashes(ulResult, gpos::HashValue<ULONG>(&ulAggType));

	ULONG ulStage = static_cast<ULONG>(m_aggStage);
	ulResult = gpos::CombineHashes(ulResult, gpos::HashValue<ULONG>(&ulStage));

	// finally fold in the duplicate-generation flag
	return gpos::CombineHashes(ulResult,
							   gpos::HashValue<BOOL>(&m_fGeneratesDuplicates));
}
//---------------------------------------------------------------------------
// @function:
// CPhysicalAgg::Matches
//
// @doc:
// Match operator
//
//---------------------------------------------------------------------------
BOOL
CPhysicalAgg::Matches(COperator *pop) const
{
	// operators of a different kind never match
	if (pop->Eopid() != Eopid())
	{
		return false;
	}

	CPhysicalAgg *popAgg = dynamic_cast<CPhysicalAgg *>(pop);

	if (FGeneratesDuplicates() != popAgg->FGeneratesDuplicates())
	{
		return false;
	}

	// HashValue() folds in the aggregation stage, so two operators that
	// differ only in stage hash differently. Matching operators must hash
	// identically, hence operators with different stages must not match.
	if (m_aggStage != popAgg->m_aggStage)
	{
		return false;
	}

	// aggregate type and grouping columns must agree
	if (popAgg->Egbaggtype() != m_egbaggtype ||
		!m_pdrgpcr->Equals(popAgg->m_pdrgpcr))
	{
		return false;
	}

	// minimal grouping columns must agree
	if (!CColRef::Equals(m_pdrgpcrMinimal, popAgg->m_pdrgpcrMinimal))
	{
		return false;
	}

	// distinct-argument columns are only compared when this operator has
	// any
	// NOTE(review): this test is asymmetric -- an operator without
	// distinct columns matches one that has them, but not the other way
	// round; confirm whether that is intended
	return (m_pdrgpcrArgDQA == nullptr || 0 == m_pdrgpcrArgDQA->Size()) ||
		   CColRef::Equals(m_pdrgpcrArgDQA, popAgg->PdrgpcrArgDQA());
}
//---------------------------------------------------------------------------
// @function:
// CPhysicalAgg::EpetDistribution
//
// @doc:
// Return the enforcing type for distribution property based on this operator
//
//---------------------------------------------------------------------------
CEnfdProp::EPropEnforcingType
CPhysicalAgg::EpetDistribution(CExpressionHandle &exprhdl,
							   const CEnfdDistribution *ped) const
{
	GPOS_ASSERT(nullptr != ped);

	// distribution delivered by the aggregate node
	CDistributionSpec *pdsDelivered =
		CDrvdPropPlan::Pdpplan(exprhdl.Pdp())->Pds();

	if (!ped->FCompatible(pdsDelivered))
	{
		// required distribution will be enforced on Agg's output
		return CEnfdProp::EpetRequired;
	}

	if (COperator::EgbaggtypeLocal == Egbaggtype() &&
		m_should_enforce_distribution)
	{
		// prohibit the plan if a local aggregate already delivers the
		// enforced distribution, since otherwise we would create two
		// aggregates with no intermediate motion operators
		return CEnfdProp::EpetProhibited;
	}

	// the delivered distribution is compatible; nothing to enforce
	return CEnfdProp::EpetUnnecessary;
}
//---------------------------------------------------------------------------
// @function:
// CPhysicalAgg::EpetRewindability
//
// @doc:
// Return the enforcing type for rewindability property based on this operator
//
//---------------------------------------------------------------------------
CEnfdProp::EPropEnforcingType
CPhysicalAgg::EpetRewindability(CExpressionHandle &exprhdl,
								const CEnfdRewindability *per) const
{
	// rewindability delivered by the Agg node
	CRewindabilitySpec *prsDelivered =
		CDrvdPropPlan::Pdpplan(exprhdl.Pdp())->Prs();

	if (!per->FCompatible(prsDelivered))
	{
		// the delivered rewindability does not satisfy the requirement
		return CEnfdProp::EpetRequired;
	}

	// required rewindability is already provided
	return CEnfdProp::EpetUnnecessary;
}
// Is this aggregate's stage the two-stage scalar DQA stage?
BOOL
CPhysicalAgg::IsTwoStageScalarDQA() const
{
	const BOOL fTwoStage = (CLogicalGbAgg::EasTwoStageScalarDQA == m_aggStage);

	return fTwoStage;
}
// Is this aggregate's stage the three-stage scalar DQA stage?
BOOL
CPhysicalAgg::IsThreeStageScalarDQA() const
{
	const BOOL fThreeStage =
		(CLogicalGbAgg::EasThreeStageScalarDQA == m_aggStage);

	return fThreeStage;
}
// Accessor for the split-DQA flag that was passed to the constructor
BOOL
CPhysicalAgg::IsAggFromSplitDQA() const
{
	const BOOL fFromSplitDQA = m_isAggFromSplitDQA;

	return fFromSplitDQA;
}
//---------------------------------------------------------------------------
// @function:
// CPhysicalAgg::OsPrint
//
// @doc:
// Debug print
//
//---------------------------------------------------------------------------
IOstream &
CPhysicalAgg::OsPrint(IOstream &os) const
{
	// pattern operators use the generic operator printer
	if (m_fPattern)
	{
		return COperator::OsPrint(os);
	}

	// operator id, aggregate type and multi-stage marker
	os << SzId() << "( ";
	CLogicalGbAgg::OsPrintGbAggType(os, m_egbaggtype);
	if (m_fMultiStage)
	{
		os << ", multi-stage";
	}
	os << " )"
	   << " Grp Cols: [";

	// grouping columns, then minimal grouping columns
	CUtils::OsPrintDrgPcr(os, m_pdrgpcr);
	os << "]"
	   << ", Minimal Grp Cols:[";
	CUtils::OsPrintDrgPcr(os, m_pdrgpcrMinimal);
	os << "]";

	// distinct columns are only printed for intermediate aggregates
	if (COperator::EgbaggtypeIntermediate == m_egbaggtype)
	{
		os << ", Distinct Cols:[";
		CUtils::OsPrintDrgPcr(os, m_pdrgpcrArgDQA);
		os << "]";
	}

	os << ", Generates Duplicates :[ ";
	os << FGeneratesDuplicates();
	os << " ] ";

	// note: 2-stage Scalar DQA and 3-stage scalar DQA are created by CXformSplitDQA only
	if (IsTwoStageScalarDQA())
	{
		os << ", m_aggStage :[ Two Stage Scalar DQA ] ";
	}

	if (IsThreeStageScalarDQA())
	{
		os << ", m_aggStage :[ Three Stage Scalar DQA ] ";
	}

	return os;
}
// EOF
相关信息
相关文章
greenplumn CExpressionFactorizer 源码
greenplumn CExpressionHandle 源码
greenplumn CExpressionPreprocessor 源码
greenplumn CExpressionUtils 源码
0
赞
热门推荐
-
2、 - 优质文章
-
3、 gate.io
-
7、 golang
-
9、 openharmony
-
10、 Vue中input框自动聚焦