hadoop JsonSerialization 源码

  • 2022-10-20
  • 浏览 (205)

hadoop JsonSerialization 代码

文件路径:/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/JsonSerialization.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.util;

import javax.annotation.Nullable;
import java.io.EOFException;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.Map;

import com.fasterxml.jackson.core.JsonParseException;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.JsonMappingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectReader;
import com.fasterxml.jackson.databind.ObjectWriter;
import com.fasterxml.jackson.databind.SerializationFeature;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FutureDataInputStreamBuilder;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathIOException;

import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY;
import static org.apache.hadoop.fs.Options.OpenFileOptions.FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE;
import static org.apache.hadoop.util.functional.FutureIO.awaitFuture;

/**
 * Support for marshalling objects to and from JSON.
 *
 * It constructs an object mapper as an instance field
 * and synchronizes access to most of the methods
 * which use the mapper; {@code toBytes()} is the one
 * mapper-using method which is not synchronized.
 *
 * This class was extracted from
 * {@code org.apache.hadoop.registry.client.binding.JsonSerDeser},
 * which is now a subclass of this class.
 * @param <T> Type to marshal.
 */
@InterfaceAudience.Private
@InterfaceStability.Evolving
public class JsonSerialization<T> {

  private static final Logger LOG =
      LoggerFactory.getLogger(JsonSerialization.class);

  /** Type which this instance marshals; never null. */
  private final Class<T> classType;

  /** Mapper configured in the constructor and used for all conversions. */
  private final ObjectMapper mapper;

  /** Shared pretty-printing writer returned by {@link #writer()}. */
  private static final ObjectWriter WRITER =
      new ObjectMapper().writerWithDefaultPrettyPrinter();

  /** Shared reader bound to {@code Map}, returned by {@link #mapReader()}. */
  private static final ObjectReader MAP_READER =
      new ObjectMapper().readerFor(Map.class);

  /**
   * @return an ObjectWriter which pretty-prints its output
   */
  public static ObjectWriter writer() {
    return WRITER;
  }

  /**
   * @return an ObjectReader which returns simple Maps.
   */
  public static ObjectReader mapReader() {
    return MAP_READER;
  }

