tidb func_count_distinct 源码

  • 2022-09-19
  • 浏览 (473)

tidb func_count_distinct 代码

文件路径:/executor/aggfuncs/func_count_distinct.go

// Copyright 2020 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package aggfuncs

import (
	"encoding/binary"
	"math"
	"unsafe"

	"github.com/dgryski/go-farm"
	"github.com/pingcap/errors"
	"github.com/pingcap/tidb/expression"
	"github.com/pingcap/tidb/sessionctx"
	"github.com/pingcap/tidb/types"
	"github.com/pingcap/tidb/util/chunk"
	"github.com/pingcap/tidb/util/codec"
	"github.com/pingcap/tidb/util/collate"
	"github.com/pingcap/tidb/util/hack"
	"github.com/pingcap/tidb/util/mathutil"
	"github.com/pingcap/tidb/util/set"
	"github.com/pingcap/tidb/util/stringutil"
)

const (
	// DefPartialResult4CountDistinctIntSize is the size of partialResult4CountDistinctInt
	DefPartialResult4CountDistinctIntSize = int64(unsafe.Sizeof(partialResult4CountDistinctInt{}))
	// DefPartialResult4CountDistinctRealSize is the size of partialResult4CountDistinctReal
	DefPartialResult4CountDistinctRealSize = int64(unsafe.Sizeof(partialResult4CountDistinctReal{}))
	// DefPartialResult4CountDistinctDecimalSize is the size of partialResult4CountDistinctDecimal
	DefPartialResult4CountDistinctDecimalSize = int64(unsafe.Sizeof(partialResult4CountDistinctDecimal{}))
	// DefPartialResult4CountDistinctDurationSize is the size of partialResult4CountDistinctDuration
	DefPartialResult4CountDistinctDurationSize = int64(unsafe.Sizeof(partialResult4CountDistinctDuration{}))
	// DefPartialResult4CountDistinctStringSize is the size of partialResult4CountDistinctString
	DefPartialResult4CountDistinctStringSize = int64(unsafe.Sizeof(partialResult4CountDistinctString{}))
	// DefPartialResult4CountWithDistinctSize is the size of partialResult4CountWithDistinct
	DefPartialResult4CountWithDistinctSize = int64(unsafe.Sizeof(partialResult4CountWithDistinct{}))
	// DefPartialResult4ApproxCountDistinctSize is the size of partialResult4ApproxCountDistinct
	DefPartialResult4ApproxCountDistinctSize = int64(unsafe.Sizeof(partialResult4ApproxCountDistinct{}))
)

type partialResult4CountDistinctInt struct {
	valSet set.Int64SetWithMemoryUsage
}

type countOriginalWithDistinct4Int struct {
	baseCount
}

func (e *countOriginalWithDistinct4Int) AllocPartialResult() (pr PartialResult, memDelta int64) {
	valSet, setSize := set.NewInt64SetWithMemoryUsage()
	return PartialResult(&partialResult4CountDistinctInt{
		valSet: valSet,
	}), DefPartialResult4CountDistinctIntSize + setSize
}

func (e *countOriginalWithDistinct4Int) ResetPartialResult(pr PartialResult) {
	p := (*partialResult4CountDistinctInt)(pr)
	p.valSet, _ = set.NewInt64SetWithMemoryUsage()
}

func (e *countOriginalWithDistinct4Int) AppendFinalResult2Chunk(sctx sessionctx.Context, pr PartialResult, chk *chunk.Chunk) error {
	p := (*partialResult4CountDistinctInt)(pr)
	chk.AppendInt64(e.ordinal, int64(p.valSet.Count()))
	return nil
}

func (e *countOriginalWithDistinct4Int) UpdatePartialResult(sctx sessionctx.Context, rowsInGroup []chunk.Row, pr PartialResult) (memDelta int64, err error) {
	p := (*partialResult4CountDistinctInt)(pr)

	for _, row := range rowsInGroup {
		input, isNull, err := e.args[0].EvalInt(sctx, row)
		if err != nil {
			return memDelta, err
		}
		if isNull {
			continue
		}
		if p.valSet.Exist(input) {
			continue
		}
		memDelta += p.valSet.Insert(input)
	}

	return memDelta, nil
}

