hadoop StateStoreZooKeeperImpl source code

  • 2022-10-20

hadoop StateStoreZooKeeperImpl code

File path: /hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/driver/impl/StateStoreZooKeeperImpl.java
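
Before the listing, a minimal configuration sketch for enabling this driver in a Router. The FEDERATION_STORE_ZK_* key is defined in the file below; the driver-class key (dfs.federation.router.store.driver.class) and the ZooKeeper address key (hadoop.zk.address, read by ZKCuratorManager) are assumptions about this Hadoop version.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.federation.store.driver.impl.StateStoreZooKeeperImpl;

public class ZkStateStoreConfigSketch {
  public static Configuration buildConf() {
    Configuration conf = new Configuration();
    // ZooKeeper ensemble used by ZKCuratorManager.
    conf.set("hadoop.zk.address", "zk1:2181,zk2:2181,zk3:2181");
    // Select the ZooKeeper-backed State Store driver (key name assumed).
    conf.set("dfs.federation.router.store.driver.class",
        StateStoreZooKeeperImpl.class.getName());
    // Parent znode for the federation State Store (default: /hdfs-federation).
    conf.set(StateStoreZooKeeperImpl.FEDERATION_STORE_ZK_PARENT_PATH,
        "/hdfs-federation");
    return conf;
  }
}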

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs.server.federation.store.driver.impl;

import static org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils.filterMultiple;
import static org.apache.hadoop.hdfs.server.federation.store.StateStoreUtils.getRecordName;
import static org.apache.hadoop.util.curator.ZKCuratorManager.getNodePath;
import static org.apache.hadoop.util.Time.monotonicNow;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.imps.CuratorFrameworkState;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.federation.router.RBFConfigKeys;
import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver;
import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord;
import org.apache.hadoop.hdfs.server.federation.store.records.Query;
import org.apache.hadoop.hdfs.server.federation.store.records.QueryResult;
import org.apache.hadoop.util.curator.ZKCuratorManager;
import org.apache.zookeeper.data.ACL;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * {@link StateStoreDriver} driver implementation that uses ZooKeeper as a
 * backend.
 * <p>
 * The structure of the znodes in the ensemble is:
 * PARENT_PATH
 * |--- MOUNT
 * |--- MEMBERSHIP
 * |--- REBALANCER
 * |--- ROUTERS
 * |--- DISABLE_NAMESERVICE
 */
public class StateStoreZooKeeperImpl extends StateStoreSerializableImpl {

  private static final Logger LOG =
      LoggerFactory.getLogger(StateStoreZooKeeperImpl.class);


  /** Configuration keys. */
  public static final String FEDERATION_STORE_ZK_DRIVER_PREFIX =
      RBFConfigKeys.FEDERATION_STORE_PREFIX + "driver.zk.";
  public static final String FEDERATION_STORE_ZK_PARENT_PATH =
      FEDERATION_STORE_ZK_DRIVER_PREFIX + "parent-path";
  public static final String FEDERATION_STORE_ZK_PARENT_PATH_DEFAULT =
      "/hdfs-federation";


  /** Directory to store the state store data. */
  private String baseZNode;

  /** Interface to ZooKeeper. */
  private ZKCuratorManager zkManager;
  /** ACLs for ZooKeeper. */
  private List<ACL> zkAcl;


  @Override
  public boolean initDriver() {
    LOG.info("Initializing ZooKeeper connection");

    Configuration conf = getConf();
    baseZNode = conf.get(
        FEDERATION_STORE_ZK_PARENT_PATH,
        FEDERATION_STORE_ZK_PARENT_PATH_DEFAULT);
    try {
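      // The ZooKeeper ensemble address and retry settings come from the
      // Configuration (hadoop.zk.* keys read by ZKCuratorManager).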
      this.zkManager = new ZKCuratorManager(conf);
      this.zkManager.start();
      this.zkAcl = ZKCuratorManager.getZKAcls(conf);
    } catch (IOException e) {
      LOG.error("Cannot initialize the ZK connection", e);
      return false;
    }
    return true;
  }

  @Override
  public <T extends BaseRecord> boolean initRecordStorage(
      String className, Class<T> clazz) {
    try {
      String checkPath = getNodePath(baseZNode, className);
      zkManager.createRootDirRecursively(checkPath, zkAcl);
      return true;
    } catch (Exception e) {
      LOG.error("Cannot initialize ZK node for {}: {}",
          className, e.getMessage());
      return false;
    }
  }

  @Override
  public void close() throws Exception {
    if (zkManager != null) {
      zkManager.close();
    }
  }

  @Override
  public boolean isDriverReady() {
    if (zkManager == null) {
      return false;
    }
    CuratorFramework curator = zkManager.getCurator();
    if (curator == null) {
      return false;
    }
    return curator.getState() == CuratorFrameworkState.STARTED;
  }

  @Override
  public <T extends BaseRecord> QueryResult<T> get(Class<T> clazz)
      throws IOException {
    verifyDriverReady();
    long start = monotonicNow();
    List<T> ret = new ArrayList<>();
    String znode = getZNodeForClass(clazz);
    try {
      List<String> children = zkManager.getChildren(znode);
      for (String child : children) {
        try {
          String path = getNodePath(znode, child);
          Stat stat = new Stat();
          String data = zkManager.getStringData(path, stat);
          boolean corrupted = false;
          if (data == null || data.equals("")) {
            // All records should have data, otherwise this is corrupted
            corrupted = true;
          } else {
            try {
              T record = createRecord(data, stat, clazz);
              ret.add(record);
            } catch (IOException e) {
              LOG.error("Cannot create record type \"{}\" from \"{}\": {}",
                  clazz.getSimpleName(), data, e.getMessage());
              corrupted = true;
            }
          }

          if (corrupted) {
            LOG.error("Cannot get data for {} at {}, cleaning corrupted data",
                child, path);
            zkManager.delete(path);
          }
        } catch (Exception e) {
          LOG.error("Cannot get data for {}: {}", child, e.getMessage());
        }
      }
    } catch (Exception e) {
      getMetrics().addFailure(monotonicNow() - start);
      String msg = "Cannot get children for \"" + znode + "\": " +
          e.getMessage();
      LOG.error(msg);
      throw new IOException(msg);
    }
    long end = monotonicNow();
    getMetrics().addRead(end - start);
    return new QueryResult<T>(ret, getTime());
  }

  @Override
  public <T extends BaseRecord> boolean putAll(
      List<T> records, boolean update, boolean error) throws IOException {
    verifyDriverReady();
    if (records.isEmpty()) {
      return true;
    }

    // All records should be the same
    T record0 = records.get(0);
    Class<? extends BaseRecord> recordClass = record0.getClass();
    String znode = getZNodeForClass(recordClass);

    long start = monotonicNow();
    boolean status = true;
    for (T record : records) {
      String primaryKey = getPrimaryKey(record);
      String recordZNode = getNodePath(znode, primaryKey);
      byte[] data = serialize(record);
      if (!writeNode(recordZNode, data, update, error)) {
        status = false;
      }
    }
    long end = monotonicNow();
    if (status) {
      getMetrics().addWrite(end - start);
    } else {
      getMetrics().addFailure(end - start);
    }
    return status;
  }

  @Override
  public <T extends BaseRecord> int remove(
      Class<T> clazz, Query<T> query) throws IOException {
    verifyDriverReady();
    if (query == null) {
      return 0;
    }

    // Read the current data
    long start = monotonicNow();
    List<T> records = null;
    try {
      QueryResult<T> result = get(clazz);
      records = result.getRecords();
    } catch (IOException ex) {
      LOG.error("Cannot get existing records", ex);
      getMetrics().addFailure(monotonicNow() - start);
      return 0;
    }

    // Check the records to remove
    String znode = getZNodeForClass(clazz);
    List<T> recordsToRemove = filterMultiple(query, records);

    // Remove the records
    int removed = 0;
    for (T existingRecord : recordsToRemove) {
      LOG.info("Removing \"{}\"", existingRecord);
      try {
        String primaryKey = getPrimaryKey(existingRecord);
        String path = getNodePath(znode, primaryKey);
        if (zkManager.delete(path)) {
          removed++;
        } else {
          LOG.error("Did not remove \"{}\"", existingRecord);
        }
      } catch (Exception e) {
        LOG.error("Cannot remove \"{}\"", existingRecord, e);
        getMetrics().addFailure(monotonicNow() - start);
      }
    }
    long end = monotonicNow();
    if (removed > 0) {
      getMetrics().addRemove(end - start);
    }
    return removed;
  }

  @Override
  public <T extends BaseRecord> boolean removeAll(Class<T> clazz)
      throws IOException {
    long start = monotonicNow();
    boolean status = true;
    String znode = getZNodeForClass(clazz);
    LOG.info("Deleting all children under {}", znode);
    try {
      List<String> children = zkManager.getChildren(znode);
      for (String child : children) {
        String path = getNodePath(znode, child);
        LOG.info("Deleting {}", path);
        zkManager.delete(path);
      }
    } catch (Exception e) {
      LOG.error("Cannot remove {}: {}", znode, e.getMessage());
      status = false;
    }
    long time = monotonicNow() - start;
    if (status) {
      getMetrics().addRemove(time);
    } else {
      getMetrics().addFailure(time);
    }
    return status;
  }

  private boolean writeNode(
      String znode, byte[] bytes, boolean update, boolean error) {
    try {
      boolean created = zkManager.create(znode);
      if (!update && !created && error) {
        LOG.info("Cannot write record \"{}\", it already exists", znode);
        return false;
      }

      // Write data
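      // Version -1 overwrites the znode regardless of its current data version.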
      zkManager.setData(znode, bytes, -1);
      return true;
    } catch (Exception e) {
      LOG.error("Cannot write record \"{}\": {}", znode, e.getMessage());
    }
    return false;
  }

  /**
   * Get the ZNode for a class.
   *
   * @param clazz Record class to evaluate.
   * @return The ZNode for the class.
   */
  private <T extends BaseRecord> String getZNodeForClass(Class<T> clazz) {
    String className = getRecordName(clazz);
    return getNodePath(baseZNode, className);
  }

  /**
   * Creates a record from a string returned by ZooKeeper.
   *
   * @param data The record data read from ZooKeeper.
   * @param stat Stat of the data record to create.
   * @param clazz The data record type to create.
   * @return The created record.
   * @throws IOException If the record cannot be created from the data.
   */
  private <T extends BaseRecord> T createRecord(
      String data, Stat stat, Class<T> clazz) throws IOException {
    T record = newRecord(data, clazz, false);
    record.setDateCreated(stat.getCtime());
    record.setDateModified(stat.getMtime());
    return record;
  }
}
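
Each record is serialized by the StateStoreSerializableImpl base class and stored as the data of a child znode at <parent-path>/<record class name>/<primary key> (see getZNodeForClass() and putAll() above). The writeNode() helper delegates to ZKCuratorManager; a rough Curator-level sketch of its create-if-missing, then overwrite pattern is shown below (an illustration of the pattern, not the ZKCuratorManager implementation).

import org.apache.curator.framework.CuratorFramework;

/** Sketch of the pattern used by writeNode(): create the znode if absent, then set its data. */
final class WriteNodeSketch {
  static void write(CuratorFramework curator, String znode, byte[] bytes) throws Exception {
    // Create the znode (and any missing parents) only if it does not exist yet.
    if (curator.checkExists().forPath(znode) == null) {
      curator.create().creatingParentsIfNeeded().forPath(znode);
    }
    // As in writeNode(), version -1 means "set regardless of the current version".
    curator.setData().withVersion(-1).forPath(znode, bytes);
  }
}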

Related information

hadoop source code directory

Related articles

hadoop StateStoreBaseImpl source code

hadoop StateStoreFileBaseImpl source code

hadoop StateStoreFileImpl source code

hadoop StateStoreFileSystemImpl source code

hadoop StateStoreSerializableImpl source code

hadoop StateStoreSerializerPBImpl source code

hadoop package-info source code
