spark RocksDBTypeInfo 源码

  • 2022-10-20
  • 浏览 (395)

spark RocksDBTypeInfo 代码

文件路径:/common/kvstore/src/main/java/org/apache/spark/util/kvstore/RocksDBTypeInfo.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.util.kvstore;

import com.google.common.base.Preconditions;
import org.rocksdb.RocksDBException;
import org.rocksdb.WriteBatch;

import java.lang.reflect.Array;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;

import static java.nio.charset.StandardCharsets.UTF_8;

/**
 * Holds metadata about app-specific types stored in RocksDB. Serves as a cache for data collected
 * via reflection, to make it cheaper to access it multiple times.
 *
 * <p>
 * The hierarchy of keys stored in RocksDB looks roughly like the following. This hierarchy ensures
 * that iteration over indices is easy, and that updating values in the store is not overly
 * expensive. Of note, indices choose using more disk space (one value per key) instead of keeping
 * lists of pointers, which would be more expensive to update at runtime.
 * </p>
 *
 * <p>
 * Indentation defines when a sub-key lives under a parent key. In RocksDB, this means the full
 * key would be the concatenation of everything up to that point in the hierarchy, with each
 * component separated by a NULL byte.
 * </p>
 *
 * <pre>
 * +TYPE_NAME
 *   NATURAL_INDEX
 *     +NATURAL_KEY
 *     -
 *   -NATURAL_INDEX
 *   INDEX_NAME
 *     +INDEX_VALUE
 *       +NATURAL_KEY
 *     -INDEX_VALUE
 *     .INDEX_VALUE
 *       CHILD_INDEX_NAME
 *         +CHILD_INDEX_VALUE
 *           NATURAL_KEY_OR_DATA
 *         -
 *   -INDEX_NAME
 * </pre>
 *
 * <p>
 * Entity data (either the entity's natural key or a copy of the data) is stored in all keys
 * that end with "+<something>". A count of all objects that match a particular top-level index
 * value is kept at the end marker ("-<something>"). A count is also kept at the natural index's end
 * marker, to make it easy to retrieve the number of all elements of a particular type.
 * </p>
 *
 * <p>
 * To illustrate, given a type "Foo", with a natural index and a second index called "bar", you'd
 * have these keys and values in the store for two instances, one with natural key "key1" and the
 * other "key2", both with value "yes" for "bar":
 * </p>
 *
 * <pre>
 * Foo __main__ +key1   [data for instance 1]
 * Foo __main__ +key2   [data for instance 2]
 * Foo __main__ -       [count of all Foo]
 * Foo bar +yes +key1   [instance 1 key or data, depending on index type]
 * Foo bar +yes +key2   [instance 2 key or data, depending on index type]
 * Foo bar +yes -       [count of all Foo with "bar=yes" ]
 * </pre>
 *
 * <p>
 * Note that all indexed values are prepended with "+", even if the index itself does not have an
 * explicit end marker. This allows for easily skipping to the end of an index by telling RocksDB
 * to seek to the "phantom" end marker of the index. Throughout the code and comments, this part
 * of the full RocksDB key is generally referred to as the "index value" of the entity.
 * </p>
 *
 * <p>
 * Child indices are stored after their parent index. In the example above, let's assume there is
 * a child index "child", whose parent is "bar". If both instances have value "no" for this field,
 * the data in the store would look something like the following:
 * </p>
 *
 * <pre>
 * ...
 * Foo bar +yes -
 * Foo bar .yes .child +no +key1   [instance 1 key or data, depending on index type]
 * Foo bar .yes .child +no +key2   [instance 2 key or data, depending on index type]
 * ...
 * </pre>
 */
class RocksDBTypeInfo {

  static final byte[] END_MARKER = new byte[] { '-' };
  static final byte ENTRY_PREFIX = (byte) '+';
  static final byte KEY_SEPARATOR = 0x0;
  static byte TRUE = (byte) '1';
  static byte FALSE = (byte) '0';

  private static final byte SECONDARY_IDX_PREFIX = (byte) '.';
  private static final byte POSITIVE_MARKER = (byte) '=';
  private static final byte NEGATIVE_MARKER = (byte) '*';
  private static final byte[] HEX_BYTES = new byte[] {
    '0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'a', 'b', 'c', 'd', 'e', 'f'
  };

