public static void main(String[] args) throws Exception {
Properties props = new Properties();
//设置代理\服务器地址
props.put("metadata.broker.list", "s102:9092, s103:9092, s104:9092");
//序列化类,string
props.put("serializer.class", "kafka.serializer.StringEncoder");
//包装java的prop,包装成ProducerConfig
ProducerConfig config = new ProducerConfig(props);
//使用producerConfig初始化producer
//<String, String> 中第一个为key类型(未接触到),第二个是value类型,真实数据
Producer<String, String> producer = new Producer<String, String>(config);
String topic = "t3";
for (int i = 1000; i < 2000; i++) {
KeyedMessage<String, String> data = new KeyedMessage<String, String>(topic, "tom" + i);
producer.send(data);
Thread.sleep(500);
}
producer.close();
}
|