type partialResult4CountDistinctReal struct {
	valSet set.Float64SetWithMemoryUsage
}

type countOriginalWithDistinct4Real struct {
	baseCount
}

func (e *countOriginalWithDistinct4Real) AllocPartialResult() (pr PartialResult, memDelta int64) {
	valSet, setSize := set.NewFloat64SetWithMemoryUsage()
	return PartialResult(&partialResult4CountDistinctReal{
		valSet: valSet,
	}), DefPartialResult4CountDistinctRealSize + setSize
}

func (e *countOriginalWithDistinct4Real) ResetPartialResult(pr PartialResult) {
	p := (*partialResult4CountDistinctReal)(pr)
	p.valSet, _ = set.NewFloat64SetWithMemoryUsage()
}

func (e *countOriginalWithDistinct4Real) AppendFinalResult2Chunk(sctx sessionctx.Context, pr PartialResult, chk *chunk.Chunk) error {
	p := (*partialResult4CountDistinctReal)(pr)
	chk.AppendInt64(e.ordinal, int64(p.valSet.Count()))
	return nil
}

func (e *countOriginalWithDistinct4Real) UpdatePartialResult(sctx sessionctx.Context, rowsInGroup []chunk.Row, pr PartialResult) (memDelta int64, err error) {
	p := (*partialResult4CountDistinctReal)(pr)

	for _, row := range rowsInGroup {
		input, isNull, err := e.args[0].EvalReal(sctx, row)
		if err != nil {
			return memDelta, err
		}
		if isNull {
			continue
		}
		if p.valSet.Exist(input) {
			continue
		}
		memDelta += p.valSet.Insert(input)
	}

	return memDelta, nil
}

type partialResult4CountDistinctDecimal struct {
	valSet set.StringSetWithMemoryUsage
}

type countOriginalWithDistinct4Decimal struct {
	baseCount
}

func (e *countOriginalWithDistinct4Decimal) AllocPartialResult() (pr PartialResult, memDelta int64) {
	valSet, setSize := set.NewStringSetWithMemoryUsage()
	return PartialResult(&partialResult4CountDistinctDecimal{
		valSet: valSet,
	}), DefPartialResult4CountDistinctDecimalSize + setSize
}

func (e *countOriginalWithDistinct4Decimal) ResetPartialResult(pr PartialResult) {
	p := (*partialResult4CountDistinctDecimal)(pr)
	p.valSet, _ = set.NewStringSetWithMemoryUsage()
}

func (e *countOriginalWithDistinct4Decimal) AppendFinalResult2Chunk(sctx sessionctx.Context, pr PartialResult, chk *chunk.Chunk) error {
	p := (*partialResult4CountDistinctDecimal)(pr)
	chk.AppendInt64(e.ordinal, int64(p.valSet.Count()))
	return nil
}

func (e *countOriginalWithDistinct4Decimal) UpdatePartialResult(sctx sessionctx.Context, rowsInGroup []chunk.Row, pr PartialResult) (memDelta int64, err error) {
	p := (*partialResult4CountDistinctDecimal)(pr)

	for _, row := range rowsInGroup {
		input, isNull, err := e.args[0].EvalDecimal(sctx, row)
		if err != nil {
			return memDelta, err
		}
		if isNull {
			continue
		}
		hash, err := input.ToHashKey()
		if err != nil {
			return memDelta, err
		}
		decStr := string(hack.String(hash))
		if p.valSet.Exist(decStr) {
			continue
		}
		memDelta += p.valSet.Insert(decStr)
		memDelta += int64(len(decStr))
	}

	return memDelta, nil
}

type partialResult4CountDistinctDuration struct {
	valSet set.Int64SetWithMemoryUsage
}

type countOriginalWithDistinct4Duration struct {
	baseCount
}

