浏览代码

0331 rocketmq

Qing 1 年之前
父节点
当前提交
a3b1db5bca

+ 81 - 37
rocketmq-demo/.idea/workspace.xml

@@ -4,18 +4,18 @@
     <option name="autoReloadType" value="SELECTIVE" />
   </component>
   <component name="ChangeListManager">
-    <list default="true" id="cc632264-3e04-48d2-b24e-1e5609483d8e" name="Changes" comment="0327 callback">
+    <list default="true" id="cc632264-3e04-48d2-b24e-1e5609483d8e" name="Changes" comment="0327 rocketmq">
       <change afterPath="$PROJECT_DIR$/../.idea/VIPJAVA.iml" afterDir="false" />
       <change afterPath="$PROJECT_DIR$/../gn_oa_vip27/.idea/google-java-format.xml" afterDir="false" />
-      <change afterPath="$PROJECT_DIR$/src/main/java/com/sf/delay/DelayConsumer.java" afterDir="false" />
-      <change afterPath="$PROJECT_DIR$/src/main/java/com/sf/delay/DelayProducer.java" afterDir="false" />
-      <change afterPath="$PROJECT_DIR$/src/main/java/com/sf/filter/FilterByTagConsumer.java" afterDir="false" />
-      <change afterPath="$PROJECT_DIR$/src/main/java/com/sf/filter/FilterByTagProducer.java" afterDir="false" />
-      <change afterPath="$PROJECT_DIR$/src/main/java/com/sf/newhelloworld/ProducerExample.java" afterDir="false" />
-      <change afterPath="$PROJECT_DIR$/src/main/java/com/sf/newhelloworld/PushConsumerExample.java" afterDir="false" />
-      <change afterPath="$PROJECT_DIR$/src/main/java/com/sf/order/OrderConsumer.java" afterDir="false" />
-      <change afterPath="$PROJECT_DIR$/src/main/java/com/sf/order/OrderProducer.java" afterDir="false" />
-      <change afterPath="$PROJECT_DIR$/src/main/resources/log4j.properties" afterDir="false" />
+      <change afterPath="$PROJECT_DIR$/src/main/java/com/sf/callback/Questioner1.java" afterDir="false" />
+      <change afterPath="$PROJECT_DIR$/src/main/java/com/sf/callback1/CallBack.java" afterDir="false" />
+      <change afterPath="$PROJECT_DIR$/src/main/java/com/sf/callback1/Consumer.java" afterDir="false" />
+      <change afterPath="$PROJECT_DIR$/src/main/java/com/sf/callback1/Hotel.java" afterDir="false" />
+      <change afterPath="$PROJECT_DIR$/src/main/java/com/sf/callback1/Test.java" afterDir="false" />
+      <change afterPath="$PROJECT_DIR$/src/main/java/com/sf/filter/sql/FilterBySQLConsumer.java" afterDir="false" />
+      <change afterPath="$PROJECT_DIR$/src/main/java/com/sf/filter/sql/FilterBySQLProducer.java" afterDir="false" />
+      <change afterPath="$PROJECT_DIR$/src/main/java/com/sf/neworder/OrderConsumerExample.java" afterDir="false" />
+      <change afterPath="$PROJECT_DIR$/src/main/java/com/sf/neworder/OrderProducerExample.java" afterDir="false" />
       <change afterPath="$PROJECT_DIR$/../springboot-demo/src/main/java/com/sf/mq/rocketmq/MQConsumerService.java" afterDir="false" />
       <change afterPath="$PROJECT_DIR$/../springboot-demo/src/main/java/com/sf/mq/rocketmq/MQProducerService.java" afterDir="false" />
       <change afterPath="$PROJECT_DIR$/../springboot-demo/src/main/java/com/sf/mq/rocketmq/RocketMQController.java" afterDir="false" />
@@ -27,10 +27,12 @@
       <change beforePath="$PROJECT_DIR$/../gn_oa_vip27/.idea/workspace.xml" beforeDir="false" afterPath="$PROJECT_DIR$/../gn_oa_vip27/.idea/workspace.xml" afterDir="false" />
       <change beforePath="$PROJECT_DIR$/../gn_oa_vip27/src/main/java/com/sf/utils/JdbcUtil.java" beforeDir="false" afterPath="$PROJECT_DIR$/../gn_oa_vip27/src/main/java/com/sf/utils/JdbcUtil.java" afterDir="false" />
       <change beforePath="$PROJECT_DIR$/.idea/workspace.xml" beforeDir="false" afterPath="$PROJECT_DIR$/.idea/workspace.xml" afterDir="false" />
