hadoop ShortCircuitRegistry 源码

  • 2022-10-20
  • 浏览 (220)

hadoop ShortCircuitRegistry 代码

文件路径:/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/ShortCircuitRegistry.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.hdfs.server.datanode;

import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SHARED_FILE_DESCRIPTOR_PATHS;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SHARED_FILE_DESCRIPTOR_PATHS_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS_DEFAULT;

import java.io.Closeable;
import java.io.FileInputStream;
import java.io.IOException;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Set;

import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.classification.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.InvalidRequestException;
import org.apache.hadoop.hdfs.ExtendedBlockId;
import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm;
import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.ShmId;
import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.Slot;
import org.apache.hadoop.hdfs.shortcircuit.ShortCircuitShm.SlotId;
import org.apache.hadoop.io.nativeio.SharedFileDescriptorFactory;
import org.apache.hadoop.net.unix.DomainSocket;
import org.apache.hadoop.net.unix.DomainSocketWatcher;
import org.apache.hadoop.hdfs.shortcircuit.DfsClientShmManager;

import org.apache.hadoop.thirdparty.com.google.common.base.Joiner;
import org.apache.hadoop.util.Preconditions;
import org.apache.hadoop.thirdparty.com.google.common.collect.HashMultimap;

/**
 * Manages client short-circuit memory segments on the DataNode.
 *
 * DFSClients request shared memory segments from the DataNode.  The 
 * ShortCircuitRegistry generates and manages these segments.  Each segment
 * has a randomly generated 128-bit ID which uniquely identifies it.  The
 * segments each contain several "slots."
 *
 * Before performing a short-circuit read, DFSClients must request a pair of
 * file descriptors from the DataNode via the REQUEST_SHORT_CIRCUIT_FDS
 * operation.  As part of this operation, DFSClients pass the ID of the shared
 * memory segment they would like to use to communicate information about this
 * replica, as well as the slot number within that segment they would like to
 * use.  Slot allocation is always done by the client.
 *
 * Slots are used to track the state of the block on both the client and
 * datanode. When this DataNode mlocks a block, the corresponding slots for the
 * replicas are marked as "anchorable".  Anchorable blocks can be safely read
 * without verifying the checksum.  This means that BlockReaderLocal objects
 * using these replicas can skip checksumming.  It also means that we can do
 * zero-copy reads on these replicas (the ZCR interface has no way of
 * verifying checksums.)
 * 
 * When a DN needs to munlock a block, it needs to first wait for the block to
 * be unanchored by clients doing a no-checksum read or a zero-copy read. The 
 * DN also marks the block's slots as "unanchorable" to prevent additional 
 * clients from initiating these operations in the future.
 * 
 * The counterpart of this class on the client is {@link DfsClientShmManager}.
 */
public class ShortCircuitRegistry {
  /** Logger for this class. */
  public static final Logger LOG =
      LoggerFactory.getLogger(ShortCircuitRegistry.class);

  /**
   * Length argument handed to the shared file descriptor factory whenever a
   * new segment is created (presumably a size in bytes -- TODO confirm
   * against SharedFileDescriptorFactory#createDescriptor).
   */
  private static final int SHM_LENGTH = 8192;

  /**
   * A shared memory segment that is tracked by a ShortCircuitRegistry.
   *
   * In addition to the segment itself, this remembers the name of the client
   * the segment was created for and the registry that owns it.  It is also
   * the handler handed to the DomainSocketWatcher for the segment's socket.
   */
  public static class RegisteredShm extends ShortCircuitShm
      implements DomainSocketWatcher.Handler {
    private final ShortCircuitRegistry registry;
    private final String clientName;

    RegisteredShm(String clientName, ShmId shmId, FileInputStream stream,
        ShortCircuitRegistry registry) throws IOException {
      super(shmId, stream);
      this.registry = registry;
      this.clientName = clientName;
    }

    String getClientName() {
      return clientName;
    }

    /**
     * Watcher callback: removes this segment from the owning registry.
     *
     * The registry's monitor is taken first and this segment's monitor
     * second; that order must not be reversed.
     *
     * @param sock   The socket the callback was invoked for (not used here).
     * @return       Always true.
     */
    @Override
    public boolean handle(DomainSocket sock) {
      final ShortCircuitRegistry owner = registry;
      synchronized (owner) {
        synchronized (this) {
          owner.removeShm(this);
        }
      }
      return true;
    }
  }

