Browse Source

0327 rocketmq

Qing 1 year ago
parent
commit
6b756c9bb9

+ 71 - 30
rocketmq-demo/.idea/workspace.xml

@@ -4,14 +4,21 @@
     <option name="autoReloadType" value="SELECTIVE" />
   </component>
   <component name="ChangeListManager">
-    <list default="true" id="cc632264-3e04-48d2-b24e-1e5609483d8e" name="Changes" comment="0324 rocketmq">
+    <list default="true" id="cc632264-3e04-48d2-b24e-1e5609483d8e" name="Changes" comment="0327 callback">
       <change afterPath="$PROJECT_DIR$/../.idea/VIPJAVA.iml" afterDir="false" />
       <change afterPath="$PROJECT_DIR$/../gn_oa_vip27/.idea/google-java-format.xml" afterDir="false" />
-      <change afterPath="$PROJECT_DIR$/src/main/java/com/sf/callback/Answer.java" afterDir="false" />
-      <change afterPath="$PROJECT_DIR$/src/main/java/com/sf/callback/CallBack.java" afterDir="false" />
-      <change afterPath="$PROJECT_DIR$/src/main/java/com/sf/callback/Person.java" afterDir="false" />
-      <change afterPath="$PROJECT_DIR$/src/main/java/com/sf/callback/Person1.java" afterDir="false" />
-      <change afterPath="$PROJECT_DIR$/src/main/java/com/sf/callback/Test.java" afterDir="false" />
+      <change afterPath="$PROJECT_DIR$/src/main/java/com/sf/delay/DelayConsumer.java" afterDir="false" />
+      <change afterPath="$PROJECT_DIR$/src/main/java/com/sf/delay/DelayProducer.java" afterDir="false" />
+      <change afterPath="$PROJECT_DIR$/src/main/java/com/sf/filter/FilterByTagConsumer.java" afterDir="false" />
+      <change afterPath="$PROJECT_DIR$/src/main/java/com/sf/filter/FilterByTagProducer.java" afterDir="false" />
+      <change afterPath="$PROJECT_DIR$/src/main/java/com/sf/newhelloworld/ProducerExample.java" afterDir="false" />
+      <change afterPath="$PROJECT_DIR$/src/main/java/com/sf/newhelloworld/PushConsumerExample.java" afterDir="false" />
+      <change afterPath="$PROJECT_DIR$/src/main/java/com/sf/order/OrderConsumer.java" afterDir="false" />
+      <change afterPath="$PROJECT_DIR$/src/main/java/com/sf/order/OrderProducer.java" afterDir="false" />
+      <change afterPath="$PROJECT_DIR$/src/main/resources/log4j.properties" afterDir="false" />
+      <change afterPath="$PROJECT_DIR$/../springboot-demo/src/main/java/com/sf/mq/rocketmq/MQConsumerService.java" afterDir="false" />
+      <change afterPath="$PROJECT_DIR$/../springboot-demo/src/main/java/com/sf/mq/rocketmq/MQProducerService.java" afterDir="false" />
+      <change afterPath="$PROJECT_DIR$/../springboot-demo/src/main/java/com/sf/mq/rocketmq/RocketMQController.java" afterDir="false" />
       <change beforePath="$PROJECT_DIR$/../02_JavaWeb/day06_mybatis/src/main/java/com/lc/mapper/UserMapper.java" beforeDir="false" afterPath="$PROJECT_DIR$/../02_JavaWeb/day06_mybatis/src/main/java/com/lc/mapper/UserMapper.java" afterDir="false" />
       <change beforePath="$PROJECT_DIR$/../02_JavaWeb/day06_mybatis/src/main/resources/com/lc/mapper/UserMapper.xml" beforeDir="false" afterPath="$PROJECT_DIR$/../02_JavaWeb/day06_mybatis/src/main/resources/com/lc/mapper/UserMapper.xml" afterDir="false" />
       <change beforePath="$PROJECT_DIR$/../02_JavaWeb/day07_mybatis/src/main/resources/com/lc/mapper/UserMapper.xml" beforeDir="false" afterPath="$PROJECT_DIR$/../02_JavaWeb/day07_mybatis/src/main/resources/com/lc/mapper/UserMapper.xml" afterDir="false" />
