tidb infoschema_reader 源码

  • 2022-09-19
  • 浏览 (448)

tidb infoschema_reader 代码

文件路径:/executor/infoschema_reader.go

// Copyright 2020 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package executor

import (
	"bytes"
	"context"
	"encoding/hex"
	"encoding/json"
	"fmt"
	"io"
	"net/http"
	"strconv"
	"strings"
	"sync"
	"time"

	"github.com/pingcap/errors"
	"github.com/pingcap/failpoint"
	"github.com/pingcap/kvproto/pkg/deadlock"
	"github.com/pingcap/tidb/ddl/label"
	"github.com/pingcap/tidb/domain"
	"github.com/pingcap/tidb/domain/infosync"
	"github.com/pingcap/tidb/errno"
	"github.com/pingcap/tidb/expression"
	"github.com/pingcap/tidb/infoschema"
	"github.com/pingcap/tidb/kv"
	"github.com/pingcap/tidb/meta/autoid"
	"github.com/pingcap/tidb/parser/charset"
	"github.com/pingcap/tidb/parser/model"
	"github.com/pingcap/tidb/parser/mysql"
	"github.com/pingcap/tidb/parser/terror"
	plannercore "github.com/pingcap/tidb/planner/core"
	"github.com/pingcap/tidb/privilege"
	"github.com/pingcap/tidb/privilege/privileges"
	"github.com/pingcap/tidb/session/txninfo"
	"github.com/pingcap/tidb/sessionctx"
	"github.com/pingcap/tidb/sessionctx/variable"
	"github.com/pingcap/tidb/sessiontxn"
	"github.com/pingcap/tidb/store/helper"
	"github.com/pingcap/tidb/table"
	"github.com/pingcap/tidb/tablecodec"
	"github.com/pingcap/tidb/types"
	"github.com/pingcap/tidb/util"
	"github.com/pingcap/tidb/util/chunk"
	"github.com/pingcap/tidb/util/codec"
	"github.com/pingcap/tidb/util/collate"
	"github.com/pingcap/tidb/util/deadlockhistory"
	"github.com/pingcap/tidb/util/hint"
	"github.com/pingcap/tidb/util/keydecoder"
	"github.com/pingcap/tidb/util/logutil"
	"github.com/pingcap/tidb/util/mathutil"
	"github.com/pingcap/tidb/util/pdapi"
	"github.com/pingcap/tidb/util/resourcegrouptag"
	"github.com/pingcap/tidb/util/sem"
	"github.com/pingcap/tidb/util/set"
	"github.com/pingcap/tidb/util/sqlexec"
	"github.com/pingcap/tidb/util/stmtsummary"
	"github.com/pingcap/tidb/util/stringutil"
	"github.com/tikv/client-go/v2/txnkv/txnlock"
	"go.uber.org/zap"
	"golang.org/x/exp/slices"
)

type memtableRetriever struct {
	dummyCloser
	table       *model.TableInfo
	columns     []*model.ColumnInfo
	rows        [][]types.Datum
	rowIdx      int
	retrieved   bool
	initialized bool
	extractor   plannercore.MemTablePredicateExtractor
}

// retrieve implements the infoschemaRetriever interface
func (e *memtableRetriever) retrieve(ctx context.Context, sctx sessionctx.Context) ([][]types.Datum, error) {
	if e.table.Name.O == infoschema.TableClusterInfo && !hasPriv(sctx, mysql.ProcessPriv) {
		return nil, plannercore.ErrSpecificAccessDenied.GenWithStackByArgs("PROCESS")
	}
	if e.retrieved {
		return nil, nil
	}

	// Cache the ret full rows in schemataRetriever
	if !e.initialized {
		is := sctx.GetInfoSchema().(infoschema.InfoSchema)
		dbs := is.AllSchemas()
		slices.SortFunc(dbs, model.LessDBInfo)
		var err error
		switch e.table.Name.O {
		case infoschema.TableSchemata:
			e.setDataFromSchemata(sctx, dbs)
		case infoschema.TableStatistics:
			e.setDataForStatistics(sctx, dbs)
		case infoschema.TableTables:
			err = e.setDataFromTables(ctx, sctx, dbs)
		case infoschema.TableReferConst:
			err = e.setDataFromReferConst(ctx, sctx, dbs)
		case infoschema.TableSequences:
			e.setDataFromSequences(sctx, dbs)
		case infoschema.TablePartitions:
			err = e.setDataFromPartitions(ctx, sctx, dbs)
		case infoschema.TableClusterInfo:
			err = e.dataForTiDBClusterInfo(sctx)
		case infoschema.TableAnalyzeStatus:
			err = e.setDataForAnalyzeStatus(sctx)
		case infoschema.TableTiDBIndexes:
			e.setDataFromIndexes(sctx, dbs)
		case infoschema.TableViews:
			e.setDataFromViews(sctx, dbs)
		case infoschema.TableEngines:
			e.setDataFromEngines()
		case infoschema.TableCharacterSets:
			e.setDataFromCharacterSets()
		case infoschema.TableCollations:
			e.setDataFromCollations()
		case infoschema.TableKeyColumn:
			e.setDataFromKeyColumnUsage(sctx, dbs)
		case infoschema.TableMetricTables:
			e.setDataForMetricTables(sctx)
		case infoschema.TableProfiling:
			e.setDataForPseudoProfiling(sctx)
		case infoschema.TableCollationCharacterSetApplicability:
			e.dataForCollationCharacterSetApplicability()
		case infoschema.TableProcesslist:
			e.setDataForProcessList(sctx)
		case infoschema.ClusterTableProcesslist:
			err = e.setDataForClusterProcessList(sctx)
		case infoschema.TableUserPrivileges:
			e.setDataFromUserPrivileges(sctx)
		case infoschema.TableTiKVRegionStatus:
			err = e.setDataForTiKVRegionStatus(sctx)
		case infoschema.TableTiDBHotRegions:
			err = e.setDataForTiDBHotRegions(sctx)
		case infoschema.TableConstraints:
			e.setDataFromTableConstraints(sctx, dbs)
		case infoschema.TableSessionVar:
			e.rows, err = infoschema.GetDataFromSessionVariables(sctx)
		case infoschema.TableTiDBServersInfo:
			err = e.setDataForServersInfo(sctx)
		case infoschema.TableTiFlashReplica:
			e.dataForTableTiFlashReplica(sctx, dbs)
		case infoschema.TableTiKVStoreStatus:
			err = e.dataForTiKVStoreStatus(sctx)
		case infoschema.TableStatementsSummaryEvicted,
			infoschema.ClusterTableStatementsSummaryEvicted:
			err = e.setDataForStatementsSummaryEvicted(sctx)
		case infoschema.TableClientErrorsSummaryGlobal,
			infoschema.TableClientErrorsSummaryByUser,
			infoschema.TableClientErrorsSummaryByHost:
			err = e.setDataForClientErrorsSummary(sctx, e.table.Name.O)
		case infoschema.TableAttributes:
			err = e.setDataForAttributes(sctx, is)
		case infoschema.TablePlacementPolicies:
			err = e.setDataFromPlacementPolicies(sctx)
		case infoschema.TableTrxSummary:
			err = e.setDataForTrxSummary(sctx)
		case infoschema.ClusterTableTrxSummary:
			err = e.setDataForClusterTrxSummary(sctx)
		case infoschema.TableVariablesInfo:
			err = e.setDataForVariablesInfo(sctx)
		}
		if err != nil {
			return nil, err
		}
		e.initialized = true
	}

	// Adjust the amount of each return
	maxCount := 1024
	retCount := maxCount
	if e.rowIdx+maxCount > len(e.rows) {
		retCount = len(e.rows) - e.rowIdx
		e.retrieved = true
	}
	ret := make([][]types.Datum, retCount)
	for i := e.rowIdx; i < e.rowIdx+retCount; i++ {
		ret[i-e.rowIdx] = e.rows[i]
	}
	e.rowIdx += retCount
	return adjustColumns(ret, e.columns, e.table), nil
}

func getRowCountTables(ctx context.Context, sctx sessionctx.Context, tableIDs ...int64) (map[int64]uint64, error) {
	exec := sctx.(sqlexec.RestrictedSQLExecutor)
	var rows []chunk.Row
	var err error
	if len(tableIDs) == 0 {
		rows, _, err = exec.ExecRestrictedSQL(ctx, nil, "select table_id, count from mysql.stats_meta")
	} else {
		inTblIDs := buildInTableIDsString(tableIDs)
		sql := "select table_id, count from mysql.stats_meta where " + inTblIDs
		rows, _, err = exec.ExecRestrictedSQL(ctx, nil, sql)
	}
	if err != nil {
		return nil, err
	}

	rowCountMap := make(map[int64]uint64, len(rows))
	for _, row := range rows {
		tableID := row.GetInt64(0)
		rowCnt := row.GetUint64(1)
		rowCountMap[tableID] = rowCnt
	}
	return rowCountMap, nil
}

func buildInTableIDsString(tableIDs []int64) string {
	var whereBuilder strings.Builder
	whereBuilder.WriteString("table_id in (")
	for i, id := range tableIDs {
		whereBuilder.WriteString(strconv.FormatInt(id, 10))
		if i != len(tableIDs)-1 {
			whereBuilder.WriteString(",")
		}
	}
	whereBuilder.WriteString(")")
	return whereBuilder.String()
}

type tableHistID struct {
	tableID int64
	histID  int64
}

func getColLengthTables(ctx context.Context, sctx sessionctx.Context, tableIDs ...int64) (map[tableHistID]uint64, error) {
	exec := sctx.(sqlexec.RestrictedSQLExecutor)
	var rows []chunk.Row
	var err error
	if len(tableIDs) == 0 {
		sql := "select table_id, hist_id, tot_col_size from mysql.stats_histograms where is_index = 0"
		rows, _, err = exec.ExecRestrictedSQL(ctx, nil, sql)
	} else {
		inTblIDs := buildInTableIDsString(tableIDs)
		sql := "select table_id, hist_id, tot_col_size from mysql.stats_histograms where is_index = 0 and " + inTblIDs
		rows, _, err = exec.ExecRestrictedSQL(ctx, nil, sql)
	}
	if err != nil {
		return nil, err
	}

	colLengthMap := make(map[tableHistID]uint64, len(rows))
	for _, row := range rows {
		tableID := row.GetInt64(0)
		histID := row.GetInt64(1)
		totalSize := row.GetInt64(2)
		if totalSize < 0 {
			totalSize = 0
		}
		colLengthMap[tableHistID{tableID: tableID, histID: histID}] = uint64(totalSize)
	}
	return colLengthMap, nil
}

func getDataAndIndexLength(info *model.TableInfo, physicalID int64, rowCount uint64) (uint64, uint64) {
	columnLength := make(map[string]uint64, len(info.Columns))
	for _, col := range info.Columns {
		if col.State != model.StatePublic {
			continue
		}
		length := col.FieldType.StorageLength()
		if length != types.VarStorageLen {
			columnLength[col.Name.L] = rowCount * uint64(length)
		} else {
			length := tableStatsCache.GetColLength(tableHistID{tableID: physicalID, histID: col.ID})
			columnLength[col.Name.L] = length
		}
	}
	dataLength, indexLength := uint64(0), uint64(0)
	for _, length := range columnLength {
		dataLength += length
	}
	for _, idx := range info.Indices {
		if idx.State != model.StatePublic {
			continue
		}
		for _, col := range idx.Columns {
			if col.Length == types.UnspecifiedLength {
				indexLength += columnLength[col.Name.L]
			} else {
				indexLength += rowCount * uint64(col.Length)
			}
		}
	}
	return dataLength, indexLength
}

type statsCache struct {
	mu         sync.RWMutex
	modifyTime time.Time
	tableRows  map[int64]uint64
	colLength  map[tableHistID]uint64
	dirtyIDs   []int64
}

var tableStatsCache = &statsCache{}

// TableStatsCacheExpiry is the expiry time for table stats cache.
var TableStatsCacheExpiry = 3 * time.Second

func invalidInfoSchemaStatCache(tblID int64) {
	tableStatsCache.mu.Lock()
	defer tableStatsCache.mu.Unlock()
	tableStatsCache.dirtyIDs = append(tableStatsCache.dirtyIDs, tblID)
}

func (c *statsCache) GetTableRows(id int64) uint64 {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return c.tableRows[id]
}

func (c *statsCache) GetColLength(id tableHistID) uint64 {
	c.mu.RLock()
	defer c.mu.RUnlock()
	return c.colLength[id]
}

func (c *statsCache) update(ctx context.Context, sctx sessionctx.Context) error {
	c.mu.Lock()
	defer c.mu.Unlock()
	ctx = kv.WithInternalSourceType(ctx, kv.InternalTxnStats)
	if time.Since(c.modifyTime) < TableStatsCacheExpiry {
		if len(c.dirtyIDs) > 0 {
			tableRows, err := getRowCountTables(ctx, sctx, c.dirtyIDs...)
			if err != nil {
				return err
			}
			for id, tr := range tableRows {
				c.tableRows[id] = tr
			}
			colLength, err := getColLengthTables(ctx, sctx, c.dirtyIDs...)
			if err != nil {
				return err
			}
			for id, cl := range colLength {
				c.colLength[id] = cl
			}
			c.dirtyIDs = nil
		}
		return nil
	}
	tableRows, err := getRowCountTables(ctx, sctx)
	if err != nil {
		return err
	}
	colLength, err := getColLengthTables(ctx, sctx)
	if err != nil {
		return err
	}
	c.tableRows = tableRows
	c.colLength = colLength
	c.modifyTime = time.Now()
	c.dirtyIDs = nil
	return nil
}

func getAutoIncrementID(ctx sessionctx.Context, schema *model.DBInfo, tblInfo *model.TableInfo) (int64, error) {
	is := ctx.GetInfoSchema().(infoschema.InfoSchema)
	tbl, err := is.TableByName(schema.Name, tblInfo.Name)
	if err != nil {
		return 0, err
	}
	return tbl.Allocators(ctx).Get(autoid.RowIDAllocType).Base() + 1, nil
}

func hasPriv(ctx sessionctx.Context, priv mysql.PrivilegeType) bool {
	pm := privilege.GetPrivilegeManager(ctx)
	if pm == nil {
		// internal session created with createSession doesn't has the PrivilegeManager. For most experienced cases before,
		// we use it like this:
		// ```
		// checker := privilege.GetPrivilegeManager(ctx)
		// if checker != nil && !checker.RequestVerification(ctx.GetSessionVars().ActiveRoles, schema.Name.L, table.Name.L, "", mysql.AllPrivMask) {
		//	  continue
		// }
		// do something.
		// ```
		// So once the privilege manager is nil, it's a signature of internal sql, so just passing the checker through.
		return true
	}
	return pm.RequestVerification(ctx.GetSessionVars().ActiveRoles, "", "", "", priv)
}

func scopeStr(sv *variable.SysVar) string {
	var scopes []string
	if sv.HasNoneScope() {
		return "NONE"
	}
	if sv.HasSessionScope() {
		scopes = append(scopes, "SESSION")
	}
	if sv.HasGlobalScope() {
		scopes = append(scopes, "GLOBAL")
	}
	if sv.HasInstanceScope() {
		scopes = append(scopes, "INSTANCE")
	}
	return strings.Join(scopes, ",")
}

