分享

kafka权威指南 第三章第5节 序列化

xingoo 2017-7-1 21:59:06 发表于 连载型 [显示全部楼层] 回帖奖励 阅读模式 关闭右栏 2 11386
本帖最后由 xingoo 于 2017-7-9 13:16 编辑

问题导读:
1 如何自定义Kafka序列化?
2 如何使用Avro进行序列化?



    在前面的例子中,producer必须要设置序列化。我们已经看过如何使用默认的字符串序列化。kafka也提供了整型和字节数组,但是这些往往是不能满足大部分的用户需求的,因此最终你还是希望能以一些更通用的方法来序列化记录。
    下面将会从如何自定义序列化开始,然后介绍一种推荐的序列化工具——Avro.

自定义序列化

    如果你发送给kafka的数据不再是简单的String和整型,那么你可以选择一些通用的序列化工具,如Avro,Thrift或者Protobuf来创建记录,也可以通过自定义的序列化工具创建,强烈推荐使用一些通用的序列化工具。但是为了理解序列化的机制中的工作原理,我们就先来看看如何自定义序列化。
    比如,序列化记录存储用户的名字,那么可以简单的创建下面的类:
[mw_shl_code=java,true]public class Customer {
    private int customerID;
    private String customerName;
    public Customer(int ID, String name) {
        this.customerID = ID;
        this.customerName = name;
    }
    public int getID() {
        return customerID;
    }
    public String getName() {
        return customerName;
    }
}[/mw_shl_code]

现在要为这个类创建序列化,可以像下面的代码:

[mw_shl_code=java,true]import org.apache.kafka.common.errors.SerializationException;
import java.nio.ByteBuffer;
import java.util.Map;
public class CustomerSerializer implements Serializer<Customer> {
    @Override
    public void configure(Map configs, boolean isKey) {
        // nothing to configure
    }
    @Override
    /**
    We are serializing Customer as:
    4 byte int representing customerId
    4 byte int representing length of customerName in UTF-8 bytes (0 if name isNull)
    N bytes representing customerName in UTF-8
    */
    public byte[] serialize(String topic, Customer data) {
        try {
            byte[] serializedName;
            int stringSize;
            if (data == null)
                return null;
            else {
                if (data.getName() != null) {
                    serializeName = data.getName().getBytes("UTF-8");
                    stringSize = serializedName.length;
                } else {
                    serializedName = new byte[0];
                    stringSize = 0;
                }
            }
            ByteBuffer buffer = ByteBuffer.allocate(4 + 4 + stringSize);
            buffer.putInt(data.getID());
            buffer.putInt(stringSize);
            buffer.put(serializedName);
            return buffer.array();
        } catch (Exception e) {
            throw new SerializationException("Error when serializing Customer tobyte[] " + e);
        }
    }
    @Override
    public void close() {
        // nothing to close
    }
}[/mw_shl_code]
    在Producer的代码中配置序列化,使用自定义的序列化类,然后发送数据。另外,注意这个代码还是很脆弱的,比如我们有很多的customer,现在需要把id改成long型,或者增加一个新的日期字段,我们会发现很难跟老的消息进行兼容。在不同的版本间进行序列化和反序列化是一件很麻烦的事情,因为需要挨个字节进行对比。更糟糕的是,如果一个公司所有的组都在使用Kafka发送相同的数据,那么他们必须要使用同一个序列化工具,如果有代码需要修改,那么必须全部进行同步更新。
    因此,我们推荐不要自定义新的序列化工具,而是使用类似Apache Avro、Thrift、Protobuf这种通用的工具。在下面的章节中,我们将会介绍如何使用Apache Avro对记录进行序列化。
使用Apache Avro序列化

    Apache Avro是一种通用语言的序列化工具,这个项目是Doug Cutting创建用来在大量的客户端中间进行数据共享的。
    Avro使用一种类似schema的东西来维护数据格式,schema一般都是用Json来描述,然后序列化成二进制文件。Avro在读取和写入文件的时候提供Schema的信息,并且是嵌入在数据我呢件的内部。
    Avro还有一个特性使它更适合在Kafka这种消息系统中传输数据,就是写入消息的时候可以使用新的schema,而读取的时候还是按照之前的shcema读取也不会有任何问题。
    比如原始的schema是下面的这种样子的:
