tidb import_retry source code

  • 2022-09-19

tidb import_retry code

File path: /br/pkg/restore/import_retry.go

// Copyright 2022 PingCAP, Inc. Licensed under Apache-2.0.

package restore

import (
	"context"
	"time"

	"github.com/pingcap/errors"
	"github.com/pingcap/kvproto/pkg/errorpb"
	"github.com/pingcap/kvproto/pkg/import_sstpb"
	"github.com/pingcap/kvproto/pkg/metapb"
	berrors "github.com/pingcap/tidb/br/pkg/errors"
	"github.com/pingcap/tidb/br/pkg/logutil"
	"github.com/pingcap/tidb/br/pkg/restore/split"
	"github.com/pingcap/tidb/br/pkg/utils"
	"github.com/tikv/client-go/v2/kv"
	"go.uber.org/multierr"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

type RegionFunc func(ctx context.Context, r *split.RegionInfo) RPCResult

type OverRegionsInRangeController struct {
	start      []byte
	end        []byte
	metaClient split.SplitClient

	errors error
	rs     *utils.RetryState
}

// OverRegionsInRange creates a controller that could be used to scan regions in a range and
// apply a function over these regions.
// You can then call the `Run` method to apply the function.
func OverRegionsInRange(start, end []byte, metaClient split.SplitClient, retryStatus *utils.RetryState) OverRegionsInRangeController {
	// IMPORTANT: we record the start/end key with a timestamp,
	// but scanRegion will drop the timestamp and the end key is exclusive.
	// If we do not use PrefixNextKey, we might scan fewer regions than expected
	// and finally cause data loss.
	end = TruncateTS(end)
	end = kv.PrefixNextKey(end)

	return OverRegionsInRangeController{
		start:      start,
		end:        end,
		metaClient: metaClient,
		rs:         retryStatus,
	}
}

func (o *OverRegionsInRangeController) onError(ctx context.Context, result RPCResult, region *split.RegionInfo) {
	o.errors = multierr.Append(o.errors, errors.Annotatef(&result, "execute over region %v failed", region.Region))
	// TODO: Maybe handle some of the region errors like `epoch not match`?
}

func (o *OverRegionsInRangeController) tryFindLeader(ctx context.Context, region *split.RegionInfo) (*metapb.Peer, error) {
	var leader *metapb.Peer
	failed := false
	leaderRs := utils.InitialRetryState(4, 5*time.Second, 10*time.Second)
	err := utils.WithRetry(ctx, func() error {
		r, err := o.metaClient.GetRegionByID(ctx, region.Region.Id)
		if err != nil {
			return err
		}
		if !split.CheckRegionEpoch(r, region) {
			failed = true
			return nil
		}
		if r.Leader != nil {
			leader = r.Leader
			return nil
		}
		return errors.Annotatef(berrors.ErrPDLeaderNotFound, "there is no leader for region %d", region.Region.Id)
	}, &leaderRs)
	if failed {
		return nil, errors.Annotatef(berrors.ErrKVEpochNotMatch, "the current epoch of %s is changed", region)
	}
	if err != nil {
		return nil, err
	}
	return leader, nil
}

// handleInRegionError handles an error that happened inside the region. It updates the region info and performs a suitable backoff.
func (o *OverRegionsInRangeController) handleInRegionError(ctx context.Context, result RPCResult, region *split.RegionInfo) (cont bool) {
	if nl := result.StoreError.GetNotLeader(); nl != nil {
		if nl.Leader != nil {
			region.Leader = nl.Leader
			// try the new leader immediately.
			return true
		}
		// The error does not carry the new leader, so back off and look it up manually.
		time.Sleep(o.rs.ExponentialBackoff())
		// There may be no leader yet, so wait for one.
		leader, err := o.tryFindLeader(ctx, region)
		if err != nil {
			// Leave the region info unchanged and let the caller retry later.
			logutil.CL(ctx).Warn("failed to find leader", logutil.Region(region.Region), logutil.ShortError(err))
			return false
		}
		region.Leader = leader
		return true
	}
	// For other errors, like `ServerIsBusy` or `RegionNotInitialized`, just back off trivially.
	time.Sleep(o.rs.ExponentialBackoff())
	return true
}

func (o *OverRegionsInRangeController) prepareLogCtx(ctx context.Context) context.Context {
	lctx := logutil.ContextWithField(
		ctx,
		logutil.Key("startKey", o.start),
		logutil.Key("endKey", o.end),
	)
	return lctx
}

// Run executes `f` over the regions between `o.start` and `o.end`.
// It retries errors according to the `RPCResult` returned by `f`.
func (o *OverRegionsInRangeController) Run(ctx context.Context, f RegionFunc) error {
	return o.runOverRegions(o.prepareLogCtx(ctx), f)
}

func (o *OverRegionsInRangeController) runOverRegions(ctx context.Context, f RegionFunc) error {
	if !o.rs.ShouldRetry() {
		return o.errors
	}

	// Scan regions covered by the file range
	regionInfos, errScanRegion := split.PaginateScanRegion(
		ctx, o.metaClient, o.start, o.end, split.ScanRegionPaginationLimit)
	if errScanRegion != nil {
		return errors.Trace(errScanRegion)
	}

	for _, region := range regionInfos {
		cont, err := o.runInRegion(ctx, f, region)
		if err != nil {
			return err
		}
		if !cont {
			return nil
		}
	}
	return nil
}

// runInRegion executes the function in the region, and returns `cont = false` if there is no need to try the next region.
func (o *OverRegionsInRangeController) runInRegion(ctx context.Context, f RegionFunc, region *split.RegionInfo) (cont bool, err error) {
	if !o.rs.ShouldRetry() {
		return false, o.errors
	}
	result := f(ctx, region)

	if !result.OK() {
		o.onError(ctx, result, region)
		switch result.StrategyForRetry() {
		case StrategyGiveUp:
			logutil.CL(ctx).Warn("unexpected error, should stop to retry", logutil.ShortError(&result), logutil.Region(region.Region))
			return false, o.errors
		case StrategyFromThisRegion:
			logutil.CL(ctx).Warn("retry for region", logutil.Region(region.Region), logutil.ShortError(&result))
			if !o.handleInRegionError(ctx, result, region) {
				return false, o.runOverRegions(ctx, f)
			}
			return o.runInRegion(ctx, f, region)
		case StrategyFromStart:
			logutil.CL(ctx).Warn("retry for execution over regions", logutil.ShortError(&result))
			// TODO: make a backoffer that considers more about the error info,
			//       instead of ignoring the result and retrying.
			time.Sleep(o.rs.ExponentialBackoff())
			return false, o.runOverRegions(ctx, f)
		}
	}
	return true, nil
}

// RPCResult is the result after executing some RPCs to TiKV.
type RPCResult struct {
	Err error

	ImportError string
	StoreError  *errorpb.Error
}

func RPCResultFromPBError(err *import_sstpb.Error) RPCResult {
	return RPCResult{
		ImportError: err.GetMessage(),
		StoreError:  err.GetStoreError(),
	}
}

func RPCResultFromError(err error) RPCResult {
	return RPCResult{
		Err: err,
	}
}

func RPCResultOK() RPCResult {
	return RPCResult{}
}

type RetryStrategy int

const (
	StrategyGiveUp RetryStrategy = iota
	StrategyFromThisRegion
	StrategyFromStart
)

func (r *RPCResult) StrategyForRetry() RetryStrategy {
	if r.Err != nil {
		return r.StrategyForRetryGoError()
	}
	return r.StrategyForRetryStoreError()
}

func (r *RPCResult) StrategyForRetryStoreError() RetryStrategy {
	if r.StoreError == nil && r.ImportError == "" {
		return StrategyGiveUp
	}

	if r.StoreError.GetServerIsBusy() != nil ||
		r.StoreError.GetRegionNotInitialized() != nil ||
		r.StoreError.GetNotLeader() != nil {
		return StrategyFromThisRegion
	}

	return StrategyFromStart
}

func (r *RPCResult) StrategyForRetryGoError() RetryStrategy {
	if r.Err == nil {
		return StrategyGiveUp
	}

	// we should unwrap the error, or we cannot get the right gRPC status.
	if gRPCErr, ok := status.FromError(errors.Cause(r.Err)); ok {
		switch gRPCErr.Code() {
		case codes.Unavailable, codes.Aborted, codes.ResourceExhausted:
			return StrategyFromThisRegion
		}
	}

	return StrategyGiveUp
}

func (r *RPCResult) Error() string {
	if r.Err != nil {
		return r.Err.Error()
	}
	if r.StoreError != nil {
		return r.StoreError.GetMessage()
	}
	if r.ImportError != "" {
		return r.ImportError
	}
	return "BUG(There is no error but reported as error)"
}

func (r *RPCResult) OK() bool {
	return r.Err == nil && r.ImportError == "" && r.StoreError == nil
}
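
To see how these pieces fit together, below is a minimal usage sketch. It is not part of import_retry.go: it assumes a caller inside the same restore package (so the imports above are available) and a hypothetical sendImportRPC helper that performs one import-style RPC against a region and returns the TiKV-reported *import_sstpb.Error plus any transport error.

// Hypothetical caller: apply an import-style RPC to every region covering
// [start, end), letting the controller classify and retry failures.
func importRange(ctx context.Context, client split.SplitClient, start, end []byte) error {
	// Up to 8 attempts with exponential backoff between 100ms and 3s,
	// mirroring how tryFindLeader builds its own RetryState above.
	rs := utils.InitialRetryState(8, 100*time.Millisecond, 3*time.Second)
	ctl := OverRegionsInRange(start, end, client, &rs)

	return ctl.Run(ctx, func(ctx context.Context, r *split.RegionInfo) RPCResult {
		// sendImportRPC is an assumed helper, not defined in this file.
		pbErr, err := sendImportRPC(ctx, r)
		if err != nil {
			// Transport-level failure: StrategyForRetryGoError decides whether
			// codes such as Unavailable/Aborted/ResourceExhausted are retried.
			return RPCResultFromError(err)
		}
		if pbErr != nil {
			// Error reported by TiKV: StrategyForRetryStoreError picks the
			// strategy (e.g. NotLeader retries within this region).
			return RPCResultFromPBError(pbErr)
		}
		return RPCResultOK()
	})
}

The controller keeps scanning and retrying until the RetryState is exhausted or a StrategyGiveUp error is hit, at which point Run returns the errors accumulated via multierr in onError.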
