问题导读
1.Flink有哪些内置连接器?
2.Flink二进制发布包中是否直接可以使用连接器?
3.如何才能添加连接器?
4.连接器使用有那两种方式添加?
5.连接器添加jar包的方式,如何才能生效?
上一篇:
彻底明白Flink系统学习12:【Flink1.7】如何控制流物理分区
http://www.aboutyun.com/forum.php?mod=viewthread&tid=26497
Apache Flink支持各种连接器(Streaming Connectors),允许跨各种技术进行数据读/写。
Flink内置了一些基本source和sinks ,并且始终可用。 预定义的数据源包括从文件,目录和sockets读取,以及从集合和迭代器中提取数据。 预定义的数据接收器支持写入文件,stdout和stderr以及套接字。
评注:【这里我们需要跟Flume区别开来,Flume同样有source和sink,我们这里的数据源当然也是读取数据,sink这里是指将数据保存到某个地方】
绑定连接器
连接器提供用于与各种第三方系统连接的代码。 目前支持下面系统:
- Apache Kafka (source/sink)
- Apache Cassandra (sink)
- Amazon Kinesis Streams (source/sink)
- Elasticsearch (sink)
- Hadoop FileSystem (sink)
- RabbitMQ (source/sink)
- Apache NiFi (source/sink)
- Twitter Streaming API (source)
注意,要在应用程序中使用其中一个连接器,通常需要第三方组件,例如, 用于数据存储或消息队列的服务器。 另请注意,虽然本节中列出的流连接器是Flink项目的一部分,并且包含在源码中,但它们不包含在官网提供的二进制版中。
Apache Bahir中的连接器
Flink的其它流连接器会在Apache Bahir发布,包括:
例如这里我们举一个例子:
Flink Flume连接器
此连接器提供sink,可以发送数据到Flume。 要使用这个连接器,添加以下依赖项到项目中:
- <dependency>
- <groupId>org.apache.bahir</groupId>
- <artifactId>flink-connector-flume_2.11</artifactId>
- <version>1.1-SNAPSHOT</version>
- </dependency>
复制代码
版本兼容性:该模块与Flume 1.8.0兼容。
注意,流连接器不是Flink二进制发布版本的一部分。 需要将它们链接到作业jar中。
要创建FlumeSink,实例化以下构造函数:
- FlumeSink(String host, int port, SerializationSchema<IN> schema)
复制代码
进一步补充:
上面我们说到了,官网提供的二进制版本并不提供连接器,那么如何添加连接器那?通过下面方式:
二进制包含lib文件夹中的jar包, 几乎所有Flink类都位于那里,但有一些例外,例如流连接器和一些新添加的模块。如果想使用它们,有两个办法:
1.将所需的jar文件复制到lib文件夹中,并复制到所有TaskManagers上。 注意,必须在此之后重新启动TaskManagers。这里是所需的组件。
2.所有源码打包,下载地址
https://github.com/apache/flink
使用Maven打包flink-connector-kafka,有两个选项
1.maven程序集插件构建了一个包含所有依赖项的所谓uber-jar(可执行jar), 装配配置很简单,生成的jar可能会变得笨重。
2.maven unpack插件解压缩依赖项的相关部分,然后将其与代码打包在一起。
使用后一种方法绑定Kafka连接器,flink-connector-kafka需要从连接器和Kafka API本身添加类。 将以下内容添加到插件部分。
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-dependency-plugin</artifactId>
- <version>2.9</version>
- <executions>
- <execution>
- <id>unpack</id>
- <!-- executed just before the package phase -->
- <phase>prepare-package</phase>
- <goals>
- <goal>unpack</goal>
- </goals>
- <configuration>
- <artifactItems>
- <!-- For Flink connector classes -->
- <artifactItem>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-connector-kafka</artifactId>
- <version>1.2.1</version>
- <type>jar</type>
- <overWrite>false</overWrite>
- <outputDirectory>${project.build.directory}/classes</outputDirectory>
- <includes>org/apache/flink/**</includes>
- </artifactItem>
- <!-- For Kafka API classes -->
- <artifactItem>
- <groupId>org.apache.kafka</groupId>
- <artifactId>kafka_<YOUR_SCALA_VERSION></artifactId>
- <version><YOUR_KAFKA_VERSION></version>
- <type>jar</type>
- <overWrite>false</overWrite>
- <outputDirectory>${project.build.directory}/classes</outputDirectory>
- <includes>kafka/**</includes>
- </artifactItem>
- </artifactItems>
- </configuration>
- </execution>
- </executions>
- </plugin>
复制代码
当运行mvn clean包时,生成的jar包含所需的依赖项。
其它连接到Flink的方法
通过异步I / O进行数据导入、导出
使用连接器不是将数据输入和输出Flink的唯一方法。 一种常见的模式是在Map或FlatMap中查询外部数据库或Web服务,以导入导出主数据流。 Flink提供了一个用于异步I / O的API,以便更有效,更稳健地进行这种输入输出。
查询状态
当Flink应用程序将大量数据推送到外部数据存储时,这可能会成为I / O瓶颈。 如果所涉及的数据具有写比较多,则更好的方法可以是外部应用程序从Flink获取所需的数据。 可查询状态接口通过允许按需查询Flink管理的状态来实现此功能。
最新经典文章,欢迎关注公众号
|