Browse Source

0324 rocketmq

Qing 1 year ago
parent
commit
ee6f5aa7d0

+ 38 - 0
rocketmq-demo/.gitignore

@@ -0,0 +1,38 @@
+target/
+!.mvn/wrapper/maven-wrapper.jar
+!**/src/main/**/target/
+!**/src/test/**/target/
+
+### IntelliJ IDEA ###
+.idea/modules.xml
+.idea/jarRepositories.xml
+.idea/compiler.xml
+.idea/libraries/
+*.iws
+*.iml
+*.ipr
+
+### Eclipse ###
+.apt_generated
+.classpath
+.factorypath
+.project
+.settings
+.springBeans
+.sts4-cache
+
+### NetBeans ###
+/nbproject/private/
+/nbbuild/
+/dist/
+/nbdist/
+/.nb-gradle/
+build/
+!**/src/main/**/build/
+!**/src/test/**/build/
+
+### VS Code ###
+.vscode/
+
+### Mac OS ###
+.DS_Store

+ 5 - 0
rocketmq-demo/.idea/inspectionProfiles/Project_Default.xml

@@ -0,0 +1,5 @@
+<component name="InspectionProjectProfileManager">
+  <profile version="1.0">
+    <option name="myName" value="Project Default" />
+  </profile>
+</component>

+ 163 - 0
rocketmq-demo/.idea/workspace.xml

