tidb reporter 源码

  • 2022-09-19
  • 浏览 (376)

tidb reporter 代码

文件路径:/util/logutil/consistency/reporter.go

// Copyright 2021 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package consistency

import (
	"context"
	"encoding/hex"
	"encoding/json"
	"fmt"
	"strconv"
	"strings"
	"time"

	"github.com/pingcap/kvproto/pkg/kvrpcpb"
	"github.com/pingcap/tidb/errno"
	"github.com/pingcap/tidb/kv"
	"github.com/pingcap/tidb/parser/model"
	"github.com/pingcap/tidb/sessionctx"
	"github.com/pingcap/tidb/store/helper"
	"github.com/pingcap/tidb/tablecodec"
	"github.com/pingcap/tidb/types"
	"github.com/pingcap/tidb/util/dbterror"
	"github.com/pingcap/tidb/util/logutil"
	"github.com/tikv/client-go/v2/tikv"
	"go.uber.org/zap"
)

var (
	// ErrAdminCheckInconsistent is returned when admin check finds inconsistent data.
	// It is an admin-class error built from errno.ErrDataInconsistent.
	ErrAdminCheckInconsistent = dbterror.ClassAdmin.NewStd(errno.ErrDataInconsistent)
	// ErrLookupInconsistent is returned when an index lookup finds inconsistent data.
	// It is an executor-class error built from errno.ErrDataInconsistentMismatchCount.
	ErrLookupInconsistent = dbterror.ClassExecutor.NewStd(errno.ErrDataInconsistentMismatchCount)
	// ErrAdminCheckInconsistentWithColInfo is returned when admin check finds inconsistent data
	// and column information is available. It is an executor-class error built from
	// errno.ErrDataInconsistentMismatchIndex.
	ErrAdminCheckInconsistentWithColInfo = dbterror.ClassExecutor.NewStd(errno.ErrDataInconsistentMismatchIndex)
)

// GetMvccByKey fetches the MVCC information stored for key and returns it as a JSON string.
//
// The JSON object contains the upper-case hex form of the key ("key"), the ID of the
// region found for the key ("regionID"), and the raw MVCC response ("mvcc"). When
// decodeMvccFn is non-nil it is called with the key, the MVCC response and the output
// map so that it can add decoded, human-readable entries.
//
// This is a best-effort helper for diagnostics: it returns an empty string when key is
// nil, when the MVCC lookup fails, or when the result cannot be marshalled. The result
// is capped at roughly maxMvccInfoLen bytes and suffixed with "[truncated]..." when cut.
func GetMvccByKey(tikvStore helper.Storage, key kv.Key, decodeMvccFn func(kv.Key, *kvrpcpb.MvccGetByKeyResponse, map[string]interface{})) string {
	if key == nil {
		return ""
	}
	h := helper.NewHelper(tikvStore)
	data, err := h.GetMvccByEncodedKey(key)
	if err != nil {
		return ""
	}
	regionID := getRegionIDByKey(tikvStore, key)

	decodeKey := strings.ToUpper(hex.EncodeToString(key))

	resp := map[string]interface{}{
		"key":      decodeKey,
		"regionID": regionID,
		"mvcc":     data,
	}

	if decodeMvccFn != nil {
		decodeMvccFn(key, data, resp)
	}

	rj, err := json.Marshal(resp)
	if err != nil {
		return ""
	}
	const maxMvccInfoLen = 5000
	s := string(rj)
	if len(s) > maxMvccInfoLen {
		// json.Marshal emits valid UTF-8, but cutting at a fixed byte offset can split a
		// multi-byte rune (e.g. in decoded column values). Drop the dangling partial rune
		// so the returned string stays valid UTF-8.
		s = strings.ToValidUTF8(s[:maxMvccInfoLen], "") + "[truncated]..."
	}

	return s
}

// getRegionIDByKey looks up the region that contains encodedKey through the store's
// region cache and returns its ID. It returns 0 when the key cannot be located.
func getRegionIDByKey(tikvStore helper.Storage, encodedKey []byte) uint64 {
	regionCache := tikvStore.GetRegionCache()
	bo := tikv.NewBackofferWithVars(context.Background(), 500, nil)
	loc, err := regionCache.LocateKey(bo, encodedKey)
	if err == nil {
		return loc.Region.GetID()
	}
	return 0
}

