greenplumn COrderedAggPreprocessor 源码

  • 2022-08-18
  • 浏览 (214)

greenplumn COrderedAggPreprocessor 代码

文件路径:/src/backend/gporca/libgpopt/src/operators/COrderedAggPreprocessor.cpp

//---------------------------------------------------------------------------
//	Greenplum Database
//	Copyright 2022 VMware, Inc. or its affiliates.
//
//	@filename:
//		COrderedAggPreprocessor.cpp
//
//	@doc:
//		Preprocessing routines of ordered-set aggregate
//---------------------------------------------------------------------------

#include "gpopt/operators/COrderedAggPreprocessor.h"

#include "gpos/base.h"

#include "gpopt/base/COptCtxt.h"
#include "gpopt/base/CUtils.h"
#include "gpopt/operators/CLogicalCTEAnchor.h"
#include "gpopt/operators/CLogicalCTEConsumer.h"
#include "gpopt/operators/CLogicalGbAgg.h"
#include "gpopt/operators/CLogicalInnerJoin.h"
#include "gpopt/operators/CLogicalLimit.h"
#include "gpopt/operators/CLogicalSequenceProject.h"
#include "gpopt/operators/CPredicateUtils.h"
#include "gpopt/operators/CScalarIdent.h"
#include "gpopt/operators/CScalarProjectElement.h"
#include "gpopt/operators/CScalarProjectList.h"
#include "gpopt/operators/CScalarSortGroupClause.h"
#include "gpopt/operators/CScalarValuesList.h"
#include "gpopt/operators/CScalarWindowFunc.h"
#include "gpopt/xforms/CXformUtils.h"

using namespace gpopt;


