分享

Apache-Flink深度解析-SQL概览(下)

本帖最后由 a87758133 于 2019-3-26 12:18 编辑
问题导读

1.Flink SQL 核心算子有哪些?
2.如何编写一个完整的包括Source和Sink定义的Apache Flink SQL Job?

3.Apache Flink包括哪些UDX?

上一篇
Apache-Flink深度解析-SQL概览(上)
http://www.aboutyun.com/forum.php?mod=viewthread&tid=26903



(7)JOIN
JOIN 用于把来自两个表的行联合起来形成一个宽表,Apache Flink支持的JOIN类型:
  • JOIN - INNER JOIN
  • LEFT JOIN - LEFT OUTER JOIN
  • RIGHT JOIN - RIGHT OUTER JOIN
  • FULL JOIN - FULL OUTER JOIN
JOIN与关系代数的Join语义相同,具体如下:
v2-70273a93a34e78bd4d20a2a0665b5e13_r.jpg
SQL 示例 (JOIN)
INNER JOIN只选择满足ON条件的记录,我们查询customer_tab 和 order_tab表,将有订单的客户和订单信息选择出来,如下:
[mw_shl_code=sql,true]SELECT * FROM customer_tab AS c JOIN order_tab AS o ON o.c_id = c.c_id[/mw_shl_code]
Result
c_idc_namec_desco_idc_ido_timeo_descc_001Kevinfrom JinLino_002c_0012018-11-05 10:01:55ipadc_001Kevinfrom JinLino_003c_0012018-11-05 10:03:44flink bookc_002Sunnyfrom JinLino_oo1c_0022018-11-05 10:01:01iphone

SQL 示例 (LEFT JOIN)
LEFT JOIN与INNER JOIN的区别是当右表没有与左边相JOIN的数据时候,右边对应的字段补NULL输出,语义如下:
v2-51e91211ade08ab63f77387d8bb9edb7_r.jpg

对应的SQL语句如下(LEFT JOIN):
[mw_shl_code=sql,true]SELECT ColA, ColB, T2.ColC, ColE FROM TI LEFT JOIN T2 ON T1.ColC = T2.ColC ;[/mw_shl_code]
  • 细心的读者可能发现上面T2.ColC是添加了前缀T2了,这里需要说明一下,当两张表有字段名字一样的时候,我需要指定是从那个表里面投影的。
我们查询customer_tab 和 order_tab表,将客户和订单信息选择出来如下:
[mw_shl_code=sql,true]SELECT * FROM customer_tab AS c LEFT JOIN order_tab AS o ON o.c_id = c.c_id[/mw_shl_code]
Result
c_idc_namec_desco_idc_ido_timeo_descc_001Kevinfrom JinLino_002c_0012018-11-05 10:01:55ipadc_001Kevinfrom JinLino_003c_0012018-11-05 10:03:44flink bookc_002Sunnyfrom JinLino_oo1c_0022018-11-05 10:01:01iphonec_003JinChengfrom HeBeiNULLNULLNULLNULL
特别说明
RIGHT JOIN 相当于 LEFT JOIN 左右两个表交互一下位置。FULL JOIN相当于 RIGHT JOIN 和 LEFT JOIN 之后进行UNION ALL操作。

(8)Window
在Apache Flink中有2种类型的Window,一种是OverWindow,即传统数据库的标准开窗,每一个元素都对应一个窗口。一种是GroupWindow,目前在SQL中GroupWindow都是基于时间进行窗口划分的。
Over Window
Apache Flink中对OVER Window的定义遵循标准SQL的定义语法。
按ROWS和RANGE分类是传统数据库的标准分类方法,在Apache Flink中还可以根据时间类型(ProcTime/EventTime)和窗口的有限和无限(Bounded/UnBounded)进行分类,共计8种类型。为了避免大家对过细分类造成困扰,我们按照确定当前行的不同方式将OVER Window分成两大类进行介绍,如下:
  • ROWS OVER Window - 每一行元素都视为新的计算行,即,每一行都是一个新的窗口。
  • RANGE OVER Window - 具有相同时间值的所有元素行视为同一计算行,即,具有相同时间值的所有行都是同一个窗口。
