分享

彻底明白Flink系统学习26:【Flink1.7】Table API 和SQL API之时间属性

pig2 2019-1-21 11:19:39 发表于 连载型 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 1 9524
本帖最后由 pig2 于 2019-1-21 11:22 编辑
问题导读
1.Table和SQL API如何定义时间属性?
2.如何将时间属性嵌入到Table和SQL API中?
3.有哪两种方法可以定义处理时间属性?

上一篇:
彻底明白Flink系统学习25:【Flink1.7】Table API 和SQL API之动态表详解
http://www.aboutyun.com/forum.php?mod=viewthread&tid=26637


Flink能够根据不同的时间概念处理流数据。

  • 处理时间是指正在执行相应操作的机器的系统时间(也称为“挂钟时间”)。
  • 事件时间是指基于附加到每行的时间戳来处理流数据。 时间戳可以在事件发生时进行编码。
  • 注入时间是事件进入Flink的时间; 在内部,它与事件时间类似地对待。


本文介绍了如何在Flink的Table API和SQL中为基于时间的操作定义时间属性。


时间属性介绍
Table API和SQL中基于时间的操作(如窗口)需要有关时间概念及其来源的信息。因此,表可以提供逻辑时间属性,用于指示时间和访问表程序中的相应时间戳。

时间属性可以是每个表schema的一部分。即可以为一个字段。它们是在从DataStream创建表时定义的,或者是在使用TableSource时预定义的。一旦在开头定义了时间属性(字段),它就可以作为字段引用,并且可以在基于时间的操作中使用。

只要时间属性未被修改并且只是从查询的一部分转发到另一部分,它仍然是有效的时间属性。时间属性的行为类似于常规时间戳,可以访问以进行计算。如果在计算中使用了时间属性,则它将具体化并成为常规时间戳。常规时间戳不与Flink的时间和水印系统配合,因此不能再用于基于时间的操作。

表程序要求为流式环境指定相应的时间特性:

[mw_shl_code=scala,true]val env = StreamExecutionEnvironment.getExecutionEnvironment

env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime) // default

// alternatively:
// env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime)
// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)[/mw_shl_code]

[mw_shl_code=java,true]final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); // default

// alternatively:
// env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
// env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);[/mw_shl_code]

处理时间
处理时间允许表程序根据本地机器的时间产生结果。 这是最简单的时间概念,但不提供决定论。 它既不需要时间戳提取也不需要水印生成。

有两种方法可以定义处理时间属性。

在DataStream到表转换期间
处理时间属性在架构定义期间使用.proctime属性定义。 time属性只能通过附加的逻辑字段扩展物理模式。 因此,它只能在schema定义的末尾定义。
[mw_shl_code=scala,true]val stream: DataStream[(String, String)] = ...

// 声明一个额外的逻辑字段作为处理时间属性
val table = tEnv.fromDataStream(stream, 'UserActionTimestamp, 'Username, 'Data, 'UserActionTime.proctime)

val windowedTable = table.window(Tumble over 10.minutes on 'UserActionTime as 'userActionWindow)[/mw_shl_code]
[mw_shl_code=java,true]DataStream<Tuple2<String, String>> stream = ...;

// 声明一个额外的逻辑字段作为处理时间属性
Table table = tEnv.fromDataStream(stream, "Username, Data, UserActionTime.proctime");

WindowedTable windowedTable = table.window(Tumble.over("10.minutes").on("UserActionTime").as("userActionWindow"));[/mw_shl_code]

使用TableSource
处理时间属性由实现DefinedProctimeAttribute接口的TableSource定义。 逻辑时间属性附加到由TableSource的返回类型定义的物理模式。
[mw_shl_code=scala,true]
// 使用处理时间属性定义表源
class UserActionSource extends StreamTableSource[Row] with DefinedProctimeAttribute {

        override def getReturnType = {
                val names = Array[String]("Username" , "Data")
                val types = Array[TypeInformation[_]](Types.STRING, Types.STRING)
                Types.ROW(names, types)
        }

        override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] = {
                // create stream
                val stream = ...
                stream
        }

        override def getProctimeAttribute = {
                // field with this name will be appended as a third field
                "UserActionTime"
        }
}

// 注册table source
tEnv.registerTableSource("UserActions", new UserActionSource)

val windowedTable = tEnv
        .scan("UserActions")
        .window(Tumble over 10.minutes on 'UserActionTime as 'userActionWindow)[/mw_shl_code]
