分享

Flink1.11升级填坑(Flink版本升级遇到的问题)

问题导读

1.Flink1.11升级遇到那些问题?
2.新版本waterMark如何改动?
3.flink1.11,idea运行失败如何解决?


一、背景
现有集群版本是Flink 1.10.1,想要升级到社区最新的版本Flink 1.11.1。

二、踩坑过程
提示的信息:No hostname could be resolved for ip address
详细的社区邮件讨论过程如下:
1、在提交作业的时候,JM会疯狂刷出大量的日志No hostname could be resolved for ip address xxxx。该xxxx ip是kubernetes分配给flink TM的内网ip,JM由于这个报错,直接time out。
  1. kubectl run -i -t busybox --image=busybox --restart=Never
复制代码

2、进入到pod中反向解析flink TM的ip失败。
  1. / # nslookup 10.47.96.2
  2. Server:                10.96.0.10
  3. Address:        10.96.0.10:53
  4. ** server can't find 2.96.47.10.in-addr.arpa: NXDOMAIN
复制代码

而解析JM居然可以成功
  1. / # nslookup 10.34.128.8
  2. Server:                10.96.0.10
  3. Address:        10.96.0.10:53
  4. 8.128.34.10.in-addr.arpa        name = 10-34-128-8.flink-jobmanager.flink-test.svc.cluster.local
复制代码


唯一的差别就是JM是有service。
通过添加社区提供的可选配置解决问题taskmanager-query-state-service.yaml。
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/kubernetes.html
不过目前跟社区的沟通中,社区是没有遇到这个问题的,该问题还在进一步讨论中。

三、新版本waterMark改动
新版的waterMark的生成改为
  1. @Public
  2. public interface WatermarkGenerator<T> {
  3.         /**
  4.          * Called for every event, allows the watermark generator to examine and remember the
  5.          * event timestamps, or to emit a watermark based on the event itself.
  6.          */
  7.         void onEvent(T event, long eventTimestamp, WatermarkOutput output);
  8.         /**
  9.          * Called periodically, and might emit a new watermark, or not.
  10.          *
  11.          * <p>The interval in which this method is called and Watermarks are generated
  12.          * depends on {@link ExecutionConfig#getAutoWatermarkInterval()}.
  13.          */
  14.         void onPeriodicEmit(WatermarkOutput output);
  15. }
复制代码

使用方式改为:
  1. dataStream.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(3)));
复制代码

跟旧版本的相比extractTimestamp提取时间戳的操作不见了。
  1. public class BoundedOutOfOrdernessGenerator implements AssignerWithPeriodicWatermarks<MyEvent> {
  2.     private final long maxOutOfOrderness = 3500; // 3.5 seconds
  3.     private long currentMaxTimestamp;
  4.     @Override
  5.     public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
  6.         long timestamp = element.getCreationTime();
  7.         currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
  8.         return timestamp;
  9.     }
  10.     @Override
  11.     public Watermark getCurrentWatermark() {
  12.         // return the watermark as current highest timestamp minus the out-of-orderness bound
  13.         return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
  14.     }
  15. }
复制代码

如果按照新版的升级,那么数据的timeStamp会变成Long.Min。正确的使用方式是
  1. dataStream.assignTimestampsAndWatermarks(
  2.                                 WatermarkStrategy
  3.                                                 .<Tuple2<String,Long>>forBoundedOutOfOrderness(Duration.ofSeconds(5))
  4.                                                 .withTimestampAssigner((event, timestamp)->event.f1));
复制代码

  1. .assignTimestampsAndWatermarks(WatermarkStrategy.<StationLog>forBoundedOutOfOrderness(Duration.ofSeconds(3))
  2.                                 .withTimestampAssigner(new SerializableTimestampAssigner<StationLog>() {
  3.                                         @Override
  4.                                         public long extractTimestamp(StationLog element, long recordTimestamp) {
  5.                                                 return element.getCallTime(); //指定EventTime对应的字段
  6.                                         }
  7.                                 })
复制代码


如果有自定义,使用方式如下
  1. .assignTimestampsAndWatermarks(((WatermarkStrategy)(ctx)->new BoundOutOrdernessStrategy(60,60)
  2.                                 .withTimestampAssigner(new SerializableTimestampAssigner<StationLog>() {
  3.                                         @Override
  4.                                         public long extractTimestamp(StationLog element, long recordTimestamp) {
  5.                                                 return element.getCallTime(); //指定EventTime对应的字段
  6.                                         }
  7.                                 })
复制代码

四、工具类

  1. public class WatermarkStrategys{
  2.     public static < T extends TimeEvent> WatermarkStrategy<T> forBoundOutOfOrderness(long futuerOutMs,long maxOutofOrderMs){
  3.         return ((WatermarkStrategy)(ctx)->new BoundOutOrdernessStrategy(futuerOutMs,maxOutofOrderMs))
  4.             .withTimestampAssigner((SerializableTimestampAssigner<T>)(element,recordTimeStamp)-> event.getEventTimeMs())
  5.     }
  6. }
  7. public interface TimeEvent{
  8.     long getEventTimeMs();
  9. }
复制代码

五、flink1.11,idea运行失败
作业的依赖从1.10.1升级到1.11.0,在idea运行的时候报错
  1. Exception in thread "main" java.lang.IllegalStateException: No ExecutorFactory found to execute the application.
  2.    at org.apache.flink.core.execution.DefaultExecutorServiceLoader.getExecutorFactory(DefaultExecutorServiceLoader.java:84)
  3.    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.executeAsync(StreamExecutionEnvironment.java:1803)
  4.    at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.execute(StreamExecutionEnvironment.java:1713)
  5.    at org.apache.flink.streaming.api.environment.LocalStreamEnvironment.execute(LocalStreamEnvironment()
复制代码

解决方法:
尝试加一下这个依赖
groupId: org.apache.flink
artifactId: flink-clients_${scala.binary.version}

导致原因



最新经典文章,欢迎关注公众号


相关帖子

没找到任何评论,期待你打破沉寂

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条