分享

请教:mapreduce作业中的多指标去重问题

wjhdtx 发表于 2014-11-24 08:16:16 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 13 25773
说下需求,日志格式如下:
  1. 北京,visitIp:58.25.125.65,loginIp:58.25.125.65,。。。
  2. 天津,visitIp:57.25.124.65,loginIp:57.25.124.65,。。。
  3. 天津,visitIp:57.25.124.67,loginIp:,。。。
  4. 上海,visitIp:96.28.129.67,loginIp:,。。。
  5. 北京,visitIp:58.25.125.65,loginIp:58.25.125.65,。。。
复制代码
需要统计的结果是:按城市统计visitIp和loginIp出现的个数(都需要IP去重),即每个城市每天的UV(用户访问数)和LP(登录用户数),UV和LP的统计需要IP去重。

如果一次作业只统计一个指标,比如UV或者LP,方案是比较多的。但随着日志量的增加,如果一个作业统计一个指标,那作业的数量太多了,所以想在一次作业中就统计出这两个指标(实际上有更多指标)

需求大概是这样。

我目前的做法是在map端去重,key是城市,在redis中保存已记录的visitIp和loginIp。
  1. public void map(...)
  2.         {
  3.                 ...
  4.                 if(redis.sIsMember(uvSetKey.getBytes(), visitIp.getBytes()))
  5.                 {
  6.                         uv = 1;
  7.                         redis.sAdd(uvSetKey.getBytes(), visitIp.getBytes())
  8.                 }
  9.                 ...
  10.                 if(redis.sIsMember(lpSetKey.getBytes(), loginIp.getBytes()))
  11.                 {
  12.                         lp = 1;
  13.                         redis.sAdd(lpSetKey.getBytes(), loginIp.getBytes())
  14.                 }
  15.                 ...
  16.                 context.write(city, uv+"|"+lp);
  17.         }
复制代码
相信大家已经看出来了,有个问题,多个map并行运行时,上面代码线程不安全,会导致uv和lp比实际的个数多,因为  redis.sIsMember-操作-redis.sAdd  不是原子操作,我的想法是分布式锁,但目前没有好的高性能分布式锁开发出来。


我的问题:
1. 不知道这种需求大家是怎么做的?
2. 分布式锁使用恰当吗?如果使用分布式锁,有推荐的成熟实现吗?


谢谢大家!




已有(14)人评论

跳转到指定楼层
bioger_hit 发表于 2014-11-24 08:59:18


大概有两种方案,可以看看这个是否对你有帮助

Redis实现锁总结
回复

使用道具 举报

desehawk 发表于 2014-11-24 11:01:06
锁有很多种方式,没有完美的锁,只能根据自己的实际情况来。


下面可以参考:

Redis有一系列的命令,特点是以NX结尾,NX是Not eXists的缩写,如SETNX命令就应该理解为:SET if Not eXists。这系列的命令非常有用,这里讲使用SETNX来实现分布式锁。

用SETNX实现分布式锁
利用SETNX非常简单地实现分布式锁。例如:某客户端要获得一个名字foo的锁,客户端使用下面的命令进行获取:
SETNX lock.foo <current Unix time + lock timeout + 1>
如返回1,则该客户端获得锁,把lock.foo的键值设置为时间值表示该键已被锁定,该客户端最后可以通过DEL lock.foo来释放该锁。
如返回0,表明该锁已被其他客户端取得,这时我们可以先返回或进行重试等对方完成或等待锁超时。


解决死锁
上面的锁定逻辑有一个问题:如果一个持有锁的客户端失败或崩溃了不能释放锁,该怎么解决?我们可以通过锁的键对应的时间戳来判断这种情况是否发生了,如果当前的时间已经大于lock.foo的值,说明该锁已失效,可以被重新使用。

发生这种情况时,可不能简单的通过DEL来删除锁,然后再SETNX一次,当多个客户端检测到锁超时后都会尝试去释放它,这里就可能出现一个竞态条件,让我们模拟一下这个场景:

C0操作超时了,但它还持有着锁,C1和C2读取lock.foo检查时间戳,先后发现超时了。
C1 发送DEL lock.foo
C1 发送SETNX lock.foo 并且成功了。
C2 发送DEL lock.foo
C2 发送SETNX lock.foo 并且成功了。
这样一来,C1,C2都拿到了锁!问题大了!

幸好这种问题是可以避免的,让我们来看看C3这个客户端是怎样做的:

C3发送SETNX lock.foo 想要获得锁,由于C0还持有锁,所以Redis返回给C3一个0
C3发送GET lock.foo 以检查锁是否超时了,如果没超时,则等待或重试。
反之,如果已超时,C3通过下面的操作来尝试获得锁:
GETSET lock.foo <current Unix time + lock timeout + 1>
通过GETSET,C3拿到的时间戳如果仍然是超时的,那就说明,C3如愿以偿拿到锁了。
如果在C3之前,有个叫C4的客户端比C3快一步执行了上面的操作,那么C3拿到的时间戳是个未超时的值,这时,C3没有如期获得锁,需要再次等待或重试。留意一下,尽管C3没拿到锁,但它改写了C4设置的锁的超时值,不过这一点非常微小的误差带来的影响可以忽略不计。

注意:为了让分布式锁的算法更稳键些,持有锁的客户端在解锁之前应该再检查一次自己的锁是否已经超时,再去做DEL操作,因为可能客户端因为某个耗时的操作而挂起,操作完的时候锁因为超时已经被别人获得,这时就不必解锁了。

示例伪代码
根据上面的代码,我写了一小段Fake代码来描述使用分布式锁的全过程:
  1. # get lock
  2. lock = 0
  3. while lock != 1:
  4. timestamp = current Unix time + lock timeout + 1
  5. lock = SETNX lock.foo timestamp
  6. if lock == 1 or (now() > (GET lock.foo) and now() > (GETSET lock.foo timestamp)):
  7. break;
  8. else:
  9. sleep(10ms)
  10. # do your job
  11. do_job()
  12. # release
  13. if now() < GET lock.foo:
  14. DEL lock.foo
复制代码



是的,要想这段逻辑可以重用,使用python的你马上就想到了Decorator,而用Java的你是不是也想到了那谁?AOP + annotation?行,怎样舒服怎样用吧,别重复代码就行。

java之jedis实现
expireMsecs 锁持有超时,防止线程在入锁以后,无限的执行下去,让锁无法释放
timeoutMsecs 锁等待超时,防止线程饥饿,永远没有入锁执行代码的机会



expireMsecs 锁持有超时,防止线程在入锁以后,无限的执行下去,让锁无法释放
timeoutMsecs 锁等待超时,防止线程饥饿,永远没有入锁执行代码的机会

  1. /**
  2. * Acquire lock.
  3. *
  4. * @param jedis
  5. * @return true if lock is acquired, false acquire timeouted
  6. * @throws InterruptedException
  7. * in case of thread interruption
  8. */
  9. public synchronized boolean acquire(Jedis jedis) throws InterruptedException {
  10. int timeout = timeoutMsecs;
  11. while (timeout >= 0) {
  12. long expires = System.currentTimeMillis() + expireMsecs + 1;
  13. String expiresStr = String.valueOf(expires); //锁到期时间
  14. if (jedis.setnx(lockKey, expiresStr) == 1) {
  15. // lock acquired
  16. locked = true;
  17. return true;
  18. }
  19. String currentValueStr = jedis.get(lockKey); //redis里的时间
  20. if (currentValueStr != null && Long.parseLong(currentValueStr) < System.currentTimeMillis()) {
  21. //判断是否为空,不为空的情况下,如果被其他线程设置了值,则第二个条件判断是过不去的
  22. // lock is expired
  23. String oldValueStr = jedis.getSet(lockKey, expiresStr);
  24. //获取上一个锁到期时间,并设置现在的锁到期时间,
  25. //只有一个线程才能获取上一个线上的设置时间,因为jedis.getSet是同步的
  26. if (oldValueStr != null && oldValueStr.equals(currentValueStr)) {
  27. //如过这个时候,多个线程恰好都到了这里,但是只有一个线程的设置值和当前值相同,他才有权利获取锁
  28. // lock acquired
  29. locked = true;
  30. return true;
  31. }
  32. }
  33. timeout -= 100;
  34. Thread.sleep(100);
  35. }
  36. return false;
  37. }