Bounded ROWS OVER Window
Bounded ROWS OVER Window 每一行元素都视为新的计算行,即,每一行都是一个新的窗口。
语义
我们以3个元素(2 PRECEDING)的窗口为例,如下图:

v2-ddb8d989317a59494dd0d701bd415fea_r.jpg


上图所示窗口 user 1 的 w5和w6, user 2的 窗口 w2 和 w3,虽然有元素都是同一时刻到达,但是他们仍然是在不同的窗口,这一点有别于RANGE OVER Window。
语法
Bounded ROWS OVER Window 语法如下:
[mw_shl_code=sql,true]SELECT
    agg1(col1) OVER(
     [PARTITION BY (value_expression1,..., value_expressionN)]
     ORDER BY timeCol
     ROWS
     BETWEEN (UNBOUNDED | rowCount) PRECEDING AND CURRENT ROW) AS colName,
...
FROM Tab1[/mw_shl_code]
  • value_expression - 进行分区的字表达式;
  • timeCol - 用于元素排序的时间字段;
  • rowCount - 是定义根据当前行开始向前追溯几行元素。

SQL 示例
利用item_tab测试数据,我们统计同类商品中当前和当前商品之前2个商品中的最高价格。
[mw_shl_code=sql,true]SELECT  
    itemID,
    itemType,
    onSellTime,
    price,  
    MAX(price) OVER (
        PARTITION BY itemType
        ORDER BY onSellTime
        ROWS BETWEEN 2 preceding AND CURRENT ROW) AS maxPrice
  FROM item_tab[/mw_shl_code]
Result
itemIDitemTypeonSellTimepricemaxPriceITEM001Electronic2017-11-11 10:01:002020ITEM002Electronic2017-11-11 10:02:005050ITEM003Electronic2017-11-11 10:03:003050ITEM004Electronic2017-11-11 10:03:006060ITEM005Electronic2017-11-11 10:05:004060ITEM006Electronic2017-11-11 10:06:002060ITEM007Electronic2017-11-11 10:07:007070ITEM008Clothes2017-11-11 10:08:002020

Bounded RANGE OVER Window
Bounded RANGE OVER Window 具有相同时间值的所有元素行视为同一计算行,即,具有相同时间值的所有行都是同一个窗口。
语义
我们以3秒中数据(INTERVAL '2' SECOND)的窗口为例,如下图:
v2-40981d951e2147c32c4afef332a2591f_r.jpg
注意: 上图所示窗口 user 1 的 w6, user 2的 窗口 w3,元素都是同一时刻到达,他们是在同一个窗口,这一点有别于ROWS OVER Window。
语法
Bounded RANGE OVER Window的语法如下:
[mw_shl_code=sql,true]SELECT
    agg1(col1) OVER(
     [PARTITION BY (value_expression1,..., value_expressionN)]
     ORDER BY timeCol
     RANGE
     BETWEEN (UNBOUNDED | timeInterval) PRECEDING AND CURRENT ROW) AS colName,
...
FROM Tab1[/mw_shl_code]
  • value_expression - 进行分区的字表达式;
  • timeCol - 用于元素排序的时间字段;
  • timeInterval - 是定义根据当前行开始向前追溯指定时间的元素行;


SQL 示例
我们统计同类商品中当前和当前商品之前2分钟商品中的最高价格。
[mw_shl_code=sql,true]SELECT  
    itemID,
    itemType,
    onSellTime,
    price,  
    MAX(price) OVER (
        PARTITION BY itemType
        ORDER BY rowtime
        RANGE BETWEEN INTERVAL '2' MINUTE preceding AND CURRENT ROW) AS maxPrice
  FROM item_tab[/mw_shl_code]
