tidb plan_cost 源码

  • 2022-09-19
  • 浏览 (413)

tidb plan_cost 代码

文件路径:/planner/core/plan_cost.go

// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package core

import (
	"math"

	"github.com/pingcap/errors"
	"github.com/pingcap/tidb/expression"
	"github.com/pingcap/tidb/kv"
	"github.com/pingcap/tidb/parser/model"
	"github.com/pingcap/tidb/planner/property"
	"github.com/pingcap/tidb/sessionctx/variable"
	"github.com/pingcap/tidb/statistics"
	"github.com/pingcap/tidb/util/paging"
)

const (
	// CostFlagRecalculate indicates the optimizer to ignore cached cost and recalculate it again.
	CostFlagRecalculate uint64 = 1 << iota

	// CostFlagUseTrueCardinality indicates the optimizer to use true cardinality to calculate the cost.
	CostFlagUseTrueCardinality
)

const (
	modelVer1 = 1
	modelVer2 = 2
)

func hasCostFlag(costFlag, flag uint64) bool {
	return (costFlag & flag) > 0
}

// GetPlanCost calculates the cost of the plan if it has not been calculated yet and returns the cost.
func (p *basePhysicalPlan) GetPlanCost(taskType property.TaskType, option *PlanCostOption) (float64, error) {
	costFlag := option.CostFlag
	if p.planCostInit && !hasCostFlag(costFlag, CostFlagRecalculate) {
		// just calculate the cost once and always reuse it
		return p.planCost, nil
	}
	p.planCost = 0 // the default implementation, the operator have no cost
	for _, child := range p.children {
		childCost, err := child.GetPlanCost(taskType, option)
		if err != nil {
			return 0, err
		}
		p.planCost += childCost
	}
	p.planCostInit = true
	return p.planCost, nil
}

// GetPlanCost calculates the cost of the plan if it has not been calculated yet and returns the cost.
func (p *PhysicalSelection) GetPlanCost(taskType property.TaskType, option *PlanCostOption) (float64, error) {
	costFlag := option.CostFlag
	if p.planCostInit && !hasCostFlag(costFlag, CostFlagRecalculate) {
		return p.planCost, nil
	}

	var selfCost float64
	switch p.ctx.GetSessionVars().CostModelVersion {
	case modelVer1: // selection cost: rows * cpu-factor
		var cpuFactor float64
		switch taskType {
		case property.RootTaskType, property.MppTaskType:
			cpuFactor = p.ctx.GetSessionVars().GetCPUFactor()
		case property.CopSingleReadTaskType, property.CopDoubleReadTaskType:
			cpuFactor = p.ctx.GetSessionVars().GetCopCPUFactor()
		default:
			return 0, errors.Errorf("unknown task type %v", taskType)
		}
		selfCost = getCardinality(p.children[0], costFlag) * cpuFactor
		if p.fromDataSource {
			selfCost = 0 // for compatibility, see https://github.com/pingcap/tidb/issues/36243
		}
	case modelVer2: // selection cost: rows * num-filters * cpu-factor
		var cpuFactor float64
		switch taskType {
		case property.RootTaskType:
			cpuFactor = p.ctx.GetSessionVars().GetCPUFactor()
		case property.MppTaskType: // use a dedicated cpu-factor for TiFlash
			cpuFactor = p.ctx.GetSessionVars().GetTiFlashCPUFactor()
		case property.CopSingleReadTaskType, property.CopDoubleReadTaskType:
			cpuFactor = p.ctx.GetSessionVars().GetCopCPUFactor()
		default:
			return 0, errors.Errorf("unknown task type %v", taskType)
		}
		selfCost = getCardinality(p.children[0], costFlag) * float64(len(p.Conditions)) * cpuFactor
	}

	childCost, err := p.children[0].GetPlanCost(taskType, option)
	if err != nil {
		return 0, err
	}
	p.planCost = childCost + selfCost
	p.planCostInit = true
	return p.planCost, nil
}

// GetCost computes the cost of projection operator itself.
func (p *PhysicalProjection) GetCost(count float64) float64 {
	sessVars := p.ctx.GetSessionVars()
	cpuCost := count * sessVars.GetCPUFactor()
	concurrency := float64(sessVars.ProjectionConcurrency())
	if concurrency <= 0 {
		return cpuCost
	}
	cpuCost /= concurrency
	concurrencyCost := (1 + concurrency) * sessVars.GetConcurrencyFactor()
	return cpuCost + concurrencyCost
}

// GetPlanCost calculates the cost of the plan if it has not been calculated yet and returns the cost.
func (p *PhysicalProjection) GetPlanCost(taskType property.TaskType, option *PlanCostOption) (float64, error) {
	costFlag := option.CostFlag
	if p.planCostInit && !hasCostFlag(costFlag, CostFlagRecalculate) {
		return p.planCost, nil
	}
	childCost, err := p.children[0].GetPlanCost(taskType, option)
	if err != nil {
		return 0, err
	}
	p.planCost = childCost
	p.planCost += p.GetCost(getCardinality(p, costFlag)) // projection cost
	p.planCostInit = true
	return p.planCost, nil
}

// GetCost computes cost of index lookup operator itself.
func (p *PhysicalIndexLookUpReader) GetCost(costFlag uint64) (cost float64) {
	indexPlan, tablePlan := p.indexPlan, p.tablePlan
	ctx := p.ctx
	sessVars := ctx.GetSessionVars()
	// Add cost of building table reader executors. Handles are extracted in batch style,
	// each handle is a range, the CPU cost of building copTasks should be:
	// (indexRows / batchSize) * batchSize * CPUFactor
	// Since we don't know the number of copTasks built, ignore these network cost now.
	indexRows := getCardinality(indexPlan, costFlag)
	idxCst := indexRows * sessVars.GetCPUFactor()
	// if the expectCnt is below the paging threshold, using paging API, recalculate idxCst.
	// paging API reduces the count of index and table rows, however introduces more seek cost.
	if ctx.GetSessionVars().EnablePaging && p.expectedCnt > 0 && p.expectedCnt <= paging.Threshold {
		p.Paging = true
		pagingCst := calcPagingCost(ctx, p.indexPlan, p.expectedCnt)
		// prevent enlarging the cost because we take paging as a better plan,
		// if the cost is enlarged, it'll be easier to go another plan.
		idxCst = math.Min(idxCst, pagingCst)
	}
	cost += idxCst
	// Add cost of worker goroutines in index lookup.
	numTblWorkers := float64(sessVars.IndexLookupConcurrency())
	cost += (numTblWorkers + 1) * sessVars.GetConcurrencyFactor()
	// When building table reader executor for each batch, we would sort the handles. CPU
	// cost of sort is:
	// CPUFactor * batchSize * Log2(batchSize) * (indexRows / batchSize)
	indexLookupSize := float64(sessVars.IndexLookupSize)
	batchSize := math.Min(indexLookupSize, indexRows)
	if batchSize > 2 {
		sortCPUCost := (indexRows * math.Log2(batchSize) * sessVars.GetCPUFactor()) / numTblWorkers
		cost += sortCPUCost
	}
	// Also, we need to sort the retrieved rows if index lookup reader is expected to return
	// ordered results. Note that row count of these two sorts can be different, if there are
	// operators above table scan.
	tableRows := getCardinality(tablePlan, costFlag)
	selectivity := tableRows / indexRows
	batchSize = math.Min(indexLookupSize*selectivity, tableRows)
	if p.keepOrder && batchSize > 2 {
		sortCPUCost := (tableRows * math.Log2(batchSize) * sessVars.GetCPUFactor()) / numTblWorkers
		cost += sortCPUCost
	}
	return
}

