hadoop CachedRecordStore 源码

  • 2022-10-20
  • 浏览 (186)

haddop CachedRecordStore 代码

文件路径:/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/CachedRecordStore.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs.server.federation.store;

import java.io.IOException;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

import org.apache.hadoop.hdfs.server.federation.metrics.StateStoreMetrics;
import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreDriver;
import org.apache.hadoop.hdfs.server.federation.store.records.BaseRecord;
import org.apache.hadoop.hdfs.server.federation.store.records.QueryResult;
import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Record store that takes care of caching the records in memory.
 *
 * @param <R> Record to store by this interface.
 */
public abstract class CachedRecordStore<R extends BaseRecord>
    extends RecordStore<R> implements StateStoreCache {

  private static final Logger LOG =
      LoggerFactory.getLogger(CachedRecordStore.class);


  /** Prevent loading the cache more than once every 500 ms. */
  private static final long MIN_UPDATE_MS = 500;


  /** Cached entries. */
  private List<R> records = new ArrayList<>();

  /** Time stamp of the cached entries. */
  private long timestamp = -1;

  /** If the cache is initialized. */
  private boolean initialized = false;

  /** Last time the cache was updated. */
  private long lastUpdate = -1;

  /** Lock to access the memory cache. */
  private final ReadWriteLock readWriteLock = new ReentrantReadWriteLock();
  private final Lock readLock = readWriteLock.readLock();
  private final Lock writeLock = readWriteLock.writeLock();

  /** If it should override the expired values when loading the cache. */
  private boolean override = false;


  /**
   * Create a new cached record store.
   *
   * @param clazz Class of the record to store.
   * @param driver State Store driver.
   */
  protected CachedRecordStore(Class<R> clazz, StateStoreDriver driver) {
    this(clazz, driver, false);
  }

  /**
   * Create a new cached record store.
   *
   * @param clazz Class of the record to store.
   * @param driver State Store driver.
   * @param over If the entries should be overridden if they expire
   */
  protected CachedRecordStore(
      Class<R> clazz, StateStoreDriver driver, boolean over) {
    super(clazz, driver);

    this.override = over;
  }

  /**
   * Check that the cache of the State Store information is available.
   *
   * @throws StateStoreUnavailableException If the cache is not initialized.
   */
  private void checkCacheAvailable() throws StateStoreUnavailableException {
    if (!this.initialized) {
      throw new StateStoreUnavailableException(
          "Cached State Store not initialized, " +
          getRecordClass().getSimpleName() + " records not valid");
    }
  }

  @Override
  public boolean loadCache(boolean force) throws IOException {
    // Prevent loading the cache too frequently
    if (force || isUpdateTime()) {
      List<R> newRecords = null;
      long t = -1;
      try {
        QueryResult<R> result = getDriver().get(getRecordClass());
        newRecords = result.getRecords();
        t = result.getTimestamp();

        // If we have any expired record, update the State Store
        if (this.override) {
          overrideExpiredRecords(result);
        }
      } catch (IOException e) {
        LOG.error("Cannot get \"{}\" records from the State Store",
            getRecordClass().getSimpleName());
        this.initialized = false;
        return false;
      }

      // Update cache atomically
      writeLock.lock();
      try {
        this.records.clear();
        this.records.addAll(newRecords);
        this.timestamp = t;
        this.initialized = true;
      } finally {
        writeLock.unlock();
      }

      // Update the metrics for the cache State Store size
      StateStoreMetrics metrics = getDriver().getMetrics();
      if (metrics != null) {
        String recordName = getRecordClass().getSimpleName();
        metrics.setCacheSize(recordName, this.records.size());
      }

      lastUpdate = Time.monotonicNow();
    }
    return true;
  }

  /**
   * Check if it's time to update the cache. Update it was never updated.
   *
   * @return If it's time to update this cache.
   */
  private boolean isUpdateTime() {
    return Time.monotonicNow() - lastUpdate > MIN_UPDATE_MS;
  }

  /**
   * Updates the state store with any record overrides we detected, such as an
   * expired state. If an expired record exists beyond deletion time, it is
   * removed.
   *
   * @param query RecordQueryResult containing the data to be inspected.
   * @throws IOException If the values cannot be updated.
   */
  public void overrideExpiredRecords(QueryResult<R> query) throws IOException {
    List<R> commitRecords = new ArrayList<>();
    List<R> deleteRecords = new ArrayList<>();
    List<R> newRecords = query.getRecords();
    long currentDriverTime = query.getTimestamp();
    if (newRecords == null || currentDriverTime <= 0) {
      LOG.error("Cannot check overrides for record");
      return;
    }
    for (R record : newRecords) {
      if (record.shouldBeDeleted(currentDriverTime)) {
        String recordName = StateStoreUtils.getRecordName(record.getClass());
        if (getDriver().remove(record)) {
          deleteRecords.add(record);
          LOG.info("Deleted State Store record {}: {}", recordName, record);
        } else {
          LOG.warn("Couldn't delete State Store record {}: {}", recordName,
              record);
        }
      } else if (record.checkExpired(currentDriverTime)) {
        String recordName = StateStoreUtils.getRecordName(record.getClass());
        LOG.info("Override State Store record {}: {}", recordName, record);
        commitRecords.add(record);
      }
    }
    if (commitRecords.size() > 0) {
      getDriver().putAll(commitRecords, true, false);
    }
    if (deleteRecords.size() > 0) {
      newRecords.removeAll(deleteRecords);
    }
  }

  /**
   * Updates the state store with any record overrides we detected, such as an
   * expired state.
   *
   * @param record record to be updated.
   * @throws IOException If the values cannot be updated.
   */
  public void overrideExpiredRecord(R record) throws IOException {
    List<R> newRecords = new ArrayList<>();
    newRecords.add(record);
    long time = getDriver().getTime();
    QueryResult<R> query = new QueryResult<>(newRecords, time);
    overrideExpiredRecords(query);
  }

  /**
   * Get all the cached records.
   *
   * @return Copy of the cached records.
   * @throws StateStoreUnavailableException If the State store is not available.
   */
  public List<R> getCachedRecords() throws StateStoreUnavailableException {
    checkCacheAvailable();

    List<R> ret = new LinkedList<R>();
    this.readLock.lock();
    try {
      ret.addAll(this.records);
    } finally {
      this.readLock.unlock();
    }
    return ret;
  }

  /**
   * Get all the cached records and the time stamp of the cache.
   *
   * @return Copy of the cached records and the time stamp.
   * @throws StateStoreUnavailableException If the State store is not available.
   */
  protected QueryResult<R> getCachedRecordsAndTimeStamp()
      throws StateStoreUnavailableException {
    checkCacheAvailable();

    this.readLock.lock();
    try {
      return new QueryResult<R>(this.records, this.timestamp);
    } finally {
      this.readLock.unlock();
    }
  }
}

相关信息

hadoop 源码目录

相关文章

hadoop DisabledNameserviceStore 源码

hadoop MembershipStore 源码

hadoop MountTableStore 源码

hadoop RecordStore 源码

hadoop RouterStore 源码

hadoop StateStoreCache 源码

hadoop StateStoreCacheUpdateService 源码

hadoop StateStoreConnectionMonitorService 源码

hadoop StateStoreService 源码

hadoop StateStoreUnavailableException 源码

0  赞