func (e *countOriginalWithDistinct4Duration) AllocPartialResult() (pr PartialResult, memDelta int64) {
	valSet, setSize := set.NewInt64SetWithMemoryUsage()
	return PartialResult(&partialResult4CountDistinctDuration{
		valSet: valSet,
	}), DefPartialResult4CountDistinctDurationSize + setSize
}

func (e *countOriginalWithDistinct4Duration) ResetPartialResult(pr PartialResult) {
	p := (*partialResult4CountDistinctDuration)(pr)
	p.valSet, _ = set.NewInt64SetWithMemoryUsage()
}

func (e *countOriginalWithDistinct4Duration) AppendFinalResult2Chunk(sctx sessionctx.Context, pr PartialResult, chk *chunk.Chunk) error {
	p := (*partialResult4CountDistinctDuration)(pr)
	chk.AppendInt64(e.ordinal, int64(p.valSet.Count()))
	return nil
}

func (e *countOriginalWithDistinct4Duration) UpdatePartialResult(sctx sessionctx.Context, rowsInGroup []chunk.Row, pr PartialResult) (memDelta int64, err error) {
	p := (*partialResult4CountDistinctDuration)(pr)

	for _, row := range rowsInGroup {
		input, isNull, err := e.args[0].EvalDuration(sctx, row)
		if err != nil {
			return memDelta, err
		}
		if isNull {
			continue
		}

		if p.valSet.Exist(int64(input.Duration)) {
			continue
		}
		memDelta += p.valSet.Insert(int64(input.Duration))
	}

	return memDelta, nil
}

type partialResult4CountDistinctString struct {
	valSet set.StringSetWithMemoryUsage
}

type countOriginalWithDistinct4String struct {
	baseCount
}

func (e *countOriginalWithDistinct4String) AllocPartialResult() (pr PartialResult, memDelta int64) {
	valSet, setSize := set.NewStringSetWithMemoryUsage()
	return PartialResult(&partialResult4CountDistinctString{
		valSet: valSet,
	}), DefPartialResult4CountDistinctStringSize + setSize
}

func (e *countOriginalWithDistinct4String) ResetPartialResult(pr PartialResult) {
	p := (*partialResult4CountDistinctString)(pr)
	p.valSet, _ = set.NewStringSetWithMemoryUsage()
}

func (e *countOriginalWithDistinct4String) AppendFinalResult2Chunk(sctx sessionctx.Context, pr PartialResult, chk *chunk.Chunk) error {
	p := (*partialResult4CountDistinctString)(pr)
	chk.AppendInt64(e.ordinal, int64(p.valSet.Count()))
	return nil
}

func (e *countOriginalWithDistinct4String) UpdatePartialResult(sctx sessionctx.Context, rowsInGroup []chunk.Row, pr PartialResult) (memDelta int64, err error) {
	p := (*partialResult4CountDistinctString)(pr)
	collator := collate.GetCollator(e.args[0].GetType().GetCollate())

	for _, row := range rowsInGroup {
		input, isNull, err := e.args[0].EvalString(sctx, row)
		if err != nil {
			return memDelta, err
		}
		if isNull {
			continue
		}
		input = string(collator.Key(input))

		if p.valSet.Exist(input) {
			continue
		}
		input = stringutil.Copy(input)
		memDelta += p.valSet.Insert(input)
		memDelta += int64(len(input))
	}

	return memDelta, nil
}

type countOriginalWithDistinct struct {
	baseCount
}

type partialResult4CountWithDistinct struct {
	valSet set.StringSetWithMemoryUsage
}

func (e *countOriginalWithDistinct) AllocPartialResult() (pr PartialResult, memDelta int64) {
	valSet, setSize := set.NewStringSetWithMemoryUsage()
	return PartialResult(&partialResult4CountWithDistinct{
		valSet: valSet,
	}), DefPartialResult4CountWithDistinctSize + setSize
}

func (e *countOriginalWithDistinct) ResetPartialResult(pr PartialResult) {
	p := (*partialResult4CountWithDistinct)(pr)
	p.valSet, _ = set.NewStringSetWithMemoryUsage()
}