// GetPlanCost calculates the cost of the plan if it has not been calculated yet and returns the cost.
func (p *PhysicalIndexLookUpReader) GetPlanCost(_ property.TaskType, option *PlanCostOption) (float64, error) {
	costFlag := option.CostFlag
	if p.planCostInit && !hasCostFlag(costFlag, CostFlagRecalculate) {
		return p.planCost, nil
	}
	p.planCost = 0
	// child's cost
	for _, child := range []PhysicalPlan{p.indexPlan, p.tablePlan} {
		childCost, err := child.GetPlanCost(property.CopDoubleReadTaskType, option)
		if err != nil {
			return 0, err
		}
		p.planCost += childCost
	}

	// to keep compatible with the previous cost implementation, re-calculate table-scan cost by using index stats-count again (see copTask.finishIndexPlan).
	// TODO: amend table-side cost here later
	var tmp PhysicalPlan
	for tmp = p.tablePlan; len(tmp.Children()) > 0; tmp = tmp.Children()[0] {
	}
	ts := tmp.(*PhysicalTableScan)
	if p.ctx.GetSessionVars().CostModelVersion == modelVer1 {
		tblCost, err := ts.GetPlanCost(property.CopDoubleReadTaskType, option)
		if err != nil {
			return 0, err
		}
		p.planCost -= tblCost
		p.planCost += getCardinality(p.indexPlan, costFlag) * ts.getScanRowSize() * p.SCtx().GetSessionVars().GetScanFactor(ts.Table)
	}

	// index-side net I/O cost: rows * row-size * net-factor
	netFactor := getTableNetFactor(p.tablePlan)
	rowSize := getTblStats(p.indexPlan).GetAvgRowSize(p.ctx, p.indexPlan.Schema().Columns, true, false)
	p.planCost += getCardinality(p.indexPlan, costFlag) * rowSize * netFactor

	// index-side net seek cost
	p.planCost += estimateNetSeekCost(p.indexPlan)

	// table-side net I/O cost: rows * row-size * net-factor
	tblRowSize := getTblStats(p.tablePlan).GetAvgRowSize(p.ctx, p.tablePlan.Schema().Columns, false, false)
	p.planCost += getCardinality(p.tablePlan, costFlag) * tblRowSize * netFactor

	// table-side seek cost
	p.planCost += estimateNetSeekCost(p.tablePlan)

	// double read cost
	p.planCost += p.estDoubleReadCost(ts.Table, costFlag)

	// consider concurrency
	p.planCost /= float64(p.ctx.GetSessionVars().DistSQLScanConcurrency())

	// lookup-cpu-cost in TiDB
	p.planCost += p.GetCost(costFlag)
	p.planCostInit = true
	return p.planCost, nil
}

func (p *PhysicalIndexLookUpReader) estDoubleReadCost(tbl *model.TableInfo, costFlag uint64) float64 {
	if p.ctx.GetSessionVars().CostModelVersion == modelVer1 {
		// only consider double-read cost on modelVer2
		return 0
	}
	// estimate the double-read cost: (numDoubleReadTasks * seekFactor) / concurrency
	doubleReadRows := getCardinality(p.indexPlan, costFlag)
	batchSize := float64(p.ctx.GetSessionVars().IndexLookupSize)
	concurrency := math.Max(1.0, float64(p.ctx.GetSessionVars().IndexLookupConcurrency()))
	seekFactor := p.ctx.GetSessionVars().GetSeekFactor(tbl)
	// distRatio indicates how many requests corresponding to a batch, current value is from experiments.
	// TODO: estimate it by using index correlation or make it configurable.
	distRatio := 40.0
	numDoubleReadTasks := (doubleReadRows / batchSize) * distRatio // use Float64 instead of Int like `Ceil(...)` to make the cost continuous.
	return (numDoubleReadTasks * seekFactor) / concurrency
}

// GetPlanCost calculates the cost of the plan if it has not been calculated yet and returns the cost.
func (p *PhysicalIndexReader) GetPlanCost(_ property.TaskType, option *PlanCostOption) (float64, error) {
	costFlag := option.CostFlag
	if p.planCostInit && !hasCostFlag(costFlag, CostFlagRecalculate) {
		return p.planCost, nil
	}
	var rowCount, rowSize, netFactor, indexPlanCost, netSeekCost float64
	sqlScanConcurrency := p.ctx.GetSessionVars().DistSQLScanConcurrency()
	// child's cost
	childCost, err := p.indexPlan.GetPlanCost(property.CopSingleReadTaskType, option)
	if err != nil {
		return 0, err
	}
	indexPlanCost = childCost
	p.planCost = indexPlanCost
	// net I/O cost: rows * row-size * net-factor
	tblStats := getTblStats(p.indexPlan)
	rowSize = tblStats.GetAvgRowSize(p.ctx, p.indexPlan.Schema().Columns, true, false)
	rowCount = getCardinality(p.indexPlan, costFlag)
	netFactor = getTableNetFactor(p.indexPlan)
	p.planCost += rowCount * rowSize * netFactor
	// net seek cost
	netSeekCost = estimateNetSeekCost(p.indexPlan)
	p.planCost += netSeekCost
	// consider concurrency
	p.planCost /= float64(sqlScanConcurrency)

	if option.tracer != nil {
		setPhysicalIndexReaderCostDetail(p, option.tracer, rowCount, rowSize, netFactor, netSeekCost, indexPlanCost, sqlScanConcurrency)
	}
	p.planCostInit = true
	return p.planCost, nil
}

// GetNetDataSize calculates the cost of the plan in network data transfer.
func (p *PhysicalIndexReader) GetNetDataSize() float64 {
	tblStats := getTblStats(p.indexPlan)
	rowSize := tblStats.GetAvgRowSize(p.ctx, p.indexPlan.Schema().Columns, true, false)
	return p.indexPlan.StatsCount() * rowSize
}

// GetPlanCost calculates the cost of the plan if it has not been calculated yet and returns the cost.
func (p *PhysicalTableReader) GetPlanCost(_ property.TaskType, option *PlanCostOption) (float64, error) {
	costFlag := option.CostFlag
	if p.planCostInit && !hasCostFlag(costFlag, CostFlagRecalculate) {
		return p.planCost, nil
	}
	p.planCost = 0
	netFactor := getTableNetFactor(p.tablePlan)
	var rowCount, rowSize, netSeekCost, tableCost float64
	sqlScanConcurrency := p.ctx.GetSessionVars().DistSQLScanConcurrency()
	storeType := p.StoreType
	switch storeType {
	case kv.TiKV:
		// child's cost
		childCost, err := p.tablePlan.GetPlanCost(property.CopSingleReadTaskType, option)
		if err != nil {
			return 0, err
		}
		tableCost = childCost
		p.planCost = childCost
		// net I/O cost: rows * row-size * net-factor
		rowSize = getTblStats(p.tablePlan).GetAvgRowSize(p.ctx, p.tablePlan.Schema().Columns, false, false)
		rowCount = getCardinality(p.tablePlan, costFlag)
		p.planCost += rowCount * rowSize * netFactor
		// net seek cost
		netSeekCost = estimateNetSeekCost(p.tablePlan)
		p.planCost += netSeekCost
		// consider concurrency
		p.planCost /= float64(sqlScanConcurrency)
	case kv.TiFlash:
		var concurrency, rowSize, seekCost float64
		_, isMPP := p.tablePlan.(*PhysicalExchangeSender)
		if isMPP {
			// mpp protocol
			concurrency = p.ctx.GetSessionVars().CopTiFlashConcurrencyFactor
			rowSize = collectRowSizeFromMPPPlan(p.tablePlan)
			seekCost = accumulateNetSeekCost4MPP(p.tablePlan)
			childCost, err := p.tablePlan.GetPlanCost(property.MppTaskType, option)
			if err != nil {
				return 0, err
			}
			p.planCost = childCost
		} else {
			// cop protocol
			concurrency = float64(p.ctx.GetSessionVars().DistSQLScanConcurrency())
			rowSize = getTblStats(p.tablePlan).GetAvgRowSize(p.ctx, p.tablePlan.Schema().Columns, false, false)
			seekCost = estimateNetSeekCost(p.tablePlan)
			tType := property.MppTaskType
			if p.ctx.GetSessionVars().CostModelVersion == modelVer1 {
				// regard the underlying tasks as cop-task on modelVer1 for compatibility
				tType = property.CopSingleReadTaskType
			}
			childCost, err := p.tablePlan.GetPlanCost(tType, option)
			if err != nil {
				return 0, err
			}
			p.planCost = childCost
		}

		// net I/O cost
		p.planCost += getCardinality(p.tablePlan, costFlag) * rowSize * netFactor
		// net seek cost
		p.planCost += seekCost
		// consider concurrency
		p.planCost /= concurrency
		// consider tidb_enforce_mpp
		if isMPP && p.ctx.GetSessionVars().IsMPPEnforced() &&
			!hasCostFlag(costFlag, CostFlagRecalculate) { // show the real cost in explain-statements
			p.planCost /= 1000000000
		}
	}
	if option.tracer != nil {
		setPhysicalTableReaderCostDetail(p, option.tracer,
			rowCount, rowSize, netFactor, netSeekCost, tableCost,
			sqlScanConcurrency, storeType)
	}
	p.planCostInit = true
	return p.planCost, nil
}