@@ -0,0 +1,163 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project version="4">
+  <component name="AutoImportSettings">
+    <option name="autoReloadType" value="SELECTIVE" />
+  </component>
+  <component name="ChangeListManager">
+    <list default="true" id="cc632264-3e04-48d2-b24e-1e5609483d8e" name="Changes" comment="">
+      <change afterPath="$PROJECT_DIR$/../.idea/VIPJAVA.iml" afterDir="false" />
+      <change afterPath="$PROJECT_DIR$/../gn_oa_vip27/.idea/google-java-format.xml" afterDir="false" />
+      <change afterPath="$PROJECT_DIR$/.gitignore" afterDir="false" />
+      <change afterPath="$PROJECT_DIR$/.idea/inspectionProfiles/Project_Default.xml" afterDir="false" />
+      <change afterPath="$PROJECT_DIR$/.idea/workspace.xml" afterDir="false" />
+      <change afterPath="$PROJECT_DIR$/pom.xml" afterDir="false" />
+      <change afterPath="$PROJECT_DIR$/src/main/java/com/sf/helloworld/Consumer.java" afterDir="false" />
+      <change afterPath="$PROJECT_DIR$/src/main/java/com/sf/helloworld/Producer.java" afterDir="false" />
+      <change afterPath="$PROJECT_DIR$/src/main/java/com/sf/message/AsyncProducer.java" afterDir="false" />
+      <change afterPath="$PROJECT_DIR$/src/main/java/com/sf/message/OnewayProducer.java" afterDir="false" />
+      <change beforePath="$PROJECT_DIR$/../02_JavaWeb/day06_mybatis/src/main/java/com/lc/mapper/UserMapper.java" beforeDir="false" afterPath="$PROJECT_DIR$/../02_JavaWeb/day06_mybatis/src/main/java/com/lc/mapper/UserMapper.java" afterDir="false" />
+      <change beforePath="$PROJECT_DIR$/../02_JavaWeb/day06_mybatis/src/main/resources/com/lc/mapper/UserMapper.xml" beforeDir="false" afterPath="$PROJECT_DIR$/../02_JavaWeb/day06_mybatis/src/main/resources/com/lc/mapper/UserMapper.xml" afterDir="false" />
+      <change beforePath="$PROJECT_DIR$/../02_JavaWeb/day07_mybatis/src/main/resources/com/lc/mapper/UserMapper.xml" beforeDir="false" afterPath="$PROJECT_DIR$/../02_JavaWeb/day07_mybatis/src/main/resources/com/lc/mapper/UserMapper.xml" afterDir="false" />
+      <change beforePath="$PROJECT_DIR$/../gn_oa_vip27/.idea/jarRepositories.xml" beforeDir="false" afterPath="$PROJECT_DIR$/../gn_oa_vip27/.idea/jarRepositories.xml" afterDir="false" />
+      <change beforePath="$PROJECT_DIR$/../gn_oa_vip27/.idea/misc.xml" beforeDir="false" afterPath="$PROJECT_DIR$/../gn_oa_vip27/.idea/misc.xml" afterDir="false" />
+      <change beforePath="$PROJECT_DIR$/../gn_oa_vip27/.idea/workspace.xml" beforeDir="false" afterPath="$PROJECT_DIR$/../gn_oa_vip27/.idea/workspace.xml" afterDir="false" />
+      <change beforePath="$PROJECT_DIR$/../gn_oa_vip27/src/main/java/com/sf/utils/JdbcUtil.java" beforeDir="false" afterPath="$PROJECT_DIR$/../gn_oa_vip27/src/main/java/com/sf/utils/JdbcUtil.java" afterDir="false" />
+    </list>
+    <option name="SHOW_DIALOG" value="false" />
+    <option name="HIGHLIGHT_CONFLICTS" value="true" />
+    <option name="HIGHLIGHT_NON_ACTIVE_CHANGELIST" value="false" />
+    <option name="LAST_RESOLUTION" value="IGNORE" />
+  </component>
+  <component name="FileTemplateManagerImpl">
+    <option name="RECENT_TEMPLATES">
+      <list>
+        <option value="Class" />
+      </list>
+    </option>
+  </component>
+  <component name="Git.Settings">
+    <option name="RECENT_GIT_ROOT_PATH" value="$PROJECT_DIR$/.." />
+  </component>
+  <component name="ProjectColorInfo"><![CDATA[{
+  "associatedIndex": 5
+}]]></component>
+  <component name="ProjectId" id="2e7riCGNDFRhmF7p5dUKghzhJFb" />
+  <component name="ProjectLevelVcsManager">
+    <ConfirmationsSetting value="2" id="Add" />
+  </component>
+  <component name="ProjectViewState">
+    <option name="showLibraryContents" value="true" />
+  </component>
+  <component name="PropertiesComponent"><![CDATA[{
+  "keyToString": {
+    "Application.AsyncProducer (1).executor": "Run",
+    "Application.AsyncProducer.executor": "Run",
+    "Application.Consumer.executor": "Run",
+    "Application.OnewayProducer.executor": "Run",
+    "Application.Producer.executor": "Run",
+    "Maven. [org.apache.maven.plugins:maven-archetype-plugin:RELEASE:generate].executor": "Run",
+    "RunOnceActivity.OpenProjectViewOnStart": "true",
+    "RunOnceActivity.ShowReadmeOnStart": "true",
+    "git-widget-placeholder": "master",
+    "kotlin-language-version-configured": "true",
+    "node.js.detected.package.eslint": "true",
+    "node.js.detected.package.tslint": "true",
+    "node.js.selected.package.eslint": "(autodetect)",
+    "node.js.selected.package.tslint": "(autodetect)",
+    "nodejs_package_manager_path": "npm",
+    "settings.editor.selected.configurable": "MavenSettings",
+    "vue.rearranger.settings.migration": "true"
+  }
+}]]></component>
+  <component name="RunManager" selected="Application.OnewayProducer">
+    <configuration name="AsyncProducer (1)" type="Application" factoryName="Application" temporary="true" nameIsGenerated="true">
+      <option name="MAIN_CLASS_NAME" value="com.sf.message.AsyncProducer" />
+      <module name="rocketmq-demo" />
+      <extension name="coverage">
+        <pattern>
+          <option name="PATTERN" value="com.sf.message.*" />
+          <option name="ENABLED" value="true" />
+        </pattern>
+      </extension>
+      <method v="2">
+        <option name="Make" enabled="true" />
+      </method>
+    </configuration>
+    <configuration name="AsyncProducer" type="Application" factoryName="Application" temporary="true" nameIsGenerated="true">
+      <option name="MAIN_CLASS_NAME" value="com.sf.message.AsyncProducer" />
+      <module name="rocketmq-demo" />
+      <extension name="coverage">
+        <pattern>
+          <option name="PATTERN" value="com.sf.message.*" />
+          <option name="ENABLED" value="true" />
+        </pattern>
+      </extension>
+      <method v="2">
+        <option name="Make" enabled="true" />
+      </method>
+    </configuration>
+    <configuration name="Consumer" type="Application" factoryName="Application" temporary="true" nameIsGenerated="true">
+      <option name="MAIN_CLASS_NAME" value="com.sf.helloworld.Consumer" />
+      <module name="rocketmq-demo" />
+      <extension name="coverage">
+        <pattern>
+          <option name="PATTERN" value="com.sf.helloworld.*" />
+          <option name="ENABLED" value="true" />
+        </pattern>
+      </extension>
+      <method v="2">
+        <option name="Make" enabled="true" />
+      </method>
+    </configuration>
+    <configuration name="OnewayProducer" type="Application" factoryName="Application" temporary="true" nameIsGenerated="true">
+      <option name="MAIN_CLASS_NAME" value="com.sf.message.OnewayProducer" />
+      <module name="rocketmq-demo" />
+      <extension name="coverage">
+        <pattern>
+          <option name="PATTERN" value="com.sf.message.*" />
+          <option name="ENABLED" value="true" />
+        </pattern>
+      </extension>
+      <method v="2">
+        <option name="Make" enabled="true" />
+      </method>
+    </configuration>
+    <configuration name="Producer" type="Application" factoryName="Application" temporary="true" nameIsGenerated="true">
+      <option name="MAIN_CLASS_NAME" value="com.sf.helloworld.Producer" />
+      <module name="rocketmq-demo" />
+      <extension name="coverage">
+        <pattern>
+          <option name="PATTERN" value="com.sf.helloworld.*" />
+          <option name="ENABLED" value="true" />
+        </pattern>
+      </extension>
+      <method v="2">
+        <option name="Make" enabled="true" />
+      </method>
+    </configuration>
+    <recent_temporary>
+      <list>
+        <item itemvalue="Application.OnewayProducer" />
+        <item itemvalue="Application.AsyncProducer (1)" />
+        <item itemvalue="Application.AsyncProducer" />
+        <item itemvalue="Application.Consumer" />
+        <item itemvalue="Application.Producer" />
+      </list>
+    </recent_temporary>
+  </component>
+  <component name="SpellCheckerSettings" RuntimeDictionaries="0" Folders="0" CustomDictionaries="0" DefaultDictionary="application-level" UseSingleDictionary="true" transferred="true" />
+  <component name="TaskManager">
+    <task active="true" id="Default" summary="Default task">
+      <changelist id="cc632264-3e04-48d2-b24e-1e5609483d8e" name="Changes" comment="" />
+      <created>1711266581667</created>
+      <option name="number" value="Default" />
+      <option name="presentableId" value="Default" />
+      <updated>1711266581667</updated>
+      <workItem from="1711266583057" duration="4047000" />
+    </task>
+    <servers />
+  </component>
+  <component name="TypeScriptGeneratedFilesManager">
+    <option name="version" value="3" />
+  </component>
+</project>