@@ -20,6 +27,22 @@
       <change beforePath="$PROJECT_DIR$/../gn_oa_vip27/.idea/workspace.xml" beforeDir="false" afterPath="$PROJECT_DIR$/../gn_oa_vip27/.idea/workspace.xml" afterDir="false" />
       <change beforePath="$PROJECT_DIR$/../gn_oa_vip27/src/main/java/com/sf/utils/JdbcUtil.java" beforeDir="false" afterPath="$PROJECT_DIR$/../gn_oa_vip27/src/main/java/com/sf/utils/JdbcUtil.java" afterDir="false" />
       <change beforePath="$PROJECT_DIR$/.idea/workspace.xml" beforeDir="false" afterPath="$PROJECT_DIR$/.idea/workspace.xml" afterDir="false" />
+      <change beforePath="$PROJECT_DIR$/pom.xml" beforeDir="false" afterPath="$PROJECT_DIR$/pom.xml" afterDir="false" />
+      <change beforePath="$PROJECT_DIR$/src/main/java/com/sf/callback/Person.java" beforeDir="false" afterPath="$PROJECT_DIR$/src/main/java/com/sf/callback/Person.java" afterDir="false" />
+      <change beforePath="$PROJECT_DIR$/src/main/java/com/sf/callback/Test.java" beforeDir="false" afterPath="$PROJECT_DIR$/src/main/java/com/sf/callback/Test.java" afterDir="false" />
+      <change beforePath="$PROJECT_DIR$/src/main/java/com/sf/helloworld/Producer.java" beforeDir="false" afterPath="$PROJECT_DIR$/src/main/java/com/sf/helloworld/Producer.java" afterDir="false" />
+      <change beforePath="$PROJECT_DIR$/../springboot-demo/pom.xml" beforeDir="false" afterPath="$PROJECT_DIR$/../springboot-demo/pom.xml" afterDir="false" />
+      <change beforePath="$PROJECT_DIR$/../springboot-demo/src/main/java/com/sf/mq/MyRabbitListener.java" beforeDir="false" afterPath="$PROJECT_DIR$/../springboot-demo/src/main/java/com/sf/mq/rabbitmq/MyRabbitListener.java" afterDir="false" />
+      <change beforePath="$PROJECT_DIR$/../springboot-demo/src/main/java/com/sf/mq/anno/Param.java" beforeDir="false" afterPath="$PROJECT_DIR$/../springboot-demo/src/main/java/com/sf/mq/rabbitmq/anno/Param.java" afterDir="false" />
+      <change beforePath="$PROJECT_DIR$/../springboot-demo/src/main/java/com/sf/mq/anno/ParamArr.java" beforeDir="false" afterPath="$PROJECT_DIR$/../springboot-demo/src/main/java/com/sf/mq/rabbitmq/anno/ParamArr.java" afterDir="false" />
+      <change beforePath="$PROJECT_DIR$/../springboot-demo/src/main/java/com/sf/mq/anno/TestParam.java" beforeDir="false" afterPath="$PROJECT_DIR$/../springboot-demo/src/main/java/com/sf/mq/rabbitmq/anno/TestParam.java" afterDir="false" />
+      <change beforePath="$PROJECT_DIR$/../springboot-demo/src/main/java/com/sf/mq/direct/DirectConfig.java" beforeDir="false" afterPath="$PROJECT_DIR$/../springboot-demo/src/main/java/com/sf/mq/rabbitmq/direct/DirectConfig.java" afterDir="false" />
+      <change beforePath="$PROJECT_DIR$/../springboot-demo/src/main/java/com/sf/mq/direct/DirectListener.java" beforeDir="false" afterPath="$PROJECT_DIR$/../springboot-demo/src/main/java/com/sf/mq/rabbitmq/direct/DirectListener.java" afterDir="false" />
+      <change beforePath="$PROJECT_DIR$/../springboot-demo/src/main/java/com/sf/mq/fanout/FanoutConfig.java" beforeDir="false" afterPath="$PROJECT_DIR$/../springboot-demo/src/main/java/com/sf/mq/rabbitmq/fanout/FanoutConfig.java" afterDir="false" />
+      <change beforePath="$PROJECT_DIR$/../springboot-demo/src/main/java/com/sf/mq/fanout/FanoutListener.java" beforeDir="false" afterPath="$PROJECT_DIR$/../springboot-demo/src/main/java/com/sf/mq/rabbitmq/fanout/FanoutListener.java" afterDir="false" />
+      <change beforePath="$PROJECT_DIR$/../springboot-demo/src/main/java/com/sf/mq/topic/TopicConfig.java" beforeDir="false" afterPath="$PROJECT_DIR$/../springboot-demo/src/main/java/com/sf/mq/rabbitmq/topic/TopicConfig.java" afterDir="false" />
+      <change beforePath="$PROJECT_DIR$/../springboot-demo/src/main/java/com/sf/mq/topic/TopicListener.java" beforeDir="false" afterPath="$PROJECT_DIR$/../springboot-demo/src/main/java/com/sf/mq/rabbitmq/topic/TopicListener.java" afterDir="false" />
+      <change beforePath="$PROJECT_DIR$/../springboot-demo/src/main/resources/application.yml" beforeDir="false" afterPath="$PROJECT_DIR$/../springboot-demo/src/main/resources/application.yml" afterDir="false" />
     </list>
     <option name="SHOW_DIALOG" value="false" />
     <option name="HIGHLIGHT_CONFLICTS" value="true" />
