hadoop LeveldbConfigurationStore 源码

  • 2022-10-20
  • 浏览 (227)

hadoop LeveldbConfigurationStore 代码

文件路径:/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/LeveldbConfigurationStore.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf;

import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.yarn.server.resourcemanager.DBManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.records.Version;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.iq80.leveldb.DB;
import org.iq80.leveldb.DBComparator;
import org.iq80.leveldb.DBException;
import org.iq80.leveldb.DBIterator;
import org.iq80.leveldb.Options;
import org.iq80.leveldb.WriteBatch;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectInputStream;
import java.io.ObjectOutput;
import java.io.ObjectOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;

import static org.fusesource.leveldbjni.JniDBFactory.bytes;

/**
 * A LevelDB implementation of {@link YarnConfigurationStore}.
 *
 * <p>Two LevelDB databases are kept under the directory configured by
 * {@link YarnConfiguration#RM_SCHEDCONF_STORE_PATH}:
 * <ul>
 *   <li>{@code yarn-conf-store}: the scheduler configuration key/value pairs,
 *   plus two reserved keys ({@code log} for the serialized mutation log and
 *   {@code version} for the store schema version);</li>
 *   <li>{@code conf-version-store}: a single counter that is incremented each
 *   time the configuration is initialized or a mutation is confirmed.</li>
 * </ul>
 *
 * <p>The main database uses a custom key comparator that sorts the two
 * reserved keys after every configuration key, which lets
 * {@link #retrieve()} stop scanning as soon as it meets one of them.
 */
public class LeveldbConfigurationStore extends YarnConfigurationStore {

  public static final Logger LOG =
      LoggerFactory.getLogger(LeveldbConfigurationStore.class);

  /** Directory name of the main configuration database. */
  private static final String DB_NAME = "yarn-conf-store";
  /** Reserved key holding the serialized list of logged mutations. */
  private static final String LOG_KEY = "log";
  /** Reserved key holding the store schema version. */
  private static final String VERSION_KEY = "version";
  /** Directory name of the configuration-version database. */
  private static final String CONF_VERSION_NAME = "conf-version-store";
  /** Key of the configuration version counter in the version database. */
  private static final String CONF_VERSION_KEY = "conf-version";
  private DB db;
  private DBManager dbManager;
  private DBManager versionDbManager;
  private DB versionDb;
  /** Maximum number of mutations kept under {@link #LOG_KEY}. */
  private long maxLogs;
  private Configuration conf;
  /** Scheduler configuration used to seed a freshly created database. */
  private Configuration initSchedConf;
  @VisibleForTesting
  protected static final Version CURRENT_VERSION_INFO = Version
      .newInstance(0, 1);

  /**
   * Opens (or creates) both databases, reads the log size limit and starts
   * the periodic compaction timer of the main database.
   *
   * @param config YARN configuration providing the store path, the maximum
   *               number of logs and the compaction interval
   * @param schedConf scheduler configuration written to the main database
   *                  when it has to be initialized
   * @param rmContext resource manager context (not used by this store)
   * @throws IOException wrapping any failure raised during initialization
   */
  @Override
  public void initialize(Configuration config, Configuration schedConf,
      RMContext rmContext) throws IOException {
    this.conf = config;
    this.initSchedConf = schedConf;
    this.dbManager = new DBManager();
    this.versionDbManager = new DBManager();
    try {
      initDatabase();
      this.maxLogs = config.getLong(
          YarnConfiguration.RM_SCHEDCONF_MAX_LOGS,
          YarnConfiguration.DEFAULT_RM_SCHEDCONF_LEVELDB_MAX_LOGS);
      // The interval is configured in seconds; the timer expects millis.
      long compactionIntervalMsec = config.getLong(
          YarnConfiguration.RM_SCHEDCONF_LEVELDB_COMPACTION_INTERVAL_SECS,
          YarnConfiguration
              .DEFAULT_RM_SCHEDCONF_LEVELDB_COMPACTION_INTERVAL_SECS) * 1000;
      dbManager.startCompactionTimer(compactionIntervalMsec,
          this.getClass().getSimpleName());
    } catch (Exception e) {
      throw new IOException(e);
    }
  }

