hadoop ReadBufferWorker source code

  • 2022-10-20

hadoop ReadBufferWorker code

File path: /hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/ReadBufferWorker.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.fs.azurebfs.services;

import java.io.IOException;
import java.util.concurrent.CountDownLatch;

import org.apache.hadoop.fs.PathIOException;
import org.apache.hadoop.fs.azurebfs.contracts.services.ReadBufferStatus;

class ReadBufferWorker implements Runnable {

  protected static final CountDownLatch UNLEASH_WORKERS = new CountDownLatch(1);
  private int id;

  ReadBufferWorker(final int id) {
    this.id = id;
  }

  /**
   * return the ID of ReadBufferWorker.
   */
  public int getId() {
    return this.id;
  }

  /**
   * Waits until a buffer becomes available in ReadAheadQueue.
   * Once a buffer becomes available, reads the file specified in it and then posts results back to buffer manager.
   * Rinse and repeat. Forever.
   */
  public void run() {
    try {
      UNLEASH_WORKERS.await();
    } catch (InterruptedException ex) {
      Thread.currentThread().interrupt();
    }
    ReadBufferManager bufferManager = ReadBufferManager.getBufferManager();
    ReadBuffer buffer;
    while (true) {
      try {
        buffer = bufferManager.getNextBlockToRead();   // blocks, until a buffer is available for this thread
      } catch (InterruptedException ex) {
        Thread.currentThread().interrupt();
        return;
      }
      if (buffer != null) {
        try {
          // do the actual read, from the file.
          int bytesRead = buffer.getStream().readRemote(
              buffer.getOffset(),
              buffer.getBuffer(),
              0,
              // If AbfsInputStream was created with bigger buffer size than
              // read-ahead buffer size, make sure a valid length is passed
              // for remote read
              Math.min(buffer.getRequestedLength(), buffer.getBuffer().length),
                  buffer.getTracingContext());

          bufferManager.doneReading(buffer, ReadBufferStatus.AVAILABLE, bytesRead);  // post result back to ReadBufferManager
        } catch (IOException ex) {
          buffer.setErrException(ex);
          bufferManager.doneReading(buffer, ReadBufferStatus.READ_FAILED, 0);
        } catch (Exception ex) {
          buffer.setErrException(new PathIOException(buffer.getStream().getPath(), ex));
          bufferManager.doneReading(buffer, ReadBufferStatus.READ_FAILED, 0);
        }
      }
    }
  }
}
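
How the workers are wired up: ReadBufferWorker, its package-private constructor, and the protected UNLEASH_WORKERS latch are only reachable from same-package code, so the worker threads are created and released by the ReadBufferManager. The following is a minimal sketch of that startup pattern, assuming code in the org.apache.hadoop.fs.azurebfs.services package; the pool size, thread names, and daemon flag are illustrative assumptions, not the actual ReadBufferManager configuration.

  // Minimal illustrative sketch, assumed to live in the same package
  // (org.apache.hadoop.fs.azurebfs.services); values are not the real config.
  private static void startReadBufferWorkers() {
    final int numWorkers = 8;                        // assumed pool size
    for (int i = 0; i < numWorkers; i++) {
      Thread worker = new Thread(new ReadBufferWorker(i));
      worker.setName("abfs-read-ahead-" + i);        // illustrative thread name
      worker.setDaemon(true);                        // run() loops forever; don't block JVM shutdown
      worker.start();                                // each thread parks in UNLEASH_WORKERS.await()
    }
    // After the manager has finished its own initialization,
    // release all workers at once through the shared latch.
    ReadBufferWorker.UNLEASH_WORKERS.countDown();
  }

Once unleashed, each worker repeatedly calls getNextBlockToRead(), performs the remote read, and reports the outcome back to the manager via doneReading(), as shown in the run() method above.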

Related information

hadoop source code directory

Related articles

hadoop AbfsAclHelper source code

hadoop AbfsClient source code

hadoop AbfsClientContext source code

hadoop AbfsClientContextBuilder source code

hadoop AbfsClientRenameResult source code

hadoop AbfsClientThrottlingAnalyzer source code

hadoop AbfsClientThrottlingIntercept source code

hadoop AbfsCounters source code

hadoop AbfsErrors source code

hadoop AbfsHttpHeader source code
