Kafka权威指南:Kafka生产者

Kafka生产者

Kafka 发送消息的主要步骤:

ProducerRecord 对象包括目标主题和发送的内容,还可以制定键或分区。如果消息成功写入 Kafka,就返回一个 RecordMetaData 对象,它包含了主题和分区信息,以及记录在分区里的偏移量。如果写入失败,则会返回一个错误。

Propertites prop = new Propertites();
prop.put("bootstrap.servers", "broker1:9092,borker2:9092");
prop.put("key.deserializer", "org.apache.kafka.common.serialiation.StringDeserializer");
prop.put("value.deserializer", "org.apache.kafka.common.serialiation.StringDeserializer");

KafkaProducer<String, String> producer = new KafkaProducer<String, String>(prop);

Kafka 发送消息主要有以下3种方式:

  • 发送并忘记:把消息发送给服务器,但并不关心它是否正常到达。
ProducerRecord<String, String> record 
        = new ProducerRecord<>("CustoerContry", "Precision Products", "France");
    try {
        producer.send(record);
    } catch(Exception e) {
        e.printStackTrce();
    }
}
  • 同步发送:使用 send() 方法发送消息,它会返回一个 Future 对象,调用 get() 方法进行等待,就可以知道消息是否发送成功。
ProducerRecord<String, String> record 
        = new ProducerRecord<>("CustoerContry", "Precision Products", "France");
    try {
        producer.send(record).get();
    } catch(Exception e) {
        e.printStackTrce();
    }
}
  • 异步发送:调用 send() 方法, 并指定一个回调函数,服务器在返回响应时调用该函数。

This chapter requires login to view full content. You are viewing a preview.

Login to View Full Content

Course Curriculum

3

框架与 I/O:Spring、Netty 与 Web 容器

理解 Spring Boot 自动装配、AOP 与事务原理,掌握 Netty Reactor 模型及 Tomcat 连接处理机制,构建高内聚、易扩展的应用服务层。
4

高性能中间件:消息、缓存与存储

熟练运用 MySQL 索引/事务、Redis 缓存策略、Kafka/RocketMQ 消息可靠性,以及 ZooKeeper 分布式协调,搭建稳定、解耦的分布式数据底座。
6

云原生:容器化、可观测性与工程效能

通过 Docker/K8s 实现弹性部署,集成 Metrics/Logs/Traces 构建可观测体系,推动 DevOps 与自动化,让架构在云上持续交付与进化。