KafkaConsumer.java 1.5 KB

1234567891011121314151617181920212223242526272829303132333435363738394041
  1. package com.sf.mq.kafka;
  2. import lombok.extern.slf4j.Slf4j;
  3. import org.apache.kafka.clients.consumer.ConsumerRecord;
  4. import org.springframework.kafka.annotation.KafkaListener;
  5. import org.springframework.kafka.support.Acknowledgment;
  6. import org.springframework.kafka.support.KafkaHeaders;
  7. import org.springframework.messaging.handler.annotation.Header;
  8. import org.springframework.stereotype.Component;
  9. import java.util.Optional;
  10. @Component
  11. @Slf4j
  12. public class KafkaConsumer {
  13. public static final String TOPIC_GROUP1 = "topic.group1";
  14. public static final String TOPIC_GROUP2 = "topic.group2";
  15. @KafkaListener(topics = KafkaProducer.TOPIC_TEST, groupId = TOPIC_GROUP1)
  16. public void topic_test(ConsumerRecord<?, ?> record, Acknowledgment ack, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
  17. Optional message = Optional.ofNullable(record.value());
  18. if (message.isPresent()) {
  19. Object msg = message.get();
  20. log.info("topic.group1 消费了: Topic:" + topic + ",Message:" + msg);
  21. ack.acknowledge();
  22. }
  23. }
  24. @KafkaListener(topics = KafkaProducer.TOPIC_TEST, groupId = TOPIC_GROUP2)
  25. public void topic_test1(ConsumerRecord<?, ?> record, Acknowledgment ack, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
  26. Optional message = Optional.ofNullable(record.value());
  27. if (message.isPresent()) {
  28. Object msg = message.get();
  29. log.info("topic.group2 消费了: Topic:" + topic + ",Message:" + msg);
  30. ack.acknowledge();
  31. }
  32. }
  33. }