-      <change beforePath="$PROJECT_DIR$/pom.xml" beforeDir="false" afterPath="$PROJECT_DIR$/pom.xml" afterDir="false" />
-      <change beforePath="$PROJECT_DIR$/src/main/java/com/sf/callback/Person.java" beforeDir="false" afterPath="$PROJECT_DIR$/src/main/java/com/sf/callback/Person.java" afterDir="false" />
+      <change beforePath="$PROJECT_DIR$/src/main/java/com/sf/callback/Answer.java" beforeDir="false" afterPath="$PROJECT_DIR$/src/main/java/com/sf/callback/Answer.java" afterDir="false" />
+      <change beforePath="$PROJECT_DIR$/src/main/java/com/sf/callback/Person.java" beforeDir="false" afterPath="$PROJECT_DIR$/src/main/java/com/sf/callback/Questioner.java" afterDir="false" />
+      <change beforePath="$PROJECT_DIR$/src/main/java/com/sf/callback/Person1.java" beforeDir="false" />
       <change beforePath="$PROJECT_DIR$/src/main/java/com/sf/callback/Test.java" beforeDir="false" afterPath="$PROJECT_DIR$/src/main/java/com/sf/callback/Test.java" afterDir="false" />
-      <change beforePath="$PROJECT_DIR$/src/main/java/com/sf/helloworld/Producer.java" beforeDir="false" afterPath="$PROJECT_DIR$/src/main/java/com/sf/helloworld/Producer.java" afterDir="false" />
+      <change beforePath="$PROJECT_DIR$/src/main/java/com/sf/filter/FilterByTagConsumer.java" beforeDir="false" afterPath="$PROJECT_DIR$/src/main/java/com/sf/filter/tag/FilterByTagConsumer.java" afterDir="false" />
+      <change beforePath="$PROJECT_DIR$/src/main/java/com/sf/filter/FilterByTagProducer.java" beforeDir="false" afterPath="$PROJECT_DIR$/src/main/java/com/sf/filter/tag/FilterByTagProducer.java" afterDir="false" />
       <change beforePath="$PROJECT_DIR$/../springboot-demo/pom.xml" beforeDir="false" afterPath="$PROJECT_DIR$/../springboot-demo/pom.xml" afterDir="false" />
       <change beforePath="$PROJECT_DIR$/../springboot-demo/src/main/java/com/sf/mq/MyRabbitListener.java" beforeDir="false" afterPath="$PROJECT_DIR$/../springboot-demo/src/main/java/com/sf/mq/rabbitmq/MyRabbitListener.java" afterDir="false" />
       <change beforePath="$PROJECT_DIR$/../springboot-demo/src/main/java/com/sf/mq/anno/Param.java" beforeDir="false" afterPath="$PROJECT_DIR$/../springboot-demo/src/main/java/com/sf/mq/rabbitmq/anno/Param.java" afterDir="false" />
@@ -82,16 +84,21 @@
     "Application.Consumer.executor": "Run",
     "Application.DelayConsumer.executor": "Run",
     "Application.DelayProducer.executor": "Run",
+    "Application.FilterBySQLConsumer.executor": "Run",
+    "Application.FilterBySQLProducer.executor": "Run",
     "Application.FilterByTagConsumer.executor": "Run",
     "Application.FilterByTagProducer.executor": "Run",
     "Application.OnewayProducer.executor": "Run",
     "Application.OrderConsumer (1).executor": "Run",
     "Application.OrderConsumer.executor": "Run",
+    "Application.OrderConsumerExample.executor": "Run",
     "Application.OrderProducer.executor": "Run",
+    "Application.OrderProducerExample.executor": "Run",
     "Application.Producer.executor": "Run",
     "Application.ProducerExample.executor": "Run",
     "Application.PushConsumerExample.executor": "Run",
     "Application.Test.executor": "Run",
+    "Application.com.sf.callback1.Test.executor": "Run",
     "Maven. [org.apache.maven.plugins:maven-archetype-plugin:RELEASE:generate].executor": "Run",
     "RunOnceActivity.OpenProjectViewOnStart": "true",
     "RunOnceActivity.ShowReadmeOnStart": "true",
