greenplumn CWindowPreprocessor 源码
greenplumn CWindowPreprocessor 代码
文件路径:/src/backend/gporca/libgpopt/src/operators/CWindowPreprocessor.cpp
//---------------------------------------------------------------------------
// Greenplum Database
// Copyright 2015 VMware, Inc. or its affiliates.
//
// @filename:
// CWindowPreprocessor.cpp
//
// @doc:
// Preprocessing routines of window functions
//---------------------------------------------------------------------------
#include "gpopt/operators/CWindowPreprocessor.h"
#include "gpos/base.h"
#include "gpopt/base/COptCtxt.h"
#include "gpopt/base/CUtils.h"
#include "gpopt/operators/CLogicalCTEAnchor.h"
#include "gpopt/operators/CLogicalCTEConsumer.h"
#include "gpopt/operators/CLogicalGbAgg.h"
#include "gpopt/operators/CLogicalInnerJoin.h"
#include "gpopt/operators/CLogicalSequenceProject.h"
#include "gpopt/operators/CPredicateUtils.h"
#include "gpopt/operators/CScalarProjectElement.h"
#include "gpopt/operators/CScalarProjectList.h"
#include "gpopt/operators/CScalarWindowFunc.h"
#include "gpopt/xforms/CXformUtils.h"
using namespace gpopt;
//---------------------------------------------------------------------------
// @function:
// CWindowPreprocessor::SplitPrjList
//
// @doc:
// Iterate over project elements and split them elements between
// Distinct Aggs list, and Others list
//
//---------------------------------------------------------------------------
void
CWindowPreprocessor::SplitPrjList(
CMemoryPool *mp, CExpression *pexprSeqPrj,
CExpressionArray **
ppdrgpexprDistinctAggsPrEl, // output: list of project elements with Distinct Aggs
CExpressionArray **
ppdrgpexprOtherPrEl, // output: list of project elements with Other window functions
COrderSpecArray **
ppdrgposOther, // output: array of order specs of window functions used in Others list
CWindowFrameArray **
ppdrgpwfOther // output: array of frame specs of window functions used in Others list
)
{
GPOS_ASSERT(nullptr != pexprSeqPrj);
GPOS_ASSERT(nullptr != ppdrgpexprDistinctAggsPrEl);
GPOS_ASSERT(nullptr != ppdrgpexprOtherPrEl);
GPOS_ASSERT(nullptr != ppdrgposOther);
GPOS_ASSERT(nullptr != ppdrgpwfOther);
CLogicalSequenceProject *popSeqPrj =
CLogicalSequenceProject::PopConvert(pexprSeqPrj->Pop());
CExpression *pexprPrjList = (*pexprSeqPrj)[1];
COrderSpecArray *pdrgpos = popSeqPrj->Pdrgpos();
BOOL fHasOrderSpecs = popSeqPrj->FHasOrderSpecs();
CWindowFrameArray *pdrgpwf = popSeqPrj->Pdrgpwf();
BOOL fHasFrameSpecs = popSeqPrj->FHasFrameSpecs();
CExpressionArray *pdrgpexprDistinctAggsPrEl =
GPOS_NEW(mp) CExpressionArray(mp);
CExpressionArray *pdrgpexprOtherPrEl = GPOS_NEW(mp) CExpressionArray(mp);
COrderSpecArray *pdrgposOther = GPOS_NEW(mp) COrderSpecArray(mp);
CWindowFrameArray *pdrgpwfOther = GPOS_NEW(mp) CWindowFrameArray(mp);
// iterate over project list and split project elements between
// Distinct Aggs list, and Others list
const ULONG arity = pexprPrjList->Arity();
for (ULONG ul = 0; ul < arity; ul++)
{
CExpression *pexprPrjEl = (*pexprPrjList)[ul];
CExpression *pexprWinFunc = (*pexprPrjEl)[0];
CScalarWindowFunc *popScWinFunc =
CScalarWindowFunc::PopConvert(pexprWinFunc->Pop());
CScalarProjectElement *popScPrjElem =
CScalarProjectElement::PopConvert(pexprPrjEl->Pop());
CColRef *pcrPrjElem = popScPrjElem->Pcr();
if (popScWinFunc->IsDistinct() && popScWinFunc->FAgg())
{
CExpression *pexprAgg =
CXformUtils::PexprWinFuncAgg2ScalarAgg(mp, pexprWinFunc);
CExpression *pexprNewPrjElem =
CUtils::PexprScalarProjectElement(mp, pcrPrjElem, pexprAgg);
pdrgpexprDistinctAggsPrEl->Append(pexprNewPrjElem);
}
else
{
if (fHasOrderSpecs)
{
(*pdrgpos)[ul]->AddRef();
pdrgposOther->Append((*pdrgpos)[ul]);
}
if (fHasFrameSpecs)
{
(*pdrgpwf)[ul]->AddRef();
pdrgpwfOther->Append((*pdrgpwf)[ul]);
}
pexprWinFunc->AddRef();
CExpression *pexprNewPrjElem =
CUtils::PexprScalarProjectElement(mp, pcrPrjElem, pexprWinFunc);
pdrgpexprOtherPrEl->Append(pexprNewPrjElem);
}
}
*ppdrgpexprDistinctAggsPrEl = pdrgpexprDistinctAggsPrEl;
*ppdrgpexprOtherPrEl = pdrgpexprOtherPrEl;
*ppdrgposOther = pdrgposOther;
*ppdrgpwfOther = pdrgpwfOther;
}
//---------------------------------------------------------------------------
// @function:
// CWindowPreprocessor::SplitSeqPrj
//
// @doc:
// Split SeqPrj expression into:
// - A GbAgg expression containing distinct Aggs, and
// - A SeqPrj expression containing all remaining window functions
//
//---------------------------------------------------------------------------
void
CWindowPreprocessor::SplitSeqPrj(
CMemoryPool *mp, CExpression *pexprSeqPrj,
CExpression *
*ppexprGbAgg, // output: GbAgg expression containing distinct Aggs
CExpression **
ppexprOutputSeqPrj // output: SeqPrj expression containing all remaining window functions
)
{
GPOS_ASSERT(nullptr != pexprSeqPrj);
GPOS_ASSERT(nullptr != ppexprGbAgg);
GPOS_ASSERT(nullptr != ppexprOutputSeqPrj);
// split project elements between Distinct Aggs list, and Others list
CExpressionArray *pdrgpexprDistinctAggsPrEl = nullptr;
CExpressionArray *pdrgpexprOtherPrEl = nullptr;
COrderSpecArray *pdrgposOther = nullptr;
CWindowFrameArray *pdrgpwfOther = nullptr;
SplitPrjList(mp, pexprSeqPrj, &pdrgpexprDistinctAggsPrEl,
&pdrgpexprOtherPrEl, &pdrgposOther, &pdrgpwfOther);
// check distribution spec of original SeqPrj and extract grouping columns
// from window (PARTITION BY) clause
CLogicalSequenceProject *popSeqPrj =
CLogicalSequenceProject::PopConvert(pexprSeqPrj->Pop());
CDistributionSpec *pds = popSeqPrj->Pds();
CColRefArray *pdrgpcrGrpCols = nullptr;
if (CDistributionSpec::EdtHashed == pds->Edt())
{
CColRefSet *pcrs = CUtils::PcrsExtractColumns(
mp, CDistributionSpecHashed::PdsConvert(pds)->Pdrgpexpr());
pdrgpcrGrpCols = pcrs->Pdrgpcr(mp);
pcrs->Release();
}
else
{
// no (PARTITION BY) clause
pdrgpcrGrpCols = GPOS_NEW(mp) CColRefArray(mp);
}
CExpression *pexprSeqPrjChild = (*pexprSeqPrj)[0];
pexprSeqPrjChild->AddRef();
*ppexprGbAgg = GPOS_NEW(mp) CExpression(
mp,
GPOS_NEW(mp)
CLogicalGbAgg(mp, pdrgpcrGrpCols, COperator::EgbaggtypeGlobal),
pexprSeqPrjChild,
GPOS_NEW(mp) CExpression(mp, GPOS_NEW(mp) CScalarProjectList(mp),
pdrgpexprDistinctAggsPrEl));
pexprSeqPrjChild->AddRef();
if (0 == pdrgpexprOtherPrEl->Size())
{
// no remaining window functions after excluding distinct aggs,
// reuse the original SeqPrj child in this case
pdrgpexprOtherPrEl->Release();
pdrgposOther->Release();
pdrgpwfOther->Release();
*ppexprOutputSeqPrj = pexprSeqPrjChild;
return;
}
// create a new SeqPrj expression for remaining window functions
pds->AddRef();
*ppexprOutputSeqPrj = GPOS_NEW(mp) CExpression(
mp,
GPOS_NEW(mp)
CLogicalSequenceProject(mp, pds, pdrgposOther, pdrgpwfOther),
pexprSeqPrjChild,
GPOS_NEW(mp) CExpression(mp, GPOS_NEW(mp) CScalarProjectList(mp),
pdrgpexprOtherPrEl));
}
//---------------------------------------------------------------------------
// @function:
// CWindowPreprocessor::CreateCTE
//
// @doc:
// Create a CTE with two consumers using the child expression of
// Sequence Project
//
//---------------------------------------------------------------------------
void
CWindowPreprocessor::CreateCTE(CMemoryPool *mp, CExpression *pexprSeqPrj,
CExpression **ppexprFirstConsumer,
CExpression **ppexprSecondConsumer)
{
GPOS_ASSERT(nullptr != pexprSeqPrj);
GPOS_ASSERT(COperator::EopLogicalSequenceProject ==
pexprSeqPrj->Pop()->Eopid());
GPOS_ASSERT(nullptr != ppexprFirstConsumer);
GPOS_ASSERT(nullptr != ppexprSecondConsumer);
CExpression *pexprChild = (*pexprSeqPrj)[0];
CColRefSet *pcrsChildOutput = pexprChild->DeriveOutputColumns();
CColRefArray *pdrgpcrChildOutput = pcrsChildOutput->Pdrgpcr(mp);
// create a CTE producer based on SeqPrj child expression
CCTEInfo *pcteinfo = COptCtxt::PoctxtFromTLS()->Pcteinfo();
const ULONG ulCTEId = pcteinfo->next_id();
CExpression *pexprCTEProd = CXformUtils::PexprAddCTEProducer(
mp, ulCTEId, pdrgpcrChildOutput, pexprChild);
CColRefArray *pdrgpcrProducerOutput =
pexprCTEProd->DeriveOutputColumns()->Pdrgpcr(mp);
// first consumer creates new output columns to be used later as input to GbAgg expression
*ppexprFirstConsumer = GPOS_NEW(mp) CExpression(
mp, GPOS_NEW(mp) CLogicalCTEConsumer(
mp, ulCTEId, CUtils::PdrgpcrCopy(mp, pdrgpcrProducerOutput)));
pcteinfo->IncrementConsumers(ulCTEId);
pdrgpcrProducerOutput->Release();
// second consumer reuses the same output columns of SeqPrj child to be able to provide any requested columns upstream
*ppexprSecondConsumer = GPOS_NEW(mp) CExpression(
mp, GPOS_NEW(mp) CLogicalCTEConsumer(mp, ulCTEId, pdrgpcrChildOutput));
pcteinfo->IncrementConsumers(ulCTEId);
}
//---------------------------------------------------------------------------
// @function:
// CWindowPreprocessor::PdrgpcrGrpCols
//
// @doc:
// Extract grouping columns from given expression,
// we expect expression to be either a GbAgg expression or a join
// whose inner child is a GbAgg expression
//
//---------------------------------------------------------------------------
CColRefArray *
CWindowPreprocessor::PdrgpcrGrpCols(CExpression *pexpr)
{
GPOS_ASSERT(nullptr != pexpr);
COperator *pop = pexpr->Pop();
if (COperator::EopLogicalGbAgg == pop->Eopid())
{
// passed expression is a Group By, return grouping columns
return CLogicalGbAgg::PopConvert(pop)->Pdrgpcr();
}
if (CUtils::FLogicalJoin(pop))
{
// pass expression is a join, we expect a Group By on the inner side
COperator *popInner = (*pexpr)[1]->Pop();
if (COperator::EopLogicalGbAgg == popInner->Eopid())
{
return CLogicalGbAgg::PopConvert(popInner)->Pdrgpcr();
}
}
return nullptr;
}
//---------------------------------------------------------------------------
// @function:
// CWindowPreprocessor::PexprSeqPrj2Join
//
// @doc:
// Transform sequence project expression with distinct aggregates
// into an inner join expression,
// - the outer child of the join is a GbAgg expression that computes
// distinct aggs,
// - the inner child of the join is a SeqPrj expression that computes
// remaining window functions
//
// we use a CTE to compute the input to both join children, while maintaining
// all column references upstream by reusing the same computed columns in the
// original SeqPrj expression
//
//---------------------------------------------------------------------------
CExpression *
CWindowPreprocessor::PexprSeqPrj2Join(CMemoryPool *mp, CExpression *pexprSeqPrj)
{
GPOS_ASSERT(nullptr != pexprSeqPrj);
GPOS_ASSERT(COperator::EopLogicalSequenceProject ==
pexprSeqPrj->Pop()->Eopid());
GPOS_ASSERT(0 < (*pexprSeqPrj)[1]->DeriveTotalDistinctAggs());
// split SeqPrj expression into a GbAgg expression (for distinct Aggs), and
// another SeqPrj expression (for remaining window functions)
CExpression *pexprGbAgg = nullptr;
CExpression *pexprWindow = nullptr;
SplitSeqPrj(mp, pexprSeqPrj, &pexprGbAgg, &pexprWindow);
// create CTE using SeqPrj child expression
CExpression *pexprGbAggConsumer = nullptr;
CExpression *pexprWindowConsumer = nullptr;
CreateCTE(mp, pexprSeqPrj, &pexprGbAggConsumer, &pexprWindowConsumer);
// extract output columns of SeqPrj child expression
CExpression *pexprChild = (*pexprSeqPrj)[0];
CColRefArray *pdrgpcrChildOutput =
pexprChild->DeriveOutputColumns()->Pdrgpcr(mp);
// to match requested columns upstream, we have to re-use the same computed
// columns that define the aggregates, we avoid recreating new columns during
// expression copy by passing must_exist as false
CColRefArray *pdrgpcrConsumerOutput =
CLogicalCTEConsumer::PopConvert(pexprGbAggConsumer->Pop())->Pdrgpcr();
UlongToColRefMap *colref_mapping =
CUtils::PhmulcrMapping(mp, pdrgpcrChildOutput, pdrgpcrConsumerOutput);
CExpression *pexprGbAggRemapped = pexprGbAgg->PexprCopyWithRemappedColumns(
mp, colref_mapping, false /*must_exist*/);
colref_mapping->Release();
pdrgpcrChildOutput->Release();
pexprGbAgg->Release();
// finalize GbAgg expression by replacing its child with CTE consumer
pexprGbAggRemapped->Pop()->AddRef();
(*pexprGbAggRemapped)[1]->AddRef();
CExpression *pexprGbAggWithConsumer =
GPOS_NEW(mp) CExpression(mp, pexprGbAggRemapped->Pop(),
pexprGbAggConsumer, (*pexprGbAggRemapped)[1]);
pexprGbAggRemapped->Release();
// in case of multiple Distinct Aggs, we need to expand the GbAgg expression
// into a join expression where leaves carry single Distinct Aggs
CExpression *pexprJoinDQAs =
CXformUtils::PexprGbAggOnCTEConsumer2Join(mp, pexprGbAggWithConsumer);
pexprGbAggWithConsumer->Release();
CExpression *pexprWindowFinal = nullptr;
if (COperator::EopLogicalSequenceProject == pexprWindow->Pop()->Eopid())
{
// create a new SeqPrj expression for remaining window functions,
// and replace expression child withCTE consumer
pexprWindow->Pop()->AddRef();
(*pexprWindow)[1]->AddRef();
pexprWindowFinal = GPOS_NEW(mp) CExpression(
mp, pexprWindow->Pop(), pexprWindowConsumer, (*pexprWindow)[1]);
}
else
{
// no remaining window functions, simply reuse created CTE consumer
pexprWindowFinal = pexprWindowConsumer;
}
pexprWindow->Release();
// extract grouping columns from created join expression
CColRefArray *pdrgpcrGrpCols = PdrgpcrGrpCols(pexprJoinDQAs);
// create final join condition
CExpression *pexprJoinCondition = nullptr;
if (nullptr != pdrgpcrGrpCols && 0 < pdrgpcrGrpCols->Size())
{
// extract PARTITION BY columns from original SeqPrj expression
CLogicalSequenceProject *popSeqPrj =
CLogicalSequenceProject::PopConvert(pexprSeqPrj->Pop());
CDistributionSpec *pds = popSeqPrj->Pds();
CColRefSet *pcrs = CUtils::PcrsExtractColumns(
mp, CDistributionSpecHashed::PdsConvert(pds)->Pdrgpexpr());
CColRefArray *pdrgpcrPartitionBy = pcrs->Pdrgpcr(mp);
pcrs->Release();
GPOS_ASSERT(
pdrgpcrGrpCols->Size() == pdrgpcrPartitionBy->Size() &&
"Partition By columns in window function are not the same as grouping columns in created Aggs");
// create a conjunction of INDF expressions comparing a GROUP BY column to a PARTITION BY column
pexprJoinCondition = CPredicateUtils::PexprINDFConjunction(
mp, pdrgpcrGrpCols, pdrgpcrPartitionBy);
pdrgpcrPartitionBy->Release();
}
else
{
// no PARTITION BY, join condition is const True
pexprJoinCondition = CUtils::PexprScalarConstBool(mp, true /*value*/);
}
// create a join between expanded DQAs and Window expressions
CExpression *pexprJoin = CUtils::PexprLogicalJoin<CLogicalInnerJoin>(
mp, pexprJoinDQAs, pexprWindowFinal, pexprJoinCondition);
ULONG ulCTEId =
CLogicalCTEConsumer::PopConvert(pexprGbAggConsumer->Pop())->UlCTEId();
return GPOS_NEW(mp)
CExpression(mp, GPOS_NEW(mp) CLogicalCTEAnchor(mp, ulCTEId), pexprJoin);
}
//---------------------------------------------------------------------------
// @function:
// CWindowPreprocessor::PexprPreprocess
//
// @doc:
// Main driver
//
//---------------------------------------------------------------------------
CExpression *
CWindowPreprocessor::PexprPreprocess(CMemoryPool *mp, CExpression *pexpr)
{
// protect against stack overflow during recursion
GPOS_CHECK_STACK_SIZE;
GPOS_ASSERT(nullptr != mp);
GPOS_ASSERT(nullptr != pexpr);
COperator *pop = pexpr->Pop();
if (COperator::EopLogicalSequenceProject == pop->Eopid() &&
0 < (*pexpr)[1]->DeriveTotalDistinctAggs())
{
CExpression *pexprJoin = PexprSeqPrj2Join(mp, pexpr);
// recursively process the resulting expression
CExpression *pexprResult = PexprPreprocess(mp, pexprJoin);
pexprJoin->Release();
return pexprResult;
}
// recursively process child expressions
const ULONG arity = pexpr->Arity();
CExpressionArray *pdrgpexprChildren = GPOS_NEW(mp) CExpressionArray(mp);
for (ULONG ul = 0; ul < arity; ul++)
{
CExpression *pexprChild = PexprPreprocess(mp, (*pexpr)[ul]);
pdrgpexprChildren->Append(pexprChild);
}
pop->AddRef();
return GPOS_NEW(mp) CExpression(mp, pop, pdrgpexprChildren);
}
// EOF
相关信息
相关文章
greenplumn CExpressionFactorizer 源码
greenplumn CExpressionHandle 源码
greenplumn CExpressionPreprocessor 源码
greenplumn CExpressionUtils 源码
0
赞
热门推荐
-
2、 - 优质文章
-
3、 gate.io
-
7、 golang
-
9、 openharmony
-
10、 Vue中input框自动聚焦