分享

Flink流join案例详解【含代码】



问题导读:

1、怎样关联流数据?
2、在Flink中,流Join有哪几种?
3、怎样利用window join关联商品与订单?


1 JOIN

自从Stream pipeline解决方案地成熟,流操作和关系型表结构操作的差距越来越小了。我们通过Flink这样的框架,可以进行高吞吐量的数据流执行非常密集的数据处理,例如:join、filter、aggregation。所以,接下来我们就来看看Flink的Stream join。

在介绍Stream join之前,我们先来回顾一下关系型数据的Join、以及数仓维度建模的Join。

1 RDBMS Join

在学习关系型数据库时,就对关系型数据库的JOIN非常熟悉。RDBMS中,一般有很多表,而为了满足范式设计,我们会将数据分开在不同的表中存储。而我们获取数据的时候,需要将这些表的数据集中起来,这时候,我们就要使用JOIN了。

MySQL支持多种JOIN,例如:LEFT JOIN、RIGHT JOIN、INNER JOIN。而每一种JOIN具备不同特点。

2 数仓维度建模Join

在数据仓库设计中,我们一般会采用维度建模方式,而事实表中有大量的维表外键ID以及大量指标。通过维度表外键ID,我们可以和维度表进行关联,然后通过不同维度来统计计算指标。

3 流Join

在实时处理中,事实数据(Event)源源不断地流入到Kafka中,这些事实数据可能是来自于网页中的用户访问日志、也有可能是MySQL中进行操作的binlog日志,但基本上可以确认的是,在这个事件流中,不是所有的数据都是完整的。绝大多数场景,原始日志数据需要和外部存储系统中的一些数据进行关联之后,才能得到更完整的数据。

为了进行数据的实时处理,我们需要在流处理系统将数据进行关联处理,才能得到最完整的数据。我们怎么样关联呢?假设,我们需要在Flink中进行实时指标统计,也就是我们需要用事件流中的数据与存储在MySQL中的维表在Flink流处理系统中进行关联。如何处理?

这里有三种方式:

  • 使用DataStream的API,我们可以使用connect操作,然后再使用RichFlatMapFunction或者是CoProcessFunction来手动实现Join,它们可以将数据保持在state中,在CoProcessFunction中可以使用计时器,定时进行数据的关联,然后定期清理过期的state。
  • 使用Table API,我们自己实现一个UDTF来访问维表
  • 使用流JOIN来实现


我们这里,重点就来讲讲Flink中非常重要的流JOIN操作。在Flink中,流Join主要有两种,一种是Window Join,还有一种是Interval Join。我们先来看看Window Join。

2 Window JOIN

Window Join将流中两个key相同的元素联结在一起。这种联结方式看起来非常像inner join,两个元素必须都存在,才会出现在结果中。

在Flink中,分为有三种不同类型的典型窗口:滚动窗口、滑动窗口、会话窗口。我们以窗口的类型分开讲解。

在执行窗口join时,会将所有key能够匹配上、且处在同一个滚动窗口的事件进行join,join之后传递到JoinFunction或者FlatJoinFunction。这种join看起来就像是INNER JOIN,滚动窗口operator不会将一个在某个流中,而在另一个流中不存在的元素发送到下游。


640 - 2021-01-16T173953.050.png

上述图,表示两个流进行滚动窗口join,我们发现,只要是两个流中都有的元素,才发生了join操作。

来用个简单的案例演示下:

使用两个指定Source模拟数据,一个Source是订单明细,一个Source是商品数据。我们通过window join,将数据关联到一起。

1、先将Flink的依赖导入进来


  1. <repositories>
  2.     <repository>
  3.         <id>aliyunmaven</id>
  4.         <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
  5.     </repository>
  6. </repositories>
  7. <properties>
  8.     <flink-version>1.12.0</flink-version>
  9.     <scala-version>2.12</scala-version>
  10.     <mysql-version>5.1.47</mysql-version>
  11. </properties>
  12. <dependencies>
  13.     <dependency>
  14.         <groupId>org.apache.flink</groupId>
  15.         <artifactId>flink-java</artifactId>
  16.         <version>${flink-version}</version>
  17.     </dependency>
  18.     <dependency>
  19.         <groupId>org.apache.flink</groupId>
  20.         <artifactId>flink-streaming-java_${scala-version}</artifactId>
  21.         <version>${flink-version}</version>
  22.     </dependency>
  23.     <dependency>
  24.         <groupId>org.apache.flink</groupId>
  25.         <artifactId>flink-clients_${scala-version}</artifactId>
  26.         <version>${flink-version}</version>
  27.     </dependency>
  28.     <dependency>
  29.         <groupId>com.alibaba</groupId>
  30.         <artifactId>fastjson</artifactId>
  31.         <version>1.2.62</version>
  32.     </dependency>
  33. </dependencies>
