分享

使用Storm和Trident进行实时趋势分析(二)

本帖最后由 poppowerlb2 于 2015-6-3 00:02 编辑
问题导读

1.如何发送日志消息给Kafka
2.怎样进行日志拓扑分析?
3.如何
使用Trident实现指数加权移动平均
4.怎样
使用storm和XMPP协议发送警报和通知




上接:http://www.aboutyun.com/forum.ph ... e=1&extra=#pid94943

最后的拓扑

我们现在已经有所有必要的组件来构建我们的日志分析拓扑如下:
[mw_shl_code=java,true]public class LogAnalysisTopology {
    public static StormTopologybuildTopology(){
        TridentTopologytopology = newTridentTopology();
       KafkaConfig.StaticHosts kafkaHosts=KafkaConfig.StaticHosts.fromHostString(
                      Arrays.asList(newString[]{"testserver"}), 1);
        TridentKafkaConfigspoutConf = newTridentKafkaConfig(kafkaHosts, "log-analysis");
        //spoutConf.scheme= newStringScheme();
        spoutConf.scheme =newSchemeAsMultiScheme(new StringScheme());
       spoutConf.forceStartOffsetTime(-1);
       OpaqueTridentKafkaSpout spout = newOpaqueTridentKafkaSpout(spoutConf);
        Stream spoutStream=topology.newStream("kafka-stream", spout);
        Fields jsonFields =newFields("level","timestamp","message","logger");
        Stream parsedStream=spoutStream.each(new
               Fields("str"),newJsonProjectFunction(jsonFields), jsonFields);
        // drop theunparsed JSON to reducetuple size
        parsedStream=parsedStream.project(jsonFields);
        EWMA ewma = newEWMA().sliding(1.0,
              EWMA.Time.MINUTES).withAlpha(EWMA.ONE_MINUTE_ALPHA);
        StreamaverageStream =parsedStream.each(new Fields("timestamp"),
               new MovingAverageFunction(ewma,
                       EWMA.Time.MINUTES),newFields("average"));
       ThresholdFilterFunction tff = newThresholdFilterFunction(50D);
        StreamthresholdStream =averageStream.each(new Fields("average"), tff,
               new Fields("change","threshold"));
        StreamfilteredStream =
               thresholdStream.each(newFields("change"), newBooleanFilter());
      filteredStream.each(filteredStream.getOutputFields(),
               new XMPPFunction(newNotifyMessageMapper()), new Fields());
        returntopology.build();
    }
    public static void main(String[]args)throws
           Exception {
        Config conf = newConfig();
       conf.put(XMPPFunction.XMPP_USER,"storm@budreau.local");
       conf.put(XMPPFunction.XMPP_PASSWORD,"storm");
       conf.put(XMPPFunction.XMPP_SERVER,"budreau.local");
        conf.put(XMPPFunction.XMPP_TO,"tgoetz@budreau.local");
       conf.setMaxSpoutPending(5);
        if (args.length ==0) {
          LocalClustercluster = new LocalCluster();
         cluster.submitTopology("log-analysis", conf, buildTopology());
        } else {
           conf.setNumWorkers(3);
          StormSubmitter.submitTopology(args[0],
                   conf, buildTopology());
        }
    }
}[/mw_shl_code]
然后,buildTopology()方法创建所有kafka spout和trident之间的流连接功能和过滤器。
main()方法然后提交拓扑到一个集群:如果是运行在本地模式就是本地集群或远程集群运行时模式就是 分布式模式。
我们开始通过配置kafka spout来读取我们来自应用程序配置写日志事件的同一个话题。因为kafka持久化所有接受的消息,因为我们的应用程序可能已经跑了一段时间(因此记录了许多事件),我们告诉spout快进到kafka的结束队列通过调用forceStartOffsetTime()方法的值为1。这将避免所有的旧消息的重播,我们可能不感兴趣。使用的值2将迫使spout回退队列,并使用一个特定的日期以毫秒为单位将迫使它回放到特定的时间点。如果forceFromStartTime()方法不调用,spout将尝试恢复,最后离开zookeeper通过查找一个偏移量。
接下来,我们设置JsonProjectFunction类来解析从kafka收到的原始JSON并释放出我们感兴趣的值。回想一下,trident的功能是附加的。这意味着我们的元组流,除了所有JSON,提取的值也会包含原始的未解析JSON字符串。因为我们不再需要这些数据,我们调用Stream.project()方法截取我们想要的字段列表。project()方法用于减少元组流,保留只是必要的字段,它尤为重要在实现大量数据流时。
最后,我们应用BooleanFilter类来连接产生的流给XMPPFunction类。
拓扑的main()方法简单填充一个XMPPFunction类配置对象所需的属性并提交拓扑。
运行日志分析拓扑

为了运行分析拓扑结构,首先确保本章早些时候提到的Zookeeper,kafka,OpenFire都是启动和运行。然后,运行拓扑的main()方法。
当拓扑激活时,storm XMPP用户连接到XMPP服务器并触发一个事件。如果你登录到XMPP的同一台服务器客户端和storm用户在你的好友列表,您将看到它变得可用。如下截图所示:
phone_xmpp.jpg
接下来,运行RogueApplication,等待一分钟。您应该会收到即时消息通知,指示阈值被超过,然后紧随其后的是一个指示恢复正常(低于阈值),如下截图所示:
screenshot.jpg
总结

在这一章,我们已经向您介绍了实时分析通过创建一个简单但强大的拓扑结构,可以适应范围广泛的应用程序。我们构建的组件是通用的,可以很容易地重用在其他项目和扩展。最后,我们介绍了一个实际的spout 实现,可用于多种用途。
而实时分析的话题非常广泛,诚然我们本章并不周详,我们鼓励你去探索在本书的其他章节的技术,并考虑他们如何可能被纳入你的分析工具。
在下一章,我们将向您介绍trident的分布式状态机制,构建一个应用程序不断地把storm处理的数据写入到图形数据库。

已有(5)人评论

跳转到指定楼层
邓立辉 发表于 2015-10-19 20:59:16
谢谢分享楼主
回复

使用道具 举报

bbfj 发表于 2016-3-27 09:55:33
不错的,讲解的很详细
回复

使用道具 举报

ningjianbang 发表于 2016-4-6 17:05:35
谢谢楼主分享,学习了
回复

使用道具 举报

jmamike 发表于 2017-6-14 11:32:59
不错,有参考价值
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条