分享

Storm在线业务实践-集群空闲CPU飙高问题排查

PeersLee 发表于 2016-3-30 18:14:43 [显示全部楼层] 只看大图 回帖奖励 阅读模式 关闭右栏 5 13694
本帖最后由 PeersLee 于 2016-3-30 18:27 编辑
问题导读:

1.什么是Storm?
2.集群空闲CPU飙高出现的现象是什么样的?
3.出现此现象之后应该如何进行排查?





解决方案:

首先简单介绍一下Storm,熟悉的同学可以直接跳过这段。

Storm是Twitter开源的一个大数据处理框架,专注于流式数据的处理。Storm通过创建拓扑结构(Topology)来转换数据流。和Hadoop的作业(Job)不同,Topology会持续转换数据,除非被集群关闭。

下图是一个简单的Storm Topology结构图。


2016-03-30_175144.png

可以看出Topology是由不同组件(Component)串/并联形成的有向图。数据元组(Tuple)会在Component之间通过数据流的形式进行有向传递。Component有两种

Spout:Tuple来源节点,持续不断的产生Tuple,形成数据流
Bolt:Tuple处理节点,处理收到的Tuple,如果有需要,也可以生成新的Tuple传递到其他Bolt
目前业界主要在离线或者对实时性要求不高业务中使用Storm。随着Storm版本的更迭,可靠性和实时性在逐渐增强,已经有运行在线业务的能力。因此我们尝试将一些实时性要求在百毫秒级的在线业务迁入Storm集群。





现象
  • 某次高峰时,Storm上的一个业务拓扑频繁出现消息处理延迟。延时达到了10s甚至更高。查看高峰时的物理机指标监控,CPU、内存和IO都有很大的余量。判断是随着业务增长,服务流量逐渐增加,某个Bolt之前设置的并行度不够,导致消息堆积了。
  • 临时增加该Bolt并行度,解决了延迟的问题,但是第二天的低峰期,服务突然报警,CPU负载过高,达到了100%。






排查
1.用Top看了下CPU占用,系统调用占用了70%左右。再用wtool对Storm的工作进程进行分析,找到了CPU占用最高的线程
[mw_shl_code=applescript,true]java.lang.Thread.State: TIMED_WAITING (parking)
        at sun.misc.Unsafe.park(Native Method)
        - parking to wait for  <0x0000000640a248f8> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
        at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:215)
        at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2163)
        at com.lmax.disruptor.BlockingWaitStrategy.waitFor(BlockingWaitStrategy.java:87)
        at com.lmax.disruptor.ProcessingSequenceBarrier.waitFor(ProcessingSequenceBarrier.java:54)
        at backtype.storm.utils.DisruptorQueue.consumeBatchWhenAvailable(DisruptorQueue.java:97)
        at backtype.storm.disruptor$consume_batch_when_available.invoke(disruptor.clj:80)
        at backtype.storm.daemon.executor$fn__3441$fn__3453$fn__3500.invoke(executor.clj:748)
        at backtype.storm.util$async_loop$fn__464.invoke(util.clj:463)
        at clojure.lang.AFn.run(AFn.java:24)
        at java.lang.Thread.run(Thread.java:745)[/mw_shl_code]


我们可以看到这些线程都在信号量上等待。调用的来源是disruptor$consume_batch_when_available。

2.disruptor是Storm内部消息队列的封装。所以先了解了一下Storm内部的消息传输机制。
2016-03-30_175953.png

(图片来源Understanding the Internal Message Buffers of Storm)

Storm的工作节点称为Worker(其实就是一个JVM进程)。不同Worker之间通过Netty(旧版Storm使用ZeroMQ)进行通讯。

每个Worker内部包含一组Executor。Strom会为拓扑中的每个Component都分配一个Executor。在实际的数据处理流程 中,数据以消息的形式在Executor之间流转。Executor会循环调用绑定的Component的处理方法来处理收到的消息。

Executor之间的消息传输使用队列作为消息管道。Storm会给每个Executor分配两个队列和两个处理线程。

工作线程:读取接收队列,对消息进行处理,如果产生新的消息,会写入发送队列
发送线程:读取发送队列,将消息发送其他Executor
当Executor的发送线程发送消息时,会判断目标Executor是否在同一Worker内,如果是,则直接将消息写入目标Executor的接收队列,如果不是,则将消息写入Worker的传输队列,通过网络发送。

Executor工作/发送线程读取队列的代码如下,这里会循环调用consume-batch-when-available读取队列中的消息,并对消息进行处理。


[mw_shl_code=applescript,true](async-loop
  (fn []
    ...
    (disruptor/consume-batch-when-available receive-queue event-handler)            
    ...
    ))[/mw_shl_code]

3.我们再来看一下consume_batch_when_available这个函数里做了什么。
[mw_shl_code=applescript,true](defn consume-batch-when-available
  [^DisruptorQueue queue handler]
  (.consumeBatchWhenAvailable queue handler))[/mw_shl_code]

前面提到Storm使用队列作为消息管道。Storm作为流式大数据处理框架,对消息传输的性能很敏感,因此使用了高效内存队列Disruptor Queue作为消息队列。

2016-03-30_180204.png

Disruptor Queue是LMAX开源的一个无锁内存队列。内部实现如下。

2016-03-30_180238.png

Disruptor Queue通过Sequencer来管理队列,Sequencer内部使用RingBuffer存储消息。RingBuffer中消息的位置使用Sequence表示。队列的生产消费过程如下

