tidb parallel_apply source code

  • 2022-09-19

tidb parallel_apply code

File path: /executor/parallel_apply.go

// Copyright 2020 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package executor

import (
	"context"
	"runtime/trace"
	"sync"
	"sync/atomic"

	"github.com/pingcap/errors"
	"github.com/pingcap/failpoint"
	"github.com/pingcap/tidb/expression"
	"github.com/pingcap/tidb/parser/terror"
	"github.com/pingcap/tidb/util/chunk"
	"github.com/pingcap/tidb/util/codec"
	"github.com/pingcap/tidb/util/execdetails"
	"github.com/pingcap/tidb/util/logutil"
	"github.com/pingcap/tidb/util/memory"
	"go.uber.org/zap"
)

type result struct {
	chk *chunk.Chunk
	err error
}

type outerRow struct {
	row      *chunk.Row
	selected bool // if this row is selected by the outer filter
}

// ParallelNestedLoopApplyExec is the executor for apply.
type ParallelNestedLoopApplyExec struct {
	baseExecutor

	// outer-side fields
	outerExec   Executor
	outerFilter expression.CNFExprs
	outerList   *chunk.List
	outer       bool

	// inner-side fields
	// use slices since the inner side is executed in parallel
	corCols       [][]*expression.CorrelatedColumn
	innerFilter   []expression.CNFExprs
	innerExecs    []Executor
	innerList     []*chunk.List
	innerChunk    []*chunk.Chunk
	innerSelected [][]bool
	innerIter     []chunk.Iterator
	outerRow      []*chunk.Row
	hasMatch      []bool
	hasNull       []bool
	joiners       []joiner

	// fields about concurrency control
	concurrency int
	started     uint32
	drained     uint32 // drained == 1 indicates there is no more data
	freeChkCh   chan *chunk.Chunk
	resultChkCh chan result
	outerRowCh  chan outerRow
	exit        chan struct{}
	workerWg    sync.WaitGroup
	notifyWg    sync.WaitGroup

	// fields about cache
	cache              *applyCache
	useCache           bool
	cacheHitCounter    int64
	cacheAccessCounter int64

	memTracker *memory.Tracker // track memory usage.
}

// Open implements the Executor interface.
func (e *ParallelNestedLoopApplyExec) Open(ctx context.Context) error {
	err := e.outerExec.Open(ctx)
	if err != nil {
		return err
	}
	e.memTracker = memory.NewTracker(e.id, -1)
	e.memTracker.AttachTo(e.ctx.GetSessionVars().StmtCtx.MemTracker)

	e.outerList = chunk.NewList(retTypes(e.outerExec), e.initCap, e.maxChunkSize)
	e.outerList.GetMemTracker().SetLabel(memory.LabelForOuterList)
	e.outerList.GetMemTracker().AttachTo(e.memTracker)

	e.innerList = make([]*chunk.List, e.concurrency)
	e.innerChunk = make([]*chunk.Chunk, e.concurrency)
	e.innerSelected = make([][]bool, e.concurrency)
	e.innerIter = make([]chunk.Iterator, e.concurrency)
	e.outerRow = make([]*chunk.Row, e.concurrency)
	e.hasMatch = make([]bool, e.concurrency)
	e.hasNull = make([]bool, e.concurrency)
	for i := 0; i < e.concurrency; i++ {
		e.innerChunk[i] = newFirstChunk(e.innerExecs[i])
		e.innerList[i] = chunk.NewList(retTypes(e.innerExecs[i]), e.initCap, e.maxChunkSize)
		e.innerList[i].GetMemTracker().SetLabel(memory.LabelForInnerList)
		e.innerList[i].GetMemTracker().AttachTo(e.memTracker)
	}

	e.freeChkCh = make(chan *chunk.Chunk, e.concurrency)
	e.resultChkCh = make(chan result, e.concurrency+1) // innerWorkers + outerWorker
	e.outerRowCh = make(chan outerRow)
	e.exit = make(chan struct{})
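	// Pre-allocate one result chunk per inner worker; Next() returns each
	// consumed chunk to freeChkCh so the workers can reuse it.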
	for i := 0; i < e.concurrency; i++ {
		e.freeChkCh <- newFirstChunk(e)
	}

	if e.useCache {
		if e.cache, err = newApplyCache(e.ctx); err != nil {
			return err
		}
		e.cache.GetMemTracker().AttachTo(e.memTracker)
	}
	return nil
}

// Next implements the Executor interface.
func (e *ParallelNestedLoopApplyExec) Next(ctx context.Context, req *chunk.Chunk) (err error) {
	if atomic.LoadUint32(&e.drained) == 1 {
		req.Reset()
		return nil
	}

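	// Start all workers lazily, exactly once, on the first call to Next().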
	if atomic.CompareAndSwapUint32(&e.started, 0, 1) {
		e.workerWg.Add(1)
		go e.outerWorker(ctx)
		for i := 0; i < e.concurrency; i++ {
			e.workerWg.Add(1)
			workID := i
			go e.innerWorker(ctx, workID)
		}
		e.notifyWg.Add(1)
		go e.notifyWorker(ctx)
	}
	result := <-e.resultChkCh
	if result.err != nil {
		return result.err
	}
	if result.chk == nil { // no more data
		req.Reset()
		atomic.StoreUint32(&e.drained, 1)
		return nil
	}
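	// Hand the rows to the parent by swapping column data, then return the
	// drained chunk to the pool so an inner worker can refill it.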
	req.SwapColumns(result.chk)
	e.freeChkCh <- result.chk
	return nil
}

// Close implements the Executor interface.
func (e *ParallelNestedLoopApplyExec) Close() error {
	e.memTracker = nil
	if atomic.LoadUint32(&e.started) == 1 {
		close(e.exit)
		e.notifyWg.Wait()
		e.started = 0
	}
	// Wait for all workers to finish before closing the outer executor;
	// otherwise we may get a data race.
	err := e.outerExec.Close()

	if e.runtimeStats != nil {
		runtimeStats := newJoinRuntimeStats()
		e.ctx.GetSessionVars().StmtCtx.RuntimeStatsColl.RegisterStats(e.id, runtimeStats)
		if e.useCache {
			var hitRatio float64
			if e.cacheAccessCounter > 0 {
				hitRatio = float64(e.cacheHitCounter) / float64(e.cacheAccessCounter)
			}
			runtimeStats.setCacheInfo(true, hitRatio)
		} else {
			runtimeStats.setCacheInfo(false, 0)
		}
		runtimeStats.SetConcurrencyInfo(execdetails.NewConcurrencyInfo("Concurrency", e.concurrency))
	}
	return err
}

// notifyWorker waits for all inner/outer workers to finish and then puts a nil
// result chunk into resultChkCh to notify the upper executor that there is no more data.
func (e *ParallelNestedLoopApplyExec) notifyWorker(ctx context.Context) {
	defer e.handleWorkerPanic(ctx, &e.notifyWg)
	e.workerWg.Wait()
	e.putResult(nil, nil)
}

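// outerWorker reads chunks from the outer child, keeps their rows alive in
// outerList, evaluates the outer filter, and sends every row (with its
// selected flag) to outerRowCh. It closes outerRowCh once the outer side is
// exhausted, which lets the inner workers drain and exit.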
func (e *ParallelNestedLoopApplyExec) outerWorker(ctx context.Context) {
	defer trace.StartRegion(ctx, "ParallelApplyOuterWorker").End()
	defer e.handleWorkerPanic(ctx, &e.workerWg)
	var selected []bool
	var err error
	for {
		failpoint.Inject("parallelApplyOuterWorkerPanic", nil)
		chk := newFirstChunk(e.outerExec)
		if err := Next(ctx, e.outerExec, chk); err != nil {
			e.putResult(nil, err)
			return
		}
		if chk.NumRows() == 0 {
			close(e.outerRowCh)
			return
		}
		e.outerList.Add(chk)
		outerIter := chunk.NewIterator4Chunk(chk)
		selected, err = expression.VectorizedFilter(e.ctx, e.outerFilter, outerIter, selected)
		if err != nil {
			e.putResult(nil, err)
			return
		}
		for i := 0; i < chk.NumRows(); i++ {
			row := chk.GetRow(i)
			select {
			case e.outerRowCh <- outerRow{&row, selected[i]}:
			case <-e.exit:
				return
			}
		}
	}
}

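// innerWorker takes a free result chunk from freeChkCh, fills it with join
// results via fillInnerChunk, and sends it to resultChkCh. It exits when the
// outer side is exhausted (an empty chunk with no error) or e.exit is closed.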
func (e *ParallelNestedLoopApplyExec) innerWorker(ctx context.Context, id int) {
	defer trace.StartRegion(ctx, "ParallelApplyInnerWorker").End()
	defer e.handleWorkerPanic(ctx, &e.workerWg)
	for {
		var chk *chunk.Chunk
		select {
		case chk = <-e.freeChkCh:
		case <-e.exit:
			return
		}
		failpoint.Inject("parallelApplyInnerWorkerPanic", nil)
		err := e.fillInnerChunk(ctx, id, chk)
		if err == nil && chk.NumRows() == 0 { // no more data, this goroutine can exit
			return
		}
		if e.putResult(chk, err) {
			return
		}
	}
}

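// putResult sends a filled chunk or an error to resultChkCh. It returns true
// if e.exit has been closed, telling the caller to stop immediately.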
func (e *ParallelNestedLoopApplyExec) putResult(chk *chunk.Chunk, err error) (exit bool) {
	select {
	case e.resultChkCh <- result{chk, err}:
		return false
	case <-e.exit:
		return true
	}
}

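// handleWorkerPanic recovers from a worker panic, forwards the panic as an
// error through resultChkCh, and marks the worker done on the given WaitGroup.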
func (e *ParallelNestedLoopApplyExec) handleWorkerPanic(ctx context.Context, wg *sync.WaitGroup) {
	if r := recover(); r != nil {
		err := errors.Errorf("%v", r)
		logutil.Logger(ctx).Error("parallel nested loop join worker panicked", zap.Error(err), zap.Stack("stack"))
		e.resultChkCh <- result{nil, err}
	}
	if wg != nil {
		wg.Done()
	}
}

// fetchAllInners reads all inner rows for the current outer row and stores
// them in a List, going through the apply cache when it is enabled.
func (e *ParallelNestedLoopApplyExec) fetchAllInners(ctx context.Context, id int) (err error) {
	var key []byte
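	// Bind the current outer row's values to the correlated columns; when the
	// cache is enabled, these values also serve as the cache key.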
	for _, col := range e.corCols[id] {
		*col.Data = e.outerRow[id].GetDatum(col.Index, col.RetType)
		if e.useCache {
			if key, err = codec.EncodeKey(e.ctx.GetSessionVars().StmtCtx, key, *col.Data); err != nil {
				return err
			}
		}
	}
	if e.useCache { // look up the cache
		atomic.AddInt64(&e.cacheAccessCounter, 1)
		failpoint.Inject("parallelApplyGetCachePanic", nil)
		value, err := e.cache.Get(key)
		if err != nil {
			return err
		}
		if value != nil {
			e.innerList[id] = value
			atomic.AddInt64(&e.cacheHitCounter, 1)
			return nil
		}
	}

	err = e.innerExecs[id].Open(ctx)
	defer terror.Call(e.innerExecs[id].Close)
	if err != nil {
		return err
	}

	if e.useCache {
		// create a new list here, since the old one may still be referenced by the cache
		e.innerList[id] = chunk.NewList(retTypes(e.innerExecs[id]), e.initCap, e.maxChunkSize)
	} else {
		e.innerList[id].Reset()
	}

	innerIter := chunk.NewIterator4Chunk(e.innerChunk[id])
	for {
		err := Next(ctx, e.innerExecs[id], e.innerChunk[id])
		if err != nil {
			return err
		}
		if e.innerChunk[id].NumRows() == 0 {
			break
		}

		e.innerSelected[id], err = expression.VectorizedFilter(e.ctx, e.innerFilter[id], innerIter, e.innerSelected[id])
		if err != nil {
			return err
		}
		for row := innerIter.Begin(); row != innerIter.End(); row = innerIter.Next() {
			if e.innerSelected[id][row.Idx()] {
				e.innerList[id].AppendRow(row)
			}
		}
	}

	if e.useCache { // update the cache
		failpoint.Inject("parallelApplySetCachePanic", nil)
		if _, err := e.cache.Set(key, e.innerList[id]); err != nil {
			return err
		}
	}
	return nil
}

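// fetchNextOuterRow returns the next outer row that passed the outer filter.
// For outer joins, a filtered-out row still emits its unmatched result via
// onMissMatch. It returns exit=true if e.exit has been closed.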
func (e *ParallelNestedLoopApplyExec) fetchNextOuterRow(id int, req *chunk.Chunk) (row *chunk.Row, exit bool) {
	for {
		select {
		case outerRow, ok := <-e.outerRowCh:
			if !ok { // no more data
				return nil, false
			}
			if !outerRow.selected {
				if e.outer {
					e.joiners[id].onMissMatch(false, *outerRow.row, req)
					if req.IsFull() {
						return nil, false
					}
				}
				continue // try the next outer row
			}
			return outerRow.row, false
		case <-e.exit:
			return nil, true
		}
	}
}

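// fillInnerChunk fills req with join results for worker id: for each outer
// row it fetches all inner rows (fetchAllInners), then matches them against
// the outer row until req is full or the outer side is exhausted.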
func (e *ParallelNestedLoopApplyExec) fillInnerChunk(ctx context.Context, id int, req *chunk.Chunk) (err error) {
	req.Reset()
	for {
		if e.innerIter[id] == nil || e.innerIter[id].Current() == e.innerIter[id].End() {
			if e.outerRow[id] != nil && !e.hasMatch[id] {
				e.joiners[id].onMissMatch(e.hasNull[id], *e.outerRow[id], req)
			}
			var exit bool
			e.outerRow[id], exit = e.fetchNextOuterRow(id, req)
			if exit || req.IsFull() || e.outerRow[id] == nil {
				return nil
			}

			e.hasMatch[id] = false
			e.hasNull[id] = false

			err = e.fetchAllInners(ctx, id)
			if err != nil {
				return err
			}
			e.innerIter[id] = chunk.NewIterator4List(e.innerList[id])
			e.innerIter[id].Begin()
		}

		matched, isNull, err := e.joiners[id].tryToMatchInners(*e.outerRow[id], e.innerIter[id], req)
		e.hasMatch[id] = e.hasMatch[id] || matched
		e.hasNull[id] = e.hasNull[id] || isNull

		if err != nil || req.IsFull() {
			return err
		}
	}
}
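
The chunk recycling and shutdown machinery above (freeChkCh as a buffer pool, putResult's send-or-exit select, and the notify goroutine that terminates the result stream) is a general Go pattern rather than anything TiDB-specific. Below is a minimal standalone sketch of it; the names (freeCh, result, buf) and the fixed round counts are illustrative, not part of the TiDB code:

package main

import (
	"fmt"
	"sync"
)

type result struct {
	buf []int
	err error
}

func main() {
	const concurrency = 3
	freeCh := make(chan []int, concurrency) // buffer pool, like freeChkCh
	resultCh := make(chan result, concurrency+1)
	exit := make(chan struct{})
	var workerWg sync.WaitGroup

	// Pre-fill the pool, as Open() does for freeChkCh.
	for i := 0; i < concurrency; i++ {
		freeCh <- make([]int, 0, 4)
	}

	for i := 0; i < concurrency; i++ {
		workerWg.Add(1)
		go func(id int) { // mirrors innerWorker
			defer workerWg.Done()
			for round := 0; round < 2; round++ {
				var buf []int
				select {
				case buf = <-freeCh: // take a free buffer
				case <-exit:
					return
				}
				buf = append(buf[:0], id, round) // "fill the chunk"
				select { // mirrors putResult: send, or bail out on exit
				case resultCh <- result{buf: buf}:
				case <-exit:
					return
				}
			}
		}(i)
	}

	go func() { // mirrors notifyWorker: a nil buffer marks end of stream
		workerWg.Wait()
		resultCh <- result{}
	}()

	for { // mirrors Next(): consume a result, then recycle its buffer
		r := <-resultCh
		if r.err != nil || r.buf == nil {
			break
		}
		fmt.Println(r.buf)
		freeCh <- r.buf
	}
	close(exit)
}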
