Browse Source

0317 rabbitmq

Qing 1 year ago
parent
commit
ce8b56612e

+ 27 - 0
rabbitmq-demo/src/main/java/com/sf/exchange/direct/Consumer.java

@@ -0,0 +1,27 @@
+package com.sf.exchange.direct;
+
+import com.rabbitmq.client.BuiltinExchangeType;
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.DeliverCallback;
+import com.rabbitmq.client.Delivery;
+import com.sf.util.MqUtils;
+
+public class Consumer {
+    private static final String EXCHANGE_NAME = "myExchange1";
+
+    public static void main(String[] args) throws Exception {
+        Channel channel = MqUtils.getChannel();
+        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
+        // 声明一个临时队列
+        String queueName = channel.queueDeclare().getQueue();
+        System.out.println(queueName);
+        // 将队列和交换机进行绑定
+        channel.queueBind(queueName, EXCHANGE_NAME, "harbin");
+        DeliverCallback deliverCallback = (String consumerTag, Delivery message) -> {
+            String body = new String(message.getBody());
+            System.out.println(body);
+        };
+        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
+        });
+    }
+}

+ 25 - 0
rabbitmq-demo/src/main/java/com/sf/exchange/direct/ConsumerBJ.java

@@ -0,0 +1,25 @@
+package com.sf.exchange.direct;
+
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.DeliverCallback;
+import com.rabbitmq.client.Delivery;
+import com.sf.util.MqUtils;
+
+public class ConsumerBJ {
+    private static final String EXCHANGE_NAME = "myExchange1";
+
+    public static void main(String[] args) throws Exception {
+        Channel channel = MqUtils.getChannel();
+        // 声明一个临时队列
+        String queueName = channel.queueDeclare().getQueue();
+        System.out.println(queueName);
+        // 将队列和交换机进行绑定
+        channel.queueBind(queueName, EXCHANGE_NAME, "beijing");
+        DeliverCallback deliverCallback = (String consumerTag, Delivery message) -> {
+            String body = new String(message.getBody());
+            System.out.println(body);
+        };
+        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
+        });
+    }
+}

+ 25 - 0
rabbitmq-demo/src/main/java/com/sf/exchange/direct/ConsumerHrB.java

@@ -0,0 +1,25 @@
+package com.sf.exchange.direct;
+
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.DeliverCallback;
+import com.rabbitmq.client.Delivery;
+import com.sf.util.MqUtils;
+
+public class ConsumerHrB {
+    private static final String EXCHANGE_NAME = "myExchange1";
+
+    public static void main(String[] args) throws Exception {
+        Channel channel = MqUtils.getChannel();
+        // 声明一个临时队列
+        String queueName = channel.queueDeclare().getQueue();
+        System.out.println(queueName);
+        // 将队列和交换机进行绑定
+        channel.queueBind(queueName, EXCHANGE_NAME, "harbin");
+        DeliverCallback deliverCallback = (String consumerTag, Delivery message) -> {
+            String body = new String(message.getBody());
+            System.out.println(body);
+        };
+        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
+        });
+    }
+}

+ 31 - 0
rabbitmq-demo/src/main/java/com/sf/exchange/direct/Producer.java

@@ -0,0 +1,31 @@
+package com.sf.exchange.direct;
+
+import com.rabbitmq.client.BuiltinExchangeType;
+import com.rabbitmq.client.Channel;
+import com.sf.util.MqUtils;
+
+import java.util.Scanner;
+
+public class Producer {
+
+    private static final String QUEUE_NAME = "hello2";
+    private static final String EXCHANGE_NAME = "myExchange1";
+
+    public static void main(String[] args) throws Exception {
+        Channel channel = MqUtils.getChannel();
+        // 声明交换机
+        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
+//        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
+        Scanner scanner = new Scanner(System.in);
+        System.out.println("请输入消息:");
+        while (scanner.hasNext()) {
+            String input = scanner.nextLine();
+            String[] arr = input.split(" ");
+            // 当使用direct类型时  既需要消息 又需要消息所对应的路由
+            String message = arr[0];
+            String routingKey = arr[1];
+            System.out.println(message + " " + routingKey);
+            channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes());
+        }
+    }
+}

