tidb func_json_objectagg 源码

  • 2022-09-19
  • 浏览 (548)

tidb func_json_objectagg 代码

文件路径:/executor/aggfuncs/func_json_objectagg.go

// Copyright 2020 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package aggfuncs

import (
	"strings"
	"unsafe"

	"github.com/pingcap/errors"
	"github.com/pingcap/tidb/parser/charset"
	"github.com/pingcap/tidb/parser/mysql"
	"github.com/pingcap/tidb/sessionctx"
	"github.com/pingcap/tidb/types"
	"github.com/pingcap/tidb/util/chunk"
	"github.com/pingcap/tidb/util/hack"
)

// DefPartialResult4JsonObjectAgg is the shallow size in bytes of the
// partialResult4JsonObjectAgg struct as reported by unsafe.Sizeof; the
// contents of its map are not included.
const DefPartialResult4JsonObjectAgg = int64(unsafe.Sizeof(partialResult4JsonObjectAgg{}))

// jsonObjectAgg is the aggregate function that collects the key/value pairs
// produced by the rows of a group into a single JSON object. It has no state
// of its own: the argument expressions (args) and the output column index
// (ordinal) used by its methods are promoted from the embedded baseAggFunc.
type jsonObjectAgg struct {
	baseAggFunc
}

// partialResult4JsonObjectAgg is the per-group intermediate state of
// jsonObjectAgg.
//
// entries holds the key/value pairs gathered so far; storing a key that is
// already present replaces its value. bInMap is bookkeeping for memory
// accounting only: the accounting code assumes entries has 2^bInMap buckets
// and increments bInMap whenever the entry count would exceed the load-factor
// threshold derived from hack.LoadFactorNum / hack.LoadFactorDen.
type partialResult4JsonObjectAgg struct {
	entries map[string]interface{}
	bInMap  int // indicates there are 2^bInMap buckets in entries.
}

// AllocPartialResult creates an empty partial result for one group.
//
// It returns the new state as a PartialResult together with its estimated
// initial memory footprint: the struct size plus the bucket memory assumed for
// a map with 2^bInMap (initially one) bucket.
func (e *jsonObjectAgg) AllocPartialResult() (pr PartialResult, memDelta int64) {
	state := &partialResult4JsonObjectAgg{
		entries: make(map[string]interface{}),
		bInMap:  0,
	}

	memDelta = DefPartialResult4JsonObjectAgg
	memDelta += (1 << state.bInMap) * hack.DefBucketMemoryUsageForMapStringToAny
	return PartialResult(state), memDelta
}

// ResetPartialResult returns pr to its freshly allocated state: the collected
// entries are dropped by installing a new empty map, and the bucket-count
// exponent used for memory accounting goes back to zero.
func (e *jsonObjectAgg) ResetPartialResult(pr PartialResult) {
	state := (*partialResult4JsonObjectAgg)(pr)
	state.entries, state.bInMap = make(map[string]interface{}), 0
}

// AppendFinalResult2Chunk writes the aggregated object for one group into
// column e.ordinal of chk. A group with no collected entries yields NULL;
// otherwise the entries are encoded with types.CreateBinaryJSONWithCheck and
// appended as a JSON value. An encoding failure is returned wrapped with
// errors.Trace and nothing is appended.
func (e *jsonObjectAgg) AppendFinalResult2Chunk(sctx sessionctx.Context, pr PartialResult, chk *chunk.Chunk) error {
	state := (*partialResult4JsonObjectAgg)(pr)
	if len(state.entries) > 0 {
		doc, err := types.CreateBinaryJSONWithCheck(state.entries)
		if err != nil {
			return errors.Trace(err)
		}
		chk.AppendJSON(e.ordinal, doc)
		return nil
	}

	// Nothing was aggregated for this group.
	chk.AppendNull(e.ordinal)
	return nil
}