Result(Bounded RANGE OVER Window)
itemIDitemTypeonSellTimepricemaxPriceITEM001Electronic2017-11-11 10:01:002020ITEM002Electronic2017-11-11 10:02:005050ITEM003Electronic2017-11-11 10:03:003060ITEM004Electronic2017-11-11 10:03:006060ITEM005Electronic2017-11-11 10:05:004060ITEM006Electronic2017-11-11 10:06:002040ITEM007Electronic2017-11-11 10:07:007070ITEM008Clothes2017-11-11 10:08:002020
特别说明
OverWindow最重要是要理解每一行数据都确定一个窗口,同时目前在Apache Flink中只支持按时间字段排序。并且OverWindow开窗与GroupBy方式数据分组最大的不同在于,GroupBy数据分组统计时候,在SELECT中除了GROUP BY的key,不能直接选择其他非key的字段,但是OverWindow没有这个限制,SELECT可以选择任何字段。比如一张表table(a,b,c,d)4个字段,如果按d分组求c的最大值,两种写完如下:
  • GROUP BY - SELECT d, MAX(c) FROM table GROUP BY d
  • OVER Window = SELECT a, b, c, d, MAX(c) OVER(PARTITION BY d, ORDER BY ProcTime())
  • 如上 OVER Window 虽然PARTITION BY d,但SELECT 中仍然可以选择 a,b,c字段。但在GROUPBY中,SELECT 只能选择 d 字段。


Group Window
根据窗口数据划分的不同,目前Apache Flink有如下3种Bounded Winodw:
  • Tumble - 滚动窗口,窗口数据有固定的大小,窗口数据无叠加;
  • Hop - 滑动窗口,窗口数据有固定大小,并且有固定的窗口重建频率,窗口数据有叠加;
  • Session - 会话窗口,窗口数据没有固定的大小,根据窗口数据活跃程度划分窗口,窗口数据无叠加。
说明: Aapche Flink 还支持UnBounded的 Group Window,也就是全局Window,流上所有数据都在一个窗口里面,语义非常简单,这里不做详细介绍了。

(9)Tumble语义
Tumble 滚动窗口有固定size,窗口数据不重叠,具体语义如下:
v2-9b138fa85a58efd34ce6ad9bc36aab4a_r.jpg
语法
Tumble 滚动窗口对应的语法如下:
[mw_shl_code=sql,true]SELECT
    [gk],
    [TUMBLE_START(timeCol, size)],
    [TUMBLE_END(timeCol, size)],
    agg1(col1),
    ...
    aggn(colN)
FROM Tab1
GROUP BY [gk], TUMBLE(timeCol, size)[/mw_shl_code]
  • [gk] - 决定了流是Keyed还是/Non-Keyed;
  • TUMBLE_START - 窗口开始时间;
  • TUMBLE_END - 窗口结束时间;
  • timeCol - 是流表中表示时间字段;
size - 表示窗口的大小,如 秒,分钟,小时,天。
SQL 示例
利用pageAccess_tab测试数据,我们需要按不同地域统计每2分钟的淘宝首页的访问量(PV)。
[mw_shl_code=sql,true]SELECT  
    region,
    TUMBLE_START(rowtime, INTERVAL '2' MINUTE) AS winStart,  
    TUMBLE_END(rowtime, INTERVAL '2' MINUTE) AS winEnd,  
    COUNT(region) AS pv
FROM pageAccess_tab
GROUP BY region, TUMBLE(rowtime, INTERVAL '2' MINUTE)[/mw_shl_code]
Result
regionwinStartwinEndpvBeiJing2017-11-11 02:00:00.02017-11-11 02:02:00.01BeiJing2017-11-11 02:10:00.02017-11-11 02:12:00.02ShangHai2017-11-11 02:00:00.02017-11-11 02:02:00.01ShangHai2017-11-11 04:10:00.02017-11-11 04:12:00.01

(10)Hop
Hop 滑动窗口和滚动窗口类似,窗口有固定的size,与滚动窗口不同的是滑动窗口可以通过slide参数控制滑动窗口的新建频率。因此当slide值小于窗口size的值的时候多个滑动窗口会重叠。
语义
Hop 滑动窗口语义如下所示:

v2-7d07c3473583416e37bc9fed667951fd_r.jpg
语法
Hop 滑动窗口对应语法如下:
[mw_shl_code=sql,true]SELECT
    [gk],
    [HOP_START(timeCol, slide, size)] ,  
    [HOP_END(timeCol, slide, size)],
    agg1(col1),
    ...
    aggN(colN)