@@ -111,13 +118,13 @@
       <recent name="com.sf.callback" />
     </key>
   </component>
-  <component name="RunManager" selected="Application.FilterByTagConsumer">
-    <configuration name="DelayConsumer" type="Application" factoryName="Application" temporary="true" nameIsGenerated="true">
-      <option name="MAIN_CLASS_NAME" value="com.sf.delay.DelayConsumer" />
+  <component name="RunManager" selected="Application.FilterBySQLConsumer">
+    <configuration name="FilterBySQLConsumer" type="Application" factoryName="Application" temporary="true" nameIsGenerated="true">
+      <option name="MAIN_CLASS_NAME" value="com.sf.filter.sql.FilterBySQLConsumer" />
       <module name="rocketmq-demo" />
       <extension name="coverage">
         <pattern>
-          <option name="PATTERN" value="com.sf.delay.*" />
+          <option name="PATTERN" value="com.sf.filter.sql.*" />
           <option name="ENABLED" value="true" />
         </pattern>
       </extension>
@@ -125,12 +132,12 @@
         <option name="Make" enabled="true" />
       </method>
     </configuration>
-    <configuration name="DelayProducer" type="Application" factoryName="Application" temporary="true" nameIsGenerated="true">
-      <option name="MAIN_CLASS_NAME" value="com.sf.delay.DelayProducer" />
+    <configuration name="FilterBySQLProducer" type="Application" factoryName="Application" temporary="true" nameIsGenerated="true">
+      <option name="MAIN_CLASS_NAME" value="com.sf.filter.sql.FilterBySQLProducer" />
       <module name="rocketmq-demo" />
       <extension name="coverage">
         <pattern>
-          <option name="PATTERN" value="com.sf.delay.*" />
+          <option name="PATTERN" value="com.sf.filter.sql.*" />
           <option name="ENABLED" value="true" />
         </pattern>
       </extension>
@@ -138,12 +145,12 @@
         <option name="Make" enabled="true" />
       </method>
     </configuration>
-    <configuration name="FilterByTagConsumer" type="Application" factoryName="Application" temporary="true" nameIsGenerated="true">
-      <option name="MAIN_CLASS_NAME" value="com.sf.filter.FilterByTagConsumer" />
+    <configuration name="OrderConsumerExample" type="Application" factoryName="Application" temporary="true" nameIsGenerated="true">
+      <option name="MAIN_CLASS_NAME" value="com.sf.neworder.OrderConsumerExample" />
       <module name="rocketmq-demo" />
       <extension name="coverage">
         <pattern>
-          <option name="PATTERN" value="com.sf.filter.*" />
+          <option name="PATTERN" value="com.sf.neworder.*" />
           <option name="ENABLED" value="true" />
         </pattern>
       </extension>
@@ -151,12 +158,12 @@
         <option name="Make" enabled="true" />
       </method>
     </configuration>
-    <configuration name="FilterByTagProducer" type="Application" factoryName="Application" temporary="true" nameIsGenerated="true">
-      <option name="MAIN_CLASS_NAME" value="com.sf.filter.FilterByTagProducer" />
+    <configuration name="OrderProducerExample" type="Application" factoryName="Application" temporary="true" nameIsGenerated="true">
+      <option name="MAIN_CLASS_NAME" value="com.sf.neworder.OrderProducerExample" />
       <module name="rocketmq-demo" />
       <extension name="coverage">
         <pattern>
-          <option name="PATTERN" value="com.sf.filter.*" />
+          <option name="PATTERN" value="com.sf.neworder.*" />
           <option name="ENABLED" value="true" />
         </pattern>
       </extension>
@@ -164,12 +171,12 @@
         <option name="Make" enabled="true" />
       </method>
     </configuration>
-    <configuration name="OrderConsumer" type="Application" factoryName="Application" temporary="true" nameIsGenerated="true">
-      <option name="MAIN_CLASS_NAME" value="com.sf.order.OrderConsumer" />
+    <configuration name="com.sf.callback1.Test" type="Application" factoryName="Application" temporary="true" nameIsGenerated="true">
+      <option name="MAIN_CLASS_NAME" value="com.sf.callback1.Test" />
       <module name="rocketmq-demo" />
       <extension name="coverage">
         <pattern>
