Przeglądaj źródła

0324 spring amqp

Qing 1 rok temu
rodzic
commit
ff9a102123

+ 5 - 0
springboot-demo/pom.xml

@@ -103,6 +103,11 @@
             <artifactId>spring-boot-starter-data-redis</artifactId>
         </dependency>
 
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-amqp</artifactId>
+        </dependency>
+
     </dependencies>
 
     <build>

+ 21 - 0
springboot-demo/src/main/java/com/sf/mq/MyRabbitListener.java

@@ -0,0 +1,21 @@
+package com.sf.mq;
+
+import org.springframework.amqp.rabbit.annotation.RabbitListener;
+import org.springframework.stereotype.Component;
+
+@Component
+public class MyRabbitListener {
+
+    // 对于通过springboot来消费消息的方式 简化了消费者绑定队列的逻辑
+    // 可以直接通过队列名 使得一个方法成为接受消息的方法
+    @RabbitListener(queues = "myQueue")
+    public void consume(String msg) {
+        System.out.println("consume1: " + msg);
+    }
+
+    // 第二个消费者
+    @RabbitListener(queues = "myQueue")
+    public void consume1(String msg) {
+        System.out.println("consume2: " + msg);
+    }
+}

+ 35 - 0
springboot-demo/src/main/java/com/sf/mq/direct/DirectConfig.java

@@ -0,0 +1,35 @@
+package com.sf.mq.direct;
+
+import org.springframework.amqp.core.*;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+@Configuration
+public class DirectConfig {
+
+    // 通过bean的声明方式 来创建交换器
+    // 声明交换器时 只需要类型和名字 直接使用对应类型的实现类FanoutExchange
+    // 通过构造器指定名字
+    @Bean
+    public DirectExchange directExchange() {
+        return new DirectExchange("lovecoding.direct");
+    }
+
+    // 通过bean的声明方式 来创建队列
+    // 通过队列的构造器指定名字
+    // 注意引用的包来自 org.springframework.amqp.core
+    @Bean
+    public Queue directQueue() {
+        return new Queue("direct.queue1");
+    }
+
+    // 绑定交换器和队列
+    // 当前是fanout类型 不需要routingKey
+    // 绑定关系叫做Binding类  通过BindingBuilder来构造绑定关系
+    // bind方法后面加队列 再使用链式编程  to方法后面加交换器
+    @Bean
+    public Binding bindingDirect(Queue directQueue, DirectExchange directExchange) {
+        return BindingBuilder.bind(directQueue).to(directExchange).with("111");
+    }
+
+}

+ 29 - 0
springboot-demo/src/main/java/com/sf/mq/direct/DirectListener.java

@@ -0,0 +1,29 @@
+package com.sf.mq.direct;
+
+import org.springframework.amqp.core.ExchangeTypes;
+import org.springframework.amqp.rabbit.annotation.Exchange;
+import org.springframework.amqp.rabbit.annotation.Queue;
+import org.springframework.amqp.rabbit.annotation.QueueBinding;
+import org.springframework.amqp.rabbit.annotation.RabbitListener;
+import org.springframework.stereotype.Component;
+
+@Component
+public class DirectListener {
+
+    // 在使用RabbitListener注解时
+    // 可以直接声明队列、交换器、以及routingKey 建立他们之间的绑定关系
+    // 参数bindings 需要结合注解使用@QueueBinding 代替前面config的使用
+    // 在binding的关系中  还需要队列和交换器的bean 使用@Queue和@Exchange来代替
+    // 是一种注解嵌套注解的用法
+    // 其中 ExchangeTypes.DIRECT是默认值
+    @RabbitListener(bindings = @QueueBinding(
+            value = @Queue(name = "direct.queue"),
+            exchange = @Exchange(name = "lovecoding.direct", type = ExchangeTypes.DIRECT),
+            key = {"222"}
+    ))
+//    @RabbitListener(queues = "direct.queue")
+    public void consume(String msg) {
+        System.out.println("direct.queue consume: " + msg);
+    }
+
+}

+ 47 - 0
springboot-demo/src/main/java/com/sf/mq/fanout/FanoutConfig.java

@@ -0,0 +1,47 @@
+package com.sf.mq.fanout;
+
+import org.springframework.amqp.core.*;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+@Configuration
+public class FanoutConfig {
+
+    // 通过bean的声明方式 来创建交换器
+    // 声明交换器时 只需要类型和名字 直接使用对应类型的实现类FanoutExchange
+    // 通过构造器指定名字
+    @Bean
+    public FanoutExchange fanoutExchange() {
+//        TopicExchang
+//        DirectExchange
+        return new FanoutExchange("lovecoding.fanout");
+    }
+
+    // 通过bean的声明方式 来创建队列
+    // 通过队列的构造器指定名字
+    // 注意引用的包来自 org.springframework.amqp.core
+    @Bean
+    public Queue fanoutQueue() {
+        return new Queue("fanout.queue");
+    }
+
+    // 再增加一个队列和绑定关系  来验证广播效果
+    @Bean
+    public Queue fanoutQueue1() {
+        return new Queue("fanout.queue1");
+    }
+
+    // 绑定交换器和队列
+    // 当前是fanout类型 不需要routingKey
+    // 绑定关系叫做Binding类  通过BindingBuilder来构造绑定关系
+    // bind方法后面加队列 再使用链式编程  to方法后面加交换器
+    @Bean
+    public Binding binding(Queue fanoutQueue, FanoutExchange fanoutExchange) {
+        return BindingBuilder.bind(fanoutQueue).to(fanoutExchange);
+    }
+
+    @Bean
+    public Binding binding1(Queue fanoutQueue1, FanoutExchange fanoutExchange) {
+        return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
+    }
+}

