hadoop XDR 源码

  • 2022-10-20
  • 浏览 (249)

haddop XDR 代码

文件路径:/hadoop-common-project/hadoop-nfs/src/main/java/org/apache/hadoop/oncrpc/XDR.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.oncrpc;

import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;

import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.util.Preconditions;

/**
 * Utility class for building XDR messages based on RFC 4506.
 *
 * Key points of the format:
 *
 * <ul>
 * <li>Primitives are stored in big-endian order (i.e., the default byte order
 * of ByteBuffer).</li>
 * <li>Booleans are stored as an integer.</li>
 * <li>Each field in the message is always aligned by 4.</li>
 * </ul>
 *
 */
public final class XDR {
  private static final int DEFAULT_INITIAL_CAPACITY = 256;
  private static final int SIZEOF_INT = 4;
  private static final int SIZEOF_LONG = 8;
  private static final byte[] PADDING_BYTES = new byte[] { 0, 0, 0, 0 };

  private ByteBuffer buf;

  public enum State {
    READING, WRITING,
  }

  private final State state;

  /**
   * Construct a new XDR message buffer.
   *
   * @param initialCapacity
   *          the initial capacity of the buffer.
   */
  public XDR(int initialCapacity) {
    this(ByteBuffer.allocate(initialCapacity), State.WRITING);
  }

  public XDR() {
    this(DEFAULT_INITIAL_CAPACITY);
  }

  public XDR(ByteBuffer buf, State state) {
    this.buf = buf;
    this.state = state;
  }

  /**
   * Wraps a byte array as a read-only XDR message. There's no copy involved,
   * thus it is the client's responsibility to ensure that the byte array
   * remains unmodified when using the XDR object.
   * 
   * @param src
   *          the byte array to be wrapped.
   */
  public XDR(byte[] src) {
    this(ByteBuffer.wrap(src).asReadOnlyBuffer(), State.READING);
  }

  public XDR asReadOnlyWrap() {
    ByteBuffer b = buf.asReadOnlyBuffer();
    if (state == State.WRITING) {
      b.flip();
    }

    XDR n = new XDR(b, State.READING);
    return n;
  }

  public ByteBuffer buffer() {
    return buf.duplicate();
  }

  public int size() {
    // TODO: This overloading intends to be compatible with the semantics of
    // the previous version of the class. This function should be separated into
    // two with clear semantics.
    return state == State.READING ? buf.limit() : buf.position();
  }

  public int readInt() {
    Preconditions.checkState(state == State.READING);
    return buf.getInt();
  }

  public void writeInt(int v) {
    ensureFreeSpace(SIZEOF_INT);
    buf.putInt(v);
  }

  public boolean readBoolean() {
    Preconditions.checkState(state == State.READING);
    return buf.getInt() != 0;
  }

  public void writeBoolean(boolean v) {
    ensureFreeSpace(SIZEOF_INT);
    buf.putInt(v ? 1 : 0);
  }

  public long readHyper() {
    Preconditions.checkState(state == State.READING);
    return buf.getLong();
  }

  public void writeLongAsHyper(long v) {
    ensureFreeSpace(SIZEOF_LONG);
    buf.putLong(v);
  }

  public byte[] readFixedOpaque(int size) {
    Preconditions.checkState(state == State.READING);
    byte[] r = new byte[size];
    buf.get(r);
    alignPosition();
    return r;
  }

  public void writeFixedOpaque(byte[] src, int length) {
    ensureFreeSpace(alignUp(length));
    buf.put(src, 0, length);
    writePadding();
  }

  public void writeFixedOpaque(byte[] src) {
    writeFixedOpaque(src, src.length);
  }

  public byte[] readVariableOpaque() {
    Preconditions.checkState(state == State.READING);
    int size = readInt();
    return readFixedOpaque(size);
  }

  public void writeVariableOpaque(byte[] src) {
    ensureFreeSpace(SIZEOF_INT + alignUp(src.length));
    buf.putInt(src.length);
    writeFixedOpaque(src);
  }

