tidb find_best_task 源码

  • 2022-09-19
  • 浏览 (657)

tidb find_best_task 代码

文件路径:/planner/core/find_best_task.go

// Copyright 2017 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package core

import (
	"fmt"
	"math"
	"strings"

	"github.com/pingcap/errors"
	"github.com/pingcap/tidb/expression"
	"github.com/pingcap/tidb/kv"
	"github.com/pingcap/tidb/parser/ast"
	"github.com/pingcap/tidb/parser/model"
	"github.com/pingcap/tidb/parser/mysql"
	"github.com/pingcap/tidb/planner/property"
	"github.com/pingcap/tidb/planner/util"
	"github.com/pingcap/tidb/sessionctx"
	"github.com/pingcap/tidb/sessionctx/stmtctx"
	"github.com/pingcap/tidb/statistics"
	"github.com/pingcap/tidb/types"
	tidbutil "github.com/pingcap/tidb/util"
	"github.com/pingcap/tidb/util/chunk"
	"github.com/pingcap/tidb/util/collate"
	"github.com/pingcap/tidb/util/logutil"
	"github.com/pingcap/tidb/util/ranger"
	"github.com/pingcap/tidb/util/set"
	"github.com/pingcap/tidb/util/tracing"
	"go.uber.org/zap"
)

const (
	// SelectionFactor is the default factor of the selectivity.
	// For example, if we have no idea how to estimate the selectivity
	// of a Selection or a JoinCondition, we can use this default value.
	SelectionFactor = 0.8
	distinctFactor  = 0.8

	// smallScanThreshold is a row-count threshold (its use sites are outside this section).
	// If the actual row count is much more than the limit count, the unordered scan may cost much more than keep order.
	// So when a limit exists, we don't apply the DescScanFactor.
	// NOTE(review): the relation between this threshold and DescScanFactor is not visible here — confirm at the use sites.
	smallScanThreshold = 10000
)

// aggFuncFactor maps an aggregate function name to a relative weight.
// The "default" entry is presumably the fallback for functions that are not
// listed explicitly — confirm against the code that reads this map.
var aggFuncFactor = map[string]float64{
	ast.AggFuncCount:       1.0,
	ast.AggFuncSum:         1.0,
	ast.AggFuncAvg:         2.0,
	ast.AggFuncFirstRow:    0.1,
	ast.AggFuncMax:         1.0,
	ast.AggFuncMin:         1.0,
	ast.AggFuncGroupConcat: 1.0,
	ast.AggFuncBitOr:       0.9,
	ast.AggFuncBitXor:      0.9,
	ast.AggFuncBitAnd:      0.9,
	ast.AggFuncVarPop:      3.0,
	ast.AggFuncVarSamp:     3.0,
	ast.AggFuncStddevPop:   3.0,
	ast.AggFuncStddevSamp:  3.0,
	"default":              1.5,
}

// PlanCounterTp is the counter behind the nth_plan() hint: it tells the
// optimizer which plan, in enumeration order, should be picked.
type PlanCounterTp int64

// PlanCounterDisabled is the default PlanCounterTp value; it means that the
// optimizer does not have to force any particular plan.
var PlanCounterDisabled PlanCounterTp = -1

// Dec subtracts x from the counter, clamping the result at zero.
// A counter that is already zero or negative (e.g. the disabled value -1)
// is left untouched.
func (c *PlanCounterTp) Dec(x int64) {
	if *c <= 0 {
		return
	}
	remaining := int64(*c) - x
	if remaining < 0 {
		remaining = 0
	}
	*c = PlanCounterTp(remaining)
}

// Empty reports whether the counter has reached zero.
func (c *PlanCounterTp) Empty() bool {
	return *c == 0
}

// IsForce reports whether a specific plan is being forced, i.e. whether the
// counter holds anything other than the disabled value -1.
func (c *PlanCounterTp) IsForce() bool {
	return *c != -1
}

// invalidTask is the shared sentinel returned when no usable task can be built.
// Several functions in this file compare against it by pointer identity.
var invalidTask = &rootTask{p: nil} // invalid if p is nil

// GetPropByOrderByItems converts ORDER BY items into a sort property.
// To keep things simple, only plain columns are supported: as soon as one item
// is not an *expression.Column the function returns (nil, false). Otherwise it
// returns a property whose SortItems mirror the items, and true.
func GetPropByOrderByItems(items []*util.ByItems) (*property.PhysicalProperty, bool) {
	sortItems := make([]property.SortItem, 0, len(items))
	for i := range items {
		column, isCol := items[i].Expr.(*expression.Column)
		if !isCol {
			return nil, false
		}
		sortItems = append(sortItems, property.SortItem{Col: column, Desc: items[i].Desc})
	}
	prop := &property.PhysicalProperty{SortItems: sortItems}
	return prop, true
}

// GetPropByOrderByItemsContainScalarFunc converts ORDER BY items into a sort
// property, accepting plain columns as well as scalar functions for which
// GetSingleColumn yields a column.
//
// It returns the property, whether the conversion succeeded, and whether every
// item was a plain column. On failure (an unsupported expression type, or a
// scalar function with no single column) it returns (nil, false, false).
func GetPropByOrderByItemsContainScalarFunc(items []*util.ByItems) (*property.PhysicalProperty, bool, bool) {
	sortItems := make([]property.SortItem, 0, len(items))
	allColumns := true
	for _, byItem := range items {
		if column, isCol := byItem.Expr.(*expression.Column); isCol {
			sortItems = append(sortItems, property.SortItem{Col: column, Desc: byItem.Desc})
			continue
		}
		fn, isFunc := byItem.Expr.(*expression.ScalarFunction)
		if !isFunc {
			return nil, false, false
		}
		// The function may change the sort direction, so use the one it reports.
		column, desc := fn.GetSingleColumn(byItem.Desc)
		if column == nil {
			return nil, false, false
		}
		sortItems = append(sortItems, property.SortItem{Col: column, Desc: desc})
		allColumns = false
	}
	return &property.PhysicalProperty{SortItems: sortItems}, true, allColumns
}

// findBestTask builds the PhysicalTableDual root task for this logical dual.
// It returns invalidTask with a plan count of 0 when the plan counter is
// already exhausted or when an ordering is required that cannot be guaranteed;
// otherwise it returns the task with a plan count of 1.
func (p *LogicalTableDual) findBestTask(prop *property.PhysicalProperty, planCounter *PlanCounterTp, _ *physicalOptimizeOp) (task, int64, error) {
	// A required ordering cannot be ensured when there is more than one row.
	// With 0 or 1 rows any ordering holds trivially, so the property is ignored.
	cannotKeepOrder := !prop.IsSortItemEmpty() && p.RowCount > 1
	if cannotKeepOrder || planCounter.Empty() {
		return invalidTask, 0, nil
	}
	physDual := PhysicalTableDual{RowCount: p.RowCount}.Init(p.ctx, p.stats, p.blockOffset)
	physDual.SetSchema(p.schema)
	planCounter.Dec(1)
	result := &rootTask{p: physDual, isEmpty: p.RowCount == 0}
	return result, 1, nil
}

// findBestTask builds the PhysicalShow root task. A non-empty sort requirement
// or an exhausted plan counter yields invalidTask with a plan count of 0.
func (p *LogicalShow) findBestTask(prop *property.PhysicalProperty, planCounter *PlanCounterTp, _ *physicalOptimizeOp) (task, int64, error) {
	if !prop.IsSortItemEmpty() {
		return invalidTask, 0, nil
	}
	if planCounter.Empty() {
		return invalidTask, 0, nil
	}
	physShow := PhysicalShow{
		ShowContents: p.ShowContents,
		Extractor:    p.Extractor,
	}.Init(p.ctx)
	physShow.SetSchema(p.schema)
	planCounter.Dec(1)
	return &rootTask{p: physShow}, 1, nil
}

// findBestTask builds the PhysicalShowDDLJobs root task. A non-empty sort
// requirement or an exhausted plan counter yields invalidTask with a plan
// count of 0.
func (p *LogicalShowDDLJobs) findBestTask(prop *property.PhysicalProperty, planCounter *PlanCounterTp, _ *physicalOptimizeOp) (task, int64, error) {
	if !prop.IsSortItemEmpty() {
		return invalidTask, 0, nil
	}
	if planCounter.Empty() {
		return invalidTask, 0, nil
	}
	physShow := PhysicalShowDDLJobs{JobNumber: p.JobNumber}.Init(p.ctx)
	physShow.SetSchema(p.schema)
	planCounter.Dec(1)
	return &rootTask{p: physShow}, 1, nil
}

// rebuildChildTasks rebuilds the childTasks to make the clock_th combination.
//
// planCounter is the 1-based index of the wanted combination among all
// combinations of child plans, and childCnts[j] is the number of plans the j-th
// child offers. The index is split child by child: for child j, multAll holds
// the product of the counts of the children after j, so
// (planCounter-1)/multAll + 1 is the 1-based plan index requested from child j,
// and the remainder is carried on to the following children.
//
// *childTasks is truncated and refilled in place. An error is returned when a
// child fails, when a child does not consume its counter completely, or when a
// child yields an invalid task.
//
// NOTE(review): this assumes every entry of childCnts is non-zero; a zero would
// make multAll zero and the integer divisions below would panic.
func (p *baseLogicalPlan) rebuildChildTasks(childTasks *[]task, pp PhysicalPlan, childCnts []int64, planCounter int64, ts uint64, opt *physicalOptimizeOp) error {
	// The taskMap of children nodes should be rolled back first.
	for _, child := range p.children {
		child.rollBackTaskMap(ts)
	}

	// multAll starts as the total number of child-plan combinations.
	multAll := int64(1)
	var curClock PlanCounterTp
	for _, x := range childCnts {
		multAll *= x
	}
	*childTasks = (*childTasks)[:0]
	for j, child := range p.children {
		// After the division, multAll is the number of combinations of the remaining children.
		multAll /= childCnts[j]
		curClock = PlanCounterTp((planCounter-1)/multAll + 1)
		childTask, _, err := child.findBestTask(pp.GetChildReqProps(j), &curClock, opt)
		// Keep only the part of the index that addresses the remaining children.
		planCounter = (planCounter-1)%multAll + 1
		if err != nil {
			return err
		}
		// The child must have counted down to exactly zero, i.e. found the requested plan.
		if curClock != 0 {
			return errors.Errorf("PlanCounterTp planCounter is not handled")
		}
		if childTask != nil && childTask.invalid() {
			return errors.Errorf("The current plan is invalid, please skip this plan")
		}
		*childTasks = append(*childTasks, childTask)
	}
	return nil
}

// enumeratePhysicalPlans4Task tries every candidate in physicalPlans and returns
// the cheapest resulting task together with the number of plans counted.
//
// For each candidate it finds the best task of every child under the
// candidate's required child properties, attaches those to the candidate,
// converts the result to a root task if prop asks for one, optionally enforces
// prop (addEnforcer), and compares its cost with the best task so far.
//
// When planCounter forces a specific plan, the candidate containing that plan
// has its child tasks rebuilt into the requested combination; as soon as the
// counter reaches zero that task is returned as is, without a cost comparison.
//
// It returns invalidTask when no candidate yields a valid task.
func (p *baseLogicalPlan) enumeratePhysicalPlans4Task(physicalPlans []PhysicalPlan,
	prop *property.PhysicalProperty, addEnforcer bool, planCounter *PlanCounterTp, opt *physicalOptimizeOp) (task, int64, error) {
	var bestTask task = invalidTask
	var curCntPlan, cntPlan int64
	childTasks := make([]task, 0, len(p.children))
	childCnts := make([]int64, len(p.children))
	cntPlan = 0
	for _, pp := range physicalPlans {
		// Find best child tasks firstly.
		childTasks = childTasks[:0]
		// The curCntPlan records the number of possible plans for pp
		curCntPlan = 1
		// Remember the task-map timestamp and the plan ID so that a forced rebuild can roll back to this point.
		timeStampNow := p.GetLogicalTS4TaskMap()
		savedPlanID := p.ctx.GetSessionVars().PlanID
		for j, child := range p.children {
			childProp := pp.GetChildReqProps(j)
			// Children are searched with the counter disabled; the forced combination is rebuilt below if needed.
			childTask, cnt, err := child.findBestTask(childProp, &PlanCounterDisabled, opt)
			childCnts[j] = cnt
			if err != nil {
				return nil, 0, err
			}
			curCntPlan = curCntPlan * cnt
			if childTask != nil && childTask.invalid() {
				break
			}
			childTasks = append(childTasks, childTask)
		}

		// This check makes sure that there is no invalid child task.
		if len(childTasks) != len(p.children) {
			continue
		}

		// If the target plan can be found in this physicalPlan(pp), rebuild childTasks to build the corresponding combination.
		if planCounter.IsForce() && int64(*planCounter) <= curCntPlan {
			p.ctx.GetSessionVars().PlanID = savedPlanID
			curCntPlan = int64(*planCounter)
			err := p.rebuildChildTasks(&childTasks, pp, childCnts, int64(*planCounter), timeStampNow, opt)
			if err != nil {
				return nil, 0, err
			}
		}

		// Combine best child tasks with parent physical plan.
		curTask := pp.attach2Task(childTasks...)

		if curTask.invalid() {
			continue
		}

		// An optimal task could not satisfy the property, so it should be converted here.
		if _, ok := curTask.(*rootTask); !ok && prop.TaskTp == property.RootTaskType {
			curTask = curTask.convertToRootTask(p.ctx)
		}

		// Enforce curTask property
		if addEnforcer {
			curTask = enforceProperty(prop, curTask, p.basePlan.ctx)
		}

		// Optimize by shuffle executor to running in parallel manner.
		if _, isMpp := curTask.(*mppTask); !isMpp && prop.IsSortItemEmpty() {
			// Currently, we do not regard shuffled plan as a new plan.
			curTask = optimizeByShuffle(curTask, p.basePlan.ctx)
		}

		cntPlan += curCntPlan
		planCounter.Dec(curCntPlan)

		// The forced plan has been reached: take it regardless of its cost.
		if planCounter.Empty() {
			bestTask = curTask
			break
		}
		opt.appendCandidate(p, curTask.plan(), prop)
		// Get the most efficient one.
		if curIsBetter, err := compareTaskCost(p.ctx, curTask, bestTask, opt); err != nil {
			return nil, 0, err
		} else if curIsBetter {
			bestTask = curTask
		}
	}
	return bestTask, cntPlan, nil
}

// compareTaskCost reports whether curTask is cheaper than bestTask.
// The costs of both tasks are always computed first, so an error from either
// one is returned. After that an invalid curTask is never better, and any valid
// curTask beats an invalid bestTask.
func compareTaskCost(_ sessionctx.Context, curTask, bestTask task, op *physicalOptimizeOp) (curIsBetter bool, err error) {
	var (
		curCost, bestCost       float64
		curInvalid, bestInvalid bool
	)
	if curCost, curInvalid, err = getTaskPlanCost(curTask, op); err != nil {
		return false, err
	}
	if bestCost, bestInvalid, err = getTaskPlanCost(bestTask, op); err != nil {
		return false, err
	}
	switch {
	case curInvalid:
		return false, nil
	case bestInvalid:
		return true, nil
	default:
		return curCost < bestCost, nil
	}
}