  /**
   * Create an instance bound to a specific type.
   * @param classType class to marshall
   * @param failOnUnknownProperties fail if an unknown property is encountered.
   * @param pretty generate pretty (indented) output?
   * @throws IllegalArgumentException if {@code classType} is null
   */
  public JsonSerialization(Class<T> classType,
      boolean failOnUnknownProperties, boolean pretty) {
    Preconditions.checkArgument(classType != null, "null classType");
    this.classType = classType;
    this.mapper = new ObjectMapper();
    mapper.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES,
        failOnUnknownProperties);
    mapper.configure(SerializationFeature.INDENT_OUTPUT, pretty);
  }

  /**
   * Get the simple name of the class type to be marshalled.
   * @return the name of the class being marshalled
   */
  public String getName() {
    return classType.getSimpleName();
  }

  /**
   * Get the mapper of this class.
   * @return the mapper
   */
  public ObjectMapper getMapper() {
    return mapper;
  }

  /**
   * Convert from JSON.
   * Any failure is logged at error level, along with the JSON text,
   * before being rethrown.
   *
   * @param json input
   * @return the parsed JSON
   * @throws EOFException if the string is empty
   * @throws IOException IO problems
   * @throws JsonParseException If the input is not well-formatted
   * @throws JsonMappingException failure to map from the JSON to this class
   */
  public synchronized T fromJson(String json)
      throws IOException, JsonParseException, JsonMappingException {
    if (json.isEmpty()) {
      throw new EOFException("No data");
    }
    try {
      return mapper.readValue(json, classType);
    } catch (IOException e) {
      LOG.error("Exception while parsing json : {}\n{}", e, json, e);
      throw e;
    }
  }

  /**
   * Read from an input stream.
   * The stream is not explicitly closed by this method.
   * @param stream stream to read from
   * @return the parsed entity
   * @throws IOException IO problems
   * @throws JsonParseException If the input is not well-formatted
   * @throws JsonMappingException failure to map from the JSON to this class
   */
  public synchronized T fromJsonStream(InputStream stream) throws IOException {
    return mapper.readValue(stream, classType);
  }

  /**
   * Load from a JSON text file.
   * @param jsonFile input file
   * @return the parsed JSON
   * @throws FileNotFoundException the path does not exist or is not a file
   * @throws EOFException the file is empty
   * @throws IOException IO problems
   * @throws JsonParseException If the input is not well-formatted
   * @throws JsonMappingException failure to map from the JSON to this class
   */
  public synchronized T load(File jsonFile)
      throws IOException, JsonParseException, JsonMappingException {
    if (!jsonFile.exists()) {
      throw new FileNotFoundException("No such file: " + jsonFile);
    }
    if (!jsonFile.isFile()) {
      throw new FileNotFoundException("Not a file: " + jsonFile);
    }
    if (jsonFile.length() == 0) {
      throw new EOFException("File is empty: " + jsonFile);
    }
    try {
      return mapper.readValue(jsonFile, classType);
    } catch (IOException e) {
      LOG.warn("Exception while parsing json file {}", jsonFile, e);
      throw e;
    }
  }

  /**
   * Save to a local file. Any existing file is overwritten unless
   * the OS blocks that.
   * @param file file
   * @param instance instance
   * @throws IOException IO exception
   */
  public void save(File file, T instance) throws
      IOException {
    // the stream is closed by writeJsonAsBytes()
    writeJsonAsBytes(instance, Files.newOutputStream(file.toPath()));
  }

  /**
   * Convert from a JSON resource, located through
   * {@code getClass().getResourceAsStream()} on this instance.
   * Any failure, including a missing resource, is logged at error level
   * before being rethrown.
   * @param resource name of the resource to load
   * @return the parsed JSON
   * @throws FileNotFoundException if the resource cannot be found
   * @throws IOException IO problems
   * @throws JsonParseException If the input is not well-formatted
   * @throws JsonMappingException failure to map from the JSON to this class
   */
  public synchronized T fromResource(String resource)
      throws IOException, JsonParseException, JsonMappingException {
    try (InputStream resStream = this.getClass()
        .getResourceAsStream(resource)) {
      if (resStream == null) {
        throw new FileNotFoundException(resource);
      }
      return mapper.readValue(resStream, classType);
    } catch (IOException e) {
      LOG.error("Exception while parsing json resource {}", resource, e);
      throw e;
    }
  }

  /**
   * clone by converting to JSON and back again.
   * This is much less efficient than any Java clone process.
   * @param instance instance to duplicate
   * @return a new instance
   * @throws IOException IO problems.
   */
  public T fromInstance(T instance) throws IOException {
    return fromJson(toJson(instance));
  }

  /**
   * Load from a Hadoop filesystem.
   * @param fs filesystem
   * @param path path
   * @return a loaded object
   * @throws PathIOException JSON parse problem
   * @throws IOException IO problems
   */
  public T load(FileSystem fs, Path path) throws IOException {
    return load(fs, path, null);
  }

  /**
   * Load from a Hadoop filesystem.
   * If a file status is supplied, it's passed in to the openFile()
   * call so that FS implementations can optimize their opening.
   * @param fs filesystem
   * @param path path
   * @param status status of the file to open.
   * @return a loaded object
   * @throws PathIOException JSON parse problem
   * @throws EOFException file status references an empty file
   * @throws IOException IO problems
   */
  public T load(FileSystem fs, Path path, @Nullable FileStatus status)
      throws IOException {

    // fail fast on a known-empty file, before opening anything
    if (status != null && status.getLen() == 0) {
      throw new EOFException("No data in " + path);
    }
    FutureDataInputStreamBuilder builder = fs.openFile(path)
        .opt(FS_OPTION_OPENFILE_READ_POLICY,
            FS_OPTION_OPENFILE_READ_POLICY_WHOLE_FILE);
    if (status != null) {
      builder.withFileStatus(status);
    }
    try (FSDataInputStream dataInputStream =
             awaitFuture(builder.build())) {
      return fromJsonStream(dataInputStream);
    } catch (JsonProcessingException e) {
      // wrap JSON failures so the path is part of the exception
      throw new PathIOException(path.toString(),
          "Failed to read JSON file " + e, e);
    }
  }

  /**
   * Save to a Hadoop filesystem.
   * @param fs filesystem
   * @param path path
   * @param overwrite should any existing file be overwritten
   * @param instance instance
   * @throws IOException IO exception.
   */
  public void save(FileSystem fs, Path path, T instance,
      boolean overwrite) throws
      IOException {
    // the stream is closed by writeJsonAsBytes()
    writeJsonAsBytes(instance, fs.create(path, overwrite));
  }

  /**
   * Write the JSON as bytes, then close the stream.
   * If both the write and the close fail, the write failure is the
   * exception thrown and the close failure is attached to it as a
   * suppressed exception, so the root cause is never hidden.
   * @param instance instance to write
   * @param dataOutputStream an output stream that will always be closed
   * @throws IOException on any failure
   */
  public void writeJsonAsBytes(T instance,
      OutputStream dataOutputStream) throws IOException {
    // try-with-resources rather than try/finally: a failure in close()
    // must not replace an earlier serialization/write exception.
    try (OutputStream out = dataOutputStream) {
      out.write(toBytes(instance));
    }
  }

  /**
   * Convert JSON to bytes.
   * @param instance instance to convert
   * @return a byte array
   * @throws IOException IO problems
   */
  public byte[] toBytes(T instance) throws IOException {
    return mapper.writeValueAsBytes(instance);
  }

  /**
   * Deserialize from a byte array.
   * The bytes are decoded as UTF-8 text and then parsed as JSON.
   * @param bytes byte array
   * @throws IOException IO problems
   * @throws EOFException not enough data
   * @return the deserialized instance.
   */
  public T fromBytes(byte[] bytes) throws IOException {
    return fromJson(new String(bytes, StandardCharsets.UTF_8));
  }

  /**
   * Convert an instance to a JSON string.
   * @param instance instance to convert
   * @return a JSON string description
   * @throws JsonProcessingException Json generation problems
   */
  public synchronized String toJson(T instance) throws JsonProcessingException {
    return mapper.writeValueAsString(instance);
  }

  /**
   * Convert an instance to a string form for output. This is a robust
   * operation which will convert any JSON-generating exceptions into
   * error text.
   * @param instance non-null instance
   * @return a JSON string
   * @throws IllegalArgumentException if {@code instance} is null
   */
  public String toString(T instance) {
    Preconditions.checkArgument(instance != null, "Null instance argument");
    try {
      return toJson(instance);
    } catch (JsonProcessingException e) {
      return "Failed to convert to a string: " + e;
    }
  }
}

相关信息

hadoop 源码目录

相关文章

hadoop ApplicationClassLoader 源码

hadoop AsyncDiskService 源码

hadoop AutoCloseableLock 源码

hadoop BasicDiskValidator 源码

hadoop BlockingThreadPoolExecutorService 源码

hadoop CacheableIPList 源码

hadoop ChunkedArrayList 源码

hadoop ClassUtil 源码

hadoop Classpath 源码

hadoop CleanerUtil 源码

0  赞