tidb memstore 源码

  • 2022-09-19
  • 浏览 (517)

tidb memstore 代码

文件路径:/br/pkg/storage/memstore.go

// Copyright 2022 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.

package storage

import (
	"bytes"
	"context"
	"io"
	"path"
	"sort"
	"strings"
	"sync"

	"github.com/pingcap/errors"
	"go.uber.org/atomic"
)

type memFile struct {
	Data atomic.Value // the atomic value is a byte slice, which can only be get/set atomically
}

// GetData gets the underlying byte slice of the atomic value
func (f *memFile) GetData() []byte {
	var fileData []byte
	fileDataVal := f.Data.Load()
	if fileDataVal != nil {
		fileData = fileDataVal.([]byte)
	}
	return fileData
}

// MemStorage represents a in-memory storage.
type MemStorage struct {
	rwm       sync.RWMutex
	dataStore map[string]*memFile
}

// NewMemStorage creates a new in-memory storage.
func NewMemStorage() *MemStorage {
	return &MemStorage{
		dataStore: make(map[string]*memFile),
	}
}

func (s *MemStorage) loadMap(name string) (*memFile, bool) {
	s.rwm.RLock()
	defer s.rwm.RUnlock()
	theFile, ok := s.dataStore[name]
	return theFile, ok
}

// DeleteFile delete the file in storage
// It implements the `ExternalStorage` interface
func (s *MemStorage) DeleteFile(ctx context.Context, name string) error {
	select {
	case <-ctx.Done():
		return ctx.Err()
	default:
		// continue on
	}
	if !path.IsAbs(name) {
		return errors.Errorf("file name is not an absolute path: %s", name)
	}
	s.rwm.Lock()
	defer s.rwm.Unlock()
	if _, ok := s.dataStore[name]; !ok {
		return errors.Errorf("cannot find the file: %s", name)
	}
	delete(s.dataStore, name)
	return nil
}

// WriteFile file to storage.
// It implements the `ExternalStorage` interface
func (s *MemStorage) WriteFile(ctx context.Context, name string, data []byte) error {
	select {
	case <-ctx.Done():
		return ctx.Err()
	default:
		// continue on
	}
	if !path.IsAbs(name) {
		return errors.Errorf("file name is not an absolute path: %s", name)
	}
	fileData := append([]byte{}, data...)
	s.rwm.Lock()
	defer s.rwm.Unlock()
	theFile, ok := s.dataStore[name]
	if ok {
		theFile.Data.Store(fileData)
	} else {
		theFile := new(memFile)
		theFile.Data.Store(fileData)
		s.dataStore[name] = theFile
	}
	return nil
}

// ReadFile reads the storage file.
// It implements the `ExternalStorage` interface
func (s *MemStorage) ReadFile(ctx context.Context, name string) ([]byte, error) {
	select {
	case <-ctx.Done():
		return nil, ctx.Err()
	default:
		// continue on
	}
	if !path.IsAbs(name) {
		return nil, errors.Errorf("file name is not an absolute path: %s", name)
	}
	theFile, ok := s.loadMap(name)
	if !ok {
		return nil, errors.Errorf("cannot find the file: %s", name)
	}
	fileData := theFile.GetData()
	return append([]byte{}, fileData...), nil
}

// FileExists return true if file exists.
// It implements the `ExternalStorage` interface
func (s *MemStorage) FileExists(ctx context.Context, name string) (bool, error) {
	select {
	case <-ctx.Done():
		return false, ctx.Err()
	default:
		// continue on
	}
	if !path.IsAbs(name) {
		return false, errors.Errorf("file name is not an absolute path: %s", name)
	}
	_, ok := s.loadMap(name)
	return ok, nil
}

// Open opens a Reader by file path.
// It implements the `ExternalStorage` interface
func (s *MemStorage) Open(ctx context.Context, filePath string) (ExternalFileReader, error) {
	select {
	case <-ctx.Done():
		return nil, ctx.Err()
	default:
		// continue on
	}
	if !path.IsAbs(filePath) {
		return nil, errors.Errorf("file name is not an absolute path: %s", filePath)
	}
	theFile, ok := s.loadMap(filePath)
	if !ok {
		return nil, errors.Errorf("cannot find the file: %s", filePath)
	}
	r := bytes.NewReader(theFile.GetData())
	return &memFileReader{
		br: r,
	}, nil
}