func (e *memtableRetriever) setDataForVariablesInfo(ctx sessionctx.Context) error {
	sysVars := variable.GetSysVars()
	rows := make([][]types.Datum, 0, len(sysVars))
	for _, sv := range sysVars {
		if infoschema.SysVarHiddenForSem(ctx, sv.Name) {
			continue
		}
		currentVal, err := ctx.GetSessionVars().GetSessionOrGlobalSystemVar(sv.Name)
		if err != nil {
			currentVal = ""
		}
		isNoop := "NO"
		if sv.IsNoop {
			isNoop = "YES"
		}
		row := types.MakeDatums(
			sv.Name,      // VARIABLE_NAME
			scopeStr(sv), // VARIABLE_SCOPE
			sv.Value,     // DEFAULT_VALUE
			currentVal,   // CURRENT_VALUE
			sv.MinValue,  // MIN_VALUE
			sv.MaxValue,  // MAX_VALUE
			nil,          // POSSIBLE_VALUES
			isNoop,       // IS_NOOP
		)
		// min and max value is only supported for numeric types
		if !(sv.Type == variable.TypeUnsigned || sv.Type == variable.TypeInt || sv.Type == variable.TypeFloat) {
			row[4].SetNull()
			row[5].SetNull()
		}
		if sv.Type == variable.TypeEnum {
			possibleValues := strings.Join(sv.PossibleValues, ",")
			row[6].SetString(possibleValues, mysql.DefaultCollationName)
		}
		rows = append(rows, row)
	}
	e.rows = rows
	return nil
}

func (e *memtableRetriever) setDataFromSchemata(ctx sessionctx.Context, schemas []*model.DBInfo) {
	checker := privilege.GetPrivilegeManager(ctx)
	rows := make([][]types.Datum, 0, len(schemas))

	for _, schema := range schemas {
		charset := mysql.DefaultCharset
		collation := mysql.DefaultCollationName

		if len(schema.Charset) > 0 {
			charset = schema.Charset // Overwrite default
		}

		if len(schema.Collate) > 0 {
			collation = schema.Collate // Overwrite default
		}
		var policyName interface{}
		if schema.PlacementPolicyRef != nil {
			policyName = schema.PlacementPolicyRef.Name.O
		}

		if checker != nil && !checker.RequestVerification(ctx.GetSessionVars().ActiveRoles, schema.Name.L, "", "", mysql.AllPrivMask) {
			continue
		}
		record := types.MakeDatums(
			infoschema.CatalogVal, // CATALOG_NAME
			schema.Name.O,         // SCHEMA_NAME
			charset,               // DEFAULT_CHARACTER_SET_NAME
			collation,             // DEFAULT_COLLATION_NAME
			nil,                   // SQL_PATH
			policyName,            // TIDB_PLACEMENT_POLICY_NAME
		)
		rows = append(rows, record)
	}
	e.rows = rows
}

func (e *memtableRetriever) setDataForStatistics(ctx sessionctx.Context, schemas []*model.DBInfo) {
	checker := privilege.GetPrivilegeManager(ctx)
	for _, schema := range schemas {
		for _, table := range schema.Tables {
			if checker != nil && !checker.RequestVerification(ctx.GetSessionVars().ActiveRoles, schema.Name.L, table.Name.L, "", mysql.AllPrivMask) {
				continue
			}
			e.setDataForStatisticsInTable(schema, table)
		}
	}
}

func (e *memtableRetriever) setDataForStatisticsInTable(schema *model.DBInfo, table *model.TableInfo) {
	var rows [][]types.Datum
	if table.PKIsHandle {
		for _, col := range table.Columns {
			if mysql.HasPriKeyFlag(col.GetFlag()) {
				record := types.MakeDatums(
					infoschema.CatalogVal, // TABLE_CATALOG
					schema.Name.O,         // TABLE_SCHEMA
					table.Name.O,          // TABLE_NAME
					"0",                   // NON_UNIQUE
					schema.Name.O,         // INDEX_SCHEMA
					"PRIMARY",             // INDEX_NAME
					1,                     // SEQ_IN_INDEX
					col.Name.O,            // COLUMN_NAME
					"A",                   // COLLATION
					0,                     // CARDINALITY
					nil,                   // SUB_PART
					nil,                   // PACKED
					"",                    // NULLABLE
					"BTREE",               // INDEX_TYPE
					"",                    // COMMENT
					"",                    // INDEX_COMMENT
					"YES",                 // IS_VISIBLE
					nil,                   // Expression
				)
				rows = append(rows, record)
			}
		}
	}
	nameToCol := make(map[string]*model.ColumnInfo, len(table.Columns))
	for _, c := range table.Columns {
		nameToCol[c.Name.L] = c
	}
	for _, index := range table.Indices {
		nonUnique := "1"
		if index.Unique {
			nonUnique = "0"
		}
		for i, key := range index.Columns {
			col := nameToCol[key.Name.L]
			nullable := "YES"
			if mysql.HasNotNullFlag(col.GetFlag()) {
				nullable = ""
			}

			visible := "YES"
			if index.Invisible {
				visible = "NO"
			}

			colName := col.Name.O
			var expression interface{}
			expression = nil
			tblCol := table.Columns[col.Offset]
			if tblCol.Hidden {
				colName = "NULL"
				expression = tblCol.GeneratedExprString
			}

			record := types.MakeDatums(
				infoschema.CatalogVal, // TABLE_CATALOG
				schema.Name.O,         // TABLE_SCHEMA
				table.Name.O,          // TABLE_NAME
				nonUnique,             // NON_UNIQUE
				schema.Name.O,         // INDEX_SCHEMA
				index.Name.O,          // INDEX_NAME
				i+1,                   // SEQ_IN_INDEX
				colName,               // COLUMN_NAME
				"A",                   // COLLATION
				0,                     // CARDINALITY
				nil,                   // SUB_PART
				nil,                   // PACKED
				nullable,              // NULLABLE
				"BTREE",               // INDEX_TYPE
				"",                    // COMMENT
				index.Comment,         // INDEX_COMMENT
				visible,               // IS_VISIBLE
				expression,            // Expression
			)
			rows = append(rows, record)
		}
	}
	e.rows = append(e.rows, rows...)
}

func (e *memtableRetriever) setDataFromReferConst(ctx context.Context, sctx sessionctx.Context, schemas []*model.DBInfo) error {
	checker := privilege.GetPrivilegeManager(sctx)
	var rows [][]types.Datum
	for _, schema := range schemas {
		for _, table := range schema.Tables {
			if !table.IsBaseTable() {
				continue
			}
			if checker != nil && !checker.RequestVerification(sctx.GetSessionVars().ActiveRoles, schema.Name.L, table.Name.L, "", mysql.AllPrivMask) {
				continue
			}
			for _, fk := range table.ForeignKeys {
				updateRule, deleteRule := "NO ACTION", "NO ACTION"
				if model.ReferOptionType(fk.OnUpdate) != 0 {
					updateRule = model.ReferOptionType(fk.OnUpdate).String()
				}
				if model.ReferOptionType(fk.OnDelete) != 0 {
					deleteRule = model.ReferOptionType(fk.OnDelete).String()
				}
				record := types.MakeDatums(
					infoschema.CatalogVal, // CONSTRAINT_CATALOG
					schema.Name.O,         // CONSTRAINT_SCHEMA
					fk.Name.O,             // CONSTRAINT_NAME
					infoschema.CatalogVal, // UNIQUE_CONSTRAINT_CATALOG
					schema.Name.O,         // UNIQUE_CONSTRAINT_SCHEMA
					"PRIMARY",             // UNIQUE_CONSTRAINT_NAME
					"NONE",                // MATCH_OPTION
					updateRule,            // UPDATE_RULE
					deleteRule,            // DELETE_RULE
					table.Name.O,          // TABLE_NAME
					fk.RefTable.O,         // REFERENCED_TABLE_NAME
				)
				rows = append(rows, record)
			}
		}
	}
	e.rows = rows
	return nil
}

func (e *memtableRetriever) setDataFromTables(ctx context.Context, sctx sessionctx.Context, schemas []*model.DBInfo) error {
	err := tableStatsCache.update(ctx, sctx)
	if err != nil {
		return err
	}

	checker := privilege.GetPrivilegeManager(sctx)

	var rows [][]types.Datum
	createTimeTp := mysql.TypeDatetime
	loc := sctx.GetSessionVars().TimeZone
	if loc == nil {
		loc = time.Local
	}
	for _, schema := range schemas {
		for _, table := range schema.Tables {
			collation := table.Collate
			if collation == "" {
				collation = mysql.DefaultCollationName
			}
			createTime := types.NewTime(types.FromGoTime(table.GetUpdateTime().In(loc)), createTimeTp, types.DefaultFsp)

			createOptions := ""

			if checker != nil && !checker.RequestVerification(sctx.GetSessionVars().ActiveRoles, schema.Name.L, table.Name.L, "", mysql.AllPrivMask) {
				continue
			}
			pkType := "NONCLUSTERED"
			if !table.IsView() {
				if table.GetPartitionInfo() != nil {
					createOptions = "partitioned"
				} else if table.TableCacheStatusType == model.TableCacheStatusEnable {
					createOptions = "cached=on"
				}
				var autoIncID interface{}
				hasAutoIncID, _ := infoschema.HasAutoIncrementColumn(table)
				if hasAutoIncID {
					autoIncID, err = getAutoIncrementID(sctx, schema, table)
					if err != nil {
						return err
					}
				}

				var rowCount, dataLength, indexLength uint64
				if table.GetPartitionInfo() == nil {
					rowCount = tableStatsCache.GetTableRows(table.ID)
					dataLength, indexLength = getDataAndIndexLength(table, table.ID, rowCount)
				} else {
					for _, pi := range table.GetPartitionInfo().Definitions {
						piRowCnt := tableStatsCache.GetTableRows(pi.ID)
						rowCount += piRowCnt
						parDataLen, parIndexLen := getDataAndIndexLength(table, pi.ID, piRowCnt)
						dataLength += parDataLen
						indexLength += parIndexLen
					}
				}
				avgRowLength := uint64(0)
				if rowCount != 0 {
					avgRowLength = dataLength / rowCount
				}
				tableType := "BASE TABLE"
				if util.IsSystemView(schema.Name.L) {
					tableType = "SYSTEM VIEW"
				}
				if table.IsSequence() {
					tableType = "SEQUENCE"
					// sequence is always 1 row regardless of stats.
					rowCount = 1
				}
				if table.HasClusteredIndex() {
					pkType = "CLUSTERED"
				}
				shardingInfo := infoschema.GetShardingInfo(schema, table)
				var policyName interface{}
				if table.PlacementPolicyRef != nil {
					policyName = table.PlacementPolicyRef.Name.O
				}
				record := types.MakeDatums(
					infoschema.CatalogVal, // TABLE_CATALOG
					schema.Name.O,         // TABLE_SCHEMA
					table.Name.O,          // TABLE_NAME
					tableType,             // TABLE_TYPE
					"InnoDB",              // ENGINE
					uint64(10),            // VERSION
					"Compact",             // ROW_FORMAT
					rowCount,              // TABLE_ROWS
					avgRowLength,          // AVG_ROW_LENGTH
					dataLength,            // DATA_LENGTH
					uint64(0),             // MAX_DATA_LENGTH
					indexLength,           // INDEX_LENGTH
					uint64(0),             // DATA_FREE
					autoIncID,             // AUTO_INCREMENT
					createTime,            // CREATE_TIME
					nil,                   // UPDATE_TIME
					nil,                   // CHECK_TIME
					collation,             // TABLE_COLLATION
					nil,                   // CHECKSUM
					createOptions,         // CREATE_OPTIONS
					table.Comment,         // TABLE_COMMENT
					table.ID,              // TIDB_TABLE_ID
					shardingInfo,          // TIDB_ROW_ID_SHARDING_INFO
					pkType,                // TIDB_PK_TYPE
					policyName,            // TIDB_PLACEMENT_POLICY_NAME
				)
				rows = append(rows, record)
			} else {
				record := types.MakeDatums(
					infoschema.CatalogVal, // TABLE_CATALOG
					schema.Name.O,         // TABLE_SCHEMA
					table.Name.O,          // TABLE_NAME
					"VIEW",                // TABLE_TYPE
					nil,                   // ENGINE
					nil,                   // VERSION
					nil,                   // ROW_FORMAT
					nil,                   // TABLE_ROWS
					nil,                   // AVG_ROW_LENGTH
					nil,                   // DATA_LENGTH
					nil,                   // MAX_DATA_LENGTH
					nil,                   // INDEX_LENGTH
					nil,                   // DATA_FREE
					nil,                   // AUTO_INCREMENT
					createTime,            // CREATE_TIME
					nil,                   // UPDATE_TIME
					nil,                   // CHECK_TIME
					nil,                   // TABLE_COLLATION
					nil,                   // CHECKSUM
					nil,                   // CREATE_OPTIONS
					"VIEW",                // TABLE_COMMENT
					table.ID,              // TIDB_TABLE_ID
					nil,                   // TIDB_ROW_ID_SHARDING_INFO
					pkType,                // TIDB_PK_TYPE
					nil,                   // TIDB_PLACEMENT_POLICY_NAME
				)
				rows = append(rows, record)
			}
		}
	}
	e.rows = rows
	return nil
}

func (e *hugeMemTableRetriever) setDataForColumns(ctx context.Context, sctx sessionctx.Context, extractor *plannercore.ColumnsTableExtractor) error {
	checker := privilege.GetPrivilegeManager(sctx)
	e.rows = e.rows[:0]
	batch := 1024
	for ; e.dbsIdx < len(e.dbs); e.dbsIdx++ {
		schema := e.dbs[e.dbsIdx]
		for e.tblIdx < len(schema.Tables) {
			table := schema.Tables[e.tblIdx]
			e.tblIdx++
			hasPrivs := false
			var priv mysql.PrivilegeType
			if checker != nil {
				for _, p := range mysql.AllColumnPrivs {
					if checker.RequestVerification(sctx.GetSessionVars().ActiveRoles, schema.Name.L, table.Name.L, "", p) {
						hasPrivs = true
						priv |= p
					}
				}
				if !hasPrivs {
					continue
				}
			}

			e.dataForColumnsInTable(ctx, sctx, schema, table, priv, extractor)
			if len(e.rows) >= batch {
				return nil
			}
		}
		e.tblIdx = 0
	}
	return nil
}

func (e *hugeMemTableRetriever) dataForColumnsInTable(ctx context.Context, sctx sessionctx.Context, schema *model.DBInfo, tbl *model.TableInfo, priv mysql.PrivilegeType, extractor *plannercore.ColumnsTableExtractor) {
	is := sessiontxn.GetTxnManager(sctx).GetTxnInfoSchema()
	if tbl.IsView() {
		e.viewMu.Lock()
		_, ok := e.viewSchemaMap[tbl.ID]
		if !ok {
			var viewLogicalPlan plannercore.Plan
			internalCtx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnOthers)
			// Build plan is not thread safe, there will be concurrency on sessionctx.
			if err := runWithSystemSession(internalCtx, sctx, func(s sessionctx.Context) error {
				planBuilder, _ := plannercore.NewPlanBuilder().Init(s, is, &hint.BlockHintProcessor{})
				var err error
				viewLogicalPlan, err = planBuilder.BuildDataSourceFromView(ctx, schema.Name, tbl)
				return errors.Trace(err)
			}); err != nil {
				sctx.GetSessionVars().StmtCtx.AppendWarning(err)
				e.viewMu.Unlock()
				return
			}
			e.viewSchemaMap[tbl.ID] = viewLogicalPlan.Schema()
			e.viewOutputNamesMap[tbl.ID] = viewLogicalPlan.OutputNames()
		}
		e.viewMu.Unlock()
	}

	var tableSchemaRegexp, tableNameRegexp, columnsRegexp []collate.WildcardPattern
	var tableSchemaFilterEnable,
		tableNameFilterEnable, columnsFilterEnable bool
	if !extractor.SkipRequest {
		tableSchemaFilterEnable = extractor.TableSchema.Count() > 0
		tableNameFilterEnable = extractor.TableName.Count() > 0
		columnsFilterEnable = extractor.ColumnName.Count() > 0
		if len(extractor.TableSchemaPatterns) > 0 {
			tableSchemaRegexp = make([]collate.WildcardPattern, len(extractor.TableSchemaPatterns))
			for i, pattern := range extractor.TableSchemaPatterns {
				tableSchemaRegexp[i] = collate.GetCollatorByID(collate.CollationName2ID(mysql.UTF8MB4DefaultCollation)).Pattern()
				tableSchemaRegexp[i].Compile(pattern, byte('\\'))
			}
		}
		if len(extractor.TableNamePatterns) > 0 {
			tableNameRegexp = make([]collate.WildcardPattern, len(extractor.TableNamePatterns))
			for i, pattern := range extractor.TableNamePatterns {
				tableNameRegexp[i] = collate.GetCollatorByID(collate.CollationName2ID(mysql.UTF8MB4DefaultCollation)).Pattern()
				tableNameRegexp[i].Compile(pattern, byte('\\'))
			}
		}
		if len(extractor.ColumnNamePatterns) > 0 {
			columnsRegexp = make([]collate.WildcardPattern, len(extractor.ColumnNamePatterns))
			for i, pattern := range extractor.ColumnNamePatterns {
				columnsRegexp[i] = collate.GetCollatorByID(collate.CollationName2ID(mysql.UTF8MB4DefaultCollation)).Pattern()
				columnsRegexp[i].Compile(pattern, byte('\\'))
			}
		}
	}
	i := 1