Sequencer使用一个Cursor来保存写入位置。
每个Consumer都会维护一个消费位置,并注册到Sequencer。
Consumer通过SequenceBarrier和Sequencer进行交互。Consumer每次消费时,SequenceBarrier会比较消费位置和Cursor来判断是否有可用消息:如果没有,会按照设定的策略等待消息;如果有,则读取消息,修改消费位置。
Producer在写入前会查看所有消费者的消费位置,在有可用位置时会写入消息,更新Cursor。
查看DisruptorQueue.consumeBatchWhenAvailable实现如下

[mw_shl_code=applescript,true]final long nextSequence = _consumer.get() + 1;
final long availableSequence = _barrier.waitFor(nextSequence, 10, TimeUnit.MILLISECONDS);
if (availableSequence >= nextSequence) {
    consumeBatchToCursor(availableSequence, handler);
}[/mw_shl_code]

继续查看_barrier.waitFor方法

[mw_shl_code=applescript,true]public long waitFor(final long sequence, final long timeout, final TimeUnit units) throws AlertException, InterruptedException {
    checkAlert();
    return waitStrategy.waitFor(sequence, cursorSequence, dependentSequences, this, timeout, units);
}[/mw_shl_code]

Disruptor Queue为消费者提供了若干种消息等待策略

BlockingWaitStrategy:阻塞等待,CPU占用小,但是会切换线程,延迟较高
BusySpinWaitStrategy:自旋等待,CPU占用高,但是无需切换线程,延迟低
YieldingWaitStrategy:先自旋等待,然后使用Thread.yield()唤醒其他线程,CPU占用和延迟比较均衡
SleepingWaitStrategy:先自旋,然后Thread.yield(),最后调用LockSupport.parkNanos(1L),CPU占用和延迟比较均衡
Storm的默认等待策略为BlockingWaitStrategy。BlockingWaitStrategy的waitFor函数实现如下


[mw_shl_code=applescript,true]if ((availableSequence = cursor.get()) < sequence) {
        lock.lock();
        try {
            ++numWaiters;
            while ((availableSequence = cursor.get()) < sequence) {
                barrier.checkAlert();

                if (!processorNotifyCondition.await(timeout, sourceUnit)) {
                    break;
                }
            }
        }
        finally {
            --numWaiters;
            lock.unlock();
        }
}[/mw_shl_code]

BlockingWaitStrategy内部使用信号量来阻塞Consumer,当await超时后,Consumer线程会被自动唤醒,继续循环查询可用消息。

5.但是随着调高超时,测试时并没有发现消息处理有延时。继续查看BlockingWaitStrategy代码,发现Disruptor Queu的Producer在写入消息后会唤醒等待的Consumer。

[mw_shl_code=applescript,true]if (0 != numWaiters)
{
    lock.lock();
    try
    {
        processorNotifyCondition.signalAll();
    }
    finally
    {
        lock.unlock();
    }
}[/mw_shl_code]

这样,Storm的10ms超时就很奇怪了,没有减少消息延时,反而增加了系统负载。带着这个疑问查看代码的上下文,发现在构造DisruptorQueue对象时有这么一句注释

[mw_shl_code=applescript,true];; :block strategy requires using a timeout on waitFor (implemented in DisruptorQueue),
        as sometimes the consumer stays blocked even when there's an item on the queue.
(defnk disruptor-queue
    [^String queue-name buffer-size :claim-strategy :multi-threaded :wait-strategy :block]
    (DisruptorQueue. queue-name
                ((CLAIM-STRATEGY claim-strategy) buffer-size)
                (mk-wait-strategy wait-strategy)))[/mw_shl_code]

Storm使用的Disruptor Queue版本为2.10.1。查看Disruptor Queue的change log,发现该版本的BlockingWaitStrategy有潜在的并发问题,可能导致某条消息在写入时没有唤醒等待的消费者。

[mw_shl_code=applescript,true]2.10.2 Released (21-Aug-2012)

Bug fix, potential race condition in BlockingWaitStrategy.
Bug fix set initial SequenceGroup value to -1 (Issue #27).
Deprecate timeout methods that will be removed in version 3.[/mw_shl_code]

因此Storm使用了短超时,这样在出现并发问题时,没有被唤醒的消费方也会很快因为超时重新查询可用消息,防止出现消息延时。

这样如果直接修改超时到1000ms,一旦出现并发问题,最坏情况下消息会延迟1000ms。在权衡性能和延时之后,我们在Storm的配置文件中增加配置项来修改超时参数。这样使用者可以自己选择保证低延时还是低CPU占用率。


6.就BlockingWaitStrategy的潜在并发问题咨询了Disruptor Queue的作者,得知2.10.4版本已经修复了这个并发问题(Race condition in 2.10.1 release
)。

将Storm依赖升级到此版本。但是对Disruptor Queue的2.10.1做了并发测试,无法复现这个并发问题,因此也无法确定2.10.4是否彻底修复。谨慎起见,在升级依赖的同时保留了之前的超时配 置项,并将默认超时调整为1000ms。经测试,在集群空闲时CPU占用正常,并且压测也没有出现消息延时。






总结
关于集群空闲CPU反而飙高的问题,已经向Storm社区提交PR并且已被接受[STORM-935] Update Disruptor queue version to 2.10.4。在线业务流量通常起伏很大,如果被这个问题困扰,可以考虑应用此patch。
Storm UI中可以看到很多有用的信息,但是缺乏记录,最好对其进行二次开发(或者直接读取ZooKeeper中信息),记录每个时间段的数据,方便分析集群和拓扑运行状况。
来自:http://daiwa.ninja/index.php/2015/07/18/storm-cpu-overload/

已有(5)人评论

跳转到指定楼层
德像天地 发表于 2016-3-30 20:45:51
楼主好人谢谢分享
回复

使用道具 举报

xiaosong_6666 发表于 2016-6-15 20:40:52
不错,支持下
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条