分享

求助,SparkSQL和Streaming结合程序出现的问题

SingleDee 发表于 2016-1-8 13:28:12 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 2 11018
本帖最后由 SingleDee 于 2016-1-8 13:34 编辑

小弟刚接触Streaming,有一个需求是需要到Mysql获取一个表的数据,用来当Streaming程序Filter的条件,
def GetMonNumber(sqlContext: SQLContext): Map[String, Int] = {
  val df = sqlContext.jdbc("jdbc:mysql://DB:3306/xxxx", "Table")
  df.map(x => (x.getString(0), x.getInt(1))).collect().toMap[String, Int]
}
返回一个Map[String,Int]

现在有一个问题,我在Mysql更新了一下表(增加数据或者删除数据)
在Spark这边体现不出来,还是复用了第一次select的结果。
请问有什么办法让程序每次都必须重新获取Mysql的表?



已有(2)人评论

跳转到指定楼层
arsenduan 发表于 2016-1-8 14:55:35
这个方法有三种:
第一种,使用触发器,不过难度比较大。
一旦更新或则删除,执行触发器,然后调用程序,重新读取

第二种,一旦mysql进行操作的时候,spark里面进行同步操作。不过这个可能会有问题,因为一旦数据库删除失败,而spark删除成功,这样就导致不一致了。所以这里有个先后顺序

第三种:定时读取数据库,这样无论是否更新,都能及时同步
回复

使用道具 举报

regan 发表于 2016-1-8 19:15:01
       楼主,你的Spark中没有得到数据库中更新的表的数据,这很显然你启动你spark的时候只查取了一次数据,并将查找出来的数据放入到了Spark内存中,当你数据库中数据更新后,你Spark没有重新去查数据库,而使用的是Spark启动的时候从数据库中查找出来放在Spark内存中的数据。因此才会出现这种情况。
     对于这种情况我给的建议是:你可以在你的程序中的map、flatMap等方法中调用你getMonNumber()方法,原因是map,flatmap操作是作用在数据集上面的每一个元素。如果你的数据集有100个元素,那么map/flatMap会执行100次,在其中调用100次getMonNumber方法。当然考虑到查询的效率,其实在这100个数据上只需要查询一次数据库。为了实现只查询一次这种效果,你可以模仿如下程序:(哥哥尝试过的,呵呵)
先定义一个原子变量,作为是否查找过数据库的标记
private val inited: AtomicInteger = new AtomicInteger(0)

然后在你的程序中的map或flatMap等方法中调用你写的getMonNumber方法,可参考如下:
val decodedRowsRDD = stream.map(a => {
  init()
  val b = a.toArray
  val result = MRDD.decodeLine(b)
  result
})
private def init(): Unit = {
  if (inited.get == 0) {
    logInfo("init method was called!")
    getMonNumber();//这是你写的查询表的方法
    inited.incrementAndGet()
    inited.synchronized(inited.notify())
  }
}



OK!!!! 你尝试着这个思路玩一玩

回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条