分享

Spark通过JdbcRDD整合 Mysql(JdbcRDD)开发

howtodown 2014-11-4 14:34:27 发表于 推荐型 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 1 43008
本帖最后由 howtodown 于 2014-11-4 14:52 编辑
问题导读
1.在Spark中提供了一个JdbcRDD类,该RDD就是读取JDBC中的数据并转换成RDD?
2.JdbcRDD类构造函数的各个参数的含义你认为都是什么?







在Spark中提供了一个JdbcRDD类,该RDD就是读取JDBC中的数据并转换成RDD,之后我们就可以对该RDD进行各种的操作。我们先看看该类的构造函数:
  1. JdbcRDD[T: ClassTag](
  2.     sc: SparkContext,
  3.     getConnection: () => Connection,
  4.     sql: String,
  5.     lowerBound: Long,
  6.     upperBound: Long,
  7.     numPartitions: Int,
  8.     mapRow: (ResultSet) => T = JdbcRDD.resultSetToObjectArray _)
复制代码

这个类带了很多参数,关于这个函数的各个参数的含义,


       1、getConnection 返回一个已经打开的结构化数据库连接,JdbcRDD会自动维护关闭。
  2、sql 是查询语句,此查询语句必须包含两处占位符?来作为分割数据库ResulSet的参数,例如:”select title, author from books where ? < = id and id <= ?”
  3、lowerBound, upperBound, numPartitions 分别为第一、第二占位符,partition的个数。例如,给出lowebound 1,upperbound 20, numpartitions 2,则查询分别为(1, 10)与(11, 20)
  4、mapRow 是转换函数,将返回的ResultSet转成RDD需用的单行数据,此处可以选择Array或其他,也可以是自定义的case class。默认的是将ResultSet 转换成一个Object数组。



下面我们说明如何使用该类。

  1. package scala
  2. import java.sql.DriverManager
  3. import org.apache.spark.SparkContext
  4. import org.apache.spark.rdd.JdbcRDD
  5. object SparkToJDBC {
  6. def main(args: Array[String]) {
  7. val sc = new SparkContext("local", "mysql")
  8. val rdd = new JdbcRDD(
  9. sc,
  10. () => {
  11. Class.forName("com.mysql.jdbc.Driver").newInstance()
  12. DriverManager.getConnection("jdbc:mysql://localhost:3306/db", "root", "123456")
  13. },
  14. "SELECT content FROM mysqltest WHERE ID >= ? AND ID <= ?",
  15. 1, 100, 3,
  16. r => r.getString(1)).cache()
  17. print(rdd.filter(_.contains("success")).count())
  18. sc.stop()
  19. }
  20. }
复制代码

代码比较简短,主要是读mysqltest 表中的数据,并统计ID>=1 && ID < = 100 && content.contains("success")的记录条数。我们从代码中可以看出JdbcRDD的sql参数要带有两个?的占位符,而这两个占位符是给参数lowerBound和参数upperBound定义where语句的上下边界的。从JdbcRDD类的构造函数可以知道,参数lowerBound和参数upperBound都只能是Long类型的,并不支持其他类型的比较,这个使得JdbcRDD使用场景比较有限。而且在使用过程中sql参数必须有类似 ID >= ? AND ID < = ?这样的where语句,如果你写成下面的形式:

  1. val rdd = new JdbcRDD(
  2.   sc,
  3.   () => {
  4.     Class.forName("com.mysql.jdbc.Driver").newInstance()
  5.     DriverManager.getConnection("jdbc:mysql://localhost:3306/db", "root", "123456")
  6.   },
  7.   "SELECT content FROM mysqltest",
  8.   1, 100, 3,
  9.   r => r.getString(1)).cache()
复制代码

那不好意思,运行的时候会出现以下的错误:
  1. 2014-09-10 15:47:45,621 (Executor task launch worker-0) [ERROR -
  2. org.apache.spark.Logging$class.logError(Logging.scala:95)] Exception in task ID 1
  3. java.sql.SQLException: Parameter index out of range (1 > number of parameters, which is 0).
  4.         at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:1074)
  5.         at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:988)
  6.         at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:974)
  7.         at com.mysql.jdbc.SQLError.createSQLException(SQLError.java:919)
  8.         at com.mysql.jdbc.PreparedStatement.checkBounds(PreparedStatement.java:3813)
  9.         at com.mysql.jdbc.PreparedStatement.setInternal(PreparedStatement.java:3795)
  10.         at com.mysql.jdbc.PreparedStatement.setInternal(PreparedStatement.java:3840)
  11.         at com.mysql.jdbc.PreparedStatement.setLong(PreparedStatement.java:3857)
  12.         at org.apache.spark.rdd.JdbcRDD$anon$1.<init>(JdbcRDD.scala:84)
  13.         at org.apache.spark.rdd.JdbcRDD.compute(JdbcRDD.scala:70)
  14.         at org.apache.spark.rdd.JdbcRDD.compute(JdbcRDD.scala:50)
  15.         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
  16.         at org.apache.spark.CacheManager.getOrCompute(CacheManager.scala:77)
  17.         at org.apache.spark.rdd.RDD.iterator(RDD.scala:227)
  18.         at org.apache.spark.rdd.FilteredRDD.compute(FilteredRDD.scala:34)
  19.         at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:262)
  20.         at org.apache.spark.rdd.RDD.iterator(RDD.scala:229)
  21.         at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:111)
  22.         at org.apache.spark.scheduler.Task.run(Task.scala:51)
  23.         at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:183)
  24.         at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
  25.         at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
  26.         at java.lang.Thread.run(Thread.java:619)
复制代码

看下JdbcRDD类的compute函数实现就知道了:

  1. override def compute(thePart: Partition, context: TaskContext) = new NextIterator[T] {
  2.     context.addOnCompleteCallback{ () => closeIfNeeded() }
  3.     val part = thePart.asInstanceOf[JdbcPartition]
  4.     val conn = getConnection()
  5.     val stmt = conn.prepareStatement(sql, ResultSet.TYPE_FORWARD_ONLY,
  6.                                 ResultSet.CONCUR_READ_ONLY)
  7.     if (conn.getMetaData.getURL.matches("jdbc:mysql:.*")) {
  8.       stmt.setFetchSize(Integer.MIN_VALUE)
  9.       logInfo("statement fetch size set to: " + stmt.getFetchSize +
  10.                                 " to force mySQL streaming ")
  11.     }
  12.     stmt.setLong(1, part.lower)
  13.     stmt.setLong(2, part.upper)
  14. ...........................
复制代码

不过值得高兴的是,我们可以自定义一个JdbcRDD,修改上面的计算思路,这样就可以得到符合我们自己要求的JdbcRDD。


对于JAVA例子,JdbcRDD类的最后一个参数很不好传,很多人都没有实现


http://www.songyafei.cn/post/a0d5b_26775a3

已有(1)人评论

跳转到指定楼层
韩克拉玛寒 发表于 2014-11-5 08:59:44
谢谢楼主。很不错的文章。分享了
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条