复制代码


2、先为本次的测试构建三个实体类,一个是Goods(商品类)、另一个OrderItem(订单明细)、还有一个是Join之后的实体类。


  1. public class Goods11 {
  2.     private String goodsId;
  3.     private String goodsName;
  4.     private BigDecimal goodsPrice;
  5.     public static List<Goods11> GOODS_LIST;
  6.     public static Random r;
  7.     static  {
  8.         r = new Random();
  9.         GOODS_LIST = new ArrayList<>();
  10.         GOODS_LIST.add(new Goods11("1", "小米12", new BigDecimal(4890)));
  11.         GOODS_LIST.add(new Goods11("2", "iphone12", new BigDecimal(12000)));
  12.         GOODS_LIST.add(new Goods11("3", "MacBookPro", new BigDecimal(15000)));
  13.         GOODS_LIST.add(new Goods11("4", "Thinkpad X1", new BigDecimal(9800)));
  14.         GOODS_LIST.add(new Goods11("5", "MeiZu One", new BigDecimal(3200)));
  15.         GOODS_LIST.add(new Goods11("6", "Mate 40", new BigDecimal(6500)));
  16.     }
  17.     public static Goods11 randomGoods() {
  18.         int rIndex = r.nextInt(GOODS_LIST.size());
  19.         return GOODS_LIST.get(rIndex);
  20.     }
  21.     public Goods11() {
  22.     }
  23.     public Goods11(String goodsId, String goodsName, BigDecimal goodsPrice) {
  24.         this.goodsId = goodsId;
  25.         this.goodsName = goodsName;
  26.         this.goodsPrice = goodsPrice;
  27.     }
  28.     // 生成getter/setter此处省略
  29.     @Override
  30.     public String toString() {
  31.         return JSON.toJSONString(this);
  32.     }
  33. }
复制代码

  1. public class OrderItem11 {
  2.     private String itemId;
  3.     private String goodsId;
  4.     private Integer count;
  5.     // 生成getter/setting,此处省略
  6.     @Override
  7.     public String toString() {
  8.         return JSON.toJSONString(this);
  9.     }
  10. }
复制代码

  1. public class OrderItem11 {
  2.     private String itemId;
  3.     private String goodsId;
  4.     private Integer count;
  5.     // 生成getter/setting,此处省略
  6.     @Override
  7.     public String toString() {
  8.         return JSON.toJSONString(this);
  9.     }
  10. }
  11. public class FactOrderItem {
  12.     private String goodsId;
  13.     private String goodsName;
  14.     private BigDecimal count;
  15.     private BigDecimal totalMoney;
  16.     @Override
  17.     public String toString() {
  18.         return JSON.toJSONString(this);
  19.     }
  20.     // 生成的getter/setter
  21. }
复制代码


3、构建一个商品Stream源(这个好比就是维表)


  1. public class GoodsSource11 extends RichSourceFunction {
  2.     private Boolean isCancel;
  3.     @Override
  4.     public void open(Configuration parameters) throws Exception {
  5.         isCancel = false;
  6.     }
  7.     @Override
  8.     public void run(SourceContext sourceContext) throws Exception {
  9.         while(!isCancel) {
  10.             Goods11.GOODS_LIST.stream().forEach(goods -> sourceContext.collect(goods));
  11.             TimeUnit.SECONDS.sleep(1);
  12.         }
  13.     }
  14.     @Override
  15.     public void cancel() {
  16.         isCancel = true;
  17.     }
  18. }
复制代码


