Qing 1 жил өмнө
parent
commit
f7f4cbf78c

+ 54 - 31
rocketmq-demo/.idea/workspace.xml

@@ -4,18 +4,11 @@
     <option name="autoReloadType" value="SELECTIVE" />
   </component>
   <component name="ChangeListManager">
-    <list default="true" id="cc632264-3e04-48d2-b24e-1e5609483d8e" name="Changes" comment="0327 rocketmq">
+    <list default="true" id="cc632264-3e04-48d2-b24e-1e5609483d8e" name="Changes" comment="0331 rocketmq">
       <change afterPath="$PROJECT_DIR$/../.idea/VIPJAVA.iml" afterDir="false" />
       <change afterPath="$PROJECT_DIR$/../gn_oa_vip27/.idea/google-java-format.xml" afterDir="false" />
-      <change afterPath="$PROJECT_DIR$/src/main/java/com/sf/callback/Questioner1.java" afterDir="false" />
-      <change afterPath="$PROJECT_DIR$/src/main/java/com/sf/callback1/CallBack.java" afterDir="false" />
-      <change afterPath="$PROJECT_DIR$/src/main/java/com/sf/callback1/Consumer.java" afterDir="false" />
-      <change afterPath="$PROJECT_DIR$/src/main/java/com/sf/callback1/Hotel.java" afterDir="false" />
-      <change afterPath="$PROJECT_DIR$/src/main/java/com/sf/callback1/Test.java" afterDir="false" />
-      <change afterPath="$PROJECT_DIR$/src/main/java/com/sf/filter/sql/FilterBySQLConsumer.java" afterDir="false" />
-      <change afterPath="$PROJECT_DIR$/src/main/java/com/sf/filter/sql/FilterBySQLProducer.java" afterDir="false" />
-      <change afterPath="$PROJECT_DIR$/src/main/java/com/sf/neworder/OrderConsumerExample.java" afterDir="false" />
-      <change afterPath="$PROJECT_DIR$/src/main/java/com/sf/neworder/OrderProducerExample.java" afterDir="false" />
+      <change afterPath="$PROJECT_DIR$/src/main/java/com/sf/newdelay/NewDelayConsumer.java" afterDir="false" />
+      <change afterPath="$PROJECT_DIR$/src/main/java/com/sf/newdelay/NewDelayProducer.java" afterDir="false" />
       <change afterPath="$PROJECT_DIR$/../springboot-demo/src/main/java/com/sf/mq/rocketmq/MQConsumerService.java" afterDir="false" />
       <change afterPath="$PROJECT_DIR$/../springboot-demo/src/main/java/com/sf/mq/rocketmq/MQProducerService.java" afterDir="false" />
       <change afterPath="$PROJECT_DIR$/../springboot-demo/src/main/java/com/sf/mq/rocketmq/RocketMQController.java" afterDir="false" />
@@ -27,12 +20,6 @@
       <change beforePath="$PROJECT_DIR$/../gn_oa_vip27/.idea/workspace.xml" beforeDir="false" afterPath="$PROJECT_DIR$/../gn_oa_vip27/.idea/workspace.xml" afterDir="false" />
       <change beforePath="$PROJECT_DIR$/../gn_oa_vip27/src/main/java/com/sf/utils/JdbcUtil.java" beforeDir="false" afterPath="$PROJECT_DIR$/../gn_oa_vip27/src/main/java/com/sf/utils/JdbcUtil.java" afterDir="false" />
       <change beforePath="$PROJECT_DIR$/.idea/workspace.xml" beforeDir="false" afterPath="$PROJECT_DIR$/.idea/workspace.xml" afterDir="false" />