  /**
   * Stop tracking a shared memory segment and release it.
   *
   * The segment is dropped from the segment table, each of its slots is
   * dropped from the per-block slot index and marked invalid, and finally
   * the segment itself is freed.
   *
   * @param shm    The segment to remove.  The precondition checks require it
   *                 to be the very instance registered under its ShmId, and
   *                 require each of its slots to be present in the slot
   *                 index.
   */
  public synchronized void removeShm(ShortCircuitShm shm) {
    if (LOG.isTraceEnabled()) {
      LOG.trace("removing shm " + shm);
    }
    // Forget the segment ID; the evicted entry must be this same instance.
    final RegisteredShm evicted = segments.remove(shm.getShmId());
    Preconditions.checkState(evicted == shm,
        "failed to remove " + shm.getShmId());
    // Forget each slot of the segment, invalidating it once it is untracked.
    final Iterator<Slot> slotIter = shm.slotIterator();
    while (slotIter.hasNext()) {
      final Slot current = slotIter.next();
      final boolean wasTracked = slots.remove(current.getBlockId(), current);
      Preconditions.checkState(wasTracked);
      current.makeInvalid();
    }
    // Release the memory map and close the shared file.
    shm.free();
  }

  /**
   * Whether or not the registry is enabled.
   *
   * Set once by the constructor (true only if setup completed) and cleared
   * by {@link #shutdown()}.
   */
  private boolean enabled;

  /**
   * The factory which creates shared file descriptors.
   *
   * May be null when the constructor failed before the factory was created.
   */
  private final SharedFileDescriptorFactory shmFactory;
  
  /**
   * A watcher which sends out callbacks when the UNIX domain socket
   * associated with a shared memory segment closes.
   *
   * Null when the constructor did not get as far as creating it.
   */
  private final DomainSocketWatcher watcher;

  /**
   * The registered segments, keyed by segment ID.  Entries are added by
   * createNewMemorySegment and removed by removeShm.
   */
  private final HashMap<ShmId, RegisteredShm> segments =
      new HashMap<ShmId, RegisteredShm>(0);
  
  /**
   * The registered slots, indexed by the block each slot refers to.  Entries
   * are added by registerSlot and removed by unregisterSlot and removeShm.
   */
  private final HashMultimap<ExtendedBlockId, Slot> slots =
      HashMultimap.create(0, 1);
  
  /**
   * Build a registry from the given configuration.
   *
   * Setup reads the watcher interrupt-check interval and the shared file
   * descriptor paths, creates the descriptor factory, and starts a
   * DomainSocketWatcher.  If any of these steps raises an IOException, the
   * failure is logged at debug level and the registry is simply left
   * disabled; the exception is not propagated.
   *
   * @param conf          The configuration to read settings from.
   * @throws IOException  Declared on the signature; IOExceptions raised
   *                        during setup are caught inside this constructor.
   */
  public ShortCircuitRegistry(Configuration conf) throws IOException {
    boolean setupOk = false;
    SharedFileDescriptorFactory factory = null;
    DomainSocketWatcher socketWatcher = null;
    try {
      final int interruptCheck = conf.getInt(
          DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS,
          DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS_DEFAULT);
      if (interruptCheck <= 0) {
        throw new IOException(
            DFS_SHORT_CIRCUIT_SHARED_MEMORY_WATCHER_INTERRUPT_CHECK_MS +
            " was set to " + interruptCheck);
      }
      // Fall back to the built-in default list when nothing is configured.
      final String[] configuredPaths =
          conf.getTrimmedStrings(DFS_DATANODE_SHARED_FILE_DESCRIPTOR_PATHS);
      final String[] shmPaths = (configuredPaths.length != 0)
          ? configuredPaths
          : DFS_DATANODE_SHARED_FILE_DESCRIPTOR_PATHS_DEFAULT.split(",");
      factory = SharedFileDescriptorFactory.
          create("HadoopShortCircuitShm_", shmPaths);
      final String loadFailure = DomainSocketWatcher.getLoadingFailureReason();
      if (loadFailure != null) {
        throw new IOException(loadFailure);
      }
      socketWatcher = new DomainSocketWatcher(interruptCheck, "datanode");
      setupOk = true;
      if (LOG.isDebugEnabled()) {
        LOG.debug("created new ShortCircuitRegistry with interruptCheck=" +
                  interruptCheck + ", shmPath=" + factory.getPath());
      }
    } catch (IOException e) {
      if (LOG.isDebugEnabled()) {
        LOG.debug("Disabling ShortCircuitRegistry", e);
      }
    } finally {
      // The final fields are assigned exactly once, whatever happened above.
      this.enabled = setupOk;
      this.shmFactory = factory;
      this.watcher = socketWatcher;
    }
  }