// GetNetDataSize calculates the estimated total data size fetched from storage.
func (p *PhysicalTableReader) GetNetDataSize() float64 {
	rowSize := getTblStats(p.tablePlan).GetAvgRowSize(p.ctx, p.tablePlan.Schema().Columns, false, false)
	return p.tablePlan.StatsCount() * rowSize
}

// GetPlanCost calculates the cost of the plan if it has not been calculated yet and returns the cost.
func (p *PhysicalIndexMergeReader) GetPlanCost(_ property.TaskType, option *PlanCostOption) (float64, error) {
	costFlag := option.CostFlag
	if p.planCostInit && !hasCostFlag(costFlag, CostFlagRecalculate) {
		return p.planCost, nil
	}
	p.planCost = 0
	if tblScan := p.tablePlan; tblScan != nil {
		childCost, err := tblScan.GetPlanCost(property.CopSingleReadTaskType, option)
		if err != nil {
			return 0, err
		}
		netFactor := getTableNetFactor(tblScan)
		p.planCost += childCost // child's cost
		tblStats := getTblStats(tblScan)
		rowSize := tblStats.GetAvgRowSize(p.ctx, tblScan.Schema().Columns, false, false)
		p.planCost += getCardinality(tblScan, costFlag) * rowSize * netFactor // net I/O cost
	}
	for _, partialScan := range p.partialPlans {
		childCost, err := partialScan.GetPlanCost(property.CopSingleReadTaskType, option)
		if err != nil {
			return 0, err
		}
		var isIdxScan bool
		for p := partialScan; ; p = p.Children()[0] {
			_, isIdxScan = p.(*PhysicalIndexScan)
			if len(p.Children()) == 0 {
				break
			}
		}

		netFactor := getTableNetFactor(partialScan)
		p.planCost += childCost // child's cost
		tblStats := getTblStats(partialScan)
		rowSize := tblStats.GetAvgRowSize(p.ctx, partialScan.Schema().Columns, isIdxScan, false)
		p.planCost += getCardinality(partialScan, costFlag) * rowSize * netFactor // net I/O cost
	}

	// TODO: accumulate table-side seek cost

	// consider concurrency
	copIterWorkers := float64(p.ctx.GetSessionVars().DistSQLScanConcurrency())
	p.planCost /= copIterWorkers
	p.planCostInit = true
	return p.planCost, nil
}

// GetPartialReaderNetDataSize returns the estimated total response data size of a partial read.
func (p *PhysicalIndexMergeReader) GetPartialReaderNetDataSize(plan PhysicalPlan) float64 {
	_, isIdxScan := plan.(*PhysicalIndexScan)
	return plan.StatsCount() * getTblStats(plan).GetAvgRowSize(p.ctx, plan.Schema().Columns, isIdxScan, false)
}

// GetPlanCost calculates the cost of the plan if it has not been calculated yet and returns the cost.
func (p *PhysicalTableScan) GetPlanCost(taskType property.TaskType, option *PlanCostOption) (float64, error) {
	costFlag := option.CostFlag
	if p.planCostInit && !hasCostFlag(costFlag, CostFlagRecalculate) {
		return p.planCost, nil
	}

	var selfCost float64
	var rowCount, rowSize, scanFactor float64
	costModelVersion := p.ctx.GetSessionVars().CostModelVersion
	switch costModelVersion {
	case modelVer1: // scan cost: rows * row-size * scan-factor
		scanFactor = p.ctx.GetSessionVars().GetScanFactor(p.Table)
		if p.Desc && p.prop != nil && p.prop.ExpectedCnt >= smallScanThreshold {
			scanFactor = p.ctx.GetSessionVars().GetDescScanFactor(p.Table)
		}
		rowCount = getCardinality(p, costFlag)
		rowSize = p.getScanRowSize()
		selfCost = rowCount * rowSize * scanFactor
	case modelVer2: // scan cost: rows * log2(row-size) * scan-factor
		switch taskType {
		case property.MppTaskType: // use a dedicated scan-factor for TiFlash
			// no need to distinguish `Scan` and `DescScan` for TiFlash for now
			scanFactor = p.ctx.GetSessionVars().GetTiFlashScanFactor()
		default: // for TiKV
			scanFactor = p.ctx.GetSessionVars().GetScanFactor(p.Table)
			if p.Desc {
				scanFactor = p.ctx.GetSessionVars().GetDescScanFactor(p.Table)
			}
		}
		// the formula `log(rowSize)` is based on experiment results
		rowSize = math.Max(p.getScanRowSize(), 2.0) // to guarantee logRowSize >= 1
		logRowSize := math.Log2(rowSize)
		rowCount = getCardinality(p, costFlag)
		selfCost = rowCount * logRowSize * scanFactor

		// give TiFlash a start-up cost to let the optimizer prefers to use TiKV to process small table scans.
		if p.StoreType == kv.TiFlash {
			selfCost += 2000 * logRowSize * scanFactor
		}
	}
	if option.tracer != nil {
		setPhysicalTableOrIndexScanCostDetail(p, option.tracer, rowCount, rowSize, scanFactor, costModelVersion)
	}
	p.planCost = selfCost
	p.planCostInit = true
	return p.planCost, nil
}

// GetPlanCost calculates the cost of the plan if it has not been calculated yet and returns the cost.
func (p *PhysicalIndexScan) GetPlanCost(_ property.TaskType, option *PlanCostOption) (float64, error) {
	costFlag := option.CostFlag
	if p.planCostInit && !hasCostFlag(costFlag, CostFlagRecalculate) {
		return p.planCost, nil
	}

	var selfCost float64
	var rowCount, rowSize, scanFactor float64
	costModelVersion := p.ctx.GetSessionVars().CostModelVersion
	switch costModelVersion {
	case modelVer1: // scan cost: rows * row-size * scan-factor
		scanFactor = p.ctx.GetSessionVars().GetScanFactor(p.Table)
		if p.Desc && p.prop != nil && p.prop.ExpectedCnt >= smallScanThreshold {
			scanFactor = p.ctx.GetSessionVars().GetDescScanFactor(p.Table)
		}
		rowCount = getCardinality(p, costFlag)
		rowSize = p.getScanRowSize()
		selfCost = rowCount * rowSize * scanFactor
	case modelVer2:
		scanFactor = p.ctx.GetSessionVars().GetScanFactor(p.Table)
		if p.Desc {
			scanFactor = p.ctx.GetSessionVars().GetDescScanFactor(p.Table)
		}
		rowCount = getCardinality(p, costFlag)
		rowSize = math.Max(p.getScanRowSize(), 2.0)
		logRowSize := math.Log2(rowSize)
		selfCost = rowCount * logRowSize * scanFactor
	}
	if option.tracer != nil {
		setPhysicalTableOrIndexScanCostDetail(p, option.tracer, rowCount, rowSize, scanFactor, costModelVersion)
	}
	p.planCost = selfCost
	p.planCostInit = true
	return p.planCost, nil
}

