分享

部署kafka常见问答

丫丫 2016-12-19 14:54:58 发表于 问题解答 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 0 21400
本帖最后由 丫丫 于 2016-12-19 15:06 编辑
问题导读


1、如何对Kafka Broker上持久化的数据进行加密
2、Kafka是否支持跨数据中心的可用性
3、Kafka支持哪些类型的数据转换
4、Kafka支持哪些类型的数据转换
5、如何通过Kafka发送大消息或者超大负荷量?
6、Kafka是否支持MQTT或JMS协议?









是否应当为Kafka Broker使用 固态硬盘 (SSD)
实际上使用SSD盘并不能显著地改善 Kafka 的性能,主要有两个原因:

[mw_shl_code=text,true]
* Kafka写磁盘是异步的,不是同步的。就是说,除了启动、停止之外,Kafka的任何操作都不会去等待磁盘同步(sync)完成;而磁盘同步(disk syncs)总是在后台完成的。这就是为什么Kafka消息至少复制到三个副本是至关重要的,因为一旦单个副本崩溃,这个副本就会丢失数据无法同步写到磁盘。
* 每一个Kafka Partition被存储为一个串行的WAL(Write Ahead Log)日志文件。因此,除了极少数的数据查询,Kafka中的磁盘读写都是串行的。现代的操作系统已经对串行读写做了大量的优化工作。[/mw_shl_code]


如何对Kafka Broker上持久化的数据进行加密


目前,Kafka不提供任何机制对Broker上持久化的数据进行加密。用户可以自己对写入到Kafka的数据进行加密,即是,生产者(Producers)在写Kafka之前加密数据,消费者(Consumers)能解密收到的消息。这就要求生产者(Producers)把加密协议(protocols)和密钥(keys)分享给消费者(Consumers)。

另外一种选择,就是使用软件提供的文件系统级别的加密,例如Cloudera Navigator Encrypt。Cloudera Navigator Encrypt是Cloudera企业版(Cloudera Enterprise)的一部分,在应用程序和文件系统之间提供了一个透明的加密层。

Apache Zookeeper正成为Kafka集群的一个痛点(pain point),真的吗?


Kafka高级消费者(high-level consumer)的早期版本(0.8.1或更早)使用Zookeeper来维护读的偏移量(offsets,主要是Topic的每个Partition的读偏移量)。如果有大量生产者(consumers)同时从Kafka中读数据,对Kafka的读写负载可能就会超出它的容量,Zookeeper就变成一个瓶颈(bottleneck)。当然,这仅仅出现在一些很极端的案例中(extreme cases),即有成百上千个消费者(consumers)在使用同一个Zookeeper集群来管理偏移量(offset)。

不过,这个问题已经在Kafka当前的版本(0.8.2)中解决。从版本0.8.2开始,高级消费者(high-level consumer)能够使用Kafka自己来管理偏移量(offsets)。本质上讲,它使用一个单独的Kafka Topic来管理最近的读偏移量(read offsets),因此偏移量管理(offset management)不再要求Zookeeper必须存在。然后,用户将不得不面临选择是用Kafka还是Zookeeper来管理偏移量(offsets),由消费者(consumer)配置参数 offsets.storage 决定。

Cloudera强烈推荐使用Kafka来存储偏移量。当然,为了保证向后兼容性,你可以继续选择使用Zookeeper存储偏移量。(例如,你可能有一个监控平台需要从Zookeeper中读取偏移量信息。) 假如你不得不使用Zookeeper进行偏移量(offset)管理,我们推荐你为Kafka集群使用一个专用的Zookeeper集群。假如一个专用的Zookeeper集群仍然有性能瓶颈,你依然可以通过在Zookeeper节点上使用固态硬盘(SSD)来解决问题。

Kafka是否支持跨数据中心的可用性

Kafka跨数据中心可用性的推荐解决方案是使用MirrorMaker(https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=27846330 ) 。在你的每一个数据中心都搭建一个Kafka集群,在Kafka集群之间使用MirrorMaker来完成近实时的数据复制。

使用MirrorMaker的架构模式是为每一个”逻辑”的topic在每一个数据中心创建一个topic:例如,在逻辑上你有一个”clicks”的topic,那么你实际上有”DC1.clicks”和“DC2.clicks”两个topic(DC1和DC2指得是你的数据中心)。DC1向DC1.clicks中写数据,DC2向DC2.clicks中写数据。MirrorMaker将复制所有的DC1 topics到DC2,并且复制所有的DC2 topics到DC1。现在每个DC上的应用程序都能够访问写入到两个DC的事件。这个应用程序能够合并信息和处理相应的冲突。