@@ -57,8 +80,17 @@
     "Application.AsyncProducer (1).executor": "Run",
     "Application.AsyncProducer.executor": "Run",
     "Application.Consumer.executor": "Run",
+    "Application.DelayConsumer.executor": "Run",
+    "Application.DelayProducer.executor": "Run",
+    "Application.FilterByTagConsumer.executor": "Run",
+    "Application.FilterByTagProducer.executor": "Run",
     "Application.OnewayProducer.executor": "Run",
+    "Application.OrderConsumer (1).executor": "Run",
+    "Application.OrderConsumer.executor": "Run",
+    "Application.OrderProducer.executor": "Run",
     "Application.Producer.executor": "Run",
+    "Application.ProducerExample.executor": "Run",
+    "Application.PushConsumerExample.executor": "Run",
     "Application.Test.executor": "Run",
     "Maven. [org.apache.maven.plugins:maven-archetype-plugin:RELEASE:generate].executor": "Run",
     "RunOnceActivity.OpenProjectViewOnStart": "true",
@@ -79,13 +111,13 @@
       <recent name="com.sf.callback" />
     </key>
   </component>
-  <component name="RunManager" selected="Application.Test">
-    <configuration name="AsyncProducer (1)" type="Application" factoryName="Application" temporary="true" nameIsGenerated="true">
-      <option name="MAIN_CLASS_NAME" value="com.sf.message.AsyncProducer" />
+  <component name="RunManager" selected="Application.FilterByTagConsumer">
+    <configuration name="DelayConsumer" type="Application" factoryName="Application" temporary="true" nameIsGenerated="true">
+      <option name="MAIN_CLASS_NAME" value="com.sf.delay.DelayConsumer" />
       <module name="rocketmq-demo" />
       <extension name="coverage">
         <pattern>
-          <option name="PATTERN" value="com.sf.message.*" />
+          <option name="PATTERN" value="com.sf.delay.*" />
           <option name="ENABLED" value="true" />
         </pattern>
       </extension>
@@ -93,12 +125,12 @@
         <option name="Make" enabled="true" />
       </method>
     </configuration>