  public String readString() {
    return new String(readVariableOpaque(), StandardCharsets.UTF_8);
  }

  public void writeString(String s) {
    writeVariableOpaque(s.getBytes(StandardCharsets.UTF_8));
  }

  private void writePadding() {
    Preconditions.checkState(state == State.WRITING);
    int p = pad(buf.position());
    ensureFreeSpace(p);
    buf.put(PADDING_BYTES, 0, p);
  }

  private int alignUp(int length) {
    return length + pad(length);
  }

  private int pad(int length) {
    switch (length % 4) {
    case 1:
      return 3;
    case 2:
      return 2;
    case 3:
      return 1;
    default:
      return 0;
    }
  }

  private void alignPosition() {
    buf.position(alignUp(buf.position()));
  }

  private void ensureFreeSpace(int size) {
    Preconditions.checkState(state == State.WRITING);
    if (buf.remaining() < size) {
      int newCapacity = buf.capacity() * 2;
      int newRemaining = buf.capacity() + buf.remaining();

      while (newRemaining < size) {
        newRemaining += newCapacity;
        newCapacity *= 2;
      }

      ByteBuffer newbuf = ByteBuffer.allocate(newCapacity);
      buf.flip();
      newbuf.put(buf);
      buf = newbuf;
    }
  }

  /**
   * check if the rest of data has more than len bytes.
   * @param xdr XDR message
   * @param len minimum remaining length
   * @return specify remaining length is enough or not
   */
  public static boolean verifyLength(XDR xdr, int len) {
    return xdr.buf.remaining() >= len;
  }

  static byte[] recordMark(int size, boolean last) {
    byte[] b = new byte[SIZEOF_INT];
    ByteBuffer buf = ByteBuffer.wrap(b);
    buf.putInt(!last ? size : size | 0x80000000);
    return b;
  }

  /**
   * Write an XDR message to a TCP ChannelBuffer.
   * @param request XDR request
   * @param last specifies last request or not
   * @return TCP buffer
   */
  public static ByteBuf writeMessageTcp(XDR request, boolean last) {
    Preconditions.checkState(request.state == XDR.State.WRITING);
    ByteBuffer b = request.buf.duplicate();
    b.flip();
    byte[] fragmentHeader = XDR.recordMark(b.limit(), last);
    ByteBuffer headerBuf = ByteBuffer.wrap(fragmentHeader);

    // TODO: Investigate whether making a copy of the buffer is necessary.
    return Unpooled.wrappedBuffer(headerBuf, b);
  }

  /**
   * Write an XDR message to a UDP ChannelBuffer.
   * @param response XDR response
   * @return UDP buffer
   */
  public static ByteBuf writeMessageUdp(XDR response) {
    Preconditions.checkState(response.state == XDR.State.READING);
    // TODO: Investigate whether making a copy of the buffer is necessary.
    return Unpooled.copiedBuffer(response.buf);
  }

  public static int fragmentSize(byte[] mark) {
    ByteBuffer b = ByteBuffer.wrap(mark);
    int n = b.getInt();
    return n & 0x7fffffff;
  }

  public static boolean isLastFragment(byte[] mark) {
    ByteBuffer b = ByteBuffer.wrap(mark);
    int n = b.getInt();
    return (n & 0x80000000) != 0;
  }

  @VisibleForTesting
  public byte[] getBytes() {
    ByteBuffer d = asReadOnlyWrap().buffer();
    byte[] b = new byte[d.remaining()];
    d.get(b);

    return b;
  }
}

相关信息

hadoop 源码目录

相关文章

hadoop RegistrationClient 源码

hadoop RpcAcceptedReply 源码

hadoop RpcCall 源码

hadoop RpcCallCache 源码

hadoop RpcDeniedReply 源码

hadoop RpcInfo 源码

hadoop RpcMessage 源码

hadoop RpcProgram 源码

hadoop RpcReply 源码

hadoop RpcResponse 源码

0  赞