分享

彻底明白Flink系统学习30:查询配置

pig2 2019-2-14 08:13:36 发表于 连载型 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 1 5702
问题导读

1.有界批量输入还是无界流输入,表API和SQL查询都有什么特点?
2.状态的本质是什么?
3.连续查询的准确性和资源消耗如何调整?
4.配置空闲状态保留时间有那两个参数?


上一篇文章:彻底明白Flink系统学习29-4:【Flink1.7】流概念之模式检测
http://www.aboutyun.com/forum.php?mod=viewthread&tid=26695

版本:Flink1.7

无论表达式输入是有界批量输入还是无界流输入,表API和SQL查询都具有相同的语义。在许多情况下,对流输入的连续查询能够计算与离线计算结果相同的准确结果。然而,这在一般情况下是不可能的,因为连续查询必须限制它们维护的状态的大小,以避免耗尽存储并且能够在很长一段时间内处理无界流数据。因此,连续查询可能只能提供近似结果,具体取决于输入数据的特征和查询本身。(这里需要说明的是,状态的本质其实还是存储,所以对于状态的维护,需要不断的清理)

Flink的Table API和SQL接口提供参数来调整连续查询的准确性和资源消耗。参数通过QueryConfig对象指定。 QueryConfig可以从TableEnvironment获得,并在转换Table时传回,即,当它转换为DataStream或通过TableSink发出时。

[mw_shl_code=scala,true]val env = StreamExecutionEnvironment.getExecutionEnvironment
val tableEnv = TableEnvironment.getTableEnvironment(env)

// obtain query configuration from TableEnvironment
val qConfig: StreamQueryConfig = tableEnv.queryConfig
// set query parameters
qConfig.withIdleStateRetentionTime(Time.hours(12), Time.hours(24))

// define query
val result: Table = ???

// create TableSink
val sink: TableSink[Row] = ???

// register TableSink
tableEnv.registerTableSink(
  "outputTable",                  // table name
  Array[String](...),             // field names
  Array[TypeInformation[_]](...), // field types
  sink)                           // table sink

// emit result Table via a TableSink
result.insertInto("outputTable", qConfig)

// convert result Table into a DataStream[Row]
val stream: DataStream[Row] = result.toAppendStream[Row](qConfig)[/mw_shl_code]
[mw_shl_code=java,true]StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tableEnv = TableEnvironment.getTableEnvironment(env);

// obtain query configuration from TableEnvironment
StreamQueryConfig qConfig = tableEnv.queryConfig();
// set query parameters
qConfig.withIdleStateRetentionTime(Time.hours(12), Time.hours(24));

// define query
Table result = ...

// create TableSink
TableSink<Row> sink = ...

// register TableSink
tableEnv.registerTableSink(
  "outputTable",               // table name
  new String[]{...},           // field names
  new TypeInformation[]{...},  // field types
  sink);                       // table sink

// emit result Table via a TableSink
result.insertInto("outputTable", qConfig);

// convert result Table into a DataStream<Row>
DataStream<Row> stream = tableEnv.toAppendStream(result, Row.class, qConfig);[/mw_shl_code]

在下文中,我们将描述QueryConfig的参数以及它们如何影响查询的准确性和资源消耗。

空闲状态保留时间
许多查询聚合或连接一个或多个key 属性上的记录。 在流上执行此类查询时,连续查询需要收集记录或维护每个key的部分结果。 如果输入流的key域正在变化,即,active key值随时间变化,则随着越来越多的不同key,连续查询累积越来越多的状态。 但是,经常在一段时间后key变为非活动状态,并且它们的相应状态变得陈旧且无用。
例如,以下查询计算每个会话(session)的单击次数。
[mw_shl_code=sql,true]SELECT sessionId, COUNT(*) FROM clicks GROUP BY sessionId;
[/mw_shl_code]

sessionId属性用作分组key,连续查询维护其(看到的)每个sessionId的计数。 sessionId属性随着时间的推移而变化(进化),并且sessionId值仅在会话(session)结束之前有效,即,在有限的时间段内。 但是,连续查询无法知道sessionId的此属性,并期望每个sessionId值都可以在任何时间点发生。 它维护每个(观察到的)sessionId值的计数。 因此,随着(观察到)越来越多的sessionId值,查询的总状态大小不断增长。

空闲状态保留时间参数(Idle State Retention Time)定义了在删除key之前保留key状态多长时间而不进行更新。 对于前面的示例查询,只要在配置的时间段内没有更新sessionId,就会删除它的计数。

通过删除key的状态,连续查询完全忘记它之前已经看过这个key。 如果处理了具有其状态已被删除的key的记录,则该记录将被视为具有相应key的第一个记录。 对于上面的示例,这意味着sessionId的计数将再次从0开始。

配置空闲状态保留时间有两个参数:

  • minimum idle state retention time定义了非活动key的状态在被删除之前至少保持多长时间。
  • maximum idle state retention time定义了非活动key的状态在被删除之前最多保留多长时间。

参数规定如下:
[mw_shl_code=scala,true]val qConfig: StreamQueryConfig = ???

// set idle state retention time: min = 12 hours, max = 24 hours
qConfig.withIdleStateRetentionTime(Time.hours(12), Time.hours(24))[/mw_shl_code]

[mw_shl_code=java,true]StreamQueryConfig qConfig = ...

// set idle state retention time: min = 12 hours, max = 24 hours
qConfig.withIdleStateRetentionTime(Time.hours(12), Time.hours(24));[/mw_shl_code]
清理状态需要额外的簿记(bookkeeping),这对于minTime和maxTime的较大差异而言变得更实用(便宜)。 minTime和maxTime之间的差异必须至少为5分钟。

最新经典文章,欢迎关注公众号



已有(1)人评论

跳转到指定楼层
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条