//---------------------------------------------------------------------------
//	@function:
//		COrderedAggPreprocessor::SplitPrjList
//
//	@doc:
//		Iterate over project elements and split them elements between
//		Ordered Aggs list, and Other Aggs list
//
//---------------------------------------------------------------------------
void
COrderedAggPreprocessor::SplitPrjList(
	CMemoryPool *mp, CExpression *pexprInputAggPrj,
	CExpressionArray **
		ppdrgpexprOrderedAggsPrEl,	// output: list of project elements with Ordered Aggs
	CExpressionArray **
		ppdrgpexprOtherPrEl	 // output: list of project elements with Other aggregate functions
)
{
	GPOS_ASSERT(nullptr != pexprInputAggPrj);
	GPOS_ASSERT(nullptr != ppdrgpexprOrderedAggsPrEl);
	GPOS_ASSERT(nullptr != ppdrgpexprOtherPrEl);

	CExpression *pexprPrjList = (*pexprInputAggPrj)[1];

	CExpressionArray *pdrgpexprOrderedAggsPrEl =
		GPOS_NEW(mp) CExpressionArray(mp);

	CExpressionArray *pdrgpexprOtherPrEl = GPOS_NEW(mp) CExpressionArray(mp);
	// create CTE using SeqPrj child expression
	CExpression *pexprRemappedAggConsumer = nullptr;
	CExpression *pexprAggConsumer = nullptr;
	CreateCTE(mp, pexprInputAggPrj, &pexprRemappedAggConsumer,
			  &pexprAggConsumer);

	// iterate over project list and split project elements between
	// Ordered Aggs list, and Other aggs list
	const ULONG arity = pexprPrjList->Arity();
	CColRefArray *colref_array = nullptr;
	CExpressionArray *pdrgpexprSortClauseArray = nullptr;

	for (ULONG ul = 0; ul < arity; ul++)
	{
		CExpression *pexprPrjEl = (*pexprPrjList)[ul];
		CExpression *pexprAggFunc = (*pexprPrjEl)[0];
		CScalarProjectElement *popScPrjElem =
			CScalarProjectElement::PopConvert(pexprPrjEl->Pop());
		CColRef *pcrPrjElem = popScPrjElem->Pcr();

		if (CUtils::FHasOrderedAggToSplit(pexprAggFunc))
		{
			CExpression *pexprSortCol =
				(*(*pexprAggFunc)[EAggfuncChildIndices::EaggfuncIndexArgs])[0];
			CExpression *pexprSortGroupClause =
				(*(*pexprAggFunc)[EAggfuncChildIndices::EaggfuncIndexOrder])[0];
			CScalarSortGroupClause *curr_sort_clause =
				CScalarSortGroupClause::PopConvert(pexprSortGroupClause->Pop());

			const CColRef *colref =
				CCastUtils::PcrExtractFromScIdOrCastScId(pexprSortCol);
			BOOL skip = false;
			if (nullptr == pdrgpexprSortClauseArray)
			{
				colref_array = GPOS_NEW(mp) CColRefArray(mp);
				colref_array->Append(const_cast<CColRef *>(colref));
				pdrgpexprSortClauseArray = GPOS_NEW(mp) CExpressionArray(mp);
				pexprSortGroupClause->AddRef();
				pdrgpexprSortClauseArray->Append(pexprSortGroupClause);
			}
			else
			{
				for (ULONG uidx = 0; uidx < pdrgpexprSortClauseArray->Size();
					 uidx++)
				{
					// For multiple ordered-set aggs on the same ORDERING column and ORDERING SPEC(ASC/DESC), we optimize
					// to add the new aggregate as a ProjectiElement to the ProjectList of an existing JOIN, instead of
					// creating a new JOIN for doing the same SORT.
					// Eg. SELECT percentile_cont(0.25) WITHIN GROUP(ORDER BY a), percentile_cont(0.5) WITHIN GROUP(ORDER BY a) from tab;
					// Currently, to check if colref's SORT ORDER Spec(Asc/Desc) is same, we match only the SortOp and NullFirst
					// as that is what is used for creating the OrderSpec
					// TO-DO: (enhancement) Instead of this loop, we can probably use a map (colref->SortGroupClause)
					CScalarSortGroupClause *visited_sort_clause =
						CScalarSortGroupClause::PopConvert(
							(*pdrgpexprSortClauseArray)[uidx]->Pop());
					if (colref == (*colref_array)[uidx] &&
						curr_sort_clause->SortOp() ==
							visited_sort_clause->SortOp() &&
						curr_sort_clause->NullsFirst() ==
							visited_sort_clause->NullsFirst())
					{
						// Create the new ordered agg function and append it as a ProjElement to the existing ProjectList
						// created for and already split ordered agg on the same column with same ORDERING SPEC
						// Existing ProjList:
						// +--CScalarProjectList
						// +--CScalarProjectElement \"percentile_cont\" (12)
						//    +--CScalarAggFunc
						//       |--CScalarValuesList
						//       |  |--CScalarIdent \"a\" (1)
						//       |  |--CScalarConst (0.2500)
						//       |  +--CScalarIdent \"ColRef_0041\" (41)
						//       ...
						CExpression *pCurrExprProjList =
							(*(*pdrgpexprOrderedAggsPrEl)[uidx])[1];
						CExpression *pCurrExprAggFunc =
							(*(*pCurrExprProjList)[0])[0];
						const CColRef *total_count_colref =
							CCastUtils::PcrExtractFromScIdOrCastScId((*(
								*pCurrExprAggFunc)[EAggfuncChildIndices::
													   EaggfuncIndexArgs])[2]);

						CExpression *pexprNewAggFunc = PexprFinalAgg(
							mp, pexprAggFunc, pcrPrjElem,
							const_cast<CColRef *>(total_count_colref));
						CExpression *pexprAddPrjElem =
							CUtils::PexprScalarProjectElement(mp, pcrPrjElem,
															  pexprNewAggFunc);
						// Updated ProjList:
						// +--CScalarProjectList
						//    |--CScalarProjectElement \"percentile_cont\" (12)
						//    |  +--CScalarAggFunc
						//    |     |--CScalarValuesList
						//    |     |  |--CScalarIdent \"a\" (1)
						//    |     |  |--CScalarConst (0.2500)
						//    |     |  +--CScalarIdent \"ColRef_0041\" (41)
						//    |     ...
						//    +--CScalarProjectElement \"percentile_cont\" (13)
						//       +--CScalarAggFunc
						//          |--CScalarValuesList
						//          |  |--CScalarIdent \"a\" (1)
						//          |  |--CScalarConst (0.500)
						//          |  +--CScalarIdent \"ColRef_0041\" (41)
						//          ...
						CExpressionArray *pCurrProjList =
							pCurrExprProjList->PdrgPexpr();
						pCurrProjList->Append(pexprAddPrjElem);
						skip = true;
						break;
					}
				}
				if (!skip)
				{
					colref_array->Append(const_cast<CColRef *>(colref));
					pexprSortGroupClause->AddRef();
					pdrgpexprSortClauseArray->Append(pexprSortGroupClause);
				}
			}
			if (skip)
			{
				continue;
			}
			if (0 < ul)
			{
				pexprRemappedAggConsumer->AddRef();
				pexprAggConsumer->AddRef();
			}
			// CREATE total count aggregate
			CColumnFactory *col_factory = COptCtxt::PoctxtFromTLS()->Pcf();
			CMDAccessor *md_accessor = COptCtxt::PoctxtFromTLS()->Pmda();
			CExpression *pexprAgg = CUtils::PexprAgg(
				mp, md_accessor, IMDType::EaggCount, colref, false);
			CColRef *total_count_colref = col_factory->PcrCreate(
				md_accessor->RetrieveType(
					CScalarAggFunc::PopConvert(pexprAgg->Pop())->MdidType()),
				CScalarAggFunc::PopConvert(pexprAgg->Pop())->TypeModifier());
			CExpression *pexprNewPrjElem = CUtils::PexprScalarProjectElement(
				mp, total_count_colref, pexprAgg);
			CExpression *pexprCountAggPrjList = GPOS_NEW(mp) CExpression(
				mp, GPOS_NEW(mp) CScalarProjectList(mp), pexprNewPrjElem);
			(*pexprInputAggPrj)[0]->AddRef();
			CExpression *pexprTotalCountAgg = CUtils::PexprLogicalGbAggGlobal(
				mp, GPOS_NEW(mp) CColRefArray(mp), (*pexprInputAggPrj)[0],
				pexprCountAggPrjList);

			// to match requested columns upstream, we have to re-use the same computed
			// columns that define the aggregates, we avoid recreating new columns during
			// expression copy by passing must_exist as false
			CColRefArray *pdrgpcrChildOutput =
				pexprAggConsumer->DeriveOutputColumns()->Pdrgpcr(mp);
			CColRefArray *pdrgpcrConsumerOutput =
				CLogicalCTEConsumer::PopConvert(pexprRemappedAggConsumer->Pop())
					->Pdrgpcr();
			UlongToColRefMap *colref_mapping = CUtils::PhmulcrMapping(
				mp, pdrgpcrChildOutput, pdrgpcrConsumerOutput);
			CExpression *pexprTotalCountAggRemapped =
				pexprTotalCountAgg->PexprCopyWithRemappedColumns(
					mp, colref_mapping, false /*must_exist*/);
			colref_mapping->Release();
			pdrgpcrChildOutput->Release();
			pexprTotalCountAgg->Release();

			// finalize total count agg expression by replacing its child with CTE consumer
			pexprTotalCountAggRemapped->Pop()->AddRef();
			(*pexprTotalCountAggRemapped)[1]->AddRef();
			CExpression *pexprFinalGbAgg = GPOS_NEW(mp) CExpression(
				mp, pexprTotalCountAggRemapped->Pop(), pexprRemappedAggConsumer,
				(*pexprTotalCountAggRemapped)[1]);
			pexprTotalCountAggRemapped->Release();


			CExpression *pexprJoinCondition =
				CUtils::PexprScalarConstBool(mp, true /*value*/);

			// create a join between expanded total count and CTE Consumer expressions
			CExpression *pexprJoin =
				CUtils::PexprLogicalJoin<CLogicalInnerJoin>(
					mp, pexprAggConsumer, pexprFinalGbAgg, pexprJoinCondition);

			// create ordered spec for the Join to merge on the colref passed in as part
			// of the ordered agg's WITHIN(ORDER BY) clause
			COrderSpec *pos = GPOS_NEW(mp) COrderSpec(mp);
			IMDId *mdid_curr_sortop =
				GPOS_NEW(mp) CMDIdGPDB(curr_sort_clause->SortOp());
			IMDType::ECmpType curr_cmp_type =
				CUtils::ParseCmpType(mdid_curr_sortop);
			// Instead of using the SortOp directly, we use the colref's operator for the comparison type
			// to avoid incorret results for cast related columns
			IMDId *mdid_pos =
				colref->RetrieveType()->GetMdidForCmpType(curr_cmp_type);
			mdid_curr_sortop->Release();
			COrderSpec::ENullTreatment ent = COrderSpec::EntLast;
			if (curr_sort_clause->NullsFirst())
			{
				ent = COrderSpec::EntFirst;
			}
			mdid_pos->AddRef();
			pos->Append(mdid_pos, colref, ent);
			CLogicalLimit *popLimit = GPOS_NEW(mp)
				CLogicalLimit(mp, pos, true /* fGlobal */, true /* fHasCount */,
							  false /*fTopLimitUnderDML*/);
			CExpression *pexprLimitOffset = CUtils::PexprScalarConstInt8(mp, 0);
			CExpression *pexprLimitCount =
				CUtils::PexprScalarConstInt8(mp, 1, true);

			CExpression *pexprJoinLimit = GPOS_NEW(mp) CExpression(
				mp, popLimit, pexprJoin, pexprLimitOffset, pexprLimitCount);

			// Create the final gp_percentile agg on top
			CExpression *pexprNewAggFunc =
				PexprFinalAgg(mp, pexprAggFunc, pcrPrjElem, total_count_colref);
			CExpression *pexprExistingPrjElem =
				CUtils::PexprScalarProjectElement(mp, pcrPrjElem,
												  pexprNewAggFunc);
			CExpression *pexprNewAggPrjList = GPOS_NEW(mp) CExpression(
				mp, GPOS_NEW(mp) CScalarProjectList(mp), pexprExistingPrjElem);
			CColRefArray *colref_array = GPOS_NEW(mp) CColRefArray(mp);
			CExpression *pexprNewAgg = CUtils::PexprLogicalGbAggGlobal(
				mp, colref_array, pexprJoinLimit, pexprNewAggPrjList);
			pdrgpexprOrderedAggsPrEl->Append(pexprNewAgg);
		}
		else
		{
			pexprAggFunc->AddRef();
			CExpression *pexprNewPrjElem =
				CUtils::PexprScalarProjectElement(mp, pcrPrjElem, pexprAggFunc);
			pdrgpexprOtherPrEl->Append(pexprNewPrjElem);
		}
	}
	CRefCount::SafeRelease(colref_array);
	CRefCount::SafeRelease(pdrgpexprSortClauseArray);
	*ppdrgpexprOrderedAggsPrEl = pdrgpexprOrderedAggsPrEl;
	*ppdrgpexprOtherPrEl = pdrgpexprOtherPrEl;
}