  /**
   * Closes the store and recursively deletes the main database directory.
   *
   * @throws Exception if closing the databases or deleting the directory
   *                   fails
   */
  @Override
  public void format() throws Exception {
    close();
    FileSystem fs = FileSystem.getLocal(conf);
    // NOTE(review): only the main store is removed; the directory of
    // CONF_VERSION_NAME is left in place, so the configuration version
    // counter survives a format. Confirm whether that is intended.
    fs.delete(getStorageDir(DB_NAME), true);
  }

  /**
   * Opens the version database first and then the main database, because
   * initializing the main database increments the configuration version.
   */
  private void initDatabase() throws Exception {
    Path confVersion = createStorageDir(CONF_VERSION_NAME);
    Options confOptions = new Options();
    confOptions.createIfMissing(false);
    File confVersionFile = new File(confVersion.toString());

    versionDb = versionDbManager.initDatabase(confVersionFile, confOptions,
        this::initVersionDb);

    Path storeRoot = createStorageDir(DB_NAME);
    Options options = new Options();
    options.createIfMissing(false);
    options.comparator(new DBComparator() {
      /**
       * Orders keys lexicographically, except that LOG_KEY sorts after every
       * regular key and VERSION_KEY sorts after everything else.
       */
      @Override
      public int compare(byte[] key1, byte[] key2) {
        String key1Str = new String(key1, StandardCharsets.UTF_8);
        String key2Str = new String(key2, StandardCharsets.UTF_8);
        if (key1Str.equals(key2Str)) {
          return 0;
        } else if (key1Str.equals(VERSION_KEY)) {
          return 1;
        } else if (key2Str.equals(VERSION_KEY)) {
          return -1;
        } else if (key1Str.equals(LOG_KEY)) {
          return 1;
        } else if (key2Str.equals(LOG_KEY)) {
          return -1;
        }
        return key1Str.compareTo(key2Str);
      }

      @Override
      public String name() {
        return "keyComparator";
      }

      @Override
      public byte[] findShortestSeparator(byte[] start, byte[] limit) {
        return start;
      }

      @Override
      public byte[] findShortSuccessor(byte[] key) {
        return key;
      }
    });
    LOG.info("Using conf database at {}", storeRoot);
    File dbFile = new File(storeRoot.toString());
    db = dbManager.initDatabase(dbFile, options, this::initDb);
  }

  /**
   * Initialization callback for the version database: starts the
   * configuration version counter at 0.
   */
  private void initVersionDb(DB database) {
    database.put(bytes(CONF_VERSION_KEY), bytes(String.valueOf(0)));
  }

  /**
   * Initialization callback for the main database: writes every entry of the
   * initial scheduler configuration in one batch, then increments the
   * configuration version.
   */
  private void initDb(DB database) {
    try (WriteBatch initBatch = database.createWriteBatch()) {
      for (Map.Entry<String, String> kv : initSchedConf) {
        initBatch.put(bytes(kv.getKey()), bytes(kv.getValue()));
      }
      database.write(initBatch);
      increaseConfigVersion();
    } catch (IOException e) {
      // Only closing the batch can raise IOException; the data has already
      // been written at that point, so this is not treated as fatal.
      LOG.warn("Failed to close the initial write batch", e);
    }
  }

  /**
   * Resolves the directory for the given store and creates it on the local
   * file system with owner-only (0700) permissions.
   */
  private Path createStorageDir(String storageName) throws IOException {
    Path root = getStorageDir(storageName);
    FileSystem fs = FileSystem.getLocal(conf);
    fs.mkdirs(root, new FsPermission((short) 0700));
    return root;
  }

