A few words up front:
A few years ago I worked on a banking project using IBM's DataStage. With simple drag-and-drop you could join tables across multiple data sources and land the result in another table. Looking back now, the design philosophy behind that product was remarkably advanced. At the time I dismissed it as little more than a wrapper around something like stored procedures, with plain SQL underneath; in hindsight that view was naive. Years later, big data rules the landscape, and even technologically conservative banks have rolled out big data projects one after another. Many big data frameworks now implement this same idea: Drill, Spark, Hive and others each offer their own solution, though the truly forward-looking thinking still comes from the top internet companies.
A brief look at how Hive approaches multi-source integration: Hive still relies on the Sqoop tool, for example syncing MySQL data into HDFS to physically centralize the data, after which you can run joins across what used to be separate sources. I suspect some future Hive version will let you create a function that exchanges data directly with relational databases or even files.
Drill offers an even more explicit implementation:
[mw_shl_code=sql,true]SELECT custview.membership, SUM(orders.order_total) AS sales
FROM hive.orders, custview, dfs.`clicks/clicks.json` c
WHERE orders.cust_id = custview.cust_id
  AND orders.cust_id = c.user_info.cust_id
GROUP BY custview.membership
ORDER BY 2;[/mw_shl_code]
So how does Spark, long known for its performance and one-stack development model, turn this idea into a concrete implementation?
DataFrame
Earlier versions of Spark could actually do this too, just with a bit more ceremony: you would pull relational data through the usual Java JDBC API, or read a file via sparkContext.textFile, wrap the result into an RDD, call rdd.toDF(), and register a temp table so you could run SQL against that file or database table. All of this is just a logical wrapper; as everyone knows, Spark is lazy, and nothing executes until an action is triggered. Later the DataFrame API grew more powerful, and you can now go through .jdbc directly:
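For contrast, the older RDD route just described might be sketched like this. This is a minimal sketch against the Spark 1.x APIs used in this post; the file name people.txt and the Person schema are made-up examples, not part of the original project:
[mw_shl_code=scala,true]import org.apache.spark.sql.SQLContext
import org.apache.spark.{SparkConf, SparkContext}

object OldStyleTempTable extends App {
  case class Person(name: String, age: Int)

  val sc = new SparkContext(
    new SparkConf().setAppName("old-style").setMaster("local[2]"))
  val sqlContext = new SQLContext(sc)
  import sqlContext.implicits._ // needed for .toDF()

  // Read a plain text file, parse each line into a case class, convert the
  // RDD to a DataFrame, and register it as a temp table. All of this is lazy;
  // nothing runs until the SQL query below triggers an action via show().
  sc.textFile("people.txt")
    .map(_.split(","))
    .map(a => Person(a(0), a(1).trim.toInt))
    .toDF()
    .registerTempTable("people")

  sqlContext.sql("SELECT name, age FROM people WHERE age > 18").show()
}[/mw_shl_code]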
[mw_shl_code=scala,true]package spark

import java.sql.{Connection, DriverManager, ResultSet, Statement}

import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}
import org.apache.spark.{Logging, SparkConf, SparkContext}

import scala.collection.mutable.ListBuffer

/**
  * Read data source definitions from a MySQL metadata table, register each
  * source as a Spark temp table, then run SQL against the temp tables.
  * Created by 圣斗士宙斯 on 2017/8/16.
  */
object SparkRegisterTempTab extends App with Logging {

  case class DataSource(datasourcetype: Int,
                        url: String,
                        sourcetable: String,
                        targettable: String,
                        username: String,
                        password: String)

  val conf = new SparkConf()
    .setAppName("APP")
    .setMaster("local[2]")
  val driver = "com.mysql.jdbc.Driver"
  val sparkContext = new SparkContext(conf)
  val sqlContext = new SQLContext(sparkContext)

  parse(sqlContext)
  val rdd: DataFrame = sqlContext.sql("select * from test")
  // write the MySQL-backed temp table out as Parquet
  rdd.write.mode(SaveMode.Overwrite).format("parquet").save("C:\\Users\\zhuqitian\\Desktop\\orc")

  // load the data source metadata
  def getMetaData(sqlContext: SQLContext): ListBuffer[DataSource] = {
    val url = sqlContext.getConf("spark.datasource.url",
      "jdbc:mysql://localhost:3306/spark_project?user=root&password=123456")
    val dataSourceTable = sqlContext.getConf("spark.datasource.table", "datasource")
    Class.forName(driver)
    val dataSources = new ListBuffer[DataSource]()
    var conn: Connection = null
    var stat: Statement = null
    var rs: ResultSet = null
    try {
      conn = DriverManager.getConnection(url)
      stat = conn.createStatement(ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY)
      rs = stat.executeQuery("SELECT * FROM " + dataSourceTable)
      while (rs.next()) {
        val url = rs.getString("url")
        val username = rs.getString("username")
        val password = rs.getString("password")
        val sourceTable = rs.getString("sourcetable")
        val targetTable = rs.getString("targettable")
        val dataSourceType = rs.getString("datasourcetype")
        dataSources += DataSource(dataSourceType.toInt, url, sourceTable, targetTable, username, password)
      }
    } finally {
      // guard against NPEs when the connection fails before these are assigned
      if (rs != null) rs.close()
      if (stat != null) stat.close()
      if (conn != null) conn.close()
    }
    dataSources
  }

  // dispatch on the data source type and register each source as a temp table
  def parse(sqlContext: SQLContext): Unit = {
    val dataSources = getMetaData(sqlContext)
    for (dataSource <- dataSources) {
      dataSource.datasourcetype match {
        // 1 = mysql
        case 1 =>
          sqlContext.read.format("jdbc").options(
            Map(
              "url" -> dataSource.url,
              "dbtable" -> dataSource.sourcetable,
              "driver" -> driver,
              "user" -> dataSource.username,
              "password" -> dataSource.password))
            .load.registerTempTable(dataSource.targettable)
          logInfo("load mysql :" + dataSource.sourcetable + " to " + dataSource.targettable + " success...")
        // other source types could be handled here
        case _ =>
      }
    }
  }
}
[/mw_shl_code]
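Once each source is registered as a temp table, the actual multi-source join is just SQL, analogous to the Drill query earlier. A hedged sketch of that final step: it assumes the metadata-driven loading above has registered a MySQL table as "orders", while the file path clicks/clicks.json and all column names here are hypothetical, chosen only to mirror the Drill example:
[mw_shl_code=scala,true]import org.apache.spark.sql.SaveMode

// Register a local JSON file alongside the JDBC-backed temp table...
sqlContext.read.json("clicks/clicks.json").registerTempTable("clicks")

// ...and join across the two sources with plain SQL.
val joined = sqlContext.sql(
  """SELECT o.cust_id, SUM(o.order_total) AS sales
    |FROM orders o JOIN clicks c ON o.cust_id = c.cust_id
    |GROUP BY o.cust_id
    |ORDER BY sales DESC""".stripMargin)

joined.show()
// The joined result can also be persisted, e.g. as Parquet:
joined.write.mode(SaveMode.Overwrite).parquet("joined_result")[/mw_shl_code]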
If you have good suggestions, please leave me a comment and we can discuss them together.