分享

sparkstreaming数据通过Scala实现存储到数据库

langke93 发表于 2017-1-9 10:12:02 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 1 10219
从kafka里面取数据到sparkStreaming里面,然后再把数据保存到数据中
关键点,不是每个数据都需要创建连接,只需要为每个分区创建一个连接就可以了
下面是一个简单的例子

从kafka里面取数据到sparkStreaming里面,然后再把数据保存到数据中
关键点,不是每个数据都需要创建连接,只需要为每个分区创建一个连接就可以了
下面是一个简单的例子
[mw_shl_code=scala,true]import java.sql.{ Connection, DriverManager }

import com.oracle._
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka._
import org.apache.spark.{ SparkConf, SparkContext }

/**
* 从kafka上面读取数据,然后保存到数据库上面,虽然一般不建议把数据保存到数据库中,(保存到数据库中的时候可以建立连接池)
* 如果是保存到hbase上面也可以使用这样的方法,为每个分区创建一个RDD连接,而不是为每个数据建立一个连接
*/
object StreamToOracle {
  def main(args: Array[String]) {
    val sparkConf = new SparkConf().setAppName("KafkaWordCount").setMaster("local[2]")
    val sc = new SparkContext(sparkConf)
    val ssc = new StreamingContext(sc, Seconds(10))
    //ssc.checkpoint("checkpoint")
    val topic = "test"
    val topicMap = topic.split(",").map((_, 1)).toMap
    val lines = KafkaUtils.createStream(ssc, "192.168.10.209:2181,192.168.10.219:2181,192.168.10.199:2181", "ssk", topicMap).map(_._2)
    //每个RDD进行操作
    lines.foreachRDD(rdd => {
      rdd.foreachPartition(partitionOfRecords => {
        //重点在这里,在每一个分区里面建立一个连接
        val connection = getConnection()
        partitionOfRecords.foreach(line => {
          val info = line.split(":")
          val ip = info(0)
          val mesType = info(1)
          val data = info(2)
          val timeStamp = info(3)
          val sql = "insert into MONITOR_DATA values(AUTO_INCREMENT.NEXTVAL,'" + ip + "','" + mesType + "','" + data + "'," + timeStamp + ")"
          saveToOracle(connection, sql)

        })
        closeConn(connection)
      })

      val words = rdd.flatMap(_.split(":"))

    })
    ssc.start()
    ssc.awaitTermination()
  }
  //保存数据
  def saveToOracle(con: Connection, sql: String): Int = {
    val ps = con.prepareStatement(sql);
    val res = ps.executeUpdate()
    ps.close()
    res
  }
  //关闭连接
  def closeConn(con: Connection) = {
    con.close()
  }
  //得到连接
  def getConnection(): Connection = {
    val url = "jdbc:oracle:thin:@//192.168.10.100:1521/UCLOUD"
    Class.forName("oracle.jdbc.driver.OracleDriver").newInstance();
    val con = DriverManager.getConnection(url, "scott", "scott");
    con
  }
}[/mw_shl_code]








来自:韩利鹏


欢迎加入about云群425860289432264021 ,云计算爱好者群,关注about云腾讯认证空间

已有(1)人评论

跳转到指定楼层
zhanmsl 发表于 2017-7-17 15:18:30
很简单的例子,刚入门可以看看
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条