ForColumnsTag:
	for _, col := range tbl.Columns {
		if col.Hidden {
			continue
		}

		ft := &(col.FieldType)
		if tbl.IsView() {
			e.viewMu.RLock()
			if e.viewSchemaMap[tbl.ID] != nil {
				// If this is a view, replace the column with the view column.
				idx := expression.FindFieldNameIdxByColName(e.viewOutputNamesMap[tbl.ID], col.Name.L)
				if idx >= 0 {
					col1 := e.viewSchemaMap[tbl.ID].Columns[idx]
					ft = col1.GetType()
				}
			}
			e.viewMu.RUnlock()
		}
		if !extractor.SkipRequest {
			if tableSchemaFilterEnable && !extractor.TableSchema.Exist(schema.Name.L) {
				continue
			}
			if tableNameFilterEnable && !extractor.TableName.Exist(tbl.Name.L) {
				continue
			}
			if columnsFilterEnable && !extractor.ColumnName.Exist(col.Name.L) {
				continue
			}
			for _, re := range tableSchemaRegexp {
				if !re.DoMatch(schema.Name.L) {
					continue ForColumnsTag
				}
			}
			for _, re := range tableNameRegexp {
				if !re.DoMatch(tbl.Name.L) {
					continue ForColumnsTag
				}
			}
			for _, re := range columnsRegexp {
				if !re.DoMatch(col.Name.L) {
					continue ForColumnsTag
				}
			}
		}

		var charMaxLen, charOctLen, numericPrecision, numericScale, datetimePrecision interface{}
		colLen, decimal := ft.GetFlen(), ft.GetDecimal()
		defaultFlen, defaultDecimal := mysql.GetDefaultFieldLengthAndDecimal(ft.GetType())
		if decimal == types.UnspecifiedLength {
			decimal = defaultDecimal
		}
		if colLen == types.UnspecifiedLength {
			colLen = defaultFlen
		}
		if ft.GetType() == mysql.TypeSet {
			// Example: In MySQL set('a','bc','def','ghij') has length 13, because
			// len('a')+len('bc')+len('def')+len('ghij')+len(ThreeComma)=13
			// Reference link: https://bugs.mysql.com/bug.php?id=22613
			colLen = 0
			for _, ele := range ft.GetElems() {
				colLen += len(ele)
			}
			if len(ft.GetElems()) != 0 {
				colLen += (len(ft.GetElems()) - 1)
			}
			charMaxLen = colLen
			charOctLen = calcCharOctLength(colLen, ft.GetCharset())
		} else if ft.GetType() == mysql.TypeEnum {
			// Example: In MySQL enum('a', 'ab', 'cdef') has length 4, because
			// the longest string in the enum is 'cdef'
			// Reference link: https://bugs.mysql.com/bug.php?id=22613
			colLen = 0
			for _, ele := range ft.GetElems() {
				if len(ele) > colLen {
					colLen = len(ele)
				}
			}
			charMaxLen = colLen
			charOctLen = calcCharOctLength(colLen, ft.GetCharset())
		} else if types.IsString(ft.GetType()) {
			charMaxLen = colLen
			charOctLen = calcCharOctLength(colLen, ft.GetCharset())
		} else if types.IsTypeFractionable(ft.GetType()) {
			datetimePrecision = decimal
		} else if types.IsTypeNumeric(ft.GetType()) {
			numericPrecision = colLen
			if ft.GetType() != mysql.TypeFloat && ft.GetType() != mysql.TypeDouble {
				numericScale = decimal
			} else if decimal != -1 {
				numericScale = decimal
			}
		}
		columnType := ft.InfoSchemaStr()
		columnDesc := table.NewColDesc(table.ToColumn(col))
		var columnDefault interface{}
		if columnDesc.DefaultValue != nil {
			columnDefault = fmt.Sprintf("%v", columnDesc.DefaultValue)
			switch col.GetDefaultValue() {
			case "CURRENT_TIMESTAMP":
			default:
				if ft.GetType() == mysql.TypeTimestamp && columnDefault != types.ZeroDatetimeStr {
					timeValue, err := table.GetColDefaultValue(sctx, col)
					if err == nil {
						columnDefault = timeValue.GetMysqlTime().String()
					}
				}
				if ft.GetType() == mysql.TypeBit && !col.DefaultIsExpr {
					defaultValBinaryLiteral := types.BinaryLiteral(columnDefault.(string))
					columnDefault = defaultValBinaryLiteral.ToBitLiteralString(true)
				}
			}
		}
		record := types.MakeDatums(
			infoschema.CatalogVal, // TABLE_CATALOG
			schema.Name.O,         // TABLE_SCHEMA
			tbl.Name.O,            // TABLE_NAME
			col.Name.O,            // COLUMN_NAME
			i,                     // ORDINAL_POSITION
			columnDefault,         // COLUMN_DEFAULT
			columnDesc.Null,       // IS_NULLABLE
			types.TypeToStr(ft.GetType(), ft.GetCharset()), // DATA_TYPE
			charMaxLen,           // CHARACTER_MAXIMUM_LENGTH
			charOctLen,           // CHARACTER_OCTET_LENGTH
			numericPrecision,     // NUMERIC_PRECISION
			numericScale,         // NUMERIC_SCALE
			datetimePrecision,    // DATETIME_PRECISION
			columnDesc.Charset,   // CHARACTER_SET_NAME
			columnDesc.Collation, // COLLATION_NAME
			columnType,           // COLUMN_TYPE
			columnDesc.Key,       // COLUMN_KEY
			columnDesc.Extra,     // EXTRA
			strings.ToLower(privileges.PrivToString(priv, mysql.AllColumnPrivs, mysql.Priv2Str)), // PRIVILEGES
			columnDesc.Comment,      // COLUMN_COMMENT
			col.GeneratedExprString, // GENERATION_EXPRESSION
		)
		e.rows = append(e.rows, record)
		i++
	}
}

func calcCharOctLength(lenInChar int, cs string) int {
	lenInBytes := lenInChar
	if desc, err := charset.GetCharsetInfo(cs); err == nil {
		lenInBytes = desc.Maxlen * lenInChar
	}
	return lenInBytes
}

func (e *memtableRetriever) setDataFromPartitions(ctx context.Context, sctx sessionctx.Context, schemas []*model.DBInfo) error {
	err := tableStatsCache.update(ctx, sctx)
	if err != nil {
		return err
	}
	checker := privilege.GetPrivilegeManager(sctx)
	var rows [][]types.Datum
	createTimeTp := mysql.TypeDatetime
	for _, schema := range schemas {
		for _, table := range schema.Tables {
			if checker != nil && !checker.RequestVerification(sctx.GetSessionVars().ActiveRoles, schema.Name.L, table.Name.L, "", mysql.SelectPriv) {
				continue
			}
			createTime := types.NewTime(types.FromGoTime(table.GetUpdateTime()), createTimeTp, types.DefaultFsp)

			var rowCount, dataLength, indexLength uint64
			if table.GetPartitionInfo() == nil {
				rowCount = tableStatsCache.GetTableRows(table.ID)
				dataLength, indexLength = getDataAndIndexLength(table, table.ID, rowCount)
				avgRowLength := uint64(0)
				if rowCount != 0 {
					avgRowLength = dataLength / rowCount
				}
				record := types.MakeDatums(
					infoschema.CatalogVal, // TABLE_CATALOG
					schema.Name.O,         // TABLE_SCHEMA
					table.Name.O,          // TABLE_NAME
					nil,                   // PARTITION_NAME
					nil,                   // SUBPARTITION_NAME
					nil,                   // PARTITION_ORDINAL_POSITION
					nil,                   // SUBPARTITION_ORDINAL_POSITION
					nil,                   // PARTITION_METHOD
					nil,                   // SUBPARTITION_METHOD
					nil,                   // PARTITION_EXPRESSION
					nil,                   // SUBPARTITION_EXPRESSION
					nil,                   // PARTITION_DESCRIPTION
					rowCount,              // TABLE_ROWS
					avgRowLength,          // AVG_ROW_LENGTH
					dataLength,            // DATA_LENGTH
					nil,                   // MAX_DATA_LENGTH
					indexLength,           // INDEX_LENGTH
					nil,                   // DATA_FREE
					createTime,            // CREATE_TIME
					nil,                   // UPDATE_TIME
					nil,                   // CHECK_TIME
					nil,                   // CHECKSUM
					nil,                   // PARTITION_COMMENT
					nil,                   // NODEGROUP
					nil,                   // TABLESPACE_NAME
					nil,                   // TIDB_PARTITION_ID
					nil,                   // TIDB_PLACEMENT_POLICY_NAME
				)
				rows = append(rows, record)
			} else {
				for i, pi := range table.GetPartitionInfo().Definitions {
					rowCount = tableStatsCache.GetTableRows(pi.ID)
					dataLength, indexLength = getDataAndIndexLength(table, pi.ID, rowCount)

					avgRowLength := uint64(0)
					if rowCount != 0 {
						avgRowLength = dataLength / rowCount
					}

					var partitionDesc string
					if table.Partition.Type == model.PartitionTypeRange {
						partitionDesc = strings.Join(pi.LessThan, ",")
					} else if table.Partition.Type == model.PartitionTypeList {
						if len(pi.InValues) > 0 {
							buf := bytes.NewBuffer(nil)
							if len(pi.InValues[0]) == 1 {
								for i, vs := range pi.InValues {
									if i > 0 {
										buf.WriteString(",")
									}
									buf.WriteString(vs[0])
								}
							} else if len(pi.InValues[0]) > 1 {
								for i, vs := range pi.InValues {
									if i > 0 {
										buf.WriteString(",")
									}
									buf.WriteString("(")
									buf.WriteString(strings.Join(vs, ","))
									buf.WriteString(")")
								}
							}
							partitionDesc = buf.String()
						}
					}

					partitionMethod := table.Partition.Type.String()
					partitionExpr := table.Partition.Expr
					if table.Partition.Type == model.PartitionTypeRange && len(table.Partition.Columns) > 0 {
						partitionMethod = "RANGE COLUMNS"
						partitionExpr = table.Partition.Columns[0].String()
					} else if table.Partition.Type == model.PartitionTypeList && len(table.Partition.Columns) > 0 {
						partitionMethod = "LIST COLUMNS"
						buf := bytes.NewBuffer(nil)
						for i, col := range table.Partition.Columns {
							if i > 0 {
								buf.WriteString(",")
							}
							buf.WriteString(col.String())
						}
						partitionExpr = buf.String()
					}

					var policyName interface{}
					if pi.PlacementPolicyRef != nil {
						policyName = pi.PlacementPolicyRef.Name.O
					}
					record := types.MakeDatums(
						infoschema.CatalogVal, // TABLE_CATALOG
						schema.Name.O,         // TABLE_SCHEMA
						table.Name.O,          // TABLE_NAME
						pi.Name.O,             // PARTITION_NAME
						nil,                   // SUBPARTITION_NAME
						i+1,                   // PARTITION_ORDINAL_POSITION
						nil,                   // SUBPARTITION_ORDINAL_POSITION
						partitionMethod,       // PARTITION_METHOD
						nil,                   // SUBPARTITION_METHOD
						partitionExpr,         // PARTITION_EXPRESSION
						nil,                   // SUBPARTITION_EXPRESSION
						partitionDesc,         // PARTITION_DESCRIPTION
						rowCount,              // TABLE_ROWS
						avgRowLength,          // AVG_ROW_LENGTH
						dataLength,            // DATA_LENGTH
						uint64(0),             // MAX_DATA_LENGTH
						indexLength,           // INDEX_LENGTH
						uint64(0),             // DATA_FREE
						createTime,            // CREATE_TIME
						nil,                   // UPDATE_TIME
						nil,                   // CHECK_TIME
						nil,                   // CHECKSUM
						pi.Comment,            // PARTITION_COMMENT
						nil,                   // NODEGROUP
						nil,                   // TABLESPACE_NAME
						pi.ID,                 // TIDB_PARTITION_ID
						policyName,            // TIDB_PLACEMENT_POLICY_NAME
					)
					rows = append(rows, record)
				}
			}
		}
	}
	e.rows = rows
	return nil
}

func (e *memtableRetriever) setDataFromIndexes(ctx sessionctx.Context, schemas []*model.DBInfo) {
	checker := privilege.GetPrivilegeManager(ctx)
	var rows [][]types.Datum
	for _, schema := range schemas {
		for _, tb := range schema.Tables {
			if checker != nil && !checker.RequestVerification(ctx.GetSessionVars().ActiveRoles, schema.Name.L, tb.Name.L, "", mysql.AllPrivMask) {
				continue
			}

			if tb.PKIsHandle {
				var pkCol *model.ColumnInfo
				for _, col := range tb.Cols() {
					if mysql.HasPriKeyFlag(col.GetFlag()) {
						pkCol = col
						break
					}
				}
				record := types.MakeDatums(
					schema.Name.O, // TABLE_SCHEMA
					tb.Name.O,     // TABLE_NAME
					0,             // NON_UNIQUE
					"PRIMARY",     // KEY_NAME
					1,             // SEQ_IN_INDEX
					pkCol.Name.O,  // COLUMN_NAME
					nil,           // SUB_PART
					"",            // INDEX_COMMENT
					nil,           // Expression
					0,             // INDEX_ID
					"YES",         // IS_VISIBLE
					"YES",         // CLUSTERED
				)
				rows = append(rows, record)
			}
			for _, idxInfo := range tb.Indices {
				if idxInfo.State != model.StatePublic {
					continue
				}
				isClustered := "NO"
				if tb.IsCommonHandle && idxInfo.Primary {
					isClustered = "YES"
				}
				for i, col := range idxInfo.Columns {
					nonUniq := 1
					if idxInfo.Unique {
						nonUniq = 0
					}
					var subPart interface{}
					if col.Length != types.UnspecifiedLength {
						subPart = col.Length
					}
					colName := col.Name.O
					var expression interface{}
					expression = nil
					tblCol := tb.Columns[col.Offset]
					if tblCol.Hidden {
						colName = "NULL"
						expression = tblCol.GeneratedExprString
					}
					visible := "YES"
					if idxInfo.Invisible {
						visible = "NO"
					}
					record := types.MakeDatums(
						schema.Name.O,   // TABLE_SCHEMA
						tb.Name.O,       // TABLE_NAME
						nonUniq,         // NON_UNIQUE
						idxInfo.Name.O,  // KEY_NAME
						i+1,             // SEQ_IN_INDEX
						colName,         // COLUMN_NAME
						subPart,         // SUB_PART
						idxInfo.Comment, // INDEX_COMMENT
						expression,      // Expression
						idxInfo.ID,      // INDEX_ID
						visible,         // IS_VISIBLE
						isClustered,     // CLUSTERED
					)
					rows = append(rows, record)
				}
			}
		}
	}
	e.rows = rows
}

