hadoop WebHdfsFileSystem 源码

  • 2022-10-20
hadoop WebHdfsFileSystem 代码


/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.hdfs.web;

import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_WEBHDFS_REST_CSRF_CUSTOM_HEADER_DEFAULT;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_WEBHDFS_REST_CSRF_CUSTOM_HEADER_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_WEBHDFS_REST_CSRF_ENABLED_DEFAULT;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_WEBHDFS_REST_CSRF_ENABLED_KEY;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_WEBHDFS_REST_CSRF_METHODS_TO_IGNORE_DEFAULT;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_WEBHDFS_REST_CSRF_METHODS_TO_IGNORE_KEY;

import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
import java.io.EOFException;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.lang.reflect.InvocationTargetException;
import java.net.HttpURLConnection;
import java.net.InetSocketAddress;
import java.net.MalformedURLException;
import java.net.URI;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Base64;
import java.util.Base64.Decoder;
import java.util.Collection;
import java.util.EnumSet;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.StringTokenizer;
import java.util.concurrent.TimeUnit;

import javax.ws.rs.core.HttpHeaders;
import javax.ws.rs.core.MediaType;

import org.apache.commons.io.IOUtils;
import org.apache.commons.io.input.BoundedInputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.crypto.key.KeyProvider;
import org.apache.hadoop.crypto.key.KeyProviderTokenIssuer;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.ContentSummary;
import org.apache.hadoop.fs.CreateFlag;
import org.apache.hadoop.fs.DelegationTokenRenewer;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FSInputStream;
import org.apache.hadoop.fs.FileEncryptionInfo;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FsServerDefaults;
import org.apache.hadoop.fs.GlobalStorageStatistics;
import org.apache.hadoop.fs.GlobalStorageStatistics.StorageStatisticsProvider;
import org.apache.hadoop.fs.MultipartUploaderBuilder;
import org.apache.hadoop.fs.QuotaUsage;
import org.apache.hadoop.fs.StorageStatistics;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.fs.impl.FileSystemMultipartUploaderBuilder;
import org.apache.hadoop.fs.permission.FsCreateModes;
import org.apache.hadoop.hdfs.DFSOpsCountStatistics;
import org.apache.hadoop.hdfs.DFSOpsCountStatistics.OpType;
import org.apache.hadoop.fs.MD5MD5CRC32FileChecksum;
import org.apache.hadoop.fs.Options;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.XAttrCodec;
import org.apache.hadoop.fs.XAttrSetFlag;
import org.apache.hadoop.fs.permission.AclEntry;
import org.apache.hadoop.fs.permission.AclStatus;
import org.apache.hadoop.fs.permission.FsAction;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.DFSUtilClient;
import org.apache.hadoop.hdfs.HAUtilClient;
import org.apache.hadoop.hdfs.HdfsKMSUtil;
import org.apache.hadoop.hdfs.client.DfsPathCapabilities;
import org.apache.hadoop.hdfs.client.HdfsClientConfigKeys;
import org.apache.hadoop.hdfs.protocol.BlockStoragePolicy;
import org.apache.hadoop.hdfs.protocol.DirectoryListing;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.apache.hadoop.hdfs.protocol.HdfsConstants;
import org.apache.hadoop.hdfs.protocol.HdfsFileStatus;
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReport;
import org.apache.hadoop.hdfs.protocol.SnapshotDiffReportListing;
import org.apache.hadoop.hdfs.protocol.SnapshottableDirectoryStatus;
import org.apache.hadoop.hdfs.protocol.SnapshotStatus;
import org.apache.hadoop.hdfs.protocol.proto.HdfsProtos.FileEncryptionInfoProto;
import org.apache.hadoop.hdfs.protocolPB.PBHelperClient;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.hdfs.web.resources.*;
import org.apache.hadoop.hdfs.web.resources.HttpOpParam.Op;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.io.retry.RetryUtils;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.ipc.StandbyException;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.SecretManager.InvalidToken;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.security.token.TokenSelector;
import org.apache.hadoop.security.token.delegation.AbstractDelegationTokenSelector;
import org.apache.hadoop.security.token.DelegationTokenIssuer;
import org.apache.hadoop.util.JsonSerialization;
import org.apache.hadoop.util.KMSUtil;
import org.apache.hadoop.util.Lists;
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.thirdparty.com.google.common.base.Charsets;
import org.apache.hadoop.util.Preconditions;