-    <configuration name="Consumer" type="Application" factoryName="Application" temporary="true" nameIsGenerated="true">
-      <option name="MAIN_CLASS_NAME" value="com.sf.helloworld.Consumer" />
+    <configuration name="DelayProducer" type="Application" factoryName="Application" temporary="true" nameIsGenerated="true">
+      <option name="MAIN_CLASS_NAME" value="com.sf.delay.DelayProducer" />
       <module name="rocketmq-demo" />
       <extension name="coverage">
         <pattern>
-          <option name="PATTERN" value="com.sf.helloworld.*" />
+          <option name="PATTERN" value="com.sf.delay.*" />
           <option name="ENABLED" value="true" />
         </pattern>
       </extension>
@@ -106,12 +138,12 @@
         <option name="Make" enabled="true" />
       </method>
     </configuration>
-    <configuration name="OnewayProducer" type="Application" factoryName="Application" temporary="true" nameIsGenerated="true">
-      <option name="MAIN_CLASS_NAME" value="com.sf.message.OnewayProducer" />
+    <configuration name="FilterByTagConsumer" type="Application" factoryName="Application" temporary="true" nameIsGenerated="true">
+      <option name="MAIN_CLASS_NAME" value="com.sf.filter.FilterByTagConsumer" />
       <module name="rocketmq-demo" />
       <extension name="coverage">
         <pattern>
-          <option name="PATTERN" value="com.sf.message.*" />
+          <option name="PATTERN" value="com.sf.filter.*" />
           <option name="ENABLED" value="true" />
         </pattern>
       </extension>
@@ -119,12 +151,12 @@
         <option name="Make" enabled="true" />
       </method>
     </configuration>
-    <configuration name="Producer" type="Application" factoryName="Application" temporary="true" nameIsGenerated="true">
-      <option name="MAIN_CLASS_NAME" value="com.sf.helloworld.Producer" />
+    <configuration name="FilterByTagProducer" type="Application" factoryName="Application" temporary="true" nameIsGenerated="true">
+      <option name="MAIN_CLASS_NAME" value="com.sf.filter.FilterByTagProducer" />
       <module name="rocketmq-demo" />
       <extension name="coverage">
         <pattern>
-          <option name="PATTERN" value="com.sf.helloworld.*" />
+          <option name="PATTERN" value="com.sf.filter.*" />
           <option name="ENABLED" value="true" />
         </pattern>
       </extension>
@@ -132,12 +164,12 @@
         <option name="Make" enabled="true" />
       </method>
     </configuration>
-    <configuration name="Test" type="Application" factoryName="Application" temporary="true" nameIsGenerated="true">
-      <option name="MAIN_CLASS_NAME" value="com.sf.callback.Test" />
+    <configuration name="OrderConsumer" type="Application" factoryName="Application" temporary="true" nameIsGenerated="true">
+      <option name="MAIN_CLASS_NAME" value="com.sf.order.OrderConsumer" />
       <module name="rocketmq-demo" />
       <extension name="coverage">
         <pattern>
-          <option name="PATTERN" value="com.sf.callback.*" />
+          <option name="PATTERN" value="com.sf.order.*" />
           <option name="ENABLED" value="true" />
         </pattern>
       </extension>
@@ -147,11 +179,11 @@
     </configuration>
     <recent_temporary>
       <list>
-        <item itemvalue="Application.Test" />
-        <item itemvalue="Application.Producer" />
-        <item itemvalue="Application.Consumer" />
-        <item itemvalue="Application.OnewayProducer" />
-        <item itemvalue="Application.AsyncProducer (1)" />
+        <item itemvalue="Application.FilterByTagConsumer" />
+        <item itemvalue="Application.FilterByTagProducer" />
+        <item itemvalue="Application.DelayProducer" />
+        <item itemvalue="Application.DelayConsumer" />
+        <item itemvalue="Application.OrderConsumer" />
       </list>
     </recent_temporary>
   </component>
@@ -164,7 +196,7 @@
       <option name="presentableId" value="Default" />
       <updated>1711266581667</updated>
       <workItem from="1711266583057" duration="4122000" />