  private final RocksDB db;
  private final Class<?> type;
  private final Map<String, Index> indices;
  private final byte[] typePrefix;

  RocksDBTypeInfo(RocksDB db, Class<?> type, byte[] alias) throws Exception {
    this.db = db;
    this.type = type;
    this.indices = new HashMap<>();

    KVTypeInfo ti = new KVTypeInfo(type);

    // First create the parent indices, then the child indices.
    ti.indices().forEach(idx -> {
      // In RocksDB, there is no parent index for the NATURAL INDEX.
      if (idx.parent().isEmpty() || idx.value().equals(KVIndex.NATURAL_INDEX_NAME)) {
        indices.put(idx.value(), new Index(idx, ti.getAccessor(idx.value()), null));
      }
    });
    ti.indices().forEach(idx -> {
      if (!idx.parent().isEmpty() && !idx.value().equals(KVIndex.NATURAL_INDEX_NAME)) {
        indices.put(idx.value(), new Index(idx, ti.getAccessor(idx.value()),
          indices.get(idx.parent())));
      }
    });

    this.typePrefix = alias;
  }

  Class<?> type() {
    return type;
  }

  byte[] keyPrefix() {
    return typePrefix;
  }

  Index naturalIndex() {
    return index(KVIndex.NATURAL_INDEX_NAME);
  }

  Index index(String name) {
    Index i = indices.get(name);
    Preconditions.checkArgument(i != null, "Index %s does not exist for type %s.", name,
      type.getName());
    return i;
  }

  Collection<Index> indices() {
    return indices.values();
  }

  byte[] buildKey(byte[]... components) {
    return buildKey(true, components);
  }

  byte[] buildKey(boolean addTypePrefix, byte[]... components) {
    int len = 0;
    if (addTypePrefix) {
      len += typePrefix.length + 1;
    }
    for (byte[] comp : components) {
      len += comp.length;
    }
    len += components.length - 1;

    byte[] dest = new byte[len];
    int written = 0;

    if (addTypePrefix) {
      System.arraycopy(typePrefix, 0, dest, 0, typePrefix.length);
      dest[typePrefix.length] = KEY_SEPARATOR;
      written += typePrefix.length + 1;
    }

    for (byte[] comp : components) {
      System.arraycopy(comp, 0, dest, written, comp.length);
      written += comp.length;
      if (written < dest.length) {
        dest[written] = KEY_SEPARATOR;
        written++;
      }
    }

    return dest;
  }

  /**
   * Models a single index in RocksDB. See top-level class's javadoc for a description of how the
   * keys are generated.
   */
  class Index {

    private final boolean copy;
    private final boolean isNatural;
    private final byte[] name;
    private final KVTypeInfo.Accessor accessor;
    private final Index parent;

    private Index(KVIndex self, KVTypeInfo.Accessor accessor, Index parent) {
      byte[] name = self.value().getBytes(UTF_8);
      if (parent != null) {
        byte[] child = new byte[name.length + 1];
        child[0] = SECONDARY_IDX_PREFIX;
        System.arraycopy(name, 0, child, 1, name.length);
      }

      this.name = name;
      this.isNatural = self.value().equals(KVIndex.NATURAL_INDEX_NAME);
      this.copy = isNatural || self.copy();
      this.accessor = accessor;
      this.parent = parent;
    }

    boolean isCopy() {
      return copy;
    }

    boolean isChild() {
      return parent != null;
    }

    Index parent() {
      return parent;
    }

    /**
     * Creates a key prefix for child indices of this index. This allows the prefix to be
     * calculated only once, avoiding redundant work when multiple child indices of the
     * same parent index exist.
     */
    byte[] childPrefix(Object value) {
      Preconditions.checkState(parent == null, "Not a parent index.");
      return buildKey(name, toParentKey(value));
    }

    /**
     * Gets the index value for a particular entity (which is the value of the field or method
     * tagged with the index annotation). This is used as part of the RocksDB key where the
     * entity (or its id) is stored.
     */
    Object getValue(Object entity) throws Exception {
      return accessor.get(entity);
    }

    private void checkParent(byte[] prefix) {
      if (prefix != null) {
        Preconditions.checkState(parent != null, "Parent prefix provided for parent index.");
      } else {
        Preconditions.checkState(parent == null, "Parent prefix missing for child index.");
      }
    }