// Reporter is a helper to generate report.
// It bundles the table and index metadata, the session context and the key-encoding
// callbacks that the Report* methods use when logging a data inconsistency.
type Reporter struct {
	// HandleEncode converts a row handle into the key whose MVCC info is fetched for the row.
	HandleEncode func(handle kv.Handle) kv.Key
	// IndexEncode converts an index row into the key whose MVCC info is fetched for the index entry.
	IndexEncode  func(idxRow *RecordData) kv.Key
	// Tbl is the table being checked; its name is logged and it is used to decode row values.
	Tbl          *model.TableInfo
	// Idx is the index being checked; its name is logged and it is used to decode index values.
	Idx          *model.IndexInfo
	// Sctx supplies the session variables (EnableRedactLog) and the store used for MVCC lookups.
	Sctx         sessionctx.Context
}

// DecodeRowMvccData creates a closure that captures tableInfo, to be used as the decode
// function in GetMvccByKey for row keys.
//
// The returned function decodes every non-empty short value in the MVCC writes and every
// non-empty value in the MVCC values into a column-name -> string map, keyed by the
// version's start timestamp, and stores the result in outMap["decoded"]. If any version
// fails to decode, the first error encountered is stored in outMap["decode_error"];
// whatever was decoded for that version is still kept so the log retains partial data.
// Nothing is written when the response carries no MVCC info.
func DecodeRowMvccData(tableInfo *model.TableInfo) func(kv.Key, *kvrpcpb.MvccGetByKeyResponse, map[string]interface{}) {
	return func(_ kv.Key, respValue *kvrpcpb.MvccGetByKeyResponse, outMap map[string]interface{}) {
		if respValue == nil || respValue.Info == nil {
			return
		}

		// Copy each field type so the decoder never aliases the table's own metadata.
		colMap := make(map[int64]*types.FieldType, len(tableInfo.Columns))
		for _, col := range tableInfo.Columns {
			fieldType := col.FieldType
			colMap[col.ID] = &fieldType
		}

		var firstErr error
		datas := make(map[string]map[string]string)
		decode := func(startTs uint64, value []byte) {
			if len(value) == 0 {
				return
			}
			record, err := decodeMvccRecordValue(value, colMap, tableInfo)
			// Keep the first failure; a later successful decode must not hide it.
			if err != nil && firstErr == nil {
				firstErr = err
			}
			datas[strconv.FormatUint(startTs, 10)] = record
		}

		for _, w := range respValue.Info.Writes {
			decode(w.StartTs, w.ShortValue)
		}
		for _, v := range respValue.Info.Values {
			decode(v.StartTs, v.Value)
		}

		if len(datas) > 0 {
			outMap["decoded"] = datas
			if firstErr != nil {
				outMap["decode_error"] = firstErr.Error()
			}
		}
	}
}

// DecodeIndexMvccData creates a closure that captures indexInfo, to be used as the decode
// function in GetMvccByKey for index keys.
//
// The returned function decodes the row handle from every non-empty short value in the
// MVCC writes and every non-empty value in the MVCC values, and stores the handles, keyed
// by the version's start timestamp, in outMap["decoded"]. If any version fails to decode,
// the first error encountered is stored in outMap["decode_error"], even when no handle
// could be decoded at all. Nothing is written when the response carries no MVCC info.
func DecodeIndexMvccData(indexInfo *model.IndexInfo) func(kv.Key, *kvrpcpb.MvccGetByKeyResponse, map[string]interface{}) {
	return func(key kv.Key, respValue *kvrpcpb.MvccGetByKeyResponse, outMap map[string]interface{}) {
		if respValue == nil || respValue.Info == nil {
			return
		}

		var firstErr error
		datas := make(map[string]map[string]string)
		colsLen := len(indexInfo.Columns)
		decode := func(startTs uint64, value []byte) {
			if len(value) == 0 {
				return
			}
			hd, err := tablecodec.DecodeIndexHandle(key, value, colsLen)
			if err != nil {
				// Keep the first failure; a later successful decode must not hide it.
				if firstErr == nil {
					firstErr = err
				}
				return
			}
			datas[strconv.FormatUint(startTs, 10)] = map[string]string{"handle": hd.String()}
		}

		for _, w := range respValue.Info.Writes {
			decode(w.StartTs, w.ShortValue)
		}
		for _, v := range respValue.Info.Values {
			decode(v.StartTs, v.Value)
		}

		if len(datas) > 0 {
			outMap["decoded"] = datas
		}
		// Report the error independently of datas: when every version fails to decode,
		// the error is the only diagnostic available.
		if firstErr != nil {
			outMap["decode_error"] = firstErr.Error()
		}
	}
}

