分享

工作经验分享:FLINK SQL实战案例之商品销量实时统计

hyj 2021-5-26 07:13:49 发表于 Flink [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 0 3039
本帖最后由 hyj 于 2021-5-26 07:20 编辑

问题导读

1.本文的业务包含哪些流程?
2.本文难点在什么地方?
3.如何通过flink sql实现商品销量实时统计?

1、案例背景介绍
互联网电商往往需要对订单商品销量实时统计,用于实时大屏展示,库存销量监控等等。本文主要介绍如何通过flink sql的方式进行商品实时销量的统计。

业务流程介绍:

1.使用otter采集业务库binlog数据输出到kafka

2.flink读取kafka数据进行商品销量统计

3.统计结果输出到mysql

4.下游业务系统直接读取mysql数据

业务需求介绍:

根据订单创建时间统计商品每天的实时销量,不包含取消订单的商品

2、准备工作
将mysql订单相关的binlog日志实时同步到kafka对应的Topic,然后创建对应的flink table source表。

为了简化需求,下面的订单表和订单明细表只列出主要的字段。

订单主表:orders

字段名

数据类型

注释

order_no

VARCHAR

订单编号

order_status

int

订单状态(0:已取消,1:待支付,2:已支付,3:已出库,... )

pay_time

timestamp

支付时间

create_time

timestamp

订单创建时间

update_time

timestamp

订单更新时间



订单明细表:order_detail

字段名

数据类型

注释

order_no

VARCHAR

订单编号

product_code

VARCHAR

商品编码

quantity

int

商品数量

create_time

timestamp

订单创建时间

update_time

timestamp

订单更新时间


3、难点解析
同一个订单会有多次业务操作(例如下单、付款、发货,取消等等),每一次业务操作都会导致订单状态发生变化,并且每次变化订单表对应的Binlog日志会产生一条订单号相同的数据。如果我们不做处理直接关联聚合查询的话会导致数据重复统计结果不正确。因此我们需要了解业务系统都有哪些操作会对订单主表和订单明细进行更新操作。

假设业务系统数据变更是这样的:

用户下单后新增订单主表和订单明细表数据
后续的业务操作只会更新订单主表数据,订单明细表数据不会更新变化
数据每次更新update_time字段都会同时变化
再来看一下我们的需求如何处理:

需求的要求是当订单创建后我们就要统计该订单对应的商品数量,而当订单状态变为取消时我们要减掉该订单对应的商品数量。根据需求和订单数据更新的特点,这里需要用到flink回撤流的特性来处理该需求。flinksql可以使用row_number() over(partition by order_no order by update_time desc) 通过限制 where rn=1来获取同一订单的最新状态数据,然后和订单明细表进行关联求和。flinksql会自动更新统计结果。

4、编写业务逻辑

  1. --订单主表source table
  2. CREATE TABLE orders
  3.             (
  4.                order_no     string,
  5.                order_state  int,
  6.                pay_time     string,
  7.                create_time  string,
  8.                update_time  string
  9.              )
  10.        WITH (
  11.                'connector.type' = 'kafka',      
  12.                'connector.version' = 'universal', --kafka版本   
  13.                'connector.topic' = '_tporders',--kafkatopic
  14.                'connector.properties.zookeeper.connect' = 'localhost:2181',
  15.                'connector.properties.bootstrap.servers' = 'localhost:9092',
  16.                'connector.properties.group.id' = 'testGroup',
  17.                'connector.startup-mode' = 'latest-offset',
  18.                'format.type' = 'json' --数据为json格式            
  19.              )
  20.             
  21. --订单明细表source table
  22. CREATE TABLE order_detail
  23.             (
  24.                order_no     string,
  25.                product_code string,
  26.                quantity     int,
  27.                create_time  string,
  28.                update_time  string
  29.              )
  30.        WITH (
  31.                'connector.type' = 'kafka',      
  32.                'connector.version' = 'universal', --kafka版本   
  33.                'connector.topic' = 'tp_order_detail',--kafkatopic
  34.                'connector.properties.zookeeper.connect' = 'localhost:2181',
  35.                'connector.properties.bootstrap.servers' = 'localhost:9092',
  36.                'connector.properties.group.id' = 'testGroup',
  37.                'connector.startup-mode' = 'latest-offset',
  38.                'format.type' = 'json' --数据为json格式            
  39.              )
  40. --mysql统计结果表sink table
  41. --mysql建表时指定主键为order_date,product_code,flink写入数据时相同主键会进行更新
  42. CREATE TABLE product_sale
  43.              (
  44.               order_date string,
  45.               product_code string,
  46.               cnt int
  47.               )
  48.          WITH (
  49.            'connector.type' = 'jdbc',
  50.            'connector.url' = 'jdbc:mysql://localhost:3306/flink?serverTimezone=UTC&useSSL=true',
  51.            'connector.table' = 'order_state_cnt',
  52.            'connector.driver' = 'com.mysql.cj.jdbc.Driver',
  53.            'connector.username' = 'root',
  54.            'connector.password' = 'root',
  55.            'connector.write.flush.max-rows' = '1',--默认每5000条数据写入一次,测试调小一点
  56.            'connector.write.flush.interval' = '2s',--写入时间间隔
  57.            'connector.write.max-retries' = '3'
  58.          )
  59.          
  60. --统计商品销量并写入mysql        
  61. insert into product_sale
  62. select create_date,product_code,sum(quantity)
  63. from (select t1.order_no,
  64.              t1.create_date,
  65.              t2.product_code,
  66.              t2.quantity
  67.        from (select order_id,
  68.                     order_status,
  69.                     substring(create_time,1,10) create_date,
  70.                     update_time ,
  71.                     row_number() over(partition by order_no order by update_time desc) as rn
  72.               from orders
  73.               )t1
  74.        left join order_detail t2
  75.             on t1.order_no=t2.order_no
  76.       where t1.rn=1--取最新的订单状态数据
  77.       and t1.order_status<>0--不包含取消订单
  78.    )t3
  79. group by create_date,product_code
复制代码
5数据测试

假设在13点创建了两个订单,数据如下:

订单主表数据:

order_no

order_state

pay_time

create_time

update_time

order1

1


2020-04-01 13:00:00

2020-04-01 13:00:00

order2

1

2020-04-01 13:00:00

2020-04-01 13:00:00

订单明细数据:

order_no

product_code

quantity

create_time

update_time

order1

product1

3

2020-04-01 13:00:00

2020-04-01 13:00:00

order1

product2

5

2020-04-01 13:00:00

2020-04-01 13:00:00

order2

product1

2

2020-04-01 13:00:00

2020-04-01 13:00:00

order2

product2

4

2020-04-01 13:00:00

2020-04-01 13:00:00


统计结果:

order_date

product_code

cnt

2020-04-01

product1

5

2020-04-01

product2

9


然后订单号为order1的订单取消了:

order_no

order_state

pay_time

create_time

update_time

order1

0


2020-04-01 13:00:00

2020-04-01 13:15:00

order2

1

2020-04-01 13:00:00

2020-04-01 13:00:00


统计结果:

order_date

product_code

cnt

2020-04-01

product1

2

2020-04-01

product2

4



最新经典文章,欢迎关注公众号

原文链接
https://www.freesion.com/article/9323437359/

没找到任何评论,期待你打破沉寂

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条