    /** The prefix for all keys that belong to this index. */
    byte[] keyPrefix(byte[] prefix) {
      checkParent(prefix);
      return (parent != null) ? buildKey(false, prefix, name) : buildKey(name);
    }

    /**
     * The key where to start ascending iteration for entities whose value for the indexed field
     * match the given value.
     */
    byte[] start(byte[] prefix, Object value) {
      checkParent(prefix);
      return (parent != null) ? buildKey(false, prefix, name, toKey(value))
        : buildKey(name, toKey(value));
    }

    /** The key for the index's end marker. */
    byte[] end(byte[] prefix) {
      checkParent(prefix);
      return (parent != null) ? buildKey(false, prefix, name, END_MARKER)
        : buildKey(name, END_MARKER);
    }

    /** The key for the end marker for entries with the given value. */
    byte[] end(byte[] prefix, Object value) {
      checkParent(prefix);
      return (parent != null) ? buildKey(false, prefix, name, toKey(value), END_MARKER)
        : buildKey(name, toKey(value), END_MARKER);
    }

    /** The full key in the index that identifies the given entity. */
    byte[] entityKey(byte[] prefix, Object entity) throws Exception {
      Object indexValue = getValue(entity);
      Preconditions.checkNotNull(indexValue, "Null index value for %s in type %s.",
        name, type.getName());
      byte[] entityKey = start(prefix, indexValue);
      if (!isNatural) {
        entityKey = buildKey(false, entityKey, toKey(naturalIndex().getValue(entity)));
      }
      return entityKey;
    }

    private void updateCount(WriteBatch batch, byte[] key, long delta) throws RocksDBException {
      long updated = getCount(key) + delta;
      if (updated > 0) {
        batch.put(key, db.serializer.serialize(updated));
      } else {
        batch.delete(key);
      }
    }

    private void addOrRemove(
        WriteBatch batch,
        Object entity,
        Object existing,
        byte[] data,
        byte[] naturalKey,
        byte[] prefix) throws Exception {
      Object indexValue = getValue(entity);
      Preconditions.checkNotNull(indexValue, "Null index value for %s in type %s.",
        name, type.getName());

      byte[] entityKey = start(prefix, indexValue);
      if (!isNatural) {
        entityKey = buildKey(false, entityKey, naturalKey);
      }

      boolean needCountUpdate = (existing == null);

      // Check whether there's a need to update the index. The index needs to be updated in two
      // cases:
      //
      // - There is no existing value for the entity, so a new index value will be added.
      // - If there is a previously stored value for the entity, and the index value for the
      //   current index does not match the new value, the old entry needs to be deleted and
      //   the new one added.
      //
      // Natural indices don't need to be checked, because by definition both old and new entities
      // will have the same key. The put() call is all that's needed in that case.
      //
      // Also check whether we need to update the counts. If the indexed value is changing, we
      // need to decrement the count at the old index value, and the new indexed value count needs
      // to be incremented.
      if (existing != null && !isNatural) {
        byte[] oldPrefix = null;
        Object oldIndexedValue = getValue(existing);
        boolean removeExisting = !indexValue.equals(oldIndexedValue);
        if (!removeExisting && isChild()) {
          oldPrefix = parent().childPrefix(parent().getValue(existing));
          removeExisting = RocksDBIterator.compare(prefix, oldPrefix) != 0;
        }

        if (removeExisting) {
          if (oldPrefix == null && isChild()) {
            oldPrefix = parent().childPrefix(parent().getValue(existing));
          }

          byte[] oldKey = entityKey(oldPrefix, existing);
          batch.delete(oldKey);

          // If the indexed value has changed, we need to update the counts at the old and new
          // end markers for the indexed value.
          if (!isChild()) {
            byte[] oldCountKey = end(null, oldIndexedValue);
            updateCount(batch, oldCountKey, -1L);
            needCountUpdate = true;
          }
        }
      }

      if (data != null) {
        byte[] stored = copy ? data : naturalKey;
        batch.put(entityKey, stored);
      } else {
        batch.delete(entityKey);
      }

      if (needCountUpdate && !isChild()) {
        long delta = data != null ? 1L : -1L;
        byte[] countKey = isNatural ? end(prefix) : end(prefix, indexValue);
        updateCount(batch, countKey, delta);
      }
    }

