hadoop KafkaSink source code

  • 2022-10-20

hadoop KafkaSink code

File path: /hadoop-tools/hadoop-kafka/src/main/java/org/apache/hadoop/metrics2/sink/KafkaSink.java
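
The sink is wired up through the metrics2 configuration file
(hadoop-metrics2.properties). A minimal sketch, assuming a sink instance
named "kafka" attached to the namenode prefix; the broker addresses and the
topic name are placeholders:

*.sink.kafka.class=org.apache.hadoop.metrics2.sink.KafkaSink
namenode.sink.kafka.broker_list=broker1:9092,broker2:9092
namenode.sink.kafka.topic=hadoop-metrics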

/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 * <p/>
 * http://www.apache.org/licenses/LICENSE-2.0
 * <p/>
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.hadoop.metrics2.sink;

import org.apache.hadoop.thirdparty.com.google.common.base.Strings;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.commons.configuration2.SubsetConfiguration;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.metrics2.AbstractMetric;
import org.apache.hadoop.metrics2.MetricsException;
import org.apache.hadoop.metrics2.MetricsRecord;
import org.apache.hadoop.metrics2.MetricsSink;
import org.apache.hadoop.metrics2.MetricsTag;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.Closeable;
import java.io.IOException;
import java.net.InetAddress;
import java.nio.charset.Charset;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

/**
 * A metrics sink that writes to a Kafka broker. This requires you to configure
 * a broker_list and a topic in the metrics2 configuration file. The broker_list
 * must contain a comma-separated list of Kafka broker host:port pairs. The
 * topic property must name a single topic.
 */
@InterfaceAudience.Public
@InterfaceStability.Evolving
public class KafkaSink implements MetricsSink, Closeable {
  private static final Logger LOG = LoggerFactory.getLogger(KafkaSink.class);
  public static final String BROKER_LIST = "broker_list";
  public static final String TOPIC = "topic";

  private String hostname = null;
  private String brokerList = null;
  private String topic = null;
  private Producer<Integer, byte[]> producer = null;

  private final DateTimeFormatter dateFormat =
      DateTimeFormatter.ofPattern("yyyy-MM-dd");
  private final DateTimeFormatter timeFormat =
      DateTimeFormatter.ofPattern("HH:mm:ss");
  private final ZoneId zoneId = ZoneId.systemDefault();

  // Allows the caller to supply the producer, e.g. so tests can inject a
  // mock instead of the KafkaProducer built in init().
  public void setProducer(Producer<Integer, byte[]> p) {
    this.producer = p;
  }

  @Override
  public void init(SubsetConfiguration conf) {
    // Get Kafka broker configuration.
    Properties props = new Properties();
    brokerList = conf.getString(BROKER_LIST);
    if (LOG.isDebugEnabled()) {
      LOG.debug("Broker list " + brokerList);
    }
    props.put("bootstrap.servers", brokerList);
    if (LOG.isDebugEnabled()) {
      LOG.debug("Kafka brokers: " + brokerList);
    }

    // Get Kafka topic configuration.
    topic = conf.getString(TOPIC);
    if (LOG.isDebugEnabled()) {
      LOG.debug("Kafka topic " + topic);
    }
    if (Strings.isNullOrEmpty(topic)) {
      throw new MetricsException("Kafka topic can not be null");
    }

    // Set the rest of Kafka configuration.
    props.put("key.serializer",
        "org.apache.kafka.common.serialization.ByteArraySerializer");
    props.put("value.serializer",
        "org.apache.kafka.common.serialization.ByteArraySerializer");
    props.put("request.required.acks", "0");

    // Set the hostname once and use it in every message.
    hostname = "null";
    try {
      hostname = InetAddress.getLocalHost().getHostName();
    } catch (Exception e) {
      LOG.warn("Error getting Hostname, going to continue");
    }

    try {
      // Create the producer object.
      producer = new KafkaProducer<Integer, byte[]>(props);
    } catch (Exception e) {
      throw new MetricsException("Error creating Producer, " + brokerList, e);
    }
  }

  @Override
  public void putMetrics(MetricsRecord record) {

    if (producer == null) {
      throw new MetricsException("Producer in KafkaSink is null!");
    }

    // Create the json object.
    StringBuilder jsonLines = new StringBuilder();

    long timestamp = record.timestamp();
    Instant instant = Instant.ofEpochMilli(timestamp);
    LocalDateTime ldt = LocalDateTime.ofInstant(instant, zoneId);
    String date = ldt.format(dateFormat);
    String time = ldt.format(timeFormat);

    // Collect datapoints and populate the json object. Note: the replaceAll()
    // calls below strip control characters from tag and metric names, but the
    // values are embedded without JSON escaping.
    jsonLines.append("{\"hostname\": \"" + hostname);
    jsonLines.append("\", \"timestamp\": " + timestamp);
    jsonLines.append(", \"date\": \"" + date);
    jsonLines.append("\",\"time\": \"" + time);
    jsonLines.append("\",\"name\": \"" + record.name() + "\" ");
    for (MetricsTag tag : record.tags()) {
      jsonLines.append(
          ", \"" + tag.name().toString().replaceAll("[\\p{Cc}]", "") + "\": ");
      jsonLines.append(" \"" + tag.value().toString() + "\"");
    }
    for (AbstractMetric metric : record.metrics()) {
      jsonLines.append(", \""
          + metric.name().toString().replaceAll("[\\p{Cc}]", "") + "\": ");
      jsonLines.append(" \"" + metric.value().toString() + "\"");
    }
    jsonLines.append("}");
    LOG.debug("kafka message: " + jsonLines.toString());

    // Create the record to be sent from the json.
    ProducerRecord<Integer, byte[]> data = new ProducerRecord<Integer, byte[]>(
        topic, jsonLines.toString().getBytes(Charset.forName("UTF-8")));

    // Send the data to the Kafka broker. Here is an example of this data:
    // {"hostname": "...", "timestamp": 1436913651516,
    // "date": "2015-6-14","time": "22:40:51","context": "yarn","name":
    // "QueueMetrics, "running_0": "1", "running_60": "0", "running_300": "0",
    // "running_1440": "0", "AppsSubmitted": "1", "AppsRunning": "1",
    // "AppsPending": "0", "AppsCompleted": "0", "AppsKilled": "0",
    // "AppsFailed": "0", "AllocatedMB": "134656", "AllocatedVCores": "132",
    // "AllocatedContainers": "132", "AggregateContainersAllocated": "132",
    // "AggregateContainersReleased": "0", "AvailableMB": "0",
    // "AvailableVCores": "0", "PendingMB": "275456", "PendingVCores": "269",
    // "PendingContainers": "269", "ReservedMB": "0", "ReservedVCores": "0",
    // "ReservedContainers": "0", "ActiveUsers": "1", "ActiveApplications": "1"}
    Future<RecordMetadata> future = producer.send(data);
    jsonLines.setLength(0);
    try {
      future.get();
    } catch (InterruptedException e) {
      throw new MetricsException("Error sending data", e);
    } catch (ExecutionException e) {
      throw new MetricsException("Error sending data", e);
    }
  }

  @Override
  public void flush() {
    LOG.debug("Kafka seems not to have any flush() mechanism!");
  }

  @Override
  public void close() throws IOException {
    // Close the producer and set it to null.
    try {
      producer.close();
    } catch (RuntimeException e) {
      throw new MetricsException("Error closing producer", e);
    } finally {
      producer = null;
    }
  }
}
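
The public setProducer() hook makes it possible to exercise the sink without
a live broker. Below is a rough sketch of that pattern, not code from the
Hadoop tree: it assumes Mockito and the kafka-clients MockProducer are on the
classpath, and the class name, broker address, and topic are made up.

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import java.nio.charset.StandardCharsets;
import java.util.Collections;

import org.apache.commons.configuration2.SubsetConfiguration;
import org.apache.hadoop.metrics2.MetricsRecord;
import org.apache.hadoop.metrics2.sink.KafkaSink;
import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.IntegerSerializer;

public class KafkaSinkSketch {
  public static void main(String[] args) throws Exception {
    // init() only reads broker_list and topic from the configuration.
    SubsetConfiguration conf = mock(SubsetConfiguration.class);
    when(conf.getString(KafkaSink.BROKER_LIST)).thenReturn("localhost:9092");
    when(conf.getString(KafkaSink.TOPIC)).thenReturn("hadoop-metrics");

    KafkaSink sink = new KafkaSink();
    // Sets the private topic field; also builds a real KafkaProducer that
    // the next call replaces (and this sketch never uses).
    sink.init(conf);

    // Swap in a MockProducer; autoComplete=true completes every send.
    MockProducer<Integer, byte[]> mockProducer = new MockProducer<>(
        true, new IntegerSerializer(), new ByteArraySerializer());
    sink.setProducer(mockProducer);

    // A minimal record: a name, a timestamp, and empty tags/metrics.
    MetricsRecord record = mock(MetricsRecord.class);
    when(record.name()).thenReturn("testRecord");
    when(record.timestamp()).thenReturn(System.currentTimeMillis());
    when(record.tags()).thenReturn(Collections.emptyList());
    when(record.metrics()).thenReturn(Collections.emptyList());

    sink.putMetrics(record);

    // The JSON payload lands in the mock's history instead of a broker.
    byte[] payload = mockProducer.history().get(0).value();
    System.out.println(new String(payload, StandardCharsets.UTF_8));

    sink.close();
  }
}

Because the MockProducer is constructed with autoComplete=true, the blocking
future.get() inside putMetrics() returns immediately.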
