分享

Zookeeper实践:通过Zookeeper实现一个消费者进程分配程序

sstutu 2015-5-24 14:37:18 发表于 实操演练 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 0 31880
本帖最后由 sstutu 于 2015-5-24 14:38 编辑


背景需要实现一个分布式监控程序,Agent把监控消息发送到Kafka,由消费者进程使用High Level API从Kafka获取消息

需求

随着Agent数量的增加,每秒产生的消息数量也在增加,考虑到High Level的API使消费者进程与分区的固定关系,所以需要使用Zookeeper来维护一个消费者的集群,使消费者可以根据Kafka分区的数量以及消费者集群的个数,动态的分配每个消费者节点上运行的消费者进程数

实现

[mw_shl_code=bash,true]package com.eric.hadoop.zookeeper.coordination;  
  
import java.net.InetAddress;  
import java.net.UnknownHostException;  
import java.util.HashMap;  
import java.util.List;  
import java.util.Map;  
  
import org.apache.curator.RetryPolicy;  
import org.apache.curator.framework.CuratorFramework;  
import org.apache.curator.framework.CuratorFrameworkFactory;  
import org.apache.curator.framework.recipes.cache.NodeCache;  
import org.apache.curator.framework.recipes.cache.NodeCacheListener;  
import org.apache.curator.framework.recipes.cache.PathChildrenCache;  
import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent;  
import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener;  
import org.apache.curator.retry.ExponentialBackoffRetry;  
import org.apache.curator.utils.EnsurePath;  
import org.apache.zookeeper.CreateMode;  
import org.codehaus.plexus.util.StringUtils;  
  
/**
* 根据KafkaConsumerManager运行的数量,动态的分配运行Consumer的进行数量
*  
* @author aihua.sun
*/  
public class KafkaConsumerManager {  
    private static final String KAFKA_PART_COUNT = "10";  
    private static final String IPS = "localhost:2181";  
    private static final int SESSION_TIMEOUT = 5000;  
    private static final int CONNECT_TIMEOUT = 5000;  
    private static final String NAMESPACE = "monitor_managers";  
    private static final String KAFKA_PATH = "/kafka_count";  
    private static final String MONITOR_NODES_PATH = "/nodes";  
    private static final String ALLOC_NODE = "/alloc_node";  
    private String tempNodeId;  
    private String oldConsumerThreadCount = "0";  
    private CuratorFramework client;  
  
    public static void main(String[] args) {  
        KafkaConsumerManager kafkaConsumerManager = new KafkaConsumerManager();  
        kafkaConsumerManager.initClient();  
        kafkaConsumerManager.startConsumerThreads();  
    }  
  