另一种更复杂的模式是在每一个DC都搭建本地和聚合Kafka集群。这个模式已经被Linkedin使用,Linkedin Kafka运维团队已经在这篇

Blog(https://engineering.linkedin.com/kafka/running-kafka-scale )中有详细的描述(参见“Tiers and Aggregation”)。


Kafka支持哪些类型的数据转换(data transformation)


数据流过的Kafka的时候,Kafka并不能进行数据转换。为了处理数据转换,我们推荐如下方法:

[mw_shl_code=text,true]
* 对于简单事件处理,使用Flume Kafka integration(http://blog.cloudera.com/blog/20 ... or-event-processing ),并且写一个简单的Apache Flume Interceptor。
* 对于复杂(事件)处理,使用Apache Spark Streaming从Kafka中读数据和处理数据。[/mw_shl_code]

在这两种情况下,被转换或者处理的数据可被写会到新的Kafka Topic中,或者直接传送到数据的最终消费者(Consumer)那里。

对于实时事件处理模式更全面的描述,看看这篇文章(http://blog.cloudera.com/blog/2015/06/architectural-patterns-for-near-real-time-data-processing-with-apache-hadoop/ )。


如何通过Kafka发送大消息或者超大负荷量?



Cloudera的性能测试表明Kafka达到最大吞吐量的消息大小为10K左右。更大的消息将导致吞吐量下降。然后,在一些情况下,用户需要发送比10K大的多的消息。

如果消息负荷大小是每100s处理MB级别


我们推荐探索以下选择:
[mw_shl_code=text,true]
* 如果可以使用共享存储(HDFS、S3、NAS),那么将超负载放在共享存储上,仅用Kafka发送负载数据位置的消息。
* 对于大消息,在写入Kafka之前将消息拆分成更小的部分,使用消息Key确保所有的拆分部分都写入到同一个partition中,以便于它们能被同一个消息着(Consumer)消费的到,在消费的时候将拆分部分重新组装成一个大消息。

[/mw_shl_code]
在通过Kafka发送大消息时,请记住以下几点:


压缩配置

[mw_shl_code=text,true]
* Kafka生产者(Producers)能够压缩消息。通过配置参数compression.codec确保压缩已经开启。有效的选项为"gzip"和"snappy"。[/mw_shl_code]
Broker配置


[mw_shl_code=text,true]
* message.max.bytes (default: 1000000): Broker能够接受的最大消息。增加这个值以便于匹配你的最大消息。
* log.segment.bytes (default: 1GB): Kafka数据文件的大小。确保它至少大于一条消息。默认情况下已经够用,一般最大的消息不会超过1G大小。
* replica.fetch.max.bytes (default: 1MB): Broker间复制的最大的数据大小。这个值必须大于message.max.bytes,否则一个Broker接受到消息但是会复制失败,从而导致潜在的数据丢失。[/mw_shl_code]
Consumer配置


[mw_shl_code=text,true]
* fetch.message.max.bytes (default: 1MB): Consumer所读消息的最大大小。这个值应该大于或者等于Broker配置的message.max.bytes的值。[/mw_shl_code]
其他方面的考虑

[mw_shl_code=text,true]
* Broker需要针对复制为每一个partition分配一个replica.fetch.max.bytes大小的缓存区。需要计算确认( partition的数量 * 最大消息的大小 )不会超过可用的内存,否则就会引发OOMs(内存溢出异常)。
* Consumers有同样的问题,因子参数为 fetch.message.max.bytes :确认每一个partition的消费者针对最大的消息有足够可用的内存。
* 大消息可能引发更长时间的垃圾回收停顿(garbage collection pauses)(brokers需要申请更大块的内存)。注意观察GC日志和服务器日志。假如发现长时间的GC停顿导致Kafka丢失了Zookeeper session,你可能需要为zookeeper.session.timeout.ms配置更长的timeout值。[/mw_shl_code]
Kafka是否支持MQTT或JMS协议

目前,Kafka针对上述协议不提供直接支持。但是,用户可以自己编写Adaptors从MQTT或者JMS中读取数据,然后写入到Kafka中。


来源:navigating






没找到任何评论,期待你打破沉寂

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条