// decodeMvccRecordValue decodes the encoded row value bs (using UTC as the time location)
// and returns a map from each column's original name to the string form of its datum.
// NULL datums are rendered as "nil"; columns absent from the decoded row are omitted.
//
// Decoding is best-effort: the map is returned even when an error occurs, and the
// returned error is the first one encountered (the row decode itself, or the first
// column whose datum could not be converted to a string).
func decodeMvccRecordValue(bs []byte, colMap map[int64]*types.FieldType, tb *model.TableInfo) (map[string]string, error) {
	rs, firstErr := tablecodec.DecodeRowToDatumMap(bs, colMap, time.UTC)
	record := make(map[string]string, len(tb.Columns))
	for _, col := range tb.Columns {
		c, ok := rs[col.ID]
		if !ok {
			continue
		}
		data := "nil"
		if !c.IsNull() {
			s, err := c.ToString()
			// Do not let a later column's success (or failure) overwrite an earlier error.
			if err != nil && firstErr == nil {
				firstErr = err
			}
			data = s
		}
		record[col.Name.O] = data
	}
	return record, firstErr
}

// ReportLookupInconsistent logs an inconsistency found by an index lookup (the index
// returned more rows than the table) and returns ErrLookupInconsistent built from the
// table name, index name and both row counts.
//
// With redacted logging enabled only names, counts and a stack trace are logged.
// Otherwise the missing handles, at most maxFullHandleCnt of the full handle list and,
// when the store supports it, the MVCC info of every missing row and index entry are
// logged as well.
func (r *Reporter) ReportLookupInconsistent(ctx context.Context, idxCnt, tblCnt int, missHd, fullHd []kv.Handle, missRowIdx []RecordData) error {
	if redact := r.Sctx.GetSessionVars().EnableRedactLog; !redact {
		// Bound the number of handles printed for the full handle list.
		const maxFullHandleCnt = 50
		shownHd := fullHd
		if len(shownHd) > maxFullHandleCnt {
			shownHd = shownHd[:maxFullHandleCnt]
		}

		fields := make([]zap.Field, 0, 6+len(missHd)+len(missRowIdx))
		fields = append(fields,
			zap.String("table_name", r.Tbl.Name.O),
			zap.String("index_name", r.Idx.Name.O),
			zap.Int("index_cnt", idxCnt),
			zap.Int("table_cnt", tblCnt),
			zap.String("missing_handles", fmt.Sprint(missHd)),
			zap.String("total_handles", fmt.Sprint(shownHd)),
		)

		if store, ok := r.Sctx.GetStore().(helper.Storage); ok {
			for i := 0; i < len(missHd); i++ {
				rowKey := r.HandleEncode(missHd[i])
				rowMvcc := GetMvccByKey(store, rowKey, DecodeRowMvccData(r.Tbl))
				fields = append(fields, zap.String("row_mvcc_"+strconv.Itoa(i), rowMvcc))
			}
			for i := 0; i < len(missRowIdx); i++ {
				idxKey := r.IndexEncode(&missRowIdx[i])
				idxMvcc := GetMvccByKey(store, idxKey, DecodeIndexMvccData(r.Idx))
				fields = append(fields, zap.String("index_mvcc_"+strconv.Itoa(i), idxMvcc))
			}
		}

		logutil.Logger(ctx).Error("indexLookup found data inconsistency", fields...)
	} else {
		logutil.Logger(ctx).Error("indexLookup found data inconsistency",
			zap.String("table_name", r.Tbl.Name.O),
			zap.String("index_name", r.Idx.Name.O),
			zap.Int("index_cnt", idxCnt),
			zap.Int("table_cnt", tblCnt),
			zap.Stack("stack"))
	}
	return ErrLookupInconsistent.GenWithStackByArgs(r.Tbl.Name.O, r.Idx.Name.O, idxCnt, tblCnt)
}

