分享

如何定时清空sparkstreaming的统计结果,重新开始统计

此名木有人用 发表于 2016-5-9 20:46:24 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 11 32979
需求:spark streaming kafka 通过 updateStateByKey 能定时统计出结果来,因为需要每天的0点需要重新统计结果。
那么该如何将当天的统计的结果清除,然后在不停止这个spark程序的情况下重新开始统计呢?

已有(11)人评论

跳转到指定楼层
starrycheng 发表于 2016-5-10 08:21:43
此名木有人用 发表于 2016-5-9 21:39
val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, Stri ...

这是楼主自己写的还是别人的。
如果熟悉建议加上代码注解。
而且楼主需要的是定时器,可以启动定时器RecurringTimer,判断时间到了之后,清空内存。

回复

使用道具 举报

hyj 发表于 2016-5-9 21:00:01
描述的有点抽象。
你统计信息放到哪里了
又是如何提取的。
无论是存储到数据库还是文件,可以使用shell,定时删除
回复

使用道具 举报

此名木有人用 发表于 2016-5-9 21:06:42
本帖最后由 此名木有人用 于 2016-5-9 21:14 编辑
hyj 发表于 2016-5-9 21:00
描述的有点抽象。
你统计信息放到哪里了
又是如何提取的。

不好意思啊,描述的有点抽象。我的意思是现在一直跑着的spark实时统计的程序,因为一直消费着kafka中的数据,所以一直跑着,统计的结果每次updateStateByKey的时候写入redis。但是有个新的需求是。每天过0点的时候 现在的统计结果需要舍弃,重新统计新的kafka中的消息(也就是比如5.9号的结果统计的是(a,100) 那么如果接着统计5.10号的日志的话会叠加(a,101),所以需要舍弃(a,100)的结果,重新统计结果为(a,1))。现在想的l有点low的办法是。将这个正在跑的程序stop掉,然后再重新启动程序,设置kafka的offset 然后接着统计。但是这样每天需要停止程序,启动程序。这样是不是不好,或者说有没有别的更好的办法。谢谢!
回复

使用道具 举报

tntzbzc 发表于 2016-5-9 21:23:53
本帖最后由 tntzbzc 于 2016-5-9 21:25 编辑
此名木有人用 发表于 2016-5-9 21:06
不好意思啊,描述的有点抽象。我的意思是现在一直跑着的spark实时统计的程序,因为一直消费着kafka中的数 ...

是想定时清空redis?
还是想定时清空什么?
上面无论定时清空什么,都可以实现的。当然重启也是种办法。
如果想清空spark stream程序里面应该也有重置的函数



回复

使用道具 举报

此名木有人用 发表于 2016-5-9 21:25:14
tntzbzc 发表于 2016-5-9 21:23
还是没说清定时清空是什么?
是想定时清空redis?
还是想定时清空什么?

每天的0点清空spark 已经统计的结果 不是redis里的数据,
回复

使用道具 举报

tntzbzc 发表于 2016-5-9 21:27:03
此名木有人用 发表于 2016-5-9 21:25
每天的0点清空spark 已经统计的结果 不是redis里的数据,

spark统计的你放到内存了吧。直接清空内存就好了。
比如你放到数组里,那你就重置下数组。或则你放到rdd里,那就把rdd释放掉或则置为空等
回复

使用道具 举报

此名木有人用 发表于 2016-5-9 21:39:34
tntzbzc 发表于 2016-5-9 21:27
spark统计的你放到内存了吧。直接清空内存就好了。
比如你放到数组里,那你就重置下数组。或则你放到rdd ...

val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder, String](ssc, kafkaParams, fromOffsets, messageHandler)
    kafkaStream.map((_, 1)).reduceByKey((_ + _)).updateStateByKey((x, y: Option[Int]) => {
        Some(y.getOrElse(0) + x.sum)
      }).print()
这是代码。。。该如何清空呢。。谢谢

回复

使用道具 举报

此名木有人用 发表于 2016-5-10 09:21:06
starrycheng 发表于 2016-5-10 08:21
这是楼主自己写的还是别人的。
如果熟悉建议加上代码注解。
而且楼主需要的是定时器,可以启动定时器Re ...

代码是我自己写的。现在的问题是。。我不知道如何清空spark统计结果的内存。代码如何写不太清楚。
回复

使用道具 举报

starrycheng 发表于 2016-5-10 10:47:55
此名木有人用 发表于 2016-5-10 09:21
代码是我自己写的。现在的问题是。。我不知道如何清空spark统计结果的内存。代码如何写不太清楚。

首先这里面使用了rdd了吧。
上面先不要管打印的事情,先把rdd表示出来。
rdd有了,根据定时器,调用unpersist释放rdd

unpersist注释:
释放RDD,将存入磁盘和内存的数据元素释放。但是RDD对象会被保留,在后续使用它的时候,Spark会重新依据依赖图计算。

回复

使用道具 举报

12下一页
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条