// getTaskPlanCost returns the cost of the plan held by task t.
//
// The second result is true when the task is INVALID; in that case the cost is
// math.MaxFloat64 and no plan cost is computed. For a valid task the cost comes
// from the plan's GetPlanCost, called with the task type that matches the
// concrete kind of t. An unknown task kind yields an error.
func getTaskPlanCost(t task, op *physicalOptimizeOp) (float64, bool, error) {
	if t.invalid() {
		return math.MaxFloat64, true, nil
	}

	var tp property.TaskType
	switch t.(type) {
	case *rootTask:
		tp = property.RootTaskType
	case *copTask:
		// Whether the task is single-read or double-read does not matter here,
		// so either CopSingleReadTaskType or CopDoubleReadTaskType would do.
		tp = property.CopSingleReadTaskType
	case *mppTask:
		tp = property.MppTaskType
	default:
		return 0, false, errors.New("unknown task type")
	}

	plan := t.plan()
	costOpt := NewDefaultPlanCostOption().WithOptimizeTracer(op)
	cost, err := plan.GetPlanCost(tp, costOpt)
	return cost, false, err
}

// physicalOptimizeOp carries the optional settings used during physical optimization.
type physicalOptimizeOp struct {
	// tracer is going to track optimize steps during physical optimizing;
	// the methods of physicalOptimizeOp do nothing while it is nil.
	tracer *tracing.PhysicalOptimizeTracer
}

// defaultPhysicalOptimizeOption returns a fresh option value with no tracer attached.
func defaultPhysicalOptimizeOption() *physicalOptimizeOp {
	return new(physicalOptimizeOp)
}

// withEnableOptimizeTracer attaches tracer to op and returns op so that calls can be chained.
func (op *physicalOptimizeOp) withEnableOptimizeTracer(tracer *tracing.PhysicalOptimizeTracer) *physicalOptimizeOp {
	op.tracer = tracer
	return op
}

// appendCandidate records pp as a candidate physical plan for logical plan lp
// under property prop, then lets pp record its own child candidates.
// It does nothing when op, its tracer, or pp is nil.
func (op *physicalOptimizeOp) appendCandidate(lp LogicalPlan, pp PhysicalPlan, prop *property.PhysicalProperty) {
	if op == nil || op.tracer == nil || pp == nil {
		return
	}
	planTrace := &tracing.PlanTrace{
		TP:          pp.TP(),
		ID:          pp.ID(),
		ExplainInfo: pp.ExplainInfo(),
		ProperType:  prop.String(),
	}
	op.tracer.AppendCandidate(&tracing.CandidatePlanTrace{
		PlanTrace:          planTrace,
		MappingLogicalPlan: tracing.CodecPlanName(lp.TP(), lp.ID()),
	})
	pp.appendChildCandidate(op)
}

// appendPlanCostDetail stores detail in the tracer, keyed by the plan ID it
// reports. It does nothing when op or its tracer is nil.
func (op *physicalOptimizeOp) appendPlanCostDetail(detail *tracing.PhysicalPlanCostDetail) {
	if op != nil && op.tracer != nil {
		op.tracer.PhysicalPlanCostDetails[detail.GetPlanID()] = detail
	}
}

// findBestTask implements LogicalPlan interface.
//
// It returns the best task satisfying prop, the number of plans that were
// counted while searching, and an error. The search has two phases:
//  1. physical plans that satisfy prop by themselves (plansFitsProp);
//  2. if an enforcer may be added, physical plans generated for an empty
//     property, with prop enforced on top of the resulting task (plansNeedEnforce).
//
// The cheaper of the two results is stored in the task map under prop. When
// planCounter forces a specific plan, the search stops as soon as the counter
// reaches zero and that plan is returned and stored.
func (p *baseLogicalPlan) findBestTask(prop *property.PhysicalProperty, planCounter *PlanCounterTp, opt *physicalOptimizeOp) (bestTask task, cntPlan int64, err error) {
	// If p is an inner plan in an IndexJoin, the IndexJoin will generate an inner plan by itself,
	// and set inner child prop nil, so here we do nothing.
	if prop == nil {
		return nil, 1, nil
	}
	// Look up the task with this prop in the task map.
	// It's used to reduce double counting.
	bestTask = p.getTask(prop)
	if bestTask != nil {
		planCounter.Dec(1)
		return bestTask, 1, nil
	}

	canAddEnforcer := prop.CanAddEnforcer

	if prop.TaskTp != property.RootTaskType && !prop.IsFlashProp() {
		// Currently all plan cannot totally push down to TiKV.
		p.storeTask(prop, invalidTask)
		return invalidTask, 0, nil
	}

	bestTask = invalidTask
	cntPlan = 0
	// prop should be read only because its cached hashcode might be not consistent
	// when it is changed. So we clone a new one for the temporary changes.
	newProp := prop.CloneEssentialFields()
	var plansFitsProp, plansNeedEnforce []PhysicalPlan
	var hintWorksWithProp bool
	// Maybe the plan can satisfy the required property,
	// so we try to get the task without the enforced sort first.
	plansFitsProp, hintWorksWithProp, err = p.self.exhaustPhysicalPlans(newProp)
	if err != nil {
		return nil, 0, err
	}
	if !hintWorksWithProp && !newProp.IsSortItemEmpty() {
		// If there is a hint in the plan and the hint cannot satisfy the property,
		// we enforce this property and try to generate the PhysicalPlan again to
		// make sure the hint can work.
		canAddEnforcer = true
	}

	if canAddEnforcer {
		// Then, we use the empty property to get physicalPlans and
		// try to get the task with an enforced sort.
		newProp.SortItems = []property.SortItem{}
		newProp.SortItemsForPartition = []property.SortItem{}
		newProp.ExpectedCnt = math.MaxFloat64
		newProp.MPPPartitionCols = nil
		newProp.MPPPartitionTp = property.AnyType
		var hintCanWork bool
		plansNeedEnforce, hintCanWork, err = p.self.exhaustPhysicalPlans(newProp)
		if err != nil {
			return nil, 0, err
		}
		if hintCanWork && !hintWorksWithProp {
			// If the hint can work with the empty property, but cannot work with
			// the required property, we give up `plansFitProp` to make sure the hint
			// can work.
			plansFitsProp = nil
		}
		if !hintCanWork && !hintWorksWithProp && !prop.CanAddEnforcer {
			// If the original property is not enforced and hint cannot
			// work anyway, we give up `plansNeedEnforce` for efficiency,
			plansNeedEnforce = nil
		}
		// The relaxed clone was only needed to generate plans; from here on the original prop is used again.
		newProp = prop
	}

	var cnt int64
	var curTask task
	// Phase 1: plans that satisfy the property by themselves.
	if bestTask, cnt, err = p.enumeratePhysicalPlans4Task(plansFitsProp, newProp, false, planCounter, opt); err != nil {
		return nil, 0, err
	}
	cntPlan += cnt
	if planCounter.Empty() {
		goto END
	}

	// Phase 2: plans for the empty property, with the property enforced on top.
	curTask, cnt, err = p.enumeratePhysicalPlans4Task(plansNeedEnforce, newProp, true, planCounter, opt)
	if err != nil {
		return nil, 0, err
	}
	cntPlan += cnt
	if planCounter.Empty() {
		bestTask = curTask
		goto END
	}
	opt.appendCandidate(p, curTask.plan(), prop)
	if curIsBetter, err := compareTaskCost(p.ctx, curTask, bestTask, opt); err != nil {
		return nil, 0, err
	} else if curIsBetter {
		bestTask = curTask
	}

END:
	p.storeTask(prop, bestTask)
	return bestTask, cntPlan, nil
}

// findBestTask builds the PhysicalMemTable root task for this memory table.
//
// Any MPP partition requirement makes the result invalidTask. When
// prop.CanAddEnforcer is set, the function first calls itself with the enforcer
// disabled; if that attempt is invalid it clears the sort items, builds the
// plain mem-table task and lets the deferred function restore prop and enforce
// the original property on the task.
//
// prop is mutated temporarily while the function runs.
func (p *LogicalMemTable) findBestTask(prop *property.PhysicalProperty, planCounter *PlanCounterTp, opt *physicalOptimizeOp) (t task, cntPlan int64, err error) {
	if prop.MPPPartitionTp != property.AnyType {
		return invalidTask, 0, nil
	}

	// If prop.CanAddEnforcer is true, the prop.SortItems need to be set nil for p.findBestTask.
	// Before function return, reset it for enforcing task prop.
	oldProp := prop.CloneEssentialFields()
	if prop.CanAddEnforcer {
		// First, get the bestTask without enforced prop
		prop.CanAddEnforcer = false
		cnt := int64(0)
		t, cnt, err = p.findBestTask(prop, planCounter, opt)
		if err != nil {
			return nil, 0, err
		}
		prop.CanAddEnforcer = true
		// invalidTask is a shared sentinel, so this is a pointer-identity check.
		if t != invalidTask {
			cntPlan = cnt
			return
		}
		// Next, get the bestTask with enforced prop
		prop.SortItems = []property.SortItem{}
	}
	// Registered only after the early returns above, so it runs only for the code below.
	defer func() {
		if err != nil {
			return
		}
		if prop.CanAddEnforcer {
			// Restore the caller's property from the saved copy, then enforce it on the task.
			*prop = *oldProp
			t = enforceProperty(prop, t, p.basePlan.ctx)
			prop.CanAddEnforcer = true
		}
	}()

	if !prop.IsSortItemEmpty() || planCounter.Empty() {
		return invalidTask, 0, nil
	}
	memTable := PhysicalMemTable{
		DBName:         p.DBName,
		Table:          p.TableInfo,
		Columns:        p.Columns,
		Extractor:      p.Extractor,
		QueryTimeRange: p.QueryTimeRange,
	}.Init(p.ctx, p.stats, p.blockOffset)
	memTable.SetSchema(p.schema)
	planCounter.Dec(1)
	return &rootTask{p: memTable}, 1, nil
}

// tryToGetDualTask looks for a pushed-down condition that is a constant
// evaluating to false. If one exists, the data source can never return a row,
// so a root task wrapping a PhysicalTableDual is returned. Constants with a
// deferred expression or a parameter marker are skipped. It returns (nil, nil)
// when no such condition is found.
func (ds *DataSource) tryToGetDualTask() (task, error) {
	for _, cond := range ds.pushedDownConds {
		constant, isConst := cond.(*expression.Constant)
		if !isConst || constant.DeferredExpr != nil || constant.ParamMarker != nil {
			continue
		}
		isTrue, _, err := expression.EvalBool(ds.ctx, []expression.Expression{cond}, chunk.Row{})
		if err != nil {
			return nil, err
		}
		if isTrue {
			continue
		}
		dual := PhysicalTableDual{}.Init(ds.ctx, ds.stats, ds.blockOffset)
		dual.SetSchema(ds.schema)
		return &rootTask{p: dual}, nil
	}
	return nil, nil
}

// candidatePath is used to maintain required info for skyline pruning.
type candidatePath struct {
	// path is the access path this candidate wraps.
	path              *util.AccessPath
	accessCondsColMap util.Col2Len // accessCondsColMap maps Column.UniqueID to column length for the columns in AccessConds.
	indexCondsColMap  util.Col2Len // indexCondsColMap maps Column.UniqueID to column length for the columns in AccessConds and indexFilters.
	// isMatchProp records whether the path matches the required physical property
	// (see DataSource.isMatchProp).
	isMatchProp       bool
}

// compareBool orders two booleans with false < true.
// It returns 0 when they are equal, 1 when only l is true and -1 when only r is true.
func compareBool(l, r bool) int {
	switch {
	case l == r:
		return 0
	case l:
		return 1
	default:
		return -1
	}
}

// compareIndexBack compares two candidates by whether they are single scans.
// The second result reports whether the two candidates are comparable.
func compareIndexBack(lhs, rhs *candidatePath) (int, bool) {
	singleScanCmp := compareBool(lhs.path.IsSingleScan, rhs.path.IsSingleScan)
	bothNeedTableLookup := singleScanCmp == 0 && !lhs.path.IsSingleScan
	if !bothNeedTableLookup {
		return singleScanCmp, true
	}
	// Both sides have to access the table after the IndexScan, so compare the
	// sets of columns that occur in AccessConds and IndexFilters to judge how
	// many table rows will be accessed.
	return util.CompareCol2Len(lhs.indexCondsColMap, rhs.indexCondsColMap)
}

// compareCandidates is the heart of skyline pruning. Two candidate paths are
// compared on three dimensions:
// (1): the set of columns that occur in the access conditions,
// (2): whether a double scan is required,
// (3): whether the physical property is matched.
// It returns 1 when lhs is at least as good as rhs on every dimension and
// strictly better on at least one, -1 in the mirrored case, and 0 otherwise
// (including when a dimension is not comparable).
func compareCandidates(lhs, rhs *candidatePath) int {
	accessCmp, ok := util.CompareCol2Len(lhs.accessCondsColMap, rhs.accessCondsColMap)
	if !ok {
		return 0
	}
	scanCmp, ok := compareIndexBack(lhs, rhs)
	if !ok {
		return 0
	}
	matchCmp := compareBool(lhs.isMatchProp, rhs.isMatchProp)
	total := accessCmp + scanCmp + matchCmp
	switch {
	case total > 0 && accessCmp >= 0 && scanCmp >= 0 && matchCmp >= 0:
		return 1
	case total < 0 && accessCmp <= 0 && scanCmp <= 0 && matchCmp <= 0:
		return -1
	default:
		return 0
	}
}

// isMatchProp reports whether scanning path yields rows in the order required by prop.
//
// For an int-handle path the property matches only when it has exactly one sort
// item and that item is the handle column; on TiFlash the item must also be
// ascending. For other paths, all sort items must share one direction and
// each must be found, in order, among the index columns, where index columns
// marked in path.ConstCols may be skipped and prefix-length columns never match.
func (ds *DataSource) isMatchProp(path *util.AccessPath, prop *property.PhysicalProperty) bool {
	var isMatchProp bool
	if path.IsIntHandlePath {
		pkCol := ds.getPKIsHandleCol()
		if len(prop.SortItems) == 1 && pkCol != nil {
			isMatchProp = prop.SortItems[0].Col.Equal(nil, pkCol)
			if path.StoreType == kv.TiFlash {
				isMatchProp = isMatchProp && !prop.SortItems[0].Desc
			}
		}
		return isMatchProp
	}
	all, _ := prop.AllSameOrder()
	// When the prop is empty or `all` is false, `isMatchProp` is better to be `false` because
	// it needs not to keep order for index scan.

	// Basically, if `prop.SortItems` is the prefix of `path.IdxCols`, then `isMatchProp` is true. However, we need to consider
	// the situations when some columns of `path.IdxCols` are evaluated as constant. For example:
	// ```
	// create table t(a int, b int, c int, d int, index idx_a_b_c(a, b, c), index idx_d_c_b_a(d, c, b, a));
	// select * from t where a = 1 order by b, c;
	// select * from t where b = 1 order by a, c;
	// select * from t where d = 1 and b = 2 order by c, a;
	// select * from t where d = 1 and b = 2 order by c, b, a;
	// ```
	// In the first two `SELECT` statements, `idx_a_b_c` matches the sort order. In the last two `SELECT` statements, `idx_d_c_b_a`
	// matches the sort order. Hence, we use `path.ConstCols` to deal with the above situations.
	if !prop.IsSortItemEmpty() && all && len(path.IdxCols) >= len(prop.SortItems) {
		isMatchProp = true
		// i walks the index columns once; it is shared by all sort items so that matches must appear in index order.
		i := 0
		for _, sortItem := range prop.SortItems {
			found := false
			for ; i < len(path.IdxCols); i++ {
				// Only a full-length (non-prefix) index column can provide the order of the sort column.
				if path.IdxColLens[i] == types.UnspecifiedLength && sortItem.Col.Equal(nil, path.IdxCols[i]) {
					found = true
					i++
					break
				}
				// A non-matching index column may be skipped only if it is a constant column.
				if path.ConstCols == nil || i >= len(path.ConstCols) || !path.ConstCols[i] {
					break
				}
			}
			if !found {
				isMatchProp = false
				break
			}
		}
	}
	return isMatchProp
}

