
Integrating and Testing spark-sql with Elasticsearch

Posted 2015-11-21 19:51

1. Prerequisites

Spark 1.4.1
Elasticsearch 1.7
Java 1.7
2. Dependency JAR

The elasticsearch-hadoop connector is required.
Download: http://mvnrepository.com/artifact/org.elasticsearch/elasticsearch-hadoop/2.2.0-m1
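
If you pull the connector through a build tool instead of copying the JAR by hand (see section 3), the sbt coordinate matching the link above would be:

// sbt: same artifact as the mvnrepository link above
libraryDependencies += "org.elasticsearch" % "elasticsearch-hadoop" % "2.2.0-m1"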

3. Configuration

Place the downloaded elasticsearch-hadoop JAR under $SPARK_HOME/lib/:

ls -lh lib

-rw-r----- 1 wangyue wangyue 692K Nov  9 17:49 elasticsearch-hadoop-2.2.0-m1.jar
-rw-rw-r-- 1 wangyue wangyue 946K Sep 10 14:22 mysql-connector-java-5.1.35.jar

Then edit $SPARK_HOME/conf/spark-env.sh, adding elasticsearch-hadoop-2.2.0-m1.jar to SPARK_CLASSPATH.
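
For example, the spark-env.sh entry could look like the following (a minimal sketch; the path assumes the JAR sits under $SPARK_HOME/lib as shown above):

export SPARK_CLASSPATH=$SPARK_CLASSPATH:$SPARK_HOME/lib/elasticsearch-hadoop-2.2.0-m1.jar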

4. Writing data to Elasticsearch with spark-sql

Create /home/cluster/data/test/people.txt (e.g. vim /home/cluster/data/test/people.txt) with the following test data:

zhang,san,20
li,si,30
wang,wu,40
li,bai,100
du,fu,101

Start spark-shell and run the following code:

import org.apache.spark.sql.SQLContext
import org.elasticsearch.spark.sql._

// create the SQLContext, then bring its implicits (toDF) into scope
val sqlContext = new SQLContext(sc)
import sqlContext.implicits._

// define the Person case class
case class Person(name: String, surname: String, age: Int)

// build a DataFrame from the test file
val people = sc.textFile("file:///home/cluster/data/test/people.txt").map(_.split(",")).map(p => Person(p(0), p(1), p(2).trim.toInt)).toDF()

// write the DataFrame to the spark/people index/type
people.saveToEs("spark/people")
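
If Elasticsearch is not listening on the default localhost:9200, connection settings can also be passed directly to saveToEs; a sketch, where the host and port values are placeholders:

// pass connection settings explicitly (host and port are placeholders)
people.saveToEs("spark/people", Map("es.nodes" -> "localhost", "es.port" -> "9200"))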

Query the data inserted into Elasticsearch:

GET /spark/people/_search

{
   "took": 1,
   "timed_out": false,
   "_shards": {
      "total": 5,
      "successful": 5,
      "failed": 0
   },
   "hits": {
      "total": 5,
      "max_score": 1,
      "hits": [
         {
            "_index": "spark",
            "_type": "people",
            "_id": "AVDrt5MyJpFYJkWM4nwP",
            "_score": 1,
            "_source": {
               "name": "zhang",
               "surname": "san",
               "age": 20
            }
         },
         {
            "_index": "spark",
            "_type": "people",
            "_id": "AVDrt5SEJpFYJkWM4nwS",
            "_score": 1,
            "_source": {
               "name": "li",
               "surname": "bai",
               "age": 100
            }
         },
         {
            "_index": "spark",
            "_type": "people",
            "_id": "AVDrt5MyJpFYJkWM4nwR",
            "_score": 1,
            "_source": {
               "name": "wang",
               "surname": "wu",
               "age": 40
            }
         },
         {
            "_index": "spark",
            "_type": "people",
            "_id": "AVDrt5MyJpFYJkWM4nwQ",
            "_score": 1,
            "_source": {
               "name": "li",
               "surname": "si",
               "age": 30
            }
         },
         {
            "_index": "spark",
            "_type": "people",
            "_id": "AVDrt5SEJpFYJkWM4nwT",
            "_score": 1,
            "_source": {
               "name": "du",
               "surname": "fu",
               "age": 101
            }
         }
      ]
   }
}

5. Reading Elasticsearch data with spark-sql

Name       Default value   Description
path       required        Elasticsearch index/type
pushdown   true            Whether to translate (push down) Spark SQL into Elasticsearch queries
strict     false           Whether to use exact (not analyzed) matching or not (analyzed)
Reading via load (Spark 1.3) or read (Spark 1.4):

import org.apache.spark.sql.SQLContext    
import org.apache.spark.sql.SQLContext._

val sqlContext = new SQLContext(sc)
// options for Spark 1.3 need to include the target path/resource
val options13 = Map("path" -> "spark/people",
                    "pushdown" -> "true",
                    "es.nodes" -> "localhost","es.port" -> "9200")

// Spark 1.3 style
val spark13DF = sqlContext.load("org.elasticsearch.spark.sql", options13)

// options for Spark 1.4 - the path/resource is specified separately
val options = Map("pushdown" -> "true", "es.nodes" -> "localhost", "es.port" -> "9200")

// Spark 1.4 style
val spark14DF = sqlContext.read.format("org.elasticsearch.spark.sql").options(options).load("spark/people")

Query name and age:
spark14DF.select("name","age").collect().foreach(println(_))
[zhang,20]
[li,100]
[wang,40]
[li,30]
[du,101]
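
Since pushdown is enabled, a column filter such as the one below is translated into an Elasticsearch query rather than evaluated inside Spark; a minimal sketch:

// with pushdown enabled, this filter runs on the Elasticsearch side
spark14DF.filter(spark14DF("age") >= 100).select("name", "age").show()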

Register a temporary table and query name:
spark14DF.registerTempTable("people")
val results = sqlContext.sql("SELECT name FROM people")
results.map(t => "Name: " + t(0)).collect().foreach(println)
Name: zhang
Name: li
Name: wang
Name: li
Name: du
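
The registered table accepts arbitrary Spark SQL, not just simple projections; for example, a grouped count (a sketch, aggregated in Spark after the rows are fetched):

// count rows per name; the aggregation itself runs in Spark
sqlContext.sql("SELECT name, COUNT(*) FROM people GROUP BY name").collect().foreach(println)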

Read Elasticsearch data and create the temporary table myPeople:

sqlContext.sql(
   "CREATE TEMPORARY TABLE myPeople    " + 
   "USING org.elasticsearch.spark.sql " + 
   "OPTIONS ( resource 'spark/people', nodes 'localhost:9200')" ) 

sqlContext.sql("select * from myPeople").collect.foreach(println)

[20,zhang,san]
[100,li,bai]
[40,wang,wu]
[30,li,si]
[101,du,fu]
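
A WHERE clause against such a table is likewise pushed down to Elasticsearch where possible; a sketch:

// the age predicate is translated into an Elasticsearch query
sqlContext.sql("SELECT name, age FROM myPeople WHERE age >= 100").collect().foreach(println)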


sqlContext.sql(
   "CREATE TEMPORARY TABLE myPeople    " + 
   "USING org.elasticsearch.spark.sql " + 
   "OPTIONS ( resource 'spark/people', nodes 'localhost:9200',scroll_size '20')" ) 
Because using . in an OPTIONS key causes a syntax exception, the _ style should be used instead; hence in this example es.scroll.size becomes scroll_size (the leading es can be dropped when loading). Note that this only works in Spark 1.3/1.4, as later Spark versions have a stricter parser.

Reading Elasticsearch data via esDF:

import org.apache.spark.sql.SQLContext       
import org.elasticsearch.spark.sql._ 

val sqlContext = new SQLContext(sc)
val people = sqlContext.esDF("spark/people") 
// check the associated schema
println(people.schema.treeString) 

root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)
 |-- surname: string (nullable = true)


// get only the entries matching wang
val wangs = sqlContext.esDF("spark/people", "?q=wang")

wangs.show()

+---+----+-------+
|age|name|surname|
+---+----+-------+
| 40|wang|     wu|
+---+----+-------+
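
Besides URI queries like ?q=wang, esDF also accepts a full Elasticsearch query DSL string; a sketch (the val name elders and the range query are illustrative):

// filter on the Elasticsearch side with a query DSL range query
val elders = sqlContext.esDF("spark/people", """{"query":{"range":{"age":{"gte":100}}}}""")
elders.show()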

Reference:
https://www.elastic.co/guide/en/elasticsearch/hadoop/master/spark.html#spark-sql

Author: stark_summer
Source: http://blog.csdn.net/stark_summer/article/details/49743687