+ 25 - 0
rabbitmq-demo/src/main/java/com/sf/exchange/fanout/Consumer.java

@@ -0,0 +1,25 @@
+package com.sf.exchange.fanout;
+
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.DeliverCallback;
+import com.rabbitmq.client.Delivery;
+import com.sf.util.MqUtils;
+
+public class Consumer {
+    private static final String EXCHANGE_NAME = "myExchange";
+
+    public static void main(String[] args) throws Exception {
+        Channel channel = MqUtils.getChannel();
+        // 声明一个临时队列
+        String queueName = channel.queueDeclare().getQueue();
+        System.out.println(queueName);
+        // 将队列和交换机进行绑定
+        channel.queueBind(queueName, EXCHANGE_NAME, "");
+        DeliverCallback deliverCallback = (String consumerTag, Delivery message) -> {
+            String body = new String(message.getBody());
+            System.out.println(body);
+        };
+        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
+        });
+    }
+}

+ 27 - 0
rabbitmq-demo/src/main/java/com/sf/exchange/fanout/Producer.java

@@ -0,0 +1,27 @@
+package com.sf.exchange.fanout;
+
+import com.rabbitmq.client.BuiltinExchangeType;
+import com.rabbitmq.client.Channel;
+import com.sf.util.MqUtils;
+
+import java.util.Scanner;
+
+public class Producer {
+
+    private static final String QUEUE_NAME = "hello2";
+    private static final String EXCHANGE_NAME = "myExchange";
+
+    public static void main(String[] args) throws Exception {
+        Channel channel = MqUtils.getChannel();
+        // 声明交换机
+        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT);
+//        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
+        Scanner scanner = new Scanner(System.in);
+        System.out.println("请输入消息:");
+        while (scanner.hasNext()) {
+            String message = scanner.next();
+            System.out.println(message);
+            channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
+        }
+    }
+}

+ 29 - 0
rabbitmq-demo/src/main/java/com/sf/exchange/topic/Consumer.java

@@ -0,0 +1,29 @@
+package com.sf.exchange.topic;
+
+import com.rabbitmq.client.BuiltinExchangeType;
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.DeliverCallback;
+import com.rabbitmq.client.Delivery;
+import com.sf.util.MqUtils;
+
+public class Consumer {
+    private static final String EXCHANGE_NAME = "myExchange2";
+
+    public static void main(String[] args) throws Exception {
+        Channel channel = MqUtils.getChannel();
+        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
+        // 声明一个临时队列 当消费者不再使用此队列时  队列被自动删除
+        String queueName = channel.queueDeclare().getQueue();
+        System.out.println(queueName);
+        // 将队列和交换机进行绑定
+        // "harbin" "nangang.harbin" "xuefulu.nangang.harbin"
+        channel.queueBind(queueName, EXCHANGE_NAME, "#.harbin");
+//        channel.queueBind(queueName, EXCHANGE_NAME, "harbin");
+        DeliverCallback deliverCallback = (String consumerTag, Delivery message) -> {
+            String body = new String(message.getBody());
+            System.out.println(body);
+        };
+        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
+        });
+    }
+}

+ 31 - 0
rabbitmq-demo/src/main/java/com/sf/exchange/topic/Consumer1.java