-      <change beforePath="$PROJECT_DIR$/src/main/java/com/sf/callback/Answer.java" beforeDir="false" afterPath="$PROJECT_DIR$/src/main/java/com/sf/callback/Answer.java" afterDir="false" />
-      <change beforePath="$PROJECT_DIR$/src/main/java/com/sf/callback/Person.java" beforeDir="false" afterPath="$PROJECT_DIR$/src/main/java/com/sf/callback/Questioner.java" afterDir="false" />
-      <change beforePath="$PROJECT_DIR$/src/main/java/com/sf/callback/Person1.java" beforeDir="false" />
-      <change beforePath="$PROJECT_DIR$/src/main/java/com/sf/callback/Test.java" beforeDir="false" afterPath="$PROJECT_DIR$/src/main/java/com/sf/callback/Test.java" afterDir="false" />
-      <change beforePath="$PROJECT_DIR$/src/main/java/com/sf/filter/FilterByTagConsumer.java" beforeDir="false" afterPath="$PROJECT_DIR$/src/main/java/com/sf/filter/tag/FilterByTagConsumer.java" afterDir="false" />
-      <change beforePath="$PROJECT_DIR$/src/main/java/com/sf/filter/FilterByTagProducer.java" beforeDir="false" afterPath="$PROJECT_DIR$/src/main/java/com/sf/filter/tag/FilterByTagProducer.java" afterDir="false" />
       <change beforePath="$PROJECT_DIR$/../springboot-demo/pom.xml" beforeDir="false" afterPath="$PROJECT_DIR$/../springboot-demo/pom.xml" afterDir="false" />
       <change beforePath="$PROJECT_DIR$/../springboot-demo/src/main/java/com/sf/mq/MyRabbitListener.java" beforeDir="false" afterPath="$PROJECT_DIR$/../springboot-demo/src/main/java/com/sf/mq/rabbitmq/MyRabbitListener.java" afterDir="false" />
       <change beforePath="$PROJECT_DIR$/../springboot-demo/src/main/java/com/sf/mq/anno/Param.java" beforeDir="false" afterPath="$PROJECT_DIR$/../springboot-demo/src/main/java/com/sf/mq/rabbitmq/anno/Param.java" afterDir="false" />
@@ -115,10 +102,11 @@
 }]]></component>
   <component name="RecentsManager">
     <key name="CopyClassDialog.RECENTS_KEY">
+      <recent name="com.sf.newdelay" />
       <recent name="com.sf.callback" />
     </key>
   </component>
-  <component name="RunManager" selected="Application.FilterBySQLConsumer">
+  <component name="RunManager" selected="Application.NewDelayProducer">
     <configuration name="FilterBySQLConsumer" type="Application" factoryName="Application" temporary="true" nameIsGenerated="true">
       <option name="MAIN_CLASS_NAME" value="com.sf.filter.sql.FilterBySQLConsumer" />
       <module name="rocketmq-demo" />
@@ -145,12 +133,12 @@
         <option name="Make" enabled="true" />
       </method>
     </configuration>
-    <configuration name="OrderConsumerExample" type="Application" factoryName="Application" temporary="true" nameIsGenerated="true">
-      <option name="MAIN_CLASS_NAME" value="com.sf.neworder.OrderConsumerExample" />
+    <configuration name="NewDelayConsumer" type="Application" factoryName="Application" temporary="true" nameIsGenerated="true">
+      <option name="MAIN_CLASS_NAME" value="com.sf.newdelay.NewDelayConsumer" />
       <module name="rocketmq-demo" />
       <extension name="coverage">
         <pattern>
-          <option name="PATTERN" value="com.sf.neworder.*" />
+          <option name="PATTERN" value="com.sf.newdelay.*" />
           <option name="ENABLED" value="true" />
         </pattern>
       </extension>
@@ -158,12 +146,12 @@
         <option name="Make" enabled="true" />
       </method>
     </configuration>
-    <configuration name="OrderProducerExample" type="Application" factoryName="Application" temporary="true" nameIsGenerated="true">
-      <option name="MAIN_CLASS_NAME" value="com.sf.neworder.OrderProducerExample" />
+    <configuration name="NewDelayProducer" type="Application" factoryName="Application" temporary="true" nameIsGenerated="true">
+      <option name="MAIN_CLASS_NAME" value="com.sf.newdelay.NewDelayProducer" />
       <module name="rocketmq-demo" />
       <extension name="coverage">
         <pattern>
-          <option name="PATTERN" value="com.sf.neworder.*" />
+          <option name="PATTERN" value="com.sf.newdelay.*" />
           <option name="ENABLED" value="true" />
         </pattern>
       </extension>
@@ -171,12 +159,12 @@
         <option name="Make" enabled="true" />
       </method>
     </configuration>
-    <configuration name="com.sf.callback1.Test" type="Application" factoryName="Application" temporary="true" nameIsGenerated="true">
-      <option name="MAIN_CLASS_NAME" value="com.sf.callback1.Test" />
+    <configuration name="OrderConsumerExample" type="Application" factoryName="Application" temporary="true" nameIsGenerated="true">
+      <option name="MAIN_CLASS_NAME" value="com.sf.neworder.OrderConsumerExample" />
       <module name="rocketmq-demo" />
       <extension name="coverage">
         <pattern>
