tidb codec 源码

  • 2022-09-19
  • 浏览 (345)

tidb codec 代码

文件路径:/util/plancodec/codec.go

// Copyright 2019 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package plancodec

import (
	"bytes"
	"encoding/base64"
	"math"
	"strconv"
	"strings"
	"sync"

	"github.com/golang/snappy"
	"github.com/pingcap/errors"
	"github.com/pingcap/tidb/kv"
	"github.com/pingcap/tidb/util/hack"
	"github.com/pingcap/tidb/util/logutil"
	"github.com/pingcap/tidb/util/texttree"
	"github.com/pingcap/tipb/go-tipb"
	"go.uber.org/zap"
)

// Encoded markers written into the task field of an encoded plan node.
const (
	rootTaskType = "0" // task field of a root task; decoded as "root"
	copTaskType  = "1" // task-field prefix of a non-root task; decoded as "cop" or "cop[<store>]"
)

// Delimiters used by the encoded plan format.
const (
	idSeparator    = "_"  // joins the two parts of an encoded plan ID or task type
	lineBreaker    = '\n' // terminates every encoded plan node
	lineBreakerStr = "\n" // string form of lineBreaker, used to split a plan into nodes
	separator      = '\t' // separates the fields of an encoded plan node
	separatorStr   = "\t" // string form of separator, used to split a node into fields
)

var (
	// PlanDiscardedEncoded is the encoded placeholder that stands for a plan
	// that was discarded because it is too long.
	PlanDiscardedEncoded = "[discard]"
	// planDiscardedDecoded is the text returned when PlanDiscardedEncoded is decoded.
	planDiscardedDecoded = "(plan discarded because too long)"
	// BinaryPlanDiscardedEncoded is the compressed binary plan whose
	// DiscardedDueToTooLong flag is set. It is the empty string if the
	// message cannot be marshalled.
	BinaryPlanDiscardedEncoded = func() string {
		discarded := &tipb.ExplainData{DiscardedDueToTooLong: true}
		if payload, err := discarded.Marshal(); err == nil {
			return Compress(payload)
		}
		return ""
	}()
)

// decoderPool recycles planDecoder values between DecodePlan and
// DecodeNormalizedPlan calls; a fresh zero-valued decoder is created on demand.
var decoderPool = sync.Pool{
	New: func() interface{} {
		return new(planDecoder)
	},
}

// DecodePlan decodes a compressed, encoded plan string into a printable plan
// tree that starts with a header row. An empty input yields an empty result.
// A panic raised while decoding is logged and reported to the caller as an
// error instead of propagating.
func DecodePlan(planString string) (res string, err error) {
	defer func() {
		recovered := recover()
		if recovered == nil {
			return
		}
		logutil.BgLogger().Error("DecodePlan panic", zap.Stack("stack"), zap.Any("recover", recovered))
		err = errors.New("DecodePlan panicked")
	}()
	if planString == "" {
		return "", nil
	}
	decoder := decoderPool.Get().(*planDecoder)
	defer decoderPool.Put(decoder)
	// The decoder comes from a pool, so clear whatever the previous user left.
	decoder.addHeader = true
	decoder.buf.Reset()
	return decoder.decode(planString)
}

// DecodeNormalizedPlan decodes the string to plan tree.
func DecodeNormalizedPlan(planString string) (string, error) {
	if len(planString) == 0 {
		return "", nil
	}
	pd := decoderPool.Get().(*planDecoder)
	defer decoderPool.Put(pd)
	pd.buf.Reset()
	pd.addHeader = false
	return pd.buildPlanTree(planString)
}

// planDecoder holds the scratch state used to turn an encoded plan string into
// a printable plan tree. Instances are obtained from and returned to decoderPool.
type planDecoder struct {
	buf              bytes.Buffer // rendered plan tree text
	depths           []int        // depth of each row, in row order
	indents          [][]rune     // tree-drawing prefix of each row, two runes per depth level
	planInfos        []*planInfo  // decoded rows, in row order
	addHeader        bool         // whether buildPlanTree prepends a header row
	cacheParentIdent map[int]int  // depth -> index of a row at that depth, maintained by findParentIndex
}

