分享

彻底明白Flink系统学习13:【Flink1.7】流连接器介绍及如何添加连接器

pig2 2018-12-19 18:47:27 发表于 连载型 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 3 7196
问题导读

1.Flink有哪些内置连接器?
2.Flink二进制发布包中是否直接可以使用连接器?
3.如何才能添加连接器?
4.连接器使用有那两种方式添加?
5.连接器添加jar包的方式,如何才能生效?

上一篇:
彻底明白Flink系统学习12:【Flink1.7】如何控制流物理分区
http://www.aboutyun.com/forum.php?mod=viewthread&tid=26497


Apache Flink支持各种连接器(Streaming Connectors),允许跨各种技术进行数据读/写。

Flink内置了一些基本source和sinks ,并且始终可用。 预定义的数据源包括从文件,目录和sockets读取,以及从集合和迭代器中提取数据。 预定义的数据接收器支持写入文件,stdout和stderr以及套接字。
评注:【这里我们需要跟Flume区别开来,Flume同样有source和sink,我们这里的数据源当然也是读取数据,sink这里是指将数据保存到某个地方】

绑定连接器

连接器提供用于与各种第三方系统连接的代码。 目前支持下面系统:

  • Apache Kafka (source/sink)
  • Apache Cassandra (sink)
  • Amazon Kinesis Streams (source/sink)
  • Elasticsearch (sink)
  • Hadoop FileSystem (sink)
  • RabbitMQ (source/sink)
  • Apache NiFi (source/sink)
  • Twitter Streaming API (source)

注意,要在应用程序中使用其中一个连接器,通常需要第三方组件,例如, 用于数据存储或消息队列的服务器。 另请注意,虽然本节中列出的流连接器是Flink项目的一部分,并且包含在源码中,但它们不包含在官网提供的二进制版中。

Apache Bahir中的连接器
Flink的其它流连接器会在Apache Bahir发布,包括:

例如这里我们举一个例子:
Flink Flume连接器

此连接器提供sink,可以发送数据到Flume。 要使用这个连接器,添加以下依赖项到项目中:
  1. <dependency>
  2.   <groupId>org.apache.bahir</groupId>
  3.   <artifactId>flink-connector-flume_2.11</artifactId>
  4.   <version>1.1-SNAPSHOT</version>
  5. </dependency>
复制代码

版本兼容性:该模块与Flume 1.8.0兼容。
注意,流连接器不是Flink二进制发布版本的一部分。 需要将它们链接到作业jar中。

要创建FlumeSink,实例化以下构造函数:

  1. FlumeSink(String host, int port, SerializationSchema<IN> schema)
复制代码

进一步补充:
上面我们说到了,官网提供的二进制版本并不提供连接器,那么如何添加连接器那?通过下面方式:
二进制包含lib文件夹中的jar包, 几乎所有Flink类都位于那里,但有一些例外,例如流连接器和一些新添加的模块。如果想使用它们,有两个办法:

1.将所需的jar文件复制到lib文件夹中,并复制到所有TaskManagers上。 注意,必须在此之后重新启动TaskManagers。这里是所需的组件。
2.所有源码打包,下载地址
https://github.com/apache/flink

使用Maven打包flink-connector-kafka,有两个选项
1.maven程序集插件构建了一个包含所有依赖项的所谓uber-jar(可执行jar), 装配配置很简单,生成的jar可能会变得笨重。
2.maven unpack插件解压缩依赖项的相关部分,然后将其与代码打包在一起。


使用后一种方法绑定Kafka连接器,flink-connector-kafka需要从连接器和Kafka API本身添加类。 将以下内容添加到插件部分。
  1. <plugin>
  2.     <groupId>org.apache.maven.plugins</groupId>
  3.     <artifactId>maven-dependency-plugin</artifactId>
  4.     <version>2.9</version>
  5.     <executions>
  6.         <execution>
  7.             <id>unpack</id>
  8.             <!-- executed just before the package phase -->
  9.             <phase>prepare-package</phase>
  10.             <goals>
  11.                 <goal>unpack</goal>
  12.             </goals>
  13.             <configuration>
  14.                 <artifactItems>
  15.                     <!-- For Flink connector classes -->
  16.                     <artifactItem>
  17.                         <groupId>org.apache.flink</groupId>
  18.                         <artifactId>flink-connector-kafka</artifactId>
  19.                         <version>1.2.1</version>
  20.                         <type>jar</type>
  21.                         <overWrite>false</overWrite>
  22.                         <outputDirectory>${project.build.directory}/classes</outputDirectory>
  23.                         <includes>org/apache/flink/**</includes>
  24.                     </artifactItem>
  25.                     <!-- For Kafka API classes -->
  26.                     <artifactItem>
  27.                         <groupId>org.apache.kafka</groupId>
  28.                         <artifactId>kafka_<YOUR_SCALA_VERSION></artifactId>
  29.                         <version><YOUR_KAFKA_VERSION></version>
  30.                         <type>jar</type>
  31.                         <overWrite>false</overWrite>
  32.                         <outputDirectory>${project.build.directory}/classes</outputDirectory>
  33.                         <includes>kafka/**</includes>
  34.                     </artifactItem>
  35.                 </artifactItems>
  36.             </configuration>
  37.         </execution>
  38.     </executions>
  39. </plugin>
复制代码

当运行mvn clean包时,生成的jar包含所需的依赖项。

其它连接到Flink的方法
通过异步I / O进行数据导入、导出
使用连接器不是将数据输入和输出Flink的唯一方法。 一种常见的模式是在Map或FlatMap中查询外部数据库或Web服务,以导入导出主数据流。 Flink提供了一个用于异步I / O的API,以便更有效,更稳健地进行这种输入输出。


查询状态
当Flink应用程序将大量数据推送到外部数据存储时,这可能会成为I / O瓶颈。 如果所涉及的数据具有写比较多,则更好的方法可以是外部应用程序从Flink获取所需的数据。 可查询状态接口通过允许按需查询Flink管理的状态来实现此功能。




最新经典文章,欢迎关注公众号




已有(3)人评论

跳转到指定楼层
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条