Qing 1 жил өмнө
parent
commit
b77136da9a
18 өөрчлөгдсөн 331 нэмэгдсэн , 13 устгасан
  1. 18 0
      springboot-demo/pom.xml
  2. 41 0
      springboot-demo/src/main/java/com/sf/mq/kafka/KafkaConsumer.java
  3. 20 0
      springboot-demo/src/main/java/com/sf/mq/kafka/KafkaController.java
  4. 27 0
      springboot-demo/src/main/java/com/sf/mq/kafka/KafkaProducer.java
  5. 1 1
      springboot-demo/src/main/java/com/sf/mq/rabbitmq/MyRabbitListener.java
  6. 1 1
      springboot-demo/src/main/java/com/sf/mq/rabbitmq/anno/Param.java
  7. 1 1
      springboot-demo/src/main/java/com/sf/mq/rabbitmq/anno/ParamArr.java
  8. 1 3
      springboot-demo/src/main/java/com/sf/mq/rabbitmq/anno/TestParam.java
  9. 1 1
      springboot-demo/src/main/java/com/sf/mq/rabbitmq/direct/DirectConfig.java
  10. 1 1
      springboot-demo/src/main/java/com/sf/mq/rabbitmq/direct/DirectListener.java
  11. 1 1
      springboot-demo/src/main/java/com/sf/mq/rabbitmq/fanout/FanoutConfig.java
  12. 1 1
      springboot-demo/src/main/java/com/sf/mq/rabbitmq/fanout/FanoutListener.java
  13. 1 1
      springboot-demo/src/main/java/com/sf/mq/rabbitmq/topic/TopicConfig.java
  14. 1 1
      springboot-demo/src/main/java/com/sf/mq/rabbitmq/topic/TopicListener.java
  15. 49 0
      springboot-demo/src/main/java/com/sf/mq/rocketmq/MQConsumerService.java
  16. 88 0
      springboot-demo/src/main/java/com/sf/mq/rocketmq/MQProducerService.java
  17. 33 0
      springboot-demo/src/main/java/com/sf/mq/rocketmq/RocketMQController.java
  18. 45 1
      springboot-demo/src/main/resources/application.yml

+ 18 - 0
springboot-demo/pom.xml

@@ -108,6 +108,24 @@
             <artifactId>spring-boot-starter-amqp</artifactId>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.rocketmq</groupId>
+            <artifactId>rocketmq-spring-boot-starter</artifactId>
+            <version>2.3.0</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.springframework.kafka</groupId>
+            <artifactId>spring-kafka</artifactId>
+            <version>3.1.2</version>
+        </dependency>
+
+        <dependency>
+            <groupId>com.google.code.gson</groupId>
+            <artifactId>gson</artifactId>
+            <version>2.10.1</version>
+        </dependency>
+
     </dependencies>
 
     <build>

+ 41 - 0
springboot-demo/src/main/java/com/sf/mq/kafka/KafkaConsumer.java