// planInfo is one decoded row of a plan.
type planInfo struct {
	depth  int      // nesting depth of the row in the plan tree
	fields []string // display columns of the row; the first one is the operator ID
}

// decode decompresses planString and renders the result as a plan tree.
// If decompression fails and planString is the PlanDiscardedEncoded
// placeholder, the human-readable "discarded" text is returned instead of the
// decompression error.
func (pd *planDecoder) decode(planString string) (string, error) {
	raw, err := decompress(planString)
	switch {
	case err == nil:
		return pd.buildPlanTree(string(hack.String(raw)))
	case planString == PlanDiscardedEncoded:
		return planDiscardedDecoded, nil
	default:
		return "", err
	}
}

// buildPlanTree renders an uncompressed encoded plan as an aligned text tree.
//
// planString is split into one node per line; lines that decodePlanInfo
// reports as nil are skipped, and the first decoding error aborts the call.
// The rendered text is appended to pd.buf (callers reset it beforehand) and the
// buffer content is returned.
func (pd *planDecoder) buildPlanTree(planString string) (string, error) {
	lines := strings.Split(planString, lineBreakerStr)
	if n := len(lines); len(pd.depths) < n {
		// Start over with slices sized for this plan.
		pd.depths = make([]int, 0, n)
		pd.planInfos = make([]*planInfo, 0, n)
		pd.indents = make([][]rune, 0, n)
	}
	pd.depths, pd.planInfos = pd.depths[:0], pd.planInfos[:0]
	for _, line := range lines {
		info, err := decodePlanInfo(line)
		if err != nil {
			return "", err
		}
		if info != nil {
			pd.planInfos = append(pd.planInfos, info)
			pd.depths = append(pd.depths, info.depth)
		}
	}

	if pd.addHeader {
		pd.addPlanHeader()
	}

	// Compute the tree-drawing prefix of every row. Row 0 is skipped by the
	// loop below, so it keeps the prefix initPlanTreeIndents gave it.
	pd.initPlanTreeIndents()
	pd.cacheParentIdent = make(map[int]int)
	for child := 1; child < len(pd.depths); child++ {
		pd.fillIndent(pd.findParentIndex(child), child)
	}

	// Pad the columns so that they line up across rows.
	pd.alignFields()

	for row, info := range pd.planInfos {
		if row > 0 {
			pd.buf.WriteByte(lineBreaker)
		}
		// Every row starts with a separator; this is for alignment.
		pd.buf.WriteByte(separator)
		pd.buf.WriteString(string(pd.indents[row]))
		for col, field := range info.fields {
			if col > 0 {
				pd.buf.WriteByte(separator)
			}
			pd.buf.WriteString(field)
		}
	}
	return pd.buf.String(), nil
}

// addPlanHeader prepends a depth-0 header row to pd.planInfos and pd.depths.
// It does nothing when there are no rows. When the first row has fewer fields
// than the full header (a plan without runtime information), the header is
// truncated to that row's field count.
func (pd *planDecoder) addPlanHeader() {
	if len(pd.planInfos) == 0 {
		return
	}
	titles := []string{"id", "task", "estRows", "operator info", "actRows", "execution info", "memory", "disk"}
	if n := len(pd.planInfos[0].fields); n < len(titles) {
		titles = titles[:n]
	}
	header := &planInfo{depth: 0, fields: titles}

	total := len(pd.planInfos) + 1
	withHeader := append(make([]*planInfo, 0, total), header)
	pd.planInfos = append(withHeader, pd.planInfos...)
	depthsWithHeader := append(make([]int, 0, total), header.depth)
	pd.depths = append(depthsWithHeader, pd.depths...)
}

