1234567891011121314151617181920212223242526272829303132333435363738394041 |
- package com.sf.mq.kafka;
- import lombok.extern.slf4j.Slf4j;
- import org.apache.kafka.clients.consumer.ConsumerRecord;
- import org.springframework.kafka.annotation.KafkaListener;
- import org.springframework.kafka.support.Acknowledgment;
- import org.springframework.kafka.support.KafkaHeaders;
- import org.springframework.messaging.handler.annotation.Header;
- import org.springframework.stereotype.Component;
- import java.util.Optional;
- @Component
- @Slf4j
- public class KafkaConsumer {
- public static final String TOPIC_GROUP1 = "topic.group1";
- public static final String TOPIC_GROUP2 = "topic.group2";
- @KafkaListener(topics = KafkaProducer.TOPIC_TEST, groupId = TOPIC_GROUP1)
- public void topic_test(ConsumerRecord<?, ?> record, Acknowledgment ack, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
- Optional message = Optional.ofNullable(record.value());
- if (message.isPresent()) {
- Object msg = message.get();
- log.info("topic.group1 消费了: Topic:" + topic + ",Message:" + msg);
- ack.acknowledge();
- }
- }
- @KafkaListener(topics = KafkaProducer.TOPIC_TEST, groupId = TOPIC_GROUP2)
- public void topic_test1(ConsumerRecord<?, ?> record, Acknowledgment ack, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
- Optional message = Optional.ofNullable(record.value());
- if (message.isPresent()) {
- Object msg = message.get();
- log.info("topic.group2 消费了: Topic:" + topic + ",Message:" + msg);
- ack.acknowledge();
- }
- }
- }
|