// GetCost computes the cost of index join operator and its children.
func (p *PhysicalIndexJoin) GetCost(outerCnt, innerCnt, outerCost, innerCost float64, costFlag uint64) float64 {
	var cpuCost float64
	sessVars := p.ctx.GetSessionVars()
	// Add the cost of evaluating outer filter, since inner filter of index join
	// is always empty, we can simply tell whether outer filter is empty using the
	// summed length of left/right conditions.
	if len(p.LeftConditions)+len(p.RightConditions) > 0 {
		cpuCost += sessVars.GetCPUFactor() * outerCnt
		outerCnt *= SelectionFactor
	}
	// Cost of extracting lookup keys.
	innerCPUCost := sessVars.GetCPUFactor() * outerCnt
	// Cost of sorting and removing duplicate lookup keys:
	// (outerCnt / batchSize) * (batchSize * Log2(batchSize) + batchSize) * CPUFactor
	batchSize := math.Min(float64(p.ctx.GetSessionVars().IndexJoinBatchSize), outerCnt)
	if batchSize > 2 {
		innerCPUCost += outerCnt * (math.Log2(batchSize) + 1) * sessVars.GetCPUFactor()
	}
	// Add cost of building inner executors. CPU cost of building copTasks:
	// (outerCnt / batchSize) * (batchSize * distinctFactor) * CPUFactor
	// Since we don't know the number of copTasks built, ignore these network cost now.
	innerCPUCost += outerCnt * distinctFactor * sessVars.GetCPUFactor()
	// CPU cost of building hash table for inner results:
	// (outerCnt / batchSize) * (batchSize * distinctFactor) * innerCnt * CPUFactor
	innerCPUCost += outerCnt * distinctFactor * innerCnt * sessVars.GetCPUFactor()
	innerConcurrency := float64(p.ctx.GetSessionVars().IndexLookupJoinConcurrency())
	cpuCost += innerCPUCost / innerConcurrency
	// Cost of probing hash table in main thread.
	numPairs := outerCnt * innerCnt
	if p.JoinType == SemiJoin || p.JoinType == AntiSemiJoin ||
		p.JoinType == LeftOuterSemiJoin || p.JoinType == AntiLeftOuterSemiJoin {
		if len(p.OtherConditions) > 0 {
			numPairs *= 0.5
		} else {
			numPairs = 0
		}
	}
	if hasCostFlag(costFlag, CostFlagUseTrueCardinality) {
		numPairs = getOperatorActRows(p)
	}
	probeCost := numPairs * sessVars.GetCPUFactor()
	// Cost of additional concurrent goroutines.
	cpuCost += probeCost + (innerConcurrency+1.0)*sessVars.GetConcurrencyFactor()
	// Memory cost of hash tables for inner rows. The computed result is the upper bound,
	// since the executor is pipelined and not all workers are always in full load.
	memoryCost := innerConcurrency * (batchSize * distinctFactor) * innerCnt * sessVars.GetMemoryFactor()
	// Cost of inner child plan, i.e, mainly I/O and network cost.
	innerPlanCost := outerCnt * innerCost
	if p.ctx.GetSessionVars().CostModelVersion == 2 {
		// IndexJoin executes a batch of rows at a time, so the actual cost of this part should be
		//  `innerCostPerBatch * numberOfBatches` instead of `innerCostPerRow * numberOfOuterRow`.
		// Use an empirical value batchRatio to handle this now.
		// TODO: remove this empirical value.
		batchRatio := 30.0
		innerPlanCost /= batchRatio
	}
	return outerCost + innerPlanCost + cpuCost + memoryCost + p.estDoubleReadCost(outerCnt)
}

func (p *PhysicalIndexJoin) estDoubleReadCost(doubleReadRows float64) float64 {
	if p.ctx.GetSessionVars().CostModelVersion == modelVer1 {
		// only consider double-read cost on modelVer2
		return 0
	}
	// estimate the double read cost for IndexJoin: (double-read-tasks * seek-factor) / concurrency
	seekFactor := p.ctx.GetSessionVars().GetSeekFactor(nil)
	batchSize := math.Max(1.0, float64(p.ctx.GetSessionVars().IndexJoinBatchSize))
	concurrency := math.Max(1.0, float64(p.ctx.GetSessionVars().IndexLookupJoinConcurrency()))
	// distRatio indicates how many requests corresponding to a batch, current value is from experiments.
	// TODO: estimate it by using index correlation or make it configurable.
	distRatio := 40.0
	numDoubleReadTasks := (doubleReadRows / batchSize) * distRatio
	return (numDoubleReadTasks * seekFactor) / concurrency
}

// GetPlanCost calculates the cost of the plan if it has not been calculated yet and returns the cost.
func (p *PhysicalIndexJoin) GetPlanCost(taskType property.TaskType, option *PlanCostOption) (float64, error) {
	costFlag := option.CostFlag
	if p.planCostInit && !hasCostFlag(costFlag, CostFlagRecalculate) {
		return p.planCost, nil
	}
	outerChild, innerChild := p.children[1-p.InnerChildIdx], p.children[p.InnerChildIdx]
	outerCost, err := outerChild.GetPlanCost(taskType, option)
	if err != nil {
		return 0, err
	}
	innerCost, err := innerChild.GetPlanCost(taskType, option)
	if err != nil {
		return 0, err
	}
	outerCnt := getCardinality(outerChild, costFlag)
	innerCnt := getCardinality(innerChild, costFlag)
	if hasCostFlag(costFlag, CostFlagUseTrueCardinality) && outerCnt > 0 {
		innerCnt /= outerCnt // corresponding to one outer row when calculating IndexJoin costs
		innerCost /= outerCnt
	}
	p.planCost = p.GetCost(outerCnt, innerCnt, outerCost, innerCost, costFlag)
	p.planCostInit = true
	return p.planCost, nil
}

// GetCost computes the cost of index merge join operator and its children.
func (p *PhysicalIndexHashJoin) GetCost(outerCnt, innerCnt, outerCost, innerCost float64, costFlag uint64) float64 {
	var cpuCost float64
	sessVars := p.ctx.GetSessionVars()
	// Add the cost of evaluating outer filter, since inner filter of index join
	// is always empty, we can simply tell whether outer filter is empty using the
	// summed length of left/right conditions.
	if len(p.LeftConditions)+len(p.RightConditions) > 0 {
		cpuCost += sessVars.GetCPUFactor() * outerCnt
		outerCnt *= SelectionFactor
	}
	// Cost of extracting lookup keys.
	innerCPUCost := sessVars.GetCPUFactor() * outerCnt
	// Cost of sorting and removing duplicate lookup keys:
	// (outerCnt / batchSize) * (batchSize * Log2(batchSize) + batchSize) * CPUFactor
	batchSize := math.Min(float64(sessVars.IndexJoinBatchSize), outerCnt)
	if batchSize > 2 {
		innerCPUCost += outerCnt * (math.Log2(batchSize) + 1) * sessVars.GetCPUFactor()
	}
	// Add cost of building inner executors. CPU cost of building copTasks:
	// (outerCnt / batchSize) * (batchSize * distinctFactor) * CPUFactor
	// Since we don't know the number of copTasks built, ignore these network cost now.
	innerCPUCost += outerCnt * distinctFactor * sessVars.GetCPUFactor()
	concurrency := float64(sessVars.IndexLookupJoinConcurrency())
	cpuCost += innerCPUCost / concurrency
	// CPU cost of building hash table for outer results concurrently.
	// (outerCnt / batchSize) * (batchSize * CPUFactor)
	outerCPUCost := outerCnt * sessVars.GetCPUFactor()
	cpuCost += outerCPUCost / concurrency
	// Cost of probing hash table concurrently.
	numPairs := outerCnt * innerCnt
	if p.JoinType == SemiJoin || p.JoinType == AntiSemiJoin ||
		p.JoinType == LeftOuterSemiJoin || p.JoinType == AntiLeftOuterSemiJoin {
		if len(p.OtherConditions) > 0 {
			numPairs *= 0.5
		} else {
			numPairs = 0
		}
	}
	if hasCostFlag(costFlag, CostFlagUseTrueCardinality) {
		numPairs = getOperatorActRows(p)
	}
	// Inner workers do hash join in parallel, but they can only save ONE outer
	// batch results. So as the number of outer batch exceeds inner concurrency,
	// it would fall back to linear execution. In a word, the hash join only runs
	// in parallel for the first `innerConcurrency` number of inner tasks.
	var probeCost float64
	if outerCnt/batchSize >= concurrency {
		probeCost = (numPairs - batchSize*innerCnt*(concurrency-1)) * sessVars.GetCPUFactor()
	} else {
		probeCost = batchSize * innerCnt * sessVars.GetCPUFactor()
	}
	cpuCost += probeCost
	// Cost of additional concurrent goroutines.
	cpuCost += (concurrency + 1.0) * sessVars.GetConcurrencyFactor()
	// Memory cost of hash tables for outer rows. The computed result is the upper bound,
	// since the executor is pipelined and not all workers are always in full load.
	memoryCost := concurrency * (batchSize * distinctFactor) * innerCnt * sessVars.GetMemoryFactor()
	// Cost of inner child plan, i.e, mainly I/O and network cost.
	innerPlanCost := outerCnt * innerCost
	return outerCost + innerPlanCost + cpuCost + memoryCost + p.estDoubleReadCost(outerCnt)
}

// GetPlanCost calculates the cost of the plan if it has not been calculated yet and returns the cost.
func (p *PhysicalIndexHashJoin) GetPlanCost(taskType property.TaskType, option *PlanCostOption) (float64, error) {
	costFlag := option.CostFlag
	if p.planCostInit && !hasCostFlag(costFlag, CostFlagRecalculate) {
		return p.planCost, nil
	}
	outerChild, innerChild := p.children[1-p.InnerChildIdx], p.children[p.InnerChildIdx]
	outerCost, err := outerChild.GetPlanCost(taskType, option)
	if err != nil {
		return 0, err
	}
	innerCost, err := innerChild.GetPlanCost(taskType, option)
	if err != nil {
		return 0, err
	}
	outerCnt := getCardinality(outerChild, costFlag)
	innerCnt := getCardinality(innerChild, costFlag)
	if hasCostFlag(costFlag, CostFlagUseTrueCardinality) && outerCnt > 0 {
		innerCnt /= outerCnt // corresponding to one outer row when calculating IndexJoin costs
		innerCost /= outerCnt
	}
	p.planCost = p.GetCost(outerCnt, innerCnt, outerCost, innerCost, costFlag)
	p.planCostInit = true
	return p.planCost, nil
}