-      <workItem from="1711533395564" duration="4228000" />
+      <workItem from="1711533395564" duration="9098000" />
     </task>
     <task id="LOCAL-00001" summary="0324 rocketmq">
       <option name="closed" value="true" />
@@ -174,7 +206,15 @@
       <option name="project" value="LOCAL" />
       <updated>1711271538450</updated>
     </task>
-    <option name="localTasksCounter" value="2" />
+    <task id="LOCAL-00002" summary="0327 callback">
+      <option name="closed" value="true" />
+      <created>1711538107269</created>
+      <option name="number" value="00002" />
+      <option name="presentableId" value="LOCAL-00002" />
+      <option name="project" value="LOCAL" />
+      <updated>1711538107269</updated>
+    </task>
+    <option name="localTasksCounter" value="3" />
     <servers />
   </component>
   <component name="TypeScriptGeneratedFilesManager">
@@ -218,6 +258,7 @@
   </component>
   <component name="VcsManagerConfiguration">
     <MESSAGE value="0324 rocketmq" />
-    <option name="LAST_COMMIT_MESSAGE" value="0324 rocketmq" />
+    <MESSAGE value="0327 callback" />
+    <option name="LAST_COMMIT_MESSAGE" value="0327 callback" />
   </component>
 </project>

+ 16 - 0
rocketmq-demo/pom.xml

@@ -20,5 +20,21 @@
             <artifactId>rocketmq-client</artifactId>
             <version>5.2.0</version>
         </dependency>
+
+        <dependency>
+            <groupId>org.apache.rocketmq</groupId>
+            <artifactId>rocketmq-client-java</artifactId>
+            <version>5.0.6</version>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+            <version>2.1.0-alpha1</version>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-log4j12</artifactId>
+            <version>2.1.0-alpha1</version>
+        </dependency>
     </dependencies>
 </project>

+ 16 - 0
rocketmq-demo/src/main/java/com/sf/callback/Person.java

@@ -15,6 +15,22 @@ public class Person implements CallBack {
         answer.ans(this);
     }
 
+    // 异步调用提问
+    public void askAsync() {
+//        new Thread(new Runnable() {
+//            @Override
+//            public void run() {
+//                // 此时会报错 因为传过去的是Runnable对象
+//                answer.ans(this);
+//            }
+//        }).start();
+        // lambda表达式
+//        new Thread(() -> {
+//            answer.ans(this);
+//        }).start();
+        new Thread(() -> answer.ans(this)).start();
+    }
+
     // 明确answer回答之后的  通知方式(回调逻辑)
     @Override
     public void call(String str) {

+ 4 - 3
rocketmq-demo/src/main/java/com/sf/callback/Test.java

@@ -8,10 +8,11 @@ public class Test {
         Answer answer = new Answer();
         // 创建了一个提问者 使用了之前创建的回答者
         Person person = new Person(answer);
-        person.ask();
+//        person.ask();
+        person.askAsync();
 
-        Person1 person1 = new Person1(answer);
-        person1.ask();
+//        Person1 person1 = new Person1(answer);
+//        person1.ask();
         System.out.println("end");
     }
 }

+ 35 - 0
rocketmq-demo/src/main/java/com/sf/delay/DelayConsumer.java

@@ -0,0 +1,35 @@
+package com.sf.delay;
+
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
+import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
+import org.apache.rocketmq.common.message.MessageExt;
+
+import java.text.SimpleDateFormat;
+import java.util.Date;
+import java.util.List;
+
+public class DelayConsumer {
+    public static void main(String[] args) throws MQClientException {
+        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("cg");
+        consumer.setNamesrvAddr("localhost:9876");
+        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
+        consumer.subscribe("TopicB", "*");
+        consumer.registerMessageListener(new MessageListenerConcurrently() {
+            @Override
+            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
+                for (MessageExt msg : msgs) {
+                    // 输出消息被消费的时间
+                    System.out.print(new SimpleDateFormat("mm:ss").format(new Date()));
+                    System.out.println(" ," + msg);
+                }
+                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+            }
+        });
+        consumer.start();
+        System.out.println("Consumer Started");
+    }
+}