// getTableCandidate wraps a table path into a candidatePath, recording whether
// it matches prop and which columns occur in its access conditions.
func (ds *DataSource) getTableCandidate(path *util.AccessPath, prop *property.PhysicalProperty) *candidatePath {
	return &candidatePath{
		path:              path,
		isMatchProp:       ds.isMatchProp(path, prop),
		accessCondsColMap: util.ExtractCol2Len(path.AccessConds, nil, nil),
	}
}

// getIndexCandidate wraps an index path into a candidatePath. It records
// whether the path matches prop, the columns occurring in the access
// conditions, and the columns occurring in the access conditions plus the
// index filters.
func (ds *DataSource) getIndexCandidate(path *util.AccessPath, prop *property.PhysicalProperty) *candidatePath {
	candidate := &candidatePath{path: path}
	candidate.isMatchProp = ds.isMatchProp(path, prop)
	candidate.accessCondsColMap = util.ExtractCol2Len(path.AccessConds, path.IdxCols, path.IdxColLens)
	// Build the combined condition list in a fresh slice. Appending directly to
	// path.AccessConds could write into spare capacity of its backing array and
	// clobber data owned by another slice that shares it.
	indexConds := make([]expression.Expression, 0, len(path.AccessConds)+len(path.IndexFilters))
	indexConds = append(indexConds, path.AccessConds...)
	indexConds = append(indexConds, path.IndexFilters...)
	candidate.indexCondsColMap = util.ExtractCol2Len(indexConds, path.FullIdxCols, path.FullIdxColLens)
	return candidate
}

// getIndexMergeCandidate wraps an index-merge path into a candidatePath.
// No column maps or property match are computed for it.
func (ds *DataSource) getIndexMergeCandidate(path *util.AccessPath) *candidatePath {
	return &candidatePath{path: path}
}

// skylinePruning prunes access paths according to different factors. An access path can be pruned only if
// there exists a path that is not worse than it at all factors and there is at least one better factor.
//
// Index-merge paths are always kept without comparison. If any considered path
// has an empty range, that path alone is returned. When the session allows
// preferring range scans and at least one non-forced, non-TiFlash candidate is
// not a full-range scan, the full-range candidates of that kind are dropped.
func (ds *DataSource) skylinePruning(prop *property.PhysicalProperty) []*candidatePath {
	candidates := make([]*candidatePath, 0, 4)
	for _, path := range ds.possibleAccessPaths {
		// We should check whether the possible access path is valid first.
		if path.StoreType != kv.TiFlash && prop.IsFlashProp() {
			continue
		}
		if path.PartialIndexPaths != nil {
			candidates = append(candidates, ds.getIndexMergeCandidate(path))
			continue
		}
		// if we already know the range of the scan is empty, just return a TableDual
		if len(path.Ranges) == 0 {
			return []*candidatePath{{path: path}}
		}
		var currentCandidate *candidatePath
		if path.IsTablePath() {
			currentCandidate = ds.getTableCandidate(path, prop)
		} else {
			if len(path.AccessConds) > 0 || !prop.IsSortItemEmpty() || path.Forced || path.IsSingleScan {
				// We will use index to generate physical plan if any of the following conditions is satisfied:
				// 1. This path's access cond is not nil.
				// 2. We have a non-empty prop to match.
				// 3. This index is forced to choose.
				// 4. The needed columns are all covered by index columns(and handleCol).
				currentCandidate = ds.getIndexCandidate(path, prop)
			} else {
				continue
			}
		}
		pruned := false
		// Iterate backwards so that removing candidates[i] does not disturb the indexes still to be visited.
		for i := len(candidates) - 1; i >= 0; i-- {
			// TiFlash candidates are never used to prune, nor pruned by, the current candidate.
			if candidates[i].path.StoreType == kv.TiFlash {
				continue
			}
			result := compareCandidates(candidates[i], currentCandidate)
			if result == 1 {
				pruned = true
				// We can break here because the current candidate cannot prune others anymore.
				break
			} else if result == -1 {
				candidates = append(candidates[:i], candidates[i+1:]...)
			}
		}
		if !pruned {
			candidates = append(candidates, currentCandidate)
		}
	}

	if ds.ctx.GetSessionVars().GetAllowPreferRangeScan() && len(candidates) > 1 {
		// If a candidate path is TiFlash-path or forced-path, we just keep them. For other candidate paths, if there exists
		// any range scan path, we remove full scan paths and keep range scan paths.
		preferredPaths := make([]*candidatePath, 0, len(candidates))
		var hasRangeScanPath bool
		for _, c := range candidates {
			if c.path.Forced || c.path.StoreType == kv.TiFlash {
				preferredPaths = append(preferredPaths, c)
				continue
			}
			// The full-range check needs to know whether the int handle is unsigned.
			var unsignedIntHandle bool
			if c.path.IsIntHandlePath && ds.tableInfo.PKIsHandle {
				if pkColInfo := ds.tableInfo.GetPkColInfo(); pkColInfo != nil {
					unsignedIntHandle = mysql.HasUnsignedFlag(pkColInfo.GetFlag())
				}
			}
			if !ranger.HasFullRange(c.path.Ranges, unsignedIntHandle) {
				preferredPaths = append(preferredPaths, c)
				hasRangeScanPath = true
			}
		}
		if hasRangeScanPath {
			return preferredPaths
		}
	}

	return candidates
}

// getPruningInfo returns a message listing the access paths that survived
// pruning, for verbose EXPLAIN. It returns "" when the statement is not in
// verbose explain, when no path was pruned, or when the only remaining
// candidate has an empty range (the TableDual case).
func (ds *DataSource) getPruningInfo(candidates []*candidatePath, prop *property.PhysicalProperty) string {
	if !ds.ctx.GetSessionVars().StmtCtx.InVerboseExplain {
		return ""
	}
	if len(candidates) == len(ds.possibleAccessPaths) {
		return ""
	}
	if len(candidates) == 1 && len(candidates[0].path.Ranges) == 0 {
		// For TableDual, we don't need to output pruning info.
		return ""
	}

	// Prefer the table alias; fall back to the real table name.
	tableName := ds.TableAsName.O
	if tableName == "" {
		tableName = ds.tableInfo.Name.O
	}
	simpleName := func(path *util.AccessPath) string {
		switch {
		case !path.IsTablePath():
			return path.Index.Name.O
		case path.StoreType == kv.TiFlash:
			return tableName + "(tiflash)"
		default:
			return tableName
		}
	}

	remaining := make([]string, 0, len(candidates))
	for _, cand := range candidates {
		partials := cand.path.PartialIndexPaths
		if partials == nil {
			remaining = append(remaining, simpleName(cand.path))
			continue
		}
		partialNames := make([]string, 0, len(partials))
		for _, partial := range partials {
			partialNames = append(partialNames, simpleName(partial))
		}
		remaining = append(remaining, fmt.Sprintf("IndexMerge{%s}", strings.Join(partialNames, ",")))
	}

	sortItems := make([]string, 0, len(prop.SortItems))
	for _, item := range prop.SortItems {
		sortItems = append(sortItems, item.String())
	}
	return fmt.Sprintf("[%s] remain after pruning paths for %s given Prop{SortItems: [%s], TaskTp: %s}",
		strings.Join(remaining, ","), tableName, strings.Join(sortItems, " "), prop.TaskTp)
}

// isPointGetConvertableSchema reports whether every column of the data source,
// apart from the extra handle column, is in the public state. Only such tables
// are handled by the point-get conversion.
func (ds *DataSource) isPointGetConvertableSchema() bool {
	for _, col := range ds.Columns {
		isExtraHandle := col.Name.L == model.ExtraHandleName.L
		if !isExtraHandle && col.State != model.StatePublic {
			return false
		}
	}
	return true
}

// findBestTask is DataSource's implementation of findBestTask (the method
// baseLogicalPlan documents as part of the LogicalPlan interface).
// It will enumerate all the available indices and choose a plan with least cost.
//
// The result is the best task for prop, the number of valid plans counted, and
// an error. Candidate paths come from skyline pruning; for each one the
// function may build an index-merge, table-dual, (batch) point-get, table-scan
// or index-scan task and keeps the cheapest. When planCounter forces a specific
// plan, the task that brings the counter to zero is returned immediately.
//
// prop is mutated temporarily when prop.CanAddEnforcer is set and is restored
// by the first deferred function, which also stores the task in the task map.
func (ds *DataSource) findBestTask(prop *property.PhysicalProperty, planCounter *PlanCounterTp, opt *physicalOptimizeOp) (t task, cntPlan int64, err error) {
	// If ds is an inner plan in an IndexJoin, the IndexJoin will generate an inner plan by itself,
	// and set inner child prop nil, so here we do nothing.
	if prop == nil {
		planCounter.Dec(1)
		return nil, 1, nil
	}

	// Reuse a task that was already computed for this property.
	t = ds.getTask(prop)
	if t != nil {
		cntPlan = 1
		planCounter.Dec(1)
		return
	}
	var cnt int64
	// If prop.CanAddEnforcer is true, the prop.SortItems need to be set nil for ds.findBestTask.
	// Before function return, reset it for enforcing task prop and storing map<prop,task>.
	oldProp := prop.CloneEssentialFields()
	if prop.CanAddEnforcer {
		// First, get the bestTask without enforced prop
		prop.CanAddEnforcer = false
		t, cnt, err = ds.findBestTask(prop, planCounter, opt)
		if err != nil {
			return nil, 0, err
		}
		prop.CanAddEnforcer = true
		// invalidTask is a shared sentinel, so this is a pointer-identity check.
		if t != invalidTask {
			ds.storeTask(prop, t)
			cntPlan = cnt
			return
		}
		// Next, get the bestTask with enforced prop
		prop.SortItems = []property.SortItem{}
		prop.MPPPartitionTp = property.AnyType
	} else if prop.MPPPartitionTp != property.AnyType {
		return invalidTask, 0, nil
	}
	// Runs on every return below (unless an error occurred): restore and enforce
	// the original property if needed, cache the task, and warn when a table
	// sample was requested but the chosen plan is not a PhysicalTableSample.
	defer func() {
		if err != nil {
			return
		}
		if prop.CanAddEnforcer {
			*prop = *oldProp
			t = enforceProperty(prop, t, ds.basePlan.ctx)
			prop.CanAddEnforcer = true
		}
		ds.storeTask(prop, t)
		if ds.SampleInfo != nil && !t.invalid() {
			if _, ok := t.plan().(*PhysicalTableSample); !ok {
				warning := expression.ErrInvalidTableSample.GenWithStackByArgs("plan not supported")
				ds.ctx.GetSessionVars().StmtCtx.AppendWarning(warning)
			}
		}
	}()

	// A pushed-down constant-false condition turns the whole scan into a TableDual.
	t, err = ds.tryToGetDualTask()
	if err != nil || t != nil {
		planCounter.Dec(1)
		return t, 1, err
	}

	t = invalidTask
	candidates := ds.skylinePruning(prop)
	pruningInfo := ds.getPruningInfo(candidates, prop)
	// Deferred functions run in reverse order, so this one runs before the one
	// above and records the pruning info under the ID of the plan chosen by the loop.
	defer func() {
		if err == nil && t != nil && !t.invalid() && pruningInfo != "" {
			if ds.ctx.GetSessionVars().StmtCtx.OptimInfo == nil {
				ds.ctx.GetSessionVars().StmtCtx.OptimInfo = make(map[int]string)
			}
			ds.ctx.GetSessionVars().StmtCtx.OptimInfo[t.plan().ID()] = pruningInfo
		}
	}()

	cntPlan = 0
	for _, candidate := range candidates {
		path := candidate.path
		// Index-merge path.
		if path.PartialIndexPaths != nil {
			idxMergeTask, err := ds.convertToIndexMergeScan(prop, candidate, opt)
			if err != nil {
				return nil, 0, err
			}
			if !idxMergeTask.invalid() {
				cntPlan++
				planCounter.Dec(1)
			}
			appendCandidate(ds, idxMergeTask, prop, opt)

			curIsBetter, err := compareTaskCost(ds.ctx, idxMergeTask, t, opt)
			if err != nil {
				return nil, 0, err
			}
			if curIsBetter || planCounter.Empty() {
				t = idxMergeTask
			}
			if planCounter.Empty() {
				return t, cntPlan, nil
			}
			continue
		}
		// if we already know the range of the scan is empty, just return a TableDual
		if len(path.Ranges) == 0 {
			// We should uncache the tableDual plan.
			if expression.MaybeOverOptimized4PlanCache(ds.ctx, path.AccessConds) {
				ds.ctx.GetSessionVars().StmtCtx.SkipPlanCache = true
			}
			dual := PhysicalTableDual{}.Init(ds.ctx, ds.stats, ds.blockOffset)
			dual.SetSchema(ds.schema)
			cntPlan++
			planCounter.Dec(1)
			return &rootTask{
				p: dual,
			}, cntPlan, nil
		}
		// Decide step by step whether this path may become a [batch] point get.
		canConvertPointGet := len(path.Ranges) > 0 && path.StoreType == kv.TiKV && ds.isPointGetConvertableSchema()

		if canConvertPointGet && expression.MaybeOverOptimized4PlanCache(ds.ctx, path.AccessConds) {
			canConvertPointGet = ds.canConvertToPointGetForPlanCache(path)
		}

		if canConvertPointGet && !path.IsIntHandlePath {
			// We simply do not build [batch] point get for prefix indexes. This can be optimized.
			canConvertPointGet = path.Index.Unique && !path.Index.HasPrefixIndex()
			// If any range cannot cover all columns of the index, we cannot build [batch] point get.
			idxColsLen := len(path.Index.Columns)
			for _, ran := range path.Ranges {
				if len(ran.LowVal) != idxColsLen {
					canConvertPointGet = false
					break
				}
			}
		}
		var hashPartColName *ast.ColumnName
		if tblInfo := ds.table.Meta(); canConvertPointGet && tblInfo.GetPartitionInfo() != nil {
			// We do not build [batch] point get for dynamic table partitions now. This can be optimized.
			if ds.ctx.GetSessionVars().StmtCtx.UseDynamicPartitionPrune() {
				canConvertPointGet = false
			}
			if canConvertPointGet && len(path.Ranges) > 1 {
				// We can only build batch point get for hash partitions on a simple column now. This is
				// decided by the current implementation of `BatchPointGetExec::initialize()`, specifically,
				// the `getPhysID()` function. Once we optimize that part, we can come back and enable
				// BatchPointGet plan for more cases.
				hashPartColName = getHashPartitionColumnName(ds.ctx, tblInfo)
				if hashPartColName == nil {
					canConvertPointGet = false
				}
			}
			if canConvertPointGet {
				// If the schema contains ExtraPidColID, do not convert to point get.
				// Because the point get executor can not handle the extra partition ID column now.
				for _, col := range ds.schema.Columns {
					if col.ID == model.ExtraPidColID {
						canConvertPointGet = false
						break
					}
				}
			}
		}
		if canConvertPointGet {
			allRangeIsPoint := true
			for _, ran := range path.Ranges {
				if !ran.IsPointNonNullable(ds.ctx) {
					// unique indexes can have duplicated NULL rows so we cannot use PointGet if there is NULL
					allRangeIsPoint = false
					break
				}
			}
			if allRangeIsPoint {
				// One range becomes a point get, several ranges a batch point get.
				var pointGetTask task
				if len(path.Ranges) == 1 {
					pointGetTask = ds.convertToPointGet(prop, candidate, opt)
				} else {
					pointGetTask = ds.convertToBatchPointGet(prop, candidate, hashPartColName, opt)
				}
				appendCandidate(ds, pointGetTask, prop, opt)
				if !pointGetTask.invalid() {
					cntPlan++
					planCounter.Dec(1)
				}
				curIsBetter, cerr := compareTaskCost(ds.ctx, pointGetTask, t, opt)
				if cerr != nil {
					return nil, 0, cerr
				}
				if curIsBetter || planCounter.Empty() {
					t = pointGetTask
					if planCounter.Empty() {
						// Naked return: yields the named results t, cntPlan and err.
						return
					}
					// The point get was accepted, so no scan task is built for this path.
					continue
				}
			}
		}
		if path.IsTablePath() {
			// Honour the preferred store type: skip table paths on the other engine.
			if ds.preferStoreType&preferTiFlash != 0 && path.StoreType == kv.TiKV {
				continue
			}
			if ds.preferStoreType&preferTiKV != 0 && path.StoreType == kv.TiFlash {
				continue
			}
			var tblTask task
			if ds.SampleInfo != nil {
				tblTask, err = ds.convertToSampleTable(prop, candidate, opt)
			} else {
				tblTask, err = ds.convertToTableScan(prop, candidate, opt)
			}
			if err != nil {
				return nil, 0, err
			}
			if !tblTask.invalid() {
				cntPlan++
				planCounter.Dec(1)
			}
			appendCandidate(ds, tblTask, prop, opt)
			curIsBetter, err := compareTaskCost(ds.ctx, tblTask, t, opt)
			if err != nil {
				return nil, 0, err
			}
			if curIsBetter || planCounter.Empty() {
				t = tblTask
			}
			if planCounter.Empty() {
				return t, cntPlan, nil
			}
			continue
		}
		// TiFlash storage do not support index scan.
		if ds.preferStoreType&preferTiFlash != 0 {
			continue
		}
		idxTask, err := ds.convertToIndexScan(prop, candidate, opt)
		if err != nil {
			return nil, 0, err
		}
		if !idxTask.invalid() {
			cntPlan++
			planCounter.Dec(1)
		}
		appendCandidate(ds, idxTask, prop, opt)
		curIsBetter, err := compareTaskCost(ds.ctx, idxTask, t, opt)
		if err != nil {
			return nil, 0, err
		}
		if curIsBetter || planCounter.Empty() {
			t = idxTask
		}
		if planCounter.Empty() {
			return t, cntPlan, nil
		}
	}

	return
}

