分享

Flink 1.14.0 消费 kafka 数据自定义反序列化类

Mirinda 发表于 2021-10-14 10:27:27 [显示全部楼层] 回帖奖励 倒序浏览 阅读模式 关闭右栏 0 589
本帖最后由 Mirinda 于 2021-10-14 10:30 编辑

问题导读:
1.你有没有阅读源码的习惯?
2.有阅读过kafka源码吗?
3.你对文中的自定义序列化类有什么看法?

在最近发布的 Flink 1.14.0 版本中对 Source 接口进行了重构,细节可以参考 FLIP-27: Refactor Source Interface
重构之后 API 层面的改动还是非常大的,那在使用新的 API 消费 kafka 数据的时候如何自定义序列化类呢?
  1. Kafka SourceKafkaSource<String> source = KafkaSource.<String>builder()
  2.     .setBootstrapServers(brokers)
  3.     .setTopics("input-topic")
  4.     .setGroupId("my-group")
  5.     .setStartingOffsets(OffsetsInitializer.earliest())
  6.     .setValueOnlyDeserializer(new SimpleStringSchema())
  7.     .build();
  8. env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
复制代码

KafkaSourceBuilder 类提供了两个方法来反序列数据,分别是 setDeserializer 和 setValueOnlyDeserializer 从名字上就应该可以看出这两者的区别,前者是反序列化完整的 ConsumerRecord,后者只反序列化 ConsumerRecord 的 value.然后我们来看一下底层的源码
KafkaSourceBuilder 源码
  1. /**
  2. * Sets the {@link KafkaRecordDeserializationSchema deserializer} of the {@link
  3. * org.apache.kafka.clients.consumer.ConsumerRecord ConsumerRecord} for KafkaSource.
  4. *
  5. * @param recordDeserializer the deserializer for Kafka {@link
  6. *     org.apache.kafka.clients.consumer.ConsumerRecord ConsumerRecord}.
  7. * @return this KafkaSourceBuilder.
  8. */
  9. public KafkaSourceBuilder<OUT> setDeserializer(
  10.         KafkaRecordDeserializationSchema<OUT> recordDeserializer) {
  11.     this.deserializationSchema = recordDeserializer;
  12.     return this;
  13. }
  14. /**
  15. * Sets the {@link KafkaRecordDeserializationSchema deserializer} of the {@link
  16. * org.apache.kafka.clients.consumer.ConsumerRecord ConsumerRecord} for KafkaSource. The given
  17. * {@link DeserializationSchema} will be used to deserialize the value of ConsumerRecord. The
  18. * other information (e.g. key) in a ConsumerRecord will be ignored.
  19. *
  20. * @param deserializationSchema the {@link DeserializationSchema} to use for deserialization.
  21. * @return this KafkaSourceBuilder.
  22. */
  23. public KafkaSourceBuilder<OUT> setValueOnlyDeserializer(
  24.         DeserializationSchema<OUT> deserializationSchema) {
  25.     this.deserializationSchema =
  26.             KafkaRecordDeserializationSchema.valueOnly(deserializationSchema);
  27.     return this;
  28. }
复制代码

可以看到这两个方法实际上是一样的,虽然两个方法的参数不同,setDeserializer 方法参数类型是 KafkaRecordDeserializationSchema 而 setValueOnlyDeserializer 方法的参数类型是 DeserializationSchema 那这两种参数类型有什么区别和联系呢?下面会进一步解释, 但是这两个方法最后返回的都是 KafkaRecordDeserializationSchema 对象,我们继续来看 KafkaRecordDeserializationSchema 的源码
先来看一下 DeserializationSchema 的部分源码
DeserializationSchema 源码
  1. @Public
  2. public interface DeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {
  3.    
  4.     @PublicEvolving
  5.     default void open(InitializationContext context) throws Exception {}
  6.    
  7.     T deserialize(byte[] message) throws IOException;
  8.     @PublicEvolving
  9.     default void deserialize(byte[] message, Collector<T> out) throws IOException {
  10.         T deserialize = deserialize(message);
  11.         if (deserialize != null) {
  12.             out.collect(deserialize);
  13.         }
  14.     }
  15.    
  16.     boolean isEndOfStream(T nextElement);
  17. }
复制代码

KafkaRecordDeserializationSchema 源码
  1. /** An interface for the deserialization of Kafka records. */
  2. public interface KafkaRecordDeserializationSchema<T> extends Serializable, ResultTypeQueryable<T> {
  3.    
  4.     @PublicEvolving
  5.     default void open(DeserializationSchema.InitializationContext context) throws Exception {}
  6.     @PublicEvolving
  7.     void deserialize(ConsumerRecord<byte[], byte[]> record, Collector<T> out) throws IOException;
  8.   
  9.    
  10.     static <V> KafkaRecordDeserializationSchema<V> of(
  11.             KafkaDeserializationSchema<V> kafkaDeserializationSchema) {
  12.         return new KafkaDeserializationSchemaWrapper<>(kafkaDeserializationSchema);
  13.     }
  14.    
  15.     static <V> KafkaRecordDeserializationSchema<V> valueOnly(
  16.             DeserializationSchema<V> valueDeserializationSchema) {
  17.         return new KafkaValueOnlyDeserializationSchemaWrapper<>(valueDeserializationSchema);
  18.     }
  19.    
  20.     static <V> KafkaRecordDeserializationSchema<V> valueOnly(
  21.             Class<? extends Deserializer<V>> valueDeserializerClass) {
  22.         return new KafkaValueOnlyDeserializerWrapper<>(
  23.                 valueDeserializerClass, Collections.emptyMap());
  24.     }
  25.     static <V, D extends Configurable & Deserializer<V>>
  26.             KafkaRecordDeserializationSchema<V> valueOnly(
  27.                     Class<D> valueDeserializerClass, Map<String, String> config) {
  28.         return new KafkaValueOnlyDeserializerWrapper<>(valueDeserializerClass, config);
  29.     }
  30. }
