package com.sf.mq.kafka; import lombok.extern.slf4j.Slf4j; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.support.Acknowledgment; import org.springframework.kafka.support.KafkaHeaders; import org.springframework.messaging.handler.annotation.Header; import org.springframework.stereotype.Component; import java.util.Optional; @Component @Slf4j public class KafkaConsumer { public static final String TOPIC_GROUP1 = "topic.group1"; public static final String TOPIC_GROUP2 = "topic.group2"; @KafkaListener(topics = KafkaProducer.TOPIC_TEST, groupId = TOPIC_GROUP1) public void topic_test(ConsumerRecord record, Acknowledgment ack, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) { Optional message = Optional.ofNullable(record.value()); if (message.isPresent()) { Object msg = message.get(); log.info("topic.group1 消费了: Topic:" + topic + ",Message:" + msg); ack.acknowledge(); } } @KafkaListener(topics = KafkaProducer.TOPIC_TEST, groupId = TOPIC_GROUP2) public void topic_test1(ConsumerRecord record, Acknowledgment ack, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) { Optional message = Optional.ofNullable(record.value()); if (message.isPresent()) { Object msg = message.get(); log.info("topic.group2 消费了: Topic:" + topic + ",Message:" + msg); ack.acknowledge(); } } }