// canConvertToPointGetForPlanCache reports whether a PointGet may be generated for path
// while the plan is subject to plan caching.
func (ds *DataSource) canConvertToPointGetForPlanCache(path *util.AccessPath) bool {
	// A PointGet can rely on over-optimized assumptions such as `a>=1 and a<=1` --> `a=1`,
	// which may stop holding once the parameters change. To stay safe, PointGet is only
	// allowed in a narrow set of simple scenarios.

	// Scenario 1: every column is constrained by exactly one EQ,
	// e.g. `a=1 and b=2 and c=3` --> `[1, 2, 3]`.
	if len(path.Ranges) == 0 || path.Ranges[0].Width() != len(path.AccessConds) {
		return false
	}
	for _, cond := range path.AccessConds {
		if eq, isFunc := cond.(*expression.ScalarFunction); !isFunc || eq.FuncName.L != ast.EQ {
			return false
		}
	}
	return true
}

// convertToIndexMergeScan builds a root task that reads the table through the partial
// paths of candidate.path. It returns invalidTask unless a root task without any sort
// requirement is requested.
func (ds *DataSource) convertToIndexMergeScan(prop *property.PhysicalProperty, candidate *candidatePath, _ *physicalOptimizeOp) (task task, err error) {
	if prop.TaskTp != property.RootTaskType || !prop.IsSortItemEmpty() {
		return invalidTask, nil
	}
	mergePath := candidate.path
	partialPlans := make([]PhysicalPlan, 0, len(mergePath.PartialIndexPaths))
	cop := &copTask{indexPlanFinished: true, tblColHists: ds.TblColHists}
	cop.partitionInfo = PartitionInfo{
		PruningConds:   pushDownNot(ds.ctx, ds.allConds),
		PartitionNames: ds.partitionNames,
		Columns:        ds.TblCols,
		ColumnNames:    ds.names,
	}
	// Build one partial plan per partial path, preserving the order of the paths.
	for _, partial := range mergePath.PartialIndexPaths {
		if partial.IsTablePath() {
			partialPlans = append(partialPlans, ds.convertToPartialTableScan(prop, partial))
			continue
		}
		partialPlans = append(partialPlans, ds.convertToPartialIndexScan(prop, partial))
	}
	// Scale the row count down when fewer rows are expected than the data source holds.
	scanRows := mergePath.CountAfterAccess
	if prop.ExpectedCnt < ds.stats.RowCount {
		scanRows *= prop.ExpectedCnt / ds.stats.RowCount
	}
	tableSide, rootConds, err := ds.buildIndexMergeTableScan(prop, mergePath.TableFilters, scanRows)
	if err != nil {
		return nil, err
	}
	cop.tablePlan = tableSide
	cop.idxMergePartPlans = partialPlans
	if rootConds != nil {
		cop.rootTaskConds = rootConds
	}
	task = cop.convertToRootTask(ds.ctx)
	ds.addSelection4PlanCache(task.(*rootTask), ds.tableStats.ScaleByExpectCnt(scanRows), prop)
	return task, nil
}

// convertToPartialIndexScan builds the physical plan for one partial index path.
// It returns the index scan itself, or a PhysicalSelection placed on top of it when
// the path carries index filters.
func (ds *DataSource) convertToPartialIndexScan(prop *property.PhysicalProperty, path *util.AccessPath) (indexPlan PhysicalPlan) {
	is := ds.getOriginalPhysicalIndexScan(prop, path, false, false)
	// TODO: Consider using isCoveringIndex() to avoid another TableRead
	indexConds := path.IndexFilters
	// Check the length rather than nil-ness: an empty but non-nil filter slice must not
	// produce a Selection that has no conditions.
	if len(indexConds) == 0 {
		return is
	}
	// The selectivity stays 0 when nothing is expected after the access conditions.
	var selectivity float64
	if path.CountAfterAccess > 0 {
		selectivity = path.CountAfterIndex / path.CountAfterAccess
	}
	stats := &property.StatsInfo{RowCount: is.stats.RowCount * selectivity}
	stats.StatsVersion = ds.statisticTable.Version
	if ds.statisticTable.Pseudo {
		stats.StatsVersion = statistics.PseudoVersion
	}
	sel := PhysicalSelection{Conditions: indexConds}.Init(is.ctx, stats, ds.blockOffset)
	sel.SetChildren(is)
	return sel
}

// checkColinSchema reports whether every column in cols can be located in schema.
func checkColinSchema(cols []*expression.Column, schema *expression.Schema) bool {
	for i := range cols {
		if pos := schema.ColumnIndex(cols[i]); pos == -1 {
			return false
		}
	}
	return true
}

// convertToPartialTableScan builds the physical plan for one partial table path.
// It returns the table scan itself, or a PhysicalSelection on top of it when some
// filter conditions remain applicable.
func (ds *DataSource) convertToPartialTableScan(prop *property.PhysicalProperty, path *util.AccessPath) (tablePlan PhysicalPlan) {
	ts, rowCount := ds.getOriginalPhysicalTableScan(prop, path, false)
	overwritePartialTableScanSchema(ds, ts)
	// Drop the filter conditions that became ineffective once the scan schema was
	// overwritten: only conditions whose columns are all still in the schema are kept.
	keptConds := make([]expression.Expression, 0, len(path.TableFilters))
	for _, filter := range ts.filterCondition {
		if checkColinSchema(expression.ExtractColumns(filter), ts.schema) {
			keptConds = append(keptConds, filter)
		}
	}
	ts.filterCondition = keptConds
	if len(keptConds) == 0 {
		return ts
	}
	selectivity, _, err := ds.tableStats.HistColl.Selectivity(ds.ctx, keptConds, nil)
	if err != nil {
		logutil.BgLogger().Debug("calculate selectivity failed, use selection factor", zap.Error(err))
		selectivity = SelectionFactor
	}
	sel := PhysicalSelection{Conditions: keptConds}.Init(ts.ctx, ts.stats.ScaleByExpectCnt(selectivity*rowCount), ds.blockOffset)
	sel.SetChildren(ts)
	return sel
}

// overwritePartialTableScanSchema replaces the schema and the column infos of a partial
// table scan with the handle columns of the data source.
func overwritePartialTableScanSchema(ds *DataSource, ts *PhysicalTableScan) {
	hdCols := ds.handleCols
	if hdCols == nil {
		// The data source has no handle columns recorded; use an int handle built on a
		// new extra handle column.
		hdCols = NewIntHandleCols(ds.newExtraHandleSchemaCol())
	}
	n := hdCols.NumCols()
	schemaCols := make([]*expression.Column, n)
	colInfos := make([]*model.ColumnInfo, n)
	for i := range schemaCols {
		hdCol := hdCols.GetCol(i)
		schemaCols[i] = hdCol
		colInfos[i] = hdCol.ToInfo()
	}
	ts.schema = expression.NewSchema(schemaCols...)
	ts.Columns = colInfos
}

// setIndexMergeTableScanHandleCols sets ts.HandleCols to the handle columns of the data
// source, resolved against a schema that consists of exactly those handle columns.
func setIndexMergeTableScanHandleCols(ds *DataSource, ts *PhysicalTableScan) (err error) {
	hdCols := ds.handleCols
	if hdCols == nil {
		hdCols = NewIntHandleCols(ds.newExtraHandleSchemaCol())
	}
	schemaCols := make([]*expression.Column, hdCols.NumCols())
	for i := range schemaCols {
		schemaCols[i] = hdCols.GetCol(i)
	}
	ts.HandleCols, err = hdCols.ResolveIndices(expression.NewSchema(schemaCols...))
	return err
}

// buildIndexMergeTableScan builds the table-side plan of an IndexMerge.
// The returned plan is either the table scan or a Selection on top of it holding the
// filters that can be pushed to TiKV. Filters that cannot be pushed to TiKV are returned
// as the second value; an extra Selection above IndexMergeReader is constructed for them later.
func (ds *DataSource) buildIndexMergeTableScan(_ *property.PhysicalProperty, tableFilters []expression.Expression,
	totalRowCount float64) (PhysicalPlan, []expression.Expression, error) {
	vars := ds.ctx.GetSessionVars()
	scan := PhysicalTableScan{
		Table:           ds.tableInfo,
		Columns:         ds.Columns,
		TableAsName:     ds.TableAsName,
		DBName:          ds.DBName,
		isPartition:     ds.isPartition,
		physicalTableID: ds.physicalTableID,
		HandleCols:      ds.handleCols,
		tblCols:         ds.TblCols,
		tblColHists:     ds.TblColHists,
	}.Init(ds.ctx, ds.blockOffset)
	scan.SetSchema(ds.schema.Clone())
	if err := setIndexMergeTableScanHandleCols(ds, scan); err != nil {
		return nil, nil, err
	}
	// Attach the histogram of the primary key column when the primary key is the handle
	// and statistics for it are available.
	if scan.Table.PKIsHandle {
		if pkInfo := scan.Table.GetPkColInfo(); pkInfo != nil {
			if pkStats := ds.statisticTable.Columns[pkInfo.ID]; pkStats != nil {
				scan.Hist = &pkStats.Histogram
			}
		}
	}
	scan.stats = ds.tableStats.ScaleByExpectCnt(totalRowCount)
	if ds.statisticTable.Pseudo {
		scan.stats.StatsVersion = statistics.PseudoVersion
	}
	if len(tableFilters) == 0 {
		return scan, nil, nil
	}
	pushed, remaining := extractFiltersForIndexMerge(vars.StmtCtx, ds.ctx.GetClient(), tableFilters)
	// Conditions on virtual columns are moved from the pushed set to the remaining set.
	pushed, virtualConds := SplitSelCondsWithVirtualColumn(pushed)
	remaining = append(remaining, virtualConds...)
	if len(pushed) == 0 {
		return scan, remaining, nil
	}
	selectivity, _, err := ds.tableStats.HistColl.Selectivity(ds.ctx, pushed, nil)
	if err != nil {
		logutil.BgLogger().Debug("calculate selectivity failed, use selection factor", zap.Error(err))
		selectivity = SelectionFactor
	}
	sel := PhysicalSelection{Conditions: pushed}.Init(scan.ctx, scan.stats.ScaleByExpectCnt(selectivity*totalRowCount), scan.blockOffset)
	sel.SetChildren(scan)
	return sel, remaining, nil
}

// extractFiltersForIndexMerge partitions filters into two result lists:
// `pushed`: exprs that can be pushed to TiKV.
// `remaining`: exprs that can NOT be pushed to TiKV but can be pushed to other storage engines.
//
// IndexMerge only works on TiKV, so every expr that cannot be pushed to TiKV has to be
// collected and evaluated by a new Selection above IndexMergeReader.
//
//	That new Selection must not contain the exprs that cannot be pushed to ANY storage engine,
//	because those exprs have already been put in another Selection (check rule_predicate_push_down).
//	Such exprs therefore appear in neither result list.
func extractFiltersForIndexMerge(sc *stmtctx.StatementContext, client kv.Client, filters []expression.Expression) (pushed []expression.Expression, remaining []expression.Expression) {
	for _, filter := range filters {
		single := []expression.Expression{filter}
		switch {
		case expression.CanExprsPushDown(sc, single, client, kv.TiKV):
			pushed = append(pushed, filter)
		case expression.CanExprsPushDown(sc, single, client, kv.UnSpecified):
			remaining = append(remaining, filter)
		}
	}
	return pushed, remaining
}