@@ -0,0 +1,41 @@
+package com.sf.mq.kafka;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.kafka.support.Acknowledgment;
+import org.springframework.kafka.support.KafkaHeaders;
+import org.springframework.messaging.handler.annotation.Header;
+import org.springframework.stereotype.Component;
+
+import java.util.Optional;
+
+@Component
+@Slf4j
+public class KafkaConsumer {
+
+    public static final String TOPIC_GROUP1 = "topic.group1";
+
+    public static final String TOPIC_GROUP2 = "topic.group2";
+
+    @KafkaListener(topics = KafkaProducer.TOPIC_TEST, groupId = TOPIC_GROUP1)
+    public void topic_test(ConsumerRecord<?, ?> record, Acknowledgment ack, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
+        Optional message = Optional.ofNullable(record.value());
+        if (message.isPresent()) {
+            Object msg = message.get();
+            log.info("topic.group1 消费了: Topic:" + topic + ",Message:" + msg);
+            ack.acknowledge();
+        }
+    }
+
+    @KafkaListener(topics = KafkaProducer.TOPIC_TEST, groupId = TOPIC_GROUP2)
+    public void topic_test1(ConsumerRecord<?, ?> record, Acknowledgment ack, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {
+
+        Optional message = Optional.ofNullable(record.value());
+        if (message.isPresent()) {
+            Object msg = message.get();
+            log.info("topic.group2 消费了: Topic:" + topic + ",Message:" + msg);
+            ack.acknowledge();
+        }
+    }
+}

+ 20 - 0
springboot-demo/src/main/java/com/sf/mq/kafka/KafkaController.java

@@ -0,0 +1,20 @@
+package com.sf.mq.kafka;
+
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.PathVariable;
+import org.springframework.web.bind.annotation.RestController;
+
+@RestController
+public class KafkaController {
+
+    @Autowired
+    private KafkaProducer kafkaProducer;
+
+    //http://localhost:18100/send/hello
+    @GetMapping("/send/{msg}")
+    public void sendMsg(@PathVariable String msg) {
+        kafkaProducer.send(msg);
+    }
+}
+

+ 27 - 0
springboot-demo/src/main/java/com/sf/mq/kafka/KafkaProducer.java

@@ -0,0 +1,27 @@
+package com.sf.mq.kafka;
+
+import com.google.gson.Gson;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.stereotype.Component;
+
+
+@Component
+@Slf4j
+public class KafkaProducer {
+
+    @Autowired
+    private KafkaTemplate<String, Object> kafkaTemplate;
+
+    //自定义topic
+    public static final String TOPIC_TEST = "quickstart-events";
+
+    public void send(Object obj) {
+        String obj2String = new Gson().toJson(obj);
+        log.info("准备发送消息为:{}", obj2String);
+        //发送消息
+        kafkaTemplate.send(TOPIC_TEST, obj);
+    }
+}
+

+ 1 - 1
springboot-demo/src/main/java/com/sf/mq/MyRabbitListener.java → springboot-demo/src/main/java/com/sf/mq/rabbitmq/MyRabbitListener.java

@@ -1,4 +1,4 @@
-package com.sf.mq;
+package com.sf.mq.rabbitmq;
 
 import org.springframework.amqp.rabbit.annotation.RabbitListener;
 import org.springframework.stereotype.Component;

+ 1 - 1
springboot-demo/src/main/java/com/sf/mq/anno/Param.java → springboot-demo/src/main/java/com/sf/mq/rabbitmq/anno/Param.java

@@ -1,4 +1,4 @@
-package com.sf.mq.anno;
+package com.sf.mq.rabbitmq.anno;
 
 import java.lang.annotation.ElementType;
 import java.lang.annotation.Retention;

+ 1 - 1
springboot-demo/src/main/java/com/sf/mq/anno/ParamArr.java → springboot-demo/src/main/java/com/sf/mq/rabbitmq/anno/ParamArr.java

@@ -1,4 +1,4 @@
-package com.sf.mq.anno;
+package com.sf.mq.rabbitmq.anno;
 
 import java.lang.annotation.ElementType;
 import java.lang.annotation.Retention;

+ 1 - 3
springboot-demo/src/main/java/com/sf/mq/anno/TestParam.java → springboot-demo/src/main/java/com/sf/mq/rabbitmq/anno/TestParam.java

@@ -1,6 +1,4 @@
-package com.sf.mq.anno;
-
-import java.lang.annotation.Annotation;
+package com.sf.mq.rabbitmq.anno;
 
 @ParamArr(arr = {
         @Param(name = "name1", desc = "desc1"),

+ 1 - 1
springboot-demo/src/main/java/com/sf/mq/direct/DirectConfig.java → springboot-demo/src/main/java/com/sf/mq/rabbitmq/direct/DirectConfig.java

@@ -1,4 +1,4 @@
-package com.sf.mq.direct;
+package com.sf.mq.rabbitmq.direct;
 
 import org.springframework.amqp.core.*;
 import org.springframework.context.annotation.Bean;

+ 1 - 1
springboot-demo/src/main/java/com/sf/mq/direct/DirectListener.java → springboot-demo/src/main/java/com/sf/mq/rabbitmq/direct/DirectListener.java

@@ -1,4 +1,4 @@
-package com.sf.mq.direct;
+package com.sf.mq.rabbitmq.direct;
 
 import org.springframework.amqp.core.ExchangeTypes;
 import org.springframework.amqp.rabbit.annotation.Exchange;

+ 1 - 1
springboot-demo/src/main/java/com/sf/mq/fanout/FanoutConfig.java → springboot-demo/src/main/java/com/sf/mq/rabbitmq/fanout/FanoutConfig.java

@@ -1,4 +1,4 @@
-package com.sf.mq.fanout;
+package com.sf.mq.rabbitmq.fanout;
 
 import org.springframework.amqp.core.*;
 import org.springframework.context.annotation.Bean;

+ 1 - 1
springboot-demo/src/main/java/com/sf/mq/fanout/FanoutListener.java → springboot-demo/src/main/java/com/sf/mq/rabbitmq/fanout/FanoutListener.java

@@ -1,4 +1,4 @@
-package com.sf.mq.fanout;
+package com.sf.mq.rabbitmq.fanout;
 
 import org.springframework.amqp.rabbit.annotation.RabbitListener;
 import org.springframework.stereotype.Component;

+ 1 - 1
springboot-demo/src/main/java/com/sf/mq/topic/TopicConfig.java → springboot-demo/src/main/java/com/sf/mq/rabbitmq/topic/TopicConfig.java

@@ -1,4 +1,4 @@
-package com.sf.mq.topic;
+package com.sf.mq.rabbitmq.topic;
 
 import org.springframework.amqp.core.Binding;
 import org.springframework.amqp.core.BindingBuilder;

+ 1 - 1
springboot-demo/src/main/java/com/sf/mq/topic/TopicListener.java → springboot-demo/src/main/java/com/sf/mq/rabbitmq/topic/TopicListener.java

@@ -1,4 +1,4 @@
-package com.sf.mq.topic;
+package com.sf.mq.rabbitmq.topic;
 
 import org.springframework.amqp.core.ExchangeTypes;
 import org.springframework.amqp.rabbit.annotation.Exchange;

+ 49 - 0
springboot-demo/src/main/java/com/sf/mq/rocketmq/MQConsumerService.java

@@ -0,0 +1,49 @@
+package com.sf.mq.rocketmq;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.rocketmq.common.message.MessageExt;
+import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
+import org.apache.rocketmq.spring.core.RocketMQListener;
+import org.springframework.stereotype.Component;
+import org.springframework.stereotype.Service;
+
+@Slf4j
+@Component
+public class MQConsumerService {
+
+    // topic需要和生产者的topic一致,consumerGroup属性是必须指定的,内容可以随意
+    // selectorExpression的意思指的就是tag,默认为“*”,不设置的话会监听所有消息
+    @Service
+    @RocketMQMessageListener(topic = "TEST_TOPIC", selectorExpression = "tag1", consumerGroup = "Con_Group_One")
+    public class ConsumerSend implements RocketMQListener<String> {
+        // 监听到消息就会执行此方法
+        @Override
+        public void onMessage(String msg) {
+            log.info("监听到消息:user={}", msg);
+        }
+    }
+
+    // 注意:这个ConsumerSend2和上面ConsumerSend在没有添加tag做区分时,不能共存,
+    // 不然生产者发送一条消息,这两个都会去消费,如果类型不同会有一个报错,所以实际运用中最好加上tag,写这只是让你看知道就行
+    @Service
+    @RocketMQMessageListener(topic = "TEST_TOPIC", consumerGroup = "Con_Group_Two")
+    public class ConsumerSend2 implements RocketMQListener<String> {
+        @Override
+        public void onMessage(String str) {
+            log.info("监听到消息:str={}", str);
+        }
+    }
+
+    // MessageExt:是一个消息接收通配符,不管发送的是String还是对象,都可接收,当然也可以像上面明确指定类型(我建议还是指定类型较方便)
+    @Service
+    @RocketMQMessageListener(topic = "TEST_TOPIC", selectorExpression = "tag2", consumerGroup = "Con_Group_Three")
+    public class Consumer implements RocketMQListener<MessageExt> {
+        @Override
+        public void onMessage(MessageExt messageExt) {
+            byte[] body = messageExt.getBody();
+            String msg = new String(body);
+            log.info("监听到消息:msg={}", msg);
+        }
+    }
+
+}

+ 88 - 0
springboot-demo/src/main/java/com/sf/mq/rocketmq/MQProducerService.java

@@ -0,0 +1,88 @@
+package com.sf.mq.rocketmq;
+
+import com.alibaba.fastjson.JSON;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.rocketmq.client.producer.SendCallback;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.spring.core.RocketMQTemplate;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.messaging.support.MessageBuilder;
+import org.springframework.stereotype.Component;
+
+@Slf4j
+@Component
+public class MQProducerService {
+
+    @Value("${rocketmq.producer.send-message-timeout}")
+    private Integer messageTimeOut;
+
+    // 建议正常规模项目统一用一个TOPIC
+    private static final String topic = "TEST_TOPIC";
+
+    // 直接注入使用,用于发送消息到broker服务器
+    @Autowired
+    private RocketMQTemplate rocketMQTemplate;
+
+    /**
+     * 普通发送(这里的参数对象User可以随意定义,可以发送个对象,也可以是字符串等)
+     */
+    public void send(String msg) {
+        // 在 rocketmq-spring-boot-starter 中,Tag 的设置方式: 在 topic后面加上 “:tagName”
+        rocketMQTemplate.convertAndSend(topic + ":tag1", msg);
+//        rocketMQTemplate.send(topic + ":tag1", MessageBuilder.withPayload(user).build()); // 等价于上面一行
+    }
+
+    /**
+     * 发送同步消息(阻塞当前线程,等待broker响应发送结果,这样不太容易丢失消息)
+     * (msgBody也可以是对象,sendResult为返回的发送结果)
+     */
+    public SendResult sendMsg(String msgBody) {
+        // 通过同步方式来发送消息  MessageBuilder来把消息体的内容 打包成载荷  返回一个Message对象
+        SendResult sendResult = rocketMQTemplate.syncSend(topic, MessageBuilder.withPayload(msgBody).build());
+        log.info("【sendMsg】sendResult={}", JSON.toJSONString(sendResult));
+        return sendResult;
+    }
+
+    /**
+     * 发送异步消息(通过线程池执行发送到broker的消息任务,执行完后回调:在SendCallback中可处理相关成功失败时的逻辑)
+     * (适合对响应时间敏感的业务场景)
+     */
+    public void sendAsyncMsg(String msgBody) {
+        rocketMQTemplate.asyncSend(topic, MessageBuilder.withPayload(msgBody).build(),
+                new SendCallback() {
+                    @Override
+                    public void onSuccess(SendResult sendResult) {
+                        // 处理消息发送成功逻辑
+                    }
+
+                    @Override
+                    public void onException(Throwable throwable) {
+                        // 处理消息发送异常逻辑
+                    }
+                });
+    }
+
+    /**
+     * 发送延时消息(上面的发送同步消息,delayLevel的值就为0,因为不延时)
+     * 在start版本中 延时消息一共分为18个等级分别为:1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
+     */
+    public void sendDelayMsg(String msgBody, int delayLevel) {
+        rocketMQTemplate.syncSend(topic, MessageBuilder.withPayload(msgBody).build(), messageTimeOut, delayLevel);
+    }
+
+    /**
+     * 发送单向消息(只负责发送消息,不等待应答,不关心发送结果,如日志)
+     */
+    public void sendOneWayMsg(String msgBody) {
+        rocketMQTemplate.sendOneWay(topic, MessageBuilder.withPayload(msgBody).build());
+    }
+
+    /**
+     * 发送带tag的消息,直接在topic后面加上":tag"
+     */
+    public SendResult sendTagMsg(String msgBody) {
+        return rocketMQTemplate.syncSend(topic + ":tag2", MessageBuilder.withPayload(msgBody).build());
+    }
+
+}

+ 33 - 0
springboot-demo/src/main/java/com/sf/mq/rocketmq/RocketMQController.java

@@ -0,0 +1,33 @@
+package com.sf.mq.rocketmq;
+
+import org.apache.rocketmq.client.producer.SendResult;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RestController;
+
+import java.util.Random;
+
+@RestController
+@RequestMapping("/rocketmq")
+public class RocketMQController {
+
+    @Autowired
+    private MQProducerService mqProducerService;
+
+    // http://localhost:18080/rocketmq/send
+    @GetMapping("/send")
+    public String send() {
+        String msg = "hello spring rocketmq " + new Random().nextInt(100);
+        mqProducerService.send(msg);
+        System.out.println("发送消息:" +msg);
+        return "success";
+    }
+
+    // http://localhost:18080/rocketmq/sendTag
+    @GetMapping("/sendTag")
+    public SendResult sendTag() {
+        SendResult sendResult = mqProducerService.sendTagMsg("带有tag的字符消息");
+        return sendResult;
+    }
+}

+ 45 - 1
springboot-demo/src/main/resources/application.yml

@@ -10,4 +10,48 @@ spring:
     host: 127.0.0.1
     port: 5672
     username: guest
-    password: guest
+    password: guest
+  kafka:
+    bootstrap-servers: 127.0.0.1:9092
+    producer:
+      # 发生错误后,消息重发的次数。
+      retries: 0
+      #当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算。
+      batch-size: 16384
+      # 设置生产者内存缓冲区的大小。
+      buffer-memory: 33554432
+      # 键的序列化方式
+      key-serializer: org.apache.kafka.common.serialization.StringSerializer
+      # 值的序列化方式
+      value-serializer: org.apache.kafka.common.serialization.StringSerializer
+      # acks=0 : 生产者在成功写入消息之前不会等待任何来自服务器的响应。
+      # acks=1 : 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应。
+      # acks=all :只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。
+      acks: 1
+    consumer:
+      # 自动提交的时间间隔 在spring boot 2.X 版本中这里采用的是值的类型为Duration 需要符合特定的格式,如1S,1M,2H,5D
+      auto-commit-interval: 1S
+      # 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理:
+      # latest(默认值)在偏移量无效的情况下,消费者将从最新的记录开始读取数据(在消费者启动之后生成的记录)
+      # earliest :在偏移量无效的情况下,消费者将从起始位置读取分区的记录
+      auto-offset-reset: earliest
+      # 是否自动提交偏移量,默认值是true,为了避免出现重复数据和数据丢失,可以把它设置为false,然后手动提交偏移量
+      enable-auto-commit: false
+      # 键的反序列化方式
+      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
+      # 值的反序列化方式
+      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
+    listener:
+      # 在侦听器容器中运行的线程数。
+      concurrency: 5
+      #listner负责ack,每调用一次,就立即commit
+      ack-mode: manual_immediate
+      missing-topics-fatal: false
+
+rocketmq:
+  name-server: 127.0.0.1:9876 # 访问地址
+  producer:
+    group: pg # 必须指定group
+    send-message-timeout: 3000 # 消息发送超时时长,默认3s
+    retry-times-when-send-failed: 3 # 同步发送消息失败重试次数,默认2
+    retry-times-when-send-async-failed: 3 # 异步发送消息失败重试次数,默认2