复制代码

顾名思义,这两个都是反序列接口,并且都继承了 Serializable, ResultTypeQueryable 这两个接口。不同点是,deserialize 方法的参数不一样,KafkaDeserializationSchema 接口很明显是为反序列化 kafka 数据而生的。DeserializationSchema 接口可以反序列化任意二进制数据,更加具有通用性。所以这两个是同一级接口
如果你想要获取 kafka 的元数据信息选择实现 KafkaDeserializationSchema 接口就可以了,KafkaDeserializationSchema 接口还有 4 个静态方法,其中的 of 方法就是用来反序列化 ConsumerRecord 的,剩下的 3 个 valueOnly 是用来反序列化 kafka 消息中的 value 的.
到这里就非常清楚了,如果我们要自定义序列化类,实现 DeserializationSchema 和 KafkaRecordDeserializationSchema 任何一个都是可以的.下面就以 KafkaRecordDeserializationSchema 接口为例,实现一个简单的反序列化类.
MyKafkaDeserialization 自定义序列化类
  1. package flink.stream.deserialization;
  2. import bean.Jason;
  3. import com.alibaba.fastjson.JSON;
  4. import org.apache.flink.api.common.typeinfo.TypeInformation;
  5. import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
  6. import org.apache.kafka.clients.consumer.ConsumerRecord;
  7. import org.apache.log4j.Logger;
  8. public class MyKafkaDeserialization implements KafkaDeserializationSchema<Jason> {
  9.     private static final Logger log = Logger.getLogger(MyKafkaDeserialization.class);
  10.     private final String encoding = "UTF8";
  11.     private boolean includeTopic;
  12.     private boolean includeTimestamp;
  13.     public MyKafkaDeserialization(boolean includeTopic, boolean includeTimestamp) {
  14.         this.includeTopic = includeTopic;
  15.         this.includeTimestamp = includeTimestamp;
  16.     }
  17.     @Override
  18.     public TypeInformation<Jason> getProducedType() {
  19.         return TypeInformation.of(Jason.class);
  20.     }
  21.     @Override
  22.     public boolean isEndOfStream(Jason nextElement) {
  23.         return false;
  24.     }
  25.     @Override
  26.     public Jason deserialize(ConsumerRecord<byte[], byte[]> consumerRecord) throws Exception {
  27.         if (consumerRecord != null) {
  28.             try {
  29.                 String value = new String(consumerRecord.value(), encoding);
  30.                 Jason jason = JSON.parseObject(value, Jason.class);
  31.                 if (includeTopic) jason.setTopic(consumerRecord.topic());
  32.                 if (includeTimestamp) jason.setTimestamp(consumerRecord.timestamp());
  33.                 return jason;
  34.             } catch (Exception e) {
  35.                 log.error("deserialize failed : " + e.getMessage());
  36.             }
  37.         }
  38.         return null;
  39.     }
  40. }
复制代码

整个实现是非常简单的,这样就可以把消费到的数据反序列化成自己想要的格式,虽然 Flink 1.14.0 重构了 Source 接口,但是反序列化接口几乎没变,只不过在原有的基础上增加了几个方法而已.
使用
  1. KafkaSource<Jason> source = KafkaSource.<Jason>builder()
  2.         .setProperty("security.protocol", "SASL_PLAINTEXT")
  3.         .setProperty("sasl.mechanism", "PLAIN")
  4.         .setProperty("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username="" + username + "" password="" + password + "";")
  5.         // discover new partitions per 10 seconds
  6.         .setProperty("partition.discovery.interval.ms", "10000")
  7.         .setBootstrapServers(broker)
  8.         .setTopics(topic)
  9.         .setGroupId(group_id)
  10.         .setStartingOffsets(OffsetsInitializer.earliest())
  11.         .setDeserializer(KafkaRecordDeserializationSchema.of(new MyKafkaDeserialization(true, true)))
  12.         // 只反序列化 value
  13.         .setValueOnlyDeserializer(new MyDeSerializer())
  14.         .build();
复制代码

setDeserializer 和 setValueOnlyDeserializer 只用设置一个即可.

推荐阅读
Flink 任务实时监控最佳实践
Flink on yarn 实时日志收集最佳实践
Flink 1.14.0 全新的 Kafka Connector



最新经典文章,欢迎关注公众号


没找到任何评论,期待你打破沉寂

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /5 下一条