[mw_shl_code=java,true]{
    "namespace": "customerManagement.avro",
    "type": "record",
    "name": "Customer",
    "fields": [
        {"name": "id", "type": "int"},
        {"name": "name", "type": "string""},
        {"name": "faxNumber", "type": ["null", "string"], "default": "null"}
    ]
}[/mw_shl_code]

  • id和name字段是必须要有的,其他的字段如fax是可选的,并且默认为null

    如果这个schema已经使用一段时间了,并且已经产生了TB级的数据。随着时间的改变,21世纪的今天已经不需要使用传真了,因此我们使用新的email字段进行替代。
    那么新的schema将会变成
[mw_shl_code=java,true]{
    "namespace": "customerManagement.avro",
    "type": "record",
    "name": "Customer",
    "fields": [
        {"name": "id", "type": "int"},
        {"name": "name", "type": "string"},
        {"name": "email", "type": ["null", "string"], "default": "null"}
    ]
}[/mw_shl_code]

    在升级到新的版本后,新的记录将会包含email字段,老的记录则会包含faxNumber字段,那么consumer读取到这样的数据后怎么处理呢?

    读取数据的应用可能调用类似getName() getId()以及getFaxNumber()这种方法。如果遇到新的Schema的数据,getName()和getId()不会有什么问题,因为他们没有什么修改,但是getFaxNumber()将会返回null,因为现在已经没有faxNumber这个字段了。
    现在假设我们已经升级了consumer的应用程序,现在getFaxNumber()方法被替换成了getEmail()方法。如果我们遇到了老的数据,getEmaill()方法将会返回null,因为这个记录里面还没有email字段。
    最重要的事情是即使我们改变了消息的schema,也不需要改变读取数据的代码,不会有任何异常或者中断发生。
    有两种需要知道的情况:
  • 数据写入时使用的schema和数据读取的时候的schema必须要能匹配上。Avro的官网上面有例子。
  • 在反序列化的时候需要知道数据的schema信息,Avro的schema是跟数据一起存储到文件的。但是在kafka中其实还有其他的更好的做法,稍后会有讲解

在Kafka中使用Avro

    如果我们在Avro的数据文件中同时存储全部的Schema信息,那么记录的大小很可能会翻倍。然而,Avro在读取记录的时候是需要获得整个Schema的,因此需要能在任何地方都能读取到Schema.。为了达到这个目的,可以引入Schema的注册机制。思路就是将Schema的信息存储在注册中心,然后我们只会存储记录的Schema对应的唯一标识。当读取数据的时候,会根据这个唯一标识去注册中心读取Schema的信息,然后进行反序列化。所有的注册操作和读取操作都是由序列化和反序列化器来完成的。数据写入Kafka的时候只需要简单的引用下使用哪个序列化工具就可以 。

下面是一个在Kafka中基于Avro序列化数据的例子:
[mw_shl_code=java,true]Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
props.put("schema.registry.url", schemaUrl);
String topic = "customerContacts";
int wait = 500;
Producer<String, Customer> producer = new KafkaProducer<String, Customer>(props);
// We keep producing new events until someone ctrl-c
while (true) {
    Customer customer = CustomerGenerator.getNext();
    System.out.println("Generated customer " + customer.toString());
    ProducerRecord<String, Customer> record = new ProducerRecord<>(topic, customer.getId(), customer);
    producer.send(record);
}[/mw_shl_code]

  • 使用KafkaAvroSerializer序列化对象
  • schema.registry.url是一个新的参数,指定了存储schema信息的位置
  • custmer是我们想要生成的对象
  • 需要在ProducerRecord中指定记录的类型为Customer,并且基于Customer对象创建记录
  • 然后使用Avro进行序列化发送数据

如果你想要用Avro序列化一些更通用的对象,那么也可以自定义Schema
[mw_shl_code=java,true]Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
props.put("schema.registry.url", url);
String schemaString = "{\"namespace\": \"customerManagement.avro\",\"type\": \"record\", " +
                                   "\"name\": \"Customer\"," +
                                   "\"fields\": [" +
                                   "{\"name\": \"id\", \"type\": \"int\"}," +
                                   "{\"name\": \"name\", \"type\": \"string\"}," +
                                   "{\"name\": \"email\", \"type\": [\"null\",\"string
                                   \"], \"default\":\"null\" }" +
                                   "]}";
Producer<String, GenericRecord> producer = new KafkaProducer<String, GenericRecord>(props);
Schema.Parser parser = new Schema.Parser();
Schema schema = parser.parse(schemaString);
for (int nCustomers = 0; nCustomers < customers; nCustomers++) {
    String name = "exampleCustomer" + nCustomers;
    String email = "example " + nCustomers + "@example.com"
    GenericRecord customer = new GenericData.Record(schema);
    customer.put("id", nCustomer);
    customer.put("name", name);
    customer.put("email", email);
    ProducerRecord<String, GenericRecord> data = new ProducerRecord<String, GenericRecord>("customerContacts",name, customer);
    producer.send(data);
}
}[/mw_shl_code]
  • 同样使用KafkaAvroSerializer
  • 提供注册中心的地址
  • 现在需要提供Schema的信息
  • 需要创建Avro的GenericRecord对象,然后使用shema和data进行初始化
  • ProducerRecord里面的内容是一个GenericRecord对象,它包含了我们自定义的Schema和数据。序列化器会知道怎么从记录中获得schema,并且将schema存储到注册中心,然后序列化对象。



本帖被以下淘专辑推荐:

已有(2)人评论

跳转到指定楼层
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

关闭

推荐上一条 /2 下一条