    /**
     * Add an entry to the index.
     *
     * @param batch Write batch with other related changes.
     * @param entity The entity being added to the index.
     * @param existing The entity being replaced in the index, or null.
     * @param data Serialized entity to store (when storing the entity, not a reference).
     * @param naturalKey The value's natural key (to avoid re-computing it for every index).
     * @param prefix The parent index prefix, if this is a child index.
     */
    void add(
        WriteBatch batch,
        Object entity,
        Object existing,
        byte[] data,
        byte[] naturalKey,
        byte[] prefix) throws Exception {
      addOrRemove(batch, entity, existing, data, naturalKey, prefix);
    }

    /**
     * Remove a value from the index.
     *
     * @param batch Write batch with other related changes.
     * @param entity The entity being removed, to identify the index entry to modify.
     * @param naturalKey The value's natural key (to avoid re-computing it for every index).
     * @param prefix The parent index prefix, if this is a child index.
     */
    void remove(
        WriteBatch batch,
        Object entity,
        byte[] naturalKey,
        byte[] prefix) throws Exception {
      addOrRemove(batch, entity, null, null, naturalKey, prefix);
    }

    long getCount(byte[] key) throws RocksDBException {
      byte[] data = db.db().get(key);
      return data != null ? db.serializer.deserializeLong(data) : 0;
    }

    byte[] toParentKey(Object value) {
      return toKey(value, SECONDARY_IDX_PREFIX);
    }

    byte[] toKey(Object value) {
      return toKey(value, ENTRY_PREFIX);
    }

    /**
     * Translates a value to be used as part of the store key.
     *
     * Integral numbers are encoded as a string in a way that preserves lexicographical
     * ordering. The string is prepended with a marker telling whether the number is negative
     * or positive ("*" for negative and "=" for positive are used since "-" and "+" have the
     * opposite of the desired order), and then the number is encoded into a hex string (so
     * it occupies twice the number of bytes as the original type).
     *
     * Arrays are encoded by encoding each element separately, separated by KEY_SEPARATOR.
     */
    byte[] toKey(Object value, byte prefix) {
      final byte[] result;

      if (value instanceof String) {
        byte[] str = ((String) value).getBytes(UTF_8);
        result = new byte[str.length + 1];
        result[0] = prefix;
        System.arraycopy(str, 0, result, 1, str.length);
      } else if (value instanceof Boolean) {
        result = new byte[] { prefix, (Boolean) value ? TRUE : FALSE };
      } else if (value.getClass().isArray()) {
        int length = Array.getLength(value);
        byte[][] components = new byte[length][];
        for (int i = 0; i < length; i++) {
          components[i] = toKey(Array.get(value, i));
        }
        result = buildKey(false, components);
      } else {
        int bytes;

        if (value instanceof Integer) {
          bytes = Integer.SIZE;
        } else if (value instanceof Long) {
          bytes = Long.SIZE;
        } else if (value instanceof Short) {
          bytes = Short.SIZE;
        } else if (value instanceof Byte) {
          bytes = Byte.SIZE;
        } else {
          throw new IllegalArgumentException(String.format("Type %s not allowed as key.",
            value.getClass().getName()));
        }

        bytes = bytes / Byte.SIZE;

        byte[] key = new byte[bytes * 2 + 2];
        long longValue = ((Number) value).longValue();
        key[0] = prefix;
        key[1] = longValue >= 0 ? POSITIVE_MARKER : NEGATIVE_MARKER;

        for (int i = 0; i < key.length - 2; i++) {
          int masked = (int) ((longValue >>> (4 * i)) & 0xF);
          key[key.length - i - 1] = HEX_BYTES[masked];
        }

        result = key;
      }

      return result;
    }

  }

}

相关信息

spark 源码目录

相关文章

spark ArrayWrappers 源码

spark InMemoryStore 源码

spark KVIndex 源码

spark KVStore 源码

spark KVStoreIterator 源码

spark KVStoreSerializer 源码

spark KVStoreView 源码

spark KVTypeInfo 源码

spark LevelDB 源码

spark LevelDBIterator 源码

0  赞