  /**
   * Handle a notification from the FsDatasetCache that a block was mlocked.
   *
   * Every slot currently registered for the block is marked anchorable.
   * Nothing happens when the registry is disabled.
   *
   * @param blockId    The block that was mlocked.
   */
  public synchronized void processBlockMlockEvent(ExtendedBlockId blockId) {
    if (enabled) {
      for (Slot slot : slots.get(blockId)) {
        slot.makeAnchorable();
      }
    }
  }

  /**
   * Mark every slot associated with this blockId as unanchorable and report
   * whether the block may be munlocked.
   *
   * All slots are made unanchorable, even after one has been found that is
   * still anchored.
   *
   * @param blockId        The block ID.
   * @return               True if we should allow the munlock request, i.e.
   *                         the registry is disabled or none of the block's
   *                         slots is currently anchored.
   */
  public synchronized boolean processBlockMunlockRequest(
      ExtendedBlockId blockId) {
    if (!enabled) {
      return true;
    }
    boolean anyAnchored = false;
    for (Slot slot : slots.get(blockId)) {
      slot.makeUnanchorable();
      // Non-short-circuit OR: isAnchored() is evaluated for every slot.
      anyAnchored |= slot.isAnchored();
    }
    return !anyAnchored;
  }

  /**
   * Invalidate all slots of a block that is being invalidated (deleted) from
   * this DataNode.  Once a slot is invalid, the DFSClient will not use the
   * corresponding replica for new read or mmap operations (although existing,
   * ongoing read or mmap operations will complete.)
   *
   * When at least one slot is affected, a single info-level message listing
   * the invalidated slots is logged.  Nothing happens when the registry is
   * disabled.
   *
   * @param blockId        The block ID.
   */
  public synchronized void processBlockInvalidation(ExtendedBlockId blockId) {
    if (!enabled) {
      return;
    }
    final Set<Slot> targets = slots.get(blockId);
    if (targets.isEmpty()) {
      return;
    }
    final StringBuilder msg = new StringBuilder("Block ");
    msg.append(blockId);
    msg.append(" has been invalidated.  ");
    msg.append("Marking short-circuit slots as invalid: ");
    final Iterator<Slot> it = targets.iterator();
    while (it.hasNext()) {
      final Slot slot = it.next();
      slot.makeInvalid();
      msg.append(slot.toString());
      if (it.hasNext()) {
        msg.append(", ");
      }
    }
    LOG.info(msg.toString());
  }

  /**
   * List the clients that hold slots for a block.
   *
   * @param blockId    The block ID.
   * @return           The distinct client names of the segments owning the
   *                     block's slots, joined with commas; the empty string
   *                     when the registry is disabled.
   */
  public synchronized String getClientNames(ExtendedBlockId blockId) {
    if (!enabled) {
      return "";
    }
    final HashSet<String> names = new HashSet<String>();
    for (Slot slot : slots.get(blockId)) {
      final RegisteredShm owner = (RegisteredShm) slot.getShm();
      names.add(owner.getClientName());
    }
    return Joiner.on(",").join(names);
  }