[mw_shl_code=java,true]// define a table source with a processing attribute
public class UserActionSource implements StreamTableSource<Row>, DefinedProctimeAttribute {

        @Override
        public TypeInformation<Row> getReturnType() {
                String[] names = new String[] {"Username" , "Data"};
                TypeInformation[] types = new TypeInformation[] {Types.STRING(), Types.STRING()};
                return Types.ROW(names, types);
        }

        @Override
        public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {
                // create stream
                DataStream<Row> stream = ...;
                return stream;
        }

        @Override
        public String getProctimeAttribute() {
                // field with this name will be appended as a third field
                return "UserActionTime";
        }
}

// 注册table source
tEnv.registerTableSource("UserActions", new UserActionSource());

WindowedTable windowedTable = tEnv
        .scan("UserActions")
        .window(Tumble.over("10.minutes").on("UserActionTime").as("userActionWindow"));[/mw_shl_code]

事件时间
事件时间允许表程序根据每个记录中包含的时间生成结果。 即使在无序事件或延迟事件的情况下,这也允许一致的结果。 当从持久存储中读取记录时,它还确保表程序的可重放结果。

此外,事件时间允许批处理和流式传输环境中的表程序的统一语法。 流式传输环境中的时间属性可以是批处理环境中的记录的常规字段。

为了处理乱序事件并区分流媒体中的准时和晚期事件,Flink需要从事件中提取时间戳并在时间上做出某种进展(所谓的水印)。

可以在DataStream-to-Table转换期间或使用TableSource定义事件时间属性。

在DataStream到表转换期间
在schema定义期间使用.rowtime属性定义事件时间属性。 必须在转换的DataStream中分配时间戳和水印。

将DataStream转换为表时,有两种方法可以定义时间属性。 根据DataStream架构中是否存在指定的.rowtime字段名称,时间戳字段也是
  • 作为新字段附加到schema 或
  • 替换现有字段。

在任何一种情况下,事件时间时间戳字段都将保存DataStream事件时间时间戳的值。

[mw_shl_code=scala,true]// 选项1:

//注入时间戳并根据流的知识分配水印
val stream: DataStream[(String, String)] = inputStream.assignTimestampsAndWatermarks(...)

