分享

flume-kafka-storm日志处理经验

howtodown 2014-9-17 20:41:25 发表于 总结型 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 20 211747
admln 发表于 2015-7-14 17:02:18
本帖最后由 admln 于 2015-7-14 17:03 编辑

1.日志的话用Logback;
2.入hbase的话用批量。
拙见供参考
回复

使用道具 举报

Pengjx2015 发表于 2015-11-15 14:07:55
回复

使用道具 举报

Pengjx2015 发表于 2016-1-14 12:28:24
谢谢楼主分享经验!
回复

使用道具 举报

Pengjx2015 发表于 2016-1-21 20:32:22
好资源,谢楼主分享
回复

使用道具 举报

daozhu 发表于 2016-7-11 22:35:06
不错  多谢
回复

使用道具 举报

hlmcm 发表于 2016-12-14 10:37:11
好资源,谢楼主分享
回复

使用道具 举报

wanghiu 发表于 2017-5-12 10:30:30
楼主您好,flume1.5,kafka 0.10.0,flume向kafka发送的消息,用kafkaconsumer,接收不到内容。
flume的配置文件:
        #config sources
        agent.sources.s1.type = netcat
        agent.sources.s1.bind = 192.168.0.101
        agent.sources.s1.port = 9093
        agent.sources.s1.channels=c1

        #config channels
        agent.channels.c1.type=memory
        agent.channels.c1.capacity=10000
        agent.channels.c1.transactionCapacity=100

        #config sinks
        agent.sinks.k1.type= org.apache.flume.sink.kafka.KafkaSink
        agent.sinks.k1.channel=c1
        agent.sinks.k1.brokerList=192.168.0.101:9092
        agent.sinks.k1.topic=test
        agent.sinks.k1.serializer.class=kafka.serializer.StringEncoder
        agent.sinks.k1.producer.type=sync
        agent.sinks.k1.custom.encoding=UTF-8
        agent.sinks.k1.custom.topic.name=test

向flume监控端口发送消息
                URL url = new URL("http://192.168.0.101:9093");
                           HttpURLConnection connection = (HttpURLConnection) url.openConnection();
                           connection.setDoOutput(true);
                           connection.setDoInput(true);
                           connection.setRequestMethod("POST");
                           connection.setUseCaches(false);
                           connection.setInstanceFollowRedirects(true);
                           connection.setRequestProperty("Content-Type","application/x-www-form-urlencoded");
                           connection.connect();
                           DataOutputStream out = new DataOutputStream(
                           connection.getOutputStream());
                           String testStr = "hello";
                           out.writeBytes(testStr);
                           out.flush();
                           out.close();

接收部分的代码:
        Properties props = new Properties();
            props.put("bootstrap.servers", "192.168.0.101:9092");
            props.put("group.id", "test-consumer-group");
            props.put("enable.auto.commit", "true");
            props.put("auto.commit.interval.ms", "1000");
            props.put("auto.offset.reset", "earliest");
            props.put("session.timeout.ms", "30000");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            KafkaConsumer<String, String> consumerNew = new KafkaConsumer<>(props);
            consumerNew.subscribe(Arrays.asList("test"));
            while (true) {
                ConsumerRecords<String, String> records = consumerNew.poll(1);
                for (ConsumerRecord<String, String> record : records){
                        System.out.println("value="+record.value());
                }
             }
kafkaconsumer接收到的消息是:
        value=POST / HTTP/1.1
        value=Content-Type: application/x-www-form-urlencoded
        value=Cache-Control: no-cache
        value=Pragma: no-cache
        value=User-Agent: Java/1.8.0_112
        value=Host: 192.168.0.101:9093
        value=Accept: text/html, image/gif, image/jpeg, *; q=.2, */*; q=.2
        value=Connection: keep-alive
        value=Content-Length: 5
        value=

请教一下,该如何写,才能读取到flume发送到kafka的消息
回复

使用道具 举报

wanghiu 发表于 2017-5-12 10:31:49
楼主您好,flume 1.5,kafka 0.10.0,flume向kafka发送的消息,用kafkaconsumer,接收不到内容。
flume的配置文件:
        #config sources
        agent.sources.s1.type = netcat
        agent.sources.s1.bind = 192.168.0.101
        agent.sources.s1.port = 9093
        agent.sources.s1.channels=c1

        #config channels
        agent.channels.c1.type=memory
        agent.channels.c1.capacity=10000
        agent.channels.c1.transactionCapacity=100

        #config sinks
        agent.sinks.k1.type= org.apache.flume.sink.kafka.KafkaSink
        agent.sinks.k1.channel=c1
        agent.sinks.k1.brokerList=192.168.0.101:9092
        agent.sinks.k1.topic=test
        agent.sinks.k1.serializer.class=kafka.serializer.StringEncoder
        agent.sinks.k1.producer.type=sync
        agent.sinks.k1.custom.encoding=UTF-8
        agent.sinks.k1.custom.topic.name=test

向flume监控端口发送消息
                URL url = new URL("http://192.168.0.101:9093");
                           HttpURLConnection connection = (HttpURLConnection) url.openConnection();
                           connection.setDoOutput(true);
                           connection.setDoInput(true);
                           connection.setRequestMethod("POST");
                           connection.setUseCaches(false);
                           connection.setInstanceFollowRedirects(true);
                           connection.setRequestProperty("Content-Type","application/x-www-form-urlencoded");
                           connection.connect();
                           DataOutputStream out = new DataOutputStream(
                           connection.getOutputStream());
                           String testStr = "hello";
                           out.writeBytes(testStr);
                           out.flush();
                           out.close();

接收部分的代码:
        Properties props = new Properties();
            props.put("bootstrap.servers", "192.168.0.101:9092");
            props.put("group.id", "test-consumer-group");
            props.put("enable.auto.commit", "true");
            props.put("auto.commit.interval.ms", "1000");
            props.put("auto.offset.reset", "earliest");
            props.put("session.timeout.ms", "30000");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            KafkaConsumer<String, String> consumerNew = new KafkaConsumer<>(props);
            consumerNew.subscribe(Arrays.asList("test"));
            while (true) {
                ConsumerRecords<String, String> records = consumerNew.poll(1);
                for (ConsumerRecord<String, String> record : records){
                        System.out.println("value="+record.value());
                }
             }
kafkaconsumer接收到的消息是:
        value=POST / HTTP/1.1
        value=Content-Type: application/x-www-form-urlencoded
        value=Cache-Control: no-cache
        value=Pragma: no-cache
        value=User-Agent: Java/1.8.0_112
        value=Host: 192.168.0.101:9093
        value=Accept: text/html, image/gif, image/jpeg, *; q=.2, */*; q=.2
        value=Connection: keep-alive
        value=Content-Length: 5
        value=
请问如何才能正确获取flume发送给kafka的消息
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条