  /**
   * The result of creating a new memory segment: the ID assigned to the
   * segment together with the stream of its backing file.  Closing this
   * object closes that stream.
   */
  public static class NewShmInfo implements Closeable {
    private final FileInputStream stream;
    private final ShmId shmId;

    NewShmInfo(ShmId shmId, FileInputStream stream) {
      this.stream = stream;
      this.shmId = shmId;
    }

    /** @return the stream this object was constructed with. */
    public FileInputStream getFileStream() {
      return this.stream;
    }

    /** @return the segment ID this object was constructed with. */
    public ShmId getShmId() {
      return this.shmId;
    }

    @Override
    public void close() throws IOException {
      this.stream.close();
    }
  }

  /**
   * Handle a DFSClient request to create a new memory segment.
   *
   * A random, currently unused segment ID is chosen, a shared file
   * descriptor is created for it, and the resulting segment is recorded in
   * the segment table.  The socket is handed to the watcher only after the
   * registry lock has been released.
   *
   * @param clientName    Client name as reported by the client.
   * @param sock          The DomainSocket to associate with this memory
   *                        segment.  When this socket is closed, or the
   *                        other side writes anything to the socket, the
   *                        segment will be closed.  This can happen at any
   *                        time, including right after this function returns.
   * @return              A NewShmInfo object.  The caller must close the
   *                        NewShmInfo object once they are done with it.
   * @throws IOException  If the new memory segment could not be created.
   * @throws UnsupportedOperationException
   *                      If the registry is not enabled.
   */
  public NewShmInfo createNewMemorySegment(String clientName,
      DomainSocket sock) throws IOException {
    final NewShmInfo result;
    RegisteredShm registered = null;
    synchronized (this) {
      if (!enabled) {
        if (LOG.isTraceEnabled()) {
          LOG.trace("createNewMemorySegment: ShortCircuitRegistry is " +
              "not enabled.");
        }
        throw new UnsupportedOperationException();
      }
      // Keep drawing random IDs until one is not already taken.
      ShmId freshId = ShmId.createRandom();
      while (segments.containsKey(freshId)) {
        freshId = ShmId.createRandom();
      }
      FileInputStream backing = null;
      try {
        backing = shmFactory.createDescriptor(clientName, SHM_LENGTH);
        registered = new RegisteredShm(clientName, freshId, backing, this);
      } finally {
        // The segment could not be built: don't leak the descriptor.
        if (registered == null) {
          IOUtils.closeStream(backing);
        }
      }
      result = new NewShmInfo(freshId, backing);
      segments.put(freshId, registered);
    }
    // The registry lock is released before touching the watcher, to prevent
    // deadlock.  From here on, RegisteredShm#handle may run at any time.
    watcher.add(sock, registered);
    if (LOG.isTraceEnabled()) {
      LOG.trace("createNewMemorySegment: created " + result.getShmId());
    }
    return result;
  }
  
  /**
   * Register a slot of an existing segment for a block.
   *
   * The slot is registered with its segment, marked anchorable or
   * unanchorable according to isCached, and added to the per-block slot
   * index.
   *
   * @param blockId    The block the slot will refer to.
   * @param slotId     The segment ID and slot index to register.
   * @param isCached   True to mark the new slot anchorable, false to mark it
   *                     unanchorable.
   * @throws InvalidRequestException
   *                   If no segment is registered under the slot's ShmId.
   * @throws UnsupportedOperationException
   *                   If the registry is not enabled.
   */
  public synchronized void registerSlot(ExtendedBlockId blockId, SlotId slotId,
      boolean isCached) throws InvalidRequestException {
    if (!enabled) {
      if (LOG.isTraceEnabled()) {
        LOG.trace(this + " can't register a slot because the " +
            "ShortCircuitRegistry is not enabled.");
      }
      throw new UnsupportedOperationException();
    }
    final ShmId segmentId = slotId.getShmId();
    final RegisteredShm owner = segments.get(segmentId);
    if (owner == null) {
      throw new InvalidRequestException("there is no shared memory segment " +
          "registered with shmId " + segmentId);
    }
    final Slot newSlot = owner.registerSlot(slotId.getSlotIdx(), blockId);
    if (!isCached) {
      newSlot.makeUnanchorable();
    } else {
      newSlot.makeAnchorable();
    }
    // The slot must not have been in the index already.
    final boolean inserted = slots.put(blockId, newSlot);
    Preconditions.checkState(inserted);
    if (LOG.isTraceEnabled()) {
      LOG.trace(this + ": registered " + blockId + " with slot " +
        slotId + " (isCached=" + isCached + ")");
    }
  }
  