// UpdatePartialResult folds every row of rowsInGroup into the partial result
// pr. For each row the first argument is evaluated as the string key and the
// second as the value; a later row with the same key overwrites the value
// stored earlier.
//
// The returned memDelta is the estimated growth in memory usage. It covers the
// key and value bytes of newly inserted keys, plus one round of bucket memory
// each time the entry count would exceed the assumed load-factor threshold.
// Overwriting an existing key adds nothing. On any error the returned memDelta
// is 0, even if earlier rows of the batch were already stored.
//
// Errors: evaluation and conversion errors are wrapped with errors.Trace; a
// NULL key yields types.ErrJSONDocumentNULLKey; a key argument whose charset
// is binary yields types.ErrInvalidJSONCharset; a converted value whose type
// is not accepted by the type switch yields
// types.ErrUnsupportedSecondArgumentType.
func (e *jsonObjectAgg) UpdatePartialResult(sctx sessionctx.Context, rowsInGroup []chunk.Row, pr PartialResult) (memDelta int64, err error) {
	state := (*partialResult4JsonObjectAgg)(pr)
	for i := range rowsInGroup {
		row := rowsInGroup[i]

		name, nameIsNull, evalErr := e.args[0].EvalString(sctx, row)
		if evalErr != nil {
			return 0, errors.Trace(evalErr)
		}
		if nameIsNull {
			return 0, types.ErrJSONDocumentNULLKey
		}
		if cs := e.args[0].GetType().GetCharset(); cs == charset.CharsetBin {
			return 0, types.ErrInvalidJSONCharset.GenWithStackByArgs(cs)
		}
		// Keep an independent copy of the key in the map.
		// NOTE(review): presumably the evaluated string may alias memory owned
		// by the chunk and reused later — confirm.
		name = strings.Clone(name)

		datum, evalErr := e.args[1].Eval(row)
		if evalErr != nil {
			return 0, errors.Trace(evalErr)
		}
		jsonVal, evalErr := getRealJSONValue(datum, e.args[1].GetType())
		if evalErr != nil {
			return 0, errors.Trace(evalErr)
		}

		// Reject any value type outside the accepted set before touching the state.
		switch jsonVal.(type) {
		case nil, bool, int64, uint64, float64, string, types.BinaryJSON, types.Opaque, types.Time, types.Duration:
		default:
			return 0, types.ErrUnsupportedSecondArgumentType.GenWithStackByArgs(jsonVal)
		}

		// Only a brand-new key grows the map, so only then is memory charged.
		if _, exists := state.entries[name]; !exists {
			memDelta += int64(len(name)) + getValMemDelta(jsonVal)
			if len(state.entries)+1 > (1<<state.bInMap)*hack.LoadFactorNum/hack.LoadFactorDen {
				memDelta += (1 << state.bInMap) * hack.DefBucketMemoryUsageForMapStringToAny
				state.bInMap++
			}
		}
		state.entries[name] = jsonVal
	}
	return memDelta, nil
}