-          <option name="PATTERN" value="com.sf.callback1.*" />
+          <option name="PATTERN" value="com.sf.neworder.*" />
           <option name="ENABLED" value="true" />
         </pattern>
       </extension>
@@ -186,11 +174,11 @@
     </configuration>
     <recent_temporary>
       <list>
+        <item itemvalue="Application.NewDelayProducer" />
+        <item itemvalue="Application.NewDelayConsumer" />
         <item itemvalue="Application.FilterBySQLConsumer" />
         <item itemvalue="Application.FilterBySQLProducer" />
         <item itemvalue="Application.OrderConsumerExample" />
-        <item itemvalue="Application.OrderProducerExample" />
-        <item itemvalue="Application.com.sf.callback1.Test" />
       </list>
     </recent_temporary>
   </component>
@@ -205,7 +193,7 @@
       <workItem from="1711266583057" duration="4122000" />
       <workItem from="1711533395564" duration="9797000" />
       <workItem from="1711589785583" duration="267000" />
-      <workItem from="1711847771311" duration="4311000" />
+      <workItem from="1711847771311" duration="5698000" />
     </task>
     <task id="LOCAL-00001" summary="0324 rocketmq">
       <option name="closed" value="true" />
@@ -231,7 +219,15 @@
       <option name="project" value="LOCAL" />
       <updated>1711546222801</updated>
     </task>
-    <option name="localTasksCounter" value="4" />
+    <task id="LOCAL-00004" summary="0331 rocketmq">
+      <option name="closed" value="true" />
+      <created>1711855553008</created>
+      <option name="number" value="00004" />
+      <option name="presentableId" value="LOCAL-00004" />
+      <option name="project" value="LOCAL" />
+      <updated>1711855553008</updated>
+    </task>
+    <option name="localTasksCounter" value="5" />
     <servers />
   </component>
   <component name="TypeScriptGeneratedFilesManager">
@@ -242,6 +238,7 @@
       <map>
         <entry key="3895a3d2-79df-4765-8c62-8961ee80125f" value="TOOL_WINDOW" />
         <entry key="9f61037c-b616-4293-ba61-25f4ce732917" value="TOOL_WINDOW" />
+        <entry key="d1374c27-f9b5-419b-a9b2-c4ab941dcda6" value="TOOL_WINDOW" />
       </map>
     </option>
     <option name="TAB_STATES">
@@ -296,6 +293,31 @@
             </State>
           </value>
         </entry>
+        <entry key="d1374c27-f9b5-419b-a9b2-c4ab941dcda6">
+          <value>
+            <State>
+              <option name="FILTERS">
+                <map>
+                  <entry key="branch">
+                    <value>
+                      <list>
+                        <option value="HEAD" />
+                      </list>
+                    </value>
+                  </entry>
+                  <entry key="structure">
+                    <value>
+                      <list>
+                        <option value="dir:/Users/Qing/IdeaProjects/SiFu/VIPJAVA/rocketmq-demo" />
+                      </list>
+                    </value>
+                  </entry>
+                </map>
+              </option>
+              <option name="SHOW_ONLY_AFFECTED_CHANGES" value="true" />
+            </State>
+          </value>
+        </entry>
       </map>
     </option>
   </component>
@@ -303,6 +325,7 @@
     <MESSAGE value="0324 rocketmq" />
     <MESSAGE value="0327 callback" />
     <MESSAGE value="0327 rocketmq" />
-    <option name="LAST_COMMIT_MESSAGE" value="0327 rocketmq" />
+    <MESSAGE value="0331 rocketmq" />
+    <option name="LAST_COMMIT_MESSAGE" value="0331 rocketmq" />
   </component>
 </project>

+ 55 - 0
rocketmq-demo/src/main/java/com/sf/newdelay/NewDelayConsumer.java