FROM Tab1
GROUP BY [gk], HOP(timeCol, slide, size)[/mw_shl_code]
  • [gk] 决定了流是Keyed还是/Non-Keyed;
  • HOP_START - 窗口开始时间;
  • HOP_END - 窗口结束时间;
  • timeCol - 是流表中表示时间字段;
  • slide - 是滑动步伐的大小;
size - 是窗口的大小,如 秒,分钟,小时,天;

SQL 示例
利用pageAccessCount_tab测试数据,我们需要每5分钟统计近10分钟的页面访问量(PV).
[mw_shl_code=sql,true]SELECT  
  HOP_START(rowtime, INTERVAL '5' MINUTE, INTERVAL '10' MINUTE) AS winStart,  
  HOP_END(rowtime, INTERVAL '5' MINUTE, INTERVAL '10' MINUTE) AS winEnd,  
  SUM(accessCount) AS accessCount  
FROM pageAccessCount_tab
GROUP BY HOP(rowtime, INTERVAL '5' MINUTE, INTERVAL '10' MINUTE)[/mw_shl_code]

Result
winStartwinEndaccessCount2017-11-11 01:55:00.02017-11-11 02:05:00.01862017-11-11 02:00:00.02017-11-11 02:10:00.03962017-11-11 02:05:00.02017-11-11 02:15:00.02432017-11-11 02:10:00.02017-11-11 02:20:00.0332017-11-11 04:05:00.02017-11-11 04:15:00.01292017-11-11 04:10:00.02017-11-11 04:20:00.0129
(11)Session
Seeeion 会话窗口 是没有固定大小的窗口,通过session的活跃度分组元素。不同于滚动窗口和滑动窗口,会话窗口不重叠,也没有固定的起止时间。一个会话窗口在一段时间内没有接收到元素时,即当出现非活跃间隙时关闭。一个会话窗口 分配器通过配置session gap来指定非活跃周期的时长.
语义
Session 会话窗口语义如下所示:
v2-75c50f9071d1c6c061efa84e20a1f367_r.jpg
语法
Seeeion 会话窗口对应语法如下:
[mw_shl_code=sql,true]SELECT
    [gk],
    SESSION_START(timeCol, gap) AS winStart,  
    SESSION_END(timeCol, gap) AS winEnd,
    agg1(col1),
     ...
    aggn(colN)
FROM Tab1
GROUP BY [gk], SESSION(timeCol, gap)[/mw_shl_code]
  • [gk] 决定了流是Keyed还是/Non-Keyed;
  • SESSION_START - 窗口开始时间;
  • SESSION_END - 窗口结束时间;
  • timeCol - 是流表中表示时间字段;
  • gap - 是窗口数据非活跃周期的时长;

SQL 示例
利用pageAccessSession_tab测试数据,我们按地域统计连续的两个访问用户之间的访问时间间隔不超过3分钟的的页面访问量(PV).
[mw_shl_code=sql,true]SELECT  
    region,
    SESSION_START(rowtime, INTERVAL '3' MINUTE) AS winStart,  
    SESSION_END(rowtime, INTERVAL '3' MINUTE) AS winEnd,
    COUNT(region) AS pv  
FROM pageAccessSession_tab
GROUP BY region, SESSION(rowtime, INTERVAL '3' MINUTE)[/mw_shl_code]
Result
regionwinStartwinEndpvBeiJing2017-11-11 02:10:00.02017-11-11 02:13:00.01ShangHai2017-11-11 02:01:00.02017-11-11 02:08:00.04ShangHai2017-11-11 02:10:00.02017-11-11 02:14:00.02ShangHai2017-11-11 04:16:00.02017-11-11 04:19:00.01

(12)UDX
Apache Flink 除了提供了大部分ANSI-SQL的核心算子,也为用户提供了自己编写业务代码的机会,那就是User-Defined Function,目前支持如下三种 User-Defined Function:
  • UDF - User-Defined Scalar Function
  • UDTF - User-Defined Table Function
  • UDAF - User-Defined Aggregate Funciton