+ 24 - 0
rocketmq-demo/pom.xml

@@ -0,0 +1,24 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <groupId>com.sf</groupId>
+    <artifactId>rocketmq-demo</artifactId>
+    <version>1.0-SNAPSHOT</version>
+    <packaging>jar</packaging>
+
+    <name>rocketmq-demo</name>
+    <url>http://maven.apache.org</url>
+
+    <properties>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.rocketmq</groupId>
+            <artifactId>rocketmq-client</artifactId>
+            <version>5.2.0</version>
+        </dependency>
+    </dependencies>
+</project>

+ 36 - 0
rocketmq-demo/src/main/java/com/sf/helloworld/Consumer.java

@@ -0,0 +1,36 @@
+package com.sf.helloworld;
+
+import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
+import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
+import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
+import org.apache.rocketmq.common.message.MessageExt;
+
+import java.util.List;
+
+public class Consumer {
+
+    public static void main(String[] args) throws Exception {
+        // 通过消费者组名 创建消费者 使用push方式 推送消息
+        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("cg");
+        consumer.setNamesrvAddr("localhost:9876");
+        // 订阅topic 并设置和tag的匹配关系
+        consumer.subscribe("someTopic", "*");
+        // 注册消息监听器
+        //  使用的是 支持多并发的 消息监听器
+        consumer.registerMessageListener(new MessageListenerConcurrently() {
+            @Override
+            public ConsumeConcurrentlyStatus consumeMessage(
+                    List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
+                // list是多条消息的列表  consumeConcurrentlyContext 是消费的上下文
+                for (MessageExt messageExt : list) {
+                    System.out.println(messageExt);
+                }
+                // 在最后返回消费成功的状态
+                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
+            }
+        });
+        // 启动消费者
+        consumer.start();
+    }
+}

