kafka PushHttpMetricsReporter 源码

  • 2022-10-20
  • 浏览 (254)

kafka PushHttpMetricsReporter 代码

文件路径:/tools/src/main/java/org/apache/kafka/tools/PushHttpMetricsReporter.java

/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.kafka.tools;

import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.MetricsReporter;
import org.apache.kafka.common.utils.SystemTime;
import org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.InetAddress;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.UnknownHostException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Scanner;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

/**
 * MetricsReporter that aggregates metrics data and reports it via HTTP requests to a configurable
 * webhook endpoint in JSON format.
 *
 * This is an internal class used for system tests and does not provide any compatibility guarantees.
 */
public class PushHttpMetricsReporter implements MetricsReporter {
    private static final Logger log = LoggerFactory.getLogger(PushHttpMetricsReporter.class);

    private static final String METRICS_PREFIX = "metrics.";
    static final String METRICS_URL_CONFIG = METRICS_PREFIX + "url";
    static final String METRICS_PERIOD_CONFIG = METRICS_PREFIX + "period";
    static final String METRICS_HOST_CONFIG = METRICS_PREFIX + "host";
    static final String CLIENT_ID_CONFIG = ProducerConfig.CLIENT_ID_CONFIG;

    private static final Map<String, String> HEADERS = new LinkedHashMap<>();
    static {
        HEADERS.put("Content-Type", "application/json");
    }

    private final Object lock = new Object();
    private final Time time;
    private final ScheduledExecutorService executor;
    // The set of metrics are updated in init/metricChange/metricRemoval
    private final Map<MetricName, KafkaMetric> metrics = new LinkedHashMap<>();
    private final ObjectMapper json = new ObjectMapper();

    // Non-final because these are set via configure()
    private URL url;
    private String host;
    private String clientId;

    private static final ConfigDef CONFIG_DEF = new ConfigDef()
            .define(METRICS_URL_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH,
                    "The URL to report metrics to")
            .define(METRICS_PERIOD_CONFIG, ConfigDef.Type.INT, ConfigDef.Importance.HIGH,
                    "The frequency at which metrics should be reported, in second")
            .define(METRICS_HOST_CONFIG, ConfigDef.Type.STRING, "", ConfigDef.Importance.LOW,
                    "The hostname to report with each metric; if empty, defaults to the FQDN that can be automatically" +
                            "determined")
            .define(CLIENT_ID_CONFIG, ConfigDef.Type.STRING, "", ConfigDef.Importance.LOW,
                    "Client ID to identify the application, generally inherited from the " +
                            "producer/consumer/streams/connect instance");

    public PushHttpMetricsReporter() {
        time = new SystemTime();
        executor = Executors.newSingleThreadScheduledExecutor();
    }

    PushHttpMetricsReporter(Time mockTime, ScheduledExecutorService mockExecutor) {
        time = mockTime;
        executor = mockExecutor;
    }

    @Override
    public void configure(Map<String, ?> configs) {
        PushHttpMetricsReporterConfig config = new PushHttpMetricsReporterConfig(CONFIG_DEF, configs);
        try {
            url = new URL(config.getString(METRICS_URL_CONFIG));
        } catch (MalformedURLException e) {
            throw new ConfigException("Malformed metrics.url", e);
        }
        int period = config.getInteger(METRICS_PERIOD_CONFIG);
        clientId = config.getString(CLIENT_ID_CONFIG);

        host = config.getString(METRICS_HOST_CONFIG);
        if (host == null || host.isEmpty()) {
            try {
                host = InetAddress.getLocalHost().getCanonicalHostName();
            } catch (UnknownHostException e) {
                throw new ConfigException("Failed to get canonical hostname", e);
            }
        }

        executor.scheduleAtFixedRate(new HttpReporter(), period, period, TimeUnit.SECONDS);

        log.info("Configured PushHttpMetricsReporter for {} to report every {} seconds", url, period);
    }

    @Override
    public void init(List<KafkaMetric> initMetrics) {
        synchronized (lock) {
            for (KafkaMetric metric : initMetrics) {
                log.debug("Adding metric {}", metric.metricName());
                metrics.put(metric.metricName(), metric);
            }
        }
    }

    @Override
    public void metricChange(KafkaMetric metric) {
        synchronized (lock) {
            log.debug("Updating metric {}", metric.metricName());
            metrics.put(metric.metricName(), metric);
        }
    }

    @Override
    public void metricRemoval(KafkaMetric metric) {
        synchronized (lock) {
            log.debug("Removing metric {}", metric.metricName());
            metrics.remove(metric.metricName());
        }
    }