//---------------------------------------------------------------------------
//	@function:
//		COrderedAggPreprocessor::SplitOrderedAggsPrj
//
//	@doc:
//		Split InputAgg expression into:
//		- A GbAgg expression containing ordered aggs split to gp_percentile agg, and
//		- A GbAgg expression containing all remaining non-ordered agg functions
//
//---------------------------------------------------------------------------
void
COrderedAggPreprocessor::SplitOrderedAggsPrj(
	CMemoryPool *mp, CExpression *pexprInputAggPrj,
	CExpression **
		ppexprOrderedGbAgg,	 // output: GbAgg expression containing ordered aggs split to normal agg
	CExpression **
		ppexprRemainingAgg	// output: GbAgg expression containing all remaining agg functions
)
{
	GPOS_ASSERT(nullptr != pexprInputAggPrj);
	GPOS_ASSERT(nullptr != ppexprOrderedGbAgg);
	GPOS_ASSERT(nullptr != ppexprRemainingAgg);

	// split project elements between ordered Aggs list, and Other aggs list
	CExpressionArray *ppdrgpexprOrderedAggsPrEl = nullptr;
	CExpressionArray *pdrgpexprOtherPrEl = nullptr;
	SplitPrjList(mp, pexprInputAggPrj, &ppdrgpexprOrderedAggsPrEl,
				 &pdrgpexprOtherPrEl);

	CExpression *pexprInputAggPrjChild = (*pexprInputAggPrj)[0];

	CExpression *pexpr = (*ppdrgpexprOrderedAggsPrEl)[0];
	pexpr->AddRef();
	// in case of multiple ordered aggs, we need to expand the GbAgg expression
	// into a join expression where leaves carry split ordered aggs
	for (ULONG ul = 1; ul < ppdrgpexprOrderedAggsPrEl->Size(); ul++)
	{
		CExpression *pexprWindowConsumer = (*ppdrgpexprOrderedAggsPrEl)[ul];
		CExpression *pexprJoinCondition =
			CUtils::PexprScalarConstBool(mp, true /*value*/);

		// create a join between expanded DQAs and Window expressions
		pexprWindowConsumer->AddRef();
		CExpression *pexprJoin = CUtils::PexprLogicalJoin<CLogicalInnerJoin>(
			mp, pexpr, pexprWindowConsumer, pexprJoinCondition);
		pexpr = pexprJoin;
	}

	*ppexprOrderedGbAgg = pexpr;
	if (0 == pdrgpexprOtherPrEl->Size())
	{
		// no remaining aggregate functions after excluding ordered aggs,
		// reuse the original InputAggPrj child in this case
		pdrgpexprOtherPrEl->Release();
		ppdrgpexprOrderedAggsPrEl->Release();
		*ppexprRemainingAgg = pexprInputAggPrjChild;

		return;
	}

	// create a new GbAgg expression for remaining aggregate functions
	CColRefArray *pdrgpcrGrpCols = nullptr;
	pdrgpcrGrpCols = GPOS_NEW(mp) CColRefArray(mp);
	pexprInputAggPrjChild->AddRef();
	*ppexprRemainingAgg = GPOS_NEW(mp) CExpression(
		mp,
		GPOS_NEW(mp)
			CLogicalGbAgg(mp, pdrgpcrGrpCols, COperator::EgbaggtypeGlobal),
		pexprInputAggPrjChild,
		GPOS_NEW(mp) CExpression(mp, GPOS_NEW(mp) CScalarProjectList(mp),
								 pdrgpexprOtherPrEl));
	ppdrgpexprOrderedAggsPrEl->Release();
}