// initPlanTreeIndents rebuilds pd.indents with one prefix per entry of
// pd.depths. A row of depth d gets 2*d runes: spaces followed by the
// "last node" marker and the node identifier. Depth-0 rows get an empty prefix.
func (pd *planDecoder) initPlanTreeIndents() {
	pd.indents = pd.indents[:0]
	for _, depth := range pd.depths {
		indent := make([]rune, 2*depth)
		pd.indents = append(pd.indents, indent)
		width := len(indent)
		if width == 0 {
			continue
		}
		for pos := range indent[:width-2] {
			indent[pos] = ' '
		}
		// Every row starts out drawn as the last child; fillIndent later
		// rewrites the marker for rows that turn out to have younger siblings.
		indent[width-2] = texttree.TreeLastNode
		indent[width-1] = texttree.TreeNodeIdentifier
	}
}

// findParentIndex returns the index of a row whose depth is one less than the
// depth of the row at childIndex, or 0 when none is found.
//
// It first records childIndex in pd.cacheParentIdent under the child's depth,
// then looks up the parent depth in that cache. On a miss it scans backwards
// from childIndex-1 down to index 1 and caches the match it finds.
func (pd *planDecoder) findParentIndex(childIndex int) int {
	childDepth := pd.depths[childIndex]
	pd.cacheParentIdent[childDepth] = childIndex

	wantDepth := childDepth - 1
	if cached, found := pd.cacheParentIdent[wantDepth]; found {
		return cached
	}
	for candidate := childIndex - 1; candidate > 0; candidate-- {
		if pd.depths[candidate] != wantDepth {
			continue
		}
		pd.cacheParentIdent[wantDepth] = candidate
		return candidate
	}
	return 0
}

// fillIndent updates the prefixes of the rows strictly between parentIndex and
// childIndex so that the tree line of the child's depth level is drawn through
// them. Walking upwards from the child, each row's rune in that column becomes
// the tree body, until a row holding the "last node" marker is met: that row is
// turned into a "middle node" and the walk stops. Depth-0 children are ignored.
func (pd *planDecoder) fillIndent(parentIndex, childIndex int) {
	depth := pd.depths[childIndex]
	if depth == 0 {
		return
	}
	col := 2 * (depth - 1)
	for row := childIndex - 1; row > parentIndex; row-- {
		cell := &pd.indents[row][col]
		if *cell == texttree.TreeLastNode {
			*cell = texttree.TreeMiddleNode
			return
		}
		*cell = texttree.TreeBody
	}
}

// alignFields pads the rows in pd.planInfos so that their columns line up.
//
// First every row is extended with empty fields up to the largest field count
// (some rows may lack runtime information). Then every column except the last
// is right-padded with spaces to the column's maximum length as reported by
// getMaxFieldLength / getPlanFieldLen.
func (pd *planDecoder) alignFields() {
	if len(pd.planInfos) == 0 {
		return
	}
	widest := -1
	for _, info := range pd.planInfos {
		if n := len(info.fields); n > widest {
			widest = n
		}
	}
	for _, info := range pd.planInfos {
		for n := len(info.fields); n < widest; n++ {
			info.fields = append(info.fields, "")
		}
	}

	// The last column needs no padding.
	lastCol := len(pd.planInfos[0].fields) - 1
	for col := 0; col < lastCol; col++ {
		colWidth := pd.getMaxFieldLength(col)
		for row, info := range pd.planInfos {
			pad := colWidth - pd.getPlanFieldLen(row, col, info)
			info.fields[col] += strings.Repeat(" ", pad)
		}
	}
}

// getMaxFieldLength returns the largest getPlanFieldLen value of column idx
// over all rows in pd.planInfos, or -1 when there are no rows.
func (pd *planDecoder) getMaxFieldLength(idx int) int {
	longest := -1
	for row, info := range pd.planInfos {
		if width := pd.getPlanFieldLen(row, idx, info); width > longest {
			longest = width
		}
	}
	return longest
}

// getPlanFieldLen returns the length used to align field colIdx of row p: the
// byte length of the field, plus, for the first column only, the number of
// runes in the row's tree prefix pd.indents[rowIdx].
func (pd *planDecoder) getPlanFieldLen(rowIdx, colIdx int, p *planInfo) int {
	width := len(p.fields[colIdx])
	if colIdx == 0 {
		width += len(pd.indents[rowIdx])
	}
	return width
}

