kafka MeteredTimestampedWindowStore 源码
kafka MeteredTimestampedWindowStore 代码
文件路径:/streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredTimestampedWindowStore.java
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.streams.state.internals;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.streams.processor.internals.SerdeGetter;
import org.apache.kafka.streams.state.TimestampedWindowStore;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.streams.state.WindowStore;
/**
* A metered {@link TimestampedWindowStore} wrapper that is used for recording operation metrics, and hence its
* inner {@link WindowStore} implementation does not need to provide its own metrics collecting functionality.
* The inner {@link WindowStore} of this class is of type {@code <Bytes, byte[]>}, hence we use {@link Serde}s
* to convert from {@code <K, ValueAndTimestamp<V>>} to {@code <Bytes, byte[]>}.
*
* @param <K> the key type
* @param <V> the value type; values are handled together with a timestamp as {@link ValueAndTimestamp}
*/
class MeteredTimestampedWindowStore<K, V>
    extends MeteredWindowStore<K, ValueAndTimestamp<V>>
    implements TimestampedWindowStore<K, V> {

    /**
     * Creates the store by handing every argument, unchanged and in the same order, to the
     * {@link MeteredWindowStore} constructor. No additional state is set up here.
     *
     * @param inner        the wrapped window store operating on {@code Bytes} keys and {@code byte[]} values
     * @param windowSizeMs the window size (presumably in milliseconds, going by the name); forwarded to the superclass
     * @param metricScope  the metric scope; forwarded to the superclass
     * @param time         the time source; forwarded to the superclass
     * @param keySerde     the serde for keys of type {@code K}
     * @param valueSerde   the serde for {@code ValueAndTimestamp<V>} values
     */
    MeteredTimestampedWindowStore(
        final WindowStore<Bytes, byte[]> inner,
        final long windowSizeMs,
        final String metricScope,
        final Time time,
        final Serde<K> keySerde,
        final Serde<ValueAndTimestamp<V>> valueSerde
    ) {
        super(inner, windowSizeMs, metricScope, time, keySerde, valueSerde);
    }

    /**
     * Resolves the value serde to use for this store.
     *
     * <p>If a value serde is supplied, resolution is left entirely to the superclass implementation.
     * If {@code valueSerde} is {@code null}, the serde returned by {@code getter.valueSerde()} is cast
     * to {@code Serde<V>} and wrapped in a new {@link ValueAndTimestampSerde}.
     *
     * @param valueSerde the explicitly supplied value serde, or {@code null} if none was supplied
     * @param getter     the source of the fallback value serde used when {@code valueSerde} is {@code null}
     * @return the serde produced by the superclass for a non-null {@code valueSerde}, otherwise a
     *         {@link ValueAndTimestampSerde} wrapping the getter's value serde
     */
    @Override
    protected Serde<ValueAndTimestamp<V>> prepareValueSerde(final Serde<ValueAndTimestamp<V>> valueSerde,
                                                            final SerdeGetter getter) {
        if (valueSerde != null) {
            return super.prepareValueSerde(valueSerde, getter);
        }

        // The cast to Serde<V> cannot be checked by the compiler, so the suppression is kept
        // on this single declaration rather than on the whole method.
        @SuppressWarnings("unchecked")
        final Serde<V> fallbackValueSerde = (Serde<V>) getter.valueSerde();
        return new ValueAndTimestampSerde<>(fallbackValueSerde);
    }
}
相关信息
相关文章
kafka AbstractDualSchemaRocksDBSegmentedBytesStore 源码
kafka AbstractMergedSortedCacheStoreIterator 源码
kafka AbstractRocksDBSegmentedBytesStore 源码
kafka AbstractRocksDBTimeOrderedSegmentedBytesStore 源码
0
赞
热门推荐
-
2、 - 优质文章
-
3、 gate.io
-
7、 golang
-
9、 openharmony
-
10、 Vue中input框自动聚焦