hadoop DBOutputFormat 源码

  • 2022-10-20
  • 浏览 (156)

hadoop DBOutputFormat 代码

文件路径:/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/db/DBOutputFormat.java

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.mapreduce.lib.db;

import java.io.IOException;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.PreparedStatement;
import java.sql.SQLException;

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.OutputFormat;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * An OutputFormat that sends the reduce output to a SQL table.
 * <p> 
 * {@link DBOutputFormat} accepts &lt;key,value&gt; pairs, where 
 * key has a type extending DBWritable. Returned {@link RecordWriter} 
 * writes <b>only the key</b> to the database with a batch SQL query.  
 * 
 */
@InterfaceAudience.Public
@InterfaceStability.Stable
public class DBOutputFormat<K  extends DBWritable, V> 
extends OutputFormat<K,V> {

  /** Logger used by this output format and by its record writer. */
  private static final Logger LOG =
      LoggerFactory.getLogger(DBOutputFormat.class);
  /**
   * Database product name, initially {@code "DEFAULT"}. It is overwritten in
   * {@code getRecordWriter} with the upper-cased name reported by the
   * connection metadata, and read by {@code constructQuery} to decide
   * whether the generated statement ends with a semicolon.
   */
  public String dbProductName = "DEFAULT";

  /**
   * Checks the output specification of the job. This implementation
   * performs no checks.
   *
   * @param context the job context (not used)
   * @throws IOException declared by the signature; not thrown here
   * @throws InterruptedException declared by the signature; not thrown here
   */
  public void checkOutputSpecs(JobContext context)
      throws IOException, InterruptedException {
    // Nothing to verify.
  }

  /**
   * Creates the committer for this output format: a
   * {@link FileOutputCommitter} built from the output path that
   * {@link FileOutputFormat#getOutputPath} reports for the given context.
   *
   * @param context the task attempt context
   * @return a new {@link FileOutputCommitter}
   */
  public OutputCommitter getOutputCommitter(TaskAttemptContext context)
      throws IOException, InterruptedException {
    OutputCommitter committer = new FileOutputCommitter(
        FileOutputFormat.getOutputPath(context), context);
    return committer;
  }

  /**
   * A RecordWriter that writes the reduce output to a SQL table.
   * <p>
   * Every call to {@link #write} binds the key to the prepared statement and
   * adds it to the current batch. The batch is executed and committed when
   * the writer is closed.
   */
  @InterfaceStability.Evolving
  public class DBRecordWriter 
      extends RecordWriter<K, V> {

    private Connection connection;
    private PreparedStatement statement;

    /**
     * Creates a writer without a connection or statement; both fields stay
     * {@code null}.
     *
     * @throws SQLException declared by the signature; not thrown by this
     *           constructor itself
     */
    public DBRecordWriter() throws SQLException {
    }

    /**
     * Creates a writer on the given connection and statement and turns
     * auto-commit off so that the whole batch is committed in one
     * transaction on close.
     *
     * @param connection the JDBC connection to write through
     * @param statement the prepared INSERT statement to batch rows on
     * @throws SQLException if auto-commit cannot be disabled
     */
    public DBRecordWriter(Connection connection
        , PreparedStatement statement) throws SQLException {
      this.connection = connection;
      this.statement = statement;
      this.connection.setAutoCommit(false);
    }

    /** @return the connection this writer was created with */
    public Connection getConnection() {
      return connection;
    }
    
    /** @return the prepared statement this writer batches rows on */
    public PreparedStatement getStatement() {
      return statement;
    }
    
    /**
     * Executes the pending batch, commits it, and releases the statement and
     * the connection.
     * <p>
     * If executing or committing fails, a rollback is attempted and the
     * failure is rethrown as an {@link IOException} carrying the original
     * {@link SQLException} as its cause. The statement and the connection are
     * closed independently, so a failure closing one does not prevent closing
     * the other; close failures never hide an earlier commit failure but are
     * attached to it as suppressed exceptions.
     *
     * @param context the task attempt context (not used)
     * @throws IOException if the batch, the commit, or closing a resource fails
     */
    public void close(TaskAttemptContext context) throws IOException {
      IOException failure = null;
      try {
        statement.executeBatch();
        connection.commit();
      } catch (SQLException e) {
        try {
          connection.rollback();
        }
        catch (SQLException ex) {
          LOG.warn(StringUtils.stringifyException(ex));
          // Keep the rollback failure reachable from the thrown exception.
          e.addSuppressed(ex);
        }
        // Same message as before, but the SQLException is kept as the cause.
        failure = new IOException(e.getMessage(), e);
      } finally {
        // Close each resource in its own try block: a failing
        // statement.close() must not leak the connection.
        try {
          statement.close();
        } catch (SQLException ex) {
          failure = chain(failure, ex);
        }
        try {
          connection.close();
        } catch (SQLException ex) {
          failure = chain(failure, ex);
        }
      }
      if (failure != null) {
        throw failure;
      }
    }

    /**
     * Folds a close failure into the exception that will be thrown.
     *
     * @param primary the failure recorded so far, or {@code null} if none
     * @param closeError the exception raised while closing a resource
     * @return {@code primary} with {@code closeError} suppressed, or a new
     *         {@link IOException} wrapping {@code closeError} when there was
     *         no earlier failure
     */
    private IOException chain(IOException primary, SQLException closeError) {
      if (primary == null) {
        return new IOException(closeError.getMessage(), closeError);
      }
      primary.addSuppressed(closeError);
      return primary;
    }

    /**
     * Binds the key to the prepared statement and adds it to the batch. The
     * value is not written.
     *
     * @param key the record to insert
     * @param value ignored
     * @throws IOException if binding the key or adding the batch entry fails
     */
    public void write(K key, V value) throws IOException {
      try {
        key.write(statement);
        statement.addBatch();
      } catch (SQLException e) {
        throw new IOException("Failed to execute SQL statement for key " + key, e);
      }
    }
  }

  /**
   * Builds the parameterized INSERT statement used to write records.
   * <p>
   * A column list is emitted only when the array is non-empty and its first
   * element is non-null. One {@code ?} placeholder is emitted per array
   * element. The statement ends with {@code ")"} when
   * {@code dbProductName} starts with {@code DB2} or {@code ORACLE}, and
   * with {@code ");"} otherwise.
   *
   * @param table
   *          the table to insert into
   * @param fieldNames
   *          the fields to insert into. If field names are unknown, supply an
   *          array of nulls.
   * @return the INSERT statement text
   * @throws IllegalArgumentException if {@code fieldNames} is {@code null}
   */
  public String constructQuery(String table, String[] fieldNames) {
    if (fieldNames == null) {
      throw new IllegalArgumentException("Field names may not be null");
    }

    final int fieldCount = fieldNames.length;
    final boolean hasColumnList = fieldCount > 0 && fieldNames[0] != null;

    StringBuilder sql = new StringBuilder("INSERT INTO ").append(table);

    if (hasColumnList) {
      sql.append(" (").append(String.join(",", fieldNames)).append(")");
    }

    sql.append(" VALUES (");
    for (int col = 0; col < fieldCount; col++) {
      if (col > 0) {
        sql.append(",");
      }
      sql.append("?");
    }

    // These products get no statement terminator.
    final boolean omitTerminator =
        dbProductName.startsWith("DB2") || dbProductName.startsWith("ORACLE");
    sql.append(omitTerminator ? ")" : ");");

    return sql.toString();
  }

  /**
   * {@inheritDoc}
   * <p>
   * Opens a connection from the job's {@link DBConfiguration}, records the
   * upper-cased database product name in {@code dbProductName}, prepares the
   * INSERT statement for the configured table and fields, and wraps both in
   * a {@link DBRecordWriter}. When no field names are configured, an array
   * of nulls sized by the configured field count is used instead.
   * <p>
   * If any step after opening the connection fails, the statement and the
   * connection opened so far are closed before the failure is rethrown, so
   * a failed setup does not leak database resources.
   *
   * @param context the task attempt context supplying the configuration
   * @return a writer that batches inserts into the configured table
   * @throws IOException if the connection, the statement, or the writer
   *           cannot be created; the underlying exception is the cause
   */
  public RecordWriter<K, V> getRecordWriter(TaskAttemptContext context) 
      throws IOException {
    DBConfiguration dbConf = new DBConfiguration(context.getConfiguration());
    String tableName = dbConf.getOutputTableName();
    String[] fieldNames = dbConf.getOutputFieldNames();
    
    if(fieldNames == null) {
      fieldNames = new String[dbConf.getOutputFieldCount()];
    }
    
    Connection connection = null;
    PreparedStatement statement = null;
    try {
      connection = dbConf.getConnection();

      DatabaseMetaData dbMeta = connection.getMetaData();
      this.dbProductName = dbMeta.getDatabaseProductName().toUpperCase();

      statement = connection.prepareStatement(
                    constructQuery(tableName, fieldNames));
      return new DBRecordWriter(connection, statement);
    } catch (Exception ex) {
      // Release whatever was opened before the failure; close errors are
      // attached to the original exception instead of replacing it.
      if (statement != null) {
        try {
          statement.close();
        } catch (SQLException closeEx) {
          ex.addSuppressed(closeEx);
        }
      }
      if (connection != null) {
        try {
          connection.close();
        } catch (SQLException closeEx) {
          ex.addSuppressed(closeEx);
        }
      }
      // Same message as before, but the original exception is kept as cause.
      throw new IOException(ex.getMessage(), ex);
    }
  }

  /**
   * Configures the job to write its output to a database table through
   * this output format.
   * <p>
   * When the first field name is non-null the names are stored in the
   * configuration; when it is null only the number of fields is stored.
   * 
   * @param job The job
   * @param tableName The table to insert data into
   * @param fieldNames The field names in the table.
   * @throws IllegalArgumentException if no field names are given
   */
  public static void setOutput(Job job, String tableName, 
      String... fieldNames) throws IOException {
    if (fieldNames.length == 0) {
      throw new IllegalArgumentException(
        "Field names must be greater than 0");
    }

    if (fieldNames[0] == null) {
      // Names unknown: record only how many columns there are.
      setOutput(job, tableName, fieldNames.length);
      return;
    }

    DBConfiguration outputConf = setOutput(job, tableName);
    outputConf.setOutputFieldNames(fieldNames);
  }
  
  /**
   * Configures the job to write its output to a database table through
   * this output format, recording only the number of fields.
   * 
   * @param job The job
   * @param tableName The table to insert data into
   * @param fieldCount the number of fields in the table.
   */
  public static void setOutput(Job job, String tableName, 
      int fieldCount) throws IOException {
    setOutput(job, tableName).setOutputFieldCount(fieldCount);
  }
  
  /**
   * Applies the settings shared by the public {@code setOutput} overloads:
   * selects this class as the job's output format, turns reduce speculative
   * execution off, and stores the output table name.
   *
   * @param job The job
   * @param tableName The table to insert data into
   * @return the {@link DBConfiguration} wrapping the job's configuration
   */
  private static DBConfiguration setOutput(Job job,
      String tableName) throws IOException {
    job.setOutputFormatClass(DBOutputFormat.class);
    job.setReduceSpeculativeExecution(false);

    final DBConfiguration outputConf =
        new DBConfiguration(job.getConfiguration());
    outputConf.setOutputTableName(tableName);

    return outputConf;
  }
}

相关信息

hadoop 源码目录

相关文章

hadoop BigDecimalSplitter 源码

hadoop BooleanSplitter 源码

hadoop DBConfiguration 源码

hadoop DBInputFormat 源码

hadoop DBRecordReader 源码

hadoop DBSplitter 源码

hadoop DBWritable 源码

hadoop DataDrivenDBInputFormat 源码

hadoop DataDrivenDBRecordReader 源码

hadoop DateSplitter 源码

0  赞