tidb repeatable_read source code

  • 2022-09-19

tidb repeatable_read code

File path: /sessiontxn/isolation/repeatable_read.go

// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package isolation

import (
	"context"

	"github.com/pingcap/errors"
	"github.com/pingcap/failpoint"
	"github.com/pingcap/tidb/kv"
	"github.com/pingcap/tidb/parser/ast"
	"github.com/pingcap/tidb/parser/terror"
	plannercore "github.com/pingcap/tidb/planner/core"
	"github.com/pingcap/tidb/sessionctx"
	"github.com/pingcap/tidb/sessionctx/variable"
	"github.com/pingcap/tidb/sessiontxn"
	"github.com/pingcap/tidb/util/logutil"
	tikverr "github.com/tikv/client-go/v2/error"
	"go.uber.org/zap"
)

// PessimisticRRTxnContextProvider provides txn context for isolation level repeatable-read
type PessimisticRRTxnContextProvider struct {
	baseTxnContextProvider

	// Used for ForUpdateRead statement
	forUpdateTS       uint64
	latestForUpdateTS uint64
	// optimizeForNotFetchingLatestTS decides whether getForUpdateTs needs to fetch the latest TS
	// or can reuse the current one. See more details in the comments of AdviseOptimizeWithPlan.
	optimizeForNotFetchingLatestTS bool
}

// NewPessimisticRRTxnContextProvider returns a new PessimisticRRTxnContextProvider
func NewPessimisticRRTxnContextProvider(sctx sessionctx.Context, causalConsistencyOnly bool) *PessimisticRRTxnContextProvider {
	provider := &PessimisticRRTxnContextProvider{
		baseTxnContextProvider: baseTxnContextProvider{
			sctx:                  sctx,
			causalConsistencyOnly: causalConsistencyOnly,
			onInitializeTxnCtx: func(txnCtx *variable.TransactionContext) {
				txnCtx.IsPessimistic = true
				txnCtx.Isolation = ast.RepeatableRead
			},
			onTxnActiveFunc: func(txn kv.Transaction, _ sessiontxn.EnterNewTxnType) {
				txn.SetOption(kv.Pessimistic, true)
			},
		},
	}

	provider.getStmtReadTSFunc = provider.getTxnStartTS
	provider.getStmtForUpdateTSFunc = provider.getForUpdateTs

	return provider
}

func (p *PessimisticRRTxnContextProvider) getForUpdateTs() (ts uint64, err error) {
	if p.forUpdateTS != 0 {
		return p.forUpdateTS, nil
	}

	var txn kv.Transaction
	if txn, err = p.ActivateTxn(); err != nil {
		return 0, err
	}

	if p.optimizeForNotFetchingLatestTS {
		p.forUpdateTS = p.sctx.GetSessionVars().TxnCtx.GetForUpdateTS()
		return p.forUpdateTS, nil
	}

	txnCtx := p.sctx.GetSessionVars().TxnCtx
	futureTS := newOracleFuture(p.ctx, p.sctx, txnCtx.TxnScope)

	if ts, err = futureTS.Wait(); err != nil {
		return 0, err
	}

	txnCtx.SetForUpdateTS(ts)
	txn.SetOption(kv.SnapshotTS, ts)

	p.forUpdateTS = ts

	return
}

// updateForUpdateTS acquires the latest TSO and updates the TransactionContext and kv.Transaction with it.
func (p *PessimisticRRTxnContextProvider) updateForUpdateTS() (err error) {
	sctx := p.sctx
	var txn kv.Transaction

	if txn, err = sctx.Txn(false); err != nil {
		return err
	}

	if !txn.Valid() {
		return errors.Trace(kv.ErrInvalidTxn)
	}

	failpoint.Inject("RequestTsoFromPD", func() {
		sessiontxn.TsoRequestCountInc(sctx)
	})

	// Because ForUpdateTS is only used as the snapshot for reading data in DML,
	// we can avoid allocating a global TSO here and speed it up by using the local TSO.
	version, err := sctx.GetStore().CurrentVersion(sctx.GetSessionVars().TxnCtx.TxnScope)
	if err != nil {
		return err
	}

	sctx.GetSessionVars().TxnCtx.SetForUpdateTS(version.Ver)
	p.latestForUpdateTS = version.Ver
	txn.SetOption(kv.SnapshotTS, version.Ver)

	return nil
}

// OnStmtStart is the hook that should be called when a new statement is started
func (p *PessimisticRRTxnContextProvider) OnStmtStart(ctx context.Context, node ast.StmtNode) error {
	if err := p.baseTxnContextProvider.OnStmtStart(ctx, node); err != nil {
		return err
	}

	p.forUpdateTS = 0
	p.optimizeForNotFetchingLatestTS = false

	return nil
}

// OnStmtRetry is the hook that should be called when a statement is retried internally.
func (p *PessimisticRRTxnContextProvider) OnStmtRetry(ctx context.Context) (err error) {
	if err = p.baseTxnContextProvider.OnStmtRetry(ctx); err != nil {
		return err
	}

	// If the forUpdateTS was advanced in OnStmtErrorForNextAction, reuse that value for the retried statement;
	// otherwise reset it so that getForUpdateTs fetches a fresh one.
	if p.latestForUpdateTS > p.forUpdateTS {
		p.forUpdateTS = p.latestForUpdateTS
	} else {
		p.forUpdateTS = 0
	}

	p.optimizeForNotFetchingLatestTS = false

	return nil
}

// OnStmtErrorForNextAction is the hook that should be called when a statement gets an error
func (p *PessimisticRRTxnContextProvider) OnStmtErrorForNextAction(point sessiontxn.StmtErrorHandlePoint, err error) (sessiontxn.StmtErrorAction, error) {
	switch point {
	case sessiontxn.StmtErrAfterPessimisticLock:
		return p.handleAfterPessimisticLockError(err)
	default:
		return sessiontxn.NoIdea()
	}
}

// AdviseOptimizeWithPlan optimizes for-update point-get related execution.
// Use case: in for-update point-get related operations, we do not fetch the ts from PD but reuse the last ts we fetched,
// expecting that the data the point get reads has not been changed.
// Benefit: saves the cost of acquiring a ts from PD.
// Drawback: if the data has been changed since the ts we used, we need to retry.
// One exception is the insert operation: when it has no select plan, we do not fetch the latest ts immediately and only
// update the ts if a write conflict occurs.
func (p *PessimisticRRTxnContextProvider) AdviseOptimizeWithPlan(val interface{}) (err error) {
	if p.isTidbSnapshotEnabled() || p.isBeginStmtWithStaleRead() {
		return nil
	}

	plan, ok := val.(plannercore.Plan)
	if !ok {
		return nil
	}

	if execute, ok := plan.(*plannercore.Execute); ok {
		plan = execute.Plan
	}

	p.optimizeForNotFetchingLatestTS = notNeedGetLatestTSFromPD(plan, false)

	return nil
}
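
// Annotation (not part of the original file): as a concrete example, for a prepared point-get
// statement such as `SELECT * FROM t WHERE id = ? FOR UPDATE` (table t and column id are
// hypothetical), the *plannercore.Execute plan above unwraps to a PointGetPlan, so
// notNeedGetLatestTSFromPD returns true and getForUpdateTs reuses TxnCtx.GetForUpdateTS()
// instead of waiting for a fresh TSO.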

// notNeedGetLatestTSFromPD searches for the optimization condition recursively.
// Note: for a point get or batch point get (call it plan), if one of its ancestor nodes is an update/delete/physicalLock,
// we should check whether plan.Lock is true or false; see the comments in the PointGetPlan case below.
// inLockOrWriteStmt = true means one of the ancestor nodes is an update/delete/physicalLock.
func notNeedGetLatestTSFromPD(plan plannercore.Plan, inLockOrWriteStmt bool) bool {
	switch v := plan.(type) {
	case *plannercore.PointGetPlan:
		// We do not optimize the point get / batch point get if plan.Lock = false and inLockOrWriteStmt = true.
		// Theoretically, plan.Lock should be true if the flag is true. But due to the bug described in Issue35524,
		// plan.Lock can be false while inLockOrWriteStmt is true. In this case, the optimization here could lead to different
		// results, which cannot be accepted because AdviseOptimizeWithPlan must not change results.
		return !inLockOrWriteStmt || v.Lock
	case *plannercore.BatchPointGetPlan:
		return !inLockOrWriteStmt || v.Lock
	case plannercore.PhysicalPlan:
		if len(v.Children()) == 0 {
			return false
		}
		_, isPhysicalLock := v.(*plannercore.PhysicalLock)
		for _, p := range v.Children() {
			if !notNeedGetLatestTSFromPD(p, isPhysicalLock || inLockOrWriteStmt) {
				return false
			}
		}
		return true
	case *plannercore.Update:
		return notNeedGetLatestTSFromPD(v.SelectPlan, true)
	case *plannercore.Delete:
		return notNeedGetLatestTSFromPD(v.SelectPlan, true)
	case *plannercore.Insert:
		return v.SelectPlan == nil
	}
	return false
}

func (p *PessimisticRRTxnContextProvider) handleAfterPessimisticLockError(lockErr error) (sessiontxn.StmtErrorAction, error) {
	sessVars := p.sctx.GetSessionVars()
	txnCtx := sessVars.TxnCtx

	if deadlock, ok := errors.Cause(lockErr).(*tikverr.ErrDeadlock); ok {
		if !deadlock.IsRetryable {
			return sessiontxn.ErrorAction(lockErr)
		}

		logutil.Logger(p.ctx).Info("single statement deadlock, retry statement",
			zap.Uint64("txn", txnCtx.StartTS),
			zap.Uint64("lockTS", deadlock.LockTs),
			zap.Stringer("lockKey", kv.Key(deadlock.LockKey)),
			zap.Uint64("deadlockKeyHash", deadlock.DeadlockKeyHash))
	} else if terror.ErrorEqual(kv.ErrWriteConflict, lockErr) {
		// Always update forUpdateTS by getting a new timestamp from PD.
		// If we use the conflict commitTS as the new forUpdateTS and async commit
		// is used, the commitTS of this transaction may exceed the max timestamp
		// that PD allocates. Then, the change may be invisible to a new transaction,
		// which means linearizability is broken.
		errStr := lockErr.Error()
		forUpdateTS := txnCtx.GetForUpdateTS()

		logutil.Logger(p.ctx).Debug("pessimistic write conflict, retry statement",
			zap.Uint64("txn", txnCtx.StartTS),
			zap.Uint64("forUpdateTS", forUpdateTS),
			zap.String("err", errStr))
	} else {
		// This branch: for any other error, always update forUpdateTS to avoid the problem described below.
		// For nowait, when a lock error happens, ErrLockAcquireFailAndNoWaitSet is returned, and within the same txn
		// the select-for-update ts must be updated, otherwise a rollback problem may occur:
		// 		   begin
		// 		   select for update key1 (here it encounters ErrLocked or another error (or a limit such as max_execution_time),
		//         					     the key1 lock has not been acquired and an async rollback of key1 is issued)
		//         select for update key1 again (this time the lock is acquired successfully (maybe the lock was released by others))
		//         the async rollback operation rolls back the lock just acquired
		if err := p.updateForUpdateTS(); err != nil {
			logutil.Logger(p.ctx).Warn("UpdateForUpdateTS failed", zap.Error(err))
		}

		return sessiontxn.ErrorAction(lockErr)
	}

	if err := p.updateForUpdateTS(); err != nil {
		return sessiontxn.ErrorAction(lockErr)
	}

	return sessiontxn.RetryReady()
}
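
The file above only defines the provider; the order in which its hooks are invoked is driven by the session layer. The sketch below (not part of the TiDB source) shows roughly how one pessimistic repeatable-read statement could flow through the provider. The function runOneStatement and its parameters sctx, stmt, plan and lockErr are hypothetical placeholders for values produced by the surrounding session and planner code, and the constant StmtActionRetryReady is assumed to be the retry action defined in the sessiontxn package.

package example

import (
	"context"

	"github.com/pingcap/tidb/parser/ast"
	plannercore "github.com/pingcap/tidb/planner/core"
	"github.com/pingcap/tidb/sessionctx"
	"github.com/pingcap/tidb/sessiontxn"
	"github.com/pingcap/tidb/sessiontxn/isolation"
)

// runOneStatement is a hypothetical walkthrough of the provider's hooks for a single
// pessimistic repeatable-read statement.
func runOneStatement(sctx sessionctx.Context, stmt ast.StmtNode, plan plannercore.Plan, lockErr error) error {
	provider := isolation.NewPessimisticRRTxnContextProvider(sctx, false)

	// A new statement starts: forUpdateTS and optimizeForNotFetchingLatestTS are reset.
	if err := provider.OnStmtStart(context.Background(), stmt); err != nil {
		return err
	}

	// After planning, the provider decides whether this statement may reuse the
	// transaction's current forUpdateTS instead of fetching a fresh TSO from PD.
	if err := provider.AdviseOptimizeWithPlan(plan); err != nil {
		return err
	}

	// ... executors run here and obtain the for-update TS via getStmtForUpdateTSFunc ...

	// If pessimistic locking failed, ask the provider what to do next. A retryable
	// deadlock or a write conflict refreshes forUpdateTS and signals that the
	// statement can be retried.
	if lockErr != nil {
		action, err := provider.OnStmtErrorForNextAction(sessiontxn.StmtErrAfterPessimisticLock, lockErr)
		if err != nil {
			return err
		}
		if action == sessiontxn.StmtActionRetryReady { // assumed constant name in sessiontxn
			return provider.OnStmtRetry(context.Background())
		}
	}
	return nil
}

The key points are that OnStmtStart resets the per-statement state, AdviseOptimizeWithPlan may allow the statement to reuse the transaction's current forUpdateTS, and a retryable lock error goes through updateForUpdateTS before the statement is retried via OnStmtRetry.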

Related information

tidb source code directory

Related articles

tidb base source code

tidb optimistic source code

tidb readcommitted source code

tidb serializable source code