+ 27 - 0
rocketmq-demo/src/main/java/com/sf/delay/DelayProducer.java

@@ -0,0 +1,27 @@
+package com.sf.delay;
+
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.common.message.Message;
+
+import java.text.SimpleDateFormat;
+import java.util.Date;
+
+public class DelayProducer {
+    public static void main(String[] args) throws Exception {
+        DefaultMQProducer producer = new DefaultMQProducer("pg");
+        producer.setNamesrvAddr("localhost:9876");
+        producer.start();
+        for (int i = 0 ; i < 10 ; i++) {
+            byte[] body = ("Hi," + i).getBytes();
+            Message msg = new Message("TopicB", "someTag", body);
+            // 指定消息延迟等级为 3 级,即延迟10s
+             msg.setDelayTimeLevel(2);
+            SendResult sendResult = producer.send(msg);
+            // 输出消息被发送的时间
+            System.out.print(new SimpleDateFormat("mm:ss").format(new Date()));
+            System.out.println(" ," + sendResult);
+        }
+        producer.shutdown();
+    }
+}

+ 31 - 0
rocketmq-demo/src/main/java/com/sf/filter/FilterByTagConsumer.java

@@ -0,0 +1,31 @@
+package com.sf.filter;
+
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
+import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
+import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
+import org.apache.rocketmq.common.message.MessageExt;
+
+import java.util.List;
+
+public class FilterByTagConsumer {
+    public static void main(String[] args) throws Exception {
+        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("pg");
+        consumer.setNamesrvAddr("localhost:9876");
+        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
+        // 订阅时通过表达式 筛选其中的tag
+        consumer.subscribe("myTopic", "myTagA || myTagB");
+        consumer.registerMessageListener(new MessageListenerConcurrently() {
+            @Override
+            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
+                for (MessageExt me : msgs) {
+                    System.out.println(me);
+                }
+                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+            }
+        });
+        consumer.start();
+        System.out.println("Consumer Started");
+    }
+}

+ 22 - 0
rocketmq-demo/src/main/java/com/sf/filter/FilterByTagProducer.java

@@ -0,0 +1,22 @@
+package com.sf.filter;
+
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.common.message.Message;
+
+public class FilterByTagProducer {
+    public static void main(String[] args) throws Exception {
+        DefaultMQProducer producer = new DefaultMQProducer("pg");
+        producer.setNamesrvAddr("localhost:9876");
+        producer.start();
+        String[] tags = {"myTagA", "myTagB", "myTagC"};
+        for (int i = 0; i < 10; i++) {
+            byte[] body = ("Hi," + i).getBytes();
+            String tag = tags[i % tags.length];
+            Message msg = new Message("myTopic", tag, body);
+            SendResult sendResult = producer.send(msg);
+            System.out.println(sendResult);
+        }
+        producer.shutdown();
+    }
+}

+ 2 - 0
rocketmq-demo/src/main/java/com/sf/helloworld/Producer.java