func (e *memtableRetriever) setDataFromViews(ctx sessionctx.Context, schemas []*model.DBInfo) {
	checker := privilege.GetPrivilegeManager(ctx)
	var rows [][]types.Datum
	for _, schema := range schemas {
		for _, table := range schema.Tables {
			if !table.IsView() {
				continue
			}
			collation := table.Collate
			charset := table.Charset
			if collation == "" {
				collation = mysql.DefaultCollationName
			}
			if charset == "" {
				charset = mysql.DefaultCharset
			}
			if checker != nil && !checker.RequestVerification(ctx.GetSessionVars().ActiveRoles, schema.Name.L, table.Name.L, "", mysql.AllPrivMask) {
				continue
			}
			record := types.MakeDatums(
				infoschema.CatalogVal,           // TABLE_CATALOG
				schema.Name.O,                   // TABLE_SCHEMA
				table.Name.O,                    // TABLE_NAME
				table.View.SelectStmt,           // VIEW_DEFINITION
				table.View.CheckOption.String(), // CHECK_OPTION
				"NO",                            // IS_UPDATABLE
				table.View.Definer.String(),     // DEFINER
				table.View.Security.String(),    // SECURITY_TYPE
				charset,                         // CHARACTER_SET_CLIENT
				collation,                       // COLLATION_CONNECTION
			)
			rows = append(rows, record)
		}
	}
	e.rows = rows
}

func (e *memtableRetriever) dataForTiKVStoreStatus(ctx sessionctx.Context) (err error) {
	tikvStore, ok := ctx.GetStore().(helper.Storage)
	if !ok {
		return errors.New("Information about TiKV store status can be gotten only when the storage is TiKV")
	}
	tikvHelper := &helper.Helper{
		Store:       tikvStore,
		RegionCache: tikvStore.GetRegionCache(),
	}
	storesStat, err := tikvHelper.GetStoresStat()
	if err != nil {
		return err
	}
	for _, storeStat := range storesStat.Stores {
		row := make([]types.Datum, len(infoschema.TableTiKVStoreStatusCols))
		row[0].SetInt64(storeStat.Store.ID)
		row[1].SetString(storeStat.Store.Address, mysql.DefaultCollationName)
		row[2].SetInt64(storeStat.Store.State)
		row[3].SetString(storeStat.Store.StateName, mysql.DefaultCollationName)
		data, err := json.Marshal(storeStat.Store.Labels)
		if err != nil {
			return err
		}
		bj := types.BinaryJSON{}
		if err = bj.UnmarshalJSON(data); err != nil {
			return err
		}
		row[4].SetMysqlJSON(bj)
		row[5].SetString(storeStat.Store.Version, mysql.DefaultCollationName)
		row[6].SetString(storeStat.Status.Capacity, mysql.DefaultCollationName)
		row[7].SetString(storeStat.Status.Available, mysql.DefaultCollationName)
		row[8].SetInt64(storeStat.Status.LeaderCount)
		row[9].SetFloat64(storeStat.Status.LeaderWeight)
		row[10].SetFloat64(storeStat.Status.LeaderScore)
		row[11].SetInt64(storeStat.Status.LeaderSize)
		row[12].SetInt64(storeStat.Status.RegionCount)
		row[13].SetFloat64(storeStat.Status.RegionWeight)
		row[14].SetFloat64(storeStat.Status.RegionScore)
		row[15].SetInt64(storeStat.Status.RegionSize)
		startTs := types.NewTime(types.FromGoTime(storeStat.Status.StartTs), mysql.TypeDatetime, types.DefaultFsp)
		row[16].SetMysqlTime(startTs)
		lastHeartbeatTs := types.NewTime(types.FromGoTime(storeStat.Status.LastHeartbeatTs), mysql.TypeDatetime, types.DefaultFsp)
		row[17].SetMysqlTime(lastHeartbeatTs)
		row[18].SetString(storeStat.Status.Uptime, mysql.DefaultCollationName)
		if sem.IsEnabled() {
			// Patch out IP addresses etc if the user does not have the RESTRICTED_TABLES_ADMIN privilege
			checker := privilege.GetPrivilegeManager(ctx)
			if checker == nil || !checker.RequestDynamicVerification(ctx.GetSessionVars().ActiveRoles, "RESTRICTED_TABLES_ADMIN", false) {
				row[1].SetString(strconv.FormatInt(storeStat.Store.ID, 10), mysql.DefaultCollationName)
				row[1].SetNull()
				row[6].SetNull()
				row[7].SetNull()
				row[16].SetNull()
				row[18].SetNull()
			}
		}
		e.rows = append(e.rows, row)
	}
	return nil
}

// DDLJobsReaderExec executes DDLJobs information retrieving.
type DDLJobsReaderExec struct {
	baseExecutor
	DDLJobRetriever

	cacheJobs []*model.Job
	is        infoschema.InfoSchema
	sess      sessionctx.Context
}

// Open implements the Executor Next interface.
func (e *DDLJobsReaderExec) Open(ctx context.Context) error {
	if err := e.baseExecutor.Open(ctx); err != nil {
		return err
	}
	e.DDLJobRetriever.is = e.is
	e.activeRoles = e.ctx.GetSessionVars().ActiveRoles
	sess, err := e.getSysSession()
	if err != nil {
		return err
	}
	e.sess = sess
	sess.GetSessionVars().SetInTxn(true)
	err = sessiontxn.NewTxn(context.Background(), sess)
	if err != nil {
		return err
	}
	txn, err := sess.Txn(true)
	if err != nil {
		return err
	}
	err = e.DDLJobRetriever.initial(txn, sess)
	if err != nil {
		return err
	}
	return nil
}

// Next implements the Executor Next interface.
func (e *DDLJobsReaderExec) Next(ctx context.Context, req *chunk.Chunk) error {
	req.GrowAndReset(e.maxChunkSize)
	checker := privilege.GetPrivilegeManager(e.ctx)
	count := 0

	// Append running DDL jobs.
	if e.cursor < len(e.runningJobs) {
		num := mathutil.Min(req.Capacity(), len(e.runningJobs)-e.cursor)
		for i := e.cursor; i < e.cursor+num; i++ {
			e.appendJobToChunk(req, e.runningJobs[i], checker)
			req.AppendString(12, e.runningJobs[i].Query)
			if e.runningJobs[i].MultiSchemaInfo != nil {
				for range e.runningJobs[i].MultiSchemaInfo.SubJobs {
					req.AppendString(12, e.runningJobs[i].Query)
				}
			}
		}
		e.cursor += num
		count += num
	}
	var err error

	// Append history DDL jobs.
	if count < req.Capacity() {
		e.cacheJobs, err = e.historyJobIter.GetLastJobs(req.Capacity()-count, e.cacheJobs)
		if err != nil {
			return err
		}
		for _, job := range e.cacheJobs {
			e.appendJobToChunk(req, job, checker)
			req.AppendString(12, job.Query)
			if job.MultiSchemaInfo != nil {
				for range job.MultiSchemaInfo.SubJobs {
					req.AppendString(12, job.Query)
				}
			}
		}
		e.cursor += len(e.cacheJobs)
	}
	return nil
}

// Close implements the Executor Close interface.
func (e *DDLJobsReaderExec) Close() error {
	e.releaseSysSession(kv.WithInternalSourceType(context.Background(), kv.InternalTxnDDL), e.sess)
	return e.baseExecutor.Close()
}

func (e *memtableRetriever) setDataFromEngines() {
	var rows [][]types.Datum
	rows = append(rows,
		types.MakeDatums(
			"InnoDB",  // Engine
			"DEFAULT", // Support
			"Supports transactions, row-level locking, and foreign keys", // Comment
			"YES", // Transactions
			"YES", // XA
			"YES", // Savepoints
		),
	)
	e.rows = rows
}

func (e *memtableRetriever) setDataFromCharacterSets() {
	charsets := charset.GetSupportedCharsets()
	var rows = make([][]types.Datum, 0, len(charsets))
	for _, charset := range charsets {
		rows = append(rows,
			types.MakeDatums(charset.Name, charset.DefaultCollation, charset.Desc, charset.Maxlen),
		)
	}
	e.rows = rows
}

func (e *memtableRetriever) setDataFromCollations() {
	collations := collate.GetSupportedCollations()
	var rows = make([][]types.Datum, 0, len(collations))
	for _, collation := range collations {
		isDefault := ""
		if collation.IsDefault {
			isDefault = "Yes"
		}
		rows = append(rows,
			types.MakeDatums(collation.Name, collation.CharsetName, collation.ID, isDefault, "Yes", 1),
		)
	}
	e.rows = rows
}

func (e *memtableRetriever) dataForCollationCharacterSetApplicability() {
	collations := collate.GetSupportedCollations()
	var rows = make([][]types.Datum, 0, len(collations))
	for _, collation := range collations {
		rows = append(rows,
			types.MakeDatums(collation.Name, collation.CharsetName),
		)
	}
	e.rows = rows
}

func (e *memtableRetriever) dataForTiDBClusterInfo(ctx sessionctx.Context) error {
	servers, err := infoschema.GetClusterServerInfo(ctx)
	if err != nil {
		e.rows = nil
		return err
	}
	rows := make([][]types.Datum, 0, len(servers))
	for _, server := range servers {
		startTimeStr := ""
		upTimeStr := ""
		if server.StartTimestamp > 0 {
			startTime := time.Unix(server.StartTimestamp, 0)
			startTimeStr = startTime.Format(time.RFC3339)
			upTimeStr = time.Since(startTime).String()
		}
		row := types.MakeDatums(
			server.ServerType,
			server.Address,
			server.StatusAddr,
			server.Version,
			server.GitHash,
			startTimeStr,
			upTimeStr,
			server.ServerID,
		)
		if sem.IsEnabled() {
			checker := privilege.GetPrivilegeManager(ctx)
			if checker == nil || !checker.RequestDynamicVerification(ctx.GetSessionVars().ActiveRoles, "RESTRICTED_TABLES_ADMIN", false) {
				row[1].SetString(strconv.FormatUint(server.ServerID, 10), mysql.DefaultCollationName)
				row[2].SetNull()
				row[5].SetNull()
				row[6].SetNull()
			}
		}
		rows = append(rows, row)
	}
	e.rows = rows
	return nil
}

func (e *memtableRetriever) setDataFromKeyColumnUsage(ctx sessionctx.Context, schemas []*model.DBInfo) {
	checker := privilege.GetPrivilegeManager(ctx)
	rows := make([][]types.Datum, 0, len(schemas)) // The capacity is not accurate, but it is not a big problem.
	for _, schema := range schemas {
		for _, table := range schema.Tables {
			if checker != nil && !checker.RequestVerification(ctx.GetSessionVars().ActiveRoles, schema.Name.L, table.Name.L, "", mysql.AllPrivMask) {
				continue
			}
			rs := keyColumnUsageInTable(schema, table)
			rows = append(rows, rs...)
		}
	}
	e.rows = rows
}

func (e *memtableRetriever) setDataForClusterProcessList(ctx sessionctx.Context) error {
	e.setDataForProcessList(ctx)
	rows, err := infoschema.AppendHostInfoToRows(ctx, e.rows)
	if err != nil {
		return err
	}
	e.rows = rows
	return nil
}

func (e *memtableRetriever) setDataForProcessList(ctx sessionctx.Context) {
	sm := ctx.GetSessionManager()
	if sm == nil {
		return
	}

	loginUser := ctx.GetSessionVars().User
	hasProcessPriv := hasPriv(ctx, mysql.ProcessPriv)
	pl := sm.ShowProcessList()

	records := make([][]types.Datum, 0, len(pl))
	for _, pi := range pl {
		// If you have the PROCESS privilege, you can see all threads.
		// Otherwise, you can see only your own threads.
		if !hasProcessPriv && loginUser != nil && pi.User != loginUser.Username {
			continue
		}

		rows := pi.ToRow(ctx.GetSessionVars().StmtCtx.TimeZone)
		record := types.MakeDatums(rows...)
		records = append(records, record)
	}
	e.rows = records
}

func (e *memtableRetriever) setDataFromUserPrivileges(ctx sessionctx.Context) {
	pm := privilege.GetPrivilegeManager(ctx)
	// The results depend on the user querying the information.
	e.rows = pm.UserPrivilegesTable(ctx.GetSessionVars().ActiveRoles, ctx.GetSessionVars().User.Username, ctx.GetSessionVars().User.Hostname)
}

func (e *memtableRetriever) setDataForMetricTables(ctx sessionctx.Context) {
	tables := make([]string, 0, len(infoschema.MetricTableMap))
	for name := range infoschema.MetricTableMap {
		tables = append(tables, name)
	}
	slices.Sort(tables)
	rows := make([][]types.Datum, 0, len(tables))
	for _, name := range tables {
		schema := infoschema.MetricTableMap[name]
		record := types.MakeDatums(
			name,                             // METRICS_NAME
			schema.PromQL,                    // PROMQL
			strings.Join(schema.Labels, ","), // LABELS
			schema.Quantile,                  // QUANTILE
			schema.Comment,                   // COMMENT
		)
		rows = append(rows, record)
	}
	e.rows = rows
}

func keyColumnUsageInTable(schema *model.DBInfo, table *model.TableInfo) [][]types.Datum {
	var rows [][]types.Datum
	if table.PKIsHandle {
		for _, col := range table.Columns {
			if mysql.HasPriKeyFlag(col.GetFlag()) {
				record := types.MakeDatums(
					infoschema.CatalogVal,        // CONSTRAINT_CATALOG
					schema.Name.O,                // CONSTRAINT_SCHEMA
					infoschema.PrimaryConstraint, // CONSTRAINT_NAME
					infoschema.CatalogVal,        // TABLE_CATALOG
					schema.Name.O,                // TABLE_SCHEMA
					table.Name.O,                 // TABLE_NAME
					col.Name.O,                   // COLUMN_NAME
					1,                            // ORDINAL_POSITION
					1,                            // POSITION_IN_UNIQUE_CONSTRAINT
					nil,                          // REFERENCED_TABLE_SCHEMA
					nil,                          // REFERENCED_TABLE_NAME
					nil,                          // REFERENCED_COLUMN_NAME
				)
				rows = append(rows, record)
				break
			}
		}
	}
	nameToCol := make(map[string]*model.ColumnInfo, len(table.Columns))
	for _, c := range table.Columns {
		nameToCol[c.Name.L] = c
	}
	for _, index := range table.Indices {
		var idxName string
		if index.Primary {
			idxName = infoschema.PrimaryConstraint
		} else if index.Unique {
			idxName = index.Name.O
		} else {
			// Only handle unique/primary key
			continue
		}
		for i, key := range index.Columns {
			col := nameToCol[key.Name.L]
			if col.Hidden {
				continue
			}
			record := types.MakeDatums(
				infoschema.CatalogVal, // CONSTRAINT_CATALOG
				schema.Name.O,         // CONSTRAINT_SCHEMA
				idxName,               // CONSTRAINT_NAME
				infoschema.CatalogVal, // TABLE_CATALOG
				schema.Name.O,         // TABLE_SCHEMA
				table.Name.O,          // TABLE_NAME
				col.Name.O,            // COLUMN_NAME
				i+1,                   // ORDINAL_POSITION,
				nil,                   // POSITION_IN_UNIQUE_CONSTRAINT
				nil,                   // REFERENCED_TABLE_SCHEMA
				nil,                   // REFERENCED_TABLE_NAME
				nil,                   // REFERENCED_COLUMN_NAME
			)
			rows = append(rows, record)
		}
	}
	for _, fk := range table.ForeignKeys {
		fkRefCol := ""
		if len(fk.RefCols) > 0 {
			fkRefCol = fk.RefCols[0].O
		}
		for i, key := range fk.Cols {
			col := nameToCol[key.L]
			record := types.MakeDatums(
				infoschema.CatalogVal, // CONSTRAINT_CATALOG
				schema.Name.O,         // CONSTRAINT_SCHEMA
				fk.Name.O,             // CONSTRAINT_NAME
				infoschema.CatalogVal, // TABLE_CATALOG
				schema.Name.O,         // TABLE_SCHEMA
				table.Name.O,          // TABLE_NAME
				col.Name.O,            // COLUMN_NAME
				i+1,                   // ORDINAL_POSITION,
				1,                     // POSITION_IN_UNIQUE_CONSTRAINT
				schema.Name.O,         // REFERENCED_TABLE_SCHEMA
				fk.RefTable.O,         // REFERENCED_TABLE_NAME
				fkRefCol,              // REFERENCED_COLUMN_NAME
			)
			rows = append(rows, record)
		}
	}
	return rows
}