func (e *countOriginalWithDistinct) AppendFinalResult2Chunk(sctx sessionctx.Context, pr PartialResult, chk *chunk.Chunk) error {
	p := (*partialResult4CountWithDistinct)(pr)
	chk.AppendInt64(e.ordinal, int64(p.valSet.Count()))
	return nil
}

func (e *countOriginalWithDistinct) UpdatePartialResult(sctx sessionctx.Context, rowsInGroup []chunk.Row, pr PartialResult) (memDelta int64, err error) {
	p := (*partialResult4CountWithDistinct)(pr)

	encodedBytes := make([]byte, 0)
	collators := make([]collate.Collator, 0, len(e.args))
	for _, arg := range e.args {
		collators = append(collators, collate.GetCollator(arg.GetType().GetCollate()))
	}
	// decimal struct is the biggest type we will use.
	buf := make([]byte, types.MyDecimalStructSize)

	for _, row := range rowsInGroup {
		var err error
		var hasNull, isNull bool
		encodedBytes = encodedBytes[:0]

		for i := 0; i < len(e.args) && !hasNull; i++ {
			encodedBytes, isNull, err = evalAndEncode(sctx, e.args[i], collators[i], row, buf, encodedBytes)
			if err != nil {
				return memDelta, err
			}
			if isNull {
				hasNull = true
				break
			}
		}
		encodedString := string(encodedBytes)
		if hasNull || p.valSet.Exist(encodedString) {
			continue
		}
		memDelta += p.valSet.Insert(encodedString)
		memDelta += int64(len(encodedString))
	}

	return memDelta, nil
}

// evalAndEncode eval one row with an expression and encode value to bytes.
func evalAndEncode(
	sctx sessionctx.Context, arg expression.Expression, collator collate.Collator,
	row chunk.Row, buf, encodedBytes []byte,
) (_ []byte, isNull bool, err error) {
	switch tp := arg.GetType().EvalType(); tp {
	case types.ETInt:
		var val int64
		val, isNull, err = arg.EvalInt(sctx, row)
		if err != nil || isNull {
			break
		}
		encodedBytes = appendInt64(encodedBytes, buf, val)
	case types.ETReal:
		var val float64
		val, isNull, err = arg.EvalReal(sctx, row)
		if err != nil || isNull {
			break
		}
		encodedBytes = appendFloat64(encodedBytes, buf, val)
	case types.ETDecimal:
		var val *types.MyDecimal
		val, isNull, err = arg.EvalDecimal(sctx, row)
		if err != nil || isNull {
			break
		}
		encodedBytes, err = appendDecimal(encodedBytes, val)
	case types.ETTimestamp, types.ETDatetime:
		var val types.Time
		val, isNull, err = arg.EvalTime(sctx, row)
		if err != nil || isNull {
			break
		}
		encodedBytes = appendTime(encodedBytes, buf, val)
	case types.ETDuration:
		var val types.Duration
		val, isNull, err = arg.EvalDuration(sctx, row)
		if err != nil || isNull {
			break
		}
		encodedBytes = appendDuration(encodedBytes, buf, val)
	case types.ETJson:
		var val types.BinaryJSON
		val, isNull, err = arg.EvalJSON(sctx, row)
		if err != nil || isNull {
			break
		}
		encodedBytes = appendJSON(encodedBytes, buf, val)
	case types.ETString:
		var val string
		val, isNull, err = arg.EvalString(sctx, row)
		if err != nil || isNull {
			break
		}
		encodedBytes = codec.EncodeCompactBytes(encodedBytes, collator.Key(val))
	default:
		return nil, false, errors.Errorf("unsupported column type for encode %d", tp)
	}
	return encodedBytes, isNull, err
}

func appendInt64(encodedBytes, buf []byte, val int64) []byte {
	*(*int64)(unsafe.Pointer(&buf[0])) = val
	buf = buf[:8]
	encodedBytes = append(encodedBytes, buf...)
	return encodedBytes
}