// GetCost computes the cost of index merge join operator and its children.
func (p *PhysicalIndexMergeJoin) GetCost(outerCnt, innerCnt, outerCost, innerCost float64, costFlag uint64) float64 {
	var cpuCost float64
	sessVars := p.ctx.GetSessionVars()
	// Add the cost of evaluating outer filter, since inner filter of index join
	// is always empty, we can simply tell whether outer filter is empty using the
	// summed length of left/right conditions.
	if len(p.LeftConditions)+len(p.RightConditions) > 0 {
		cpuCost += sessVars.GetCPUFactor() * outerCnt
		outerCnt *= SelectionFactor
	}
	// Cost of extracting lookup keys.
	innerCPUCost := sessVars.GetCPUFactor() * outerCnt
	// Cost of sorting and removing duplicate lookup keys:
	// (outerCnt / batchSize) * (sortFactor + 1.0) * batchSize * cpuFactor
	// If `p.NeedOuterSort` is true, the sortFactor is batchSize * Log2(batchSize).
	// Otherwise, it's 0.
	batchSize := math.Min(float64(p.ctx.GetSessionVars().IndexJoinBatchSize), outerCnt)
	sortFactor := 0.0
	if p.NeedOuterSort {
		sortFactor = math.Log2(batchSize)
	}
	if batchSize > 2 {
		innerCPUCost += outerCnt * (sortFactor + 1.0) * sessVars.GetCPUFactor()
	}
	// Add cost of building inner executors. CPU cost of building copTasks:
	// (outerCnt / batchSize) * (batchSize * distinctFactor) * cpuFactor
	// Since we don't know the number of copTasks built, ignore these network cost now.
	innerCPUCost += outerCnt * distinctFactor * sessVars.GetCPUFactor()
	innerConcurrency := float64(p.ctx.GetSessionVars().IndexLookupJoinConcurrency())
	cpuCost += innerCPUCost / innerConcurrency
	// Cost of merge join in inner worker.
	numPairs := outerCnt * innerCnt
	if p.JoinType == SemiJoin || p.JoinType == AntiSemiJoin ||
		p.JoinType == LeftOuterSemiJoin || p.JoinType == AntiLeftOuterSemiJoin {
		if len(p.OtherConditions) > 0 {
			numPairs *= 0.5
		} else {
			numPairs = 0
		}
	}
	if hasCostFlag(costFlag, CostFlagUseTrueCardinality) {
		numPairs = getOperatorActRows(p)
	}
	avgProbeCnt := numPairs / outerCnt
	var probeCost float64
	// Inner workers do merge join in parallel, but they can only save ONE outer batch
	// results. So as the number of outer batch exceeds inner concurrency, it would fall back to
	// linear execution. In a word, the merge join only run in parallel for the first
	// `innerConcurrency` number of inner tasks.
	if outerCnt/batchSize >= innerConcurrency {
		probeCost = (numPairs - batchSize*avgProbeCnt*(innerConcurrency-1)) * sessVars.GetCPUFactor()
	} else {
		probeCost = batchSize * avgProbeCnt * sessVars.GetCPUFactor()
	}
	cpuCost += probeCost + (innerConcurrency+1.0)*sessVars.GetConcurrencyFactor()

	// Index merge join save the join results in inner worker.
	// So the memory cost consider the results size for each batch.
	memoryCost := innerConcurrency * (batchSize * avgProbeCnt) * sessVars.GetMemoryFactor()

	innerPlanCost := outerCnt * innerCost
	return outerCost + innerPlanCost + cpuCost + memoryCost + p.estDoubleReadCost(outerCnt)
}

// GetPlanCost calculates the cost of the plan if it has not been calculated yet and returns the cost.
func (p *PhysicalIndexMergeJoin) GetPlanCost(taskType property.TaskType, option *PlanCostOption) (float64, error) {
	costFlag := option.CostFlag
	if p.planCostInit && !hasCostFlag(costFlag, CostFlagRecalculate) {
		return p.planCost, nil
	}
	outerChild, innerChild := p.children[1-p.InnerChildIdx], p.children[p.InnerChildIdx]
	outerCost, err := outerChild.GetPlanCost(taskType, option)
	if err != nil {
		return 0, err
	}
	innerCost, err := innerChild.GetPlanCost(taskType, option)
	if err != nil {
		return 0, err
	}
	outerCnt := getCardinality(outerChild, costFlag)
	innerCnt := getCardinality(innerChild, costFlag)
	if hasCostFlag(costFlag, CostFlagUseTrueCardinality) && outerCnt > 0 {
		innerCnt /= outerCnt // corresponding to one outer row when calculating IndexJoin costs
		innerCost /= outerCnt
	}
	p.planCost = p.GetCost(outerCnt, innerCnt, outerCost, innerCost, costFlag)
	p.planCostInit = true
	return p.planCost, nil
}

// GetCost computes the cost of apply operator.
func (p *PhysicalApply) GetCost(lCount, rCount, lCost, rCost float64) float64 {
	var cpuCost float64
	sessVars := p.ctx.GetSessionVars()
	if len(p.LeftConditions) > 0 {
		cpuCost += lCount * sessVars.GetCPUFactor()
		lCount *= SelectionFactor
	}
	if len(p.RightConditions) > 0 {
		cpuCost += lCount * rCount * sessVars.GetCPUFactor()
		rCount *= SelectionFactor
	}
	if len(p.EqualConditions)+len(p.OtherConditions) > 0 {
		if p.JoinType == SemiJoin || p.JoinType == AntiSemiJoin ||
			p.JoinType == LeftOuterSemiJoin || p.JoinType == AntiLeftOuterSemiJoin {
			cpuCost += lCount * rCount * sessVars.GetCPUFactor() * 0.5
		} else {
			cpuCost += lCount * rCount * sessVars.GetCPUFactor()
		}
	}
	// Apply uses a NestedLoop method for execution.
	// For every row from the left(outer) side, it executes
	// the whole right(inner) plan tree. So the cost of apply
	// should be : apply cost + left cost + left count * right cost
	return cpuCost + lCost + lCount*rCost
}

// GetPlanCost calculates the cost of the plan if it has not been calculated yet and returns the cost.
func (p *PhysicalApply) GetPlanCost(taskType property.TaskType, option *PlanCostOption) (float64, error) {
	costFlag := option.CostFlag
	if p.planCostInit && !hasCostFlag(costFlag, CostFlagRecalculate) {
		return p.planCost, nil
	}
	outerChild, innerChild := p.children[1-p.InnerChildIdx], p.children[p.InnerChildIdx]
	outerCost, err := outerChild.GetPlanCost(taskType, option)
	if err != nil {
		return 0, err
	}
	innerCost, err := innerChild.GetPlanCost(taskType, option)
	if err != nil {
		return 0, err
	}
	outerCnt := getCardinality(outerChild, costFlag)
	innerCnt := getCardinality(innerChild, costFlag)
	p.planCost = p.GetCost(outerCnt, innerCnt, outerCost, innerCost)
	p.planCostInit = true
	return p.planCost, nil
}