// decodePlanInfo parses one encoded plan node (one line of an encoded plan).
//
// The line is split on the field separator into:
//
//	depth, plan ID, task type, remaining display fields...
//
// The plan ID is either "<typeID>" or "<typeID>_<suffix>"; the numeric type ID
// is translated with PhysicalIDToTypeString and the suffix, if any, is kept.
// The task type is translated with decodeTaskType. All remaining fields are
// copied verbatim.
//
// A line with fewer than two fields (such as the empty string left after the
// trailing line break) is not a node: (nil, nil) is returned and callers are
// expected to skip it. An error is returned when the depth is not a
// non-negative integer, when the plan ID is malformed, or when the task type
// cannot be decoded.
func decodePlanInfo(str string) (*planInfo, error) {
	values := strings.Split(str, separatorStr)
	if len(values) < 2 {
		return nil, nil
	}

	depth, err := strconv.Atoi(values[0])
	if err != nil {
		return nil, errors.Errorf("decode plan: %v, depth: %v, error: %v", str, values[0], err)
	}
	// The depth is later used as a slice length for the row's indentation;
	// a negative value would panic there, so reject it here.
	if depth < 0 {
		return nil, errors.Errorf("decode plan: %v error, invalid depth: %v", str, values[0])
	}

	ids := strings.Split(values[1], idSeparator)
	if len(ids) != 1 && len(ids) != 2 {
		return nil, errors.Errorf("decode plan: %v error, invalid plan id: %v", str, values[1])
	}
	planID, err := strconv.Atoi(ids[0])
	if err != nil {
		return nil, errors.Errorf("decode plan: %v, plan id: %v, error: %v", str, values[1], err)
	}
	id := PhysicalIDToTypeString(planID)
	if len(ids) == 2 {
		id += idSeparator + ids[1]
	}

	p := &planInfo{
		depth: depth,
		// The depth is not a display field, hence one slot fewer than values.
		fields: make([]string, 0, len(values)-1),
	}
	p.fields = append(p.fields, id)
	if len(values) > 2 {
		task, err := decodeTaskType(values[2])
		if err != nil {
			return nil, errors.Errorf("decode plan: %v, task type: %v, error: %v", str, values[2], err)
		}
		p.fields = append(p.fields, task)
		p.fields = append(p.fields, values[3:]...)
	}
	return p, nil
}

// EncodePlanNode appends one encoded plan node to buf.
//
// The node is written as separator-delimited fields followed by a line break:
// depth, encoded plan ID (see encodeID), taskTypeInfo, rowCount and the
// escaped explainInfo. rowCount is printed without decimals when it is a whole
// number and with two decimals otherwise. If any of actRows, analyzeInfo,
// memoryInfo or diskInfo is non-empty, all four are appended as further
// fields, in that order.
func EncodePlanNode(depth int, pid, planType string, rowCount float64,
	taskTypeInfo, explainInfo, actRows, analyzeInfo, memoryInfo, diskInfo string, buf *bytes.Buffer) {
	explainInfo = escapeString(explainInfo)

	precision := 2
	if math.Round(rowCount) == rowCount {
		precision = 0
	}
	basic := [...]string{
		strconv.Itoa(depth),
		encodeID(planType, pid),
		taskTypeInfo,
		strconv.FormatFloat(rowCount, 'f', precision, 64),
		explainInfo,
	}
	for i, field := range basic {
		if i > 0 {
			buf.WriteByte(separator)
		}
		buf.WriteString(field)
	}

	// Runtime information is all-or-nothing: either all four fields are
	// written or none of them is.
	runtimeInfo := [...]string{actRows, analyzeInfo, memoryInfo, diskInfo}
	hasRuntimeInfo := false
	for _, field := range runtimeInfo {
		if field != "" {
			hasRuntimeInfo = true
			break
		}
	}
	if hasRuntimeInfo {
		for _, field := range runtimeInfo {
			buf.WriteByte(separator)
			buf.WriteString(field)
		}
	}
	buf.WriteByte(lineBreaker)
}