func (e *memtableRetriever) setDataForTiKVRegionStatus(ctx sessionctx.Context) (err error) {
	tikvStore, ok := ctx.GetStore().(helper.Storage)
	if !ok {
		return errors.New("Information about TiKV region status can be gotten only when the storage is TiKV")
	}
	tikvHelper := &helper.Helper{
		Store:       tikvStore,
		RegionCache: tikvStore.GetRegionCache(),
	}
	requestByTableRange := false
	allRegionsInfo := helper.NewRegionsInfo()
	is := ctx.GetDomainInfoSchema().(infoschema.InfoSchema)
	if e.extractor != nil {
		extractor, ok := e.extractor.(*plannercore.TiKVRegionStatusExtractor)
		if ok && len(extractor.GetTablesID()) > 0 {
			for _, tableID := range extractor.GetTablesID() {
				regionsInfo, err := e.getRegionsInfoForTable(tikvHelper, is, tableID)
				if err != nil {
					return err
				}
				allRegionsInfo = allRegionsInfo.Merge(regionsInfo)
			}
			requestByTableRange = true
		}
	}
	if !requestByTableRange {
		allRegionsInfo, err = tikvHelper.GetRegionsInfo()
		if err != nil {
			return err
		}
	}
	tableInfos := tikvHelper.GetRegionsTableInfo(allRegionsInfo, is.AllSchemas())
	for i := range allRegionsInfo.Regions {
		tableList := tableInfos[allRegionsInfo.Regions[i].ID]
		if len(tableList) == 0 {
			e.setNewTiKVRegionStatusCol(&allRegionsInfo.Regions[i], nil)
		}
		for j := range tableList {
			e.setNewTiKVRegionStatusCol(&allRegionsInfo.Regions[i], &tableList[j])
		}
	}
	return nil
}

func (e *memtableRetriever) getRegionsInfoForTable(h *helper.Helper, is infoschema.InfoSchema, tableID int64) (*helper.RegionsInfo, error) {
	tbl, _ := is.TableByID(tableID)
	if tbl == nil {
		return nil, infoschema.ErrTableExists.GenWithStackByArgs(tableID)
	}

	pt := tbl.Meta().GetPartitionInfo()
	if pt == nil {
		regionsInfo, err := e.getRegionsInfoForSingleTable(h, tableID)
		if err != nil {
			return nil, err
		}
		return regionsInfo, nil
	}

	allRegionsInfo := helper.NewRegionsInfo()
	for _, def := range pt.Definitions {
		regionsInfo, err := e.getRegionsInfoForSingleTable(h, def.ID)
		if err != nil {
			return nil, err
		}
		allRegionsInfo = allRegionsInfo.Merge(regionsInfo)
	}
	return allRegionsInfo, nil
}

func (e *memtableRetriever) getRegionsInfoForSingleTable(helper *helper.Helper, tableID int64) (*helper.RegionsInfo, error) {
	sk, ek := tablecodec.GetTableHandleKeyRange(tableID)
	sRegion, err := helper.GetRegionByKey(codec.EncodeBytes(nil, sk))
	if err != nil {
		return nil, err
	}
	eRegion, err := helper.GetRegionByKey(codec.EncodeBytes(nil, ek))
	if err != nil {
		return nil, err
	}
	sk, err = hex.DecodeString(sRegion.StartKey)
	if err != nil {
		return nil, err
	}
	ek, err = hex.DecodeString(eRegion.EndKey)
	if err != nil {
		return nil, err
	}
	return helper.GetRegionsInfoByRange(sk, ek)
}

func (e *memtableRetriever) setNewTiKVRegionStatusCol(region *helper.RegionInfo, table *helper.TableInfo) {
	row := make([]types.Datum, len(infoschema.TableTiKVRegionStatusCols))
	row[0].SetInt64(region.ID)
	row[1].SetString(region.StartKey, mysql.DefaultCollationName)
	row[2].SetString(region.EndKey, mysql.DefaultCollationName)
	if table != nil {
		row[3].SetInt64(table.Table.ID)
		row[4].SetString(table.DB.Name.O, mysql.DefaultCollationName)
		row[5].SetString(table.Table.Name.O, mysql.DefaultCollationName)
		if table.IsIndex {
			row[6].SetInt64(1)
			row[7].SetInt64(table.Index.ID)
			row[8].SetString(table.Index.Name.O, mysql.DefaultCollationName)
		} else {
			row[6].SetInt64(0)
		}
	} else {
		row[6].SetInt64(0)
	}
	row[9].SetInt64(region.Epoch.ConfVer)
	row[10].SetInt64(region.Epoch.Version)
	row[11].SetUint64(region.WrittenBytes)
	row[12].SetUint64(region.ReadBytes)
	row[13].SetInt64(region.ApproximateSize)
	row[14].SetInt64(region.ApproximateKeys)
	if region.ReplicationStatus != nil {
		row[15].SetString(region.ReplicationStatus.State, mysql.DefaultCollationName)
		row[16].SetInt64(region.ReplicationStatus.StateID)
	}
	e.rows = append(e.rows, row)
}

const (
	normalPeer  = "NORMAL"
	pendingPeer = "PENDING"
	downPeer    = "DOWN"
)

func (e *memtableRetriever) setDataForTiDBHotRegions(ctx sessionctx.Context) error {
	tikvStore, ok := ctx.GetStore().(helper.Storage)
	if !ok {
		return errors.New("Information about hot region can be gotten only when the storage is TiKV")
	}
	allSchemas := ctx.GetInfoSchema().(infoschema.InfoSchema).AllSchemas()
	tikvHelper := &helper.Helper{
		Store:       tikvStore,
		RegionCache: tikvStore.GetRegionCache(),
	}
	metrics, err := tikvHelper.ScrapeHotInfo(pdapi.HotRead, allSchemas)
	if err != nil {
		return err
	}
	e.setDataForHotRegionByMetrics(metrics, "read")
	metrics, err = tikvHelper.ScrapeHotInfo(pdapi.HotWrite, allSchemas)
	if err != nil {
		return err
	}
	e.setDataForHotRegionByMetrics(metrics, "write")
	return nil
}

func (e *memtableRetriever) setDataForHotRegionByMetrics(metrics []helper.HotTableIndex, tp string) {
	rows := make([][]types.Datum, 0, len(metrics))
	for _, tblIndex := range metrics {
		row := make([]types.Datum, len(infoschema.TableTiDBHotRegionsCols))
		if tblIndex.IndexName != "" {
			row[1].SetInt64(tblIndex.IndexID)
			row[4].SetString(tblIndex.IndexName, mysql.DefaultCollationName)
		} else {
			row[1].SetNull()
			row[4].SetNull()
		}
		row[0].SetInt64(tblIndex.TableID)
		row[2].SetString(tblIndex.DbName, mysql.DefaultCollationName)
		row[3].SetString(tblIndex.TableName, mysql.DefaultCollationName)
		row[5].SetUint64(tblIndex.RegionID)
		row[6].SetString(tp, mysql.DefaultCollationName)
		if tblIndex.RegionMetric == nil {
			row[7].SetNull()
			row[8].SetNull()
		} else {
			row[7].SetInt64(int64(tblIndex.RegionMetric.MaxHotDegree))
			row[8].SetInt64(int64(tblIndex.RegionMetric.Count))
		}
		row[9].SetUint64(tblIndex.RegionMetric.FlowBytes)
		rows = append(rows, row)
	}
	e.rows = append(e.rows, rows...)
}

// setDataFromTableConstraints constructs data for table information_schema.constraints.See https://dev.mysql.com/doc/refman/5.7/en/table-constraints-table.html
func (e *memtableRetriever) setDataFromTableConstraints(ctx sessionctx.Context, schemas []*model.DBInfo) {
	checker := privilege.GetPrivilegeManager(ctx)
	var rows [][]types.Datum
	for _, schema := range schemas {
		for _, tbl := range schema.Tables {
			if checker != nil && !checker.RequestVerification(ctx.GetSessionVars().ActiveRoles, schema.Name.L, tbl.Name.L, "", mysql.AllPrivMask) {
				continue
			}

			if tbl.PKIsHandle {
				record := types.MakeDatums(
					infoschema.CatalogVal,     // CONSTRAINT_CATALOG
					schema.Name.O,             // CONSTRAINT_SCHEMA
					mysql.PrimaryKeyName,      // CONSTRAINT_NAME
					schema.Name.O,             // TABLE_SCHEMA
					tbl.Name.O,                // TABLE_NAME
					infoschema.PrimaryKeyType, // CONSTRAINT_TYPE
				)
				rows = append(rows, record)
			}

			for _, idx := range tbl.Indices {
				var cname, ctype string
				if idx.Primary {
					cname = mysql.PrimaryKeyName
					ctype = infoschema.PrimaryKeyType
				} else if idx.Unique {
					cname = idx.Name.O
					ctype = infoschema.UniqueKeyType
				} else {
					// The index has no constriant.
					continue
				}
				record := types.MakeDatums(
					infoschema.CatalogVal, // CONSTRAINT_CATALOG
					schema.Name.O,         // CONSTRAINT_SCHEMA
					cname,                 // CONSTRAINT_NAME
					schema.Name.O,         // TABLE_SCHEMA
					tbl.Name.O,            // TABLE_NAME
					ctype,                 // CONSTRAINT_TYPE
				)
				rows = append(rows, record)
			}
			//  TiDB includes foreign key information for compatibility but foreign keys are not yet enforced.
			for _, fk := range tbl.ForeignKeys {
				record := types.MakeDatums(
					infoschema.CatalogVal,     // CONSTRAINT_CATALOG
					schema.Name.O,             // CONSTRAINT_SCHEMA
					fk.Name.O,                 // CONSTRAINT_NAME
					schema.Name.O,             // TABLE_SCHEMA
					tbl.Name.O,                // TABLE_NAME
					infoschema.ForeignKeyType, // CONSTRAINT_TYPE
				)
				rows = append(rows, record)
			}
		}
	}
	e.rows = rows
}

// tableStorageStatsRetriever is used to read slow log data.
type tableStorageStatsRetriever struct {
	dummyCloser
	table         *model.TableInfo
	outputCols    []*model.ColumnInfo
	retrieved     bool
	initialized   bool
	extractor     *plannercore.TableStorageStatsExtractor
	initialTables []*initialTable
	curTable      int
	helper        *helper.Helper
	stats         helper.PDRegionStats
}

func (e *tableStorageStatsRetriever) retrieve(ctx context.Context, sctx sessionctx.Context) ([][]types.Datum, error) {
	if e.retrieved {
		return nil, nil
	}
	if !e.initialized {
		err := e.initialize(sctx)
		if err != nil {
			return nil, err
		}
	}
	if len(e.initialTables) == 0 || e.curTable >= len(e.initialTables) {
		e.retrieved = true
		return nil, nil
	}

	rows, err := e.setDataForTableStorageStats(sctx)
	if err != nil {
		return nil, err
	}
	if len(e.outputCols) == len(e.table.Columns) {
		return rows, nil
	}
	retRows := make([][]types.Datum, len(rows))
	for i, fullRow := range rows {
		row := make([]types.Datum, len(e.outputCols))
		for j, col := range e.outputCols {
			row[j] = fullRow[col.Offset]
		}
		retRows[i] = row
	}
	return retRows, nil
}

type initialTable struct {
	db string
	*model.TableInfo
}

func (e *tableStorageStatsRetriever) initialize(sctx sessionctx.Context) error {
	is := sctx.GetInfoSchema().(infoschema.InfoSchema)
	var databases []string
	schemas := e.extractor.TableSchema
	tables := e.extractor.TableName

	// If not specify the table_schema, return an error to avoid traverse all schemas and their tables.
	if len(schemas) == 0 {
		return errors.Errorf("Please add where clause to filter the column TABLE_SCHEMA. " +
			"For example, where TABLE_SCHEMA = 'xxx' or where TABLE_SCHEMA in ('xxx', 'yyy')")
	}

	// Filter the sys or memory schema.
	for schema := range schemas {
		if !util.IsMemDB(schema) {
			databases = append(databases, schema)
		}
	}

	// Privilege checker.
	checker := func(db, table string) bool {
		if pm := privilege.GetPrivilegeManager(sctx); pm != nil {
			return pm.RequestVerification(sctx.GetSessionVars().ActiveRoles, db, table, "", mysql.AllPrivMask)
		}
		return true
	}

	// Extract the tables to the initialTable.
	for _, DB := range databases {
		// The user didn't specified the table, extract all tables of this db to initialTable.
		if len(tables) == 0 {
			tbs := is.SchemaTables(model.NewCIStr(DB))
			for _, tb := range tbs {
				// For every db.table, check it's privileges.
				if checker(DB, tb.Meta().Name.L) {
					e.initialTables = append(e.initialTables, &initialTable{DB, tb.Meta()})
				}
			}
		} else {
			// The user specified the table, extract the specified tables of this db to initialTable.
			for tb := range tables {
				if tb, err := is.TableByName(model.NewCIStr(DB), model.NewCIStr(tb)); err == nil {
					// For every db.table, check it's privileges.
					if checker(DB, tb.Meta().Name.L) {
						e.initialTables = append(e.initialTables, &initialTable{DB, tb.Meta()})
					}
				}
			}
		}
	}

	// Cache the helper and return an error if PD unavailable.
	tikvStore, ok := sctx.GetStore().(helper.Storage)
	if !ok {
		return errors.Errorf("Information about TiKV region status can be gotten only when the storage is TiKV")
	}
	e.helper = helper.NewHelper(tikvStore)
	_, err := e.helper.GetPDAddr()
	if err != nil {
		return err
	}
	e.initialized = true
	return nil
}

func (e *tableStorageStatsRetriever) setDataForTableStorageStats(ctx sessionctx.Context) ([][]types.Datum, error) {
	rows := make([][]types.Datum, 0, 1024)
	count := 0
	for e.curTable < len(e.initialTables) && count < 1024 {
		table := e.initialTables[e.curTable]
		tableID := table.ID
		err := e.helper.GetPDRegionStats(tableID, &e.stats, false)
		if err != nil {
			return nil, err
		}
		peerCount := len(e.stats.StorePeerCount)

		record := types.MakeDatums(
			table.db,            // TABLE_SCHEMA
			table.Name.O,        // TABLE_NAME
			tableID,             // TABLE_ID
			peerCount,           // TABLE_PEER_COUNT
			e.stats.Count,       // TABLE_REGION_COUNT
			e.stats.EmptyCount,  // TABLE_EMPTY_REGION_COUNT
			e.stats.StorageSize, // TABLE_SIZE
			e.stats.StorageKeys, // TABLE_KEYS
		)
		rows = append(rows, record)
		count++
		e.curTable++
	}
	return rows, nil
}