@@ -0,0 +1,31 @@
+package com.sf.exchange.topic;
+
+import com.rabbitmq.client.BuiltinExchangeType;
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.DeliverCallback;
+import com.rabbitmq.client.Delivery;
+import com.sf.util.MqUtils;
+
+public class Consumer1 {
+    private static final String EXCHANGE_NAME = "myExchange2";
+
+    public static void main(String[] args) throws Exception {
+        Channel channel = MqUtils.getChannel();
+        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
+        // 声明一个临时队列 当消费者不再使用此队列时  队列被自动删除
+        String queueName = channel.queueDeclare().getQueue();
+        System.out.println(queueName);
+        // 将队列和交换机进行绑定
+        // "nangang.harbin" "daoli.harbin"
+        channel.queueBind(queueName, EXCHANGE_NAME, "*.harbin");
+        channel.queueBind(queueName, EXCHANGE_NAME, "*.*.harbin");
+        channel.queueBind(queueName, EXCHANGE_NAME, "*.*.harbin");
+//        channel.queueBind(queueName, EXCHANGE_NAME, "beijing");
+        DeliverCallback deliverCallback = (String consumerTag, Delivery message) -> {
+            String body = new String(message.getBody());
+            System.out.println(body);
+        };
+        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
+        });
+    }
+}

+ 31 - 0
rabbitmq-demo/src/main/java/com/sf/exchange/topic/Producer.java

@@ -0,0 +1,31 @@
+package com.sf.exchange.topic;
+
+import com.rabbitmq.client.BuiltinExchangeType;
+import com.rabbitmq.client.Channel;
+import com.sf.util.MqUtils;
+
+import java.util.Scanner;
+
+public class Producer {
+
+    private static final String QUEUE_NAME = "hello2";
+    private static final String EXCHANGE_NAME = "myExchange2";
+
+    public static void main(String[] args) throws Exception {
+        Channel channel = MqUtils.getChannel();
+        // 声明交换机
+        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
+//        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
+        Scanner scanner = new Scanner(System.in);
+        System.out.println("请输入消息:");
+        while (scanner.hasNext()) {
+            String input = scanner.nextLine();
+            String[] arr = input.split(" ");
+            // 当使用direct类型时  既需要消息 又需要消息所对应的路由
+            String message = arr[0];
+            String routingKey = arr[1];
+            System.out.println(message + " " + routingKey);
+            channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes());
+        }
+    }
+}

+ 31 - 0
rabbitmq-demo/src/main/java/com/sf/exchange/topic/ProducerTest.java

@@ -0,0 +1,31 @@
+package com.sf.exchange.topic;
+
+import com.rabbitmq.client.BuiltinExchangeType;
+import com.rabbitmq.client.Channel;
+import com.sf.util.MqUtils;
+
+import java.util.Scanner;
+
+public class ProducerTest {
+
+    private static final String QUEUE_NAME = "hello2";
+    private static final String EXCHANGE_NAME = "myExchange";
+
+    public static void main(String[] args) throws Exception {
+        Channel channel = MqUtils.getChannel();
+        // 声明交换机
+        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
+//        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
+        Scanner scanner = new Scanner(System.in);
+        System.out.println("请输入消息:");
+        while (scanner.hasNext()) {
+            String input = scanner.nextLine();
+            String[] arr = input.split(" ");
+            // 当使用direct类型时  既需要消息 又需要消息所对应的路由
+            String message = arr[0];
+            String routingKey = arr[1];
+            System.out.println(message + " " + routingKey);
+            channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes());
+        }
+    }
+}

+ 35 - 0
rabbitmq-demo/src/main/java/com/sf/handle/Producer.java

@@ -0,0 +1,35 @@
+package com.sf.handle;
+
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.MessageProperties;
+import com.sf.util.MqUtils;
+
+import java.util.Scanner;
+
+/**
+ * 对于消息的持久化而言
+ * 需要队列时持久化的  同时消息时持久化的
+ */
+public class Producer {
+
+    private final static String QUEUE_NAME = "hello";
+
+    public static void main(String[] args) throws Exception {
+        Channel channel = MqUtils.getChannel();
+        channel.basicQos(1);
+        // 队列的持久化 是在声明队列时
+        boolean durable = true;
+        channel.queueDeclare(QUEUE_NAME, durable, false, false, null);
+        Scanner scanner = new Scanner(System.in);
+        System.out.println("请输入消息:");
+        while (scanner.hasNext()) {
+            String message = scanner.next();
+            System.out.println(message);
+            // 消息的持久化
+//            channel.basicPublish("", QUEUE_NAME,
+//                    MessageProperties.PERSISTENT_BASIC, message.getBytes());
+            // 如果只有队列是持久化 而消息不是持久化的  那消息也会丢失
+            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
+        }
+    }
+}