//---------------------------------------------------------------------------
//	@function:
//		COrderedAggPreprocessor::CreateCTE
//
//	@doc:
//		Create a CTE with two consumers using the child expression of
//		Sequence Project
//
//---------------------------------------------------------------------------
void
COrderedAggPreprocessor::CreateCTE(CMemoryPool *mp,
								   CExpression *pexprInputAggPrj,
								   CExpression **ppexprFirstConsumer,
								   CExpression **ppexprSecondConsumer)
{
	GPOS_ASSERT(nullptr != pexprInputAggPrj);
	GPOS_ASSERT(COperator::EopLogicalGbAgg == pexprInputAggPrj->Pop()->Eopid());
	GPOS_ASSERT(nullptr != ppexprFirstConsumer);
	GPOS_ASSERT(nullptr != ppexprSecondConsumer);

	CExpression *pexprChild = (*pexprInputAggPrj)[0];
	CColRefSet *pcrsChildOutput = pexprChild->DeriveOutputColumns();
	CColRefArray *pdrgpcrChildOutput = pcrsChildOutput->Pdrgpcr(mp);

	// create a CTE producer based on SeqPrj child expression
	CCTEInfo *pcteinfo = COptCtxt::PoctxtFromTLS()->Pcteinfo();
	const ULONG ulCTEId = pcteinfo->next_id();
	CExpression *pexprCTEProd = CXformUtils::PexprAddCTEProducer(
		mp, ulCTEId, pdrgpcrChildOutput, pexprChild);
	CColRefArray *pdrgpcrProducerOutput =
		pexprCTEProd->DeriveOutputColumns()->Pdrgpcr(mp);

	//	 first consumer creates new output columns to be used later as input to GbAgg expression
	*ppexprFirstConsumer = GPOS_NEW(mp) CExpression(
		mp, GPOS_NEW(mp) CLogicalCTEConsumer(
				mp, ulCTEId, CUtils::PdrgpcrCopy(mp, pdrgpcrProducerOutput)));
	pcteinfo->IncrementConsumers(ulCTEId);
	pdrgpcrProducerOutput->Release();

	// second consumer reuses the same output columns of SeqPrj child to be able to provide any requested columns upstream
	*ppexprSecondConsumer = GPOS_NEW(mp) CExpression(
		mp, GPOS_NEW(mp) CLogicalCTEConsumer(mp, ulCTEId, pdrgpcrChildOutput));
	pcteinfo->IncrementConsumers(ulCTEId);
}


