|
@@ -0,0 +1,82 @@
|
|
|
|
+package com.sf.handle.confirm;
|
|
|
|
+
|
|
|
|
+import com.rabbitmq.client.Channel;
|
|
|
|
+import com.rabbitmq.client.ConfirmCallback;
|
|
|
|
+import com.rabbitmq.client.MessageProperties;
|
|
|
|
+import com.sf.util.MqUtils;
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+import java.io.IOException;
|
|
|
|
+import java.util.Scanner;
|
|
|
|
+import java.util.concurrent.ConcurrentNavigableMap;
|
|
|
|
+import java.util.concurrent.ConcurrentSkipListMap;
|
|
|
|
+
|
|
|
|
+/**
|
|
|
|
+ * 异步发布
|
|
|
|
+ */
|
|
|
|
+public class ProducerASync {
|
|
|
|
+
|
|
|
|
+ private final static String QUEUE_NAME = "hello2";
|
|
|
|
+
|
|
|
|
+ public static void main(String[] args) throws Exception {
|
|
|
|
+ // 得到连接通道
|
|
|
|
+ Channel channel = MqUtils.getChannel();
|
|
|
|
+ // 声明队列
|
|
|
|
+ // 名称 是否持久化 exclusive排他锁 自动删除 其他参数(死信队列)
|
|
|
|
+ channel.queueDeclare(QUEUE_NAME, false, false, false, null);
|
|
|
|
+ // 开启发布确认 为了发送消息更安全
|
|
|
|
+ channel.confirmSelect();
|
|
|
|
+
|
|
|
|
+ // 通过跳表实现的线程安全map
|
|
|
|
+ // 将序号与消息进行关联 方便将小于等于某个消息序号的消息作为map取出来
|
|
|
|
+ ConcurrentSkipListMap<Long, String> outstandingConfirms =
|
|
|
|
+ new ConcurrentSkipListMap<>();
|
|
|
|
+ // 应答回调函数
|
|
|
|
+ // 消息序号 处理一个还是多个消息
|
|
|
|
+ ConfirmCallback cleanOutstandingConfirms = (sequenceNumber, multiple) -> {
|
|
|
|
+ System.out.println("已经收到消息");
|
|
|
|
+ if (multiple) {
|
|
|
|
+ // NavigableMap是SortedMap的子接口 navigable 可驾驶的 可航行的
|
|
|
|
+ // headMap() 返回小于第一个参数的键值对的视图
|
|
|
|
+ ConcurrentNavigableMap<Long, String> confirmed =
|
|
|
|
+ outstandingConfirms.headMap(sequenceNumber, true);
|
|
|
|
+ confirmed.clear();
|
|
|
|
+ } else {
|
|
|
|
+ outstandingConfirms.remove(sequenceNumber);
|
|
|
|
+ }
|
|
|
|
+ };
|
|
|
|
+ // 未收到消息回调
|
|
|
|
+ ConfirmCallback confirmCallback = new ConfirmCallback() {
|
|
|
|
+ @Override
|
|
|
|
+ public void handle(long sequenceNumber, boolean multiple)
|
|
|
|
+ throws IOException {
|
|
|
|
+ String message = outstandingConfirms.get(sequenceNumber);
|
|
|
|
+ System.out.println("序号为" + sequenceNumber + "的消息需要重新发送");
|
|
|
|
+ }
|
|
|
|
+ };
|
|
|
|
+ // 添加一个监听器 收到消息回调 Basic.Ack 或者未收到消息回调 Basic.Nack
|
|
|
|
+ channel.addConfirmListener(cleanOutstandingConfirms, confirmCallback);
|
|
|
|
+
|
|
|
|
+ System.out.println("输入消息:");
|
|
|
|
+ Scanner scanner = new Scanner(System.in);
|
|
|
|
+ while (scanner.hasNext()) {
|
|
|
|
+ long start = System.currentTimeMillis();
|
|
|
|
+ String message = scanner.next();
|
|
|
|
+ // 获取消息的下一个标号 从1开始
|
|
|
|
+
|
|
|
|
+ // 通过channel提供的方法getNextPublishSeqNo来获取 消息发布成功后的id
|
|
|
|
+ // 使用一个map来存储 待确认消息的id
|
|
|
|
+ outstandingConfirms.put(channel.getNextPublishSeqNo(), message);
|
|
|
|
+ channel.basicPublish("", "hello", null, message.getBytes());
|
|
|
|
+ System.out.println(message);
|
|
|
|
+ // 等待broker
|
|
|
|
+ // 阻塞等待
|
|
|
|
+// boolean flag = channel.waitForConfirms();
|
|
|
|
+// if (flag) {
|
|
|
|
+// System.out.println("broker已经收到消息");
|
|
|
|
+// }
|
|
|
|
+ long end = System.currentTimeMillis();
|
|
|
|
+ System.out.println("发布异步确认消息耗时:" + (end - start) + "毫秒");
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+}
|