Kafka权威指南:Kafka生产者
Kafka生产者
Kafka 发送消息的主要步骤:

ProducerRecord 对象包括目标主题和发送的内容,还可以制定键或分区。如果消息成功写入 Kafka,就返回一个 RecordMetaData 对象,它包含了主题和分区信息,以及记录在分区里的偏移量。如果写入失败,则会返回一个错误。
Propertites prop = new Propertites();
prop.put("bootstrap.servers", "broker1:9092,borker2:9092");
prop.put("key.deserializer", "org.apache.kafka.common.serialiation.StringDeserializer");
prop.put("value.deserializer", "org.apache.kafka.common.serialiation.StringDeserializer");
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(prop);
Kafka 发送消息主要有以下3种方式:
- 发送并忘记:把消息发送给服务器,但并不关心它是否正常到达。
ProducerRecord<String, String> record
= new ProducerRecord<>("CustoerContry", "Precision Products", "France");
try {
producer.send(record);
} catch(Exception e) {
e.printStackTrce();
}
}
- 同步发送:使用 send() 方法发送消息,它会返回一个 Future 对象,调用 get() 方法进行等待,就可以知道消息是否发送成功。
ProducerRecord<String, String> record
= new ProducerRecord<>("CustoerContry", "Precision Products", "France");
try {
producer.send(record).get();
} catch(Exception e) {
e.printStackTrce();
}
}
- 异步发送:调用 send() 方法, 并指定一个回调函数,服务器在返回响应时调用该函数。
This chapter requires login to view full content. You are viewing a preview.
Login to View Full Content