// GetCost computes cost of merge join operator itself.
func (p *PhysicalMergeJoin) GetCost(lCnt, rCnt float64, costFlag uint64) float64 {
	outerCnt := lCnt
	innerCnt := rCnt
	innerKeys := p.RightJoinKeys
	innerSchema := p.children[1].Schema()
	innerStats := p.children[1].statsInfo()
	if p.JoinType == RightOuterJoin {
		outerCnt = rCnt
		innerCnt = lCnt
		innerKeys = p.LeftJoinKeys
		innerSchema = p.children[0].Schema()
		innerStats = p.children[0].statsInfo()
	}
	helper := &fullJoinRowCountHelper{
		sctx:          p.SCtx(),
		cartesian:     false,
		leftProfile:   p.children[0].statsInfo(),
		rightProfile:  p.children[1].statsInfo(),
		leftJoinKeys:  p.LeftJoinKeys,
		rightJoinKeys: p.RightJoinKeys,
		leftSchema:    p.children[0].Schema(),
		rightSchema:   p.children[1].Schema(),
	}
	numPairs := helper.estimate()
	if p.JoinType == SemiJoin || p.JoinType == AntiSemiJoin ||
		p.JoinType == LeftOuterSemiJoin || p.JoinType == AntiLeftOuterSemiJoin {
		if len(p.OtherConditions) > 0 {
			numPairs *= 0.5
		} else {
			numPairs = 0
		}
	}
	if hasCostFlag(costFlag, CostFlagUseTrueCardinality) {
		numPairs = getOperatorActRows(p)
	}
	sessVars := p.ctx.GetSessionVars()
	probeCost := numPairs * sessVars.GetCPUFactor()
	// Cost of evaluating outer filters.
	var cpuCost float64
	if len(p.LeftConditions)+len(p.RightConditions) > 0 {
		probeCost *= SelectionFactor
		cpuCost += outerCnt * sessVars.GetCPUFactor()
	}
	cpuCost += probeCost
	// For merge join, only one group of rows with same join key(not null) are cached,
	// we compute average memory cost using estimated group size.
	NDV, _ := getColsNDVWithMatchedLen(innerKeys, innerSchema, innerStats)
	memoryCost := (innerCnt / NDV) * sessVars.GetMemoryFactor()
	return cpuCost + memoryCost
}

// GetPlanCost calculates the cost of the plan if it has not been calculated yet and returns the cost.
func (p *PhysicalMergeJoin) GetPlanCost(taskType property.TaskType, option *PlanCostOption) (float64, error) {
	costFlag := option.CostFlag
	if p.planCostInit && !hasCostFlag(costFlag, CostFlagRecalculate) {
		return p.planCost, nil
	}
	p.planCost = 0
	for _, child := range p.children {
		childCost, err := child.GetPlanCost(taskType, option)
		if err != nil {
			return 0, err
		}
		p.planCost += childCost
	}
	p.planCost += p.GetCost(getCardinality(p.children[0], costFlag), getCardinality(p.children[1], costFlag), costFlag)
	p.planCostInit = true
	return p.planCost, nil
}

// GetCost computes cost of hash join operator itself.
func (p *PhysicalHashJoin) GetCost(lCnt, rCnt float64, isMPP bool, costFlag uint64, op *physicalOptimizeOp) float64 {
	buildCnt, probeCnt := lCnt, rCnt
	build := p.children[0]
	// Taking the right as the inner for right join or using the outer to build a hash table.
	if (p.InnerChildIdx == 1 && !p.UseOuterToBuild) || (p.InnerChildIdx == 0 && p.UseOuterToBuild) {
		buildCnt, probeCnt = rCnt, lCnt
		build = p.children[1]
	}
	sessVars := p.ctx.GetSessionVars()
	oomUseTmpStorage := variable.EnableTmpStorageOnOOM.Load()
	memQuota := sessVars.StmtCtx.MemTracker.GetBytesLimit() // sessVars.MemQuotaQuery && hint
	rowSize := getAvgRowSize(build.statsInfo(), build.Schema())
	spill := oomUseTmpStorage && memQuota > 0 && rowSize*buildCnt > float64(memQuota) && p.storeTp != kv.TiFlash
	// Cost of building hash table.
	cpuFactor := sessVars.GetCPUFactor()
	if isMPP && p.ctx.GetSessionVars().CostModelVersion == modelVer2 {
		cpuFactor = sessVars.GetTiFlashCPUFactor() // use the dedicated TiFlash CPU Factor on modelVer2
	}
	diskFactor := sessVars.GetDiskFactor()
	memoryFactor := sessVars.GetMemoryFactor()
	concurrencyFactor := sessVars.GetConcurrencyFactor()

	cpuCost := buildCnt * cpuFactor
	memoryCost := buildCnt * memoryFactor
	diskCost := buildCnt * diskFactor * rowSize
	// Number of matched row pairs regarding the equal join conditions.
	helper := &fullJoinRowCountHelper{
		sctx:          p.SCtx(),
		cartesian:     false,
		leftProfile:   p.children[0].statsInfo(),
		rightProfile:  p.children[1].statsInfo(),
		leftJoinKeys:  p.LeftJoinKeys,
		rightJoinKeys: p.RightJoinKeys,
		leftSchema:    p.children[0].Schema(),
		rightSchema:   p.children[1].Schema(),
	}
	numPairs := helper.estimate()
	// For semi-join class, if `OtherConditions` is empty, we already know
	// the join results after querying hash table, otherwise, we have to
	// evaluate those resulted row pairs after querying hash table; if we
	// find one pair satisfying the `OtherConditions`, we then know the
	// join result for this given outer row, otherwise we have to iterate
	// to the end of those pairs; since we have no idea about when we can
	// terminate the iteration, we assume that we need to iterate half of
	// those pairs in average.
	if p.JoinType == SemiJoin || p.JoinType == AntiSemiJoin ||
		p.JoinType == LeftOuterSemiJoin || p.JoinType == AntiLeftOuterSemiJoin {
		if len(p.OtherConditions) > 0 {
			numPairs *= 0.5
		} else {
			numPairs = 0
		}
	}
	if hasCostFlag(costFlag, CostFlagUseTrueCardinality) {
		numPairs = getOperatorActRows(p)
	}
	// Cost of querying hash table is cheap actually, so we just compute the cost of
	// evaluating `OtherConditions` and joining row pairs.
	probeCost := numPairs * cpuFactor
	probeDiskCost := numPairs * diskFactor * rowSize
	// Cost of evaluating outer filter.
	if len(p.LeftConditions)+len(p.RightConditions) > 0 {
		// Input outer count for the above compution should be adjusted by SelectionFactor.
		probeCost *= SelectionFactor
		probeDiskCost *= SelectionFactor
		probeCost += probeCnt * cpuFactor
	}
	diskCost += probeDiskCost
	probeCost /= float64(p.Concurrency)
	// Cost of additional concurrent goroutines.
	cpuCost += probeCost + float64(p.Concurrency+1)*concurrencyFactor
	// Cost of traveling the hash table to resolve missing matched cases when building the hash table from the outer table
	if p.UseOuterToBuild {
		if spill {
			// It runs in sequence when build data is on disk. See handleUnmatchedRowsFromHashTableInDisk
			cpuCost += buildCnt * cpuFactor
		} else {
			cpuCost += buildCnt * cpuFactor / float64(p.Concurrency)
		}
		diskCost += buildCnt * diskFactor * rowSize
	}

	if spill {
		memoryCost *= float64(memQuota) / (rowSize * buildCnt)
	} else {
		diskCost = 0
	}
	if op != nil {
		setPhysicalHashJoinCostDetail(p, op, spill, buildCnt, probeCnt, cpuFactor, rowSize, numPairs,
			cpuCost, probeCost, memoryCost, diskCost, probeDiskCost,
			diskFactor, memoryFactor, concurrencyFactor,
			memQuota)
	}
	return cpuCost + memoryCost + diskCost
}

// GetPlanCost calculates the cost of the plan if it has not been calculated yet and returns the cost.
func (p *PhysicalHashJoin) GetPlanCost(taskType property.TaskType, option *PlanCostOption) (float64, error) {
	costFlag := option.CostFlag
	if p.planCostInit && !hasCostFlag(costFlag, CostFlagRecalculate) {
		return p.planCost, nil
	}
	p.planCost = 0
	for _, child := range p.children {
		childCost, err := child.GetPlanCost(taskType, option)
		if err != nil {
			return 0, err
		}
		p.planCost += childCost
	}
	p.planCost += p.GetCost(getCardinality(p.children[0], costFlag), getCardinality(p.children[1], costFlag),
		taskType == property.MppTaskType, costFlag, option.tracer)
	p.planCostInit = true
	return p.planCost, nil
}

// GetCost computes cost of stream aggregation considering CPU/memory.
func (p *PhysicalStreamAgg) GetCost(inputRows float64, isRoot, isMPP bool, costFlag uint64) float64 {
	aggFuncFactor := p.getAggFuncCostFactor(false)
	var cpuCost float64
	sessVars := p.ctx.GetSessionVars()
	if isRoot {
		cpuCost = inputRows * sessVars.GetCPUFactor() * aggFuncFactor
	} else if isMPP {
		if p.ctx.GetSessionVars().CostModelVersion == modelVer2 {
			// use the dedicated CPU factor for TiFlash on modelVer2
			cpuCost = inputRows * sessVars.GetTiFlashCPUFactor() * aggFuncFactor
		} else {
			cpuCost = inputRows * sessVars.GetCopCPUFactor() * aggFuncFactor
		}
	} else {
		cpuCost = inputRows * sessVars.GetCopCPUFactor() * aggFuncFactor
	}
	rowsPerGroup := inputRows / getCardinality(p, costFlag)
	memoryCost := rowsPerGroup * distinctFactor * sessVars.GetMemoryFactor() * float64(p.numDistinctFunc())
	return cpuCost + memoryCost
}