// indexCoveringCol reports whether col equals one of indexCols whose index length is
// either unspecified or the same as the column's own length (Flen).
func indexCoveringCol(col *expression.Column, indexCols []*expression.Column, idxColLens []int) bool {
	for pos, idxCol := range indexCols {
		colLen := idxColLens[pos]
		storedInFull := colLen == types.UnspecifiedLength || colLen == col.RetType.GetFlen()
		if idxCol == nil || !col.EqualByExprAndID(nil, idxCol) {
			continue
		}
		if storedInFull {
			return true
		}
	}
	return false
}

// isCoveringIndex reports whether every column in columns can be served by the index
// described by indexColumns/idxColLens, possibly with the help of the common handle
// columns of the data source.
func (ds *DataSource) isCoveringIndex(columns, indexColumns []*expression.Column, idxColLens []int, tblInfo *model.TableInfo) bool {
	for _, col := range columns {
		// The primary-key handle column and the extra handle column need no further check.
		isPKHandle := tblInfo.PKIsHandle && mysql.HasPriKeyFlag(col.RetType.GetFlag())
		if isPKHandle || col.ID == model.ExtraHandleID {
			continue
		}
		inIndex := indexCoveringCol(col, indexColumns, idxColLens)
		inCommonHandle := indexCoveringCol(col, ds.commonHandleCols, ds.commonHandleLens)
		if !(inIndex || inCommonHandle) {
			return false
		}
		isNewCollationStr := collate.NewCollationEnabled() &&
			col.GetType().EvalType() == types.ETString &&
			!mysql.HasBinaryFlag(col.GetType().GetFlag())
		if inIndex || !isNewCollationStr {
			continue
		}
		// Only the common handle covers this non-binary string column under new collation;
		// that is rejected when the table's CommonHandleVersion is 0.
		if ds.table.Meta().CommonHandleVersion == 0 {
			return false
		}
	}
	return true
}

// appendExtraHandleCol returns the handle column to use for a table reader which needs to
// keep order. When the data source already has handle columns, the first one is returned
// and the scan is left untouched (second result false). Otherwise a new extra handle
// column is appended to the scan's schema and column infos (second result true).
func (ts *PhysicalTableScan) appendExtraHandleCol(ds *DataSource) (*expression.Column, bool) {
	if existing := ds.handleCols; existing != nil {
		return existing.GetCol(0), false
	}
	extra := ds.newExtraHandleSchemaCol()
	ts.schema.Append(extra)
	ts.Columns = append(ts.Columns, model.NewExtraHandleColInfo())
	return extra, true
}

// addSelection4PlanCache puts an extra safeguard selection on top of the root task.
// When a cached plan is reused and its ranges are rebuilt, the range builder may return a
// looser range after the parameters change. The selection is added only when both hold:
// 1. `MaybeOverOptimized4PlanCache(ds.pushedDownConds)` returns true.
// 2. `ds.pushedDownConds` is not empty.
func (ds *DataSource) addSelection4PlanCache(task *rootTask, stats *property.StatsInfo, prop *property.PhysicalProperty) {
	needGuard := expression.MaybeOverOptimized4PlanCache(ds.ctx, ds.pushedDownConds) && len(ds.pushedDownConds) != 0
	if needGuard {
		guard := PhysicalSelection{Conditions: ds.pushedDownConds}.Init(ds.ctx, stats, ds.blockOffset, prop)
		guard.SetChildren(task.p)
		task.p = guard
	}
}

// convertToIndexScan converts the DataSource to an index scan over candidate.path.
// It returns invalidTask (with a nil error) when the path cannot satisfy prop: a
// single/double read mismatch, a sort requirement the candidate does not match, a
// keep-order scan on a partitioned table, or a root task produced where a non-root task
// was requested. Otherwise it returns a cop task, or a root task when prop asks for one.
func (ds *DataSource) convertToIndexScan(prop *property.PhysicalProperty,
	candidate *candidatePath, _ *physicalOptimizeOp) (task task, err error) {
	if !candidate.path.IsSingleScan {
		// If its parent requires a single read task, return an invalid task.
		if prop.TaskTp == property.CopSingleReadTaskType {
			return invalidTask, nil
		}
	} else if prop.TaskTp == property.CopDoubleReadTaskType {
		// If its parent requires a double read task, return an invalid task.
		return invalidTask, nil
	}
	// A required order can only be provided by a candidate that matches the property.
	if !prop.IsSortItemEmpty() && !candidate.isMatchProp {
		return invalidTask, nil
	}
	path := candidate.path
	is := ds.getOriginalPhysicalIndexScan(prop, path, candidate.isMatchProp, candidate.path.IsSingleScan)
	cop := &copTask{
		indexPlan:   is,
		tblColHists: ds.TblColHists,
		tblCols:     ds.TblCols,
		expectCnt:   uint64(prop.ExpectedCnt),
	}
	cop.partitionInfo = PartitionInfo{
		PruningConds:   pushDownNot(ds.ctx, ds.allConds),
		PartitionNames: ds.partitionNames,
		Columns:        ds.TblCols,
		ColumnNames:    ds.names,
	}
	if !candidate.path.IsSingleScan {
		// This is the double read case: a table scan is needed besides the index scan.
		ts := PhysicalTableScan{
			Columns:         ds.Columns,
			Table:           is.Table,
			TableAsName:     ds.TableAsName,
			DBName:          ds.DBName,
			isPartition:     ds.isPartition,
			physicalTableID: ds.physicalTableID,
			tblCols:         ds.TblCols,
			tblColHists:     ds.TblColHists,
		}.Init(ds.ctx, is.blockOffset)
		ts.SetSchema(ds.schema.Clone())
		// We set `StatsVersion` here and fill other fields in `(*copTask).finishIndexPlan`. Since `copTask.indexPlan` may
		// change before calling `(*copTask).finishIndexPlan`, we don't know the stats information of `ts` currently and on
		// the other hand, it may be hard to identify `StatsVersion` of `ts` in `(*copTask).finishIndexPlan`.
		ts.stats = &property.StatsInfo{StatsVersion: ds.tableStats.StatsVersion}
		cop.tablePlan = ts
	}
	task = cop
	if cop.tablePlan != nil && ds.tableInfo.IsCommonHandle {
		cop.commonHandleCols = ds.commonHandleCols
		commonHandle := ds.handleCols.(*CommonHandleCols)
		// Append every common handle column that is missing from the data source schema to
		// the table scan, and remember that an extra projection is needed afterwards.
		for _, col := range commonHandle.columns {
			if ds.schema.ColumnIndex(col) == -1 {
				ts := cop.tablePlan.(*PhysicalTableScan)
				ts.Schema().Append(col)
				ts.Columns = append(ts.Columns, col.ToInfo())
				cop.needExtraProj = true
			}
		}
	}
	if candidate.isMatchProp {
		if cop.tablePlan != nil && !ds.tableInfo.IsCommonHandle {
			col, isNew := cop.tablePlan.(*PhysicalTableScan).appendExtraHandleCol(ds)
			cop.extraHandleCol = col
			cop.needExtraProj = cop.needExtraProj || isNew
		}
		cop.keepOrder = true
		// IndexScan on partition table can't keep order.
		if ds.tableInfo.GetPartitionInfo() != nil {
			return invalidTask, nil
		}
	}
	if cop.needExtraProj {
		cop.originSchema = ds.schema
	}
	// prop.IsSortItemEmpty() would always return true when coming to here,
	// so we can just use prop.ExpectedCnt as parameter of addPushedDownSelection.
	// NOTE(review): the guard near the top only rejects a non-empty sort requirement when
	// candidate.isMatchProp is false, so the statement above looks too strong - confirm.
	finalStats := ds.stats.ScaleByExpectCnt(prop.ExpectedCnt)
	is.addPushedDownSelection(cop, ds, path, finalStats)
	if prop.TaskTp == property.RootTaskType {
		task = task.convertToRootTask(ds.ctx)
		ds.addSelection4PlanCache(task.(*rootTask), finalStats, prop)
	} else if _, ok := task.(*rootTask); ok {
		// A root task cannot satisfy a request for a non-root task type.
		return invalidTask, nil
	}
	return task, nil
}

// getScanRowSize returns the average row size of this index scan, as computed by
// tblColHists.GetIndexAvgRowSize over the scanned columns.
func (is *PhysicalIndexScan) getScanRowSize() float64 {
	schemaCols := is.schema.Columns
	// If `initSchema` has already appended the handle column to the schema, the schema
	// columns are used as they are.
	if len(is.Index.Columns) != len(schemaCols) {
		return is.tblColHists.GetIndexAvgRowSize(is.ctx, schemaCols, is.Index.Unique)
	}
	// Otherwise the schema holds the index columns only; add the pk-is-handle column if any.
	scanned := make([]*expression.Column, 0, len(schemaCols)+1)
	scanned = append(scanned, schemaCols...)
	if pkCol := is.pkIsHandleCol; pkCol != nil {
		scanned = append(scanned, pkCol)
	}
	return is.tblColHists.GetIndexAvgRowSize(is.ctx, scanned, is.Index.Unique)
}

// initSchema is used to set the schema of PhysicalIndexScan. Before calling this,
// make sure the following field of PhysicalIndexScan are initialized:
//
//	PhysicalIndexScan.Table         *model.TableInfo
//	PhysicalIndexScan.Index         *model.IndexInfo
//	PhysicalIndexScan.Index.Columns []*IndexColumn
//	PhysicalIndexScan.IdxCols       []*expression.Column
//	PhysicalIndexScan.Columns       []*model.ColumnInfo
//
// idxExprCols supplies the expression columns for the index columns (entries may be nil)
// and, for a common-handle table, further columns after them. isDoubleRead tells whether
// the scan is part of a double read, in which case a handle column is ensured.
func (is *PhysicalIndexScan) initSchema(idxExprCols []*expression.Column, isDoubleRead bool) {
	// Start from a copy of IdxCols, with room for all index columns plus one handle column.
	indexCols := make([]*expression.Column, len(is.IdxCols), len(is.Index.Columns)+1)
	copy(indexCols, is.IdxCols)

	// Fill in the index columns that are not part of IdxCols.
	for i := len(is.IdxCols); i < len(is.Index.Columns); i++ {
		if idxExprCols[i] != nil {
			indexCols = append(indexCols, idxExprCols[i])
		} else {
			// No expression column is available: build one from the table's column info.
			// TODO: try to reuse the col generated when building the DataSource.
			indexCols = append(indexCols, &expression.Column{
				ID:       is.Table.Columns[is.Index.Columns[i].Offset].ID,
				RetType:  &is.Table.Columns[is.Index.Columns[i].Offset].FieldType,
				UniqueID: is.ctx.GetSessionVars().AllocPlanColumnID(),
			})
		}
	}
	is.NeedCommonHandle = is.Table.IsCommonHandle

	if is.NeedCommonHandle {
		// Append the entries of idxExprCols that come after the index columns.
		for i := len(is.Index.Columns); i < len(idxExprCols); i++ {
			indexCols = append(indexCols, idxExprCols[i])
		}
	}
	// Any column beyond the index columns means the handle is already in place.
	setHandle := len(indexCols) > len(is.Index.Columns)
	if !setHandle {
		// Look for a handle column among the scanned columns (the primary key when it is
		// the handle, or the extra handle) and take the matching data source schema column.
		for i, col := range is.Columns {
			if (mysql.HasPriKeyFlag(col.GetFlag()) && is.Table.PKIsHandle) || col.ID == model.ExtraHandleID {
				indexCols = append(indexCols, is.dataSourceSchema.Columns[i])
				setHandle = true
				break
			}
		}
	}

	if isDoubleRead {
		// If it's double read case, the first index must return handle. So we should add extra handle column
		// if there isn't a handle column.
		if !setHandle {
			if !is.Table.IsCommonHandle {
				indexCols = append(indexCols, &expression.Column{
					RetType:  types.NewFieldType(mysql.TypeLonglong),
					ID:       model.ExtraHandleID,
					UniqueID: is.ctx.GetSessionVars().AllocPlanColumnID(),
				})
			}
		}
		// If index is global, we should add extra column for pid.
		if is.Index.Global {
			indexCols = append(indexCols, &expression.Column{
				RetType:  types.NewFieldType(mysql.TypeLonglong),
				ID:       model.ExtraPidColID,
				UniqueID: is.ctx.GetSessionVars().AllocPlanColumnID(),
			})
		}
	}

	is.SetSchema(expression.NewSchema(indexCols...))
}

// addPushedDownSelection distributes the filters of path over copTask.
// Index filters and table filters accepted by expression.PushDownExprs for kv.TiKV are
// wrapped in Selections over copTask.indexPlan and copTask.tablePlan respectively.
// Table filters on virtual columns and every filter rejected by PushDownExprs are
// collected in copTask.rootTaskConds. finalStats is used as the stats of the table-side
// Selection unless root conditions exist, in which case the stats are re-derived from
// the selectivity of the pushed table filters.
func (is *PhysicalIndexScan) addPushedDownSelection(copTask *copTask, p *DataSource, path *util.AccessPath, finalStats *property.StatsInfo) {
	// Add filter condition to table plan now.
	indexConds, tableConds := path.IndexFilters, path.TableFilters
	tableConds, copTask.rootTaskConds = SplitSelCondsWithVirtualColumn(tableConds)

	var newRootConds []expression.Expression
	indexConds, newRootConds = expression.PushDownExprs(is.ctx.GetSessionVars().StmtCtx, indexConds, is.ctx.GetClient(), kv.TiKV)
	copTask.rootTaskConds = append(copTask.rootTaskConds, newRootConds...)

	tableConds, newRootConds = expression.PushDownExprs(is.ctx.GetSessionVars().StmtCtx, tableConds, is.ctx.GetClient(), kv.TiKV)
	copTask.rootTaskConds = append(copTask.rootTaskConds, newRootConds...)

	// Check the length rather than nil-ness, like the table side below: an empty but
	// non-nil slice must not produce a Selection that has no conditions.
	if len(indexConds) > 0 {
		// The selectivity stays 0 when nothing is expected after the access conditions.
		var selectivity float64
		if path.CountAfterAccess > 0 {
			selectivity = path.CountAfterIndex / path.CountAfterAccess
		}
		count := is.stats.RowCount * selectivity
		stats := p.tableStats.ScaleByExpectCnt(count)
		indexSel := PhysicalSelection{Conditions: indexConds}.Init(is.ctx, stats, is.blockOffset)
		indexSel.SetChildren(is)
		copTask.indexPlan = indexSel
	}
	if len(tableConds) > 0 {
		// The index side must be finished before a Selection is put on the table plan.
		copTask.finishIndexPlan()
		tableSel := PhysicalSelection{Conditions: tableConds}.Init(is.ctx, finalStats, is.blockOffset)
		if len(copTask.rootTaskConds) != 0 {
			selectivity, _, err := copTask.tblColHists.Selectivity(is.ctx, tableConds, nil)
			if err != nil {
				logutil.BgLogger().Debug("calculate selectivity failed, use selection factor", zap.Error(err))
				selectivity = SelectionFactor
			}
			tableSel.stats = copTask.plan().statsInfo().Scale(selectivity)
		}
		tableSel.SetChildren(copTask.tablePlan)
		copTask.tablePlan = tableSel
	}
}