4、构建订单明细Stream源


  1. public class OrderItemSource11 extends RichSourceFunction {
  2.     private Boolean isCancel;
  3.     private Random r;
  4.     @Override
  5.     public void open(Configuration parameters) throws Exception {
  6.         isCancel = false;
  7.         r = new Random();
  8.     }
  9.     @Override
  10.     public void run(SourceContext sourceContext) throws Exception {
  11.         while(!isCancel) {
  12.             Goods11 goods11 = Goods11.randomGoods();
  13.             OrderItem11 orderItem11 = new OrderItem11();
  14.             orderItem11.setGoodsId(goods11.getGoodsId());
  15.             orderItem11.setCount(r.nextInt(10) + 1);
  16.             orderItem11.setItemId(UUID.randomUUID().toString());
  17.             sourceContext.collect(orderItem11);
  18.             orderItem11.setGoodsId("111");
  19.             sourceContext.collect(orderItem11);
  20.             TimeUnit.SECONDS.sleep(1);
  21.         }
  22.     }
  23.     @Override
  24.     public void cancel() {
  25.         isCancel = true;
  26.     }
  27. }
复制代码


5、构建水印分配器(此处为了简单),直接使用系统时间了

  1. public class Goods11Watermark implements WatermarkStrategy<Goods11> {
  2.     @Override
  3.     public TimestampAssigner<Goods11> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
  4.         return (element, recordTimestamp) -> System.currentTimeMillis();
  5.     }
  6.     @Override
  7.     public WatermarkGenerator<Goods11> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
  8.         return new WatermarkGenerator<Goods11>() {
  9.             @Override
  10.             public void onEvent(Goods11 event, long eventTimestamp, WatermarkOutput output) {
  11.                 output.emitWatermark(new Watermark(System.currentTimeMillis()));
  12.             }
  13.             @Override
  14.             public void onPeriodicEmit(WatermarkOutput output) {
  15.                 output.emitWatermark(new Watermark(System.currentTimeMillis()));
  16.             }
  17.         };
  18.     }
  19. }
  20. public class OrderItemWatermark implements WatermarkStrategy<OrderItem11> {
  21.     @Override
  22.     public TimestampAssigner<OrderItem11> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
  23.         return (element, recordTimestamp) -> System.currentTimeMillis();
  24.     }
  25.     @Override
  26.     public WatermarkGenerator<OrderItem11> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
  27.         return new WatermarkGenerator<OrderItem11>() {
  28.             @Override
  29.             public void onEvent(OrderItem11 event, long eventTimestamp, WatermarkOutput output) {
  30.                 output.emitWatermark(new Watermark(System.currentTimeMillis()));
  31.             }
  32.             @Override
  33.             public void onPeriodicEmit(WatermarkOutput output) {
  34.                 output.emitWatermark(new Watermark(System.currentTimeMillis()));
  35.             }
  36.         };
  37.     }
  38. }
复制代码


6、Window Join代码


  1. public class TumbleWindowJoin {
  2.     public static void main(String[] args) throws Exception {
  3.         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  4.         // 构建商品数据流
  5.         SingleOutputStreamOperator<Goods11> goodsDS = env.addSource(new GoodsSource11(), TypeInformation.of(Goods11.class))
  6.                 .assignTimestampsAndWatermarks(new Goods11Watermark() {
  7.                 });
  8.         // 构建订单明细数据流
  9.         SingleOutputStreamOperator<OrderItem11> orderItemDS = env.addSource(new OrderItemSource11(), TypeInformation.of(OrderItem11.class))
  10.                 .assignTimestampsAndWatermarks(new OrderItemWatermark());
  11.         // 进行关联查询
  12.         DataStream<FactOrderItem> factOrderItemDS = orderItemDS.join(goodsDS)
  13.                 // 第一个流orderItemDS
  14.                 .where(OrderItem11::getGoodsId)
  15.                 // 第二流goodsDS
  16.                 .equalTo(Goods11::getGoodsId)
  17.                 .window(TumblingEventTimeWindows.of(Time.seconds(5)))
  18.                 .apply((OrderItem11 item, Goods11 goods) -> {
  19.                     FactOrderItem factOrderItem = new FactOrderItem();
  20.                     factOrderItem.setGoodsId(goods.getGoodsId());
  21.                     factOrderItem.setGoodsName(goods.getGoodsName());
  22.                     factOrderItem.setCount(new BigDecimal(item.getCount()));
  23.                     factOrderItem.setTotalMoney(goods.getGoodsPrice().multiply(new BigDecimal(item.getCount())));
  24.                     return factOrderItem;
  25.                 });
  26.         factOrderItemDS.print();
  27.         env.execute("滚动窗口JOIN");
  28.     }
  29. }