// escapeString returns s with every separator byte replaced by the two
// characters `\t` and every line-breaker byte replaced by the two characters
// `\n`, so that the result can be stored as a single field of an encoded node.
func escapeString(s string) string {
	var escaped strings.Builder
	escaped.Grow(len(s))
	for i := 0; i < len(s); i++ {
		switch c := s[i]; c {
		case separator:
			escaped.WriteString("\\t")
		case lineBreaker:
			escaped.WriteString("\\n")
		default:
			escaped.WriteByte(c)
		}
	}
	return escaped.String()
}

// NormalizePlanNode is used to normalize the plan to a string.
func NormalizePlanNode(depth int, planType string, taskTypeInfo string, explainInfo string, buf *bytes.Buffer) {
	buf.WriteString(strconv.Itoa(depth))
	buf.WriteByte(separator)
	planID := TypeStringToPhysicalID(planType)
	buf.WriteString(strconv.Itoa(planID))
	buf.WriteByte(separator)
	buf.WriteString(taskTypeInfo)
	buf.WriteByte(separator)
	buf.WriteString(explainInfo)
	buf.WriteByte(lineBreaker)
}

// encodeID returns the encoded plan ID "<typeID>_<id>", where typeID is the
// numeric ID that TypeStringToPhysicalID assigns to planType.
func encodeID(planType, id string) string {
	encoded := strconv.AppendInt(nil, int64(TypeStringToPhysicalID(planType)), 10)
	encoded = append(encoded, idSeparator...)
	encoded = append(encoded, id...)
	return string(encoded)
}

// EncodeTaskType returns the encoded task field of a plan node: rootTaskType
// for a root task, otherwise copTaskType followed by idSeparator and the
// numeric value of storeType.
func EncodeTaskType(isRoot bool, storeType kv.StoreType) string {
	if !isRoot {
		return copTaskType + idSeparator + strconv.Itoa(int(storeType))
	}
	return rootTaskType
}

// EncodeTaskTypeForNormalize returns the encoded task field used in normalized
// plans only. It differs from EncodeTaskType in that a non-root task on TiKV
// is encoded as the bare copTaskType, without a store-type suffix.
func EncodeTaskTypeForNormalize(isRoot bool, storeType kv.StoreType) string {
	switch {
	case isRoot:
		return rootTaskType
	case storeType == kv.TiKV:
		return copTaskType
	default:
		return copTaskType + idSeparator + strconv.Itoa(int(storeType))
	}
}

// decodeTaskType turns an encoded task field into its display form.
//
// The field is split on idSeparator. A first part equal to rootTaskType gives
// "root". Otherwise a field without a second part gives "cop" (normalized
// plans may omit the store type), and a field with a second part gives
// "cop[<name>]", where the second part is parsed as the numeric store type.
// An error is returned when that number cannot be parsed.
func decodeTaskType(str string) (string, error) {
	parts := strings.Split(str, idSeparator)
	switch {
	case parts[0] == rootTaskType:
		return "root", nil
	case len(parts) == 1:
		return "cop", nil
	}
	storeID, err := strconv.Atoi(parts[1])
	if err != nil {
		return "", err
	}
	storeName := kv.StoreType(storeID).Name()
	return "cop[" + storeName + "]", nil
}

// Compress snappy-encodes input and returns the result as a standard base64
// string.
func Compress(input []byte) string {
	return base64.StdEncoding.EncodeToString(snappy.Encode(nil, input))
}

// decompress reverses Compress: it base64-decodes str and then snappy-decodes
// the result. On failure of either step it returns a nil slice and the error.
func decompress(str string) ([]byte, error) {
	compressed, err := base64.StdEncoding.DecodeString(str)
	if err == nil {
		var plain []byte
		if plain, err = snappy.Decode(nil, compressed); err == nil {
			return plain, nil
		}
	}
	return nil, err
}

相关信息

tidb 源码目录

相关文章

tidb binary_plan_decode 源码

tidb id 源码

0  赞