  /**
   * Resolves the directory of the given store below the configured store
   * path.
   *
   * @throws IOException if no store path is configured
   */
  private Path getStorageDir(String storageName) throws IOException {
    String storePath = conf.get(YarnConfiguration.RM_SCHEDCONF_STORE_PATH);
    if (storePath == null) {
      throw new IOException("No store location directory configured in " +
          YarnConfiguration.RM_SCHEDCONF_STORE_PATH);
    }
    return new Path(storePath, storageName);
  }

  /**
   * Closes both database managers. The version database is closed even when
   * closing the main database fails, and calling this method before
   * {@link #initialize} is a no-op.
   *
   * @throws IOException if closing either database fails
   */
  @Override
  public void close() throws IOException {
    try {
      if (dbManager != null) {
        dbManager.close();
      }
    } finally {
      if (versionDbManager != null) {
        versionDbManager.close();
      }
    }
  }

  /**
   * Appends a mutation to the persisted log, dropping the oldest entries so
   * that at most {@code maxLogs} remain. Nothing is stored when
   * {@code maxLogs} is not positive.
   *
   * @param logMutation the mutation to record
   * @throws IOException if the log cannot be (de)serialized
   */
  @Override
  public void logMutation(LogMutation logMutation) throws IOException {
    if (maxLogs > 0) {
      LinkedList<LogMutation> logs = deserLogMutations(db.get(bytes(LOG_KEY)));
      logs.add(logMutation);
      // Loop rather than trim a single entry: the stored log may exceed the
      // limit by more than one if the limit was lowered since it was written.
      while (logs.size() > maxLogs) {
        logs.removeFirst();
      }
      db.put(bytes(LOG_KEY), serLogMutations(logs));
    }
  }

  /**
   * Applies a mutation to the stored configuration when it is valid. Entries
   * with a null or empty value are deleted, all others are put. The
   * configuration version is incremented only after the batch was written.
   *
   * @param pendingMutation the mutation whose updates are applied
   * @param isValid whether the mutation should be applied; when false this
   *                method does nothing
   */
  @Override
  public void confirmMutation(LogMutation pendingMutation,
      boolean isValid) {
    if (!isValid) {
      return;
    }
    try (WriteBatch updateBatch = db.createWriteBatch()) {
      for (Map.Entry<String, String> changes :
          pendingMutation.getUpdates().entrySet()) {
        if (changes.getValue() == null || changes.getValue().isEmpty()) {
          updateBatch.delete(bytes(changes.getKey()));
        } else {
          updateBatch.put(bytes(changes.getKey()), bytes(changes.getValue()));
        }
      }
      // Write first so that a failed write does not leave the version
      // counter ahead of the stored configuration.
      db.write(updateBatch);
      increaseConfigVersion();
    } catch (IOException e) {
      // Only closing the batch can raise IOException; the update has already
      // been applied at that point, so this is not treated as fatal.
      LOG.warn("Failed to close the write batch of a confirmed mutation", e);
    }
  }

  /**
   * Serializes the mutation log with Java object serialization.
   */
  private byte[] serLogMutations(LinkedList<LogMutation> mutations) throws
      IOException {
    ByteArrayOutputStream baos = new ByteArrayOutputStream();
    try (ObjectOutput oos = new ObjectOutputStream(baos)) {
      oos.writeObject(mutations);
      oos.flush();
      return baos.toByteArray();
    }
  }

  // Because of type erasure casting to LinkedList<LogMutation> will be
  // unchecked. A way around that would be to iterate over the logMutations
  // which is overkill in this case.
  /**
   * Deserializes the mutation log; a missing value yields an empty list.
   */
  @SuppressWarnings("unchecked")
  private LinkedList<LogMutation> deserLogMutations(byte[] mutations) throws
      IOException {
    if (mutations == null) {
      return new LinkedList<>();
    }

    try (ObjectInput input = new ObjectInputStream(
        new ByteArrayInputStream(mutations))) {
      return (LinkedList<LogMutation>) input.readObject();
    } catch (ClassNotFoundException e) {
      throw new IOException(e);
    }
  }