// dataForAnalyzeStatusHelper is a helper function which can be used in show_stats.go
func dataForAnalyzeStatusHelper(sctx sessionctx.Context) (rows [][]types.Datum, err error) {
	const maxAnalyzeJobs = 30
	const sql = "SELECT table_schema, table_name, partition_name, job_info, processed_rows, CONVERT_TZ(start_time, @@TIME_ZONE, '+00:00'), CONVERT_TZ(end_time, @@TIME_ZONE, '+00:00'), state, fail_reason, instance, process_id FROM mysql.analyze_jobs ORDER BY update_time DESC LIMIT %?"
	exec := sctx.(sqlexec.RestrictedSQLExecutor)
	ctx := kv.WithInternalSourceType(context.Background(), kv.InternalTxnStats)
	chunkRows, _, err := exec.ExecRestrictedSQL(ctx, nil, sql, maxAnalyzeJobs)
	if err != nil {
		return nil, err
	}
	checker := privilege.GetPrivilegeManager(sctx)
	for _, chunkRow := range chunkRows {
		dbName := chunkRow.GetString(0)
		tableName := chunkRow.GetString(1)
		if checker != nil && !checker.RequestVerification(sctx.GetSessionVars().ActiveRoles, dbName, tableName, "", mysql.AllPrivMask) {
			continue
		}
		partitionName := chunkRow.GetString(2)
		jobInfo := chunkRow.GetString(3)
		processedRows := chunkRow.GetInt64(4)
		var startTime, endTime interface{}
		if !chunkRow.IsNull(5) {
			t, err := chunkRow.GetTime(5).GoTime(time.UTC)
			if err != nil {
				return nil, err
			}
			startTime = types.NewTime(types.FromGoTime(t.In(sctx.GetSessionVars().TimeZone)), mysql.TypeDatetime, 0)
		}
		if !chunkRow.IsNull(6) {
			t, err := chunkRow.GetTime(6).GoTime(time.UTC)
			if err != nil {
				return nil, err
			}
			endTime = types.NewTime(types.FromGoTime(t.In(sctx.GetSessionVars().TimeZone)), mysql.TypeDatetime, 0)
		}
		state := chunkRow.GetEnum(7).String()
		var failReason interface{}
		if !chunkRow.IsNull(8) {
			failReason = chunkRow.GetString(8)
		}
		instance := chunkRow.GetString(9)
		var procID interface{}
		if !chunkRow.IsNull(10) {
			procID = chunkRow.GetUint64(10)
		}
		rows = append(rows, types.MakeDatums(
			dbName,        // TABLE_SCHEMA
			tableName,     // TABLE_NAME
			partitionName, // PARTITION_NAME
			jobInfo,       // JOB_INFO
			processedRows, // ROW_COUNT
			startTime,     // START_TIME
			endTime,       // END_TIME
			state,         // STATE
			failReason,    // FAIL_REASON
			instance,      // INSTANCE
			procID,        // PROCESS_ID
		))
	}
	return
}

// setDataForAnalyzeStatus gets all the analyze jobs.
func (e *memtableRetriever) setDataForAnalyzeStatus(sctx sessionctx.Context) (err error) {
	e.rows, err = dataForAnalyzeStatusHelper(sctx)
	return
}

// setDataForPseudoProfiling returns pseudo data for table profiling when system variable `profiling` is set to `ON`.
func (e *memtableRetriever) setDataForPseudoProfiling(sctx sessionctx.Context) {
	if v, ok := sctx.GetSessionVars().GetSystemVar("profiling"); ok && variable.TiDBOptOn(v) {
		row := types.MakeDatums(
			0,                      // QUERY_ID
			0,                      // SEQ
			"",                     // STATE
			types.NewDecFromInt(0), // DURATION
			types.NewDecFromInt(0), // CPU_USER
			types.NewDecFromInt(0), // CPU_SYSTEM
			0,                      // CONTEXT_VOLUNTARY
			0,                      // CONTEXT_INVOLUNTARY
			0,                      // BLOCK_OPS_IN
			0,                      // BLOCK_OPS_OUT
			0,                      // MESSAGES_SENT
			0,                      // MESSAGES_RECEIVED
			0,                      // PAGE_FAULTS_MAJOR
			0,                      // PAGE_FAULTS_MINOR
			0,                      // SWAPS
			"",                     // SOURCE_FUNCTION
			"",                     // SOURCE_FILE
			0,                      // SOURCE_LINE
		)
		e.rows = append(e.rows, row)
	}
}

func (e *memtableRetriever) setDataForServersInfo(ctx sessionctx.Context) error {
	serversInfo, err := infosync.GetAllServerInfo(context.Background())
	if err != nil {
		return err
	}
	rows := make([][]types.Datum, 0, len(serversInfo))
	for _, info := range serversInfo {
		row := types.MakeDatums(
			info.ID,              // DDL_ID
			info.IP,              // IP
			int(info.Port),       // PORT
			int(info.StatusPort), // STATUS_PORT
			info.Lease,           // LEASE
			info.Version,         // VERSION
			info.GitHash,         // GIT_HASH
			info.BinlogStatus,    // BINLOG_STATUS
			stringutil.BuildStringFromLabels(info.Labels), // LABELS
		)
		if sem.IsEnabled() {
			checker := privilege.GetPrivilegeManager(ctx)
			if checker == nil || !checker.RequestDynamicVerification(ctx.GetSessionVars().ActiveRoles, "RESTRICTED_TABLES_ADMIN", false) {
				row[1].SetNull() // clear IP
			}
		}
		rows = append(rows, row)
	}
	e.rows = rows
	return nil
}

func (e *memtableRetriever) setDataFromSequences(ctx sessionctx.Context, schemas []*model.DBInfo) {
	checker := privilege.GetPrivilegeManager(ctx)
	var rows [][]types.Datum
	for _, schema := range schemas {
		for _, table := range schema.Tables {
			if !table.IsSequence() {
				continue
			}
			if checker != nil && !checker.RequestVerification(ctx.GetSessionVars().ActiveRoles, schema.Name.L, table.Name.L, "", mysql.AllPrivMask) {
				continue
			}
			record := types.MakeDatums(
				infoschema.CatalogVal,     // TABLE_CATALOG
				schema.Name.O,             // TABLE_SCHEMA
				table.Name.O,              // TABLE_NAME
				table.Sequence.Cache,      // Cache
				table.Sequence.CacheValue, // CACHE_VALUE
				table.Sequence.Cycle,      // CYCLE
				table.Sequence.Increment,  // INCREMENT
				table.Sequence.MaxValue,   // MAXVALUE
				table.Sequence.MinValue,   // MINVALUE
				table.Sequence.Start,      // START
				table.Sequence.Comment,    // COMMENT
			)
			rows = append(rows, record)
		}
	}
	e.rows = rows
}

// dataForTableTiFlashReplica constructs data for table tiflash replica info.
func (e *memtableRetriever) dataForTableTiFlashReplica(ctx sessionctx.Context, schemas []*model.DBInfo) {
	var rows [][]types.Datum
	progressMap, err := infosync.GetTiFlashTableSyncProgress(context.Background())
	if err != nil {
		ctx.GetSessionVars().StmtCtx.AppendWarning(err)
	}
	for _, schema := range schemas {
		for _, tbl := range schema.Tables {
			if tbl.TiFlashReplica == nil {
				continue
			}
			progress := 1.0
			if !tbl.TiFlashReplica.Available {
				if pi := tbl.GetPartitionInfo(); pi != nil && len(pi.Definitions) > 0 {
					progress = 0
					for _, p := range pi.Definitions {
						if tbl.TiFlashReplica.IsPartitionAvailable(p.ID) {
							progress += 1
						} else {
							progress += progressMap[p.ID]
						}
					}
					progress = progress / float64(len(pi.Definitions))
				} else {
					progress = progressMap[tbl.ID]
				}
			}
			record := types.MakeDatums(
				schema.Name.O,                   // TABLE_SCHEMA
				tbl.Name.O,                      // TABLE_NAME
				tbl.ID,                          // TABLE_ID
				int64(tbl.TiFlashReplica.Count), // REPLICA_COUNT
				strings.Join(tbl.TiFlashReplica.LocationLabels, ","), // LOCATION_LABELS
				tbl.TiFlashReplica.Available,                         // AVAILABLE
				progress,                                             // PROGRESS
			)
			rows = append(rows, record)
		}
	}
	e.rows = rows
}

func (e *memtableRetriever) setDataForStatementsSummaryEvicted(ctx sessionctx.Context) error {
	if !hasPriv(ctx, mysql.ProcessPriv) {
		return plannercore.ErrSpecificAccessDenied.GenWithStackByArgs("PROCESS")
	}
	e.rows = stmtsummary.StmtSummaryByDigestMap.ToEvictedCountDatum()
	switch e.table.Name.O {
	case infoschema.ClusterTableStatementsSummaryEvicted:
		rows, err := infoschema.AppendHostInfoToRows(ctx, e.rows)
		if err != nil {
			return err
		}
		e.rows = rows
	}
	return nil
}

func (e *memtableRetriever) setDataForClientErrorsSummary(ctx sessionctx.Context, tableName string) error {
	// Seeing client errors should require the PROCESS privilege, with the exception of errors for your own user.
	// This is similar to information_schema.processlist, which is the closest comparison.
	hasProcessPriv := hasPriv(ctx, mysql.ProcessPriv)
	loginUser := ctx.GetSessionVars().User

	var rows [][]types.Datum
	switch tableName {
	case infoschema.TableClientErrorsSummaryGlobal:
		if !hasProcessPriv {
			return plannercore.ErrSpecificAccessDenied.GenWithStackByArgs("PROCESS")
		}
		for code, summary := range errno.GlobalStats() {
			firstSeen := types.NewTime(types.FromGoTime(summary.FirstSeen), mysql.TypeTimestamp, types.DefaultFsp)
			lastSeen := types.NewTime(types.FromGoTime(summary.LastSeen), mysql.TypeTimestamp, types.DefaultFsp)
			row := types.MakeDatums(
				int(code),                    // ERROR_NUMBER
				errno.MySQLErrName[code].Raw, // ERROR_MESSAGE
				summary.ErrorCount,           // ERROR_COUNT
				summary.WarningCount,         // WARNING_COUNT
				firstSeen,                    // FIRST_SEEN
				lastSeen,                     // LAST_SEEN
			)
			rows = append(rows, row)
		}
	case infoschema.TableClientErrorsSummaryByUser:
		for user, agg := range errno.UserStats() {
			for code, summary := range agg {
				// Allow anyone to see their own errors.
				if !hasProcessPriv && loginUser != nil && loginUser.Username != user {
					continue
				}
				firstSeen := types.NewTime(types.FromGoTime(summary.FirstSeen), mysql.TypeTimestamp, types.DefaultFsp)
				lastSeen := types.NewTime(types.FromGoTime(summary.LastSeen), mysql.TypeTimestamp, types.DefaultFsp)
				row := types.MakeDatums(
					user,                         // USER
					int(code),                    // ERROR_NUMBER
					errno.MySQLErrName[code].Raw, // ERROR_MESSAGE
					summary.ErrorCount,           // ERROR_COUNT
					summary.WarningCount,         // WARNING_COUNT
					firstSeen,                    // FIRST_SEEN
					lastSeen,                     // LAST_SEEN
				)
				rows = append(rows, row)
			}
		}
	case infoschema.TableClientErrorsSummaryByHost:
		if !hasProcessPriv {
			return plannercore.ErrSpecificAccessDenied.GenWithStackByArgs("PROCESS")
		}
		for host, agg := range errno.HostStats() {
			for code, summary := range agg {
				firstSeen := types.NewTime(types.FromGoTime(summary.FirstSeen), mysql.TypeTimestamp, types.DefaultFsp)
				lastSeen := types.NewTime(types.FromGoTime(summary.LastSeen), mysql.TypeTimestamp, types.DefaultFsp)
				row := types.MakeDatums(
					host,                         // HOST
					int(code),                    // ERROR_NUMBER
					errno.MySQLErrName[code].Raw, // ERROR_MESSAGE
					summary.ErrorCount,           // ERROR_COUNT
					summary.WarningCount,         // WARNING_COUNT
					firstSeen,                    // FIRST_SEEN
					lastSeen,                     // LAST_SEEN
				)
				rows = append(rows, row)
			}
		}
	}
	e.rows = rows
	return nil
}

func (e *memtableRetriever) setDataForTrxSummary(ctx sessionctx.Context) error {
	hasProcessPriv := hasPriv(ctx, mysql.ProcessPriv)
	if !hasProcessPriv {
		return nil
	}
	rows := txninfo.Recorder.DumpTrxSummary()
	e.rows = rows
	return nil
}

func (e *memtableRetriever) setDataForClusterTrxSummary(ctx sessionctx.Context) error {
	err := e.setDataForTrxSummary(ctx)
	if err != nil {
		return err
	}
	rows, err := infoschema.AppendHostInfoToRows(ctx, e.rows)
	if err != nil {
		return err
	}
	e.rows = rows
	return nil
}

type stmtSummaryTableRetriever struct {
	dummyCloser
	table     *model.TableInfo
	columns   []*model.ColumnInfo
	retrieved bool
	extractor *plannercore.StatementsSummaryExtractor
}

// retrieve implements the infoschemaRetriever interface
func (e *stmtSummaryTableRetriever) retrieve(ctx context.Context, sctx sessionctx.Context) ([][]types.Datum, error) {
	if e.extractor.SkipRequest || e.retrieved {
		return nil, nil
	}
	e.retrieved = true

	var err error
	var instanceAddr string
	switch e.table.Name.O {
	case infoschema.ClusterTableStatementsSummary,
		infoschema.ClusterTableStatementsSummaryHistory:
		instanceAddr, err = infoschema.GetInstanceAddr(sctx)
		if err != nil {
			return nil, err
		}
	}
	user := sctx.GetSessionVars().User
	reader := stmtsummary.NewStmtSummaryReader(user, hasPriv(sctx, mysql.ProcessPriv), e.columns, instanceAddr, sctx.GetSessionVars().StmtCtx.TimeZone)
	if e.extractor.Enable {
		checker := stmtsummary.NewStmtSummaryChecker(e.extractor.Digests)
		reader.SetChecker(checker)
	}
	var rows [][]types.Datum
	switch e.table.Name.O {
	case infoschema.TableStatementsSummary,
		infoschema.ClusterTableStatementsSummary:
		rows = reader.GetStmtSummaryCurrentRows()
	case infoschema.TableStatementsSummaryHistory,
		infoschema.ClusterTableStatementsSummaryHistory:
		rows = reader.GetStmtSummaryHistoryRows()
	}

	return rows, nil
}

// tidbTrxTableRetriever is the memtable retriever for the TIDB_TRX and CLUSTER_TIDB_TRX table.
type tidbTrxTableRetriever struct {
	dummyCloser
	batchRetrieverHelper
	table       *model.TableInfo
	columns     []*model.ColumnInfo
	txnInfo     []*txninfo.TxnInfo
	initialized bool
}