//---------------------------------------------------------------------------
//    @function:
//        COrderedAggPreprocessor::PexprFinalAgg
//
//    @doc:
//        Return expression for final agg function
//        Input:
//          expression for percentile_cont(direct_arg) WITHIN GROUP(ORDER BY col)
//          total_count
//        Returns:
//          expression for gp_percentile_cont(col, direct_arg, total_count, peer_count)
//
//---------------------------------------------------------------------------
CExpression *
COrderedAggPreprocessor::PexprFinalAgg(CMemoryPool *mp,
									   CExpression *pexprAggFunc,
									   CColRef *arg_col_ref,
									   CColRef *total_count_colref)
{
	CScalarAggFunc *popScAggFunc =
		CScalarAggFunc::PopConvert(pexprAggFunc->Pop());
	CExpressionArray *pdrgpexpr = GPOS_NEW(mp) CExpressionArray(mp);
	CMDIdGPDB *mdid = CUtils::GetPercentileAggMDId(mp, pexprAggFunc);
	ULongPtrArray *argtypes = GPOS_NEW(mp) ULongPtrArray(mp);

	CScalarValuesList *popScalarValuesList = GPOS_NEW(mp) CScalarValuesList(mp);
	CExpressionArray *pdrgpexprChildren = GPOS_NEW(mp) CExpressionArray(mp);

	CExpression *pexprAggFuncArg =
		(*(*pexprAggFunc)[EAggfuncChildIndices::EaggfuncIndexArgs])[0];
	pexprAggFuncArg->AddRef();
	pdrgpexprChildren->Append(pexprAggFuncArg);
	argtypes->Append(GPOS_NEW(mp) ULONG(
		CMDIdGPDB::CastMdid(
			CScalar::PopConvert(pexprAggFuncArg->Pop())->MdidType())
			->Oid()));

	CExpression *pexprAggFuncDirectArg =
		(*(*pexprAggFunc)[EAggfuncChildIndices::EaggfuncIndexDirectArgs])[0];
	pexprAggFuncDirectArg->AddRef();
	pdrgpexprChildren->Append(pexprAggFuncDirectArg);
	argtypes->Append(GPOS_NEW(mp) ULONG(
		CMDIdGPDB::CastMdid(
			CScalar::PopConvert(pexprAggFuncDirectArg->Pop())->MdidType())
			->Oid()));
	pdrgpexprChildren->Append(CUtils::PexprScalarIdent(mp, total_count_colref));
	argtypes->Append(GPOS_NEW(mp) ULONG(
		CMDIdGPDB::CastMdid(total_count_colref->RetrieveType()->MDId())
			->Oid()));
	// Currently passing dummy peer_count as 1 always. Will pass along the
	// calculated peer_count col_ref once we dedup data for handling skew
	CExpression *pexprDummyPeerCount =
		CUtils::PexprScalarConstInt8(mp, 1 /*val*/);
	pdrgpexprChildren->Append(pexprDummyPeerCount);
	argtypes->Append(GPOS_NEW(mp) ULONG(
		CMDIdGPDB::CastMdid(
			CScalar::PopConvert(pexprDummyPeerCount->Pop())->MdidType())
			->Oid()));

	pdrgpexpr->Append(
		GPOS_NEW(mp) CExpression(mp, popScalarValuesList, pdrgpexprChildren));

	pdrgpexpr->Append(GPOS_NEW(mp)
						  CExpression(mp, GPOS_NEW(mp) CScalarValuesList(mp),
									  GPOS_NEW(mp) CExpressionArray(mp)));

	pdrgpexpr->Append(GPOS_NEW(mp)
						  CExpression(mp, GPOS_NEW(mp) CScalarValuesList(mp),
									  GPOS_NEW(mp) CExpressionArray(mp)));

	pdrgpexpr->Append(GPOS_NEW(mp)
						  CExpression(mp, GPOS_NEW(mp) CScalarValuesList(mp),
									  GPOS_NEW(mp) CExpressionArray(mp)));

	IMDId *ret_type = nullptr;
	if (popScAggFunc->FHasAmbiguousReturnType())
	{
		popScAggFunc->MdidType()->AddRef();
		ret_type = popScAggFunc->MdidType();
	}
	CScalarAggFunc *popNewAggFunc = CUtils::PopAggFunc(
		mp, mdid, arg_col_ref->Name().Pstr(), false /*is_distinct*/,
		EaggfuncstageGlobal /*eaggfuncstage*/, false /*fSplit*/, ret_type,
		EaggfunckindNormal, argtypes);

	return GPOS_NEW(mp) CExpression(mp, popNewAggFunc, pdrgpexpr);
}