-          <option name="PATTERN" value="com.sf.order.*" />
+          <option name="PATTERN" value="com.sf.callback1.*" />
           <option name="ENABLED" value="true" />
         </pattern>
       </extension>
@@ -179,11 +186,11 @@
     </configuration>
     <recent_temporary>
       <list>
-        <item itemvalue="Application.FilterByTagConsumer" />
-        <item itemvalue="Application.FilterByTagProducer" />
-        <item itemvalue="Application.DelayProducer" />
-        <item itemvalue="Application.DelayConsumer" />
-        <item itemvalue="Application.OrderConsumer" />
+        <item itemvalue="Application.FilterBySQLConsumer" />
+        <item itemvalue="Application.FilterBySQLProducer" />
+        <item itemvalue="Application.OrderConsumerExample" />
+        <item itemvalue="Application.OrderProducerExample" />
+        <item itemvalue="Application.com.sf.callback1.Test" />
       </list>
     </recent_temporary>
   </component>
@@ -196,7 +203,9 @@
       <option name="presentableId" value="Default" />
       <updated>1711266581667</updated>
       <workItem from="1711266583057" duration="4122000" />
-      <workItem from="1711533395564" duration="9098000" />
+      <workItem from="1711533395564" duration="9797000" />
+      <workItem from="1711589785583" duration="267000" />
+      <workItem from="1711847771311" duration="4311000" />
     </task>
     <task id="LOCAL-00001" summary="0324 rocketmq">
       <option name="closed" value="true" />
@@ -214,7 +223,15 @@
       <option name="project" value="LOCAL" />
       <updated>1711538107269</updated>
     </task>
-    <option name="localTasksCounter" value="3" />
+    <task id="LOCAL-00003" summary="0327 rocketmq">
+      <option name="closed" value="true" />
+      <created>1711546222801</created>
+      <option name="number" value="00003" />
+      <option name="presentableId" value="LOCAL-00003" />
+      <option name="project" value="LOCAL" />
+      <updated>1711546222801</updated>
+    </task>
+    <option name="localTasksCounter" value="4" />
     <servers />
   </component>
   <component name="TypeScriptGeneratedFilesManager">
@@ -224,6 +241,7 @@
     <option name="OPEN_GENERIC_TABS">
       <map>
         <entry key="3895a3d2-79df-4765-8c62-8961ee80125f" value="TOOL_WINDOW" />
+        <entry key="9f61037c-b616-4293-ba61-25f4ce732917" value="TOOL_WINDOW" />
       </map>
     </option>
     <option name="TAB_STATES">
@@ -253,12 +271,38 @@
             </State>
           </value>
         </entry>
+        <entry key="9f61037c-b616-4293-ba61-25f4ce732917">
+          <value>
+            <State>
+              <option name="FILTERS">
+                <map>
+                  <entry key="branch">
+                    <value>
+                      <list>
+                        <option value="HEAD" />
+                      </list>
+                    </value>
+                  </entry>
+                  <entry key="structure">
+                    <value>
+                      <list>
+                        <option value="dir:/Users/Qing/IdeaProjects/SiFu/VIPJAVA/rocketmq-demo" />
+                      </list>
+                    </value>
+                  </entry>
+                </map>
+              </option>
+              <option name="SHOW_ONLY_AFFECTED_CHANGES" value="true" />
+            </State>
+          </value>
+        </entry>
       </map>
     </option>
   </component>
   <component name="VcsManagerConfiguration">
     <MESSAGE value="0324 rocketmq" />
     <MESSAGE value="0327 callback" />
-    <option name="LAST_COMMIT_MESSAGE" value="0327 callback" />
+    <MESSAGE value="0327 rocketmq" />
+    <option name="LAST_COMMIT_MESSAGE" value="0327 rocketmq" />
   </component>
 </project>

+ 2 - 1
rocketmq-demo/src/main/java/com/sf/callback/Answer.java

@@ -1,5 +1,6 @@
 package com.sf.callback;
 
