分享

通过KSQL分析Apache Kafka中的Twitter数据

本帖最后由 pig2 于 2018-7-11 18:16 编辑
问题导读

1.KSQL如何获取Twitter数据?
2.获取数据后,如何分析Twitter数据?
3.KSQL基于什么平台?



前言:
Twitter是对应国内微博,我们可以扩展思路,借用下面思路,实现微博的数据分析。

正文:
KSQL是Apache Kafka的开源SQL流引擎。 它允许使用简单的交互式SQL界面轻松地对Kafka topic进行复杂的流处理。 在这篇简短的文章中,我们将看到使用Twitter演示流数据源,入门很容易。 我们将从最原始的tweets stream,通过在KSQL中进行过滤,来构建聚合,例如计算每个用户每小时的tweets 数量。

tweet_kafka.png



首先,去Confluent(https://www.confluent.io/download/) 平台下载安装包。我使用的是RPM, tar, zip都可以使用。
启动
[mw_shl_code=bash,true]$ confluent start[/mw_shl_code]
我们将使用Kafka Connect从Twitter中提取数据。Twitter连接器可以在Github(https://github.com/jcustenborder/kafka-connect-twitter)上找到。要安装它,只需执行以下操作:
[mw_shl_code=bash,true]# Clone the git repo
cd /home/rmoff
git clone https://github.com/jcustenborder/kafka-connect-twitter.git[/mw_shl_code]
[mw_shl_code=bash,true]# Compile the code
cd kafka-connect-twitter
mvn clean package[/mw_shl_code]
构建成功,解压包
[mw_shl_code=bash,true]cd target
tar -xvf kafka-connect-twitter-0.2-SNAPSHOT.tar.gz[/mw_shl_code]
解压后,我们修改配置文件etc/schema-registry/connect-avro-distributed.properties
添加如下内容
[mw_shl_code=bash,true]plugin.path=share/java,/home/rmoff/kafka-connect-twitter/
[/mw_shl_code]
重启 Kafka Connect:
[mw_shl_code=bash,true]confluent stop connect
confluent start connect[/mw_shl_code]
一旦安装成功,你可以直接使用Kafka Connect REST API,或则创建配置文件,这就是我们要做的。我们需要先转到Twitter才能获取API密钥(https://apps.twitter.com/)。
假如已经将内容写到/home/rmoff/twitter-source.json,你可以运行
[mw_shl_code=bash,true]$ confluent load twitter_source -d /home/rmoff/twitter-source.json
[/mw_shl_code]
然后来自每个人最喜欢的推文甚至明星开始显示

现在让我们开启KSQL吧! KSQL包含在Confluent Platform 4.1及更高版本中。 假设已经使用CLI启动,这也将启动KSQL服务器,只需运行:
[mw_shl_code=bash,true]ksql[/mw_shl_code]
使用KSQL,我们可以在Kafka主题中获取数据并进行查询。 首先,我们需要告诉KSQL主题中的数据模式是什么。 Twitter消息实际上是一个非常庞大的JSON对象,但为了简洁起见,我们只需要选择几列:
[mw_shl_code=sql,true]ksql> CREATE STREAM twitter_raw (CreatedAt BIGINT, Id BIGINT, Text VARCHAR) WITH (KAFKA_TOPIC='twitter_json_01', VALUE_FORMAT='JSON');

Message  
----------------
Stream created[/mw_shl_code]

通过定义模式,我们可以查询流。 要让KSQL显示主题开头的数据(而不是当前时间点,这是默认值),运行:
[mw_shl_code=sql,true]ksql> SET 'auto.offset.reset' = 'earliest';  
Successfully changed local property 'auto.offset.reset' from 'null' to 'earliest'[/mw_shl_code]
现在让我们看看数据。 我们将使用LIMIT子句选择一行:
现在让我们重新定义流,其中包含现在已定义并可供我们使用的推文有效负载的所有内容:


现在我们可以使用普通的SQL查询处理更多数据:

请注意,没有LIMIT子句,因此您将看到连续查询的结果。 与数据库上的查询,数据查询,数据查询和数据传输速率的结果不同。 按Ctrl-C取消并返回KSQL提示符。 在上面的查询中,我们做了一些事情:

  • TIMESTAMPTOSTRING用于从人类可读格式转换时间戳
  • EXTRACTJSONFIELD显示源中的一个嵌套用户字段


有关支持的函数列表,请参见KSQL文档(https://docs.confluent.io/curren ... 61838070.1531281912)。

我们可以从这些数据创建派生流:

并查询派生流:

在我们完成之前,让我们看看如何进行一些聚合。

你可能会得到一些结果;这是因为KSQL实际上正在进行值的聚合。既然我们已经设置KSQL阅读的话题的所有帖子(SET“auto.offset.reset” =“earliest”;)它的读取所有这些消息在一次和聚集计算的更新。在这里发生的事情实际上是微妙的,值得挖掘。我们的推文入站流就是一个流。但是现在我们正在创建聚合,我们实际创建了一个表。表是给定时间点的给定键的快照。 KSQL根据消息的时间和句柄聚合数据困惑?我们希望不是,但让我们看看我们是否可以用一个例子来说明这一点。我们将聚合声明为实际表格:

[mw_shl_code=bash,true]ksql> DESCRIBE user_tweet_count;

Field           | Type  
-----------------------------------  
ROWTIME         | BIGINT  
ROWKEY          | VARCHAR(STRING)  
USER_SCREENNAME | VARCHAR(STRING)  
TWEET_COUNT     | BIGINT  
ksql>[/mw_shl_code]

让我们看看这些是什么:

ROWTIME是窗口的开始时间,ROWKEY是GROUP BY(USER_SCREENNAME)和窗口的组合。 所以我们可以通过创建一个额外的派生表来整理它:

我们感兴趣的是:


Conclusion
所以,我们有它! 我们从Kafka获取数据,并将其与KSQL一起使用。 我们不仅可以探索和转换数据,还可以使用KSQL轻松构建流式流和表。

user_tweet.png










已有(1)人评论

跳转到指定楼层
美丽天空 发表于 2018-7-12 11:43:33
学习了,哈哈哈
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条