// SplitSelCondsWithVirtualColumn splits conds into the conditions that do not contain a
// virtual column (first result) and those that do (second result), keeping the relative
// order inside each group.
func SplitSelCondsWithVirtualColumn(conds []expression.Expression) (withoutVirt []expression.Expression, withVirt []expression.Expression) {
	for idx, cond := range conds {
		if !expression.ContainVirtualColumn(conds[idx : idx+1]) {
			withoutVirt = append(withoutVirt, cond)
			continue
		}
		withVirt = append(withVirt, cond)
	}
	return withoutVirt, withVirt
}

// matchIndicesProp reports whether the leading index columns match propItems one by one:
// each of them must have an unspecified index length and be equal to the column of the
// sort item at the same position.
func matchIndicesProp(idxCols []*expression.Column, colLens []int, propItems []property.SortItem) bool {
	if len(propItems) > len(idxCols) {
		return false
	}
	for pos := range propItems {
		if colLens[pos] != types.UnspecifiedLength {
			return false
		}
		if !propItems[pos].Col.EqualByExprAndID(nil, idxCols[pos]) {
			return false
		}
	}
	return true
}

// splitIndexFilterConditions splits conditions into those whose columns are all covered
// by the given index (indexConds) and the rest (tableConds).
func (ds *DataSource) splitIndexFilterConditions(conditions []expression.Expression, indexColumns []*expression.Column, idxColLens []int,
	table *model.TableInfo) (indexConds, tableConds []expression.Expression) {
	for _, cond := range conditions {
		usedCols := expression.ExtractColumns(cond)
		if !ds.isCoveringIndex(usedCols, indexColumns, idxColLens, table) {
			tableConds = append(tableConds, cond)
			continue
		}
		indexConds = append(indexConds, cond)
	}
	return indexConds, tableConds
}

// getMostCorrCol4Handle looks for the column in exprs whose recorded correlation has the
// largest absolute value. The column is returned only when exprs reference exactly one
// distinct column and the absolute correlation reaches threshold; otherwise nil is
// returned together with the largest correlation found, which would be used in the
// heuristic estimation.
func getMostCorrCol4Handle(exprs []expression.Expression, histColl *statistics.Table, threshold float64) (*expression.Column, float64) {
	var cols []*expression.Column
	cols = expression.ExtractColumnsFromExpressions(cols, exprs, nil)
	if len(cols) == 0 {
		return nil, 0
	}
	var (
		bestCol  *expression.Column
		bestCorr float64
	)
	seen := set.NewInt64Set()
	for _, col := range cols {
		// Each distinct column is examined once.
		if seen.Exist(col.UniqueID) {
			continue
		}
		seen.Insert(col.UniqueID)
		hist, ok := histColl.Columns[col.ID]
		if !ok {
			continue
		}
		if curCorr := hist.Correlation; bestCol == nil || math.Abs(bestCorr) < math.Abs(curCorr) {
			bestCol, bestCorr = col, curCorr
		}
	}
	if len(seen) == 1 && math.Abs(bestCorr) >= threshold {
		return bestCol, bestCorr
	}
	return nil, bestCorr
}

// getColumnRangeCounts estimates the row count of every range in ranges separately.
// The index statistics identified by idxID are used when idxID is non-negative, otherwise
// the column statistics identified by colID. The second result is false when the needed
// statistics are missing or invalid, or when an estimation fails.
func getColumnRangeCounts(sctx sessionctx.Context, colID int64, ranges []*ranger.Range, histColl *statistics.HistColl, idxID int64) ([]float64, bool) {
	counts := make([]float64, len(ranges))
	byIndex := idxID >= 0
	for i := range ranges {
		var (
			rowCnt float64
			err    error
		)
		single := []*ranger.Range{ranges[i]}
		if byIndex {
			if idxHist := histColl.Indices[idxID]; idxHist == nil || idxHist.IsInvalid(false) {
				return nil, false
			}
			rowCnt, err = histColl.GetRowCountByIndexRanges(sctx, idxID, single)
		} else {
			colHist, found := histColl.Columns[colID]
			if !found || colHist.IsInvalid(sctx, false) {
				return nil, false
			}
			rowCnt, err = histColl.GetRowCountByColumnRanges(sctx, colID, single)
		}
		if err != nil {
			return nil, false
		}
		counts[i] = rowCnt
	}
	return counts, true
}

// convertRangeFromExpectedCnt builds the range used to estimate how many rows a table scan
// has to read before it finds expectedCnt tuples that fall into the input ranges.
// It walks the ranges (backwards when desc is true) and accumulates their counts until
// adding the next one would reach expectedCnt. It returns the range lying before that
// range in scan order, together with the count accumulated so far. When all ranges
// together stay below expectedCnt, it returns (nil, 0, true).
func convertRangeFromExpectedCnt(ranges []*ranger.Range, rangeCounts []float64, expectedCnt float64, desc bool) ([]*ranger.Range, float64, bool) {
	var accumulated float64
	if desc {
		pos := len(ranges) - 1
		for ; pos >= 0; pos-- {
			if accumulated+rangeCounts[pos] >= expectedCnt {
				break
			}
			accumulated += rangeCounts[pos]
		}
		if pos < 0 {
			return nil, 0, true
		}
		// Everything above the high bound of the stopping range.
		stop := ranges[pos]
		above := &ranger.Range{
			LowVal:     stop.HighVal,
			HighVal:    []types.Datum{types.MaxValueDatum()},
			LowExclude: !stop.HighExclude,
			Collators:  stop.Collators,
		}
		return []*ranger.Range{above}, accumulated, false
	}
	pos := 0
	for ; pos < len(ranges); pos++ {
		if accumulated+rangeCounts[pos] >= expectedCnt {
			break
		}
		accumulated += rangeCounts[pos]
	}
	if pos == len(ranges) {
		return nil, 0, true
	}
	// Everything below the low bound of the stopping range.
	stop := ranges[pos]
	below := &ranger.Range{
		LowVal:      []types.Datum{{}},
		HighVal:     stop.LowVal,
		HighExclude: !stop.LowExclude,
		Collators:   stop.Collators,
	}
	return []*ranger.Range{below}, accumulated, false
}

// crossEstimateTableRowCount estimates the row count of a table scan with the help of the
// histogram of another column that appears in TableFilters and has a high order
// correlation with the handle column. For example, for a query like:
// `select * from tbl where a = 1 order by pk limit 1`
// if the order of column `a` is strictly correlated with column `pk`, the row count of the
// table scan should be:
// `1 + row_count(a < 1 or a is null)`
// It returns (0, false, 0) when the statistics are pseudo, when there are no table
// filters, or when correlation adjustment is disabled.
func (ds *DataSource) crossEstimateTableRowCount(path *util.AccessPath, expectedCnt float64, desc bool) (float64, bool, float64) {
	if ds.statisticTable.Pseudo || len(path.TableFilters) == 0 {
		return 0, false, 0
	}
	vars := ds.ctx.GetSessionVars()
	if !vars.EnableCorrelationAdjustment {
		return 0, false, 0
	}
	corrCol, corr := getMostCorrCol4Handle(path.TableFilters, ds.statisticTable, vars.CorrelationThreshold)
	return ds.crossEstimateRowCount(path, path.TableFilters, corrCol, corr, expectedCnt, desc)
}

// crossEstimateRowCount is the common logic of crossEstimateTableRowCount and crossEstimateIndexRowCount.
// col is the correlated column chosen by the caller (nil when none qualifies) and corr is
// its correlation. It returns the estimated scan row count, whether the estimation was
// made, and a correlation value: corr is passed back on the early-exit paths, and 0 is
// returned on the paths that produce an estimate from the histogram.
func (ds *DataSource) crossEstimateRowCount(path *util.AccessPath, conds []expression.Expression, col *expression.Column, corr, expectedCnt float64, desc bool) (float64, bool, float64) {
	// If the scan is not full range scan, we cannot use histogram of other columns for estimation, because
	// the histogram reflects value distribution in the whole table level.
	if col == nil || len(path.AccessConds) > 0 {
		return 0, false, corr
	}
	colID := col.UniqueID
	// A negative correlation means the column is ordered opposite to the scan direction.
	if corr < 0 {
		desc = !desc
	}
	accessConds, remained := ranger.DetachCondsForColumn(ds.ctx, conds, col)
	if len(accessConds) == 0 {
		return 0, false, corr
	}
	ranges, accessConds, _, err := ranger.BuildColumnRange(accessConds, ds.ctx, col.RetType, types.UnspecifiedLength, ds.ctx.GetSessionVars().RangeMaxSize)
	if len(ranges) == 0 || len(accessConds) == 0 || err != nil {
		// Without an error this reports a successful estimation of 0 rows.
		return 0, err == nil, corr
	}
	// Use index statistics when the column maps to an index; -1 selects column statistics.
	idxID, idxExists := ds.stats.HistColl.ColID2IdxID[colID]
	if !idxExists {
		idxID = -1
	}
	rangeCounts, ok := getColumnRangeCounts(ds.ctx, colID, ranges, ds.tableStats.HistColl, idxID)
	if !ok {
		return 0, false, corr
	}
	convertedRanges, count, isFull := convertRangeFromExpectedCnt(ranges, rangeCounts, expectedCnt, desc)
	if isFull {
		// All ranges together hold fewer rows than expected: everything has to be scanned.
		return path.CountAfterAccess, true, 0
	}
	var rangeCount float64
	if idxExists {
		rangeCount, err = ds.tableStats.HistColl.GetRowCountByIndexRanges(ds.ctx, idxID, convertedRanges)
	} else {
		rangeCount, err = ds.tableStats.HistColl.GetRowCountByColumnRanges(ds.ctx, colID, convertedRanges)
	}
	if err != nil {
		return 0, false, corr
	}
	// Rows before the stopping range plus the rows still needed inside it.
	scanCount := rangeCount + expectedCnt - count
	if len(remained) > 0 {
		// Conditions not used for the ranges filter further; compensate with the default factor.
		scanCount = scanCount / SelectionFactor
	}
	scanCount = math.Min(scanCount, path.CountAfterAccess)
	return scanCount, true, 0
}

// crossEstimateIndexRowCount estimates the row count of an index scan with the help of the
// histogram of another column that appears in TableFilters/IndexFilters and has a high
// order correlation with the first index column. For example, for a query like:
// `select * from tbl where a = 1 order by b limit 1`
// if the order of column `a` is strictly correlated with column `b`, the row count of
// IndexScan(b) should be:
// `1 + row_count(a < 1 or a is null)`
// It returns (0, false, 0) when the statistics are pseudo, when the path has no filters,
// or when extended stats or correlation adjustment is disabled.
func (ds *DataSource) crossEstimateIndexRowCount(path *util.AccessPath, expectedCnt float64, desc bool) (float64, bool, float64) {
	vars := ds.ctx.GetSessionVars()
	numFilters := len(path.TableFilters) + len(path.IndexFilters)
	enabled := vars.EnableExtendedStats && vars.EnableCorrelationAdjustment
	if ds.statisticTable.Pseudo || numFilters == 0 || !enabled {
		return 0, false, 0
	}
	corrCol, corr := getMostCorrCol4Index(path, ds.statisticTable, vars.CorrelationThreshold)
	// Table filters first, then index filters.
	allFilters := make([]expression.Expression, 0, numFilters)
	allFilters = append(allFilters, path.TableFilters...)
	allFilters = append(allFilters, path.IndexFilters...)
	return ds.crossEstimateRowCount(path, allFilters, corrCol, corr, expectedCnt, desc)
}

// getMostCorrCol4Index checks if column in the condition is correlated enough with the first index column. If the condition
// contains multiple columns, return nil and get the max correlation, which would be used in the heuristic estimation.
// The correlation of a filter column is taken from the extended stats item that pairs it
// with the first index column; a column without such an item counts as correlation 0.
// It returns (nil, 0) when there are no extended stats, when the filters reference no
// column, or when the first index column of path is not available.
func getMostCorrCol4Index(path *util.AccessPath, histColl *statistics.Table, threshold float64) (*expression.Column, float64) {
	if histColl.ExtendedStats == nil || len(histColl.ExtendedStats.Stats) == 0 {
		return nil, 0
	}
	var cols []*expression.Column
	cols = expression.ExtractColumnsFromExpressions(cols, path.TableFilters, nil)
	cols = expression.ExtractColumnsFromExpressions(cols, path.IndexFilters, nil)
	if len(cols) == 0 {
		return nil, 0
	}
	// Without a resolved first index column there is nothing to correlate with; bail out
	// instead of indexing an empty slice or dereferencing a nil entry below.
	if len(path.FullIdxCols) == 0 || path.FullIdxCols[0] == nil {
		return nil, 0
	}
	firstIdxColID := path.FullIdxCols[0].ID
	colSet := set.NewInt64Set()
	var corr float64
	var corrCol *expression.Column
	for _, col := range cols {
		// Each distinct column is examined once.
		if colSet.Exist(col.UniqueID) {
			continue
		}
		colSet.Insert(col.UniqueID)
		curCorr := float64(0)
		// The pair may be recorded in either order.
		for _, item := range histColl.ExtendedStats.Stats {
			if (col.ID == item.ColIDs[0] && firstIdxColID == item.ColIDs[1]) ||
				(col.ID == item.ColIDs[1] && firstIdxColID == item.ColIDs[0]) {
				curCorr = item.ScalarVals
				break
			}
		}
		if corrCol == nil || math.Abs(corr) < math.Abs(curCorr) {
			corrCol = col
			corr = curCorr
		}
	}
	if len(colSet) == 1 && math.Abs(corr) >= threshold {
		return corrCol, corr
	}
	return nil, corr
}

// GetPhysicalScan returns the PhysicalTableScan for this LogicalTableScan, using a clone
// of schema and the given stats. When the primary key is the handle and statistics for
// it exist, its histogram is attached to the scan.
func (s *LogicalTableScan) GetPhysicalScan(schema *expression.Schema, stats *property.StatsInfo) *PhysicalTableScan {
	src := s.Source
	scan := PhysicalTableScan{
		Table:           src.tableInfo,
		Columns:         src.Columns,
		TableAsName:     src.TableAsName,
		DBName:          src.DBName,
		isPartition:     src.isPartition,
		physicalTableID: src.physicalTableID,
		Ranges:          s.Ranges,
		AccessCondition: s.AccessConds,
		tblCols:         src.TblCols,
		tblColHists:     src.TblColHists,
	}.Init(s.ctx, s.blockOffset)
	scan.stats = stats
	scan.SetSchema(schema.Clone())
	if !scan.Table.PKIsHandle {
		return scan
	}
	pkInfo := scan.Table.GetPkColInfo()
	if pkInfo == nil {
		return scan
	}
	if pkStats := src.statisticTable.Columns[pkInfo.ID]; pkStats != nil {
		scan.Hist = &pkStats.Histogram
	}
	return scan
}