func (e *tidbTrxTableRetriever) retrieve(ctx context.Context, sctx sessionctx.Context) ([][]types.Datum, error) {
	if e.retrieved {
		return nil, nil
	}

	if !e.initialized {
		e.initialized = true

		sm := sctx.GetSessionManager()
		if sm == nil {
			e.retrieved = true
			return nil, nil
		}

		loginUser := sctx.GetSessionVars().User
		hasProcessPriv := hasPriv(sctx, mysql.ProcessPriv)
		infoList := sm.ShowTxnList()
		e.txnInfo = make([]*txninfo.TxnInfo, 0, len(infoList))
		for _, info := range infoList {
			// If you have the PROCESS privilege, you can see all running transactions.
			// Otherwise, you can see only your own transactions.
			if !hasProcessPriv && loginUser != nil && info.Username != loginUser.Username {
				continue
			}
			e.txnInfo = append(e.txnInfo, info)
		}

		e.batchRetrieverHelper.totalRows = len(e.txnInfo)
		e.batchRetrieverHelper.batchSize = 1024
	}

	// The current TiDB node's address is needed by the CLUSTER_TIDB_TRX table.
	var err error
	var instanceAddr string
	switch e.table.Name.O {
	case infoschema.ClusterTableTiDBTrx:
		instanceAddr, err = infoschema.GetInstanceAddr(sctx)
		if err != nil {
			return nil, err
		}
	}

	var res [][]types.Datum
	err = e.nextBatch(func(start, end int) error {
		// Before getting rows, collect the SQL digests that needs to be retrieved first.
		var sqlRetriever *expression.SQLDigestTextRetriever
		for _, c := range e.columns {
			if c.Name.O == txninfo.CurrentSQLDigestTextStr {
				if sqlRetriever == nil {
					sqlRetriever = expression.NewSQLDigestTextRetriever()
				}

				for i := start; i < end; i++ {
					sqlRetriever.SQLDigestsMap[e.txnInfo[i].CurrentSQLDigest] = ""
				}
			}
		}
		// Retrieve the SQL texts if necessary.
		if sqlRetriever != nil {
			err1 := sqlRetriever.RetrieveLocal(ctx, sctx)
			if err1 != nil {
				return errors.Trace(err1)
			}
		}

		res = make([][]types.Datum, 0, end-start)

		// Calculate rows.
		for i := start; i < end; i++ {
			row := make([]types.Datum, 0, len(e.columns))
			for _, c := range e.columns {
				if c.Name.O == util.ClusterTableInstanceColumnName {
					row = append(row, types.NewDatum(instanceAddr))
				} else if c.Name.O == txninfo.CurrentSQLDigestTextStr {
					if text, ok := sqlRetriever.SQLDigestsMap[e.txnInfo[i].CurrentSQLDigest]; ok && len(text) != 0 {
						row = append(row, types.NewDatum(text))
					} else {
						row = append(row, types.NewDatum(nil))
					}
				} else {
					row = append(row, e.txnInfo[i].ToDatum(c.Name.O))
				}
			}
			res = append(res, row)
		}

		return nil
	})

	if err != nil {
		return nil, err
	}

	return res, nil
}

// dataLockWaitsTableRetriever is the memtable retriever for the DATA_LOCK_WAITS table.
type dataLockWaitsTableRetriever struct {
	dummyCloser
	batchRetrieverHelper
	table          *model.TableInfo
	columns        []*model.ColumnInfo
	lockWaits      []*deadlock.WaitForEntry
	resolvingLocks []txnlock.ResolvingLock
	initialized    bool
}

func (r *dataLockWaitsTableRetriever) retrieve(ctx context.Context, sctx sessionctx.Context) ([][]types.Datum, error) {
	if r.retrieved {
		return nil, nil
	}

	if !r.initialized {
		if !hasPriv(sctx, mysql.ProcessPriv) {
			return nil, plannercore.ErrSpecificAccessDenied.GenWithStackByArgs("PROCESS")
		}

		r.initialized = true
		var err error
		r.lockWaits, err = sctx.GetStore().GetLockWaits()
		tikvStore, _ := sctx.GetStore().(helper.Storage)
		r.resolvingLocks = tikvStore.GetLockResolver().Resolving()
		if err != nil {
			r.retrieved = true
			return nil, err
		}

		r.batchRetrieverHelper.totalRows = len(r.lockWaits) + len(r.resolvingLocks)
		r.batchRetrieverHelper.batchSize = 1024
	}

	var res [][]types.Datum

	err := r.nextBatch(func(start, end int) error {
		// Before getting rows, collect the SQL digests that needs to be retrieved first.
		var needDigest bool
		var needSQLText bool
		for _, c := range r.columns {
			if c.Name.O == infoschema.DataLockWaitsColumnSQLDigestText {
				needSQLText = true
			} else if c.Name.O == infoschema.DataLockWaitsColumnSQLDigest {
				needDigest = true
			}
		}

		var digests []string
		if needDigest || needSQLText {
			digests = make([]string, end-start)
			for i, lockWait := range r.lockWaits {
				digest, err := resourcegrouptag.DecodeResourceGroupTag(lockWait.ResourceGroupTag)
				if err != nil {
					// Ignore the error if failed to decode the digest from resource_group_tag. We still want to show
					// as much information as possible even we can't retrieve some of them.
					logutil.Logger(ctx).Warn("failed to decode resource group tag", zap.Error(err))
				} else {
					digests[i] = hex.EncodeToString(digest)
				}
			}
			// todo: support resourcegrouptag for resolvingLocks
		}

		// Fetch the SQL Texts of the digests above if necessary.
		var sqlRetriever *expression.SQLDigestTextRetriever
		if needSQLText {
			sqlRetriever = expression.NewSQLDigestTextRetriever()
			for _, digest := range digests {
				if len(digest) > 0 {
					sqlRetriever.SQLDigestsMap[digest] = ""
				}
			}
			err := sqlRetriever.RetrieveGlobal(ctx, sctx)
			if err != nil {
				return errors.Trace(err)
			}
		}

		// Calculate rows.
		res = make([][]types.Datum, 0, end-start)
		// data_lock_waits contains both lockWaits (pessimistic lock waiting)
		// and resolving (optimistic lock "waiting") info
		// first we'll return the lockWaits, and then resolving, so we need to
		// do some index calculation here
		lockWaitsStart := mathutil.Min(start, len(r.lockWaits))
		resolvingStart := start - lockWaitsStart
		lockWaitsEnd := mathutil.Min(end, len(r.lockWaits))
		resolvingEnd := end - lockWaitsEnd
		for rowIdx, lockWait := range r.lockWaits[lockWaitsStart:lockWaitsEnd] {
			row := make([]types.Datum, 0, len(r.columns))

			for _, col := range r.columns {
				switch col.Name.O {
				case infoschema.DataLockWaitsColumnKey:
					row = append(row, types.NewDatum(strings.ToUpper(hex.EncodeToString(lockWait.Key))))
				case infoschema.DataLockWaitsColumnKeyInfo:
					infoSchema := sctx.GetInfoSchema().(infoschema.InfoSchema)
					var decodedKeyStr interface{} = nil
					decodedKey, err := keydecoder.DecodeKey(lockWait.Key, infoSchema)
					if err == nil {
						decodedKeyBytes, err := json.Marshal(decodedKey)
						if err != nil {
							logutil.BgLogger().Warn("marshal decoded key info to JSON failed", zap.Error(err))
						} else {
							decodedKeyStr = string(decodedKeyBytes)
						}
					} else {
						logutil.Logger(ctx).Warn("decode key failed", zap.Error(err))
					}
					row = append(row, types.NewDatum(decodedKeyStr))
				case infoschema.DataLockWaitsColumnTrxID:
					row = append(row, types.NewDatum(lockWait.Txn))
				case infoschema.DataLockWaitsColumnCurrentHoldingTrxID:
					row = append(row, types.NewDatum(lockWait.WaitForTxn))
				case infoschema.DataLockWaitsColumnSQLDigest:
					digest := digests[rowIdx]
					if len(digest) == 0 {
						row = append(row, types.NewDatum(nil))
					} else {
						row = append(row, types.NewDatum(digest))
					}
				case infoschema.DataLockWaitsColumnSQLDigestText:
					text := sqlRetriever.SQLDigestsMap[digests[rowIdx]]
					if len(text) > 0 {
						row = append(row, types.NewDatum(text))
					} else {
						row = append(row, types.NewDatum(nil))
					}
				default:
					row = append(row, types.NewDatum(nil))
				}
			}

			res = append(res, row)
		}
		for _, resolving := range r.resolvingLocks[resolvingStart:resolvingEnd] {
			row := make([]types.Datum, 0, len(r.columns))

			for _, col := range r.columns {
				switch col.Name.O {
				case infoschema.DataLockWaitsColumnKey:
					row = append(row, types.NewDatum(strings.ToUpper(hex.EncodeToString(resolving.Key))))
				case infoschema.DataLockWaitsColumnKeyInfo:
					infoSchema := domain.GetDomain(sctx).InfoSchema()
					var decodedKeyStr interface{} = nil
					decodedKey, err := keydecoder.DecodeKey(resolving.Key, infoSchema)
					if err == nil {
						decodedKeyBytes, err := json.Marshal(decodedKey)
						if err != nil {
							logutil.Logger(ctx).Warn("marshal decoded key info to JSON failed", zap.Error(err))
						} else {
							decodedKeyStr = string(decodedKeyBytes)
						}
					} else {
						logutil.Logger(ctx).Warn("decode key failed", zap.Error(err))
					}
					row = append(row, types.NewDatum(decodedKeyStr))
				case infoschema.DataLockWaitsColumnTrxID:
					row = append(row, types.NewDatum(resolving.TxnID))
				case infoschema.DataLockWaitsColumnCurrentHoldingTrxID:
					row = append(row, types.NewDatum(resolving.LockTxnID))
				case infoschema.DataLockWaitsColumnSQLDigest:
					// todo: support resourcegrouptag for resolvingLocks
					row = append(row, types.NewDatum(nil))
				case infoschema.DataLockWaitsColumnSQLDigestText:
					// todo: support resourcegrouptag for resolvingLocks
					row = append(row, types.NewDatum(nil))
				default:
					row = append(row, types.NewDatum(nil))
				}
			}

			res = append(res, row)
		}
		return nil
	})

	if err != nil {
		return nil, err
	}

	return res, nil
}

// deadlocksTableRetriever is the memtable retriever for the DEADLOCKS and CLUSTER_DEADLOCKS table.
type deadlocksTableRetriever struct {
	dummyCloser
	batchRetrieverHelper

	currentIdx          int
	currentWaitChainIdx int

	table       *model.TableInfo
	columns     []*model.ColumnInfo
	deadlocks   []*deadlockhistory.DeadlockRecord
	initialized bool
}

// nextIndexPair advances a index pair (where `idx` is the index of the DeadlockRecord, and `waitChainIdx` is the index
// of the wait chain item in the `idx`-th DeadlockRecord. This function helps iterate over each wait chain item
// in all DeadlockRecords.
func (r *deadlocksTableRetriever) nextIndexPair(idx, waitChainIdx int) (int, int) {
	waitChainIdx++
	if waitChainIdx >= len(r.deadlocks[idx].WaitChain) {
		waitChainIdx = 0
		idx++
		for idx < len(r.deadlocks) && len(r.deadlocks[idx].WaitChain) == 0 {
			idx++
		}
	}
	return idx, waitChainIdx
}

func (r *deadlocksTableRetriever) retrieve(ctx context.Context, sctx sessionctx.Context) ([][]types.Datum, error) {
	if r.retrieved {
		return nil, nil
	}

	if !r.initialized {
		if !hasPriv(sctx, mysql.ProcessPriv) {
			return nil, plannercore.ErrSpecificAccessDenied.GenWithStackByArgs("PROCESS")
		}

		r.initialized = true
		r.deadlocks = deadlockhistory.GlobalDeadlockHistory.GetAll()

		r.batchRetrieverHelper.totalRows = 0
		for _, d := range r.deadlocks {
			r.batchRetrieverHelper.totalRows += len(d.WaitChain)
		}
		r.batchRetrieverHelper.batchSize = 1024
	}

	// The current TiDB node's address is needed by the CLUSTER_DEADLOCKS table.
	var err error
	var instanceAddr string
	switch r.table.Name.O {
	case infoschema.ClusterTableDeadlocks:
		instanceAddr, err = infoschema.GetInstanceAddr(sctx)
		if err != nil {
			return nil, err
		}
	}

	infoSchema := sctx.GetInfoSchema().(infoschema.InfoSchema)

	var res [][]types.Datum

	err = r.nextBatch(func(start, end int) error {
		// Before getting rows, collect the SQL digests that needs to be retrieved first.
		var sqlRetriever *expression.SQLDigestTextRetriever
		for _, c := range r.columns {
			if c.Name.O == deadlockhistory.ColCurrentSQLDigestTextStr {
				if sqlRetriever == nil {
					sqlRetriever = expression.NewSQLDigestTextRetriever()
				}

				idx, waitChainIdx := r.currentIdx, r.currentWaitChainIdx
				for i := start; i < end; i++ {
					if idx >= len(r.deadlocks) {
						return errors.New("reading information_schema.(cluster_)deadlocks table meets corrupted index")
					}

					sqlRetriever.SQLDigestsMap[r.deadlocks[idx].WaitChain[waitChainIdx].SQLDigest] = ""
					// Step to the next entry
					idx, waitChainIdx = r.nextIndexPair(idx, waitChainIdx)
				}
			}
		}
		// Retrieve the SQL texts if necessary.
		if sqlRetriever != nil {
			err1 := sqlRetriever.RetrieveGlobal(ctx, sctx)
			if err1 != nil {
				return errors.Trace(err1)
			}
		}

		res = make([][]types.Datum, 0, end-start)

		for i := start; i < end; i++ {
			if r.currentIdx >= len(r.deadlocks) {
				return errors.New("reading information_schema.(cluster_)deadlocks table meets corrupted index")
			}

			row := make([]types.Datum, 0, len(r.columns))
			deadlock := r.deadlocks[r.currentIdx]
			waitChainItem := deadlock.WaitChain[r.currentWaitChainIdx]

			for _, c := range r.columns {
				if c.Name.O == util.ClusterTableInstanceColumnName {
					row = append(row, types.NewDatum(instanceAddr))
				} else if c.Name.O == deadlockhistory.ColCurrentSQLDigestTextStr {
					if text, ok := sqlRetriever.SQLDigestsMap[waitChainItem.SQLDigest]; ok && len(text) > 0 {
						row = append(row, types.NewDatum(text))
					} else {
						row = append(row, types.NewDatum(nil))
					}
				} else if c.Name.O == deadlockhistory.ColKeyInfoStr {
					value := types.NewDatum(nil)
					if len(waitChainItem.Key) > 0 {
						decodedKey, err := keydecoder.DecodeKey(waitChainItem.Key, infoSchema)
						if err == nil {
							decodedKeyJSON, err := json.Marshal(decodedKey)
							if err != nil {
								logutil.BgLogger().Warn("marshal decoded key info to JSON failed", zap.Error(err))
							} else {
								value = types.NewDatum(string(decodedKeyJSON))
							}
						} else {
							logutil.Logger(ctx).Warn("decode key failed", zap.Error(err))
						}
					}
					row = append(row, value)
				} else {
					row = append(row, deadlock.ToDatum(r.currentWaitChainIdx, c.Name.O))
				}
			}

			res = append(res, row)
			// Step to the next entry
			r.currentIdx, r.currentWaitChainIdx = r.nextIndexPair(r.currentIdx, r.currentWaitChainIdx)
		}

		return nil
	})

	if err != nil {
		return nil, err
	}

	return res, nil
}

type hugeMemTableRetriever struct {
	dummyCloser
	extractor          *plannercore.ColumnsTableExtractor
	table              *model.TableInfo
	columns            []*model.ColumnInfo
	retrieved          bool
	initialized        bool
	rows               [][]types.Datum
	dbs                []*model.DBInfo
	dbsIdx             int
	tblIdx             int
	viewMu             sync.RWMutex
	viewSchemaMap      map[int64]*expression.Schema // table id to view schema
	viewOutputNamesMap map[int64]types.NameSlice    // table id to view output names
}