@@ -11,6 +11,8 @@ public class Producer {
         DefaultMQProducer producer = new DefaultMQProducer("pg");
         // 设置连接注册中心的地址
         producer.setNamesrvAddr("localhost:9876");
+        producer.setSendMsgTimeout(3000);
+        producer.setRetryTimesWhenSendFailed(2);
         // 开启生产者
         producer.start();
 

+ 48 - 0
rocketmq-demo/src/main/java/com/sf/newhelloworld/ProducerExample.java

@@ -0,0 +1,48 @@
+package com.sf.newhelloworld;
+
+import org.apache.rocketmq.client.apis.ClientConfiguration;
+import org.apache.rocketmq.client.apis.ClientConfigurationBuilder;
+import org.apache.rocketmq.client.apis.ClientException;
+import org.apache.rocketmq.client.apis.ClientServiceProvider;
+import org.apache.rocketmq.client.apis.message.Message;
+import org.apache.rocketmq.client.apis.producer.Producer;
+import org.apache.rocketmq.client.apis.producer.SendReceipt;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class ProducerExample {
+    private static final Logger logger = LoggerFactory.getLogger(ProducerExample.class);
+
+    public static void main(String[] args) throws ClientException {
+        // 接入点地址,需要设置成Proxy的地址和端口列表,一般是xxx:8081;xxx:8081。
+        String endpoint = "localhost:8081";
+        // 消息发送的目标Topic名称,需要提前创建。
+        String topic = "TestTopic";
+        ClientServiceProvider provider = ClientServiceProvider.loadService();
+        ClientConfigurationBuilder builder = ClientConfiguration.newBuilder().setEndpoints(endpoint);
+        ClientConfiguration configuration = builder.build();
+        // 初始化Producer时需要设置通信配置以及预绑定的Topic。
+        Producer producer = provider.newProducerBuilder()
+            .setTopics(topic)
+            .setClientConfiguration(configuration)
+            .build();
+        // 普通消息发送。
+        Message message = provider.newMessageBuilder()
+            .setTopic(topic)
+            // 设置消息索引键,可根据关键字精确查找某条消息。
+            .setKeys("messageKey")
+            // 设置消息Tag,用于消费端根据指定Tag过滤消息。
+            .setTag("messageTag")
+            // 消息体。
+            .setBody("messageBody".getBytes())
+            .build();
+        try {
+            // 发送消息,需要关注发送结果,并捕获失败等异常。
+            SendReceipt sendReceipt = producer.send(message);
+            logger.info("Send message successfully, messageId={}", sendReceipt.getMessageId());
+        } catch (ClientException e) {
+            logger.error("Failed to send message", e);
+        }
+        // producer.close();
+    }
+}

+ 53 - 0
rocketmq-demo/src/main/java/com/sf/newhelloworld/PushConsumerExample.java

@@ -0,0 +1,53 @@
+package com.sf.newhelloworld;
+
+import java.io.IOException;
+import java.util.Collections;
+import org.apache.rocketmq.client.apis.ClientConfiguration;
+import org.apache.rocketmq.client.apis.ClientException;
+import org.apache.rocketmq.client.apis.ClientServiceProvider;
+import org.apache.rocketmq.client.apis.consumer.ConsumeResult;
+import org.apache.rocketmq.client.apis.consumer.FilterExpression;
+import org.apache.rocketmq.client.apis.consumer.FilterExpressionType;
+import org.apache.rocketmq.client.apis.consumer.PushConsumer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class PushConsumerExample {
+    private static final Logger logger = LoggerFactory.getLogger(PushConsumerExample.class);
+
+    private PushConsumerExample() {
+    }
+
+    public static void main(String[] args) throws ClientException, IOException, InterruptedException {
+        final ClientServiceProvider provider = ClientServiceProvider.loadService();
+        // 接入点地址,需要设置成Proxy的地址和端口列表,一般是xxx:8081;xxx:8081。
+        String endpoints = "localhost:8081";
+        ClientConfiguration clientConfiguration = ClientConfiguration.newBuilder()
+            .setEndpoints(endpoints)
+            .build();
+        // 订阅消息的过滤规则,表示订阅所有Tag的消息。
+        String tag = "*";
+        FilterExpression filterExpression = new FilterExpression(tag, FilterExpressionType.TAG);
+        // 为消费者指定所属的消费者分组,Group需要提前创建。
+        String consumerGroup = "cg";
+        // 指定需要订阅哪个目标Topic,Topic需要提前创建。
+        String topic = "TestTopic";
+        // 初始化PushConsumer,需要绑定消费者分组ConsumerGroup、通信参数以及订阅关系。
+        PushConsumer pushConsumer = provider.newPushConsumerBuilder()
+            .setClientConfiguration(clientConfiguration)
+            // 设置消费者分组。
+            .setConsumerGroup(consumerGroup)
+            // 设置预绑定的订阅关系。
+            .setSubscriptionExpressions(Collections.singletonMap(topic, filterExpression))
+            // 设置消费监听器。
+            .setMessageListener(messageView -> {
+                // 处理消息并返回消费结果。
+                logger.info("Consume message successfully, messageId={}", messageView.getMessageId());
+                return ConsumeResult.SUCCESS;
+            })
+            .build();
+        Thread.sleep(Long.MAX_VALUE);
+        // 如果不需要再使用 PushConsumer,可关闭该实例。
+        // pushConsumer.close();
+    }
+}

+ 45 - 0
rocketmq-demo/src/main/java/com/sf/order/OrderConsumer.java

@@ -0,0 +1,45 @@
+package com.sf.order;
+
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
+import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
+import org.apache.rocketmq.client.exception.MQClientException;
+import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
+import org.apache.rocketmq.common.message.MessageExt;
+
+import java.util.List;
+
+public class OrderConsumer {
+    
+    public static void main(String[] args) throws MQClientException {
+        // 定义一个push消费者
+        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("cg");
+        // 指定nameServer
+        consumer.setNamesrvAddr("localhost:9876");
+        // 指定从第一条消息开始消费
+        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
+        // 指定消费topic与tag
+        consumer.subscribe("TopicC", "*");
+        // 指定采用“广播模式”进行消费,默认为“集群模式”
+        // consumer.setMessageModel(MessageModel.BROADCASTING);
+        // 注册消息监听器
+        consumer.registerMessageListener(new MessageListenerConcurrently() {
+            // 一旦broker中有了其订阅的消息就会触发该方法的执行,
+            // 其返回值为当前consumer消费的状态
+            @Override
+            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
+                // 逐条消费消息
+                for (MessageExt msg : msgs) {
+                    System.out.println(msg);
+                    System.out.println(new String(msg.getBody()));
+                }
+                // 返回消费状态:消费成功
+                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+            }
+        });
+        // 开启消费者消费
+        consumer.start();
+        System.out.println("Consumer Started");
+    }
+}