+ 19 - 0
springboot-demo/src/main/java/com/sf/mq/fanout/FanoutListener.java

@@ -0,0 +1,19 @@
+package com.sf.mq.fanout;
+
+import org.springframework.amqp.rabbit.annotation.RabbitListener;
+import org.springframework.stereotype.Component;
+
+@Component
+public class FanoutListener {
+
+    @RabbitListener(queues = "fanout.queue")
+    public void consume(String msg) {
+        System.out.println("fanout.queue consume: " + msg);
+    }
+
+    @RabbitListener(queues = "fanout.queue1")
+    public void consume1(String msg) {
+        System.out.println("fanout.queue1 consume: " + msg);
+    }
+
+}

+ 38 - 0
springboot-demo/src/main/java/com/sf/mq/topic/TopicConfig.java

@@ -0,0 +1,38 @@
+package com.sf.mq.topic;
+
+import org.springframework.amqp.core.Binding;
+import org.springframework.amqp.core.BindingBuilder;
+import org.springframework.amqp.core.Queue;
+import org.springframework.amqp.core.TopicExchange;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+@Configuration
+public class TopicConfig {
+
+    // 通过bean的声明方式 来创建交换器
+    // 声明交换器时 只需要类型和名字 直接使用对应类型的实现类FanoutExchange
+    // 通过构造器指定名字
+    @Bean
+    public TopicExchange topicExchange() {
+        return new TopicExchange("lovecoding.topic1");
+    }
+
+    // 通过bean的声明方式 来创建队列
+    // 通过队列的构造器指定名字
+    // 注意引用的包来自 org.springframework.amqp.core
+    @Bean
+    public Queue topicQueue() {
+        return new Queue("topic.queue1");
+    }
+
+    // 绑定交换器和队列
+    // 当前是fanout类型 不需要routingKey
+    // 绑定关系叫做Binding类  通过BindingBuilder来构造绑定关系
+    // bind方法后面加队列 再使用链式编程  to方法后面加交换器
+    @Bean
+    public Binding bindingTopic(Queue topicQueue, TopicExchange topicExchange) {
+        return BindingBuilder.bind(topicQueue).to(topicExchange).with("*.*.harbin");
+    }
+
+}

+ 33 - 0
springboot-demo/src/main/java/com/sf/mq/topic/TopicListener.java

@@ -0,0 +1,33 @@
+package com.sf.mq.topic;
+
+import org.springframework.amqp.core.ExchangeTypes;
+import org.springframework.amqp.rabbit.annotation.Exchange;
+import org.springframework.amqp.rabbit.annotation.Queue;
+import org.springframework.amqp.rabbit.annotation.QueueBinding;
+import org.springframework.amqp.rabbit.annotation.RabbitListener;
+import org.springframework.stereotype.Component;
+
+@Component
+public class TopicListener {
+
+    // 在使用RabbitListener注解时
+    // 可以直接声明队列、交换器、以及routingKey 建立他们之间的绑定关系
+    // 参数bindings 需要结合注解使用@QueueBinding 代替前面config的使用
+    // 在binding的关系中  还需要队列和交换器的bean 使用@Queue和@Exchange来代替
+    // 是一种注解嵌套注解的用法
+    @RabbitListener(bindings = @QueueBinding(
+            value = @Queue(name = "topic.queue"),
+            exchange = @Exchange(name = "lovecoding.topic", type = ExchangeTypes.TOPIC),
+            key = {"#.harbin"}
+    ))
+    public void consume(String msg) {
+        System.out.println("topic.queue consume: " + msg);
+    }
+
+    @RabbitListener(queues = "topic.queue1")
+    public void consume1(String msg) {
+        System.out.println("topic.queue1 consume: " + msg);
+    }
+
+
+}

+ 6 - 1
springboot-demo/src/main/resources/application.yml

@@ -5,4 +5,9 @@ spring:
     redis:
       host: 127.0.0.1
       port: 6379
-      password:
+      password:
+  rabbitmq:
+    host: 127.0.0.1
+    port: 5672
+    username: guest
+    password: guest

+ 52 - 0
springboot-demo/src/test/java/com/sf/RabbitMqTests.java

@@ -0,0 +1,52 @@
+package com.sf;
+
+import org.junit.jupiter.api.Test;
+import org.springframework.amqp.rabbit.core.RabbitTemplate;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+
+@SpringBootTest
+public class RabbitMqTests {
+
+    @Autowired
+    private RabbitTemplate rabbitTemplate;
+
+    @Test
+    public void test() {
+        String queueName = "myQueue";
+        String message = "hello spring amqp ";
+        // 通过队列发送消息
+        for (int i = 0; i < 10; i++) {
+            rabbitTemplate.convertAndSend(queueName, message + i);
+            System.out.println(i);
+        }
+    }
+
+
+    @Test
+    public void testFanout() {
+        String exchangeName = "lovecoding.fanout";
+        String message = "hello fanout";
+        rabbitTemplate.convertAndSend(exchangeName,"",message);
+    }
+
+    @Test
+    public void testDirect() {
+        String exchangeName = "lovecoding.direct";
+        String message = "hello direct";
+        rabbitTemplate.convertAndSend(exchangeName,"222",message);
+    }
+
+    @Test
+    public void testTopic() {
+        String exchangeName = "lovecoding.topic";
+        String message = "hello topic";
+        rabbitTemplate.convertAndSend(exchangeName,"nangang.harbin",message);
+
+        String exchangeName1 = "lovecoding.topic1";
+        String message1 = "hello topic";
+        rabbitTemplate.convertAndSend(exchangeName1,"xuefu.nangang.harbin",message1);
+    }
+
+
+}