  /**
   * Unregister a previously registered slot.
   *
   * The slot is marked invalid, unregistered from its segment, and dropped
   * from the per-block slot index.
   *
   * @param slotId     The segment ID and slot index to unregister.
   * @throws InvalidRequestException
   *                   If no segment is registered under the slot's ShmId.
   * @throws UnsupportedOperationException
   *                   If the registry is not enabled.
   */
  public synchronized void unregisterSlot(SlotId slotId)
      throws InvalidRequestException {
    if (!enabled) {
      if (LOG.isTraceEnabled()) {
        LOG.trace("unregisterSlot: ShortCircuitRegistry is " +
            "not enabled.");
      }
      throw new UnsupportedOperationException();
    }
    final ShmId segmentId = slotId.getShmId();
    final RegisteredShm owner = segments.get(segmentId);
    if (owner == null) {
      throw new InvalidRequestException("there is no shared memory segment " +
          "registered with shmId " + segmentId);
    }
    final Slot doomed = owner.getSlot(slotId.getSlotIdx());
    doomed.makeInvalid();
    owner.unregisterSlot(slotId.getSlotIdx());
    slots.remove(doomed.getBlockId(), doomed);
  }
  
  /**
   * Disable the registry and close its watcher.
   *
   * Only a call that finds the registry enabled closes the watcher; calls on
   * a disabled registry do nothing.  The watcher is closed after the
   * registry lock has been released.
   */
  public void shutdown() {
    final boolean wasEnabled;
    synchronized (this) {
      wasEnabled = enabled;
      enabled = false;
    }
    if (wasEnabled) {
      IOUtils.closeStream(watcher);
    }
  }

  /**
   * A callback that is handed the registry's internal segment table and slot
   * index by {@link #visit(Visitor)}.
   */
  public interface Visitor {
    /**
     * @param segments   The registry's segment table.
     * @param slots      The registry's per-block slot index.
     * @return           A value that visit passes back to its caller.
     */
    boolean accept(HashMap<ShmId, RegisteredShm> segments,
        HashMultimap<ExtendedBlockId, Slot> slots);
  }

  /**
   * Run a visitor against the registry's internal tables while holding the
   * registry lock.
   *
   * @param visitor    The visitor to invoke.
   * @return           Whatever the visitor returned.
   */
  @VisibleForTesting
  public synchronized boolean visit(Visitor visitor) {
    final boolean verdict = visitor.accept(segments, slots);
    return verdict;
  }

  /**
   * Get the number of shared memory segments currently registered.
   *
   * The segment table is a plain HashMap that other methods modify while
   * holding the registry lock, so the size is read under that same lock.
   *
   * @return   The number of entries in the segment table.
   */
  @VisibleForTesting
  public synchronized int getShmNum() {
    return segments.size();
  }
}

相关信息

hadoop 源码目录

相关文章

hadoop BPOfferService 源码

hadoop BPServiceActor 源码

hadoop BPServiceActorAction 源码

hadoop BPServiceActorActionException 源码

hadoop BlockChecksumHelper 源码

hadoop BlockPoolManager 源码

hadoop BlockPoolSliceStorage 源码

hadoop BlockReceiver 源码

hadoop BlockRecoveryWorker 源码

hadoop BlockScanner 源码

0  赞