// WalkDir traverse all the files in a dir.
// It implements the `ExternalStorage` interface
func (s *MemStorage) WalkDir(ctx context.Context, opt *WalkOption, fn func(string, int64) error) error {
	allFileNames := func() []string {
		fileNames := []string{}
		s.rwm.RLock()
		defer s.rwm.RUnlock()
		for fileName := range s.dataStore {
			if opt != nil {
				if len(opt.SubDir) > 0 {
					if !strings.HasPrefix(fileName, opt.SubDir) {
						continue
					}
				}
				if len(opt.ObjPrefix) > 0 {
					baseName := path.Base(fileName)
					if !strings.HasPrefix(baseName, opt.ObjPrefix) {
						continue
					}
				}
			}
			fileNames = append(fileNames, fileName)
		}
		return fileNames
	}()
	sort.Strings(allFileNames)

	for _, fileName := range allFileNames {
		select {
		case <-ctx.Done():
			return ctx.Err()
		default:
			// continue on
		}
		theFile, ok := s.loadMap(fileName)
		if !ok {
			continue
		}
		fileSize := len(theFile.GetData())
		if err := fn(fileName, int64(fileSize)); err != nil {
			return err
		}
	}
	return nil
}

// URI returns the URI of the storage.
func (*MemStorage) URI() string {
	return "memstore://"
}

// Create creates a file and returning a writer to write data into.
// When the writer is closed, the data is stored in the file.
// It implements the `ExternalStorage` interface
func (s *MemStorage) Create(ctx context.Context, name string) (ExternalFileWriter, error) {
	select {
	case <-ctx.Done():
		return nil, ctx.Err()
	default:
		// continue on
	}
	if !path.IsAbs(name) {
		return nil, errors.Errorf("file name is not an absolute path: %s", name)
	}
	s.rwm.Lock()
	defer s.rwm.Unlock()
	if _, ok := s.dataStore[name]; ok {
		return nil, errors.Errorf("the file already exists: %s", name)
	}
	theFile := new(memFile)
	s.dataStore[name] = theFile
	return &memFileWriter{
		file: theFile,
	}, nil
}

// Rename renames a file name to another file name.
// It implements the `ExternalStorage` interface
func (s *MemStorage) Rename(ctx context.Context, oldFileName, newFileName string) error {
	select {
	case <-ctx.Done():
		return ctx.Err()
	default:
		// continue on
	}
	if !path.IsAbs(newFileName) {
		return errors.Errorf("new file name is not an absolute path: %s", newFileName)
	}
	s.rwm.Lock()
	defer s.rwm.Unlock()
	theFile, ok := s.dataStore[oldFileName]
	if !ok {
		return errors.Errorf("the file doesn't exist: %s", oldFileName)
	}
	s.dataStore[newFileName] = theFile
	delete(s.dataStore, oldFileName)
	return nil
}

// memFileReader is the struct to read data from an opend mem storage file
type memFileReader struct {
	br       *bytes.Reader
	isClosed atomic.Bool
}

// Read reads the mem storage file data
// It implements the `io.Reader` interface
func (r *memFileReader) Read(p []byte) (int, error) {
	if r.isClosed.Load() {
		return 0, io.EOF
	}
	return r.br.Read(p)
}

// Close closes the mem storage file data
// It implements the `io.Closer` interface
func (r *memFileReader) Close() error {
	r.isClosed.Store(true)
	return nil
}

// Seeker seekds the offset inside the mem storage file
// It implements the `io.Seeker` interface
func (r *memFileReader) Seek(offset int64, whence int) (int64, error) {
	if r.isClosed.Load() {
		return 0, errors.New("reader closed")
	}
	return r.br.Seek(offset, whence)
}

// memFileReader is the struct to write data into the opened mem storage file
type memFileWriter struct {
	buf      bytes.Buffer
	file     *memFile
	isClosed atomic.Bool
}

// Write writes the data into the mem storage file buffer.
// It implements the `ExternalFileWriter` interface
func (w *memFileWriter) Write(ctx context.Context, p []byte) (int, error) {
	select {
	case <-ctx.Done():
		return 0, ctx.Err()
	default:
		// continue on
	}
	if w.isClosed.Load() {
		return 0, errors.New("writer closed")
	}
	return w.buf.Write(p)
}

func (w *memFileWriter) Close(ctx context.Context) error {
	select {
	case <-ctx.Done():
		return ctx.Err()
	default:
		// continue on
	}
	fileData := append([]byte{}, w.buf.Bytes()...)
	w.file.Data.Store(fileData)
	w.isClosed.Store(true)
	return nil
}

相关信息

tidb 源码目录

相关文章

tidb azblob 源码

tidb compress 源码

tidb flags 源码

tidb gcs 源码

tidb hdfs 源码

tidb local 源码

tidb local_unix 源码

tidb local_windows 源码

tidb noop 源码

tidb parse 源码

0  赞