+ 33 - 0
rocketmq-demo/src/main/java/com/sf/order/OrderProducer.java

@@ -0,0 +1,33 @@
+package com.sf.order;
+
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.client.producer.MessageQueueSelector;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.common.message.Message;
+import org.apache.rocketmq.common.message.MessageQueue;
+
+import java.util.List;
+
+public class OrderProducer {
+
+    public static void main(String[] args) throws Exception {
+        DefaultMQProducer producer = new DefaultMQProducer("pg");
+        producer.setNamesrvAddr("localhost:9876");
+        producer.start();
+        for (int i = 0 ; i < 100 ; i++) {
+            Integer orderId = i;
+            byte[] body = ("Hi," + i).getBytes();
+            Message msg = new Message("TopicC", "TagC", body);
+            SendResult sendResult = producer.send(msg, new MessageQueueSelector() {
+                @Override
+                public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {
+                        Integer id = (Integer) arg;
+                        int index = id % mqs.size();
+                        return mqs.get(index);
+                    }
+                }, orderId);
+            System.out.println(sendResult);
+        }
+        producer.shutdown();
+    }
+}

+ 17 - 0
rocketmq-demo/src/main/resources/log4j.properties

@@ -0,0 +1,17 @@
+log4j.rootLogger=debug, stdout, R
+
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+
+# Pattern to output the caller's file name and line number.
+log4j.appender.stdout.layout.ConversionPattern=%5p [%t] (%F:%L) - %m%n
+
+log4j.appender.R=org.apache.log4j.RollingFileAppender
+log4j.appender.R.File=example.log
+
+log4j.appender.R.MaxFileSize=100KB
+# Keep one backup file
+log4j.appender.R.MaxBackupIndex=5
+
+log4j.appender.R.layout=org.apache.log4j.PatternLayout
+log4j.appender.R.layout.ConversionPattern=%p %t %c - %m%n