分享

Kafka权威指南 第四章第8节:我们该如何退出

feilong 2017-7-21 09:37:11 发表于 连载型 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 2 12398
问题导读

1.如何干净利落地退出轮询?
2.退出可以调用哪些方法?
3.退出代码是什么样的?





But How Do We Exit? 但是我们该如何退出呢?

在本章的前面,当我们讨论轮询循环时,我要求你不要担心消费者在无限循环中的轮询,我们将讨论如何干净利落地退出循环。

当你决定退出轮询是,你需要启动另外的线程去调用 consumer.wakeup()。如果你是在主线程中运行的消费闭环,这可以从一个shutdownhook做。注意consumer.wakeup()是安全的从一个不同的线程调用的唯一的消费方法。线程唤醒会导致poll()退出并伴随Wakeupexception,或者如果consumer.wakeup()在线程没有等待轮询,该异常将在在下一次轮询迭代时被抛出的。Wakeupexception不需要处理,它只是一种中断循环的方式,但重要的是,在退出线程前,需要调用consumer.close(),这将做最后的提交并将发给组协调员消费者离开消费组的消息,因此负载均衡立即触发,你不需要等待会话超时。

如果用户在主应用程序线程中运行时退出,代码将如下所示。这个例子有点截断,你可以在这里查看完整的示例。
[mw_shl_code=java,false]Runtime.getRuntime().addShutdownHook(new Thread() {
public void run() {
System.out.println("Starting exit...");
consumer.wakeup();
try {
mainThread.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
...
try {
// looping until ctrl-c, the shutdown hook will cleanup on exit
while (true) {
ConsumerRecords<String, String> records = movingAvg.consumer.
poll(1000);
System.out.println(System.currentTimeMillis() + " -- waiting
for data...");
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s\n",
record.offset(), record.key(), record.value());
}
for (TopicPartition tp: consumer.assignment())
System.out.println("Committing offset at position:" + consumer.
position(tp));
movingAvg.consumer.commitSync();
}
} catch (WakeupException e) {
// ignore for shutdown
} finally {
consumer.close();//在退出之前,确保你已经完全关闭了消费者
System.out.println("Closed consumer and we are done");
}[/mw_shl_code]

本帖被以下淘专辑推荐:

已有(2)人评论

跳转到指定楼层
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条