分享

Flink1.9实战:使用Flink SQL(Blink Planner)读取Kafka并写入MySQL

本帖最后由 open_thedoor 于 2020-7-15 17:05 编辑


1.png



链接:https://pan.baidu.com/s/15YOOtMwVBpzroDGroNV-jw  提取码:9gyx


说明
目前Flink1.9不支持SQL CLI中提交一个DDL语句,也不支持提交SQL脚本。本工程案例模拟PV、UV数据,通过kafka进行采集,Flink进行消费处理并转存至MySQL目标表的过程。
准备工作步骤[mw_shl_code=shell,true]# cygwin
cd /cygdrive/g/tmp/flink-example-sqlsubmit
# linux
cd /home/smartai/software/flink-example-sqlsubmit[/mw_shl_code]
  • 启动相关服务:MySQL-略;Kafka集群-./start-kafka-cluster.sh;flink集群-./start-flink-cluster.sh。
  • 关闭相关服务:MySQL-略;Kafka集群-./stop-kafka-cluster.sh;flink集群-./stop-flink-cluster.sh。
  • 运行./source-generator.sh。(创建kafka topic,并持续生产消息(来源:src/main/resources/user_behavior.log)并推送给kafka,可使用./kafka-consumer.sh验证kafka是否可以正常消费),也可以通过./send_kafka_json_msg.sh逐条发送消息(5条)。
  • 运行示例程序./run.sh。(消费kafka消息,并转储到MySQL目标表)
  • 查看任务./bin/flink list,取消任务./bin/flink cancel ${JobID},或通过Web Dashboard http://192.168.0.15:8081方式查看和取消Flink任务,运行结果可以查看MySQL目标表flink_test.pvuv_sink。
  • 如果终端返回以下输出,这意味着成功提交Flink SQL job。
Starting execution of programJob has been submitted with JobID ${JobID}配置Cygwin(客户机)单向免密访问linux集群服务器
[mw_shl_code=shell,true]# 在客户端生成公/私钥对,将私钥文件保存在客户端,再将公钥文件上传到服务器端(远程主机)
ssh-keygen
scp ~/.ssh/id_rsa.pub smartai@LTSR005:~/id_rsa_win.pub
scp ~/.ssh/id_rsa.pub smartai@LTSR006:~/id_rsa_win.pub
scp ~/.ssh/id_rsa.pub smartai@LTSR007:~/id_rsa_win.pub
# 各个远程主机将客户端公钥加入到authorized_keys
cat ~/id_rsa_win.pub >> ~/.ssh/authorized_keys
# 验证客户机免密登录
ssh smartai@LTSR005
ssh smartai@LTSR006
ssh smartai@LTSR007[/mw_shl_code]SSH访问慢解决方法
[mw_shl_code=shell,true]# 修改配置文件sshd_config(cygwin - vi /etc/sshd_config)/(linux - sudo vi /etc/ssh/ssh_config)
# UseDNS no
# GSSAPIAuthentication no

# 修改server上nsswitch.conf文件
sudo vi /etc/nsswitch.conf
hosts: files dns [NOTFOUND=return]

# 重启ssh服务,最好是重启一次机器
net start sshd # cygwin
sudo service ssh restart # linux[/mw_shl_code]补充kafka相关操作
[mw_shl_code=shell,true]# 进入kafka安装目录
cd ~/modules/kafka_2.11-0.11.0.3
# 查看kafka topic
bin/kafka-topics.sh --zookeeper 192.168.0.15:2181,192.168.0.16:2181,192.168.0.17:2181 --list
# 删除kafka topic
bin/kafka-topics.sh --delete --zookeeper 192.168.0.15:2181,192.168.0.16:2181,192.168.0.17:2181 --topic user_behavior
# 创建kafka topic
bin/kafka-topics.sh --zookeeper 192.168.0.15:2181,192.168.0.16:2181,192.168.0.17:2181 --create --topic user_behavior --replication-factor 1 --partitions 3[/mw_shl_code]MySQL相关SQL语句
[mw_shl_code=sql,true]CREATE DATABASE IF NOT EXISTS flink_test default character set utf8 COLLATE utf8_general_ci;
use flink_test;
drop table IF EXISTS  pvuv_sink;
CREATE TABLE pvuv_sink (
    dt VARCHAR(32),
    pv BIGINT(20),
    uv BIGINT(20)
)ENGINE=InnoDB AUTO_INCREMENT=556536 DEFAULT CHARSET=utf8 COMMENT='埋点统计表';

select * from pvuv_sink limit 100;
select count(*) from pvuv_sink;
delete from pvuv_sink;[/mw_shl_code]上传工程到GitHub过程(Git Bash)
[mw_shl_code=shell,true]# gitHub上创建空的仓库flink-example-sqlsubmit
cd /g/tmp/flink-example-sqlsubmit
git init
git remote add origin git@github.com:polarisgh/flink-example-sqlsubmit.git
git add .
git commit -m "first commit"
git push -u origin master[/mw_shl_code]VPN版本特别说明
注意修改配置文件env.sh和src/main/resources/q1.sqlFlink SQL文件。


作者:polaris
来源:https://github.com/polarisgh/flink-example-sqlsubmit




没找到任何评论,期待你打破沉寂

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条