分享

请教storm-kafka关于consumer group的问题

caiyifeng 发表于 2015-8-12 15:00:10 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 4 39450
求教各位:
我有一个topic,两个Topology,都需要消费该topic的数据,好像需要设置不同的consumer group才能支持,
请问程序中如何设置参数才能支持呢

已有(4)人评论

跳转到指定楼层
Alkaloid0515 发表于 2015-8-12 15:48:44
  Consumers
    本质上kafka只支持Topic.每个consumer属于一个consumer group;反过来说,每个group中可以有多个consumer.发送到Topic的消息,只会被订阅此Topic的每个group中的一个consumer消费.

    如果所有的consumer都具有相同的group,这种情况和queue模式很像;消息将会在consumers之间负载均衡.
    如果所有的consumer都具有不同的group,那这就是"发布-订阅";消息将会广播给所有的消费者.
    在kafka中,一个partition中的消息只会被group中的一个consumer消费;每个group中consumer消息消费互相独立;我们可以认为一个group是一个"订阅"者,一个Topic中的每个partions,只会被一个"订阅者"中的一个consumer消费,不过一个consumer可以消费多个partitions中的消息.kafka只能保证一个partition中的消息被某个consumer消费时,消息是顺序的.事实上,从Topic角度来说,消息仍不是有序的.

    kafka的设计原理决定,对于一个topic,同一个group中不能有多于partitions个数的consumer同时消费,否则将意味着某些consumer将无法得到消息.

##############################################

consumer端部署及API



1、consumer.properties:文件位于/resources目录下

[mw_shl_code=bash,true]zookeeper.connect=192.168.0.1:2181test-datacenter/test-server
# timeout in ms for connecting to zookeeper
zookeeper.connectiontimeout.ms=1000000
#consumer group id
group.id=test-group
#consumer timeout
#consumer.timeout.ms=5000[/mw_shl_code]


2、JAVA API实现

[mw_shl_code=java,true]import java.io.UnsupportedEncodingException;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

import kafka.consumer.*;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;

import org.apache.commons.collections.CollectionUtils;


public class kafkaConsumer {

  public static void main(String[] args) throws InterruptedException, UnsupportedEncodingException {

    Properties properties = new Properties();
    properties.put("zookeeper.connect", "192.168.0.1:2181/test-datacenter/test-server");
    properties.put("auto.commit.enable", "true");
    properties.put("auto.commit.interval.ms", "60000");
    properties.put("group.id", "test");

    ConsumerConfig consumerConfig = new ConsumerConfig(properties);

    ConsumerConnector javaConsumerConnector = Consumer.createJavaConsumerConnector(consumerConfig);

    //topic的过滤器
    Whitelist whitelist = new Whitelist("test");
    List<KafkaStream<byte[], byte[]>> partitions = javaConsumerConnector.createMessageStreamsByFilter(whitelist);

    if (CollectionUtils.isEmpty(partitions)) {
      System.out.println("empty!");
      TimeUnit.SECONDS.sleep(1);
    }

    //消费消息
    for (KafkaStream<byte[], byte[]> partition : partitions) {

      ConsumerIterator<byte[], byte[]> iterator = partition.iterator();
      while (iterator.hasNext()) {
        MessageAndMetadata<byte[], byte[]> next = iterator.next();
        System.out.println("partiton:" + next.partition());
        System.out.println("offset:" + next.offset());
        System.out.println("message:" + new String(next.message(), "utf-8"));
      }
    }
  }
}[/mw_shl_code]




回复

使用道具 举报

xuanxufeng 发表于 2015-8-12 16:09:59




consumer基本配置如下:

       group.id

       zookeeper.connect