+ 52 - 0
rabbitmq-demo/src/main/java/com/sf/handle/ProducerAE.java

@@ -0,0 +1,52 @@
+package com.sf.handle;
+
+import com.rabbitmq.client.AMQP;
+import com.rabbitmq.client.BuiltinExchangeType;
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.ReturnListener;
+import com.sf.util.MqUtils;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Scanner;
+
+/**
+ * 对于消息的持久化而言
+ * 需要队列时持久化的  同时消息时持久化的
+ */
+public class ProducerAE {
+
+    private final static String QUEUE_NAME = "queue_demo4";
+    private final static String EXCHANGE_NAME = "exchange_demo4";
+    private final static String ROUNTING_KEY = "routing_key_demo";
+
+    public static void main(String[] args) throws Exception {
+        Channel channel = MqUtils.getChannel();
+
+        // 如果消息没有队列可以接收  那么可以用备份交换机进行处理
+        // 备份交换机
+        channel.exchangeDeclare("myAe", BuiltinExchangeType.FANOUT,
+                true, false, null);
+        channel.queueDeclare("myQueue", true, false, false, null);
+        channel.queueBind("myQueue", "myAe", "");
+
+        HashMap<String, Object> arguments = new HashMap<>();
+        arguments.put("alternate-exchange", "myAe");
+        // 主交换机 绑定的备份交换机是 "myAe"
+        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT,
+                true, false, arguments);
+
+        // 队列的持久化 是在声明队列时
+        channel.queueDeclare(QUEUE_NAME, true, false, false, null);
+        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUNTING_KEY);
+
+        Scanner scanner = new Scanner(System.in);
+        System.out.println("请输入消息:");
+        while (scanner.hasNext()) {
+            String message = scanner.next();
+            System.out.println(message);
+            // 如果只有队列是持久化 而消息不是持久化的  那消息也会丢失
+            channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
+        }
+    }
+}

+ 54 - 0
rabbitmq-demo/src/main/java/com/sf/handle/ProducerBak.java

@@ -0,0 +1,54 @@
+package com.sf.handle;
+
+import com.rabbitmq.client.*;
+import com.sf.util.MqUtils;
+
+import java.io.IOException;
+import java.util.Scanner;
+
+/**
+ * 对于消息的持久化而言
+ * 需要队列时持久化的  同时消息时持久化的
+ */
+public class ProducerBak {
+
+    private final static String QUEUE_NAME = "queue_demo3";
+    private final static String EXCHANGE_NAME = "exchange_demo3";
+
+    private final static String ROUNTING_KEY = "routing_key_demo";
+
+    public static void main(String[] args) throws Exception {
+        Channel channel = MqUtils.getChannel();
+        // 交换机的持久化
+//        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT,
+//                true, false, null);
+        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
+        // 队列的持久化 是在声明队列时
+        boolean durable = true;
+//        channel.queueDeclare(QUEUE_NAME, durable, false, false, null);
+        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
+        channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUNTING_KEY);
+
+        // 接收消息回退的监听
+        channel.addReturnListener(new ReturnListener() {
+            @Override
+            public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException {
+                System.out.println(replyCode);
+                System.out.println(replyText);
+                System.out.println(exchange);
+                System.out.println(routingKey);
+            }
+        });
+
+        Scanner scanner = new Scanner(System.in);
+        System.out.println("请输入消息:");
+        while (scanner.hasNext()) {
+            String message = scanner.next();
+            System.out.println(message);
+            // 如果只有队列是持久化 而消息不是持久化的  那消息也会丢失
+//            channel.basicPublish(EXCHANGE_NAME, "", true,
+//                    MessageProperties.PERSISTENT_BASIC, message.getBytes());
+            channel.basicPublish(EXCHANGE_NAME, "", true, null, message.getBytes());
+        }
+    }
+}