func appendFloat64(encodedBytes, buf []byte, val float64) []byte {
	*(*float64)(unsafe.Pointer(&buf[0])) = val
	buf = buf[:8]
	encodedBytes = append(encodedBytes, buf...)
	return encodedBytes
}

func appendDecimal(encodedBytes []byte, val *types.MyDecimal) ([]byte, error) {
	hash, err := val.ToHashKey()
	encodedBytes = append(encodedBytes, hash...)
	return encodedBytes, err
}

// WriteTime writes `t` into `buf`.
func WriteTime(buf []byte, t types.Time) {
	binary.BigEndian.PutUint16(buf, uint16(t.Year()))
	buf[2] = uint8(t.Month())
	buf[3] = uint8(t.Day())
	buf[4] = uint8(t.Hour())
	buf[5] = uint8(t.Minute())
	buf[6] = uint8(t.Second())
	binary.BigEndian.PutUint32(buf[8:], uint32(t.Microsecond()))
	buf[12] = t.Type()
	buf[13] = uint8(t.Fsp())

	buf[7], buf[14], buf[15] = uint8(0), uint8(0), uint8(0)
}

func appendTime(encodedBytes, buf []byte, val types.Time) []byte {
	WriteTime(buf, val)
	buf = buf[:16]
	encodedBytes = append(encodedBytes, buf...)
	return encodedBytes
}

func appendDuration(encodedBytes, buf []byte, val types.Duration) []byte {
	*(*types.Duration)(unsafe.Pointer(&buf[0])) = val
	buf = buf[:16]
	encodedBytes = append(encodedBytes, buf...)
	return encodedBytes
}

func appendJSON(encodedBytes, _ []byte, val types.BinaryJSON) []byte {
	encodedBytes = append(encodedBytes, val.TypeCode)
	encodedBytes = append(encodedBytes, val.Value...)
	return encodedBytes
}

func intHash64(x uint64) uint64 {
	x ^= x >> 33
	x *= 0xff51afd7ed558ccd
	x ^= x >> 33
	x *= 0xc4ceb9fe1a85ec53
	x ^= x >> 33
	return x
}

type baseApproxCountDistinct struct {
	baseAggFunc
}

const (
	// The maximum degree of buffer size before the values are discarded
	uniquesHashMaxSizeDegree uint8 = 17
	// The maximum number of elements before the values are discarded
	uniquesHashMaxSize = uint32(1) << (uniquesHashMaxSizeDegree - 1)
	// Initial buffer size degree
	uniquesHashSetInitialSizeDegree uint8 = 4
	// The number of least significant bits used for thinning. The remaining high-order bits are used to determine the position in the hash table.
	uniquesHashBitsForSkip = 32 - uniquesHashMaxSizeDegree
)

type approxCountDistinctHashValue uint32

// partialResult4ApproxCountDistinct use `BJKST` algorithm to compute approximate result of count distinct.
// According to an experimental survey http://www.vldb.org/pvldb/vol11/p499-harmouch.pdf, the error guarantee of BJKST
// was even better than the theoretical lower bounds.
// For the calculation state, it uses a sample of element hash values with a size up to uniquesHashMaxSize. Compared
// with the widely known HyperLogLog algorithm, this algorithm is less effective in terms of accuracy and
// memory consumption (even up to proportionality), but it is adaptive. This means that with fairly high accuracy, it
// consumes less memory during simultaneous computation of cardinality for a large number of data sets whose cardinality
// has power law distribution (i.e. in cases when most of the data sets are small).
// This algorithm is also very accurate for data sets with small cardinality and very efficient on CPU. If number of
// distinct element is more than 2^32, relative error may be high.
type partialResult4ApproxCountDistinct struct {
	size       uint32 // Number of elements.
	sizeDegree uint8  // The size of the table as a power of 2.
	skipDegree uint8  // Skip elements not divisible by 2 ^ skipDegree.
	hasZero    bool   // The hash table contains an element with a hash value of 0.
	buf        []approxCountDistinctHashValue
}

