问题导读 1.从哪个版本开始Elasticseach支持spark sql? 2.如何查找Elasticseach和spark sql对应版本? 3.如何实现Spark Structured Streaming数据保存到Elasticseach? 关注最新经典文章,欢迎关注公众号 Spark为流数据提供了两类API,一个是Spark Streaming,它是Spark提供的独立库。 另一个是基于Spark-SQL库构建的结构化流。今天我们将专注于使用Spark Structured Streaming将流数据保存到Elasticseach。 Elasticsearch在版本6.0.0版本的“Elasticsearch For Apache Hadoop”依赖项中添加了对Spark Structured Streaming 2.2.0的支持。 我们将使用这些或更高版本来构建我们的sbt-scala项目。 准备 首先,添加 “Spark SQL” 依赖:
找到对应版本elasticsearch
你可以到“Elasticsearch For Apache Hadoop”中找到合适的版本,但是需要注意,Elasticsearch 至少是6.0版本,Spark-SQL至少是2.2.0版本或则更高版本 如下图,我们打开连接看到如下内容: 开始写代码 我们将在此代码中读取JSON文件并将其数据保存到elasticsearch。 但首先让我们通过基本代码将JSON文件读取作为DataFrame,为此,我们需要为JSON创建一个schema 。
现在我们用下面代码实现使用schema以流模式读取JSON文件:
使用streamingDF,我们将数据输出到控制台。 使用控制台来验证数据。
这里.format(“console”)实现结果在控制台上打印。输出结果如下:
这只是一个简单的结构化流代码,其中JSON文件是源,数据输出到控制台。 输出数据到Elasticsearch 现在Spark Structured Streaming输出数据保存到elasticsearch,我们需要在SparkSession的对象中添加elasticsearch的配置:
我们在此处使用elasticsearch节点和端口添加身份验证凭据。 目前,该地址用于本地机器进行测试。 现在使用前面的代码,将数据写到elasticsearch:
上面我们看到改变format由控制台(console )变为org.elasticsearch.spark.sql,它实现了将数据写到elasticsearch 现在代码已经完成了。 运行后,这会将JSON数据保存到elasticsearch。 要确保使用curl: curl http://localhost:9200/index-name/_search 整个代码下载: structured-streaming-examples-master.zip 地址: https://github.com/anuj1207/structured-streaming-examples |
|小黑屋|about云开发-学问论坛|社区 ( 京ICP备12023829号 )
GMT+8, 2018-9-27 18:40 , Processed in 0.319640 second(s), 30 queries , Gzip On.