// GetPhysicalIndexScan returns the PhysicalIndexScan for this logical IndexScan.
// The schema argument is ignored; the scan's schema is derived by initSchema from the
// full index columns and the double-read flag of the logical scan.
func (s *LogicalIndexScan) GetPhysicalIndexScan(_ *expression.Schema, stats *property.StatsInfo) *PhysicalIndexScan {
	src := s.Source
	scan := PhysicalIndexScan{
		Table:            src.tableInfo,
		TableAsName:      src.TableAsName,
		DBName:           src.DBName,
		Columns:          s.Columns,
		Index:            s.Index,
		IdxCols:          s.IdxCols,
		IdxColLens:       s.IdxColLens,
		AccessCondition:  s.AccessConds,
		Ranges:           s.Ranges,
		dataSourceSchema: src.schema,
		isPartition:      src.isPartition,
		physicalTableID:  src.physicalTableID,
		tblColHists:      src.TblColHists,
		pkIsHandleCol:    src.getPKIsHandleCol(),
	}.Init(src.ctx, src.blockOffset)
	scan.stats = stats
	scan.initSchema(s.FullIdxCols, s.IsDoubleRead)
	return scan
}

// convertToTableScan converts the DataSource to table scan.
//
// Depending on prop.TaskTp the result is an mppTask, a copTask or (after
// convertToRootTask) a root task wrapping a PhysicalTableScan built by
// getOriginalPhysicalTableScan. invalidTask (with a nil error) is returned
// whenever the requested property cannot be satisfied by this path; err is
// always nil in the code below.
func (ds *DataSource) convertToTableScan(prop *property.PhysicalProperty, candidate *candidatePath, _ *physicalOptimizeOp) (task task, err error) {
	// It will be handled in convertToIndexScan.
	if prop.TaskTp == property.CopDoubleReadTaskType {
		return invalidTask, nil
	}
	// An ordering is required but this candidate path cannot provide it.
	if !prop.IsSortItemEmpty() && !candidate.isMatchProp {
		return invalidTask, nil
	}
	// The second return value (the estimated row count) is not needed here.
	ts, _ := ds.getOriginalPhysicalTableScan(prop, candidate.path, candidate.isMatchProp)
	if ts.KeepOrder && ts.StoreType == kv.TiFlash && (ts.Desc || ds.SCtx().GetSessionVars().TiFlashFastScan) {
		// TiFlash fast mode(https://github.com/pingcap/tidb/pull/35851) does not keep order in TableScan
		return invalidTask, nil
	}
	if ts.StoreType == kv.TiFlash {
		for _, col := range ts.schema.Columns {
			// In theory, TiFlash does not support virtual expr, but in non-mpp mode, if the cop request only contain table scan, then
			// TiDB will fill the virtual column after decoding the cop response(executor.FillVirtualColumnValue), that is to say, the virtual
			// columns in Cop request is just a placeholder, so TiFlash can support virtual column in cop request mode. However, virtual column
			// with TiDBShard is special, it can be added using create index statement, TiFlash's ddl does not handle create index statement, so
			// there is a chance that the TiDBShard's virtual column is not seen by TiFlash, in this case, TiFlash will throw column not found error
			if ds.containExprPrefixUk && expression.GcColumnExprIsTidbShard(col.VirtualExpr) {
				ds.SCtx().GetSessionVars().RaiseWarningWhenMPPEnforced("MPP mode may be blocked because column `" + col.OrigName + "` is a virtual column which is not supported now.")
				return invalidTask, nil
			}
		}
	}
	// MPP request: build an mppTask, rejecting ordered scans, non-AnyType
	// partition requirements, single-partition scans and virtual columns.
	if prop.TaskTp == property.MppTaskType {
		if ts.KeepOrder {
			return invalidTask, nil
		}
		if prop.MPPPartitionTp != property.AnyType || ts.isPartition {
			// If ts is a single partition, then this partition table is in static-only prune, then we should not choose mpp execution.
			ds.SCtx().GetSessionVars().RaiseWarningWhenMPPEnforced("MPP mode may be blocked because table `" + ds.tableInfo.Name.O + "`is a partition table which is not supported when `@@tidb_partition_prune_mode=static`.")
			return invalidTask, nil
		}
		for _, col := range ts.schema.Columns {
			if col.VirtualExpr != nil {
				ds.SCtx().GetSessionVars().RaiseWarningWhenMPPEnforced("MPP mode may be blocked because column `" + col.OrigName + "` is a virtual column which is not supported now.")
				return invalidTask, nil
			}
		}
		mppTask := &mppTask{
			p:      ts,
			partTp: property.AnyType,
		}
		// Note: pushDownNot rewrites the elements of ds.allConds in place.
		ts.PartitionInfo = PartitionInfo{
			PruningConds:   pushDownNot(ds.ctx, ds.allConds),
			PartitionNames: ds.partitionNames,
			Columns:        ds.TblCols,
			ColumnNames:    ds.names,
		}
		// Wrap the scan in a selection for the remaining filters; the stats
		// passed in are the DataSource stats scaled to the expected row count.
		mppTask = ts.addPushedDownSelectionToMppTask(mppTask, ds.stats.ScaleByExpectCnt(prop.ExpectedCnt))
		return mppTask, nil
	}
	// Non-MPP request: start from a copTask whose table plan is the scan.
	copTask := &copTask{
		tablePlan:         ts,
		indexPlanFinished: true,
		tblColHists:       ds.TblColHists,
	}
	// Note: pushDownNot rewrites the elements of ds.allConds in place.
	copTask.partitionInfo = PartitionInfo{
		PruningConds:   pushDownNot(ds.ctx, ds.allConds),
		PartitionNames: ds.partitionNames,
		Columns:        ds.TblCols,
		ColumnNames:    ds.names,
	}
	ts.PartitionInfo = copTask.partitionInfo
	task = copTask
	if candidate.isMatchProp {
		copTask.keepOrder = true
		// TableScan on partition table can't keep order.
		if ds.tableInfo.GetPartitionInfo() != nil {
			return invalidTask, nil
		}
	}
	// Splits the scan's filters into pushed-down ones (wrapped in a selection
	// on the cop side) and copTask.rootTaskConds.
	ts.addPushedDownSelection(copTask, ds.stats.ScaleByExpectCnt(prop.ExpectedCnt))
	// A flash property is rejected when any condition was left in rootTaskConds.
	if prop.IsFlashProp() && len(copTask.rootTaskConds) != 0 {
		return invalidTask, nil
	}
	if prop.TaskTp == property.RootTaskType {
		task = task.convertToRootTask(ds.ctx)
		ds.addSelection4PlanCache(task.(*rootTask), ds.stats.ScaleByExpectCnt(prop.ExpectedCnt), prop)
	} else if _, ok := task.(*rootTask); ok {
		// A root task is not acceptable when the property did not ask for one.
		return invalidTask, nil
	}
	return task, nil
}

// convertToSampleTable converts the DataSource to a root task holding a PhysicalTableSample.
// It yields invalidTask for double-read requests, for required orderings the candidate
// cannot provide, and for ordered sampling on a partitioned table. err is always nil.
func (ds *DataSource) convertToSampleTable(prop *property.PhysicalProperty,
	candidate *candidatePath, _ *physicalOptimizeOp) (task task, err error) {
	ordered := candidate.isMatchProp
	switch {
	case prop.TaskTp == property.CopDoubleReadTaskType:
		return invalidTask, nil
	case !prop.IsSortItemEmpty() && !ordered:
		return invalidTask, nil
	case ordered && ds.tableInfo.GetPartitionInfo() != nil:
		// TableSample on partition table can't keep order.
		return invalidTask, nil
	}

	sample := PhysicalTableSample{
		TableSampleInfo: ds.SampleInfo,
		TableInfo:       ds.table,
		// The sort direction is only consulted when the candidate matches the property.
		Desc: ordered && prop.SortItems[0].Desc,
	}.Init(ds.ctx, ds.SelectBlockOffset())
	sample.schema = ds.schema
	return &rootTask{p: sample}, nil
}

// convertToPointGet converts the DataSource to a root task whose plan is a PointGetPlan,
// wrapped in a PhysicalSelection when the access path still carries filter conditions.
//
// invalidTask is returned when:
//   - an ordering is required that the candidate path does not match,
//   - the requested single/double read task type disagrees with candidate.path.IsSingleScan,
//   - the database is a memory database (tidbutil.IsMemDB),
//   - the DataSource is a single partition whose definition cannot be found.
//
// opt is accepted for signature compatibility and is not used here.
func (ds *DataSource) convertToPointGet(prop *property.PhysicalProperty, candidate *candidatePath, opt *physicalOptimizeOp) (task task) {
	if !prop.IsSortItemEmpty() && !candidate.isMatchProp {
		return invalidTask
	}
	if prop.TaskTp == property.CopDoubleReadTaskType && candidate.path.IsSingleScan ||
		prop.TaskTp == property.CopSingleReadTaskType && !candidate.path.IsSingleScan {
		return invalidTask
	}

	if tidbutil.IsMemDB(ds.DBName.L) {
		return invalidTask
	}

	// The row estimate of the point get is capped at one row.
	accessCnt := math.Min(candidate.path.CountAfterAccess, float64(1))
	pointGetPlan := PointGetPlan{
		ctx:              ds.ctx,
		AccessConditions: candidate.path.AccessConds,
		schema:           ds.schema.Clone(),
		dbName:           ds.DBName.L,
		TblInfo:          ds.TableInfo(),
		outputNames:      ds.OutputNames(),
		LockWaitTime:     ds.ctx.GetSessionVars().LockWaitTimeout,
		Columns:          ds.Columns,
	}.Init(ds.ctx, ds.tableStats.ScaleByExpectCnt(accessCnt), ds.blockOffset)

	// For a single-partition DataSource, locate the definition of that partition.
	var partitionInfo *model.PartitionDefinition
	if ds.isPartition {
		if pi := ds.tableInfo.GetPartitionInfo(); pi != nil {
			for i := range pi.Definitions {
				def := pi.Definitions[i]
				if def.ID == ds.physicalTableID {
					partitionInfo = &def
					break
				}
			}
		}
		if partitionInfo == nil {
			return invalidTask
		}
	}

	rTsk := &rootTask{p: pointGetPlan}
	if candidate.path.IsIntHandlePath {
		// Point get by integer handle: the handle value is the low bound of the first range.
		pointGetPlan.Handle = kv.IntHandle(candidate.path.Ranges[0].LowVal[0].GetInt64())
		pointGetPlan.UnsignedHandle = mysql.HasUnsignedFlag(ds.handleCols.GetCol(0).RetType.GetFlag())
		pointGetPlan.PartitionInfo = partitionInfo
		pointGetPlan.accessCols = ds.TblCols
		// Add filter condition to table plan now.
		if len(candidate.path.TableFilters) > 0 {
			sel := PhysicalSelection{
				Conditions: candidate.path.TableFilters,
			}.Init(ds.ctx, ds.stats.ScaleByExpectCnt(prop.ExpectedCnt), ds.blockOffset)
			sel.SetChildren(pointGetPlan)
			rTsk.p = sel
		}
	} else {
		// Point get by index: the index values are the low bound of the first range.
		pointGetPlan.IndexInfo = candidate.path.Index
		pointGetPlan.IdxCols = candidate.path.IdxCols
		pointGetPlan.IdxColLens = candidate.path.IdxColLens
		pointGetPlan.IndexValues = candidate.path.Ranges[0].LowVal
		pointGetPlan.PartitionInfo = partitionInfo
		if candidate.path.IsSingleScan {
			pointGetPlan.accessCols = candidate.path.IdxCols
		} else {
			pointGetPlan.accessCols = ds.TblCols
		}
		// Add index condition to table plan now.
		if n := len(candidate.path.IndexFilters) + len(candidate.path.TableFilters); n > 0 {
			// Build the condition list in a fresh slice. Appending directly to
			// candidate.path.IndexFilters could write into spare capacity of its
			// backing array, which belongs to the shared access path.
			conds := make([]expression.Expression, 0, n)
			conds = append(conds, candidate.path.IndexFilters...)
			conds = append(conds, candidate.path.TableFilters...)
			sel := PhysicalSelection{
				Conditions: conds,
			}.Init(ds.ctx, ds.stats.ScaleByExpectCnt(prop.ExpectedCnt), ds.blockOffset)
			sel.SetChildren(pointGetPlan)
			rTsk.p = sel
		}
	}

	return rTsk
}

// convertToBatchPointGet converts the DataSource to a root task whose plan is a
// BatchPointGetPlan, wrapped in a PhysicalSelection when the access path still carries
// filter conditions.
//
// invalidTask is returned when an ordering is required that the candidate path does not
// match, or when the requested single/double read task type disagrees with
// candidate.path.IsSingleScan. hashPartColName is forwarded to getHashPartitionColumnPos
// for index paths. opt is accepted for signature compatibility and is not used here.
func (ds *DataSource) convertToBatchPointGet(prop *property.PhysicalProperty,
	candidate *candidatePath, hashPartColName *ast.ColumnName, opt *physicalOptimizeOp) (task task) {
	if !prop.IsSortItemEmpty() && !candidate.isMatchProp {
		return invalidTask
	}
	if prop.TaskTp == property.CopDoubleReadTaskType && candidate.path.IsSingleScan ||
		prop.TaskTp == property.CopSingleReadTaskType && !candidate.path.IsSingleScan {
		return invalidTask
	}

	// The row estimate is capped at one row per range.
	accessCnt := math.Min(candidate.path.CountAfterAccess, float64(len(candidate.path.Ranges)))
	batchPointGetPlan := BatchPointGetPlan{
		ctx:              ds.ctx,
		dbName:           ds.DBName.L,
		AccessConditions: candidate.path.AccessConds,
		TblInfo:          ds.TableInfo(),
		KeepOrder:        !prop.IsSortItemEmpty(),
		Columns:          ds.Columns,
		SinglePart:       ds.isPartition,
		PartTblID:        ds.physicalTableID,
		PartitionExpr:    getPartitionExpr(ds.ctx, ds.TableInfo()),
	}.Init(ds.ctx, ds.tableStats.ScaleByExpectCnt(accessCnt), ds.schema.Clone(), ds.names, ds.blockOffset)
	if batchPointGetPlan.KeepOrder {
		batchPointGetPlan.Desc = prop.SortItems[0].Desc
	}
	rTsk := &rootTask{p: batchPointGetPlan}
	if candidate.path.IsIntHandlePath {
		// One integer handle per range, taken from the range's low bound.
		for _, ran := range candidate.path.Ranges {
			batchPointGetPlan.Handles = append(batchPointGetPlan.Handles, kv.IntHandle(ran.LowVal[0].GetInt64()))
		}
		batchPointGetPlan.accessCols = ds.TblCols
		// Add filter condition to table plan now.
		if len(candidate.path.TableFilters) > 0 {
			sel := PhysicalSelection{
				Conditions: candidate.path.TableFilters,
			}.Init(ds.ctx, ds.stats.ScaleByExpectCnt(prop.ExpectedCnt), ds.blockOffset)
			sel.SetChildren(batchPointGetPlan)
			rTsk.p = sel
		}
	} else {
		batchPointGetPlan.IndexInfo = candidate.path.Index
		batchPointGetPlan.IdxCols = candidate.path.IdxCols
		batchPointGetPlan.IdxColLens = candidate.path.IdxColLens
		batchPointGetPlan.PartitionColPos = getHashPartitionColumnPos(candidate.path.Index, hashPartColName)
		// One set of index values per range, taken from the range's low bound.
		for _, ran := range candidate.path.Ranges {
			batchPointGetPlan.IndexValues = append(batchPointGetPlan.IndexValues, ran.LowVal)
		}
		// NOTE(review): this looks like a repeat of the KeepOrder/Desc setup done right
		// after Init above; confirm Init does not reset these fields before removing it.
		if !prop.IsSortItemEmpty() {
			batchPointGetPlan.KeepOrder = true
			batchPointGetPlan.Desc = prop.SortItems[0].Desc
		}
		if candidate.path.IsSingleScan {
			batchPointGetPlan.accessCols = candidate.path.IdxCols
		} else {
			batchPointGetPlan.accessCols = ds.TblCols
		}
		// Add index condition to table plan now.
		if n := len(candidate.path.IndexFilters) + len(candidate.path.TableFilters); n > 0 {
			// Build the condition list in a fresh slice. Appending directly to
			// candidate.path.IndexFilters could write into spare capacity of its
			// backing array, which belongs to the shared access path.
			conds := make([]expression.Expression, 0, n)
			conds = append(conds, candidate.path.IndexFilters...)
			conds = append(conds, candidate.path.TableFilters...)
			sel := PhysicalSelection{
				Conditions: conds,
			}.Init(ds.ctx, ds.stats.ScaleByExpectCnt(prop.ExpectedCnt), ds.blockOffset)
			sel.SetChildren(batchPointGetPlan)
			rTsk.p = sel
		}
	}

	return rTsk
}