UDX都是用户自定义的函数,那么Apache Flink框架为啥将自定义的函数分成三类呢?是根据什么划分的呢?Apache Flink对自定义函数进行分类的依据是根据函数语义的不同,函数的输入和输出不同来分类的,具体如下:
UDXINPUTOUTPUTINPUT:OUTPUTUDF单行中的N(N>=0)列单行中的1列1:1UDTF单行中的N(N>=0)列M(M>=0)行1:N(N>=0)UDAFM(M>=0)行中的每行的N(N>=0)列单行中的1列M:1(M>=0)
①UDF
  • 定义
用户想自己编写一个字符串联接的UDF,我们只需要实现ScalarFunction#eval()方法即可,简单实现如下:
[mw_shl_code=scala,true]object MyConnect extends ScalarFunction {
  @varargs
  def eval(args: String*): String = {
    val sb = new StringBuilder
    var i = 0
    while (i < args.length) {
      if (args(i) == null) {
        return null
      }
      sb.append(args(i))
      i += 1
    }
    sb.toString
  }
}[/mw_shl_code]

  • 使用
[mw_shl_code=scala,true]...
val fun = MyConnect
tEnv.registerFunction("myConnect", fun)
val sql = "SELECT myConnect(a, b) as str FROM tab"
...[/mw_shl_code]

②UDTF
  • 定义
用户想自己编写一个字符串切分的UDTF,我们只需要实现TableFunction#eval()方法即可,简单实现如下:
ScalarFunction#eval()
[mw_shl_code=scala,true]class MySplit extends TableFunction[String] {
  def eval(str: String): Unit = {
    if (str.contains("#")){
      str.split("#").foreach(collect)
    }
  }

  def eval(str: String, prefix: String): Unit = {
    if (str.contains("#")) {
      str.split("#").foreach(s => collect(prefix + s))
    }
  }
}[/mw_shl_code]
  • 使用
[mw_shl_code=scala,true]...
val fun = new MySplit()
tEnv.registerFunction("mySplit", fun)
val sql = "SELECT c, s FROM MyTable, LATERAL TABLE(mySplit(c)) AS T(s)"
...[/mw_shl_code]

③UDAF
  • 定义
UDAF 要实现的接口比较多,我们以一个简单的CountAGG为例,做简单实现如下:
[mw_shl_code=scala,true]/** The initial accumulator for count aggregate function */
class CountAccumulator extends JTuple1[Long] {
  f0 = 0L //count
}

/**
  * User-defined count aggregate function
  */
class MyCount
  extends AggregateFunction[JLong, CountAccumulator] {

  // process argument is optimized by Calcite.
  // For instance count(42) or count(*) will be optimized to count().
  def accumulate(acc: CountAccumulator): Unit = {
    acc.f0 += 1L
  }

  // process argument is optimized by Calcite.
  // For instance count(42) or count(*) will be optimized to count().
  def retract(acc: CountAccumulator): Unit = {
    acc.f0 -= 1L
  }

  def accumulate(acc: CountAccumulator, value: Any): Unit = {
    if (value != null) {
      acc.f0 += 1L
    }
  }

  def retract(acc: CountAccumulator, value: Any): Unit = {
    if (value != null) {
      acc.f0 -= 1L
    }
  }

  override def getValue(acc: CountAccumulator): JLong = {
    acc.f0
  }

  def merge(acc: CountAccumulator, its: JIterable[CountAccumulator]): Unit = {
    val iter = its.iterator()
    while (iter.hasNext) {
      acc.f0 += iter.next().f0
    }
  }

  override def createAccumulator(): CountAccumulator = {
    new CountAccumulator
  }

  def resetAccumulator(acc: CountAccumulator): Unit = {
    acc.f0 = 0L
  }

  override def getAccumulatorType: TypeInformation[CountAccumulator] = {
    new TupleTypeInfo(classOf[CountAccumulator], BasicTypeInfo.LONG_TYPE_INFO)
  }

  override def getResultType: TypeInformation[JLong] =
    BasicTypeInfo.LONG_TYPE_INFO
}[/mw_shl_code]

  • 使用