// NewPartialResult4ApproxCountDistinct builds a partial result for agg function ApproxCountDistinct.
func NewPartialResult4ApproxCountDistinct() *partialResult4ApproxCountDistinct {
	p := &partialResult4ApproxCountDistinct{}
	p.reset()
	return p
}

func (p *partialResult4ApproxCountDistinct) InsertHash64(x uint64) {
	// no need to rehash, just cast into uint32
	p.insertHash(approxCountDistinctHashValue(x))
}

func (p *partialResult4ApproxCountDistinct) MemUsage() int64 {
	return int64(len(p.buf)) * DefUint32Size
}

func (p *partialResult4ApproxCountDistinct) alloc(newSizeDegree uint8) {
	p.size = 0
	p.skipDegree = 0
	p.hasZero = false
	p.buf = make([]approxCountDistinctHashValue, uint32(1)<<newSizeDegree)
	p.sizeDegree = newSizeDegree
}

func (p *partialResult4ApproxCountDistinct) reset() {
	p.alloc(uniquesHashSetInitialSizeDegree)
}

func (p *partialResult4ApproxCountDistinct) bufSize() uint32 {
	return uint32(1) << p.sizeDegree
}

func (p *partialResult4ApproxCountDistinct) mask() uint32 {
	return p.bufSize() - 1
}

func (p *partialResult4ApproxCountDistinct) place(x approxCountDistinctHashValue) uint32 {
	return uint32(x>>uniquesHashBitsForSkip) & p.mask()
}

// Increase the size of the buffer 2 times or up to new size degree.
func (p *partialResult4ApproxCountDistinct) resize(newSizeDegree uint8) {
	oldSize := p.bufSize()
	oldBuf := p.buf

	if 0 == newSizeDegree {
		newSizeDegree = p.sizeDegree + 1
	}

	p.buf = make([]approxCountDistinctHashValue, uint32(1)<<newSizeDegree)
	p.sizeDegree = newSizeDegree

	// Move some items to new locations.
	for i := uint32(0); i < oldSize; i++ {
		x := oldBuf[i]
		if x != 0 {
			p.reinsertImpl(x)
		}
	}
}

func (p *partialResult4ApproxCountDistinct) readAndMerge(rb []byte) error {
	rhsSkipDegree := rb[0]
	rb = rb[1:]

	if rhsSkipDegree > p.skipDegree {
		p.skipDegree = rhsSkipDegree
		p.rehash()
	}

	rb, rhsSize, err := codec.DecodeUvarint(rb)

	if err != nil {
		return err
	}

	if rhsSize > uint64(uniquesHashMaxSize) {
		return errors.New("Cannot read partialResult4ApproxCountDistinct: too large size degree")
	}

	if p.bufSize() < uint32(rhsSize) {
		newSizeDegree := mathutil.Max(uniquesHashSetInitialSizeDegree, uint8(math.Log2(float64(rhsSize-1)))+2)
		p.resize(newSizeDegree)
	}

	for i := uint32(0); i < uint32(rhsSize); i++ {
		x := *(*approxCountDistinctHashValue)(unsafe.Pointer(&rb[0]))
		rb = rb[4:]
		p.insertHash(x)
	}

	return err
}

// Correct system errors due to collisions during hashing in uint32.
func (p *partialResult4ApproxCountDistinct) fixedSize() uint64 {
	if 0 == p.skipDegree {
		return uint64(p.size)
	}

	res := uint64(p.size) * (uint64(1) << p.skipDegree)

	// Pseudo-random remainder.
	res += intHash64(uint64(p.size)) & ((uint64(1) << p.skipDegree) - 1)

	// When different elements randomly scattered across 2^32 buckets, filled buckets with average of `res` obtained.
	p32 := uint64(1) << 32
	fixedRes := math.Round(float64(p32) * (math.Log(float64(p32)) - math.Log(float64(p32-res))))
	return uint64(fixedRes)
}

func (p *partialResult4ApproxCountDistinct) insertHash(hashValue approxCountDistinctHashValue) {
	if !p.good(hashValue) {
		return
	}

	p.insertImpl(hashValue)
	p.shrinkIfNeed()
}