    @Override
    public void close() {
        executor.shutdown();
        try {
            executor.awaitTermination(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            throw new KafkaException("Interrupted when shutting down PushHttpMetricsReporter", e);
        }
    }

    private class HttpReporter implements Runnable {
        @Override
        public void run() {
            long now = time.milliseconds();
            final List<MetricValue> samples;
            synchronized (lock) {
                samples = new ArrayList<>(metrics.size());
                for (KafkaMetric metric : metrics.values()) {
                    MetricName name = metric.metricName();
                    samples.add(new MetricValue(name.name(), name.group(), name.tags(), metric.metricValue()));
                }
            }

            MetricsReport report = new MetricsReport(new MetricClientInfo(host, clientId, now), samples);

            log.trace("Reporting {} metrics to {}", samples.size(), url);
            HttpURLConnection connection = null;
            try {
                connection = newHttpConnection(url);
                connection.setRequestMethod("POST");
                // connection.getResponseCode() implicitly calls getInputStream, so always set to true.
                // On the other hand, leaving this out breaks nothing.
                connection.setDoInput(true);
                connection.setRequestProperty("Content-Type", "application/json");
                byte[] data = json.writeValueAsBytes(report);
                connection.setRequestProperty("Content-Length", Integer.toString(data.length));
                connection.setRequestProperty("Accept", "*/*");
                connection.setUseCaches(false);

                connection.setDoOutput(true);

                try (OutputStream os = connection.getOutputStream()) {
                    os.write(data);
                    os.flush();
                }

                int responseCode = connection.getResponseCode();
                if (responseCode >= 400) {
                    InputStream is = connection.getErrorStream();
                    String msg = readResponse(is);
                    log.error("Error reporting metrics, {}: {}", responseCode, msg);
                } else if (responseCode >= 300) {
                    log.error("PushHttpMetricsReporter does not currently support redirects, saw {}", responseCode);
                } else {
                    log.info("Finished reporting metrics with response code {}", responseCode);
                }
            } catch (Throwable t) {
                log.error("Error reporting metrics", t);
            } finally {
                if (connection != null) {
                    connection.disconnect();
                }
            }
        }
    }

    // Static package-private so unit tests can use a mock connection
    static HttpURLConnection newHttpConnection(URL url) throws IOException {
        return (HttpURLConnection) url.openConnection();
    }

    // Static package-private so unit tests can mock reading response
    static String readResponse(InputStream is) {
        try (Scanner s = new Scanner(is, StandardCharsets.UTF_8.name()).useDelimiter("\\A")) {
            return s.hasNext() ? s.next() : "";
        }
    }

    private static class MetricsReport {
        private final MetricClientInfo client;
        private final Collection<MetricValue> metrics;

        MetricsReport(MetricClientInfo client, Collection<MetricValue> metrics) {
            this.client = client;
            this.metrics = metrics;
        }

        @JsonProperty
        public MetricClientInfo client() {
            return client;
        }

        @JsonProperty
        public Collection<MetricValue> metrics() {
            return metrics;
        }
    }

    private static class MetricClientInfo {
        private final String host;
        private final String clientId;
        private final long time;

        MetricClientInfo(String host, String clientId, long time) {
            this.host = host;
            this.clientId = clientId;
            this.time = time;
        }

        @JsonProperty
        public String host() {
            return host;
        }

        @JsonProperty("client_id")
        public String clientId() {
            return clientId;
        }

        @JsonProperty
        public long time() {
            return time;
        }
    }

    private static class MetricValue {

        private final String name;
        private final String group;
        private final Map<String, String> tags;
        private final Object value;

        MetricValue(String name, String group, Map<String, String> tags, Object value) {
            this.name = name;
            this.group = group;
            this.tags = tags;
            this.value = value;
        }

        @JsonProperty
        public String name() {
            return name;
        }

        @JsonProperty
        public String group() {
            return group;
        }

        @JsonProperty
        public Map<String, String> tags() {
            return tags;
        }

        @JsonProperty
        public Object value() {
            return value;
        }
    }

    // The signature for getInt changed from returning int to Integer so to remain compatible with 0.8.2.2 jars
    // for system tests we replace it with a custom version that works for all versions.
    private static class PushHttpMetricsReporterConfig extends AbstractConfig {
        public PushHttpMetricsReporterConfig(ConfigDef definition, Map<?, ?> originals) {
            super(definition, originals);
        }

        public Integer getInteger(String key) {
            return (Integer) get(key);
        }

    }
}

相关信息

kafka 源码目录

相关文章

kafka ClientCompatibilityTest 源码

kafka OAuthCompatibilityTool 源码

kafka PrintVersionAndExitAction 源码

kafka ProducerPerformance 源码

kafka ThroughputThrottler 源码

kafka TransactionalMessageCopier 源码

kafka TransactionsCommand 源码

kafka VerifiableConsumer 源码

kafka VerifiableLog4jAppender 源码

kafka VerifiableProducer 源码

0  赞