/** A FileSystem for HDFS over the web. */
public class WebHdfsFileSystem extends FileSystem
    implements DelegationTokenRenewer.Renewable,
    TokenAspect.TokenManagementDelegator, KeyProviderTokenIssuer {
  public static final Logger LOG = LoggerFactory
  /** WebHdfs version. */
  public static final int VERSION = 1;
  /** Http URI: http://namenode:port/{PATH_PREFIX}/path/to/file */
  public static final String PATH_PREFIX = "/" + WebHdfsConstants.WEBHDFS_SCHEME
      + "/v" + VERSION;
  public static final String EZ_HEADER = "X-Hadoop-Accept-EZ";
  public static final String FEFINFO_HEADER = "X-Hadoop-feInfo";
  public static final String DFS_HTTP_POLICY_KEY = "dfs.http.policy";

  /**
   * Default connection factory may be overridden in tests to use smaller
   * timeout values.
   */
  protected URLConnectionFactory connectionFactory;

  public static final String CANT_FALLBACK_TO_INSECURE_MSG =
      "The client is configured to only allow connecting to secure cluster";

  private boolean canRefreshDelegationToken;

  private UserGroupInformation ugi;
  private URI uri;
  private Token<?> delegationToken;
  protected Text tokenServiceName;
  private RetryPolicy retryPolicy = null;
  private Path workingDir;
  private Path cachedHomeDirectory;
  private InetSocketAddress nnAddrs[];
  private int currentNNAddrIndex;
  private boolean disallowFallbackToInsecureCluster;
  private boolean isInsecureCluster;
  private String restCsrfCustomHeader;
  private Set<String> restCsrfMethodsToIgnore;

  private DFSOpsCountStatistics storageStatistics;
  private KeyProvider testProvider;
  private boolean isTLSKrb;

  private boolean isServerHCFSCompatible = true;

  /**
   * Return the protocol scheme for the FileSystem.
   *
   * @return <code>webhdfs</code>
   */
  public String getScheme() {
    return WebHdfsConstants.WEBHDFS_SCHEME;

  /**
   * Return the underlying transport protocol (http / https).
   */
  protected String getTransportScheme() {
    return "http";

  protected Text getTokenKind() {
    return WebHdfsConstants.WEBHDFS_TOKEN_KIND;

  public synchronized void initialize(URI uri, Configuration conf
  ) throws IOException {
    super.initialize(uri, conf);

    // set user and acl patterns based on configuration file

    int connectTimeout = (int) conf.getTimeDuration(

    int readTimeout = (int) conf.getTimeDuration(

    boolean isOAuth = conf.getBoolean(

    if(isOAuth) {
      LOG.debug("Enabling OAuth2 in WebHDFS");
      connectionFactory = URLConnectionFactory
          .newOAuth2URLConnectionFactory(connectTimeout, readTimeout, conf);
    } else {
      LOG.debug("Not enabling OAuth2 in WebHDFS");
      connectionFactory = URLConnectionFactory
          .newDefaultURLConnectionFactory(connectTimeout, readTimeout, conf);

    this.isTLSKrb = "HTTPS_ONLY".equals(conf.get(DFS_HTTP_POLICY_KEY));

    ugi = UserGroupInformation.getCurrentUser();
    this.uri = URI.create(uri.getScheme() + "://" + uri.getAuthority());
    this.nnAddrs = resolveNNAddr();

    boolean isHA = HAUtilClient.isClientFailoverConfigured(conf, this.uri);
    boolean isLogicalUri = isHA && HAUtilClient.isLogicalUri(conf, this.uri);
    // In non-HA or non-logical URI case, the code needs to call
    // getCanonicalUri() in order to handle the case where no port is
    // specified in the URI
    this.tokenServiceName = isLogicalUri ?
        HAUtilClient.buildTokenServiceForLogicalUri(uri, getScheme())
        : SecurityUtil.buildTokenService(getCanonicalUri());

    if (!isHA) {
      this.retryPolicy =
    } else {

      int maxFailoverAttempts = conf.getInt(
      int maxRetryAttempts = conf.getInt(
      int failoverSleepBaseMillis = conf.getInt(
      int failoverSleepMaxMillis = conf.getInt(

      this.retryPolicy = RetryPolicies
              maxFailoverAttempts, maxRetryAttempts, failoverSleepBaseMillis,

    this.workingDir = makeQualified(new Path(getHomeDirectoryString(ugi)));
    this.canRefreshDelegationToken = UserGroupInformation.isSecurityEnabled();
    this.isInsecureCluster = !this.canRefreshDelegationToken;
    this.disallowFallbackToInsecureCluster = !conf.getBoolean(
    this.delegationToken = null;

    storageStatistics = (DFSOpsCountStatistics) GlobalStorageStatistics.INSTANCE
            new StorageStatisticsProvider() {
              public StorageStatistics provide() {
                return new DFSOpsCountStatistics();

  /**
   * Initializes client-side handling of cross-site request forgery (CSRF)
   * protection by figuring out the custom HTTP headers that need to be sent in
   * requests and which HTTP methods are ignored because they do not require
   * CSRF protection.
   *
   * @param conf configuration to read
   */
  private void initializeRestCsrf(Configuration conf) {
    if (conf.getBoolean(DFS_WEBHDFS_REST_CSRF_ENABLED_KEY,
      this.restCsrfCustomHeader = conf.getTrimmed(
      this.restCsrfMethodsToIgnore = new HashSet<>();
    } else {
      this.restCsrfCustomHeader = null;
      this.restCsrfMethodsToIgnore = null;

  /**
   * Returns a list of strings from a comma-delimited configuration value.
   *
   * @param conf configuration to check
   * @param name configuration property name
   * @param defaultValue default value if no value found for name
   * @return list of strings from comma-delimited configuration value, or an
   *     empty list if not found
   */
  private static List<String> getTrimmedStringList(Configuration conf,
      String name, String defaultValue) {
    String valueString = conf.get(name, defaultValue);
    if (valueString == null) {
      return new ArrayList<>();
    return new ArrayList<>(StringUtils.getTrimmedStringCollection(valueString));

  public URI getCanonicalUri() {
    return super.getCanonicalUri();

  TokenSelector<DelegationTokenIdentifier> tokenSelector =
      new AbstractDelegationTokenSelector<DelegationTokenIdentifier>(getTokenKind()){};

  // the first getAuthParams() for a non-token op will either get the
  // internal token from the ugi or lazy fetch one
  protected synchronized Token<?> getDelegationToken() throws IOException {
    if (delegationToken == null) {
      Token<?> token = tokenSelector.selectToken(
          new Text(getCanonicalServiceName()), ugi.getTokens());
      // ugi tokens are usually indicative of a task which can't
      // refetch tokens.  Don't attempt to fetch tokens from the
      // namenode in this situation.
      if (token != null) {
        LOG.debug("Using UGI token: {}", token);
        canRefreshDelegationToken = false;
      } else {
        if (canRefreshDelegationToken) {
          token = getDelegationToken(null);
          if (token != null) {
            LOG.debug("Fetched new token: {}", token);
          } else { // security is disabled
            canRefreshDelegationToken = false;
            isInsecureCluster = true;
    return delegationToken;

  synchronized boolean replaceExpiredDelegationToken() throws IOException {
    boolean replaced = false;
    if (attemptReplaceDelegationTokenFromUGI()) {
      return true;
    if (canRefreshDelegationToken) {
      Token<?> token = getDelegationToken(null);
      LOG.debug("Replaced expired token: {}", token);
      replaced = (token != null);
    return replaced;

  private synchronized boolean attemptReplaceDelegationTokenFromUGI() {
    Token<?> token = tokenSelector.selectToken(
            new Text(getCanonicalServiceName()), ugi.getTokens());
    if (token != null && !token.equals(delegationToken)) {
      LOG.debug("Replaced expired token with new UGI token: {}", token);
      return true;
    return false;

  protected int getDefaultPort() {
    return HdfsClientConfigKeys.DFS_NAMENODE_HTTP_PORT_DEFAULT;

  public URI getUri() {
    return this.uri;

  protected URI canonicalizeUri(URI uri) {
    return NetUtils.getCanonicalUri(uri, getDefaultPort());

  /** @return the home directory */
  public static String getHomeDirectoryString(final UserGroupInformation ugi) {
    return "/user/" + ugi.getShortUserName();

  public Path getHomeDirectory() {
    if (cachedHomeDirectory == null) {
      final HttpOpParam.Op op = GetOpParam.Op.GETHOMEDIRECTORY;
      try {
        String pathFromDelegatedFS = new FsPathResponseRunner<String>(op, null){
          String decodeResponse(Map<?, ?> json) throws IOException {
            return JsonUtilClient.getPath(json);
        }   .run();

        cachedHomeDirectory = new Path(pathFromDelegatedFS).makeQualified(
            this.getUri(), null);

      } catch (IOException e) {
        LOG.error("Unable to get HomeDirectory from original File System", e);
        cachedHomeDirectory = new Path("/user/" + ugi.getShortUserName())
            .makeQualified(this.getUri(), null);
    return cachedHomeDirectory;

  public synchronized Path getWorkingDirectory() {
    return workingDir;

  public synchronized void setWorkingDirectory(final Path dir) {
    Path absolutePath = makeAbsolute(dir);
    String result = absolutePath.toUri().getPath();
    if (!DFSUtilClient.isValidName(result)) {
      throw new IllegalArgumentException("Invalid DFS directory name " +
    workingDir = absolutePath;

  private Path makeAbsolute(Path f) {
    return f.isAbsolute()? f: new Path(workingDir, f);

  public static Map<?, ?> jsonParse(final HttpURLConnection c,
      final boolean useErrorStream) throws IOException {
    if (c.getContentLength() == 0) {
      return null;
    final InputStream in = useErrorStream ?
        c.getErrorStream() : c.getInputStream();
    if (in == null) {
      throw new IOException("The " + (useErrorStream? "error": "input") +
          " stream is null.");
    try {
      final String contentType = c.getContentType();
      if (contentType != null) {
        final MediaType parsed = MediaType.valueOf(contentType);
        if (!MediaType.APPLICATION_JSON_TYPE.isCompatible(parsed)) {
          throw new IOException("Content-Type \"" + contentType
              + "\" is incompatible with \"" + MediaType.APPLICATION_JSON
              + "\" (parsed=\"" + parsed + "\")");
      return JsonSerialization.mapReader().readValue(in);
    } finally {

  private static Map<?, ?> validateResponse(final HttpOpParam.Op op,
      final HttpURLConnection conn, boolean unwrapException)
      throws IOException {
    final int code = conn.getResponseCode();
    // server is demanding an authentication we don't support
    if (code == HttpURLConnection.HTTP_UNAUTHORIZED) {
      // match hdfs/rpc exception
      throw new AccessControlException(conn.getResponseMessage());
    if (code != op.getExpectedHttpResponseCode()) {
      final Map<?, ?> m;
      try {
        m = jsonParse(conn, true);
      } catch(Exception e) {
        throw new IOException("Unexpected HTTP response: code=" + code + " != "
            + op.getExpectedHttpResponseCode() + ", " + op.toQueryString()
            + ", message=" + conn.getResponseMessage(), e);

      if (m == null) {
        throw new IOException("Unexpected HTTP response: code=" + code + " != "
            + op.getExpectedHttpResponseCode() + ", " + op.toQueryString()
            + ", message=" + conn.getResponseMessage());
      } else if (m.get(RemoteException.class.getSimpleName()) == null) {
        return m;

      IOException re = JsonUtilClient.toRemoteException(m);

      //check if exception is due to communication with a Standby name node
      if (re.getMessage() != null && re.getMessage().endsWith(
          StandbyException.class.getSimpleName())) {
        LOG.trace("Detected StandbyException", re);
        throw new IOException(re);
      // extract UGI-related exceptions and unwrap InvalidToken
      // the NN mangles these exceptions but the DN does not and may need
      // to re-fetch a token if either report the token is expired
      if (re.getMessage() != null && re.getMessage().startsWith(
          SecurityUtil.FAILED_TO_GET_UGI_MSG_HEADER)) {
        String[] parts = re.getMessage().split(":\\s+", 3);
        re = new RemoteException(parts[1], parts[2]);
        re = ((RemoteException)re).unwrapRemoteException(InvalidToken.class);
      throw unwrapException? toIOException(re): re;
    return null;

  /**
   * Convert an exception to an IOException.
   *
   * For a non-IOException, wrap it with IOException.
   * For a RemoteException, unwrap it.
   * For an IOException which is not a RemoteException, return it.
   */
  private static IOException toIOException(Exception e) {
    if (!(e instanceof IOException)) {
      return new IOException(e);

    final IOException ioe = (IOException)e;
    if (!(ioe instanceof RemoteException)) {
      return ioe;

    return ((RemoteException)ioe).unwrapRemoteException();

  private synchronized InetSocketAddress getCurrentNNAddr() {
    return nnAddrs[currentNNAddrIndex];

  /**
   * Reset the appropriate state to gracefully fail over to another name node.
   */
  private synchronized void resetStateToFailOver() {
    currentNNAddrIndex = (currentNNAddrIndex + 1) % nnAddrs.length;

  /**
   * Return a URL pointing to given path on the namenode.
   *
   * @param path to obtain the URL for
   * @param query string to append to the path
   * @return namenode URL referring to the given path
   * @throws IOException on error constructing the URL
   */
  private URL getNamenodeURL(String path, String query) throws IOException {
    InetSocketAddress nnAddr = getCurrentNNAddr();
    final URL url = new URL(getTransportScheme(), nnAddr.getHostName(),
        nnAddr.getPort(), path + '?' + query);
    LOG.trace("url={}", url);
    return url;

  private synchronized Param<?, ?>[] getAuthParameters(final HttpOpParam.Op op)
      throws IOException {
    List<Param<?,?>> authParams = Lists.newArrayList();
    // Skip adding delegation token for token operations because these
    // operations require authentication.
    Token<?> token = null;
    if (!op.getRequireAuth()) {
      token = getDelegationToken();
    if (token != null) {
      authParams.add(new DelegationParam(token.encodeToUrlString()));
    } else {
      UserGroupInformation userUgi = ugi;
      UserGroupInformation realUgi = userUgi.getRealUser();
      if (realUgi != null) { // proxy user
        authParams.add(new DoAsParam(userUgi.getShortUserName()));
        userUgi = realUgi;
      UserParam userParam = new UserParam((userUgi.getShortUserName()));

      //in insecure, use user.name parameter, in secure, use spnego auth
      if(isInsecureCluster) {
    return authParams.toArray(new Param<?,?>[0]);

  URL toUrl(final HttpOpParam.Op op, final Path fspath,
      final Param<?,?>... parameters) throws IOException {
    //initialize URI path and query

    final String path = PATH_PREFIX
        + (fspath == null? "/": makeQualified(fspath).toUri().getRawPath());
    final String query = op.toQueryString()
        + Param.toSortedString("&", getAuthParameters(op))
        + Param.toSortedString("&", parameters);
    final URL url = getNamenodeURL(path, query);
    LOG.trace("url={}", url);
    return url;

  /**
   * This class is for initializing an HTTP connection, connecting to server,
   * obtaining a response, and also handling retry on failures.
   */
  abstract class AbstractRunner<T> {
    abstract protected URL getUrl() throws IOException;

    protected final HttpOpParam.Op op;
    private final boolean redirected;
    protected ExcludeDatanodesParam excludeDatanodes =
        new ExcludeDatanodesParam("");

    private boolean checkRetry;
    private String redirectHost;
    private boolean followRedirect = true;

    /**
     * @param op the operation this runner executes
     * @param redirected true if the URL has already been redirected, which
     *     makes connect() skip the redirect-resolution step
     */
    protected AbstractRunner(final HttpOpParam.Op op, boolean redirected) {
      this.op = op;
      this.redirected = redirected;
    }

    /**
     * @param op the operation this runner executes
     * @param redirected true if the URL has already been redirected
     * @param followRedirect false to make connect() return the redirect
     *     response instead of connecting to the redirect target
     */
    protected AbstractRunner(final HttpOpParam.Op op, boolean redirected,
        boolean followRedirect) {
      this(op, redirected);
      this.followRedirect = followRedirect;
    }

    T run() throws IOException {
      UserGroupInformation connectUgi = ugi.getRealUser();
      if (connectUgi == null) {
        connectUgi = ugi;
      if (op.getRequireAuth()) {
      try {
        // the entire lifecycle of the connection must be run inside the
        // doAs to ensure authentication is performed correctly
        return connectUgi.doAs(
            new PrivilegedExceptionAction<T>() {
              public T run() throws IOException {
                return runWithRetry();
      } catch (InterruptedException e) {
        throw new IOException(e);

    /**
     * Two-step requests redirected to a DN
     *
     * Create/Append:
     * Step 1) Submit a Http request with neither auto-redirect nor data.
     * Step 2) Submit another Http request with the URL from the Location header
     * with data.
     *
     * The reason of having two-step create/append is for preventing clients to
     * send out the data before the redirect. This issue is addressed by the
     * "Expect: 100-continue" header in HTTP/1.1; see RFC 2616, Section 8.2.3.
     * Unfortunately, there are software library bugs (e.g. Jetty 6 http server
     * and Java 6 http client), which do not correctly implement "Expect:
     * 100-continue". The two-step create/append is a temporary workaround for
     * the software library bugs.
     *
     * Open/Checksum
     * Also implements two-step connects for other operations redirected to
     * a DN such as open and checksum
     */
    protected HttpURLConnection connect(URL url) throws IOException {
      //redirect hostname and port
      redirectHost = null;

      if (url.getProtocol().equals("http") &&
        UserGroupInformation.isSecurityEnabled() &&
        isTLSKrb) {
        throw new IOException("Access denied: dfs.http.policy is HTTPS_ONLY.");

      // resolve redirects for a DN operation unless already resolved
      if (op.getRedirect() && !redirected) {
        final HttpOpParam.Op redirectOp =
        final HttpURLConnection conn = connect(redirectOp, url);
        // application level proxy like httpfs might not issue a redirect
        if (conn.getResponseCode() == op.getExpectedHttpResponseCode()) {
          return conn;
        try {
          validateResponse(redirectOp, conn, false);
          url = new URL(conn.getHeaderField("Location"));
          redirectHost = url.getHost() + ":" + url.getPort();
        } finally {
          // TODO: consider not calling conn.disconnect() to allow connection reuse
          // See http://tinyurl.com/java7-http-keepalive
        if (!followRedirect) {
          return conn;
      try {
        final HttpURLConnection conn = connect(op, url);
        // output streams will validate on close
        if (!op.getDoOutput()) {
          validateResponse(op, conn, false);
        return conn;
      } catch (IOException ioe) {
        if (redirectHost != null) {
          if (excludeDatanodes.getValue() != null) {
            excludeDatanodes = new ExcludeDatanodesParam(redirectHost + ","
                + excludeDatanodes.getValue());
          } else {
            excludeDatanodes = new ExcludeDatanodesParam(redirectHost);
        throw ioe;

    private HttpURLConnection connect(final HttpOpParam.Op op, final URL url)
        throws IOException {
      final HttpURLConnection conn =
      final boolean doOutput = op.getDoOutput();
      if (restCsrfCustomHeader != null &&
          !restCsrfMethodsToIgnore.contains(op.getType().name())) {
        // The value of the header is unimportant.  Only its presence matters.
        conn.setRequestProperty(restCsrfCustomHeader, "\"\"");
      conn.setRequestProperty(EZ_HEADER, "true");
      switch (op.getType()) {
      // if not sending a message body for a POST or PUT operation, need
      // to ensure the server/proxy knows this
      case POST:
      case PUT: {
        if (!doOutput) {
          // explicitly setting content-length to 0 won't do spnego!!
          // opening and closing the stream will send "Content-Length: 0"
        } else {
          conn.setChunkedStreamingMode(32 << 10); //32kB-chunk
      return conn;

    private T runWithRetry() throws IOException {
      /**
       * Do the real work.
       *
       * There are three cases that the code inside the loop can throw an
       * IOException:
       *
       * <ul>
       * <li>The connection has failed (e.g., ConnectException,
       * @see FailoverOnNetworkExceptionRetry for more details)</li>
       * <li>The namenode enters the standby state (i.e., StandbyException).</li>
       * <li>The server returns errors for the command (i.e., RemoteException)</li>
       * </ul>
       *
       * The call to shouldRetry() will conduct the retry policy. The policy
       * examines the exception and swallows it if it decides to rerun the work.
       */
      for(int retry = 0; ; retry++) {
        checkRetry = !redirected;
        final URL url = getUrl();
        try {
          final HttpURLConnection conn = connect(url);
          return getResponse(conn);
        } catch (AccessControlException ace) {
          // no retries for auth failures
          throw ace;
        } catch (InvalidToken it) {
          // try to replace the expired token with a new one.  the attempt
          // to acquire a new token must be outside this operation's retry
          // so if it fails after its own retries, this operation fails too.
          if (op.getRequireAuth() || !replaceExpiredDelegationToken()) {
            throw it;
        } catch (IOException ioe) {
          // Attempt to include the redirected node in the exception. If the
          // attempt to recreate the exception fails, just use the original.
          String node = redirectHost;
          if (node == null) {
            node = url.getAuthority();
          try {
            IOException newIoe = ioe.getClass().getConstructor(String.class)
                .newInstance(node + ": " + ioe.getMessage());
            ioe = newIoe;
          } catch (NoSuchMethodException | SecurityException 
                   | InstantiationException | IllegalAccessException
                   | IllegalArgumentException | InvocationTargetException e) {
          shouldRetry(ioe, retry);

    private void shouldRetry(final IOException ioe, final int retry
    ) throws IOException {
      InetSocketAddress nnAddr = getCurrentNNAddr();
      if (checkRetry) {
        try {
          final RetryPolicy.RetryAction a = retryPolicy.shouldRetry(
              ioe, retry, 0, true);

          boolean isRetry =
              a.action == RetryPolicy.RetryAction.RetryDecision.RETRY;
          boolean isFailoverAndRetry =
              a.action == RetryPolicy.RetryAction.RetryDecision.FAILOVER_AND_RETRY;
          if (isRetry || isFailoverAndRetry) {
            LOG.info("Retrying connect to namenode: {}. Already retried {}"
                    + " time(s); retry policy is {}, delay {}ms.",
                nnAddr, retry, retryPolicy, a.delayMillis);

            if (isFailoverAndRetry) {

        } catch(Exception e) {
          LOG.warn("Original exception is ", ioe);
          throw toIOException(e);
      throw toIOException(ioe);

    abstract T getResponse(HttpURLConnection conn) throws IOException;

  /**
   * Abstract base class to handle path-based operations with params.
   */
  abstract class AbstractFsPathRunner<T> extends AbstractRunner<T> {
    private final Path fspath;
    private Param<?,?>[] parameters;

    AbstractFsPathRunner(final HttpOpParam.Op op, final Path fspath,
        Param<?,?>... parameters) {
      super(op, false);
      this.fspath = fspath;
      this.parameters = parameters;

    AbstractFsPathRunner(final HttpOpParam.Op op, Param<?,?>[] parameters,
        final Path fspath) {
      super(op, false);
      this.fspath = fspath;
      this.parameters = parameters;

    protected void updateURLParameters(Param<?, ?>... p) {
      this.parameters = p;

    protected URL getUrl() throws IOException {
      if (excludeDatanodes.getValue() != null) {
        Param<?, ?>[] tmpParam = new Param<?, ?>[parameters.length + 1];
        System.arraycopy(parameters, 0, tmpParam, 0, parameters.length);
        tmpParam[parameters.length] = excludeDatanodes;
        return toUrl(op, fspath, tmpParam);
      } else {
        return toUrl(op, fspath, parameters);

    Path getFspath() {
      return fspath;

  /**
   * Default path-based implementation expects no json response.
   */
  class FsPathRunner extends AbstractFsPathRunner<Void> {
    FsPathRunner(Op op, Path fspath, Param<?,?>... parameters) {
      super(op, fspath, parameters);

    Void getResponse(HttpURLConnection conn) throws IOException {
      return null;

  /**
   * Handle path-based operations with a json response.
   */
  // Runner that parses the response body as a JSON map and hands it to
  // decodeResponse() for conversion into the result type T.
  // NOTE(review): this excerpt has lost its closing braces, and the finally
  // block below contains only comments; a statement may be missing there.
  // Confirm against the upstream file before relying on this code.
  abstract class FsPathResponseRunner<T> extends AbstractFsPathRunner<T> {
    // Varargs form: operation, target path, then query parameters.
    FsPathResponseRunner(final HttpOpParam.Op op, final Path fspath,
        Param<?,?>... parameters) {
      super(op, fspath, parameters);

    // Array form: the parameters come before the path.
    FsPathResponseRunner(final HttpOpParam.Op op, Param<?,?>[] parameters,
        final Path fspath) {
      super(op, parameters, fspath);

    // Parses the body with jsonParse(conn, false) and decodes it.
    // An IOException is rethrown unchanged; any other exception, including
    // the IllegalStateException raised for a null map, is wrapped in an
    // IOException with the message prefix "Response decoding failure: ".
    final T getResponse(HttpURLConnection conn) throws IOException {
      try {
        final Map<?,?> json = jsonParse(conn, false);
        if (json == null) {
          // match exception class thrown by parser
          throw new IllegalStateException("Missing response");
        return decodeResponse(json);
      } catch (IOException ioe) {
        throw ioe;
      } catch (Exception e) { // catch json parser errors
        final IOException ioe =
            new IOException("Response decoding failure: "+e.toString(), e);
        LOG.debug("Response decoding failure.", e);
        throw ioe;
      } finally {
        // Don't call conn.disconnect() to allow connection reuse
        // See http://tinyurl.com/java7-http-keepalive

    // Converts the parsed JSON map into the runner's result.
    abstract T decodeResponse(Map<?,?> json) throws IOException;

  /**
   * Handle path-based operations with a JSON boolean response.
   */
  class FsPathBooleanRunner extends FsPathResponseRunner<Boolean> {
    // Forwards operation, path and parameters unchanged to the base runner.
    FsPathBooleanRunner(Op op, Path fspath, Param<?,?>... parameters) {
      super(op, fspath, parameters);

    // Returns the value stored under the "boolean" key, cast to Boolean
    // (null when the key is absent).
    Boolean decodeResponse(Map<?,?> json) throws IOException {
      return (Boolean)json.get("boolean");

  /**
   * Handle create/append output streams.
   */
  class FsPathOutputStreamRunner
      extends AbstractFsPathRunner<FSDataOutputStream> {
    // Buffer size handed to the BufferedOutputStream wrapping the connection.
    private final int bufferSize;

    // Forwards operation, path and parameters to the base runner and records
    // the buffer size for getResponse.
    FsPathOutputStreamRunner(Op op, Path fspath, int bufferSize,
        Param<?,?>... parameters) {
      super(op, fspath, parameters);
      this.bufferSize = bufferSize;

    // Wraps the connection's output stream in a buffered FSDataOutputStream
    // whose write/close overrides call validateResponse(op, conn, true).
    FSDataOutputStream getResponse(final HttpURLConnection conn)
        throws IOException {
      return new FSDataOutputStream(new BufferedOutputStream(
          conn.getOutputStream(), bufferSize), statistics) {
        public void write(int b) throws IOException {
          try {
            // NOTE(review): the try body is empty in this copy of the file; a
            // delegating write call appears to be missing — confirm.
          } catch (IOException e) {
            LOG.warn("Write to output stream for file '{}' failed. "
                + "Attempting to fetch the cause from the connection.",
                getFspath(), e);
            // If validateResponse returns normally, the original exception
            // is rethrown.
            validateResponse(op, conn, true);
            throw e;

        public void write(byte[] b, int off, int len) throws IOException {
          try {
            super.write(b, off, len);
          } catch (IOException e) {
            LOG.warn("Write to output stream for file '{}' failed. "
                + "Attempting to fetch the cause from the connection.",
                getFspath(), e);
            // If validateResponse returns normally, the original exception
            // is rethrown.
            validateResponse(op, conn, true);
            throw e;

        public void close() throws IOException {
          try {
            // NOTE(review): the try body is empty in this copy of the file; a
            // delegating close call appears to be missing — confirm.
          } finally {
            try {
              validateResponse(op, conn, true);
            } finally {
              // This is a connection to DataNode.  Let's disconnect since
              // there is little chance that the connection will be reused
              // any time soon.
              // NOTE(review): the disconnect statement this comment describes
              // is not present in this copy of the file.

  /**
   * Path-based runner whose result is the {@link HttpURLConnection} itself:
   * {@code getResponse} returns the connection it is given without reading it.
   */
  class FsPathConnectionRunner extends AbstractFsPathRunner<HttpURLConnection> {
    // Forwards operation, path and parameters unchanged to the base runner.
    FsPathConnectionRunner(Op op, Path fspath, Param<?,?>... parameters) {
      super(op, fspath, parameters);
    HttpURLConnection getResponse(final HttpURLConnection conn)
        throws IOException {
      return conn;

  /**
   * Used by open(), which tracks the resolved URL itself.
   */
  class URLRunner extends AbstractRunner<HttpURLConnection> {
    // Fixed URL supplied at construction; returned verbatim by getUrl().
    private final URL url;
    protected URL getUrl() throws IOException {
      return url;

    // Passes op and the two redirect flags to the base runner and stores the
    // URL to use.
    protected URLRunner(final HttpOpParam.Op op, final URL url,
        boolean redirected, boolean followRedirect) {
      super(op, redirected, followRedirect);
      this.url = url;

    // The connection itself is the result; nothing is read from it here.
    HttpURLConnection getResponse(HttpURLConnection conn) throws IOException {
      return conn;

  /**
   * Substitutes {@code FsPermission.getDefault()} when {@code permission} is
   * null, then delegates to {@code FsCreateModes.applyUMask}.
   *
   * @param permission requested permission, may be null
   * @return the value returned by {@code FsCreateModes.applyUMask}
   */
  private FsPermission applyUMask(FsPermission permission) {
    if (permission == null) {
      permission = FsPermission.getDefault();
    return FsCreateModes.applyUMask(permission,
    // NOTE(review): the second argument of this call is cut off in this copy
    // of the file.

  /**
   * Fetches the status of {@code f} using {@code GetOpParam.Op.GETFILESTATUS},
   * decoding the reply with {@code JsonUtilClient.toFileStatus(json, true)}.
   *
   * @param f path to query
   * @return the decoded status; never null
   * @throws FileNotFoundException if no status was obtained
   * @throws IOException declared by the signature
   */
  private HdfsFileStatus getHdfsFileStatus(Path f) throws IOException {
    final HttpOpParam.Op op = GetOpParam.Op.GETFILESTATUS;
    HdfsFileStatus status = new FsPathResponseRunner<HdfsFileStatus>(op, f) {
      HdfsFileStatus decodeResponse(Map<?,?> json) {
        return JsonUtilClient.toFileStatus(json, true);
    if (status == null) {
      throw new FileNotFoundException("File does not exist: " + f);
    return status;

  /**
   * Returns the status from {@code getHdfsFileStatus(f)}, qualified with this
   * file system's URI and {@code f}.
   *
   * @param f path to query
   * @throws IOException propagated from {@code getHdfsFileStatus}
   */
  public FileStatus getFileStatus(Path f) throws IOException {
    return getHdfsFileStatus(f).makeQualified(getUri(), f);

  /**
   * Fetches the ACL status of {@code f} using
   * {@code GetOpParam.Op.GETACLSTATUS}, decoding the reply with
   * {@code JsonUtilClient.toAclStatus}.
   *
   * @param f path to query
   * @return the decoded ACL status; never null
   * @throws FileNotFoundException if no status was obtained
   * @throws IOException declared by the signature
   */
  public AclStatus getAclStatus(Path f) throws IOException {
    final HttpOpParam.Op op = GetOpParam.Op.GETACLSTATUS;
    AclStatus status = new FsPathResponseRunner<AclStatus>(op, f) {
      AclStatus decodeResponse(Map<?,?> json) {
        return JsonUtilClient.toAclStatus(json);
    if (status == null) {
      throw new FileNotFoundException("File does not exist: " + f);
    return status;

  /**
   * Builds a {@code PutOpParam.Op.MKDIRS} boolean request for {@code f},
   * carrying the masked and unmasked permissions obtained from
   * {@code applyUMask(permission)}.
   *
   * @param f directory path
   * @param permission requested permission; passed through {@code applyUMask}
   * @throws IOException declared by the signature
   */
  public boolean mkdirs(Path f, FsPermission permission) throws IOException {
    final HttpOpParam.Op op = PutOpParam.Op.MKDIRS;
    final FsPermission modes = applyUMask(permission);
    return new FsPathBooleanRunner(op, f,
        new PermissionParam(modes.getMasked()),
        new UnmaskedPermissionParam(modes.getUnmasked())
    // NOTE(review): the statement is cut off here in this copy of the file.

  /** @return always {@code true} */
  public boolean supportsSymlinks() {
    return true;

  /**
   * Create a symlink pointing to the destination path.
   */
  public void createSymlink(Path destination, Path f, boolean createParent
  ) throws IOException {
    final HttpOpParam.Op op = PutOpParam.Op.CREATESYMLINK;
    // The destination is qualified against this file system and only the path
    // component of the resulting URI is sent.
    new FsPathRunner(op, f,
        new DestinationParam(makeQualified(destination).toUri().getPath()),
        new CreateParentParam(createParent)
    // NOTE(review): the statement is cut off here in this copy of the file.

  /**
   * Builds a {@code PutOpParam.Op.RENAME} boolean request on {@code src}; the
   * destination is sent as the path component of the qualified {@code dst}.
   *
   * @param src path to rename
   * @param dst new path
   * @throws IOException declared by the signature
   */
  public boolean rename(final Path src, final Path dst) throws IOException {
    final HttpOpParam.Op op = PutOpParam.Op.RENAME;
    return new FsPathBooleanRunner(op, src,
        new DestinationParam(makeQualified(dst).toUri().getPath())
    // NOTE(review): the statement is cut off here in this copy of the file.

  /**
   * Variant of rename that also forwards {@code options} as a
   * {@code RenameOptionSetParam} and uses the non-decoding
   * {@code FsPathRunner}.
   *
   * @param src path to rename
   * @param dst new path; sent as the path component of the qualified path
   * @param options rename options forwarded with the request
   * @throws IOException declared by the signature
   */
  public void rename(final Path src, final Path dst,
      final Options.Rename... options) throws IOException {
    final HttpOpParam.Op op = PutOpParam.Op.RENAME;
    new FsPathRunner(op, src,
        new DestinationParam(makeQualified(dst).toUri().getPath()),
        new RenameOptionSetParam(options)
    // NOTE(review): the statement is cut off here in this copy of the file.

  /**
   * Runs {@code PutOpParam.Op.SETXATTR} on {@code p}.
   *
   * @param p target path
   * @param name extended attribute name
   * @param value attribute value; when non-null it is sent hex-encoded via
   *     {@code XAttrCodec.encodeValue(value, XAttrCodec.HEX)}, when null the
   *     value parameter is omitted from the request
   * @param flag set flags forwarded as an {@code XAttrSetFlagParam}
   * @throws IOException declared by the signature
   */
  public void setXAttr(Path p, String name, byte[] value,
      EnumSet<XAttrSetFlag> flag) throws IOException {
    final HttpOpParam.Op op = PutOpParam.Op.SETXATTR;
    if (value != null) {
      new FsPathRunner(op, p, new XAttrNameParam(name), new XAttrValueParam(
          XAttrCodec.encodeValue(value, XAttrCodec.HEX)),
          new XAttrSetFlagParam(flag)).run();
    } else {
      new FsPathRunner(op, p, new XAttrNameParam(name),
          new XAttrSetFlagParam(flag)).run();

  /**
   * Requests a single extended attribute of {@code p} using
   * {@code GetOpParam.Op.GETXATTRS} with hex encoding; the JSON reply is
   * decoded with {@code JsonUtilClient.getXAttr}.
   *
   * @param p target path
   * @param name extended attribute name
   * @throws IOException declared by the signature
   */
  public byte[] getXAttr(Path p, final String name) throws IOException {
    final HttpOpParam.Op op = GetOpParam.Op.GETXATTRS;
    return new FsPathResponseRunner<byte[]>(op, p, new XAttrNameParam(name),
        new XAttrEncodingParam(XAttrCodec.HEX)) {
      byte[] decodeResponse(Map<?, ?> json) throws IOException {
        return JsonUtilClient.getXAttr(json);

  /**
   * Requests the extended attributes of {@code p} using
   * {@code GetOpParam.Op.GETXATTRS} with hex encoding and no name filter; the
   * JSON reply is decoded with {@code JsonUtilClient.toXAttrs}.
   *
   * @param p target path
   * @throws IOException declared by the signature
   */
  public Map<String, byte[]> getXAttrs(Path p) throws IOException {
    final HttpOpParam.Op op = GetOpParam.Op.GETXATTRS;
    return new FsPathResponseRunner<Map<String, byte[]>>(op, p,
        new XAttrEncodingParam(XAttrCodec.HEX)) {
      Map<String, byte[]> decodeResponse(Map<?, ?> json) throws IOException {
        return JsonUtilClient.toXAttrs(json);

  /**
   * Requests the named extended attributes of {@code p} using
   * {@code GetOpParam.Op.GETXATTRS}; the JSON reply is decoded with
   * {@code JsonUtilClient.toXAttrs}.
   *
   * @param p target path
   * @param names attribute names; checked with
   *     {@code Preconditions.checkArgument} to be non-null and non-empty
   * @throws IOException declared by the signature
   */
  public Map<String, byte[]> getXAttrs(Path p, final List<String> names)
      throws IOException {
    Preconditions.checkArgument(names != null && !names.isEmpty(),
        "XAttr names cannot be null or empty.");
    // One XAttrNameParam per requested name, followed by the hex encoding
    // parameter in the final slot.
    Param<?,?>[] parameters = new Param<?,?>[names.size() + 1];
    for (int i = 0; i < parameters.length - 1; i++) {
      parameters[i] = new XAttrNameParam(names.get(i));
    parameters[parameters.length - 1] = new XAttrEncodingParam(XAttrCodec.HEX);

    final HttpOpParam.Op op = GetOpParam.Op.GETXATTRS;
    return new FsPathResponseRunner<Map<String, byte[]>>(op, parameters, p) {
      Map<String, byte[]> decodeResponse(Map<?, ?> json) throws IOException {
        return JsonUtilClient.toXAttrs(json);

  /**
   * Requests the extended attribute names of {@code p} using
   * {@code GetOpParam.Op.LISTXATTRS}; the JSON reply is decoded with
   * {@code JsonUtilClient.toXAttrNames}.
   *
   * @param p target path
   * @throws IOException declared by the signature
   */
  public List<String> listXAttrs(Path p) throws IOException {
    final HttpOpParam.Op op = GetOpParam.Op.LISTXATTRS;
    return new FsPathResponseRunner<List<String>>(op, p) {
      List<String> decodeResponse(Map<?, ?> json) throws IOException {
        return JsonUtilClient.toXAttrNames(json);

  /**
   * Runs {@code PutOpParam.Op.REMOVEXATTR} on {@code p} for the attribute
   * called {@code name}.
   *
   * @throws IOException declared by the signature
   */
  public void removeXAttr(Path p, String name) throws IOException {
    final HttpOpParam.Op op = PutOpParam.Op.REMOVEXATTR;
    new FsPathRunner(op, p, new XAttrNameParam(name)).run();

  /**
   * Builds a {@code PutOpParam.Op.SETOWNER} request on {@code p} carrying the
   * owner and group.
   *
   * @param p target path
   * @param owner new owner; may be null only if {@code group} is not
   * @param group new group; may be null only if {@code owner} is not
   * @throws IOException if both {@code owner} and {@code group} are null
   */
  public void setOwner(final Path p, final String owner, final String group
  ) throws IOException {
    if (owner == null && group == null) {
      throw new IOException("owner == null && group == null");

    final HttpOpParam.Op op = PutOpParam.Op.SETOWNER;
    new FsPathRunner(op, p,
        new OwnerParam(owner), new GroupParam(group)
    // NOTE(review): the statement is cut off here in this copy of the file.

  /**
   * Runs {@code PutOpParam.Op.SETPERMISSION} on {@code p} with the given
   * permission.
   *
   * @throws IOException declared by the signature
   */
  public void setPermission(final Path p, final FsPermission permission
  ) throws IOException {
    final HttpOpParam.Op op = PutOpParam.Op.SETPERMISSION;
    new FsPathRunner(op, p,new PermissionParam(permission)).run();

  /**
   * Runs {@code PutOpParam.Op.MODIFYACLENTRIES} on {@code path}, sending
   * {@code aclSpec} as an {@code AclPermissionParam}.
   *
   * @throws IOException declared by the signature
   */
  public void modifyAclEntries(Path path, List<AclEntry> aclSpec)
      throws IOException {
    final HttpOpParam.Op op = PutOpParam.Op.MODIFYACLENTRIES;
    new FsPathRunner(op, path, new AclPermissionParam(aclSpec)).run();

  /**
   * Runs {@code PutOpParam.Op.REMOVEACLENTRIES} on {@code path}, sending
   * {@code aclSpec} as an {@code AclPermissionParam}.
   *
   * @throws IOException declared by the signature
   */
  public void removeAclEntries(Path path, List<AclEntry> aclSpec)
      throws IOException {
    final HttpOpParam.Op op = PutOpParam.Op.REMOVEACLENTRIES;
    new FsPathRunner(op, path, new AclPermissionParam(aclSpec)).run();

  /**
   * Runs {@code PutOpParam.Op.REMOVEDEFAULTACL} on {@code path} with no extra
   * parameters.
   *
   * @throws IOException declared by the signature
   */
  public void removeDefaultAcl(Path path) throws IOException {
    final HttpOpParam.Op op = PutOpParam.Op.REMOVEDEFAULTACL;
    new FsPathRunner(op, path).run();

  /**
   * Runs {@code PutOpParam.Op.REMOVEACL} on {@code path} with no extra
   * parameters.
   *
   * @throws IOException declared by the signature
   */
  public void removeAcl(Path path) throws IOException {
    final HttpOpParam.Op op = PutOpParam.Op.REMOVEACL;
    new FsPathRunner(op, path).run();

  /**
   * Runs {@code PutOpParam.Op.SETACL} on {@code p}, sending {@code aclSpec}
   * as an {@code AclPermissionParam}.
   *
   * @throws IOException declared by the signature
   */
  public void setAcl(final Path p, final List<AclEntry> aclSpec)
      throws IOException {
    final HttpOpParam.Op op = PutOpParam.Op.SETACL;
    new FsPathRunner(op, p, new AclPermissionParam(aclSpec)).run();

  /**
   * Runs {@code PutOpParam.Op.ALLOWSNAPSHOT} on {@code p}.
   *
   * @throws IOException declared by the signature
   */
  public void allowSnapshot(final Path p) throws IOException {
    final HttpOpParam.Op op = PutOpParam.Op.ALLOWSNAPSHOT;
    new FsPathRunner(op, p).run();

  /**
   * Runs {@code PutOpParam.Op.SATISFYSTORAGEPOLICY} on {@code p}.
   *
   * @throws IOException declared by the signature
   */
  public void satisfyStoragePolicy(final Path p) throws IOException {
    final HttpOpParam.Op op = PutOpParam.Op.SATISFYSTORAGEPOLICY;
    new FsPathRunner(op, p).run();

  /**
   * Runs {@code PutOpParam.Op.ENABLEECPOLICY} for {@code policyName}. The
   * runner is given a null path.
   *
   * @throws IOException declared by the signature
   */
  public void enableECPolicy(String policyName) throws IOException {
    final HttpOpParam.Op op = PutOpParam.Op.ENABLEECPOLICY;
    new FsPathRunner(op, null, new ECPolicyParam(policyName)).run();

  /**
   * Runs {@code PutOpParam.Op.DISABLEECPOLICY} for {@code policyName}. The
   * runner is given a null path.
   *
   * @throws IOException declared by the signature
   */
  public void disableECPolicy(String policyName) throws IOException {
    final HttpOpParam.Op op = PutOpParam.Op.DISABLEECPOLICY;
    new FsPathRunner(op, null, new ECPolicyParam(policyName)).run();

  /**
   * Runs {@code PutOpParam.Op.SETECPOLICY} on {@code p} with
   * {@code policyName} as an {@code ECPolicyParam}.
   *
   * @throws IOException declared by the signature
   */
  public void setErasureCodingPolicy(Path p, String policyName)
      throws IOException {
    final HttpOpParam.Op op = PutOpParam.Op.SETECPOLICY;
    new FsPathRunner(op, p, new ECPolicyParam(policyName)).run();

  /**
   * Runs {@code PostOpParam.Op.UNSETECPOLICY} on {@code p}.
   *
   * @throws IOException declared by the signature
   */
  public void unsetErasureCodingPolicy(Path p) throws IOException {
    final HttpOpParam.Op op = PostOpParam.Op.UNSETECPOLICY;
    new FsPathRunner(op, p).run();

  /**
   * Requests the erasure coding policy of {@code p} using
   * {@code GetOpParam.Op.GETECPOLICY}; the JSON reply is decoded with
   * {@code JsonUtilClient.toECPolicy}.
   *
   * @throws IOException declared by the signature
   */
  public ErasureCodingPolicy getErasureCodingPolicy(Path p)
      throws IOException {
    final HttpOpParam.Op op =GetOpParam.Op.GETECPOLICY;
    return new FsPathResponseRunner<ErasureCodingPolicy>(op, p) {
      ErasureCodingPolicy decodeResponse(Map<?, ?> json) throws IOException {
        // The cast is a no-op: json is already declared as Map<?, ?>.
        return JsonUtilClient.toECPolicy((Map<?, ?>) json);

  /**
   * Requests {@code PutOpParam.Op.CREATESNAPSHOT} on {@code path} with the
   * given snapshot name.
   *
   * @param path directory to snapshot
   * @param snapshotName name sent as a {@code SnapshotNameParam}
   * @return a {@link Path} built from the string the JSON reply stores under
   *     the key {@code Path.class.getSimpleName()}
   * @throws IOException declared by the signature
   */
  public Path createSnapshot(final Path path, final String snapshotName)
      throws IOException {
    final HttpOpParam.Op op = PutOpParam.Op.CREATESNAPSHOT;
    return new FsPathResponseRunner<Path>(op, path,
        new SnapshotNameParam(snapshotName)) {
      Path decodeResponse(Map<?,?> json) {
        return new Path((String) json.get(Path.class.getSimpleName()));

  /**
   * Runs {@code PutOpParam.Op.DISALLOWSNAPSHOT} on {@code p}.
   *
   * @throws IOException declared by the signature
   */
  public void disallowSnapshot(final Path p) throws IOException {
    final HttpOpParam.Op op = PutOpParam.Op.DISALLOWSNAPSHOT;
    new FsPathRunner(op, p).run();

  /**
   * Runs {@code DeleteOpParam.Op.DELETESNAPSHOT} on {@code path} for the
   * snapshot called {@code snapshotName}.
   *
   * @throws IOException declared by the signature
   */
  public void deleteSnapshot(final Path path, final String snapshotName)
      throws IOException {
    final HttpOpParam.Op op = DeleteOpParam.Op.DELETESNAPSHOT;
    new FsPathRunner(op, path, new SnapshotNameParam(snapshotName)).run();

  /**
   * Runs {@code PutOpParam.Op.RENAMESNAPSHOT} on {@code path}.
   *
   * @param path directory owning the snapshot
   * @param snapshotOldName sent as an {@code OldSnapshotNameParam}
   * @param snapshotNewName sent as a {@code SnapshotNameParam}
   * @throws IOException declared by the signature
   */
  public void renameSnapshot(final Path path, final String snapshotOldName,
      final String snapshotNewName) throws IOException {
    final HttpOpParam.Op op = PutOpParam.Op.RENAMESNAPSHOT;
    new FsPathRunner(op, path, new OldSnapshotNameParam(snapshotOldName),
        new SnapshotNameParam(snapshotNewName)).run();

  /**
   * Requests a snapshot diff report for {@code snapshotDir} between two
   * snapshots; the JSON reply is decoded with
   * {@code JsonUtilClient.toSnapshotDiffReport}.
   *
   * @param snapshotDir directory path as a string; wrapped in a {@link Path}
   * @param fromSnapshot sent as an {@code OldSnapshotNameParam}
   * @param toSnapshot sent as a {@code SnapshotNameParam}
   * @throws IOException declared by the signature
   */
  private SnapshotDiffReport getSnapshotDiffReport(
      final String snapshotDir, final String fromSnapshot, final String toSnapshot)
      throws IOException {
    // NOTE(review): no operation argument is visible in this constructor call
    // in this copy of the file — confirm against the full source.
    return new FsPathResponseRunner<SnapshotDiffReport>(
        new Path(snapshotDir),
        new OldSnapshotNameParam(fromSnapshot),
        new SnapshotNameParam(toSnapshot)) {
          SnapshotDiffReport decodeResponse(Map<?, ?> json) {
            return JsonUtilClient.toSnapshotDiffReport(json);

  // This API should be treated as private to WebHdfsFileSystem. Only tests can use it directly.
  // Requests one portion of a snapshot diff listing: startPath (converted with
  // DFSUtilClient.bytes2String) and index are sent alongside the two snapshot
  // names, and the JSON reply is decoded with
  // JsonUtilClient.toSnapshotDiffReportListing.
  public SnapshotDiffReportListing getSnapshotDiffReportListing(
        String snapshotDir, final String fromSnapshot, final String toSnapshot,
        byte[] startPath, int index) throws IOException {
    // NOTE(review): no operation argument is visible in this constructor call
    // in this copy of the file — confirm against the full source.
    return new FsPathResponseRunner<SnapshotDiffReportListing>(
        new Path(snapshotDir),
        new OldSnapshotNameParam(fromSnapshot),
        new SnapshotNameParam(toSnapshot),
        new SnapshotDiffStartPathParam(DFSUtilClient.bytes2String(startPath)),
        new SnapshotDiffIndexParam(index)) {
          SnapshotDiffReportListing decodeResponse(Map<?, ?> json) {
            return JsonUtilClient.toSnapshotDiffReportListing(json);

  /**
   * Delegates to {@code DFSUtilClient.getSnapshotDiffReport}, passing the path
   * component of {@code snapshotDir}, the two snapshot names, and method
   * references to this class's private {@code getSnapshotDiffReport} and to
   * {@code getSnapshotDiffReportListing}.
   *
   * @throws IOException declared by the signature
   */
  public SnapshotDiffReport getSnapshotDiffReport(final Path snapshotDir,
      final String fromSnapshot, final String toSnapshot) throws IOException {
    return DFSUtilClient.getSnapshotDiffReport(
        snapshotDir.toUri().getPath(), fromSnapshot, toSnapshot,
        this::getSnapshotDiffReport, this::getSnapshotDiffReportListing);

  /**
   * Requests {@code GetOpParam.Op.GETSNAPSHOTTABLEDIRECTORYLIST} with a null
   * path; the JSON reply is decoded with
   * {@code JsonUtilClient.toSnapshottableDirectoryList}.
   *
   * @throws IOException declared by the signature
   */
  public SnapshottableDirectoryStatus[] getSnapshottableDirectoryList()
      throws IOException {
    final HttpOpParam.Op op = GetOpParam.Op.GETSNAPSHOTTABLEDIRECTORYLIST;
    return new FsPathResponseRunner<SnapshottableDirectoryStatus[]>(op, null) {
      SnapshottableDirectoryStatus[] decodeResponse(Map<?, ?> json) {
        return JsonUtilClient.toSnapshottableDirectoryList(json);

  /**
   * Requests {@code GetOpParam.Op.GETSNAPSHOTLIST} on {@code snapshotDir};
   * the JSON reply is decoded with {@code JsonUtilClient.toSnapshotList}.
   *
   * @throws IOException declared by the signature
   */
  public SnapshotStatus[] getSnapshotListing(final Path snapshotDir)
      throws IOException {
    final HttpOpParam.Op op = GetOpParam.Op.GETSNAPSHOTLIST;
    return new FsPathResponseRunner<SnapshotStatus[]>(op, snapshotDir) {
      SnapshotStatus[] decodeResponse(Map<?, ?> json) {
        return JsonUtilClient.toSnapshotList(json);

  /**
   * Builds a {@code PutOpParam.Op.SETREPLICATION} boolean request on
   * {@code p} carrying the replication factor.
   *
   * @throws IOException declared by the signature
   */
  public boolean setReplication(final Path p, final short replication
  ) throws IOException {
    final HttpOpParam.Op op = PutOpParam.Op.SETREPLICATION;
    return new FsPathBooleanRunner(op, p,
        new ReplicationParam(replication)
    // NOTE(review): the statement is cut off here in this copy of the file.

  /**
   * Builds a {@code PutOpParam.Op.SETTIMES} request on {@code p}.
   *
   * @param p target path
   * @param mtime sent as a {@code ModificationTimeParam}
   * @param atime sent as an {@code AccessTimeParam}
   * @throws IOException declared by the signature
   */
  public void setTimes(final Path p, final long mtime, final long atime
  ) throws IOException {
    final HttpOpParam.Op op = PutOpParam.Op.SETTIMES;
    new FsPathRunner(op, p,
        new ModificationTimeParam(mtime),
        new AccessTimeParam(atime)
    // NOTE(review): the statement is cut off here in this copy of the file.

  /**
   * Reads the block size from the configuration key
   * {@code HdfsClientConfigKeys.DFS_BLOCK_SIZE_KEY} using {@code getLongBytes}.
   */
  public long getDefaultBlockSize() {
    return getConf().getLongBytes(HdfsClientConfigKeys.DFS_BLOCK_SIZE_KEY,
    // NOTE(review): the default-value argument is cut off in this copy of the
    // file.

  /**
   * Reads the replication factor from the configuration key
   * {@code HdfsClientConfigKeys.DFS_REPLICATION_KEY} and narrows it to
   * {@code short}.
   */
  public short getDefaultReplication() {
    return (short)getConf().getInt(HdfsClientConfigKeys.DFS_REPLICATION_KEY,
    // NOTE(review): the default-value argument is cut off in this copy of the
    // file.

  /**
   * Runs {@code PostOpParam.Op.CONCAT} on {@code trg}, sending {@code srcs}
   * as a {@code ConcatSourcesParam}.
   *
   * @throws IOException declared by the signature
   */
  public void concat(final Path trg, final Path [] srcs) throws IOException {
    final HttpOpParam.Op op = PostOpParam.Op.CONCAT;
    new FsPathRunner(op, trg, new ConcatSourcesParam(srcs)).run();

  /**
   * Builds a {@code PutOpParam.Op.CREATE} output-stream request for
   * {@code f}.
   *
   * @param f file to create
   * @param permission requested permission; passed through {@code applyUMask}
   * @param overwrite sent as an {@code OverwriteParam}
   * @param bufferSize used for the local stream buffer and also sent as a
   *     {@code BufferSizeParam}
   * @param replication sent as a {@code ReplicationParam}
   * @param blockSize sent as a {@code BlockSizeParam}
   * @param progress not referenced in the visible part of this method
   * @throws IOException declared by the signature
   */
  public FSDataOutputStream create(final Path f, final FsPermission permission,
      final boolean overwrite, final int bufferSize, final short replication,
      final long blockSize, final Progressable progress) throws IOException {

    final FsPermission modes = applyUMask(permission);
    final HttpOpParam.Op op = PutOpParam.Op.CREATE;
    return new FsPathOutputStreamRunner(op, f, bufferSize,
        new PermissionParam(modes.getMasked()),
        new UnmaskedPermissionParam(modes.getUnmasked()),
        new OverwriteParam(overwrite),
        new BufferSizeParam(bufferSize),
        new ReplicationParam(replication),
        new BlockSizeParam(blockSize)
    // NOTE(review): the statement is cut off here in this copy of the file.

  /**
   * Builds a {@code PutOpParam.Op.CREATE} output-stream request for
   * {@code f} with {@code CreateParentParam(false)}.
   *
   * @param f file to create
   * @param permission requested permission; passed through {@code applyUMask}
   * @param flag sent as a {@code CreateFlagParam}
   * @param bufferSize used for the local stream buffer and also sent as a
   *     {@code BufferSizeParam}
   * @param replication sent as a {@code ReplicationParam}
   * @param blockSize sent as a {@code BlockSizeParam}
   * @param progress not referenced in the visible part of this method
   * @throws IOException declared by the signature
   */
  public FSDataOutputStream createNonRecursive(final Path f,
      final FsPermission permission, final EnumSet<CreateFlag> flag,
      final int bufferSize, final short replication, final long blockSize,
      final Progressable progress) throws IOException {

    final FsPermission modes = applyUMask(permission);
    final HttpOpParam.Op op = PutOpParam.Op.CREATE;
    return new FsPathOutputStreamRunner(op, f, bufferSize,
        new PermissionParam(modes.getMasked()),
        new UnmaskedPermissionParam(modes.getUnmasked()),
        new CreateFlagParam(flag),
        new CreateParentParam(false),
        new BufferSizeParam(bufferSize),
        new ReplicationParam(replication),
        new BlockSizeParam(blockSize)
    // NOTE(review): the statement is cut off here in this copy of the file.

  /**
   * Builds a {@code PostOpParam.Op.APPEND} output-stream request for
   * {@code f}.
   *
   * @param f file to append to
   * @param bufferSize used for the local stream buffer and also sent as a
   *     {@code BufferSizeParam}
   * @param progress not referenced in the visible part of this method
   * @throws IOException declared by the signature
   */
  public FSDataOutputStream append(final Path f, final int bufferSize,
      final Progressable progress) throws IOException {

    final HttpOpParam.Op op = PostOpParam.Op.APPEND;
    return new FsPathOutputStreamRunner(op, f, bufferSize,
        new BufferSizeParam(bufferSize)
    // NOTE(review): the statement is cut off here in this copy of the file.

  /**
   * Runs {@code PostOpParam.Op.TRUNCATE} on {@code f} with {@code newLength}
   * and returns the boolean decoded from the reply.
   *
   * @throws IOException declared by the signature
   */
  public boolean truncate(Path f, long newLength) throws IOException {

    final HttpOpParam.Op op = PostOpParam.Op.TRUNCATE;
    return new FsPathBooleanRunner(op, f, new NewLengthParam(newLength)).run();

  /**
   * Builds a {@code DeleteOpParam.Op.DELETE} boolean request on {@code f}
   * carrying the {@code recursive} flag.
   *
   * @throws IOException declared by the signature
   */
  public boolean delete(Path f, boolean recursive) throws IOException {
    final HttpOpParam.Op op = DeleteOpParam.Op.DELETE;
    return new FsPathBooleanRunner(op, f,
        new RecursiveParam(recursive)
    // NOTE(review): the statement is cut off here in this copy of the file.

  /**
   * Opens {@code f} through a {@code WebHdfsInputStream}. When that stream
   * reports no file encryption info it is wrapped directly in an
   * {@link FSDataInputStream}.
   *
   * @param f file to open
   * @param bufferSize forwarded to the {@code WebHdfsInputStream} constructor
   * @throws IOException declared by the signature
   */
  public FSDataInputStream open(final Path f, final int bufferSize
  ) throws IOException {
    WebHdfsInputStream webfsInputStream =
        new WebHdfsInputStream(f, bufferSize);
    if (webfsInputStream.getFileEncryptionInfo() == null) {
      return new FSDataInputStream(webfsInputStream);
    } else {
      return new FSDataInputStream(
      // NOTE(review): the encrypted-file branch is cut off here in this copy
      // of the file.

  /**
   * Closes this file system while holding the instance lock. An
   * {@link IOException} raised in the try block is logged at debug level
   * ("Token cancel failed: ") and not rethrown.
   *
   * @throws IOException declared by the signature
   */
  public synchronized void close() throws IOException {
    try {
      if (canRefreshDelegationToken && delegationToken != null) {
      // NOTE(review): the body of this if is missing in this copy of the file.
    } catch (IOException ioe) {
      LOG.debug("Token cancel failed: ", ioe);
    } finally {
      if (connectionFactory != null) {
      // NOTE(review): the body of this if is missing in this copy of the file.

  // use FsPathConnectionRunner to ensure retries for InvalidTokens
  class UnresolvedUrlOpener extends ByteRangeInputStream.URLOpener {
    // Runner whose run() yields the connection returned by connect().
    private final FsPathConnectionRunner runner;
    UnresolvedUrlOpener(FsPathConnectionRunner runner) {
      this.runner = runner;

    // Only offset 0 is expected here (checked by assertion); the resolved
    // flag is not used.
    protected HttpURLConnection connect(long offset, boolean resolved)
        throws IOException {
      assert offset == 0;
      HttpURLConnection conn = runner.run();
      return conn;

  /**
   * URL opener that appends an offset parameter to its URL before connecting
   * through a {@code URLRunner} for {@code GetOpParam.Op.OPEN}.
   */
  class OffsetUrlOpener extends ByteRangeInputStream.URLOpener {
    OffsetUrlOpener(final URL url) {
    // NOTE(review): the constructor body is missing in this copy of the file.

    /** Setup offset url and connect. */
    protected HttpURLConnection connect(final long offset,
        final boolean resolved) throws IOException {
      // Offset 0 uses the URL unchanged; otherwise "&" plus the OffsetParam's
      // string form is appended.
      final URL offsetUrl = offset == 0L? url
          : new URL(url + "&" + new OffsetParam(offset));
      return new URLRunner(GetOpParam.Op.OPEN, offsetUrl, resolved,
      // NOTE(review): the statement is cut off here in this copy of the file.

  // OffsetParam.NAME followed by '='; removeOffsetParam compares it against
  // lower-cased query tokens to recognise the offset parameter.
  private static final String OFFSET_PARAM_PREFIX = OffsetParam.NAME + "=";

  /**
   * Remove offset parameter, if there is any, from the url.
   *
   * @param url URL to inspect
   * @return {@code url} itself when it has no query or no offset parameter;
   *     otherwise a new URL whose query is rebuilt from the remaining tokens
   * @throws MalformedURLException if the rebuilt URL string cannot be parsed
   */
  static URL removeOffsetParam(final URL url) throws MalformedURLException {
    String query = url.getQuery();
    if (query == null) {
      return url;
    // Matching is case-insensitive: the query is lower-cased before the
    // prefix checks.
    final String lower = StringUtils.toLowerCase(query);
    if (!lower.startsWith(OFFSET_PARAM_PREFIX)
        && !lower.contains("&" + OFFSET_PARAM_PREFIX)) {
      return url;

    //rebuild query
    StringBuilder b = null;
    for(final StringTokenizer st = new StringTokenizer(query, "&");
        st.hasMoreTokens();) {
      final String token = st.nextToken();
      if (!StringUtils.toLowerCase(token).startsWith(OFFSET_PARAM_PREFIX)) {
        if (b == null) {
          b = new StringBuilder("?").append(token);
        } else {
        // NOTE(review): the else body (appending a later token) is missing in
        // this copy of the file.
    // b stays null when no token survived, giving an empty query.
    query = b == null? "": b.toString();

    final String urlStr = url.toString();
    return new URL(urlStr.substring(0, urlStr.indexOf('?')) + query);

  /**
   * {@code ByteRangeInputStream} whose resolved URL has the offset parameter
   * stripped via {@code removeOffsetParam}.
   */
  static class OffsetUrlInputStream extends ByteRangeInputStream {
    // Passes both openers straight to the ByteRangeInputStream constructor.
    OffsetUrlInputStream(UnresolvedUrlOpener o, OffsetUrlOpener r)
        throws IOException {
      super(o, r);

    /** Remove offset parameter before returning the resolved url. */
    protected URL getResolvedUrl(final HttpURLConnection connection
    ) throws MalformedURLException {
      return removeOffsetParam(connection.getURL());

  /**
   * Get {@link FileStatus} of files/directories in the given path. If the path
   * corresponds to a file, then the {@link FileStatus} of that file is
   * returned. Otherwise, if the path represents a directory, the
   * {@link FileStatus} of all files/directories inside the given path is
   * returned.
   *
   * @param f given path
   * @return the statuses of the files/directories in the given path
   */
  public FileStatus[] listStatus(final Path f) throws IOException {

    final URI fsUri = getUri();
    final HttpOpParam.Op op = GetOpParam.Op.LISTSTATUS;
    return new FsPathResponseRunner<FileStatus[]>(op, f) {
      FileStatus[] decodeResponse(Map<?,?> json) {
        // NOTE(review): the right-hand side of this assignment is cut off in
        // this copy of the file.
        HdfsFileStatus[] hdfsStatuses =
        // Qualify every returned status against this file system's URI and f.
        final FileStatus[] statuses = new FileStatus[hdfsStatuses.length];
        for (int i = 0; i < hdfsStatuses.length; i++) {
          statuses[i] = hdfsStatuses[i].makeQualified(fsUri, f);

        return statuses;

  // Zero-length key used by listStatusBatch when the caller passes no token.
  private static final byte[] EMPTY_ARRAY = new byte[] {};

  /**
   * Get DirectoryEntries of the given path. DirectoryEntries contains an array
   * of {@link FileStatus}, as well as iteration information.
   *
   * @param f given path
   * @return DirectoryEntries for given path
   */
  public DirectoryEntries listStatusBatch(Path f, byte[] token) throws
      FileNotFoundException, IOException {
    // A null token is replaced by the empty key.
    byte[] prevKey = EMPTY_ARRAY;
    if (token != null) {
      prevKey = token;
    // The key is sent as a StartAfterParam after decoding it as UTF-8.
    // NOTE(review): no operation argument is visible in this constructor call
    // in this copy of the file — confirm against the full source.
    DirectoryListing listing = new FsPathResponseRunner<DirectoryListing>(
        f, new StartAfterParam(new String(prevKey, Charsets.UTF_8))) {
      DirectoryListing decodeResponse(Map<?, ?> json) throws IOException {
        return JsonUtilClient.toDirectoryListing(json);
    // Qualify the returned FileStatus array
    final URI fsUri = getUri();
    final HdfsFileStatus[] statuses = listing.getPartialListing();
    FileStatus[] qualified = new FileStatus[statuses.length];
    for (int i = 0; i < statuses.length; i++) {
      qualified[i] = statuses[i].makeQualified(fsUri, f);
    return new DirectoryEntries(qualified, listing.getLastName(),
    // NOTE(review): the statement is cut off here in this copy of the file.

  /**
   * Requests a delegation token using
   * {@code GetOpParam.Op.GETDELEGATIONTOKEN} with a null path; the JSON reply
   * is decoded with {@code JsonUtilClient.toDelegationToken}.
   *
   * @param renewer sent as a {@code RenewerParam}
   * @return the decoded token, which may be null
   * @throws AccessControlException if no token was obtained and
   *     {@code disallowFallbackToInsecureCluster} is set
   * @throws IOException declared by the signature
   */
  public Token<DelegationTokenIdentifier> getDelegationToken(
      final String renewer) throws IOException {
    final HttpOpParam.Op op = GetOpParam.Op.GETDELEGATIONTOKEN;
    Token<DelegationTokenIdentifier> token =
        new FsPathResponseRunner<Token<DelegationTokenIdentifier>>(
            op, null, new RenewerParam(renewer)) {
          Token<DelegationTokenIdentifier> decodeResponse(Map<?,?> json)
              throws IOException {
            return JsonUtilClient.toDelegationToken(json);
    if (token != null) {
    // NOTE(review): the non-null branch body is missing in this copy of the
    // file.
    } else {
      if (disallowFallbackToInsecureCluster) {
        throw new AccessControlException(CANT_FALLBACK_TO_INSECURE_MSG);
    return token;

  /**
   * Exposes the key provider as an additional token issuer.
   *
   * @return a one-element array holding the provider from
   *     {@code getKeyProvider()} when it is a {@code DelegationTokenIssuer};
   *     otherwise null
   * @throws IOException propagated from {@code getKeyProvider()}
   */
  public DelegationTokenIssuer[] getAdditionalTokenIssuers()
      throws IOException {
    KeyProvider keyProvider = getKeyProvider();
    if (keyProvider instanceof DelegationTokenIssuer) {
      return new DelegationTokenIssuer[] {(DelegationTokenIssuer) keyProvider};
    return null;

  /** @return the current {@code delegationToken}, read under the instance lock */
  public synchronized Token<?> getRenewToken() {
    return delegationToken;

  /**
   * Stores {@code token} in {@code delegationToken} while synchronized on
   * this instance.
   *
   * @param token the token to store
   */
  public <T extends TokenIdentifier> void setDelegationToken(
      final Token<T> token) {
    synchronized (this) {
      delegationToken = token;

  /**
   * Requests {@code PutOpParam.Op.RENEWDELEGATIONTOKEN} with a null path,
   * sending the token in its URL-string encoding.
   *
   * @param token token to renew
   * @return the numeric value the JSON reply stores under the key "long"
   * @throws IOException declared by the signature
   */
  public synchronized long renewDelegationToken(final Token<?> token
  ) throws IOException {
    final HttpOpParam.Op op = PutOpParam.Op.RENEWDELEGATIONTOKEN;
    return new FsPathResponseRunner<Long>(op, null,
        new TokenArgumentParam(token.encodeToUrlString())) {
      Long decodeResponse(Map<?,?> json) throws IOException {
        return ((Number) json.get("long")).longValue();

  /**
   * Builds a {@code PutOpParam.Op.CANCELDELEGATIONTOKEN} request with a null
   * path, sending the token in its URL-string encoding.
   *
   * @param token token to cancel
   * @throws IOException declared by the signature
   */
  public synchronized void cancelDelegationToken(final Token<?> token
  ) throws IOException {
    final HttpOpParam.Op op = PutOpParam.Op.CANCELDELEGATIONTOKEN;
    new FsPathRunner(op, null,
        new TokenArgumentParam(token.encodeToUrlString())
    // NOTE(review): the statement is cut off here in this copy of the file.

  /**
   * Convenience overload that forwards {@code status.getPath()} to the
   * path-based overload.
   *
   * @return null when {@code status} is null; otherwise the result of the
   *     path-based overload
   * @throws IOException propagated from the path-based overload
   */
  public BlockLocation[] getFileBlockLocations(final FileStatus status,
      final long offset, final long length) throws IOException {
    if (status == null) {
      return null;
    return getFileBlockLocations(status.getPath(), offset, length);

  /**
   * Fetches block locations for {@code p}. While
   * {@code isServerHCFSCompatible} is set, GETFILEBLOCKLOCATIONS is tried
   * first; if that fails with a {@code RemoteException} recognised by
   * {@code isGetFileBlockLocationsException}, the flag is cleared and the
   * request is retried with GET_BLOCK_LOCATIONS.
   *
   * @param p file path
   * @param offset forwarded to the request
   * @param length forwarded to the request
   * @throws IOException any other {@code RemoteException} is rethrown
   *     unchanged
   */
  public BlockLocation[] getFileBlockLocations(final Path p, final long offset,
      final long length) throws IOException {
    BlockLocation[] locations;
    try {
      if (isServerHCFSCompatible) {
        locations = getFileBlockLocations(GetOpParam.Op.GETFILEBLOCKLOCATIONS, p, offset, length);
      } else {
        locations = getFileBlockLocations(GetOpParam.Op.GET_BLOCK_LOCATIONS, p, offset, length);
    } catch (RemoteException e) {
      // parsing the exception is needed only if the client thinks the service is compatible
      if (isServerHCFSCompatible && isGetFileBlockLocationsException(e)) {
        // NOTE(review): the two literals below concatenate without a space
        // ("...GETFILEBLOCKLOCATIONS.Fallback ..."), and the argument for the
        // {} placeholder is cut off in this copy of the file.
        LOG.warn("Server does not appear to support GETFILEBLOCKLOCATIONS." +
                "Fallback to the old GET_BLOCK_LOCATIONS. Exception: {}",
        // Remember the result so later calls skip the unsupported operation.
        isServerHCFSCompatible = false;
        locations = getFileBlockLocations(GetOpParam.Op.GET_BLOCK_LOCATIONS, p, offset, length);
      } else {
        throw e;
    return locations;

  /**
   * Tests whether {@code e} is the error reported for an unrecognised
   * GETFILEBLOCKLOCATIONS operation.
   *
   * @return true when the message is non-null and contains both
   *     "Invalid value for webhdfs parameter" and the string form of
   *     {@code GetOpParam.Op.GETFILEBLOCKLOCATIONS}
   */
  private boolean isGetFileBlockLocationsException(RemoteException e) {
    return e.getMessage() != null && e.getMessage().contains("Invalid value for webhdfs parameter")
        && e.getMessage().contains(GetOpParam.Op.GETFILEBLOCKLOCATIONS.toString());

  /**
   * Requests block locations for {@code p} with the given operation, sending
   * {@code offset} and {@code length} as parameters. The reply is decoded
   * according to {@code operation}; an unrecognised operation raises an
   * {@link IOException} ("Unknown operation ...").
   *
   * @throws IOException declared by the signature
   */
  private BlockLocation[] getFileBlockLocations(final GetOpParam.Op operation,
      final Path p, final long offset, final long length) throws IOException {
    return new FsPathResponseRunner<BlockLocation[]>(operation, p,
        new OffsetParam(offset), new LengthParam(length)) {
      BlockLocation[] decodeResponse(Map<?, ?> json) throws IOException {
        // NOTE(review): the case/default labels of this switch are missing in
        // this copy of the file; only the branch bodies remain.
        switch (operation) {
          return JsonUtilClient.toBlockLocationArray(json);
          return DFSUtilClient.locatedBlocks2Locations(JsonUtilClient.toLocatedBlocks(json));
          throw new IOException("Unknown operation " + operation.name());

  /**
   * Resolves the trash root for {@code path} using
   * {@code GetOpParam.Op.GETTRASHROOT}, decoding the reply with
   * {@code JsonUtilClient.getPath}. If that fails with an {@link IOException},
   * a warning is logged and the superclass result is used instead.
   *
   * @return the trash root, qualified against this file system's URI
   */
  public Path getTrashRoot(Path path) {

    final HttpOpParam.Op op = GetOpParam.Op.GETTRASHROOT;
    try {
      String strTrashPath = new FsPathResponseRunner<String>(op, path) {
        String decodeResponse(Map<?, ?> json) throws IOException {
          return JsonUtilClient.getPath(json);
      return new Path(strTrashPath).makeQualified(getUri(), null);
    } catch(IOException e) {
      LOG.warn("Cannot find trash root of " + path, e);
      // keep the same behavior with dfs
      return super.getTrashRoot(path).makeQualified(getUri(), null);

  /**
   * Runs {@code GetOpParam.Op.CHECKACCESS} on {@code path} with {@code mode}
   * sent as an {@code FsActionParam}.
   *
   * @throws IOException declared by the signature
   */
  public void access(final Path path, final FsAction mode) throws IOException {
    final HttpOpParam.Op op = GetOpParam.Op.CHECKACCESS;
    new FsPathRunner(op, path, new FsActionParam(mode)).run();

  /**
   * Requests {@code GetOpParam.Op.GETCONTENTSUMMARY} on {@code p}; the JSON
   * reply is decoded with {@code JsonUtilClient.toContentSummary}.
   *
   * @throws IOException declared by the signature
   */
  public ContentSummary getContentSummary(final Path p) throws IOException {

    final HttpOpParam.Op op = GetOpParam.Op.GETCONTENTSUMMARY;
    return new FsPathResponseRunner<ContentSummary>(op, p) {
      ContentSummary decodeResponse(Map<?,?> json) {
        return JsonUtilClient.toContentSummary(json);

  /**
   * Requests {@code GetOpParam.Op.GETQUOTAUSAGE} on {@code p}; the JSON reply
   * is decoded with {@code JsonUtilClient.toQuotaUsage}.
   *
   * @throws IOException declared by the signature
   */
  public QuotaUsage getQuotaUsage(final Path p) throws IOException {

    final HttpOpParam.Op op = GetOpParam.Op.GETQUOTAUSAGE;
    return new FsPathResponseRunner<QuotaUsage>(op, p) {
      QuotaUsage decodeResponse(Map<?, ?> json) {
        return JsonUtilClient.toQuotaUsage(json);

  /**
   * Runs {@code PutOpParam.Op.SETQUOTA} on {@code p} after validating both
   * values.
   *
   * @param p target path
   * @param namespaceQuota must be positive or
   *     {@code HdfsConstants.QUOTA_RESET}
   * @param storagespaceQuota must be non-negative or
   *     {@code HdfsConstants.QUOTA_RESET}
   * @throws IllegalArgumentException if either value fails validation
   * @throws IOException declared by the signature
   */
  public void setQuota(Path p, final long namespaceQuota,
      final long storagespaceQuota) throws IOException {
    // sanity check
    // Note the asymmetry: 0 is rejected for the namespace quota but accepted
    // for the storage space quota.
    if ((namespaceQuota <= 0 &&
        namespaceQuota != HdfsConstants.QUOTA_RESET) ||
        (storagespaceQuota < 0 &&
            storagespaceQuota != HdfsConstants.QUOTA_RESET)) {
      throw new IllegalArgumentException("Invalid values for quota : " +
          namespaceQuota + " and " + storagespaceQuota);


    final HttpOpParam.Op op = PutOpParam.Op.SETQUOTA;
    new FsPathRunner(op, p, new NameSpaceQuotaParam(namespaceQuota),
        new StorageSpaceQuotaParam(storagespaceQuota)).run();

  /**
   * Runs {@code PutOpParam.Op.SETQUOTABYSTORAGETYPE} on {@code path} after
   * validating the arguments. The storage type is sent by {@code name()} and
   * the quota as a {@code StorageSpaceQuotaParam}.
   *
   * @param path target path
   * @param type storage type; must be non-null and report
   *     {@code supportTypeQuota()}
   * @param quota must be positive or {@code HdfsConstants.QUOTA_RESET}
   * @throws IllegalArgumentException if any argument fails validation
   * @throws IOException declared by the signature
   */
  public void setQuotaByStorageType(Path path, StorageType type, long quota)
      throws IOException {
    if (quota <= 0 && quota != HdfsConstants.QUOTA_RESET) {
      throw new IllegalArgumentException("Invalid values for quota :" + quota);
    if (type == null) {
      throw new IllegalArgumentException("Invalid storage type (null)");
    if (!type.supportTypeQuota()) {
      throw new IllegalArgumentException(
          "Quota for storage type '" + type.toString() + "' is not supported");


    final HttpOpParam.Op op = PutOpParam.Op.SETQUOTABYSTORAGETYPE;
    new FsPathRunner(op, path, new StorageTypeParam(type.name()),
        new StorageSpaceQuotaParam(quota)).run();

  /**
   * Requests {@code GetOpParam.Op.GETFILECHECKSUM} on {@code p}; the JSON
   * reply is decoded with {@code JsonUtilClient.toMD5MD5CRC32FileChecksum}.
   *
   * @throws IOException declared by the signature
   */
  public MD5MD5CRC32FileChecksum getFileChecksum(final Path p
  ) throws IOException {

    final HttpOpParam.Op op = GetOpParam.Op.GETFILECHECKSUM;
    return new FsPathResponseRunner<MD5MD5CRC32FileChecksum>(op, p) {
      MD5MD5CRC32FileChecksum decodeResponse(Map<?,?> json) throws IOException {
        return JsonUtilClient.toMD5MD5CRC32FileChecksum(json);

  /**
   * Resolve an HDFS URL into real InetSocketAddresses. It works like a DNS
   * resolver when the URL points to a non-HA cluster. When the URL points to
   * an HA cluster with its logical name, the resolver further resolves the
   * logical name (i.e., the authority in the URL) into real namenode addresses.
   */
  private InetSocketAddress[] resolveNNAddr() {
    Configuration conf = getConf();
    final String scheme = uri.getScheme();

    // Collected addresses; converted to an array at the end.
    ArrayList<InetSocketAddress> ret = new ArrayList<>();

    if (!HAUtilClient.isLogicalUri(conf, uri)) {
      // Non-logical URI: the address is created from the URI authority.
      // NOTE(review): the rest of this statement, and the code that stores
      // addr in ret, is missing in this copy of the file.
      InetSocketAddress addr = NetUtils.createSocketAddr(uri.getAuthority(),

    } else {
      Map<String, Map<String, InetSocketAddress>> addresses = DFSUtilClient
          .getHaNnWebHdfsAddresses(conf, scheme);

      // Extract the entry corresponding to the logical name.
      Map<String, InetSocketAddress> addrs = addresses.get(uri.getHost());
      for (InetSocketAddress addr : addrs.values()) {
      // NOTE(review): the loop body is missing in this copy of the file.

    InetSocketAddress[] r = new InetSocketAddress[ret.size()];
    return ret.toArray(r);

  /**
   * @return the string form of {@code tokenServiceName} when it is set;
   *     otherwise the superclass's canonical service name
   */
  public String getCanonicalServiceName() {
    return tokenServiceName == null ? super.getCanonicalServiceName()
        : tokenServiceName.toString();

  /**
   * Runs {@code PutOpParam.Op.SETSTORAGEPOLICY} on {@code p} with
   * {@code policyName} sent as a {@code StoragePolicyParam}.
   *
   * @throws IOException if {@code policyName} is null, or as declared by the
   *     signature
   */
  public void setStoragePolicy(Path p, String policyName) throws IOException {
    if (policyName == null) {
      throw new IOException("policyName == null");
    final HttpOpParam.Op op = PutOpParam.Op.SETSTORAGEPOLICY;
    new FsPathRunner(op, p, new StoragePolicyParam(policyName)).run();

  /**
   * Requests {@code GetOpParam.Op.GETALLSTORAGEPOLICY} with a null path; the
   * JSON reply is decoded with {@code JsonUtilClient.getStoragePolicies}.
   *
   * @throws IOException declared by the signature
   */
  public Collection<BlockStoragePolicy> getAllStoragePolicies()
      throws IOException {
    final HttpOpParam.Op op = GetOpParam.Op.GETALLSTORAGEPOLICY;
    return new FsPathResponseRunner<Collection<BlockStoragePolicy>>(op, null) {
      Collection<BlockStoragePolicy> decodeResponse(Map<?, ?> json)
          throws IOException {
        return JsonUtilClient.getStoragePolicies(json);

  /**
   * Requests {@code GetOpParam.Op.GETSTORAGEPOLICY} on {@code src}; the JSON
   * reply is decoded with {@code JsonUtilClient.toBlockStoragePolicy}.
   *
   * @throws IOException declared by the signature
   */
  public BlockStoragePolicy getStoragePolicy(Path src) throws IOException {
    final HttpOpParam.Op op = GetOpParam.Op.GETSTORAGEPOLICY;
    return new FsPathResponseRunner<BlockStoragePolicy>(op, src) {
      BlockStoragePolicy decodeResponse(Map<?, ?> json) throws IOException {
        return JsonUtilClient.toBlockStoragePolicy((Map<?, ?>) json
        // NOTE(review): the remainder of this expression is cut off in this
        // copy of the file.

  /**
   * Runs {@code PostOpParam.Op.UNSETSTORAGEPOLICY} on {@code src}.
   *
   * @throws IOException declared by the signature
   */
  public void unsetStoragePolicy(Path src) throws IOException {
    final HttpOpParam.Op op = PostOpParam.Op.UNSETSTORAGEPOLICY;
    new FsPathRunner(op, src).run();

  /**
   * Callers of this method should handle UnsupportedOperationException for the
   * case where a new client is talking to an old namenode that doesn't support
   * the FsServerDefaults call.
   */
  public FsServerDefaults getServerDefaults() throws IOException {
    // No path applies to this operation, so null is passed as the path.
    final HttpOpParam.Op op = GetOpParam.Op.GETSERVERDEFAULTS;
    return new FsPathResponseRunner<FsServerDefaults>(op, null) {
      FsServerDefaults decodeResponse(Map<?, ?> json) throws IOException {
        return JsonUtilClient.toFsServerDefaults(json);

  /**
   * Package-private accessor for the {@code nnAddrs} field.
   *
   * @return the array held in {@code nnAddrs} (the array itself, not a copy)
   */
  InetSocketAddress[] getResolvedNNAddr() {
    return nnAddrs;

  /**
   * Replaces the retry policy held by this file system instance.
   *
   * @param rp the retry policy to store in {@code retryPolicy}
   */
  public void setRetryPolicy(RetryPolicy rp) {
    this.retryPolicy = rp;

  /**
   * Resolves the key provider URI for this file system.
   *
   * The URI string is first requested from the server defaults. If the server
   * cannot answer that call (signalled either by an
   * UnsupportedOperationException or by a RemoteException whose class name is
   * {@code java.lang.IllegalArgumentException}), the string stays null. The
   * possibly-null string is then handed to
   * {@code HdfsKMSUtil.getKeyProviderUri} for final resolution.
   *
   * NOTE(review): the final call is cut off in this copy of the file; its
   * remaining arguments are not visible.
   *
   * @return the key provider URI produced by {@code HdfsKMSUtil}
   * @throws IOException on failure, including any RemoteException that is not
   *         the tolerated IllegalArgumentException case
   */
  public URI getKeyProviderUri() throws IOException {
    String keyProviderUri = null;
    try {
      keyProviderUri = getServerDefaults().getKeyProviderUri();
    } catch (UnsupportedOperationException e) {
      // This means server doesn't support GETSERVERDEFAULTS call.
      // Do nothing, let keyProviderUri = null.
    } catch (RemoteException e) {
      // Only the IllegalArgumentException case is tolerated; every other
      // remote failure is rethrown to the caller unchanged.
      if (e.getClassName() != null &&
          e.getClassName().equals("java.lang.IllegalArgumentException")) {
        // See HDFS-13100.
        // This means server doesn't support GETSERVERDEFAULTS call.
        // Do nothing, let keyProviderUri = null.
      } else {
        throw e;
    return HdfsKMSUtil.getKeyProviderUri(ugi, getUri(), keyProviderUri,

  /**
   * Returns the key provider to use with this file system.
   *
   * @return the provider injected through {@code setTestProvider} if one is
   *         set; otherwise a provider created from the key provider URI, or
   *         {@code null} when no key provider URI is available
   * @throws IOException if resolving the URI or creating the provider fails
   */
  public KeyProvider getKeyProvider() throws IOException {
    // An injected test provider takes precedence over any configured URI.
    if (testProvider != null) {
      return testProvider;
    URI keyProviderUri = getKeyProviderUri();
    if (keyProviderUri == null) {
      return null;
    return KMSUtil.createKeyProviderFromUri(getConf(), keyProviderUri);

  /**
   * Stores a key provider that {@code getKeyProvider} returns in preference
   * to one created from the key provider URI.
   *
   * @param kp the provider to store in {@code testProvider}
   */
  public void setTestProvider(KeyProvider kp) {
    testProvider = kp;

  /**
   * HDFS client capabilities.
   * Uses {@link DfsPathCapabilities} to keep in sync with HDFS.
   * {@inheritDoc}
   */
  public boolean hasPathCapability(final Path path, final String capability)
      throws IOException {
    // qualify the path to make sure that it refers to the current FS.
    final Path p = makeQualified(path);
    // NOTE(review): the call below is cut off in this copy of the file; its
    // remaining argument(s) are not visible.
    Optional<Boolean> cap = DfsPathCapabilities.hasPathCapability(p,
    // A present value is a definitive answer from DfsPathCapabilities; an
    // empty Optional defers the decision to the superclass.
    if (cap.isPresent()) {
      return cap.get();
    return super.hasPathCapability(p, capability);

  /**
   * Creates a multipart uploader builder bound to this file system.
   *
   * @param basePath base path passed to the builder
   * @return a new {@code FileSystemMultipartUploaderBuilder} constructed with
   *         this file system and {@code basePath}
   * @throws IOException declared by the signature; the visible body does not
   *         throw it directly
   */
  public MultipartUploaderBuilder createMultipartUploader(final Path basePath)
      throws IOException {
    return new FileSystemMultipartUploaderBuilder(this, basePath);

  /**
   * This class is used for opening, reading, and seeking files while using the
   * WebHdfsFileSystem. This class will invoke the retry policy when performing
   * any of these actions.
   */
  public class WebHdfsInputStream extends FSInputStream {
    // All stream operations visible in this class are forwarded to this runner.
    private ReadRunner readRunner = null;

    // Creates the stream for the given path and buffer size.
    WebHdfsInputStream(Path path, int buffersize) throws IOException {
      // Only create the ReadRunner once. Each read's byte array and position
      // will be updated within the ReadRunner object before every read.
      readRunner = new ReadRunner(path, buffersize);

    // Single-byte read implemented on top of the array read: returns -1 when
    // the array read reports -1, otherwise the byte as an unsigned value.
    public int read() throws IOException {
      final byte[] b = new byte[1];
      return (read(b, 0, 1) == -1) ? -1 : (b[0] & 0xff);

    // Array read; delegates directly to the ReadRunner.
    public int read(byte b[], int off, int len) throws IOException {
      return readRunner.read(b, off, len);

    // NOTE(review): no statements are visible in this body in this copy of the
    // file; presumably it forwards newPos to the ReadRunner - confirm.
    public void seek(long newPos) throws IOException {

    // Current position as tracked by the ReadRunner.
    public long getPos() throws IOException {
      return readRunner.getPos();

    // Buffer size the ReadRunner was created with.
    protected int getBufferSize() throws IOException {
      return readRunner.getBufferSize();

    // Path the ReadRunner was created with.
    protected Path getPath() throws IOException {
      return readRunner.getPath();

    // Always reports false, whatever targetPos is.
    public boolean seekToNewSource(long targetPos) throws IOException {
      return false;

    // NOTE(review): no statements are visible in this body in this copy of the
    // file; presumably it closes the ReadRunner - confirm.
    public void close() throws IOException {

    // NOTE(review): no statements are visible in this body in this copy of the
    // file; presumably it forwards len to the ReadRunner - confirm.
    public void setFileLength(long len) {

    // File length as tracked by the ReadRunner.
    public long getFileLength() {
      return readRunner.getFileLength();

    // Package-private access to the underlying runner.
    ReadRunner getReadRunner() {
      return readRunner;

    // Package-private replacement of the underlying runner.
    void setReadRunner(ReadRunner rr) {
      this.readRunner = rr;

    // File encryption info held by the ReadRunner (may be null; the runner's
    // field is initialised to null).
    FileEncryptionInfo getFileEncryptionInfo() {
      return readRunner.getFileEncryptionInfo();

    // Wraps this stream via HdfsKMSUtil, passing the file system's key
    // provider, this stream's encryption info and the configuration.
    InputStream createWrappedInputStream() throws IOException {
      return HdfsKMSUtil.createWrappedInputStream(
          this, getKeyProvider(), getFileEncryptionInfo(), getConf());

  /**
   * Connection states recorded in {@code ReadRunner}'s {@code runnerState}
   * field. A new ReadRunner starts in {@code SEEK}.
   */
  enum RunnerState {
    DISCONNECTED, // Connection is closed programmatically by ReadRunner
    OPEN,         // Connection has been established by ReadRunner
    SEEK,         // Calling code has explicitly called seek()
    CLOSED        // Calling code has explicitly called close()

  /**
   * This class will allow retries to occur for both open and read operations.
   * The first WebHdfsFileSystem#open creates a new WebHdfsInputStream object,
   * which creates a new ReadRunner object that will be used to open a
   * connection and read or seek into the input stream.
   *
   * ReadRunner is a subclass of the AbstractRunner class, which will run the
   * ReadRunner#getUrl(), ReadRunner#connect(URL), and ReadRunner#getResponse
   * methods within a retry loop, based on the configured retry policy.
   *
   * ReadRunner#connect will create a connection if one has not already been
   * created. Otherwise, it will return the previously created connection
   * object. This is necessary because a new connection should not be created
   * for every read.
   *
   * Likewise, ReadRunner#getUrl will construct a new URL object only if the
   * connection has not previously been established. Otherwise, it will return
   * the previously created URL object.
   *
   * ReadRunner#getResponse will initialize the input stream if it has not
   * already been initialized and read the requested data from the specified
   * input stream.
   */
  protected class ReadRunner extends AbstractFsPathRunner<Integer> {
    // Stream the data is read from; created lazily by getResponse.
    private InputStream in = null;
    // Connection reused across reads; connect() returns it when non-null.
    private HttpURLConnection cachedConnection = null;
    // Destination buffer, offset and length of the read in progress. read()
    // sets them before run(), and getResponse() consumes them.
    private byte[] readBuffer;
    private int readOffset;
    private int readLength;
    // Starts as SEEK; set to OPEN by initializeInputStream and to the caller's
    // chosen state by closeInputStream.
    private RunnerState runnerState = RunnerState.SEEK;
    // URL returned by getUrl(); refreshed whenever there is no cached
    // connection.
    private URL originalUrl = null;
    // URL with the offset parameter removed, taken from either a redirect
    // "Location" header or the live connection; reused after a seek.
    private URL resolvedUrl = null;

    // Constructor arguments, kept for building URLs and buffered streams.
    private final Path path;
    private final int bufferSize;
    // Current read position; advanced by successful reads, replaced by seek().
    private long pos = 0;
    // Length of the file; populated by initializeInputStream (0 until then).
    private long fileLength = 0;
    // Populated by getRedirectedUrl when the response carries FEFINFO_HEADER.
    private FileEncryptionInfo feInfo = null;

    /* The following methods are WebHdfsInputStream helpers. */

    /**
     * Creates a runner for an OPEN operation on {@code p}, passing the buffer
     * size as a request parameter and remembering both values for later use.
     *
     * @param p path of the file to read
     * @param bs buffer size
     * @throws IOException declared by the signature
     */
    ReadRunner(Path p, int bs) throws IOException {
      super(GetOpParam.Op.OPEN, p, new BufferSizeParam(bs));
      this.path = p;
      this.bufferSize = bs;

    /**
     * Runs an OPEN request through a URLRunner and records what the response
     * reveals: the file encryption info (if the FEFINFO_HEADER header is
     * present) and either the redirect target or the connection itself.
     *
     * NOTE(review): several statements in this method are cut off in this copy
     * of the file (the anonymous class terminator and the Base64 decode /
     * feInfo conversion), so those steps are only partially visible.
     *
     * @throws IOException if running the request or building a URL fails
     */
    private void getRedirectedUrl() throws IOException {
      // Both boolean constructor arguments are false; their meaning is defined
      // by URLRunner, which is outside this section of the file.
      URLRunner urlRunner = new URLRunner(GetOpParam.Op.OPEN, null, false,
          false) {
        // The URL is built from this runner's path and buffer size rather than
        // from the (null) URL passed to the constructor.
        protected URL getUrl() throws IOException {
          return toUrl(op, path, new BufferSizeParam(bufferSize));
      HttpURLConnection conn = urlRunner.run();
      String feInfoStr = conn.getHeaderField(FEFINFO_HEADER);
      if (feInfoStr != null) {
        // The header value is Base64-decoded before being converted to feInfo.
        Decoder decoder = Base64.getDecoder();
        byte[] decodedBytes = decoder.decode(
        feInfo = PBHelperClient
      String location = conn.getHeaderField("Location");
      if (location != null) {
        // This saves the location for datanode where redirect was issued.
        // Need to remove offset because seek can be called after open.
        resolvedUrl = removeOffsetParam(new URL(location));
      } else {
        // This is cached for proxies like httpfsfilesystem.
        cachedConnection = conn;
      originalUrl = super.getUrl();

    /**
     * Reads up to {@code len} bytes into {@code b} starting at {@code off},
     * going through the retry-aware {@code run()} of this runner.
     *
     * @param b destination buffer
     * @param off offset in {@code b} at which data is stored
     * @param len maximum number of bytes to read
     * @return the number of bytes read, 0 when {@code len} is 0, or -1 at end
     *         of file
     * @throws IOException "Stream closed" if the runner is in the CLOSED state;
     *         EOFException if the read reports end of stream while
     *         {@code pos} is still below {@code fileLength}
     */
    int read(byte[] b, int off, int len) throws IOException {
      if (runnerState == RunnerState.CLOSED) {
        throw new IOException("Stream closed");
      if (len == 0) {
        return 0;

      // Before the first read, pos and fileLength will be 0 and readBuffer
      // will be null. They will be initialized once the first connection
      // is made. Only after that does it make sense to compare pos and
      // fileLength.
      if (pos >= fileLength && readBuffer != null) {
        return -1;

      // If a seek is occurring, the input stream will have been closed, so it
      // needs to be reopened. Use the URLRunner to call AbstractRunner#connect
      // with the previously-cached resolved URL and with the 'redirected' flag
      // set to 'true'. The resolved URL contains the URL of the previously
      // opened DN as opposed to the NN. It is preferable to use the resolved
      // URL when creating a connection because it does not hit the NN on every
      // seek, nor does it open a connection to a new DN after every seek.
      // The redirect flag is needed so that AbstractRunner#connect knows the
      // URL is already resolved.
      // Note that when the redirected flag is set, retries are not attempted.
      // So, if the connection fails using URLRunner, clear out the connection
      // and fall through to establish the connection using ReadRunner.
      if (runnerState == RunnerState.SEEK) {
        try {
          final URL rurl = new URL(resolvedUrl + "&" + new OffsetParam(pos));
          // NOTE(review): the URLRunner call and the body of the catch block
          // are cut off in this copy of the file.
          cachedConnection = new URLRunner(GetOpParam.Op.OPEN, rurl, true,
        } catch (IOException ioe) {

      // Publish the caller's buffer to the fields that getResponse reads from.
      readBuffer = b;
      readOffset = off;
      readLength = len;

      int count = -1;
      count = this.run();
      if (count >= 0) {
        pos += count;
      } else if (pos < fileLength) {
        // End of stream was reported before the expected file length.
        throw new EOFException(
                  "Premature EOF: pos=" + pos + " < filelength=" + fileLength);
      return count;

    /**
     * Moves the read position to {@code newPos}; a seek to the current
     * position changes nothing.
     *
     * NOTE(review): only the position update is visible in this copy of the
     * file; any accompanying stream or state reset is not shown here.
     *
     * @param newPos the new read position
     * @throws IOException declared by the signature
     */
    void seek(long newPos) throws IOException {
      if (pos != newPos) {
        pos = newPos;

    // NOTE(review): no statements are visible in this body in this copy of the
    // file; presumably it moves the runner to the CLOSED state that read()
    // checks for - confirm.
    public void close() throws IOException {

    /* The following methods override AbstractRunner methods and are
     * called within the retry policy context by runWithRetry.
     */

    /**
     * Returns the URL to connect to for the current read.
     *
     * @return a URL rebuilt with the current offset and buffer size when there
     *         is no cached connection; otherwise the previously stored
     *         {@code originalUrl}
     * @throws IOException if building the URL fails
     */
    protected URL getUrl() throws IOException {
      // This method is called every time a read is executed.
      // The check for cachedConnection == null ensures that a new URL is only
      // created upon a new connection and not for every read.
      if (cachedConnection == null) {
        // Update URL with current offset. BufferSize doesn't change, but it
        // still must be included when creating the new URL.
        updateURLParameters(new BufferSizeParam(bufferSize),
            new OffsetParam(pos));
        originalUrl = super.getUrl();
      return originalUrl;

    /* Only make the connection if it is not already open. Don't cache the
     * connection here. After this method is called, runWithRetry will call
     * validateResponse, and then call the below ReadRunner#getResponse. If
     * the code path makes it that far, then we can cache the connection.
     */
    protected HttpURLConnection connect(URL url) throws IOException {
      // Reuse the cached connection when there is one; only a missing
      // connection triggers a real connect through the superclass.
      HttpURLConnection conn = cachedConnection;
      if (conn == null) {
        try {
          conn = super.connect(url);
        } catch (IOException e) {
          // NOTE(review): as visible here, the handler only rethrows; any
          // cleanup before the rethrow is not shown in this copy of the file.
          throw e;
      return conn;

    /**
     * This method is used to perform reads within the retry policy context.
     * This code is relying on runWithRetry to always call the above connect
     * method and the validateResponse method prior to calling getResponse.
     */
    Integer getResponse(final HttpURLConnection conn)
        throws IOException {
      try {
        // In the "open-then-read" use case, runWithRetry will have executed
        // ReadRunner#connect to make the connection and then executed
        // validateResponse to validate the response code. Only then do we want
        // to cache the connection.
        // In the "read-after-seek" use case, the connection is made and the
        // response is validated by the URLRunner. ReadRunner#read then caches
        // the connection and the ReadRunner#connect will pass on the cached
        // connection.
        // In either case, stream initialization is done here if necessary.
        cachedConnection = conn;
        if (in == null) {
          in = initializeInputStream(conn);

        // Read into the buffer/offset/length that ReadRunner#read stored in
        // the fields just before invoking run().
        int count = in.read(readBuffer, readOffset, readLength);
        if (count < 0 && pos < fileLength) {
          // End of stream was reported before the expected file length.
          throw new EOFException(
                  "Premature EOF: pos=" + pos + " < filelength=" + fileLength);
        return Integer.valueOf(count);
      } catch (IOException e) {
        // Add the host that just failed to the exclude list, in front of any
        // hosts that were already excluded.
        String redirectHost = resolvedUrl.getAuthority();
        if (excludeDatanodes.getValue() != null) {
          excludeDatanodes = new ExcludeDatanodesParam(redirectHost + ","
              + excludeDatanodes.getValue());
        } else {
          excludeDatanodes = new ExcludeDatanodesParam(redirectHost);

        // If an exception occurs, close the input stream and null it out so
        // that if the abstract runner decides to retry, it will reconnect.
        // NOTE(review): no statement performing that close is visible before
        // the rethrow in this copy of the file - confirm against upstream.
        throw e;

    /**
     * Opens the response body of {@code conn} for reading and records the
     * state that later reads and seeks depend on: {@code resolvedUrl},
     * {@code fileLength} and {@code runnerState} (set to OPEN).
     *
     * @param conn an established connection whose body is to be read
     * @return the response stream wrapped in a BufferedInputStream of
     *         {@code bufferSize}; additionally bounded to the Content-Length
     *         when that header is present
     * @throws IOException if the stream cannot be obtained or the file status
     *         lookup fails
     */
    InputStream initializeInputStream(HttpURLConnection conn)
        throws IOException {
      // Cache the resolved URL so that it can be used in the event of
      // a future seek operation.
      resolvedUrl = removeOffsetParam(conn.getURL());
      final String cl = conn.getHeaderField(HttpHeaders.CONTENT_LENGTH);
      InputStream inStream = conn.getInputStream();
      if (LOG.isDebugEnabled()) {
        LOG.debug("open file: " + conn.getURL());
      if (cl != null) {
        // Content-Length covers only the bytes from the current offset on, so
        // the file length is the current position plus the stream length.
        long streamLength = Long.parseLong(cl);
        fileLength = pos + streamLength;
        // Java has a bug with >2GB request streams.  It won't bounds check
        // the reads so the transfer blocks until the server times out
        inStream = new BoundedInputStream(inStream, streamLength);
      } else {
        // No Content-Length header: take the length from the file status.
        fileLength = getHdfsFileStatus(path).getLen();
      // Wrapping in BufferedInputStream because it is more performant than
      // BoundedInputStream by itself.
      runnerState = RunnerState.OPEN;
      return new BufferedInputStream(inStream, bufferSize);

    // Close both the InputStream and the connection, then record the given
    // state in runnerState.
    // NOTE(review): in this copy of the file only the nulling of the two
    // fields is visible; the statements that actually close the stream and
    // disconnect the connection are not shown - confirm against upstream.
    void closeInputStream(RunnerState rs) throws IOException {
      if (in != null) {
        in = null;
      if (cachedConnection != null) {
        cachedConnection = null;
      runnerState = rs;

    /* Getters and Setters */

    /** @return the current input stream, or null if none has been set */
    protected InputStream getInputStream() {
      return in;

    /** @param inStream stream to store as this runner's input stream */
    protected void setInputStream(InputStream inStream) {
      in = inStream;

    /** @return the path this runner was constructed with */
    Path getPath() {
      return path;

    /** @return the buffer size this runner was constructed with */
    int getBufferSize() {
      return bufferSize;

    /** @return the currently recorded file length (0 until it has been set) */
    long getFileLength() {
      return fileLength;

    /** @param len value to record as the file length */
    void setFileLength(long len) {
      fileLength = len;

    /** @return the current read position */
    long getPos() {
      return pos;

    /** @return the stored file encryption info, or null if none was set */
    protected FileEncryptionInfo getFileEncryptionInfo() {
      return feInfo;