// The value is divided by 2 ^ skip_degree
func (p *partialResult4ApproxCountDistinct) good(hash approxCountDistinctHashValue) bool {
	return hash == ((hash >> p.skipDegree) << p.skipDegree)
}

// Insert a value
func (p *partialResult4ApproxCountDistinct) insertImpl(x approxCountDistinctHashValue) {
	if x == 0 {
		if !p.hasZero {
			p.size += 1
		}
		p.hasZero = true
		return
	}

	placeValue := p.place(x)
	for p.buf[placeValue] != 0 && p.buf[placeValue] != x {
		placeValue++
		placeValue &= p.mask()
	}

	if p.buf[placeValue] == x {
		return
	}

	p.buf[placeValue] = x
	p.size++
}

// If the hash table is full enough, then do resize.
// If there are too many items, then throw half the pieces until they are small enough.
func (p *partialResult4ApproxCountDistinct) shrinkIfNeed() {
	if p.size > p.maxFill() {
		if p.size > uniquesHashMaxSize {
			for p.size > uniquesHashMaxSize {
				p.skipDegree++
				p.rehash()
			}
		} else {
			p.resize(0)
		}
	}
}

func (p *partialResult4ApproxCountDistinct) maxFill() uint32 {
	return uint32(1) << (p.sizeDegree - 1)
}

// Delete all values whose hashes do not divide by 2 ^ skip_degree
func (p *partialResult4ApproxCountDistinct) rehash() {
	for i := uint32(0); i < p.bufSize(); i++ {
		if p.buf[i] != 0 && !p.good(p.buf[i]) {
			p.buf[i] = 0
			p.size--
		}
	}

	for i := uint32(0); i < p.bufSize(); i++ {
		if p.buf[i] != 0 && i != p.place(p.buf[i]) {
			x := p.buf[i]
			p.buf[i] = 0
			p.reinsertImpl(x)
		}
	}
}

// Insert a value into the new buffer that was in the old buffer.
// Used when increasing the size of the buffer, as well as when reading from a file.
func (p *partialResult4ApproxCountDistinct) reinsertImpl(x approxCountDistinctHashValue) {
	placeValue := p.place(x)
	for p.buf[placeValue] != 0 {
		placeValue++
		placeValue &= p.mask()
	}

	p.buf[placeValue] = x
}

func (p *partialResult4ApproxCountDistinct) merge(tar *partialResult4ApproxCountDistinct) {
	if tar.skipDegree > p.skipDegree {
		p.skipDegree = tar.skipDegree
		p.rehash()
	}

	if !p.hasZero && tar.hasZero {
		p.hasZero = true
		p.size++
		p.shrinkIfNeed()
	}

	for i := uint32(0); i < tar.bufSize(); i++ {
		if tar.buf[i] != 0 && p.good(tar.buf[i]) {
			p.insertImpl(tar.buf[i])
			p.shrinkIfNeed()
		}
	}
}

func (p *partialResult4ApproxCountDistinct) Serialize() []byte {
	var buf [4]byte
	res := make([]byte, 0, 1+binary.MaxVarintLen64+p.size*4)

	res = append(res, p.skipDegree)
	res = codec.EncodeUvarint(res, uint64(p.size))

	if p.hasZero {
		binary.LittleEndian.PutUint32(buf[:], 0)
		res = append(res, buf[:]...)
	}

	for i := uint32(0); i < p.bufSize(); i++ {
		if p.buf[i] != 0 {
			binary.LittleEndian.PutUint32(buf[:], uint32(p.buf[i]))
			res = append(res, buf[:]...)
		}
	}
	return res
}

func (e *baseApproxCountDistinct) AppendFinalResult2Chunk(sctx sessionctx.Context, pr PartialResult, chk *chunk.Chunk) error {
	p := (*partialResult4ApproxCountDistinct)(pr)
	chk.AppendInt64(e.ordinal, int64(p.fixedSize()))
	return nil
}