//---------------------------------------------------------------------------
//	@function:
//		COrderedAggPreprocessor::PexprInputAggPrj2Join
//
//	@doc:
//		Transform GbAgg project expression with ordered aggregates
//		into an inner join expression,
//			- the outer child of the join is a GbAgg expression that computes
//			gp_percentile aggs,
//			- the inner child of the join is a GbAgg expression that computes
//			remaining aggregate functions
//
//		we use a CTE to compute the input to both join children, while maintaining
//		all column references upstream by reusing the same computed columns in the
//		original input expression
//
//---------------------------------------------------------------------------
CExpression *
COrderedAggPreprocessor::PexprInputAggPrj2Join(CMemoryPool *mp,
											   CExpression *pexprInputAggPrj)
{
	GPOS_ASSERT(nullptr != pexprInputAggPrj);
	GPOS_ASSERT(COperator::EopLogicalGbAgg == pexprInputAggPrj->Pop()->Eopid());
	GPOS_ASSERT(0 < (*pexprInputAggPrj)[1]->DeriveTotalOrderedAggs());

	// split InputAgg expression into a GbAgg expression (for ordered Aggs), and
	// another GbAgg expression (for remaining aggregate functions)
	CExpression *pexprOrderedAgg = nullptr;
	CExpression *pexprOtherAgg = nullptr;
	SplitOrderedAggsPrj(mp, pexprInputAggPrj, &pexprOrderedAgg, &pexprOtherAgg);

	CExpression *pexprFinalJoin = nullptr;
	if (COperator::EopLogicalGbAgg == pexprOtherAgg->Pop()->Eopid())
	{
		CExpression *pexprJoinCondition =
			CUtils::PexprScalarConstBool(mp, true /*value*/);
		pexprFinalJoin = CUtils::PexprLogicalJoin<CLogicalInnerJoin>(
			mp, pexprOtherAgg, pexprOrderedAgg, pexprJoinCondition);
	}
	else
	{
		pexprFinalJoin = pexprOrderedAgg;
	}
	ULONG arity = pexprOrderedAgg->Arity();
	BOOL has_nlj_ontop =
		(COperator::EopLogicalInnerJoin == pexprOrderedAgg->Pop()->Eopid());
	CExpression *pexprTopmostCTE = pexprOrderedAgg;
	while (COperator::EopLogicalGbAgg != pexprTopmostCTE->Pop()->Eopid())
	{
		pexprTopmostCTE = (*pexprTopmostCTE)[0];
	}
	ULONG ulCTEIdStart = CLogicalCTEConsumer::PopConvert(
							 (*(*(*(*pexprTopmostCTE)[0])[0])[1])[0]->Pop())
							 ->UlCTEId();
	ULONG ulCTEIdEnd = ulCTEIdStart;
	if (has_nlj_ontop)
	{
		ulCTEIdEnd =
			CLogicalCTEConsumer::PopConvert(
				(*(*(*(*(*pexprOrderedAgg)[arity - 2])[0])[0])[1])[0]->Pop())
				->UlCTEId();
	}
	CExpression *pexpResult = pexprFinalJoin;
	for (ULONG ul = ulCTEIdEnd; ul > ulCTEIdStart; ul--)
	{
		CExpression *ctenext = GPOS_NEW(mp)
			CExpression(mp, GPOS_NEW(mp) CLogicalCTEAnchor(mp, ul), pexpResult);
		pexpResult = ctenext;
	}
	pexpResult = GPOS_NEW(mp) CExpression(
		mp, GPOS_NEW(mp) CLogicalCTEAnchor(mp, ulCTEIdStart), pexpResult);
	return pexpResult;
}