复制代码

一般用法
其中很多繁琐的边缘代码
包括:异常处理,释放资源等等

  1. JedisPool pool;
  2. JedisLock jedisLock = new JedisLock(pool.getResource(), lockKey, timeoutMsecs, expireMsecs);
  3. try {
  4. if (jedisLock.acquire()) { // 启用锁
  5. //执行业务逻辑
  6. } else {
  7. logger.info("The time wait for lock more than [{}] ms ", timeoutMsecs);
  8. }
  9. } catch (Throwable t) {
  10. // 分布式锁异常
  11. logger.warn(t.getMessage(), t);
  12. } finally {
  13. if (jedisLock != null) {
  14. try {
  15. jedisLock.release();// 则解锁
  16. } catch (Exception e) {
  17. }
  18. }
  19. if (jedis != null) {
  20. try {
  21. pool.returnResource(jedis);// 还到连接池里
  22. } catch (Exception e) {
  23. }
  24. }
  25. }
复制代码

犀利用法
用匿名类来实现,代码非常简洁
至于SimpleLock的实现,请在附件中下载查看


redis-jedis-2.4.1_lock.zip (1.15 MB, 下载次数: 3)

点评

谢谢版主,使用了你提供的分布式锁。  发表于 2014-11-25 13:14
回复

使用道具 举报

jixianqiuxue 发表于 2014-11-24 11:02:46
楼主是怎么认为的
回复

使用道具 举报

wjhdtx 发表于 2014-11-24 14:28:31
大家做hadoop没遇到这种情况吗?都是用分布式锁来搞?
回复

使用道具 举报

bioger_hit 发表于 2014-11-24 15:51:18
wjhdtx 发表于 2014-11-24 14:28
大家做hadoop没遇到这种情况吗?都是用分布式锁来搞?
分布式的,如果不产生错误的话,恐怕需奥考虑分布式所,楼主 似乎有更好的解决方案或则想法没有讲出来。
回复

使用道具 举报

chinaboy2005 发表于 2014-11-24 16:25:00
学习一下,有没有解决方案啊
回复

使用道具 举报

desehawk 发表于 2014-11-24 19:43:11
chinaboy2005 发表于 2014-11-24 16:25
学习一下,有没有解决方案啊
上面两个方案,  挺不错的,可以尝试
回复

使用道具 举报

wjhdtx 发表于 2014-11-25 08:39:58
回复

使用道具 举报

nettman 发表于 2014-11-25 14:50:30
wjhdtx 发表于 2014-11-25 08:39
谢谢各位的回复,目前我也没什么好的方案(今天准备研究下redis分布式锁,试试看性能如何)。

实现多指 ...


可以看看zookeeper是否与redis相结合

分布式锁服务。当分布式系统操作数据,例如:读取数据、分析数据、最后修改数据。在分布式系统里这些操作可能会分散到集群里不同的节点上,那么这时候就存在数据操作过程中一致性的问题,如果不一致,我们将会得到一个错误的运算结果,在单一进程的程序里,一致性的问题很好解决,但是到了分布式系统就比较困难,因为分布式系统里不同服务器的运算都是在独立的进程里,运算的中间结果和过程还要通过网络进行传递,那么想做到数据操作一致性要困难的多。Zookeeper提供了一个锁服务解决了这样的问题,能让我们在做分布式数据运算时候,保证数据操作的一致性。
回复

使用道具 举报

12下一页
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条