[mw_shl_code=scala,true]...
val fun = new MyCount()
tEnv.registerFunction("myCount", fun)
val sql = "SELECT myCount(c) FROM MyTable GROUP BY  a"
...[/mw_shl_code]


Source&Sink
上面我们介绍了Apache Flink SQL核心算子的语法及语义,这部分将选取Bounded EventTime Tumble Window为例为大家编写一个完整的包括Source和Sink定义的Apache Flink SQL Job。假设有一张淘宝页面访问表(PageAccess_tab),有地域,用户ID和访问时间。我们需要按不同地域统计每2分钟的淘宝首页的访问量(PV). 具体数据如下:
regionuserIdaccessTimeShangHaiU00102017-11-11 10:01:00BeiJingU10012017-11-11 10:01:00BeiJingU20322017-11-11 10:10:00BeiJingU11002017-11-11 10:11:00ShangHaiU00112017-11-11 12:10:00
Source 定义
自定义Apache Flink Stream Source需要实现StreamTableSource, StreamTableSource中通过StreamExecutionEnvironment 的addSource方法获取DataStream, 所以我们需要自定义一个 SourceFunction, 并且要支持产生WaterMark,也就是要实现DefinedRowtimeAttributes接口。
Source Function定义
支持接收携带EventTime的数据集合,Either的数据结构,Right表示WaterMark和Left表示数据:
[mw_shl_code=scala,true]class MySourceFunction[T](dataWithTimestampList: Seq[Either[(Long, T), Long]])
  extends SourceFunction[T] {
  override def run(ctx: SourceContext[T]): Unit = {
    dataWithTimestampList.foreach {
      case Left(t) => ctx.collectWithTimestamp(t._2, t._1)
      case Right(w) => ctx.emitWatermark(new Watermark(w))
    }
  }
  override def cancel(): Unit = ???
}[/mw_shl_code]

定义 StreamTableSource
我们自定义的Source要携带我们测试的数据,以及对应的WaterMark数据,具体如下:
[mw_shl_code=scala,true]class MyTableSource extends StreamTableSource[Row] with DefinedRowtimeAttributes {

  val fieldNames = Array("accessTime", "region", "userId")
  val schema = new TableSchema(fieldNames, Array(Types.SQL_TIMESTAMP, Types.STRING, Types.STRING))
  val rowType = new RowTypeInfo(
    Array(Types.LONG, Types.STRING, Types.STRING).asInstanceOf[Array[TypeInformation[_]]],
    fieldNames)

  // 页面访问表数据 rows with timestamps and watermarks
  val data = Seq(
    Left(1510365660000L, Row.of(new JLong(1510365660000L), "ShangHai", "U0010")),
    Right(1510365660000L),
    Left(1510365660000L, Row.of(new JLong(1510365660000L), "BeiJing", "U1001")),
    Right(1510365660000L),
    Left(1510366200000L, Row.of(new JLong(1510366200000L), "BeiJing", "U2032")),
    Right(1510366200000L),
    Left(1510366260000L, Row.of(new JLong(1510366260000L), "BeiJing", "U1100")),
    Right(1510366260000L),
    Left(1510373400000L, Row.of(new JLong(1510373400000L), "ShangHai", "U0011")),
    Right(1510373400000L)
  )

  override def getRowtimeAttributeDescriptors: util.List[RowtimeAttributeDescriptor] = {
    Collections.singletonList(new RowtimeAttributeDescriptor(
      "accessTime",
      new ExistingField("accessTime"),
      PreserveWatermarks.INSTANCE))
  }

  override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] = {
    execEnv.addSource(new MySourceFunction[Row](data)).setParallelism(1).returns(rowType)
  }

  override def getReturnType: TypeInformation[Row] = rowType

  override def getTableSchema: TableSchema = schema

}[/mw_shl_code]