+ 6 - 5
rabbitmq-demo/src/main/java/com/sf/helloworld/Producer.java

@@ -8,11 +8,12 @@ public class Producer {
 
 
     public static void main(String[] args) throws Exception {
     public static void main(String[] args) throws Exception {
         ConnectionFactory factory = new ConnectionFactory();
         ConnectionFactory factory = new ConnectionFactory();
-//        factory.setHost("127.0.0.1");
-//        factory.setPort(5672);
-//        factory.setUsername("guest");
-//        factory.setPassword("guest");
-        factory.setUri("amqp://test:123456@127.0.0.1:5672/test1");
+        factory.setHost("127.0.0.1");
+        factory.setPort(5672);
+        factory.setUsername("test1");
+        factory.setPassword("123456");
+//        factory.setVirtualHost("");
+//        factory.setUri("amqp://test:123456@127.0.0.1:5672/test1");
         Connection connection = factory.newConnection();
         Connection connection = factory.newConnection();
         // 连接要被复用  通过连接要得到通道
         // 连接要被复用  通过连接要得到通道
         Channel channel = connection.createChannel();
         Channel channel = connection.createChannel();

+ 54 - 0
rabbitmq-demo/src/main/java/com/sf/hotpot/Consumer.java

@@ -0,0 +1,54 @@
+package com.sf.hotpot;
+
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.DeliverCallback;
+import com.rabbitmq.client.Delivery;
+import com.sf.util.MqUtils;
+
+import java.util.concurrent.TimeUnit;
+
+public class Consumer {
+
+    private final static String QUEUE_NAME = "hotpot";
+
+    public static void main(String[] args) throws Exception {
+        Channel channel = MqUtils.getChannel();
+        // 当处理消息时  等一个处理完 再取下一个
+        channel.basicQos(5);
+        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
+        DeliverCallback deliverCallback = (String consumerTag, Delivery message) -> {
+            String body = new String(message.getBody());
+//            System.out.println(body);
+            // 消息标记
+            long deliveryTag = message.getEnvelope().getDeliveryTag();
+            try {
+                Thread.sleep(100);
+//                TimeUnit.SECONDS.sleep(1);
+            } catch (InterruptedException e) {
+                throw new RuntimeException(e);
+            }
+            if (body.equals("大头菜")) {
+                // 两个参数  消息标记  是否应答多个
+                // multiple  true 把当前的消息和之前的消息一起做应答
+                //           false 只应答当前的消息
+                System.out.println("消费了:" + body);
+                channel.basicAck(deliveryTag, false);
+            } else {
+                // 不应答的处理
+                // requeue 是否重新入队列
+//                channel.basicNack(deliveryTag, false, true);
+                // 如果只拒绝一个  可以使用basicReject  默认把multiple设置为false
+                System.out.println("退回了:" + body);
+                channel.basicReject(deliveryTag, true);
+            }
+
+        };
+        // 自动应答  手动应答
+        // 自动应答  对mq而言 只要把消息发送出去  就视为接收成功 消息会从队列中删除掉
+        // 手动应答  在接收到消息后进行逻辑处理
+        boolean autoAck = false;
+        channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, consumerTag -> {
+        });
+    }
+
+}

+ 54 - 0
rabbitmq-demo/src/main/java/com/sf/hotpot/ConsumerTD.java

@@ -0,0 +1,54 @@
+package com.sf.hotpot;
+
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.DeliverCallback;
+import com.rabbitmq.client.Delivery;
+import com.sf.util.MqUtils;
+
+import java.util.concurrent.TimeUnit;
+
+public class ConsumerTD {
+
+    private final static String QUEUE_NAME = "hotpot";
+
+    public static void main(String[] args) throws Exception {
+        Channel channel = MqUtils.getChannel();
+        channel.basicQos(5);
+        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
+        DeliverCallback deliverCallback = (String consumerTag, Delivery message) -> {
+            String body = new String(message.getBody());
+//            System.out.println(body);
+            // 消息标记
+            long deliveryTag = message.getEnvelope().getDeliveryTag();
+            try {
+//                TimeUnit.SECONDS.sleep(1);
+                Thread.sleep(100);
+            } catch (InterruptedException e) {
+                throw new RuntimeException(e);
+            }
+
+            if (body.equals("土豆")) {
+                // 两个参数  消息标记  是否应答多个
+                // multiple  true 把当前的消息和之前的消息一起做应答
+                //           false 只应答当前的消息
+                System.out.println("消费了:" + body);
+                channel.basicAck(deliveryTag, false);
+            } else {
+                // 不应答的处理
+                // requeue 是否重新入队列
+//                channel.basicNack(deliveryTag, false, true);
+                // 如果只拒绝一个  可以使用basicReject  默认把multiple设置为false
+                System.out.println("退回了:" + body);
+                channel.basicReject(deliveryTag, true);
+            }
+
+        };
+        // 自动应答  手动应答
+        // 自动应答  对mq而言 只要把消息发送出去  就视为接收成功 消息会从队列中删除掉
+        // 手动应答  在接收到消息后进行逻辑处理
+        boolean autoAck = false;
+        channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, consumerTag -> {
+        });
+    }
+
+}

+ 56 - 0
rabbitmq-demo/src/main/java/com/sf/hotpot/Producer.java

@@ -0,0 +1,56 @@
+package com.sf.hotpot;
+
+import com.rabbitmq.client.Channel;
+import com.sf.util.MqUtils;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Random;
+import java.util.Scanner;
+import java.util.concurrent.TimeUnit;
+
+public class Producer {
+
+    private final static String QUEUE_NAME = "hotpot";
+
+    public static void main(String[] args) throws Exception {
+        Channel channel = MqUtils.getChannel();
+        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
+        Map<Integer, String> map = new HashMap<>();
+        map.put(0, "大头菜");
+        map.put(1, "土豆");
+        map.put(2, "菠菜");
+
+        int cnt = 100;
+        int cnt0 = 0;
+        int cnt1 = 0;
+        int cnt2 = 0;
+        while (cnt > 0) {
+            cnt--;
+
+            String message = map.get(0);
+            int type = new Random().nextInt(3);
+            switch (type) {
+                case 0:
+                    cnt0++;
+                    break;
+                case 1:
+                    message = map.get(1);
+                    cnt1++;
+                    break;
+                case 2:
+                    message = map.get(2);
+                    cnt2++;
+                    break;
+                default:
+                    System.out.println(type);
+            }
+            channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
+//            TimeUnit.SECONDS.sleep(1);
+        }
+
+        System.out.println("大头菜:" + cnt0);
+        System.out.println("土豆:" + cnt1);
+        System.out.println("菠菜:" + cnt2);
+    }
+}

+ 38 - 0
rabbitmq-demo/src/main/java/com/sf/message/Consumer.java

@@ -0,0 +1,38 @@
+package com.sf.message;
+
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.DeliverCallback;
+import com.rabbitmq.client.Delivery;
+import com.sf.util.MqUtils;
+
+public class Consumer {
+
+    private final static String QUEUE_NAME = "hello2";
+
+    public static void main(String[] args) throws Exception {
+        Channel channel = MqUtils.getChannel();
+        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
+        DeliverCallback deliverCallback = (String consumerTag, Delivery message) -> {
+            String body = new String(message.getBody());
+            System.out.println(body);
+            // 消息标记
+            long deliveryTag = message.getEnvelope().getDeliveryTag();
+            // 两个参数  消息标记  是否应答多个
+            // multiple  true 把当前的消息和之前的消息一起做应答
+            //           false 只应答当前的消息
+//            channel.basicAck(deliveryTag,false);
+            // 不应答的处理
+            // requeue 是否重新入队列
+            channel.basicNack(deliveryTag, false, true);
+            // 如果只拒绝一个  可以使用basicReject  默认把multiple设置为false
+            channel.basicReject(deliveryTag, true);
+        };
+        // 自动应答  手动应答
+        // 自动应答  对mq而言 只要把消息发送出去  就视为接收成功 消息会从队列中删除掉
+        // 手动应答  在接收到消息后进行逻辑处理
+        boolean autoAck = false;
+        channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, consumerTag -> {
+        });
+    }
+
+}