    public void initClient() {  
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);  
        client = CuratorFrameworkFactory.builder().connectString(IPS).sessionTimeoutMs(SESSION_TIMEOUT)  
                .connectionTimeoutMs(CONNECT_TIMEOUT).retryPolicy(retryPolicy).namespace(NAMESPACE).build();  
        client.start();  
    }  
  
    public void startConsumerThreads() {  
        registerKafkaPartCount();  
        registerMonitorNodeIntoCluster();  
        registerChildChangeListener();  
        registerDataChangeListener();  
        try {  
            Thread.sleep(Integer.MAX_VALUE);  
        } catch (InterruptedException e) {  
            e.printStackTrace();  
        }  
    }  
  
    private void registerKafkaPartCount() {  
        EnsurePath ensurePath = new EnsurePath("/" + NAMESPACE + KAFKA_PATH);  
        try {  
            ensurePath.ensure(client.getZookeeperClient());  
            client.setData().forPath(KAFKA_PATH, KAFKA_PART_COUNT.getBytes());  
        } catch (Exception e) {  
            e.printStackTrace();  
        }  
    }  
  
    private void registerMonitorNodeIntoCluster() {  
        InetAddress address;  
        try {  
            address = InetAddress.getLocalHost();  
            tempNodeId = "/" + address.getHostName() + System.currentTimeMillis();  
            EnsurePath ensureNodesPath = new EnsurePath("/" + NAMESPACE + MONITOR_NODES_PATH);  
            EnsurePath ensureAllocPath = new EnsurePath("/" + NAMESPACE + ALLOC_NODE);  
            ensureNodesPath.ensure(client.getZookeeperClient());  
            ensureAllocPath.ensure(client.getZookeeperClient());  
            // 创建节点的时候,默认放0  
            client.create().withMode(CreateMode.EPHEMERAL).forPath(MONITOR_NODES_PATH + tempNodeId, "0".getBytes());  
            client.setData().forPath(ALLOC_NODE, tempNodeId.getBytes());  
            System.out.println("发送添加节点请求:" + tempNodeId);  
        } catch (UnknownHostException e) {  
            e.printStackTrace();  
        } catch (Exception e) {  
            e.printStackTrace();  
        }  
    }  
  
    /**
     * 节点变化的监听器 当节点数量发生变化的时候,重新分配每个节点运行的consumer进程数
     */  
    private void registerChildChangeListener() {  
        try {  
            final PathChildrenCache nodeCache = new PathChildrenCache(client, MONITOR_NODES_PATH, true);  
            nodeCache.start();  
            nodeCache.getListenable().addListener(new PathChildrenCacheListener() {  
                @Override  
                public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception {  
                    String allocNode = new String(client.getData().forPath(ALLOC_NODE));  
                    // 只有新添加的加点才进行节点的消费者线程数分配  
                        switch (event.getType()) {  
                        case CHILD_ADDED:  
                            System.out.println("添加新节点:" + event.getData().getPath());  
                            reAlloctionConsumer();  
                            break;  
                        case CHILD_REMOVED:  
                            System.out.println("删除节点:" + event.getData().getPath());  
                            reAlloctionConsumer();  
                            break;  
                        default:  
                            break;  
                        }  
                }  
            });  
  
        } catch (Exception e) {  
            e.printStackTrace();  
        }  
  
    }  
  
    // 重新分配每个Monitor-Manager启动的消费者进程数  
    private void reAlloctionConsumer() {  
        String kafkaPartCountStr;  
        try {  
            kafkaPartCountStr = new String(client.getData().forPath(KAFKA_PATH));  
  
            List<String> childrenList = client.getChildren().forPath(MONITOR_NODES_PATH);  
            int childrenListSize = childrenList.size();  
            if (StringUtils.isAlphanumeric(kafkaPartCountStr) && childrenListSize > 0) {  
                int kafkaPartCount = Integer.valueOf(kafkaPartCountStr);  
                Map<String, Integer> nodeConsuemerCounter = new HashMap<String, Integer>();  
                for (int i = 1; i <= kafkaPartCount; i++) {  
                    String nodeName = childrenList.get(i % childrenListSize);  
                    if (nodeConsuemerCounter.get(nodeName) == null) {  
                        nodeConsuemerCounter.put(nodeName, 1);  
                    } else {  
                        int oldPartCount = nodeConsuemerCounter.get(nodeName);  
                        nodeConsuemerCounter.put(nodeName, oldPartCount + 1);  
                    }  
                }  
                System.out.println("重新分配后各主机运行的消费者进程数");  
                for (String nodeName : nodeConsuemerCounter.keySet()) {  
                    Integer consumerThreadCount = nodeConsuemerCounter.get(nodeName);  
                    System.out.println("节点名称:" + nodeName + " 消费者进程数:" + consumerThreadCount);  
                    client.setData().forPath(MONITOR_NODES_PATH + "/" + nodeName,  
                            consumerThreadCount.toString().getBytes());  
                }  
            }  
        } catch (Exception e) {  
            e.printStackTrace();  
        }  
  
    }  
  
    private void registerDataChangeListener() {  
        try {  
            final NodeCache nodeCache = new NodeCache(client, MONITOR_NODES_PATH + tempNodeId, false);  
            nodeCache.start();  
            nodeCache.getListenable().addListener(new NodeCacheListener() {  
                @Override  
                public void nodeChanged() throws Exception {  
                    String newConsumerThreadCountStr = new String(nodeCache.getCurrentData().getData());  
  
                    //只有新的节点数发生变化时才执行对应的操作  
                    if (!oldConsumerThreadCount.equals(newConsumerThreadCountStr)) {  
  
                        if (StringUtils.isNumeric(newConsumerThreadCountStr)) {  
                            System.out.println("该主机上新的消费者进程数为:" + newConsumerThreadCountStr);  
                            oldConsumerThreadCount = newConsumerThreadCountStr;  
                        } else {  
                            System.out.println("该主机上产生了不合法的消费者进程数:" + newConsumerThreadCountStr);  
                        }  
                    }  
                }  
            });  
        } catch (Exception e) {  
            e.printStackTrace();  
        }  
    }  
  
}  [/mw_shl_code]











欢迎加入about云群425860289432264021 ,云计算爱好者群,关注about云腾讯认证空间

没找到任何评论,期待你打破沉寂

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条