复制代码


1、Window Join首先需要使用where和equalTo指定使用哪个key来进行关联,此处我们通过应用方法,基于GoodsId来关联两个流中的元素。

2、设置了5秒的滚动窗口,流的元素关联都会在这个5秒的窗口中进行关联。

3、apply方法中实现了,将两个不同类型的元素关联并生成一个新类型的元素。

3 Interval Join

前面学习的Window Join必须要在一个Window中进行JOIN,那如果没有Window如何处理呢?

interval join也是使用相同的key来join两个流(流A、流B),并且流B中的元素中的时间戳,和流A元素的时间戳,有一个时间间隔。也就是:流B的元素的时间戳 ≥ 流A的元素时间戳 + 下界,且流B的元素的时间戳 ≤ 流A的元素时间戳 + 上界。

我们来看Flink官方的一张图。


640 - 2021-01-16T174616.816.png

我们看到,流A的每一个元素,都会和流B的一定时间范围的元素进行JOIN。

其中,上界和下界可以是负数,也可以是整数。Interval join目前只支持INNER JOIN。将连接后的元素传递给ProcessJoinFunction时,时间戳变为两个元素中最大的那个时间戳。

注意:

Interval Join只支持事件时间。

依然是前面的案例:


  1. public class IntervalJoin {
  2.     public static void main(String[] args) throws Exception {
  3.         StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  4.         // 构建商品数据流
  5.         SingleOutputStreamOperator<Goods11> goodsDS = env.addSource(new GoodsSource11(), TypeInformation.of(Goods11.class))
  6.                 .assignTimestampsAndWatermarks(new Goods11Watermark() {
  7.                 });
  8.         // 构建订单明细数据流
  9.         SingleOutputStreamOperator<OrderItem11> orderItemDS = env.addSource(new OrderItemSource11(), TypeInformation.of(OrderItem11.class))
  10.                 .assignTimestampsAndWatermarks(new OrderItemWatermark());
  11.         // 进行关联查询
  12.         SingleOutputStreamOperator<FactOrderItem> factOrderItemDS = orderItemDS.keyBy(item -> item.getGoodsId())
  13.                 .intervalJoin(goodsDS.keyBy(goods -> goods.getGoodsId()))
  14.                 .between(Time.seconds(-1), Time.seconds(0))
  15.                 .upperBoundExclusive()
  16.                 .process(new ProcessJoinFunction<OrderItem11, Goods11, FactOrderItem>() {
  17.                     @Override
  18.                     public void processElement(OrderItem11 left, Goods11 right, Context ctx, Collector<FactOrderItem> out) throws Exception {
  19.                         FactOrderItem factOrderItem = new FactOrderItem();
  20.                         factOrderItem.setGoodsId(right.getGoodsId());
  21.                         factOrderItem.setGoodsName(right.getGoodsName());
  22.                         factOrderItem.setCount(new BigDecimal(left.getCount()));
  23.                         factOrderItem.setTotalMoney(right.getGoodsPrice().multiply(new BigDecimal(left.getCount())));
  24.                         out.collect(factOrderItem);
  25.                     }
  26.                 });
  27.         factOrderItemDS.print();
  28.         env.execute("Interval JOIN");
  29.     }
  30. }
复制代码


我们看到了,上述案例中,我们并没有使用Window就实现了两个流的Join。

1、这里我们通过keyBy将两个流join到一起

2、interval join需要设置流A去关联哪个时间范围的流B中的元素。此处,我设置的下界为-1、上界为0,且上界是一个开区间。表达的意思就是流A中某个元素的时间,对应上一秒的流B中的元素。

3、process中将两个key一样的元素,关联在一起,并加载到一个新的FactOrderItem对象中

参考文献:

https://ci.apache.org/projects/f ... rators/joining.html





最新经典文章,欢迎关注公众号



---------------------

作者:Flink
来源:weixin
原文:面试实时开发必问——Flink中如何做流Join呢?

没找到任何评论,期待你打破沉寂

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条