+import java.util.Random;
 import java.util.concurrent.TimeUnit;
 
 public class Answer {
@@ -17,7 +18,7 @@ public class Answer {
         System.out.println("回答完毕");
         // 回答之后的回调
         // 返回之后调用的逻辑
-        callBack.call("111");
+        callBack.call(new Random().nextInt(100) + "");
     }
 
     public void ans(CallBack callBack, CallBack callBack1) {

+ 0 - 24
rocketmq-demo/src/main/java/com/sf/callback/Person1.java

@@ -1,24 +0,0 @@
-package com.sf.callback;
-
-// 提问者
-public class Person1 implements CallBack {
-    // 明确被调用者
-    private Answer answer;
-
-    public Person1(Answer answer) {
-        this.answer = answer;
-    }
-
-    // 提问题的逻辑
-    public void ask() {
-        // 找到回答的人 调用回答的逻辑
-        answer.ans(this);
-    }
-
-    // 明确answer回答之后的  通知方式(回调逻辑)
-    @Override
-    public void call(String str) {
-        System.out.println("答案: " + str);
-        System.out.println("谢谢");
-    }
-}

+ 2 - 2
rocketmq-demo/src/main/java/com/sf/callback/Person.java → rocketmq-demo/src/main/java/com/sf/callback/Questioner.java

@@ -1,11 +1,11 @@
 package com.sf.callback;
 
 // 提问者
-public class Person implements CallBack {
+public class Questioner implements CallBack {
     // 明确被调用者
     private Answer answer;
 
-    public Person(Answer answer) {
+    public Questioner(Answer answer) {
         this.answer = answer;
     }
 

+ 32 - 0
rocketmq-demo/src/main/java/com/sf/callback/Questioner1.java

@@ -0,0 +1,32 @@
+package com.sf.callback;
+
+// 提问者
+public class Questioner1 {
+    // 明确被调用者
+    private Answer answer;
+
+    public Questioner1(Answer answer) {
+        this.answer = answer;
+    }
+
+    // 提问题的逻辑
+    public void ask() {
+        // 找到回答的人 调用回答的逻辑
+        answer.ans(new CallBack() {
+            // 明确answer回答之后的  通知方式(回调逻辑)
+            @Override
+            public void call(String str) {
+                System.out.println("答案: " + str);
+                System.out.println("谢谢");
+            }
+        });
+    }
+
+    public void askLambda() {
+        // 找到回答的人 调用回答的逻辑
+        answer.ans(str -> {
+            System.out.println("答案: " + str);
+            System.out.println("谢谢");
+        });
+    }
+}

+ 6 - 6
rocketmq-demo/src/main/java/com/sf/callback/Test.java

@@ -7,12 +7,12 @@ public class Test {
         // 创建了一个回答者
         Answer answer = new Answer();
         // 创建了一个提问者 使用了之前创建的回答者
-        Person person = new Person(answer);
-//        person.ask();
-        person.askAsync();
-
-//        Person1 person1 = new Person1(answer);
-//        person1.ask();
+        Questioner person = new Questioner(answer);
+        person.ask();
+//        person.askAsync();
+        System.out.println("--------");
+        Questioner1 person1 = new Questioner1(answer);
+        person1.askLambda();
         System.out.println("end");
     }
 }

+ 10 - 0
rocketmq-demo/src/main/java/com/sf/callback1/CallBack.java

@@ -0,0 +1,10 @@
+package com.sf.callback1;
+
+// 顾客入住酒店
+// 酒店提供叫早服务  早上叫醒顾客的服务是可以定制化的  比如说九点钟敲门叫醒/七点钟电话叫醒
+// 设计回调时  往往先设计要被定制化的/用来定义通知的接口
+//   接下来 往往编写的是提供这个服务的一方  被调用者
+public interface CallBack {
+    // 被叫醒的函数
+    void beWakedUp();
+}

+ 20 - 0
rocketmq-demo/src/main/java/com/sf/callback1/Consumer.java

@@ -0,0 +1,20 @@
+package com.sf.callback1;
+
+// 顾客类
+public class Consumer {
+    // 入住酒店
+    private Hotel hotel;
+
+    public Consumer(Hotel hotel) {
+        this.hotel = hotel;
+    }
+
+    public void beWakedUp(){
+        hotel.wakeService(new CallBack() {
+            @Override
+            public void beWakedUp() {
+                System.out.println("九点钟敲门叫醒");
+            }
+        });
+    }
+}

+ 22 - 0
rocketmq-demo/src/main/java/com/sf/callback1/Hotel.java

@@ -0,0 +1,22 @@
+package com.sf.callback1;
+
+import java.util.concurrent.TimeUnit;
+
+// 酒店类
+public class Hotel {
+
+    // 叫醒服务
+    //  入参是如何叫醒顾客(顾客自定义的)
+    public void wakeService(CallBack callBack) {
+        System.out.println("顾客定制了叫醒服务");
+        // 过了一晚上
+        try {
+//            TimeUnit.HOURS.sleep(8);
+            TimeUnit.SECONDS.sleep(8);
+        } catch (InterruptedException e) {
+            throw new RuntimeException(e);
+        }
+        System.out.println("酒店开始叫醒顾客");
+        callBack.beWakedUp();
+    }
+}

+ 14 - 0
rocketmq-demo/src/main/java/com/sf/callback1/Test.java

@@ -0,0 +1,14 @@
+package com.sf.callback1;
+
+import org.checkerframework.checker.units.qual.C;
+
+public class Test {
+
+    public static void main(String[] args) {
+        System.out.println("入住酒店");
+        Hotel hotel = new Hotel();
+        Consumer consumer = new Consumer(hotel);
+        consumer.beWakedUp();
+        System.out.println("退房");
+    }
+}

+ 31 - 0
rocketmq-demo/src/main/java/com/sf/filter/sql/FilterBySQLConsumer.java

@@ -0,0 +1,31 @@
+package com.sf.filter.sql;
+
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.consumer.MessageSelector;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
+import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
+import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
+import org.apache.rocketmq.common.message.MessageExt;
+
+import java.util.List;
+
+public class FilterBySQLConsumer {
+    public static void main(String[] args) throws Exception {
+        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("pg");
+        consumer.setNamesrvAddr("localhost:9876");
+        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
+        consumer.subscribe("myTopic", MessageSelector.bySql("age between 0 and 6"));
+        consumer.registerMessageListener(new MessageListenerConcurrently() {
+            @Override
+            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
+                for (MessageExt me : msgs) {
+                    System.out.println(me);
+                }
+                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+            }
+        });
+        consumer.start();
+        System.out.println("Consumer Started");
+    }
+}

+ 25 - 0
rocketmq-demo/src/main/java/com/sf/filter/sql/FilterBySQLProducer.java

@@ -0,0 +1,25 @@
+package com.sf.filter.sql;
+
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.common.message.Message;
+
+public class FilterBySQLProducer {
+    public static void main(String[] args) throws Exception {
+        DefaultMQProducer producer = new DefaultMQProducer("pg");
+        producer.setNamesrvAddr("localhost:9876");
+        producer.start();
+        for (int i = 0; i < 10; i++) {
+            try {
+                byte[] body = ("Hi," + i).getBytes();
+                Message msg = new Message("myTopic", "myTag", body);
+                msg.putUserProperty("age", i + "");
+                SendResult sendResult = producer.send(msg);
+                System.out.println(sendResult);
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+        }
+        producer.shutdown();
+    }
+}

+ 1 - 1
rocketmq-demo/src/main/java/com/sf/filter/FilterByTagConsumer.java → rocketmq-demo/src/main/java/com/sf/filter/tag/FilterByTagConsumer.java

@@ -1,4 +1,4 @@
-package com.sf.filter;
+package com.sf.filter.tag;
 
 import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
 import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;

+ 1 - 1
rocketmq-demo/src/main/java/com/sf/filter/FilterByTagProducer.java → rocketmq-demo/src/main/java/com/sf/filter/tag/FilterByTagProducer.java

@@ -1,4 +1,4 @@
-package com.sf.filter;
+package com.sf.filter.tag;
 
 import org.apache.rocketmq.client.producer.DefaultMQProducer;
 import org.apache.rocketmq.client.producer.SendResult;

+ 64 - 0
rocketmq-demo/src/main/java/com/sf/neworder/OrderConsumerExample.java

@@ -0,0 +1,64 @@
+package com.sf.neworder;
+
+import org.apache.rocketmq.client.apis.ClientConfiguration;
+import org.apache.rocketmq.client.apis.ClientConfigurationBuilder;
+import org.apache.rocketmq.client.apis.ClientException;
+import org.apache.rocketmq.client.apis.ClientServiceProvider;
+import org.apache.rocketmq.client.apis.consumer.FilterExpression;
+import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
+import org.apache.rocketmq.client.apis.consumer.SimpleConsumer;
+import org.apache.rocketmq.client.apis.message.MessageView;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.List;
+
+
+public class OrderConsumerExample {
+
+    private static final Logger logger = LoggerFactory.getLogger(OrderConsumerExample.class);
+
+    public static void main(String[] args) throws Exception {
+        String endpoint = "localhost:8081";
+        // 消费示例:使用 SimpleConsumer 消费普通消息,主动获取消息处理并提交。
+        ClientServiceProvider provider = ClientServiceProvider.loadService();
+        ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoint);
+        ClientConfiguration configuration = builder.build();
+        String topic = "FIFOTopic";
+        FilterExpression filterExpression = new FilterExpression("messageTag", FilterExpressionType.TAG);
+        SimpleConsumer simpleConsumer = provider.newSimpleConsumerBuilder()
+                // 设置消费者分组。
+                .setConsumerGroup("cg")
+                // 设置接入点。
+                .setClientConfiguration(configuration)
+                // 设置预绑定的订阅关系。
+                .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
+                // 设置从服务端接受消息的最大等待时间
+                .setAwaitDuration(Duration.ofSeconds(10))
+                .build();
+
+        while (true) {
+            // SimpleConsumer 需要主动获取消息,并处理。
+            List<MessageView> messageViewList = simpleConsumer.receive(1, Duration.ofSeconds(30));
+            messageViewList.forEach(messageView -> {
+                System.out.println(messageView);
+                // 当返回messageView后 为了更好查看消息体  将body通过指定编码格式转化成字符串
+                ByteBuffer body = messageView.getBody();
+                String message = StandardCharsets.UTF_8.decode(body).toString();
+                body.flip();
+                logger.info("message body={}", message);
+                // 消费处理完成后,需要主动调用 ACK 提交消费结果。
+                try {
+                    simpleConsumer.ack(messageView);
+                } catch (ClientException e) {
+                    logger.error("Failed to ack message, messageId={}", messageView.getMessageId(), e);
+                }
+            });
+        }
+
+    }
+}

+ 48 - 0
rocketmq-demo/src/main/java/com/sf/neworder/OrderProducerExample.java

@@ -0,0 +1,48 @@
+package com.sf.neworder;
+
+import org.apache.rocketmq.client.apis.ClientConfiguration;
+import org.apache.rocketmq.client.apis.ClientConfigurationBuilder;
+import org.apache.rocketmq.client.apis.ClientServiceProvider;
+import org.apache.rocketmq.client.apis.message.Message;
+import org.apache.rocketmq.client.apis.message.MessageBuilder;
+import org.apache.rocketmq.client.apis.producer.Producer;
+import org.apache.rocketmq.client.apis.producer.SendReceipt;
+import org.apache.rocketmq.client.java.message.MessageBuilderImpl;
+
+public class OrderProducerExample {
+
+    public static void main(String[] args) throws Exception {
+        // 接入点地址,需要设置成Proxy的地址和端口列表,一般是xxx:8081;xxx:8081。
+        String endpoint = "localhost:8081";
+        // 消息发送的目标Topic名称,需要提前创建。
+//        String topic = "TopicTest";
+        String topic = "FIFOTopic";
+        ClientServiceProvider provider = ClientServiceProvider.loadService();
+        ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoint);
+        ClientConfiguration configuration = builder.build();
+        // 初始化Producer时需要设置通信配置以及预绑定的Topic。
+        Producer producer = provider.newProducerBuilder()
+                .setTopics(topic)
+                .setClientConfiguration(configuration)
+                .build();
+        for (int i = 0; i < 10; i++) {
+            byte[] body = ("Hi," + i).getBytes();
+            //顺序消息发送。
+            MessageBuilder messageBuilder = new MessageBuilderImpl();
+            Message message = messageBuilder.setTopic(topic)
+                    //设置消息索引键,可根据关键字精确查找某条消息。
+                    .setKeys("messageKey")
+                    //设置消息Tag,用于消费端根据指定Tag过滤消息。
+                    .setTag("messageTag")
+                    //设置顺序消息的排序分组,该分组尽量保持离散,避免热点排序分组。
+                    .setMessageGroup("mg")
+                    //消息体。
+                    .setBody(body)
+                    .build();
+
+            //发送消息,需要关注发送结果,并捕获失败等异常
+            SendReceipt sendReceipt = producer.send(message);
+            System.out.println(sendReceipt.getMessageId());
+        }
+    }
+}