分享

flink 1.11 新的水印生成器

本帖最后由 levycui 于 2020-7-15 17:25 编辑
2020-07-15_172357.jpg

2020-07-15_172409.jpg


Flink 1.11的水印代码:

[mw_shl_code=java,true]package datastream;

import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier;
import org.apache.flink.api.common.eventtime.WatermarkOutput;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;

import java.util.Date;
import java.util.UUID;

/**
* @author zhangjun 欢迎关注我的公众号[大数据技术与应用实战],获取更多精彩实战内容
* <p>
* flink 1.11 中新的水印生成器
*/
public class WatermarkTest{
        public static void main(String[] args){

                StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
                env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
                //设置周期性水印的间隔
                env.getConfig().setAutoWatermarkInterval(5000L);
                DataStream<Tuple2<String,Long>> dataStream = env.addSource(new MySource());

                DataStream<Tuple2<String,Long>> withTimestampsAndWatermarks = dataStream.assignTimestampsAndWatermarks(
                                new WatermarkStrategy<Tuple2<String,Long>>(){
                                        @Override
                                        public WatermarkGenerator<Tuple2<String,Long>> createWatermarkGenerator(
                                                        WatermarkGeneratorSupplier.Context context){
                                                return new WatermarkGenerator<Tuple2<String,Long>>(){
                                                        private long maxTimestamp;
                                                        private long delay = 3000;

                                                        @Override
                                                        public void onEvent(
                                                                        Tuple2<String,Long> event,
                                                                        long eventTimestamp,
                                                                        WatermarkOutput output){
                                                                maxTimestamp = Math.max(maxTimestamp, event.f1);
                                                        }

                                                        @Override
                                                        public void onPeriodicEmit(WatermarkOutput output){
                                                                output.emitWatermark(new Watermark(maxTimestamp - delay));
                                                        }
                                                };
                                        }
                                });

//使用内置的水印生成器
//                DataStream<Tuple2<String,Long>> withTimestampsAndWatermarks = dataStream.assignTimestampsAndWatermarks(
//                                WatermarkStrategy
//                                                .<Tuple2<String,Long>>forBoundedOutOfOrderness(Duration.ofSeconds(5))
//                                                .withTimestampAssigner((event, timestamp)->event.f1));

                withTimestampsAndWatermarks.process(new ProcessFunction<Tuple2<String,Long>,Object>(){

                        @Override
                        public void processElement(
                                        Tuple2<String,Long> value, Context ctx, Collector<Object> out) throws Exception{
                                long w = ctx.timerService().currentWatermark();
                                System.out.println(" 水印 : " + w + "  water  date " + new Date(w) + " now " +
                                                   new Date(value.f1));
                        }
                });

                try {
                        env.execute();
                } catch (Exception e){
                        e.printStackTrace();
                }
        }

        public static class MySource implements SourceFunction<Tuple2<String,Long>>{
                private volatile boolean isRunning = true;

                @Override
                public void run(SourceContext<Tuple2<String,Long>> ctx) throws Exception{
                        while (isRunning){
                                Thread.sleep(1000);
                                //订单id
                                String orderid = UUID.randomUUID().toString();
                                //订单完成时间
                                long orderFinishTime = System.currentTimeMillis();
                                ctx.collect(Tuple2.of(orderid, orderFinishTime));
                        }
                }

                @Override
                public void cancel(){
                        isRunning = false;
                }
        }
}[/mw_shl_code]

更多代码下载:
bigdata-examples-master.zip (52.76 KB, 下载次数: 3)

没找到任何评论,期待你打破沉寂

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条