1.引入Maven依赖
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.2.2.RELEASE</version>
</dependency>
2.application.yml配置文件
新增如下配置:
spring:
Kafka配置
kafka:
producer:
发送端brokers 集群
bootstrap-servers: 192.168.204.201:9092,192.168.204.202:9092,192.168.204.203:9092
# 发送端 id
client-id: producerDemo
# 发送端确认模式
acks: -1
# 发送失败重试次数
retries: 3
# 批处理条数,当多个记录被发送至统一分区时,producer对于同一个分区来说,会按照 batch.size 的大小进行统一收集,批量发送
batch-size: 4096
# 33554432 即32MB的批处理缓冲区
buffer-memory: 40960
# key 序列化
key-serializer: org.apache.kafka.common.serialization.StringSerializer
# value 序列化
value-serializer: org.apache.kafka.common.serialization.StringSerializer
properties:
# 与 batch.size 配合使用。延迟统一收集,产生聚合,然后批量发送至broker
linger.ms: 10
consumer:
# 消费端 brokers 集群
bootstrap-servers: 192.168.204.201:9092,192.168.204.202:9092,192.168.204.203:9092
# 消费者 group.id 组ID
group-id: test-group
# 消费者消费消息后,进行自动提交
enable-auto-commit: true
# 自动提交的频率(与 enable.auto.commit = true 属性配合使用)
auto-commit-interval: 1000
# 新的groupid,是否从头开始消费
auto-offset-reset: earliest
# key 反序列化
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
# value 反序列化
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
3.Controller类 ----生产&消费
@Controller
@RequestMapping("kafka")
public class KafkaController {
@Autowired
private KafkaTemplate kafkaTemplate;
/**
* 发送消息
*/
@RequestMapping("producer.htm")
@ResponseBody
public void getUser(ModelAndView view){
kafkaTemplate.send("testTopic","消息发送");
kafkaTemplate.send("testTopic1","消息发送1");
kafkaTemplate.send("testTopic2","消息发送2");
}
/**
* 消费testTopic中的消息
*/
@KafkaListener(topics = {"testTopic"})
public void topicMessage(ConsumerRecord<?, ?> record,String content){
System.out.println("消息:"+ content);
System.out.println("消息被消费.+++++++++++Topic:"+ record.topic() + ",+++++++++++++Message:" + record.value() );
}
/**
* 消费testTopic1中的消息
*/
@KafkaListener(topics = {"testTopic1"})
public void topicMessage1(String content){
System.out.println("消息被消费1:"+ content);
}
/**
* 消费testTopic2中的消息
*/
@KafkaListener(topics = {"testTopic2"})
public void topicMessage2(String content){
System.out.println("消息被消费2:"+ content);
}
}
至此,SpringBoot 整合 Kafka 总结完毕。
如果本文对你有所帮助,那就给我点个赞呗 ^_^
————————————————