PropertyDefaultDescription
group.id 用来唯一标识consumer进程所在组的字符串,如果设置同样的group  id,表示这些processes都是属于同一个consumer  group
zookeeper.connect 指定zookeeper的连接的字符串,格式是hostname:port,此处host和port都是zookeeper server的host和port,为避免某个zookeeper 机器宕机之后失联,你可以指定多个hostname:port,使用逗号作为分隔:
hostname1:port1,hostname2:port2,hostname3:port3
可以在zookeeper连接字符串中加入zookeeper的chroot路径,此路径用于存放他自己的数据,方式:
hostname1:port1,hostname2:port2,hostname3:port3/chroot/path
consumer.idnull不需要设置,一般自动产生
socket.timeout.ms30*100网络请求的超时限制。真实的超时限制是   max.fetch.wait+socket.timeout.ms
socket.receive.buffer.bytes64*1024socket用于接收网络请求的缓存大小
fetch.message.max.bytes1024*1024每次fetch请求中,针对每次fetch消息的最大字节数。这些字节将会督导用于每个partition的内存中,因此,此设置将会控制consumer所使用的memory大小。这个fetch请求尺寸必须至少和server允许的最大消息尺寸相等,否则,producer可能发送的消息尺寸大于consumer所能消耗的尺寸。
num.consumer.fetchers1用于fetch数据的fetcher线程数
auto.commit.enabletrue如果为真,consumer所fetch的消息的offset将会自动的同步到zookeeper。这项提交的offset将在进程挂掉时,由新的consumer使用
auto.commit.interval.ms60*1000consumer向zookeeper提交offset的频率,单位是秒
queued.max.message.chunks2用于缓存消息的最大数目,以供consumption。每个chunk必须和fetch.message.max.bytes相同
rebalance.max.retries4当新的consumer加入到consumer  group时,consumers集合试图重新平衡分配到每个consumer的partitions数目。如果consumers集合改变了,当分配正在执行时,这个重新平衡会失败并重入
fetch.min.bytes1每次fetch请求时,server应该返回的最小字节数。如果没有足够的数据返回,请求会等待,直到足够的数据才会返回。
fetch.wait.max.ms100如果没有足够的数据能够满足fetch.min.bytes,则此项配置是指在应答fetch请求之前,server会阻塞的最大时间。
rebalance.backoff.ms2000在重试reblance之前backoff时间
refresh.leader.backoff.ms200在试图确定某个partition的leader是否失去他的leader地位之前,需要等待的backoff时间
auto.offset.resetlargestzookeeper中没有初始化的offset时,如果offset是以下值的回应:
smallest:自动复位offset为smallest的offset
largest:自动复位offset为largest的offset
anything  else:向consumer抛出异常
consumer.timeout.ms-1如果没有消息可用,即使等待特定的时间之后也没有,则抛出超时异常
exclude.internal.topicstrue是否将内部topics的消息暴露给consumer
paritition.assignment.strategyrange选择向consumer 流分配partitions的策略,可选值:range,roundrobin
client.idgroup id value是用户特定的字符串,用来在每次请求中帮助跟踪调用。它应该可以逻辑上确认产生这个请求的应用
zookeeper.session.timeout.ms6000zookeeper 会话的超时限制。如果consumer在这段时间内没有向zookeeper发送心跳信息,则它会被认为挂掉了,并且reblance将会产生
zookeeper.connection.timeout.ms6000客户端在建立通zookeeper连接中的最大等待时间
zookeeper.sync.time.ms2000ZK follower可以落后ZK leader的最大时间
offsets.storagezookeeper用于存放offsets的地点: zookeeper或者kafka
offset.channel.backoff.ms1000重新连接offsets channel或者是重试失败的offset的fetch/commit请求的backoff时间
offsets.channel.socket.timeout.ms10000当读取offset的fetch/commit请求回应的socket 超时限制。此超时限制是被consumerMetadata请求用来请求offset管理
offsets.commit.max.retries5重试offset commit的次数。这个重试只应用于offset  commits在shut-down之间。他
dual.commit.enabledtrue如果使用“kafka”作为offsets.storage,你可以二次提交offset到zookeeper(还有一次是提交到kafka)。在zookeeper-based的offset  storage到kafka-based的offset storage迁移时,这是必须的。对任意给定的consumer  group来说,比较安全的建议是当完成迁移之后就关闭这个选项
partition.assignment.strategyrange在“range”和“roundrobin”策略之间选择一种作为分配partitions给consumer 数据流的策略; 循环的partition分配器分配所有可用的partitions以及所有可用consumer  线程。它会将partition循环的分配到consumer线程上。如果所有consumer实例的订阅都是确定的,则partitions的划分是确定的分布。循环分配策略只有在以下条件满足时才可以:(1)每个topic在每个consumer实力上都有同样数量的数据流。(2)订阅的topic的集合对于consumer  group中每个consumer实例来说都是确定的。


更多细节可以查看  scala类:  kafka.consumer.ConsumerConfig




回复

使用道具 举报

caiyifeng 发表于 2015-8-12 16:49:19
回复

使用道具 举报

arsenduan 发表于 2015-8-12 19:42:51
caiyifeng 发表于 2015-8-12 16:49
多谢楼上两位,
我现在用的java,我想了解的是:
两个Topology都是通过kafkaSpout从kafka抓 ...


你这个问题,别人都不会碰到的,都是运用已有的知识,然后进行尝试


group.id用来唯一标识consumer进程所在组的字符串,如果设置同样的group  id,表示这些processes都是属于同一个consumer  group


上面比较很清楚了,你设置一下不同group.id试一下

回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条