+ 31 - 0
rabbitmq-demo/src/main/java/com/sf/message/ConsumerGet.java

@@ -0,0 +1,31 @@
+package com.sf.message;
+
+import com.rabbitmq.client.Channel;
+import com.rabbitmq.client.DeliverCallback;
+import com.rabbitmq.client.Delivery;
+import com.rabbitmq.client.GetResponse;
+import com.sf.util.MqUtils;
+
+/**
+ * 消息消费的模式  推拉
+ */
+public class ConsumerGet {
+
+    private final static String QUEUE_NAME = "hello2";
+
+    public static void main(String[] args) throws Exception {
+        Channel channel = MqUtils.getChannel();
+        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
+        DeliverCallback deliverCallback = (String consumerTag, Delivery message) -> {
+            String body = new String(message.getBody());
+            System.out.println(body);
+        };
+        // 推送的模式
+//        channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {
+//        });
+        // 拉取的模式
+        GetResponse response = channel.basicGet(QUEUE_NAME, true);
+        String message = new String(response.getBody());
+        System.out.println(message);
+    }
+}

+ 60 - 0
rabbitmq-demo/src/main/java/com/sf/message/Producer.java

@@ -0,0 +1,60 @@
+package com.sf.message;
+
+import com.rabbitmq.client.AMQP;
+import com.rabbitmq.client.BasicProperties;
+import com.rabbitmq.client.Channel;
+import com.sf.util.MqUtils;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Scanner;
+
+public class Producer {
+
+    private final static String QUEUE_NAME = "hello2";
+
+    public static void main(String[] args) throws Exception {
+        Channel channel = MqUtils.getChannel();
+        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
+        Scanner scanner = new Scanner(System.in);
+        System.out.println("请输入消息:");
+        while (scanner.hasNext()) {
+            // 发送类型  消息内容
+            // 1 hello
+            String input = scanner.nextLine();
+            String[] arr = input.split(" ");
+            int type = Integer.parseInt(arr[0]);
+            String message = arr[1];
+            System.out.println(message);
+            switch (type) {
+                case 1:
+                    // 发送一个普通的消息
+                    channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
+                    break;
+                case 2:
+                    // 发布一个文本类型的 需要被持久化的消息
+                    channel.basicPublish("", QUEUE_NAME,
+                            new AMQP.BasicProperties.Builder().contentType("text/plain")
+                                    .deliveryMode(2).build(),
+                            message.getBytes());
+                    break;
+                case 3:
+                    // 发布一个带有headers的消息
+                    Map<String, Object> headers = new HashMap<>();
+                    headers.put("location", "harbin");
+                    headers.put("time", "0317");
+                    channel.basicPublish("", QUEUE_NAME,
+                            new AMQP.BasicProperties.Builder().headers(headers).build(),
+                            message.getBytes());
+                    break;
+                case 4:
+                    // 发布一个带有过期时间的消息
+                    channel.basicPublish("", QUEUE_NAME,
+                            new AMQP.BasicProperties.Builder().expiration("50000").build(),
+                            message.getBytes());
+                    break;
+            }
+
+        }
+    }
+}