Sink 定义
我们简单的将计算结果写入到Apache Flink内置支持的CSVSink中,定义Sink如下:
[mw_shl_code=scala,true]def getCsvTableSink: TableSink[Row] = {
    val tempFile = File.createTempFile("csv_sink_", "tem")
    // 打印sink的文件路径,方便我们查看运行结果
    println("Sink path : " + tempFile)
    if (tempFile.exists()) {
      tempFile.delete()
    }
    new CsvTableSink(tempFile.getAbsolutePath).configure(
      Array[String]("region", "winStart", "winEnd", "pv"),
      Array[TypeInformation[_]](Types.STRING, Types.SQL_TIMESTAMP, Types.SQL_TIMESTAMP, Types.LONG))
  }[/mw_shl_code]

构建主程序
主程序包括执行环境的定义,Source/Sink的注册以及统计查SQL的执行,具体如下:
[mw_shl_code=scala,true]def main(args: Array[String]): Unit = {
    // Streaming 环境
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val tEnv = TableEnvironment.getTableEnvironment(env)

    // 设置EventTime
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    //方便我们查出输出数据
    env.setParallelism(1)

    val sourceTableName = "mySource"
    // 创建自定义source数据结构
    val tableSource = new MyTableSource

    val sinkTableName = "csvSink"
    // 创建CSV sink 数据结构
    val tableSink = getCsvTableSink

    // 注册source
    tEnv.registerTableSource(sourceTableName, tableSource)
    // 注册sink
    tEnv.registerTableSink(sinkTableName, tableSink)

    val sql =
      "SELECT  " +
      "  region, " +
      "  TUMBLE_START(accessTime, INTERVAL '2' MINUTE) AS winStart," +
      "  TUMBLE_END(accessTime, INTERVAL '2' MINUTE) AS winEnd, COUNT(region) AS pv " +
      " FROM mySource " +
      " GROUP BY TUMBLE(accessTime, INTERVAL '2' MINUTE), region"

    tEnv.sqlQuery(sql).insertInto(sinkTableName);
    env.execute()
  }[/mw_shl_code]

执行并查看运行结果
执行主程序后我们会在控制台得到Sink的文件路径,如下:
[mw_shl_code=shell,true]Sink path : /var/folders/88/8n406qmx2z73qvrzc_rbtv_r0000gn/T/csv_sink_8025014910735142911tem
[/mw_shl_code]

Cat 方式查看计算结果,如下:

[mw_shl_code=shell,true]jinchengsunjcdeMacBook-Pro:FlinkTableApiDemo jincheng.sunjc$ cat /var/folders/88/8n406qmx2z73qvrzc_rbtv_r0000gn/T/csv_sink_8025014910735142911tem
ShangHai,2017-11-11 02:00:00.0,2017-11-11 02:02:00.0,1
BeiJing,2017-11-11 02:00:00.0,2017-11-11 02:02:00.0,1
BeiJing,2017-11-11 02:10:00.0,2017-11-11 02:12:00.0,2
ShangHai,2017-11-11 04:10:00.0,2017-11-11 04:12:00.0,1[/mw_shl_code]

表格化如上结果:
regionwinStartwinEndpvBeiJing2017-11-11 02:00:00.02017-11-11 02:02:00.01BeiJing2017-11-11 02:10:00.02017-11-11 02:12:00.02ShangHai2017-11-11 02:00:00.02017-11-11 02:02:00.01ShangHai2017-11-11 04:10:00.02017-11-11 04:12:00.01
上面这个端到端的完整示例也可以应用到本篇前面介绍的其他算子示例中,只是大家根据Source和Sink的Schema不同来进行相应的构建即可!

总结
本篇概要的向大家介绍了SQL的由来,Apache Flink SQL 大部分核心功能,并附带了具体的测试数据和测试程序,最后以一个End-to-End的示例展示了如何编写Apache Flink SQL的Job收尾。本篇着重向大家介绍Apache Flink SQL的使用,后续我们再继续探究每个算子的实现原理。


最新经典文章,欢迎关注公众号

来源:知乎

作者:王知无

原文:《Apache-Flink深度解析-SQL概览》

https://zhuanlan.zhihu.com/p/59772928








本帖被以下淘专辑推荐:

已有(1)人评论

跳转到指定楼层
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条