博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Apache Kafka - KIP-42: Add Producer and Consumer Interceptors
阅读量:4599 次
发布时间:2019-06-09

本文共 4775 字,大约阅读时间需要 15 分钟。

kafka 0.10.0.0 released

 

Interceptors的概念应该来自flume

参考,

比如,flume提供的

Timestamp Interceptor

Host Interceptor

Static Interceptor

Regex Filtering Interceptor

Regex Extractor Interceptor

可以对于流过的message进行一些包装,比如插入时间,host,或做些过滤等etl操作

 

所以kafka在producer和consumer端也都提供这样的Interceptors接口,

 

ProducerInterceptor

/** * A plugin interface to allow things to intercept events happening to a producer record, * such as sending producer record or getting an acknowledgement when a record gets published */public interface ProducerInterceptor
extends Configurable { /** * This is called when client sends record to KafkaProducer, before key and value gets serialized. * @param record the record from client * @return record that is either original record passed to this method or new record with modified key and value. */ public ProducerRecord
onSend(ProducerRecord
record); /** * This is called when the send has been acknowledged * @param metadata The metadata for the record that was sent (i.e. the partition and offset). The metadata information may be only partially filled, if an error occurred. Topic will be always set, and if partition is not -1, partition will be set partition set/assigned to this record. * @param exception The exception thrown during processing of this record. Null if no error occurred. */ public void onAcknowledgement(RecordMetadata metadata, Exception exception); /** * This is called when interceptor is closed */ public void close();}

onSend() will be called in KafkaProducer.send(), before key and value gets serialized and before partition gets assigned.

If the implementation modifies key and/or value, it must return modified key and value in a new ProducerRecord object.

onAcknowledgement() will be called when the send is acknowledged. It has same API as Callback.onCompletion(), and is called just before Callback.onCompletion() is called.

多个multiple interceptors之间是可以串联的

ProducerInterceptor APIs will be called from multiple threads: onSend() will be called on submitting thread and onAcknowledgement() will be called on producer I/O thread.

 

ConsumerInterceptor

/** * A plugin interface to allow things to intercept Consumer events such as receiving a record or record being consumed * by a client. */public interface ConsumerInterceptor
extends Configurable { /** * This is called when the records are about to be returned to the client. * @param records records to be consumed by the client. Null if record dropped/ignored/discarded (non consumable) * @return records that is either original 'records' passed to this method or modified set of records */ public ConsumerRecords
onConsume(ConsumerRecords
records); /** * This is called when offsets get committed * This method will be called when the commit request sent to the server has been acknowledged. * @param offsets A map of the offsets and associated metadata that this callback applies to */ public void onCommit(Map
offsets); /** * This is called when interceptor is closed */ public void close();}

onConsume() will be called in KafkaConsumer.poll(), just before poll() returns ConsumerRecords.

onCommit() will be called when offsets get committed: just before OffsetCommitCallback.onCompletion() is called and in ConsumerCoordinator.commitOffsetsSync() on successful commit.

Since new consumer is single-threaded, ConsumerInterceptor API will be called from a single thread.

 

总结,

Interceptor作为一种plugin可以做些,对message的decorate或cleaning或filtering等一些轻量的工作,最主要的用途还是用于监控,trace message

Interceptor可以串联执行

Interceptor必须要轻量,因为如果耗时就会影响链路的throughput

 

confluent公司也提供相应的interceptor产品,用于data stream的监控

 

同时,为了更好的监控和audit

Currently, RecordMetadata contains topic/partition, offset, and timestamp (KIP-32).

We propose to add remaining record's metadata in RecordMetadata: checksum and record size. Both checksum and record size are useful for monitoring and audit.

For symmetry, we also propose to expose the same metadata on consumer side and make available to interceptors.

We will add checksum and record size fields to RecordMetadata and ConsumerRecord.

public final class RecordMetadata {

private final long offset;

private final TopicPartition topicPartition;

private final long checksum;                <<== NEW: checksum of the record

private final int size;                     <<== NEW: record size in bytes(before compression)

 

public final class ConsumerRecord<K, V> {

.......

private final long checksum;               <<== NEW: checksum of the record

private final int size;                    <<== NEW: record size in bytes (after decompression)

转载于:https://www.cnblogs.com/fxjwind/p/6269700.html

你可能感兴趣的文章
【云计算】使用supervisor管理Docker多进程-ntpd+uwsgi+nginx示例最佳实践
查看>>
Ubuntu16.04下配置ssh免密登录
查看>>
实验二 2
查看>>
will-change属性
查看>>
android学习笔记54——ContentProvider
查看>>
Unity3d android开发之触摸操作识别-双击,滑动去噪处理
查看>>
Custom view * is not using the 2- or 3-argument View constructors; XML attributes will not work
查看>>
模型选择准则
查看>>
安卓动态增加按钮
查看>>
iOS7程序后台运行
查看>>
maven+testng+reportng的pom设置
查看>>
IT telephone interview
查看>>
gitlab安装配置
查看>>
ps载入画笔
查看>>
悲怆:IT人的一声叹息->一个程序员的自白[转帖]
查看>>
[SpringMVC]自定义注解实现控制器访问次数限制
查看>>
日记(序)
查看>>
A == B ?
查看>>
洛谷P3763 [Tjoi2017]DNA 【后缀数组】
查看>>
GSM模块_STM32实现GPRS与服务器数据传输经验总结
查看>>