// ReportAdminCheckInconsistentWithColInfo logs an admin-check inconsistency where the
// index value of a column differs from the row value, and returns
// ErrAdminCheckInconsistentWithColInfo built from the table name, index name, column
// name, handle, both datums and err.
//
// With redacted logging enabled only names, the column, err and a stack trace are
// logged. Otherwise the handle, both datums and, when the store supports it, the MVCC
// info of the row and the index entry are logged as well.
func (r *Reporter) ReportAdminCheckInconsistentWithColInfo(ctx context.Context, handle kv.Handle, colName string, idxDat, tblDat fmt.Stringer, err error, idxRow *RecordData) error {
	const logMsg = "admin check found data inconsistency"
	if !r.Sctx.GetSessionVars().EnableRedactLog {
		fields := []zap.Field{
			zap.String("table_name", r.Tbl.Name.O),
			zap.String("index_name", r.Idx.Name.O),
			zap.String("col", colName),
			zap.Stringer("row_id", handle),
			zap.Stringer("idxDatum", idxDat),
			zap.Stringer("rowDatum", tblDat),
		}
		if store, ok := r.Sctx.GetStore().(helper.Storage); ok {
			rowMvcc := GetMvccByKey(store, r.HandleEncode(handle), DecodeRowMvccData(r.Tbl))
			fields = append(fields, zap.String("row_mvcc", rowMvcc))
			idxMvcc := GetMvccByKey(store, r.IndexEncode(idxRow), DecodeIndexMvccData(r.Idx))
			fields = append(fields, zap.String("index_mvcc", idxMvcc))
		}
		fields = append(fields, zap.Error(err), zap.Stack("stack"))
		logutil.Logger(ctx).Error(logMsg, fields...)
	} else {
		logutil.Logger(ctx).Error(logMsg,
			zap.String("table_name", r.Tbl.Name.O),
			zap.String("index", r.Idx.Name.O),
			zap.String("col", colName),
			zap.Error(err),
			zap.Stack("stack"),
		)
	}
	return ErrAdminCheckInconsistentWithColInfo.GenWithStackByArgs(r.Tbl.Name.O, r.Idx.Name.O, colName, fmt.Sprint(handle), fmt.Sprint(idxDat), fmt.Sprint(tblDat), err)
}

// RecordData is the record data composed of a handle and values.
// It is used for both index rows and table rows in the inconsistency reports.
type RecordData struct {
	// Handle is the handle associated with the record.
	Handle kv.Handle
	// Values holds the record's datums.
	Values []types.Datum
}

// String implements fmt.Stringer. It renders the record as
// "handle: <handle>, values: <values>" and returns an empty string for a nil receiver.
func (r *RecordData) String() string {
	if r != nil {
		return "handle: " + fmt.Sprint(r.Handle) + ", values: " + fmt.Sprint(r.Values)
	}
	return ""
}

// ReportAdminCheckInconsistent logs an admin-check inconsistency between an index row
// and a table row, and returns ErrAdminCheckInconsistent built from the table name,
// index name, handle and both rows.
//
// With redacted logging enabled only names and a stack trace are logged. Otherwise the
// handle, both rows and, when the store supports it, the MVCC info of the row (and of
// the index entry when idxRow is non-nil) are logged as well.
func (r *Reporter) ReportAdminCheckInconsistent(ctx context.Context, handle kv.Handle, idxRow, tblRow *RecordData) error {
	const logMsg = "admin check found data inconsistency"
	if !r.Sctx.GetSessionVars().EnableRedactLog {
		fields := []zap.Field{
			zap.String("table_name", r.Tbl.Name.O),
			zap.String("index_name", r.Idx.Name.O),
			zap.Stringer("row_id", handle),
			zap.Stringer("index", idxRow),
			zap.Stringer("row", tblRow),
		}
		if store, ok := r.Sctx.GetStore().(helper.Storage); ok {
			rowMvcc := GetMvccByKey(store, r.HandleEncode(handle), DecodeRowMvccData(r.Tbl))
			fields = append(fields, zap.String("row_mvcc", rowMvcc))
			// The index entry may be absent; only look it up when there is one.
			if idxRow != nil {
				idxMvcc := GetMvccByKey(store, r.IndexEncode(idxRow), DecodeIndexMvccData(r.Idx))
				fields = append(fields, zap.String("index_mvcc", idxMvcc))
			}
		}
		fields = append(fields, zap.Stack("stack"))
		logutil.Logger(ctx).Error(logMsg, fields...)
	} else {
		logutil.Logger(ctx).Error(logMsg,
			zap.String("table_name", r.Tbl.Name.O),
			zap.String("index", r.Idx.Name.O),
			zap.Stack("stack"),
		)
	}
	return ErrAdminCheckInconsistent.GenWithStackByArgs(r.Tbl.Name.O, r.Idx.Name.O, fmt.Sprint(handle), fmt.Sprint(idxRow), fmt.Sprint(tblRow))
}

相关信息

tidb 源码目录

相关文章

tidb bind_cache 源码

tidb bind_record 源码

tidb handle 源码

tidb session_handle 源码

tidb stat 源码

tidb backup 源码

tidb cmd 源码

tidb debug 源码

tidb main 源码

tidb restore 源码

0  赞