// getRealJSONValue converts a datum into the Go value that is stored in the
// aggregated object.
//
// Binary-literal, bit and bytes datums become a types.Opaque carrying the
// field's type code and the raw bytes. A string datum whose field charset is
// binary also becomes a types.Opaque; when the field type is mysql.TypeString
// the bytes are first copied into a buffer of the field's declared length so
// that trailing zero bytes are part of the opaque value. All other datums
// start from the value of a cloned datum.
//
// Afterwards a few Go types are normalized: float32 becomes float64,
// *types.MyDecimal becomes float64, and []uint8 becomes a string. Errors from
// those conversions are returned wrapped with errors.Trace.
func getRealJSONValue(value types.Datum, ft *types.FieldType) (interface{}, error) {
	converted := value.Clone().GetValue()

	kind := value.Kind()
	if kind == types.KindBinaryLiteral || kind == types.KindMysqlBit || kind == types.KindBytes {
		raw := value.GetBytes()
		converted = types.Opaque{
			TypeCode: ft.GetType(),
			Buf:      raw,
		}
	} else if kind == types.KindString && ft.GetCharset() == charset.CharsetBin {
		payload := value.GetBytes()
		if ft.GetType() == mysql.TypeString {
			// The trailing zero bytes must also be part of the opaque JSON value.
			padded := make([]byte, ft.GetFlen())
			copy(padded, payload)
			payload = padded
		}
		converted = types.Opaque{
			TypeCode: ft.GetType(),
			Buf:      payload,
		}
	}

	// The binary JSON encoder does not support some Go types (for example
	// []uint8), so they are converted to supported ones here.
	switch v := converted.(type) {
	case float32:
		return float64(v), nil
	case *types.MyDecimal:
		asFloat, err := v.ToFloat64()
		if err != nil {
			return nil, errors.Trace(err)
		}
		return asFloat, nil
	case []uint8:
		asString, err := types.ToString(v)
		if err != nil {
			return nil, errors.Trace(err)
		}
		return asString, nil
	}

	return converted, nil
}

func getValMemDelta(val interface{}) (memDelta int64) {
	memDelta = DefInterfaceSize
	switch v := val.(type) {
	case bool:
		memDelta += DefBoolSize
	case int64:
		memDelta += DefInt64Size
	case uint64:
		memDelta += DefUint64Size
	case float64:
		memDelta += DefFloat64Size
	case string:
		memDelta += int64(len(v))
	case types.BinaryJSON:
		// +1 for the memory usage of the JSONTypeCode of json
		memDelta += int64(len(v.Value) + 1)
	case types.Opaque:
		// +1 for the memory usage of the JSONTypeCode of opaque value
		memDelta += int64(len(v.Buf) + 1)
	case *types.MyDecimal:
		memDelta += DefMyDecimalSize
	case []uint8:
		memDelta += int64(len(v))
	case types.Time:
		memDelta += DefTimeSize
	case types.Duration:
		memDelta += DefDurationSize
	}
	return memDelta
}

// MergePartialResult copies every entry of src into dst and returns the
// estimated growth in dst's memory usage.
//
// When the result of this function is normalized, values having duplicate keys
// are discarded, and only the last value encountered is used with that key in
// the returned object; accordingly an entry from src overwrites an entry with
// the same key in dst.
//
// Memory accounting mirrors UpdatePartialResult: key and value bytes are
// charged only for keys that are new to dst, and one round of bucket memory is
// added (and bInMap incremented) whenever inserting the new key would exceed
// the assumed load-factor threshold. The accumulated delta is returned to the
// caller instead of being discarded. The returned error is always nil.
func (e *jsonObjectAgg) MergePartialResult(sctx sessionctx.Context, src, dst PartialResult) (memDelta int64, err error) {
	p1, p2 := (*partialResult4JsonObjectAgg)(src), (*partialResult4JsonObjectAgg)(dst)
	for k, v := range p1.entries {
		// Overwriting an existing key does not grow the map, so charge
		// memory only when the key is new to dst.
		if _, ok := p2.entries[k]; !ok {
			memDelta += int64(len(k)) + getValMemDelta(v)
			if len(p2.entries)+1 > (1<<p2.bInMap)*hack.LoadFactorNum/hack.LoadFactorDen {
				memDelta += (1 << p2.bInMap) * hack.DefBucketMemoryUsageForMapStringToAny
				p2.bInMap++
			}
		}
		p2.entries[k] = v
	}
	return memDelta, nil
}

相关信息

tidb 源码目录

相关文章

tidb aggfuncs 源码

tidb builder 源码

tidb func_avg 源码

tidb func_bitfuncs 源码

tidb func_count 源码

tidb func_count_distinct 源码

tidb func_cume_dist 源码

tidb func_first_row 源码

tidb func_group_concat 源码

tidb func_json_arrayagg 源码

0  赞