func (e *baseApproxCountDistinct) AllocPartialResult() (pr PartialResult, memDelta int64) {
	return (PartialResult)(NewPartialResult4ApproxCountDistinct()), DefPartialResult4ApproxCountDistinctSize
}

func (e *baseApproxCountDistinct) ResetPartialResult(pr PartialResult) {
	p := (*partialResult4ApproxCountDistinct)(pr)
	p.reset()
}

func (e *baseApproxCountDistinct) MergePartialResult(sctx sessionctx.Context, src, dst PartialResult) (memDelta int64, err error) {
	p1, p2 := (*partialResult4ApproxCountDistinct)(src), (*partialResult4ApproxCountDistinct)(dst)
	p2.merge(p1)
	return 0, nil
}

type approxCountDistinctOriginal struct {
	baseApproxCountDistinct
}

func (e *approxCountDistinctOriginal) UpdatePartialResult(sctx sessionctx.Context, rowsInGroup []chunk.Row, pr PartialResult) (memDelta int64, err error) {
	p := (*partialResult4ApproxCountDistinct)(pr)
	encodedBytes := make([]byte, 0)
	// decimal struct is the biggest type we will use.
	buf := make([]byte, types.MyDecimalStructSize)
	collators := make([]collate.Collator, 0, len(e.args))
	for _, arg := range e.args {
		collators = append(collators, collate.GetCollator(arg.GetType().GetCollate()))
	}

	for _, row := range rowsInGroup {
		var err error
		var hasNull, isNull bool
		encodedBytes = encodedBytes[:0]

		for i := 0; i < len(e.args) && !hasNull; i++ {
			encodedBytes, isNull, err = evalAndEncode(sctx, e.args[i], collators[i], row, buf, encodedBytes)
			if err != nil {
				return memDelta, err
			}
			if isNull {
				hasNull = true
				break
			}
		}
		if hasNull {
			continue
		}
		oldMemUsage := p.MemUsage()
		x := farm.Hash64(encodedBytes)
		p.InsertHash64(x)
		newMemUsage := p.MemUsage()
		memDelta += newMemUsage - oldMemUsage
	}

	return memDelta, nil
}

type approxCountDistinctPartial1 struct {
	approxCountDistinctOriginal
}

func (e *approxCountDistinctPartial1) AppendFinalResult2Chunk(sctx sessionctx.Context, pr PartialResult, chk *chunk.Chunk) error {
	p := (*partialResult4ApproxCountDistinct)(pr)
	chk.AppendBytes(e.ordinal, p.Serialize())
	return nil
}

type approxCountDistinctPartial2 struct {
	approxCountDistinctPartial1
}

func (e *approxCountDistinctPartial2) UpdatePartialResult(sctx sessionctx.Context, rowsInGroup []chunk.Row, pr PartialResult) (memDelta int64, err error) {
	p := (*partialResult4ApproxCountDistinct)(pr)
	for _, row := range rowsInGroup {
		input, isNull, err := e.args[0].EvalString(sctx, row)
		if err != nil {
			return memDelta, err
		}

		if isNull {
			continue
		}

		oldMemUsage := p.MemUsage()
		err = p.readAndMerge(hack.Slice(input))
		if err != nil {
			return memDelta, err
		}
		newMemUsage := p.MemUsage()
		memDelta += newMemUsage - oldMemUsage
	}
	return memDelta, nil
}

type approxCountDistinctFinal struct {
	approxCountDistinctPartial2
}

func (e *approxCountDistinctFinal) AppendFinalResult2Chunk(sctx sessionctx.Context, pr PartialResult, chk *chunk.Chunk) error {
	return e.baseApproxCountDistinct.AppendFinalResult2Chunk(sctx, pr, chk)
}

相关信息

tidb 源码目录

相关文章

tidb aggfuncs 源码

tidb builder 源码

tidb func_avg 源码

tidb func_bitfuncs 源码

tidb func_count 源码

tidb func_cume_dist 源码

tidb func_first_row 源码

tidb func_group_concat 源码

tidb func_json_arrayagg 源码

tidb func_json_objectagg 源码

0  赞