// GetPlanCost calculates the cost of the plan if it has not been calculated yet and returns the cost.
func (p *PhysicalStreamAgg) GetPlanCost(taskType property.TaskType, option *PlanCostOption) (float64, error) {
	costFlag := option.CostFlag
	if p.planCostInit && !hasCostFlag(costFlag, CostFlagRecalculate) {
		return p.planCost, nil
	}
	childCost, err := p.children[0].GetPlanCost(taskType, option)
	if err != nil {
		return 0, err
	}
	p.planCost = childCost
	p.planCost += p.GetCost(getCardinality(p.children[0], costFlag), taskType == property.RootTaskType, taskType == property.MppTaskType, costFlag)
	p.planCostInit = true
	return p.planCost, nil
}

// GetCost computes the cost of hash aggregation considering CPU/memory.
func (p *PhysicalHashAgg) GetCost(inputRows float64, isRoot, isMPP bool, costFlag uint64) float64 {
	cardinality := getCardinality(p, costFlag)
	numDistinctFunc := p.numDistinctFunc()
	aggFuncFactor := p.getAggFuncCostFactor(isMPP)
	var cpuCost float64
	sessVars := p.ctx.GetSessionVars()
	if isRoot {
		cpuCost = inputRows * sessVars.GetCPUFactor() * aggFuncFactor
		divisor, con := p.cpuCostDivisor(numDistinctFunc > 0)
		if divisor > 0 {
			cpuCost /= divisor
			// Cost of additional goroutines.
			cpuCost += (con + 1) * sessVars.GetConcurrencyFactor()
		}
	} else if isMPP {
		if p.ctx.GetSessionVars().CostModelVersion == modelVer2 {
			// use the dedicated CPU factor for TiFlash on modelVer2
			cpuCost = inputRows * sessVars.GetTiFlashCPUFactor() * aggFuncFactor
		} else {
			cpuCost = inputRows * sessVars.GetCopCPUFactor() * aggFuncFactor
		}
	} else {
		cpuCost = inputRows * sessVars.GetCopCPUFactor() * aggFuncFactor
	}
	memoryCost := cardinality * sessVars.GetMemoryFactor() * float64(len(p.AggFuncs))
	// When aggregation has distinct flag, we would allocate a map for each group to
	// check duplication.
	memoryCost += inputRows * distinctFactor * sessVars.GetMemoryFactor() * float64(numDistinctFunc)
	return cpuCost + memoryCost
}

// GetPlanCost calculates the cost of the plan if it has not been calculated yet and returns the cost.
func (p *PhysicalHashAgg) GetPlanCost(taskType property.TaskType, option *PlanCostOption) (float64, error) {
	costFlag := option.CostFlag
	if p.planCostInit && !hasCostFlag(costFlag, CostFlagRecalculate) {
		return p.planCost, nil
	}
	childCost, err := p.children[0].GetPlanCost(taskType, option)
	if err != nil {
		return 0, err
	}
	p.planCost = childCost
	statsCnt := getCardinality(p.children[0], costFlag)
	switch taskType {
	case property.RootTaskType:
		p.planCost += p.GetCost(statsCnt, true, false, costFlag)
	case property.CopSingleReadTaskType, property.CopDoubleReadTaskType:
		p.planCost += p.GetCost(statsCnt, false, false, costFlag)
	case property.MppTaskType:
		p.planCost += p.GetCost(statsCnt, false, true, costFlag)
	default:
		return 0, errors.Errorf("unknown task type %v", taskType)
	}
	p.planCostInit = true
	return p.planCost, nil
}

// GetCost computes the cost of in memory sort.
func (p *PhysicalSort) GetCost(count float64, schema *expression.Schema) float64 {
	if count < 2.0 {
		count = 2.0
	}
	sessVars := p.ctx.GetSessionVars()
	cpuCost := count * math.Log2(count) * sessVars.GetCPUFactor()
	memoryCost := count * sessVars.GetMemoryFactor()

	oomUseTmpStorage := variable.EnableTmpStorageOnOOM.Load()
	memQuota := sessVars.StmtCtx.MemTracker.GetBytesLimit() // sessVars.MemQuotaQuery && hint
	rowSize := getAvgRowSize(p.statsInfo(), schema)
	spill := oomUseTmpStorage && memQuota > 0 && rowSize*count > float64(memQuota)
	diskCost := count * sessVars.GetDiskFactor() * rowSize
	if !spill {
		diskCost = 0
	} else {
		memoryCost *= float64(memQuota) / (rowSize * count)
	}
	return cpuCost + memoryCost + diskCost
}

// GetPlanCost calculates the cost of the plan if it has not been calculated yet and returns the cost.
func (p *PhysicalSort) GetPlanCost(taskType property.TaskType, option *PlanCostOption) (float64, error) {
	costFlag := option.CostFlag
	if p.planCostInit && !hasCostFlag(costFlag, CostFlagRecalculate) {
		return p.planCost, nil
	}
	childCost, err := p.children[0].GetPlanCost(taskType, option)
	if err != nil {
		return 0, err
	}
	p.planCost = childCost
	p.planCost += p.GetCost(getCardinality(p.children[0], costFlag), p.Schema())
	p.planCostInit = true
	return p.planCost, nil
}

// GetCost computes cost of TopN operator itself.
func (p *PhysicalTopN) GetCost(count float64, isRoot bool) float64 {
	heapSize := float64(p.Offset + p.Count)
	if heapSize < 2.0 {
		heapSize = 2.0
	}
	sessVars := p.ctx.GetSessionVars()
	// Ignore the cost of `doCompaction` in current implementation of `TopNExec`, since it is the
	// special side-effect of our Chunk format in TiDB layer, which may not exist in coprocessor's
	// implementation, or may be removed in the future if we change data format.
	// Note that we are using worst complexity to compute CPU cost, because it is simpler compared with
	// considering probabilities of average complexity, i.e, we may not need adjust heap for each input
	// row.
	var cpuCost float64
	if isRoot {
		cpuCost = count * math.Log2(heapSize) * sessVars.GetCPUFactor()
	} else {
		cpuCost = count * math.Log2(heapSize) * sessVars.GetCopCPUFactor()
	}
	memoryCost := heapSize * sessVars.GetMemoryFactor()
	return cpuCost + memoryCost
}

// GetPlanCost calculates the cost of the plan if it has not been calculated yet and returns the cost.
func (p *PhysicalTopN) GetPlanCost(taskType property.TaskType, option *PlanCostOption) (float64, error) {
	costFlag := option.CostFlag
	if p.planCostInit && !hasCostFlag(costFlag, CostFlagRecalculate) {
		return p.planCost, nil
	}
	childCost, err := p.children[0].GetPlanCost(taskType, option)
	if err != nil {
		return 0, err
	}
	p.planCost = childCost
	p.planCost += p.GetCost(getCardinality(p.children[0], costFlag), taskType == property.RootTaskType)
	p.planCostInit = true
	return p.planCost, nil
}

// GetCost returns cost of the PointGetPlan.
func (p *BatchPointGetPlan) GetCost(opt *physicalOptimizeOp) float64 {
	cols := p.accessCols
	if cols == nil {
		return 0 // the cost of BatchGet generated in fast plan optimization is always 0
	}
	sessVars := p.ctx.GetSessionVars()
	var rowSize, rowCount float64
	cost := 0.0
	if p.IndexInfo == nil {
		rowCount = float64(len(p.Handles))
		rowSize = p.stats.HistColl.GetTableAvgRowSize(p.ctx, cols, kv.TiKV, true)
	} else {
		rowCount = float64(len(p.IndexValues))
		rowSize = p.stats.HistColl.GetIndexAvgRowSize(p.ctx, cols, p.IndexInfo.Unique)
	}
	networkFactor := sessVars.GetNetworkFactor(p.TblInfo)
	seekFactor := sessVars.GetSeekFactor(p.TblInfo)
	scanConcurrency := sessVars.DistSQLScanConcurrency()
	cost += rowCount * rowSize * networkFactor
	cost += rowCount * seekFactor
	cost /= float64(scanConcurrency)
	if opt != nil {
		setBatchPointGetPlanCostDetail(p, opt, rowCount, rowSize, networkFactor, seekFactor, scanConcurrency)
	}
	return cost
}