@@ -0,0 +1,55 @@
+package com.sf.newdelay;
+
+import org.apache.rocketmq.client.apis.ClientConfiguration;
+import org.apache.rocketmq.client.apis.ClientException;
+import org.apache.rocketmq.client.apis.ClientServiceProvider;
+import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
+import org.apache.rocketmq.client.apis.consumer.FilterExpression;
+import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
+import org.apache.rocketmq.client.apis.consumer.PushConsumer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Collections;
+
+public class NewDelayConsumer {
+    private static final Logger logger = LoggerFactory.getLogger(NewDelayConsumer.class);
+
+    private NewDelayConsumer() {
+    }
+
+    public static void main(String[] args) throws ClientException, IOException, InterruptedException {
+        final ClientServiceProvider provider = ClientServiceProvider.loadService();
+        // 接入点地址,需要设置成Proxy的地址和端口列表,一般是xxx:8081;xxx:8081。
+        String endpoints = "localhost:8081";
+        ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
+            .setEndpoints(endpoints)
+            .build();
+        // 订阅消息的过滤规则,表示订阅所有Tag的消息。
+        String tag = "*";
+        FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
+        // 为消费者指定所属的消费者分组,Group需要提前创建。
+        String consumerGroup = "cg";
+        // 指定需要订阅哪个目标Topic,Topic需要提前创建。
+        String topic = "DelayTopic";
+        // 初始化PushConsumer,需要绑定消费者分组ConsumerGroup、通信参数以及订阅关系。
+        PushConsumer pushConsumer = provider.newPushConsumerBuilder()
+            .setClientConfiguration(clientConfiguration)
+            // 设置消费者分组。
+            .setConsumerGroup(consumerGroup)
+            // 设置预绑定的订阅关系。
+            .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
+            // 设置消费监听器。
+            .setMessageListener(messageView -> {
+                // 处理消息并返回消费结果。
+                System.out.println(messageView.getDeliveryTimestamp());
+                //根据消费结果返回状态。
+                return ConsumeResult.SUCCESS;
+            })
+            .build();
+        Thread.sleep(Long.MAX_VALUE);
+        // 如果不需要再使用 PushConsumer,可关闭该实例。
+        // pushConsumer.close();
+    }
+}

+ 55 - 0
rocketmq-demo/src/main/java/com/sf/newdelay/NewDelayProducer.java

@@ -0,0 +1,55 @@
+package com.sf.newdelay;
+
+import org.apache.rocketmq.client.apis.ClientConfiguration;
+import org.apache.rocketmq.client.apis.ClientConfigurationBuilder;
+import org.apache.rocketmq.client.apis.ClientException;
+import org.apache.rocketmq.client.apis.ClientServiceProvider;
+import org.apache.rocketmq.client.apis.message.Message;
+import org.apache.rocketmq.client.apis.message.MessageBuilder;
+import org.apache.rocketmq.client.apis.producer.Producer;
+import org.apache.rocketmq.client.apis.producer.SendReceipt;
+import org.apache.rocketmq.client.java.message.MessageBuilderImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class NewDelayProducer {
+    private static final Logger logger = LoggerFactory.getLogger(NewDelayProducer.class);
+
+    public static void main(String[] args) throws ClientException {
+        // 接入点地址,需要设置成Proxy的地址和端口列表,一般是xxx:8081;xxx:8081。
+        String endpoint = "localhost:8081";
+        // 消息发送的目标Topic名称,需要提前创建。
+        String topic = "DelayTopic";
+        ClientServiceProvider provider = ClientServiceProvider.loadService();
+        ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoint);
+        ClientConfiguration configuration = builder.build();
+        // 初始化Producer时需要设置通信配置以及预绑定的Topic。
+        Producer producer = provider.newProducerBuilder()
+                .setTopics(topic)
+                .setClientConfiguration(configuration)
+                .build();
+        // 普通消息发送。
+        //定时/延时消息发送
+        MessageBuilder messageBuilder = new MessageBuilderImpl();
+        ;
+        //以下示例表示:延迟时间为10分钟之后的Unix时间戳。
+        Long deliverTimeStamp = System.currentTimeMillis() + 10 * 1000;
+        Message message = messageBuilder.setTopic(topic)
+                //设置消息索引键,可根据关键字精确查找某条消息。
+                .setKeys("messageKey")
+                //设置消息Tag,用于消费端根据指定Tag过滤消息。
+                .setTag("messageTag")
+                .setDeliveryTimestamp(deliverTimeStamp)
+                //消息体
+                .setBody("messageBody".getBytes())
+                .build();
+        try {
+            //发送消息,需要关注发送结果,并捕获失败等异常。
+            SendReceipt sendReceipt = producer.send(message);
+            System.out.println(sendReceipt.getMessageId());
+        } catch (ClientException e) {
+            e.printStackTrace();
+        }
+        // producer.close();
+    }
+}