// 声明一个额外的逻辑字段作为事件时间属性
val table = tEnv.fromDataStream(stream, 'Username, 'Data, 'UserActionTime.rowtime)


// 选项2:

//从第一个字段中提取时间戳,并根据流的知识分配水印
val stream: DataStream[(Long, String, String)] = inputStream.assignTimestampsAndWatermarks(...)

//第一个字段已用于时间戳提取,不再需要
// 用逻辑事件时间属性替换第一个字段
val table = tEnv.fromDataStream(stream, 'UserActionTime.rowtime, 'Username, 'Data)

// Usage:

val windowedTable = table.window(Tumble over 10.minutes on 'UserActionTime as 'userActionWindow)[/mw_shl_code]
[mw_shl_code=java,true]// Option 1:

// extract timestamp and assign watermarks based on knowledge of the stream
DataStream<Tuple2<String, String>> stream = inputStream.assignTimestampsAndWatermarks(...);

// declare an additional logical field as an event time attribute
Table table = tEnv.fromDataStream(stream, "Username, Data, UserActionTime.rowtime");


// Option 2:

// extract timestamp from first field, and assign watermarks based on knowledge of the stream
DataStream<Tuple3<Long, String, String>> stream = inputStream.assignTimestampsAndWatermarks(...);

// the first field has been used for timestamp extraction, and is no longer necessary
// replace first field with a logical event time attribute
Table table = tEnv.fromDataStream(stream, "UserActionTime.rowtime, Username, Data");

// Usage:

WindowedTable windowedTable = table.window(Tumble.over("10.minutes").on("UserActionTime").as("userActionWindow"));[/mw_shl_code]
[mw_shl_code=java,true]// Option 1:

// extract timestamp and assign watermarks based on knowledge of the stream
DataStream<Tuple2<String, String>> stream = inputStream.assignTimestampsAndWatermarks(...);

// declare an additional logical field as an event time attribute
Table table = tEnv.fromDataStream(stream, "Username, Data, UserActionTime.rowtime");


// Option 2:

// extract timestamp from first field, and assign watermarks based on knowledge of the stream
DataStream<Tuple3<Long, String, String>> stream = inputStream.assignTimestampsAndWatermarks(...);

// the first field has been used for timestamp extraction, and is no longer necessary
// replace first field with a logical event time attribute
Table table = tEnv.fromDataStream(stream, "UserActionTime.rowtime, Username, Data");

// Usage:

WindowedTable windowedTable = table.window(Tumble.over("10.minutes").on("UserActionTime").as("userActionWindow"));[/mw_shl_code]

使用TableSource
事件时间属性由实现DefinedRowtimeAttributes接口的TableSource定义。 getRowtimeAttributeDescriptors()方法返回RowtimeAttributeDescriptor列表,用于描述时间属性的最终名称,用于派生属性值的时间戳提取器以及与属性关联的水印策略。

确保getDataStream()方法返回的DataStream与定义的时间属性对齐。 仅当定义了StreamRecordTimestamp时间戳提取器时,才考虑DataStream的时间戳(由TimestampAssigner分配的时间戳)。 仅当定义了PreserveWatermarks水印策略时,才会保留DataStream的水印。 否则,只有TableSource的rowtime属性的值是相关的。

[mw_shl_code=scala,true]// 使用rowtime属性定义表源
class UserActionSource extends StreamTableSource[Row] with DefinedRowtimeAttributes {

        override def getReturnType = {
                val names = Array[String]("Username" , "Data", "UserActionTime")
                val types = Array[TypeInformation[_]](Types.STRING, Types.STRING, Types.LONG)
                Types.ROW(names, types)
        }

        override def getDataStream(execEnv: StreamExecutionEnvironment): DataStream[Row] = {
                // create stream
                // ...
                //  根据"UserActionTime" 属性分配watermarks
                val stream = inputStream.assignTimestampsAndWatermarks(...)
                stream
        }

        override def getRowtimeAttributeDescriptors: util.List[RowtimeAttributeDescriptor] = {
                //将“UserActionTime”属性标记为事件时间属性。
                // 我们创建了一个“UserActionTime”的属性描述符。
                val rowtimeAttrDescr = new RowtimeAttributeDescriptor(
                        "UserActionTime",
                        new ExistingField("UserActionTime"),
                        new AscendingTimestamps)
                val listRowtimeAttrDescr = Collections.singletonList(rowtimeAttrDescr)
                listRowtimeAttrDescr
        }
}

// register the table source
tEnv.registerTableSource("UserActions", new UserActionSource)

val windowedTable = tEnv
        .scan("UserActions")
        .window(Tumble over 10.minutes on 'UserActionTime as 'userActionWindow)[/mw_shl_code]
[mw_shl_code=java,true]// define a table source with a rowtime attribute
public class UserActionSource implements StreamTableSource<Row>, DefinedRowtimeAttributes {

        @Override
        public TypeInformation<Row> getReturnType() {
                String[] names = new String[] {"Username", "Data", "UserActionTime"};
                TypeInformation[] types =
                    new TypeInformation[] {Types.STRING(), Types.STRING(), Types.LONG()};
                return Types.ROW(names, types);
        }

        @Override
        public DataStream<Row> getDataStream(StreamExecutionEnvironment execEnv) {
                // create stream
                // ...
                // assign watermarks based on the "UserActionTime" attribute
                DataStream<Row> stream = inputStream.assignTimestampsAndWatermarks(...);
                return stream;
        }

        @Override
        public List<RowtimeAttributeDescriptor> getRowtimeAttributeDescriptors() {
                // Mark the "UserActionTime" attribute as event-time attribute.
                // We create one attribute descriptor of "UserActionTime".
                RowtimeAttributeDescriptor rowtimeAttrDescr = new RowtimeAttributeDescriptor(
                        "UserActionTime",
                        new ExistingField("UserActionTime"),
                        new AscendingTimestamps());
                List<RowtimeAttributeDescriptor> listRowtimeAttrDescr = Collections.singletonList(rowtimeAttrDescr);
                return listRowtimeAttrDescr;
        }
}

// 注册表SourcetEnv.registerTableSource("UserActions", new UserActionSource());

WindowedTable windowedTable = tEnv
        .scan("UserActions")
        .window(Tumble.over("10.minutes").on("UserActionTime").as("userActionWindow"));[/mw_shl_code]



最新经典文章,欢迎关注公众号


已有(1)人评论

跳转到指定楼层
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条