// addPushedDownSelectionToMppTask puts the scan's filter conditions into a PhysicalSelection
// on top of ts inside mpp. If any condition is split off by SplitSelCondsWithVirtualColumn
// or rejected by expression.PushDownExprs, a fresh empty mppTask is returned instead and
// ts.filterCondition is left untouched.
func (ts *PhysicalTableScan) addPushedDownSelectionToMppTask(mpp *mppTask, stats *property.StatsInfo) *mppTask {
	pushable, rootConds := SplitSelCondsWithVirtualColumn(ts.filterCondition)
	sessVars := ts.ctx.GetSessionVars()
	pushable, rejected := expression.PushDownExprs(sessVars.StmtCtx, pushable, ts.ctx.GetClient(), ts.StoreType)
	rootConds = append(rootConds, rejected...)
	if len(rootConds) > 0 {
		// At least one condition would have to stay outside the pushed-down plan.
		return &mppTask{}
	}

	ts.filterCondition = pushable
	if len(pushable) == 0 {
		// Nothing to filter: the task keeps its current plan.
		return mpp
	}
	// Add filter condition to table plan now.
	sel := PhysicalSelection{Conditions: pushable}.Init(ts.ctx, stats, ts.blockOffset)
	sel.SetChildren(ts)
	mpp.p = sel
	return mpp
}

// addPushedDownSelection splits the scan's filter conditions into those that stay on the
// scan (ts.filterCondition) and those recorded in copTask.rootTaskConds, then, if any
// condition stays on the scan, makes a PhysicalSelection over ts the task's table plan.
// The selection uses stats unless root conditions exist, in which case its stats are the
// scan's stats scaled by the estimated selectivity of the remaining conditions.
func (ts *PhysicalTableScan) addPushedDownSelection(copTask *copTask, stats *property.StatsInfo) {
	ts.filterCondition, copTask.rootTaskConds = SplitSelCondsWithVirtualColumn(ts.filterCondition)
	sessVars := ts.ctx.GetSessionVars()
	pushed, rejected := expression.PushDownExprs(sessVars.StmtCtx, ts.filterCondition, ts.ctx.GetClient(), ts.StoreType)
	ts.filterCondition = pushed
	copTask.rootTaskConds = append(copTask.rootTaskConds, rejected...)

	if len(pushed) == 0 {
		return
	}

	// Add filter condition to table plan now.
	sel := PhysicalSelection{Conditions: pushed}.Init(ts.ctx, stats, ts.blockOffset)
	if len(copTask.rootTaskConds) > 0 {
		selectivity, _, err := copTask.tblColHists.Selectivity(ts.ctx, pushed, nil)
		if err != nil {
			// Fall back to the default factor when the estimation fails.
			logutil.BgLogger().Debug("calculate selectivity failed, use selection factor", zap.Error(err))
			selectivity = SelectionFactor
		}
		sel.stats = ts.stats.Scale(selectivity)
	}
	sel.SetChildren(ts)
	copTask.tablePlan = sel
}

// getScanRowSize returns the average row size reported by the table's column histograms
// for this scan. For TiKV the size is computed over all table columns with the handle
// included; for other store types it is computed over the scan's schema columns, and the
// handle is included only when ts.HandleCols is set.
func (ts *PhysicalTableScan) getScanRowSize() float64 {
	cols, withHandle := ts.tblCols, true
	if ts.StoreType != kv.TiKV {
		// If `ts.handleCol` is nil, then the schema of tableScan doesn't have handle column.
		// This logic can be ensured in column pruning.
		cols, withHandle = ts.Schema().Columns, ts.HandleCols != nil
	}
	return ts.tblColHists.GetTableAvgRowSize(ts.ctx, cols, ts.StoreType, withHandle)
}

// getOriginalPhysicalTableScan builds the PhysicalTableScan for the given access path and
// returns it together with the estimated number of rows the scan has to read. When the
// property expects fewer rows than the DataSource produces, the estimate is reduced from
// path.CountAfterAccess using either the cross estimation or a correlation-adjusted
// uniform estimate. isMatchProp makes the scan keep order in the direction of the first
// sort item.
func (ds *DataSource) getOriginalPhysicalTableScan(prop *property.PhysicalProperty, path *util.AccessPath, isMatchProp bool) (*PhysicalTableScan, float64) {
	scan := PhysicalTableScan{
		Table:           ds.tableInfo,
		Columns:         ds.Columns,
		TableAsName:     ds.TableAsName,
		DBName:          ds.DBName,
		isPartition:     ds.isPartition,
		physicalTableID: ds.physicalTableID,
		Ranges:          path.Ranges,
		AccessCondition: path.AccessConds,
		StoreType:       path.StoreType,
		HandleCols:      ds.handleCols,
		tblCols:         ds.TblCols,
		tblColHists:     ds.TblColHists,
		prop:            prop,
	}.Init(ds.ctx, ds.blockOffset)
	// The scan gets its own copy of the table filters rather than sharing the path's slice.
	scan.filterCondition = append(make([]expression.Expression, 0, len(path.TableFilters)), path.TableFilters...)
	scan.SetSchema(ds.schema.Clone())
	if scan.Table.PKIsHandle {
		if pkCol := scan.Table.GetPkColInfo(); pkCol != nil {
			if colStats := ds.statisticTable.Columns[pkCol.ID]; colStats != nil {
				scan.Hist = &colStats.Histogram
			}
		}
	}

	estRows := path.CountAfterAccess
	if prop.ExpectedCnt < ds.stats.RowCount {
		// ratio is the DataSource row count divided by the row count after access conditions.
		ratio := ds.stats.RowCount / path.CountAfterAccess
		uniformEst := math.Min(path.CountAfterAccess, prop.ExpectedCnt/ratio)

		desc := isMatchProp && prop.SortItems[0].Desc
		corrEst, ok, corr := ds.crossEstimateTableRowCount(path, prop.ExpectedCnt, desc)
		absCorr := math.Abs(corr)
		switch {
		case ok:
			// TODO: before trusting this count we should also compare it with the row count
			// of the first/last region and take the larger value, because one copTask is
			// built per region, so even `limit 1` scans at least one region. The row count of
			// a region could be fetched through `tikvrpc.CmdDebugGetRegionProperties`, as
			// `getSampRegionsRowCount()` does, but that number includes MVCC old versions and
			// is therefore inaccurate. Since IndexScan and TableScan take similar time in
			// this scenario, the check is left out for now.

			// To reduce the risk of the correlation adjustment, take the larger of the
			// uniform and the cross estimate.
			estRows = math.Max(uniformEst, corrEst)
		case absCorr < 1:
			factor := math.Pow(1-absCorr, float64(ds.ctx.GetSessionVars().CorrelationExpFactor))
			estRows = math.Min(path.CountAfterAccess, uniformEst/factor)
		}
	}

	// Column NDVs may be needed by join cost estimation. Strictly, the NDV of every scanned
	// histogram bucket should be tracked and summed, but that would only improve one column
	// while all others still rely on the uniform assumption. For simplicity the uniform
	// assumption is applied to every column, as in `deriveStatsByFilter`.
	scan.stats = ds.tableStats.ScaleByExpectCnt(estRows)
	if isMatchProp {
		scan.Desc = prop.SortItems[0].Desc
		scan.KeepOrder = true
	}
	return scan, estRows
}

// getOriginalPhysicalIndexScan builds the PhysicalIndexScan for the given index access path.
// The scan's stats are the table stats scaled to an estimated row count, which starts at
// path.CountAfterAccess and is reduced when the property expects fewer rows than the
// DataSource produces. isMatchProp makes the scan keep order in the direction of the first
// sort item; isSingleScan is passed (negated) to initSchema as the double-read flag.
func (ds *DataSource) getOriginalPhysicalIndexScan(prop *property.PhysicalProperty, path *util.AccessPath, isMatchProp bool, isSingleScan bool) *PhysicalIndexScan {
	index := path.Index
	idxScan := PhysicalIndexScan{
		Table:            ds.tableInfo,
		TableAsName:      ds.TableAsName,
		DBName:           ds.DBName,
		Columns:          ds.Columns,
		Index:            index,
		IdxCols:          path.IdxCols,
		IdxColLens:       path.IdxColLens,
		AccessCondition:  path.AccessConds,
		Ranges:           path.Ranges,
		dataSourceSchema: ds.schema,
		isPartition:      ds.isPartition,
		physicalTableID:  ds.physicalTableID,
		tblColHists:      ds.TblColHists,
		pkIsHandleCol:    ds.getPKIsHandleCol(),
		prop:             prop,
	}.Init(ds.ctx, ds.blockOffset)
	// Attach the index histogram when statistics for this index exist.
	if idxStats := ds.statisticTable.Indices[index.ID]; idxStats != nil {
		idxScan.Hist = &idxStats.Histogram
	}

	estRows := path.CountAfterAccess
	idxScan.initSchema(append(path.FullIdxCols, ds.commonHandleCols...), !isSingleScan)
	if (isMatchProp || prop.IsSortItemEmpty()) && prop.ExpectedCnt < ds.stats.RowCount {
		desc := isMatchProp && prop.SortItems[0].Desc
		crossCnt, ok, corr := ds.crossEstimateIndexRowCount(path, prop.ExpectedCnt, desc)
		absCorr := math.Abs(corr)
		switch {
		case ok:
			estRows = crossCnt
		case absCorr < 1:
			// Uniform estimate, loosened by a factor derived from the correlation.
			factor := math.Pow(1-absCorr, float64(ds.ctx.GetSessionVars().CorrelationExpFactor))
			ratio := ds.stats.RowCount / estRows
			estRows = math.Min(prop.ExpectedCnt/ratio/factor, estRows)
		}
	}

	idxScan.stats = ds.tableStats.ScaleByExpectCnt(estRows)
	if isMatchProp {
		idxScan.Desc = prop.SortItems[0].Desc
		idxScan.KeepOrder = true
	}
	return idxScan
}

// findBestTask returns a root task wrapping a PhysicalCTE built from the seed and recursive
// physical plans already stored on p.cte. A required ordering is only accepted when the
// property allows an enforcer, in which case enforceProperty is applied to the task;
// otherwise invalidTask is returned. The plan count is always 1 and err is always nil.
func (p *LogicalCTE) findBestTask(prop *property.PhysicalProperty, _ *PlanCounterTp, _ *physicalOptimizeOp) (t task, cntPlan int64, err error) {
	needEnforce := prop.CanAddEnforcer
	if !needEnforce && !prop.IsSortItemEmpty() {
		return invalidTask, 1, nil
	}

	// The physical plan has been build when derive stats.
	physCTE := PhysicalCTE{
		SeedPlan:  p.cte.seedPartPhysicalPlan,
		RecurPlan: p.cte.recursivePartPhysicalPlan,
		CTE:       p.cte,
		cteAsName: p.cteAsName,
	}.Init(p.ctx, p.stats)
	physCTE.SetSchema(p.schema)

	t = &rootTask{p: physCTE}
	if needEnforce {
		t = enforceProperty(prop, t, p.basePlan.ctx)
	}
	return t, 1, nil
}

// findBestTask returns a root task wrapping a PhysicalCTETable that reads the storage
// identified by p.idForStorage. The CTE table cannot provide any ordering, so a property
// with sort items yields invalidTask. The plan count is always 1 and err is always nil.
func (p *LogicalCTETable) findBestTask(prop *property.PhysicalProperty, _ *PlanCounterTp, _ *physicalOptimizeOp) (t task, cntPlan int64, err error) {
	if !prop.IsSortItemEmpty() {
		// Report failure with invalidTask, as the sibling findBestTask/convertTo*
		// functions do. A nil task would slip past callers that only test
		// `invalid()` and panic on the first method call made on it.
		return invalidTask, 1, nil
	}

	pcteTable := PhysicalCTETable{IDForStorage: p.idForStorage}.Init(p.ctx, p.stats)
	pcteTable.SetSchema(p.schema)
	t = &rootTask{p: pcteTable}
	return t, 1, nil
}

// appendCandidate records the plan of task as a candidate for lp under prop in opt.
// Nil and invalid tasks are ignored.
func appendCandidate(lp LogicalPlan, task task, prop *property.PhysicalProperty, opt *physicalOptimizeOp) {
	if task != nil && !task.invalid() {
		opt.appendCandidate(lp, task.plan(), prop)
	}
}

// pushDownNot applies expression.PushDownNot to every condition, e.g. turning
// 'not (a != 1)' into 'a = 1'. Conditions such as 'not (a != 1)' are not handled when
// building ranges, so they are converted to a form that range building understands.
// The elements of conds are replaced in place and the same slice is returned.
func pushDownNot(ctx sessionctx.Context, conds []expression.Expression) []expression.Expression {
	for idx := range conds {
		conds[idx] = expression.PushDownNot(ctx, conds[idx])
	}
	return conds
}

相关信息

tidb 源码目录

相关文章

tidb access_object 源码

tidb collect_column_stats_usage 源码

tidb common_plans 源码

tidb encode 源码

tidb errors 源码

tidb exhaust_physical_plans 源码

tidb explain 源码

tidb expression_rewriter 源码

tidb flat_plan 源码

tidb fragment 源码

0  赞