+ 25 - 0
rocketmq-demo/src/main/java/com/sf/helloworld/Producer.java

@@ -0,0 +1,25 @@
+package com.sf.helloworld;
+
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.common.message.Message;
+
+public class Producer {
+
+    public static void main(String[] args) throws Exception {
+        // 根据生产者组创建生产者
+        DefaultMQProducer producer = new DefaultMQProducer("pg");
+        // 设置连接注册中心的地址
+        producer.setNamesrvAddr("localhost:9876");
+        // 开启生产者
+        producer.start();
+
+        // 构造消息  通过主题topic 标签tag 消息体(字节数组)
+        Message message = new Message("someTopic","someTag","hello rocketmq".getBytes());
+        // 发送消息 使用send
+        SendResult result = producer.send(message);
+        System.out.println(result);
+        // 关闭生产者 使用shutdown
+        producer.shutdown();
+    }
+}

+ 42 - 0
rocketmq-demo/src/main/java/com/sf/message/AsyncProducer.java

@@ -0,0 +1,42 @@
+package com.sf.message;
+
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.client.producer.SendCallback;
+import org.apache.rocketmq.client.producer.SendResult;
+import org.apache.rocketmq.common.message.Message;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * 异步消息
+ */
+public class AsyncProducer {
+
+    public static void main(String[] args) throws Exception {
+        DefaultMQProducer producer = new DefaultMQProducer("pg");
+        producer.setNamesrvAddr("localhost:9876");
+        // 异步发送失败后的重试次数 如果设置为0 代表不重试 非0 代表重试几次
+        producer.setRetryTimesWhenSendAsyncFailed(3);
+        // 设置Topic对应的queue数量 默认为4
+        producer.setDefaultTopicQueueNums(2);
+        // 启动
+        producer.start();
+
+        Message message = new Message("someTopic", "TagA", "hello asyn message 111".getBytes());
+        producer.send(message, new SendCallback() {
+            @Override
+            public void onSuccess(SendResult sendResult) {
+                System.out.println(sendResult);
+            }
+
+            @Override
+            public void onException(Throwable e) {
+                e.printStackTrace();
+            }
+        });
+        // 延迟关闭
+        TimeUnit.SECONDS.sleep(5);
+        producer.shutdown();
+
+    }
+}

+ 27 - 0
rocketmq-demo/src/main/java/com/sf/message/OnewayProducer.java

@@ -0,0 +1,27 @@
+package com.sf.message;
+
+import org.apache.rocketmq.client.producer.DefaultMQProducer;
+import org.apache.rocketmq.common.message.Message;
+
+/**
+ * 单向消息
+ */
+public class OnewayProducer {
+
+    public static void main(String[] args) throws Exception {
+        DefaultMQProducer producer = new DefaultMQProducer("pg");
+        producer.setNamesrvAddr("localhost:9876");
+        producer.start();
+
+        for (int i = 0; i < 10; i++) {
+            String body = "message" + i;
+            Message message = new Message("single", body.getBytes());
+            // 只管发送 不管是否接收
+            // 适用于消息很多 能接收消息丢失的场景
+            producer.sendOneway(message);
+        }
+
+        producer.shutdown();
+
+    }
+}