  /**
   * Reads every configuration entry from the main database.
   *
   * @return a configuration (without default resources) holding all stored
   *         key/value pairs except the reserved log and version keys
   */
  @Override
  public synchronized Configuration retrieve() {
    Configuration config = new Configuration(false);
    try (DBIterator itr = db.iterator()) {
      itr.seekToFirst();
      while (itr.hasNext()) {
        Map.Entry<byte[], byte[]> entry = itr.next();
        String key = new String(entry.getKey(), StandardCharsets.UTF_8);
        String value = new String(entry.getValue(), StandardCharsets.UTF_8);
        // The comparator sorts the reserved keys last, so the first one
        // marks the end of the configuration entries.
        if (key.equals(LOG_KEY) || key.equals(VERSION_KEY)) {
          break;
        }
        config.set(key, value);
      }
    } catch (IOException e) {
      // Only closing the iterator can raise IOException; the configuration
      // has been read completely at that point.
      LOG.warn("Failed to close the configuration iterator", e);
    }
    return config;
  }

  /**
   * Increments the configuration version counter in the version database.
   */
  private void increaseConfigVersion() {
    long configVersion = getConfigVersion() + 1L;
    versionDb.put(bytes(CONF_VERSION_KEY),
        bytes(String.valueOf(configVersion)));
  }

  /**
   * Reads the configuration version counter from the version database.
   *
   * @return the current configuration version
   */
  @Override
  public long getConfigVersion() {
    String version = new String(versionDb.get(bytes(CONF_VERSION_KEY)),
        StandardCharsets.UTF_8);
    return Long.parseLong(version);
  }

  /**
   * Not implemented for this store.
   *
   * @param fromId ignored
   * @return always {@code null}
   */
  @Override
  public List<LogMutation> getConfirmedConfHistory(long fromId) {
    return null; // unimplemented
  }

  /**
   * Loads the schema version stored under the reserved version key.
   */
  @Override
  public Version getConfStoreVersion() throws Exception {
    return dbManager.loadVersion(VERSION_KEY);
  }

  /**
   * Returns the currently persisted mutation log (empty if none is stored).
   */
  @VisibleForTesting
  @Override
  protected LinkedList<LogMutation> getLogs() throws Exception {
    return deserLogMutations(db.get(bytes(LOG_KEY)));
  }

  @VisibleForTesting
  protected DB getDB() {
    return db;
  }

  /**
   * Persists {@link #CURRENT_VERSION_INFO} as the store schema version.
   *
   * @throws Exception an {@link IOException} wrapping any
   *                   {@link DBException} raised while writing
   */
  @Override
  public void storeVersion() throws Exception {
    try {
      storeVersion(CURRENT_VERSION_INFO);
    } catch (DBException e) {
      throw new IOException(e);
    }
  }

  /**
   * Persists the given schema version under the reserved version key.
   */
  @VisibleForTesting
  protected void storeVersion(Version version) {
    dbManager.storeVersion(VERSION_KEY, version);
  }

  /**
   * Returns the schema version this implementation writes.
   */
  @Override
  public Version getCurrentVersion() {
    return CURRENT_VERSION_INFO;
  }
}

相关信息

hadoop 源码目录

相关文章

hadoop CSConfigurationProvider 源码

hadoop FSSchedulerConfigurationStore 源码

hadoop FileBasedCSConfigurationProvider 源码

hadoop InMemoryConfigurationStore 源码

hadoop MutableCSConfigurationProvider 源码

hadoop QueueAdminConfigurationMutationACLPolicy 源码

hadoop QueueCapacityConfigParser 源码

hadoop YarnConfStoreVersionIncompatibleException 源码

hadoop YarnConfigurationStore 源码

hadoop YarnConfigurationStoreFactory 源码

0  赞