//---------------------------------------------------------------------------
//	@function:
//		COrderedAggPreprocessor::PexprPreprocess
//
//	@doc:
//		Main driver
//
//---------------------------------------------------------------------------
CExpression *
COrderedAggPreprocessor::PexprPreprocess(CMemoryPool *mp, CExpression *pexpr)
{
	// protect against stack overflow during recursion
	GPOS_CHECK_STACK_SIZE;
	GPOS_ASSERT(nullptr != mp);
	GPOS_ASSERT(nullptr != pexpr);

	COperator *pop = pexpr->Pop();
	if (COperator::EopLogicalGbAgg == pop->Eopid() &&
		0 == CLogicalGbAgg::PopConvert(pop)->Pdrgpcr()->Size() &&
		0 < (*pexpr)[1]->DeriveTotalOrderedAggs())
	{
		CExpression *pexprJoin = PexprInputAggPrj2Join(mp, pexpr);

		// recursively process the resulting expression
		CExpression *pexprResult = PexprPreprocess(mp, pexprJoin);
		pexprJoin->Release();

		return pexprResult;
	}

	// recursively process child expressions
	const ULONG arity = pexpr->Arity();
	CExpressionArray *pdrgpexprChildren = GPOS_NEW(mp) CExpressionArray(mp);
	for (ULONG ul = 0; ul < arity; ul++)
	{
		CExpression *pexprChild = PexprPreprocess(mp, (*pexpr)[ul]);
		pdrgpexprChildren->Append(pexprChild);
	}

	pop->AddRef();
	return GPOS_NEW(mp) CExpression(mp, pop, pdrgpexprChildren);
}

// EOF

相关信息

greenplumn 源码目录

相关文章

greenplumn CExpression 源码

greenplumn CExpressionFactorizer 源码

greenplumn CExpressionHandle 源码

greenplumn CExpressionPreprocessor 源码

greenplumn CExpressionUtils 源码

greenplumn CHashedDistributions 源码

greenplumn CLogical 源码

greenplumn CLogicalApply 源码

greenplumn CLogicalAssert 源码

greenplumn CLogicalBitmapTableGet 源码

0  赞