// GetPlanCost calculates the cost of the plan if it has not been calculated yet and returns the cost.
func (p *BatchPointGetPlan) GetPlanCost(_ property.TaskType, option *PlanCostOption) (float64, error) {
	costFlag := option.CostFlag
	if p.planCostInit && !hasCostFlag(costFlag, CostFlagRecalculate) {
		return p.planCost, nil
	}
	p.planCost = p.GetCost(option.tracer)
	p.planCostInit = true
	return p.planCost, nil
}

// GetAvgRowSize return the average row size.
func (p *BatchPointGetPlan) GetAvgRowSize() float64 {
	cols := p.accessCols
	if cols == nil {
		return 0 // the cost of BatchGet generated in fast plan optimization is always 0
	}
	if p.IndexInfo == nil {
		return p.stats.HistColl.GetTableAvgRowSize(p.ctx, cols, kv.TiKV, true)
	}
	return p.stats.HistColl.GetIndexAvgRowSize(p.ctx, cols, p.IndexInfo.Unique)
}

// GetCost returns cost of the PointGetPlan.
func (p *PointGetPlan) GetCost(opt *physicalOptimizeOp) float64 {
	cols := p.accessCols
	if cols == nil {
		return 0 // the cost of PointGet generated in fast plan optimization is always 0
	}
	sessVars := p.ctx.GetSessionVars()
	var rowSize float64
	cost := 0.0
	if p.IndexInfo == nil {
		rowSize = p.stats.HistColl.GetTableAvgRowSize(p.ctx, cols, kv.TiKV, true)
	} else {
		rowSize = p.stats.HistColl.GetIndexAvgRowSize(p.ctx, cols, p.IndexInfo.Unique)
	}
	networkFactor := sessVars.GetNetworkFactor(p.TblInfo)
	seekFactor := sessVars.GetSeekFactor(p.TblInfo)
	cost += rowSize * networkFactor
	cost += seekFactor
	cost /= float64(sessVars.DistSQLScanConcurrency())
	if opt != nil {
		setPointGetPlanCostDetail(p, opt, rowSize, networkFactor, seekFactor)
	}
	return cost
}

// GetPlanCost calculates the cost of the plan if it has not been calculated yet and returns the cost.
func (p *PointGetPlan) GetPlanCost(_ property.TaskType, option *PlanCostOption) (float64, error) {
	costFlag := option.CostFlag
	if p.planCostInit && !hasCostFlag(costFlag, CostFlagRecalculate) {
		return p.planCost, nil
	}
	p.planCost = p.GetCost(option.tracer)
	p.planCostInit = true
	return p.planCost, nil
}

// GetAvgRowSize return the average row size.
func (p *PointGetPlan) GetAvgRowSize() float64 {
	cols := p.accessCols
	if cols == nil {
		return 0 // the cost of PointGet generated in fast plan optimization is always 0
	}
	if p.IndexInfo == nil {
		return p.stats.HistColl.GetTableAvgRowSize(p.ctx, cols, kv.TiKV, true)
	}
	return p.stats.HistColl.GetIndexAvgRowSize(p.ctx, cols, p.IndexInfo.Unique)
}

// GetPlanCost calculates the cost of the plan if it has not been calculated yet and returns the cost.
func (p *PhysicalUnionAll) GetPlanCost(taskType property.TaskType, option *PlanCostOption) (float64, error) {
	costFlag := option.CostFlag
	if p.planCostInit && !hasCostFlag(costFlag, CostFlagRecalculate) {
		return p.planCost, nil
	}
	var childMaxCost float64
	for _, child := range p.children {
		childCost, err := child.GetPlanCost(taskType, option)
		if err != nil {
			return 0, err
		}
		childMaxCost = math.Max(childMaxCost, childCost)
	}
	p.planCost = childMaxCost + float64(1+len(p.children))*p.ctx.GetSessionVars().GetConcurrencyFactor()
	p.planCostInit = true
	return p.planCost, nil
}

// GetPlanCost calculates the cost of the plan if it has not been calculated yet and returns the cost.
func (p *PhysicalExchangeReceiver) GetPlanCost(taskType property.TaskType, option *PlanCostOption) (float64, error) {
	costFlag := option.CostFlag
	if p.planCostInit && !hasCostFlag(costFlag, CostFlagRecalculate) {
		return p.planCost, nil
	}
	childCost, err := p.children[0].GetPlanCost(taskType, option)
	if err != nil {
		return 0, err
	}
	p.planCost = childCost
	// accumulate net cost
	if p.ctx.GetSessionVars().CostModelVersion == modelVer1 {
		p.planCost += getCardinality(p.children[0], costFlag) * p.ctx.GetSessionVars().GetNetworkFactor(nil)
	} else { // to avoid regression, only consider row-size on model ver2
		rowSize := getTblStats(p.children[0]).GetAvgRowSize(p.ctx, p.children[0].Schema().Columns, false, false)
		p.planCost += getCardinality(p.children[0], costFlag) * rowSize * p.ctx.GetSessionVars().GetNetworkFactor(nil)
	}
	p.planCostInit = true
	return p.planCost, nil
}

func getOperatorActRows(operator PhysicalPlan) float64 {
	if operator == nil {
		return 0
	}
	runtimeInfo := operator.SCtx().GetSessionVars().StmtCtx.RuntimeStatsColl
	id := operator.ID()
	actRows := 0.0
	if runtimeInfo.ExistsRootStats(id) {
		actRows = float64(runtimeInfo.GetRootStats(id).GetActRows())
	} else if runtimeInfo.ExistsCopStats(id) {
		actRows = float64(runtimeInfo.GetCopStats(id).GetActRows())
	} else {
		actRows = 0.0 // no data for this operator
	}
	return actRows
}

func getCardinality(operator PhysicalPlan, costFlag uint64) float64 {
	if hasCostFlag(costFlag, CostFlagUseTrueCardinality) {
		return getOperatorActRows(operator)
	}
	return operator.StatsCount()
}

// estimateNetSeekCost calculates the net seek cost for the plan.
// for TiKV, it's len(access-range) * seek-factor,
// and for TiFlash, it's len(access-range) * len(access-column) * seek-factor.
func estimateNetSeekCost(copTaskPlan PhysicalPlan) float64 {
	switch x := copTaskPlan.(type) {
	case *PhysicalTableScan:
		if x.StoreType == kv.TiFlash { // the old TiFlash interface uses cop-task protocol
			return float64(len(x.Ranges)) * float64(len(x.Columns)) * x.ctx.GetSessionVars().GetSeekFactor(x.Table)
		}
		return float64(len(x.Ranges)) * x.ctx.GetSessionVars().GetSeekFactor(x.Table) // TiKV
	case *PhysicalIndexScan:
		return float64(len(x.Ranges)) * x.ctx.GetSessionVars().GetSeekFactor(x.Table) // TiKV
	default:
		return estimateNetSeekCost(copTaskPlan.Children()[0])
	}
}

// getTblStats returns the tbl-stats of this plan, which contains all columns before pruning.
func getTblStats(copTaskPlan PhysicalPlan) *statistics.HistColl {
	switch x := copTaskPlan.(type) {
	case *PhysicalTableScan:
		return x.tblColHists
	case *PhysicalIndexScan:
		return x.tblColHists
	default:
		return getTblStats(copTaskPlan.Children()[0])
	}
}

// getTableNetFactor returns the corresponding net factor of this table, it's mainly for temporary tables
func getTableNetFactor(copTaskPlan PhysicalPlan) float64 {
	switch x := copTaskPlan.(type) {
	case *PhysicalTableScan:
		return x.SCtx().GetSessionVars().GetNetworkFactor(x.Table)
	case *PhysicalIndexScan:
		return x.SCtx().GetSessionVars().GetNetworkFactor(x.Table)
	default:
		if len(x.Children()) == 0 {
			x.SCtx().GetSessionVars().GetNetworkFactor(nil)
		}
		return getTableNetFactor(x.Children()[0])
	}
}

相关信息

tidb 源码目录

相关文章

tidb access_object 源码

tidb collect_column_stats_usage 源码

tidb common_plans 源码

tidb encode 源码

tidb errors 源码

tidb exhaust_physical_plans 源码

tidb explain 源码

tidb expression_rewriter 源码

tidb find_best_task 源码

tidb flat_plan 源码

0  赞