程序员社区

SpringBoot笔记(四):SpringBoot整合Kafka

1.引入Maven依赖


<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.2.2.RELEASE</version>
</dependency>

2.application.yml配置文件

新增如下配置:

spring:

Kafka配置

kafka:
producer:

发送端brokers 集群

  bootstrap-servers: 192.168.204.201:9092,192.168.204.202:9092,192.168.204.203:9092
  # 发送端 id
  client-id: producerDemo
  # 发送端确认模式
  acks: -1
  # 发送失败重试次数
  retries: 3
  # 批处理条数,当多个记录被发送至统一分区时,producer对于同一个分区来说,会按照 batch.size 的大小进行统一收集,批量发送
  batch-size: 4096
  # 33554432 即32MB的批处理缓冲区
  buffer-memory: 40960
  # key 序列化
  key-serializer: org.apache.kafka.common.serialization.StringSerializer
  # value 序列化
  value-serializer: org.apache.kafka.common.serialization.StringSerializer
  properties:
    # 与 batch.size 配合使用。延迟统一收集,产生聚合,然后批量发送至broker
    linger.ms: 10
consumer:
  # 消费端 brokers 集群
  bootstrap-servers: 192.168.204.201:9092,192.168.204.202:9092,192.168.204.203:9092
  # 消费者 group.id 组ID
  group-id: test-group
  # 消费者消费消息后,进行自动提交
  enable-auto-commit: true
  # 自动提交的频率(与 enable.auto.commit = true 属性配合使用)
  auto-commit-interval: 1000
  # 新的groupid,是否从头开始消费
  auto-offset-reset: earliest
  # key 反序列化
  key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
  # value 反序列化
  value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

3.Controller类 ----生产&消费

@Controller
@RequestMapping("kafka")
public class KafkaController {

    @Autowired
    private KafkaTemplate kafkaTemplate;

    /**
     * 发送消息
     */
    @RequestMapping("producer.htm")
    @ResponseBody
    public void getUser(ModelAndView view){
        kafkaTemplate.send("testTopic","消息发送");
        kafkaTemplate.send("testTopic1","消息发送1");
        kafkaTemplate.send("testTopic2","消息发送2");
    }

    /**
     * 消费testTopic中的消息
     */
    @KafkaListener(topics = {"testTopic"})
    public void topicMessage(ConsumerRecord<?, ?> record,String content){
        System.out.println("消息:"+ content);
        System.out.println("消息被消费.+++++++++++Topic:"+ record.topic() + ",+++++++++++++Message:" + record.value() );
    }

    /**
     * 消费testTopic1中的消息
     */
    @KafkaListener(topics = {"testTopic1"})
    public void topicMessage1(String content){
        System.out.println("消息被消费1:"+ content);
    }

    /**
     * 消费testTopic2中的消息
     */
    @KafkaListener(topics = {"testTopic2"})
    public void topicMessage2(String content){
        System.out.println("消息被消费2:"+ content);
    }

}

至此,SpringBoot 整合 Kafka 总结完毕。

如果本文对你有所帮助,那就给我点个赞呗 ^_^
————————————————

赞(0) 打赏
未经允许不得转载:IDEA激活码 » SpringBoot笔记(四):SpringBoot整合Kafka

一个分享Java & Python知识的社区