// retrieve implements the infoschemaRetriever interface
func (e *hugeMemTableRetriever) retrieve(ctx context.Context, sctx sessionctx.Context) ([][]types.Datum, error) {
	if e.retrieved {
		return nil, nil
	}

	if !e.initialized {
		is := sctx.GetInfoSchema().(infoschema.InfoSchema)
		dbs := is.AllSchemas()
		slices.SortFunc(dbs, func(i, j *model.DBInfo) bool {
			return i.Name.L < j.Name.L
		})
		e.dbs = dbs
		e.initialized = true
		e.rows = make([][]types.Datum, 0, 1024)
	}

	var err error
	switch e.table.Name.O {
	case infoschema.TableColumns:
		err = e.setDataForColumns(ctx, sctx, e.extractor)
	}
	if err != nil {
		return nil, err
	}
	e.retrieved = len(e.rows) == 0

	return adjustColumns(e.rows, e.columns, e.table), nil
}

func adjustColumns(input [][]types.Datum, outColumns []*model.ColumnInfo, table *model.TableInfo) [][]types.Datum {
	if table.Name.O == infoschema.TableStatementsSummary {
		return input
	}
	if len(outColumns) == len(table.Columns) {
		return input
	}
	rows := make([][]types.Datum, len(input))
	for i, fullRow := range input {
		row := make([]types.Datum, len(outColumns))
		for j, col := range outColumns {
			row[j] = fullRow[col.Offset]
		}
		rows[i] = row
	}
	return rows
}

// TiFlashSystemTableRetriever is used to read system table from tiflash.
type TiFlashSystemTableRetriever struct {
	dummyCloser
	table         *model.TableInfo
	outputCols    []*model.ColumnInfo
	instanceCount int
	instanceIdx   int
	instanceInfos []tiflashInstanceInfo
	rowIdx        int
	retrieved     bool
	initialized   bool
	extractor     *plannercore.TiFlashSystemTableExtractor
}

func (e *TiFlashSystemTableRetriever) retrieve(ctx context.Context, sctx sessionctx.Context) ([][]types.Datum, error) {
	if e.extractor.SkipRequest || e.retrieved {
		return nil, nil
	}
	if !e.initialized {
		err := e.initialize(sctx, e.extractor.TiFlashInstances)
		if err != nil {
			return nil, err
		}
	}
	if e.instanceCount == 0 || e.instanceIdx >= e.instanceCount {
		e.retrieved = true
		return nil, nil
	}

	for {
		rows, err := e.dataForTiFlashSystemTables(sctx, e.extractor.TiDBDatabases, e.extractor.TiDBTables)
		if err != nil {
			return nil, err
		}
		if len(rows) > 0 || e.instanceIdx >= e.instanceCount {
			return rows, nil
		}
	}
}

type tiflashInstanceInfo struct {
	id  string
	url string
}

func (e *TiFlashSystemTableRetriever) initialize(sctx sessionctx.Context, tiflashInstances set.StringSet) error {
	storeInfo, err := infoschema.GetStoreServerInfo(sctx)
	if err != nil {
		return err
	}

	for _, info := range storeInfo {
		if info.ServerType != kv.TiFlash.Name() {
			continue
		}
		info.ResolveLoopBackAddr()
		if len(tiflashInstances) > 0 && !tiflashInstances.Exist(info.Address) {
			continue
		}
		hostAndStatusPort := strings.Split(info.StatusAddr, ":")
		if len(hostAndStatusPort) != 2 {
			return errors.Errorf("node status addr: %s format illegal", info.StatusAddr)
		}
		// fetch tiflash config
		configURL := fmt.Sprintf("%s://%s/config", util.InternalHTTPSchema(), info.StatusAddr)
		resp, err := util.InternalHTTPClient().Get(configURL)
		if err != nil {
			sctx.GetSessionVars().StmtCtx.AppendWarning(err)
			continue
		}
		if resp.StatusCode != http.StatusOK {
			return errors.Errorf("request %s failed: %s", configURL, resp.Status)
		}
		// parse http_port or https_port from the fetched config
		var nestedConfig map[string]interface{}
		if err = json.NewDecoder(resp.Body).Decode(&nestedConfig); err != nil {
			return err
		}
		if engineStoreConfig, ok := nestedConfig["engine-store"]; ok {
			foundPort := false
			var port interface{}
			portProtocol := ""
			if httpPort, ok := engineStoreConfig.(map[string]interface{})["http_port"]; ok {
				foundPort = true
				port = httpPort
				portProtocol = "http"
			} else if httpsPort, ok := engineStoreConfig.(map[string]interface{})["https_port"]; ok {
				foundPort = true
				port = httpsPort
				portProtocol = "https"
			}
			if !foundPort {
				return errors.Errorf("engine-store.http_port/https_port not found in server %s", info.Address)
			}
			switch portValue := port.(type) {
			case float64:
				e.instanceInfos = append(e.instanceInfos, tiflashInstanceInfo{
					id:  info.Address,
					url: fmt.Sprintf("%s://%s:%d", portProtocol, hostAndStatusPort[0], int(portValue)),
				})
				e.instanceCount += 1
			default:
				return errors.Errorf("engine-store.http_port value(%p) unexpected in server %s", port, info.Address)
			}
		} else {
			return errors.Errorf("engine-store config not found in server %s", info.Address)
		}
		if err = resp.Body.Close(); err != nil {
			return err
		}
	}
	e.initialized = true
	return nil
}

func (e *TiFlashSystemTableRetriever) dataForTiFlashSystemTables(ctx sessionctx.Context, tidbDatabases string, tidbTables string) ([][]types.Datum, error) {
	var columnNames []string //nolint: prealloc
	for _, c := range e.outputCols {
		if c.Name.O == "TIFLASH_INSTANCE" {
			continue
		}
		columnNames = append(columnNames, c.Name.L)
	}
	maxCount := 1024
	targetTable := strings.ToLower(strings.Replace(e.table.Name.O, "TIFLASH", "DT", 1))
	var filters []string
	if len(tidbDatabases) > 0 {
		filters = append(filters, fmt.Sprintf("tidb_database IN (%s)", strings.ReplaceAll(tidbDatabases, "\"", "'")))
	}
	if len(tidbTables) > 0 {
		filters = append(filters, fmt.Sprintf("tidb_table IN (%s)", strings.ReplaceAll(tidbTables, "\"", "'")))
	}
	sql := fmt.Sprintf("SELECT %s FROM system.%s", strings.Join(columnNames, ","), targetTable)
	if len(filters) > 0 {
		sql = fmt.Sprintf("%s WHERE %s", sql, strings.Join(filters, " AND "))
	}
	sql = fmt.Sprintf("%s LIMIT %d, %d", sql, e.rowIdx, maxCount)
	notNumber := "nan"
	instanceInfo := e.instanceInfos[e.instanceIdx]
	url := instanceInfo.url
	req, err := http.NewRequest(http.MethodGet, url, nil)
	if err != nil {
		return nil, errors.Trace(err)
	}
	q := req.URL.Query()
	q.Add("query", sql)
	req.URL.RawQuery = q.Encode()
	resp, err := util.InternalHTTPClient().Do(req)
	if err != nil {
		return nil, errors.Trace(err)
	}
	body, err := io.ReadAll(resp.Body)
	terror.Log(resp.Body.Close())
	if err != nil {
		return nil, errors.Trace(err)
	}
	records := strings.Split(string(body), "\n")
	rows := make([][]types.Datum, 0, len(records))
	for _, record := range records {
		if len(record) == 0 {
			continue
		}
		fields := strings.Split(record, "\t")
		if len(fields) < len(e.outputCols)-1 {
			return nil, errors.Errorf("Record from tiflash doesn't match schema %v", fields)
		}
		row := make([]types.Datum, len(e.outputCols))
		for index, column := range e.outputCols {
			if column.Name.O == "TIFLASH_INSTANCE" {
				continue
			}
			if column.GetType() == mysql.TypeVarchar {
				row[index].SetString(fields[index], mysql.DefaultCollationName)
			} else if column.GetType() == mysql.TypeLonglong {
				if fields[index] == notNumber {
					continue
				}
				value, err := strconv.ParseInt(fields[index], 10, 64)
				if err != nil {
					return nil, errors.Trace(err)
				}
				row[index].SetInt64(value)
			} else if column.GetType() == mysql.TypeDouble {
				if fields[index] == notNumber {
					continue
				}
				value, err := strconv.ParseFloat(fields[index], 64)
				if err != nil {
					return nil, errors.Trace(err)
				}
				row[index].SetFloat64(value)
			} else {
				return nil, errors.Errorf("Meet column of unknown type %v", column)
			}
		}
		row[len(e.outputCols)-1].SetString(instanceInfo.id, mysql.DefaultCollationName)
		rows = append(rows, row)
	}
	e.rowIdx += len(rows)
	if len(rows) < maxCount {
		e.instanceIdx += 1
		e.rowIdx = 0
	}
	return rows, nil
}

func (e *memtableRetriever) setDataForAttributes(ctx sessionctx.Context, is infoschema.InfoSchema) error {
	checker := privilege.GetPrivilegeManager(ctx)
	rules, err := infosync.GetAllLabelRules(context.TODO())
	skipValidateTable := false
	failpoint.Inject("mockOutputOfAttributes", func() {
		convert := func(i interface{}) []interface{} {
			return []interface{}{i}
		}
		rules = []*label.Rule{
			{
				ID:       "schema/test/test_label",
				Labels:   []label.Label{{Key: "merge_option", Value: "allow"}, {Key: "db", Value: "test"}, {Key: "table", Value: "test_label"}},
				RuleType: "key-range",
				Data: convert(map[string]interface{}{
					"start_key": "7480000000000000ff395f720000000000fa",
					"end_key":   "7480000000000000ff3a5f720000000000fa",
				}),
			},
			{
				ID:       "invalidIDtest",
				Labels:   []label.Label{{Key: "merge_option", Value: "allow"}, {Key: "db", Value: "test"}, {Key: "table", Value: "test_label"}},
				RuleType: "key-range",
				Data: convert(map[string]interface{}{
					"start_key": "7480000000000000ff395f720000000000fa",
					"end_key":   "7480000000000000ff3a5f720000000000fa",
				}),
			},
			{
				ID:       "schema/test/test_label",
				Labels:   []label.Label{{Key: "merge_option", Value: "allow"}, {Key: "db", Value: "test"}, {Key: "table", Value: "test_label"}},
				RuleType: "key-range",
				Data: convert(map[string]interface{}{
					"start_key": "aaaaa",
					"end_key":   "bbbbb",
				}),
			},
		}
		err = nil
		skipValidateTable = true
	})

	if err != nil {
		return errors.Wrap(err, "get the label rules failed")
	}

	rows := make([][]types.Datum, 0, len(rules))
	for _, rule := range rules {
		skip := true
		dbName, tableName, partitionName, err := checkRule(rule)
		if err != nil {
			logutil.BgLogger().Warn("check table-rule failed", zap.String("ID", rule.ID), zap.Error(err))
			continue
		}
		tableID, err := decodeTableIDFromRule(rule)
		if err != nil {
			logutil.BgLogger().Warn("decode table ID from rule failed", zap.String("ID", rule.ID), zap.Error(err))
			continue
		}

		if !skipValidateTable && tableOrPartitionNotExist(dbName, tableName, partitionName, is, tableID) {
			continue
		}

		if tableName != "" && dbName != "" && (checker == nil || checker.RequestVerification(ctx.GetSessionVars().ActiveRoles, dbName, tableName, "", mysql.SelectPriv)) {
			skip = false
		}
		if skip {
			continue
		}

		labels := rule.Labels.Restore()
		var ranges []string
		for _, data := range rule.Data {
			if kv, ok := data.(map[string]interface{}); ok {
				startKey := kv["start_key"]
				endKey := kv["end_key"]
				ranges = append(ranges, fmt.Sprintf("[%s, %s]", startKey, endKey))
			}
		}
		kr := strings.Join(ranges, ", ")

		row := types.MakeDatums(
			rule.ID,
			rule.RuleType,
			labels,
			kr,
		)
		rows = append(rows, row)
	}
	e.rows = rows
	return nil
}

func (e *memtableRetriever) setDataFromPlacementPolicies(sctx sessionctx.Context) error {
	is := sessiontxn.GetTxnManager(sctx).GetTxnInfoSchema()
	placementPolicies := is.AllPlacementPolicies()
	rows := make([][]types.Datum, 0, len(placementPolicies))
	// Get global PLACEMENT POLICIES
	// Currently no privileges needed for seeing global PLACEMENT POLICIES!
	for _, policy := range placementPolicies {
		// Currently we skip converting syntactic sugar. We might revisit this decision still in the future
		// I.e.: if PrimaryRegion or Regions are set,
		// also convert them to LeaderConstraints and FollowerConstraints
		// for better user experience searching for particular constraints

		// Followers == 0 means not set, so the default value 2 will be used
		followerCnt := policy.PlacementSettings.Followers
		if followerCnt == 0 {
			followerCnt = 2
		}

		row := types.MakeDatums(
			policy.ID,
			infoschema.CatalogVal, // CATALOG
			policy.Name.O,         // Policy Name
			policy.PlacementSettings.PrimaryRegion,
			policy.PlacementSettings.Regions,
			policy.PlacementSettings.Constraints,
			policy.PlacementSettings.LeaderConstraints,
			policy.PlacementSettings.FollowerConstraints,
			policy.PlacementSettings.LearnerConstraints,
			policy.PlacementSettings.Schedule,
			followerCnt,
			policy.PlacementSettings.Learners,
		)
		rows = append(rows, row)
	}
	e.rows = rows
	return nil
}

func checkRule(rule *label.Rule) (dbName, tableName string, partitionName string, err error) {
	s := strings.Split(rule.ID, "/")
	if len(s) < 3 {
		err = errors.Errorf("invalid label rule ID: %v", rule.ID)
		return
	}
	if rule.RuleType == "" {
		err = errors.New("empty label rule type")
		return
	}
	if rule.Labels == nil || len(rule.Labels) == 0 {
		err = errors.New("the label rule has no label")
		return
	}
	if rule.Data == nil {
		err = errors.New("the label rule has no data")
		return
	}
	dbName = s[1]
	tableName = s[2]
	if len(s) > 3 {
		partitionName = s[3]
	}
	return
}

func decodeTableIDFromRule(rule *label.Rule) (tableID int64, err error) {
	if len(rule.Data) == 0 {
		err = fmt.Errorf("there is no data in rule %s", rule.ID)
		return
	}
	data := rule.Data[0]
	dataMap, ok := data.(map[string]interface{})
	if !ok {
		err = fmt.Errorf("get the label rules %s failed", rule.ID)
		return
	}
	key, err := hex.DecodeString(fmt.Sprintf("%s", dataMap["start_key"]))
	if err != nil {
		err = fmt.Errorf("decode key from start_key %s in rule %s failed", dataMap["start_key"], rule.ID)
		return
	}
	_, bs, err := codec.DecodeBytes(key, nil)
	if err == nil {
		key = bs
	}
	tableID = tablecodec.DecodeTableID(key)
	if tableID == 0 {
		err = fmt.Errorf("decode tableID from key %s in rule %s failed", key, rule.ID)
		return
	}
	return
}

func tableOrPartitionNotExist(dbName string, tableName string, partitionName string, is infoschema.InfoSchema, tableID int64) (tableNotExist bool) {
	if len(partitionName) == 0 {
		curTable, _ := is.TableByName(model.NewCIStr(dbName), model.NewCIStr(tableName))
		if curTable == nil {
			return true
		}
		curTableID := curTable.Meta().ID
		if curTableID != tableID {
			return true
		}
	} else {
		_, _, partInfo := is.FindTableByPartitionID(tableID)
		if partInfo == nil {
			return true
		}
	}
	return false
}

相关信息

tidb 源码目录

相关文章

tidb adapter 源码

tidb admin 源码

tidb admin_plugins 源码

tidb admin_telemetry 源码

tidb aggregate 源码

tidb analyze 源码

tidb analyze_col 源码

tidb analyze_col_v2 源码

tidb analyze_fast 源码

tidb analyze_global_stats 源码

0  赞