Qing 1 жил өмнө
parent
commit
001c58ff8e

+ 38 - 0
kafka-demo/.gitignore

@@ -0,0 +1,38 @@
+target/
+!.mvn/wrapper/maven-wrapper.jar
+!**/src/main/**/target/
+!**/src/test/**/target/
+
+### IntelliJ IDEA ###
+.idea/modules.xml
+.idea/jarRepositories.xml
+.idea/compiler.xml
+.idea/libraries/
+*.iws
+*.iml
+*.ipr
+
+### Eclipse ###
+.apt_generated
+.classpath
+.factorypath
+.project
+.settings
+.springBeans
+.sts4-cache
+
+### NetBeans ###
+/nbproject/private/
+/nbbuild/
+/dist/
+/nbdist/
+/.nb-gradle/
+build/
+!**/src/main/**/build/
+!**/src/test/**/build/
+
+### VS Code ###
+.vscode/
+
+### Mac OS ###
+.DS_Store

+ 7 - 0
kafka-demo/.idea/encodings.xml

@@ -0,0 +1,7 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project version="4">
+  <component name="Encoding">
+    <file url="file://$PROJECT_DIR$/src/main/java" charset="UTF-8" />
+    <file url="file://$PROJECT_DIR$/src/main/resources" charset="UTF-8" />
+  </component>
+</project>

+ 6 - 0
kafka-demo/.idea/google-java-format.xml

@@ -0,0 +1,6 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project version="4">
+  <component name="GoogleJavaFormatSettings">
+    <option name="enabled" value="false" />
+  </component>
+</project>

+ 5 - 0
kafka-demo/.idea/inspectionProfiles/Project_Default.xml

@@ -0,0 +1,5 @@
+<component name="InspectionProjectProfileManager">
+  <profile version="1.0">
+    <option name="myName" value="Project Default" />
+  </profile>
+</component>

+ 14 - 0
kafka-demo/.idea/misc.xml

@@ -0,0 +1,14 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project version="4">
+  <component name="ExternalStorageConfigurationManager" enabled="true" />
+  <component name="MavenProjectsManager">
+    <option name="originalFiles">
+      <list>
+        <option value="$PROJECT_DIR$/pom.xml" />
+      </list>
+    </option>
+  </component>
+  <component name="ProjectRootManager" version="2" languageLevel="JDK_17" default="true" project-jdk-name="corretto-17" project-jdk-type="JavaSDK">
+    <output url="file://$PROJECT_DIR$/out" />
+  </component>
+</project>

+ 6 - 0
kafka-demo/.idea/vcs.xml

@@ -0,0 +1,6 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project version="4">
+  <component name="VcsDirectoryMappings">
+    <mapping directory="$PROJECT_DIR$/.." vcs="Git" />
+  </component>
+</project>

+ 142 - 0
kafka-demo/.idea/workspace.xml

@@ -0,0 +1,142 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project version="4">
+  <component name="AutoImportSettings">
+    <option name="autoReloadType" value="SELECTIVE" />
+  </component>
+  <component name="ChangeListManager">
+    <list default="true" id="1538f064-ef05-457e-908b-934abbee9af6" name="Changes" comment="">
+      <change afterPath="$PROJECT_DIR$/../.idea/VIPJAVA.iml" afterDir="false" />
+      <change afterPath="$PROJECT_DIR$/../gn_oa_vip27/.idea/google-java-format.xml" afterDir="false" />
+      <change afterPath="$PROJECT_DIR$/.gitignore" afterDir="false" />
+      <change afterPath="$PROJECT_DIR$/.idea/encodings.xml" afterDir="false" />
+      <change afterPath="$PROJECT_DIR$/.idea/google-java-format.xml" afterDir="false" />
+      <change afterPath="$PROJECT_DIR$/.idea/inspectionProfiles/Project_Default.xml" afterDir="false" />
+      <change afterPath="$PROJECT_DIR$/.idea/misc.xml" afterDir="false" />
+      <change afterPath="$PROJECT_DIR$/.idea/vcs.xml" afterDir="false" />
+      <change afterPath="$PROJECT_DIR$/.idea/workspace.xml" afterDir="false" />
+      <change afterPath="$PROJECT_DIR$/pom.xml" afterDir="false" />
+      <change afterPath="$PROJECT_DIR$/src/main/java/com/sf/helloworld/Consumer.java" afterDir="false" />
+      <change afterPath="$PROJECT_DIR$/src/main/java/com/sf/helloworld/Producer.java" afterDir="false" />
+      <change afterPath="$PROJECT_DIR$/../springboot-demo/src/main/java/com/sf/mq/rocketmq/MQConsumerService.java" afterDir="false" />
+      <change afterPath="$PROJECT_DIR$/../springboot-demo/src/main/java/com/sf/mq/rocketmq/MQProducerService.java" afterDir="false" />
+      <change afterPath="$PROJECT_DIR$/../springboot-demo/src/main/java/com/sf/mq/rocketmq/RocketMQController.java" afterDir="false" />
+      <change beforePath="$PROJECT_DIR$/../02_JavaWeb/day06_mybatis/src/main/java/com/lc/mapper/UserMapper.java" beforeDir="false" afterPath="$PROJECT_DIR$/../02_JavaWeb/day06_mybatis/src/main/java/com/lc/mapper/UserMapper.java" afterDir="false" />
+      <change beforePath="$PROJECT_DIR$/../02_JavaWeb/day06_mybatis/src/main/resources/com/lc/mapper/UserMapper.xml" beforeDir="false" afterPath="$PROJECT_DIR$/../02_JavaWeb/day06_mybatis/src/main/resources/com/lc/mapper/UserMapper.xml" afterDir="false" />
+      <change beforePath="$PROJECT_DIR$/../02_JavaWeb/day07_mybatis/src/main/resources/com/lc/mapper/UserMapper.xml" beforeDir="false" afterPath="$PROJECT_DIR$/../02_JavaWeb/day07_mybatis/src/main/resources/com/lc/mapper/UserMapper.xml" afterDir="false" />
+      <change beforePath="$PROJECT_DIR$/../gn_oa_vip27/.idea/jarRepositories.xml" beforeDir="false" afterPath="$PROJECT_DIR$/../gn_oa_vip27/.idea/jarRepositories.xml" afterDir="false" />
+      <change beforePath="$PROJECT_DIR$/../gn_oa_vip27/.idea/misc.xml" beforeDir="false" afterPath="$PROJECT_DIR$/../gn_oa_vip27/.idea/misc.xml" afterDir="false" />
+      <change beforePath="$PROJECT_DIR$/../gn_oa_vip27/.idea/workspace.xml" beforeDir="false" afterPath="$PROJECT_DIR$/../gn_oa_vip27/.idea/workspace.xml" afterDir="false" />
+      <change beforePath="$PROJECT_DIR$/../gn_oa_vip27/src/main/java/com/sf/utils/JdbcUtil.java" beforeDir="false" afterPath="$PROJECT_DIR$/../gn_oa_vip27/src/main/java/com/sf/utils/JdbcUtil.java" afterDir="false" />
+      <change beforePath="$PROJECT_DIR$/../rocketmq-demo/.idea/workspace.xml" beforeDir="false" afterPath="$PROJECT_DIR$/../rocketmq-demo/.idea/workspace.xml" afterDir="false" />
+      <change beforePath="$PROJECT_DIR$/../springboot-demo/pom.xml" beforeDir="false" afterPath="$PROJECT_DIR$/../springboot-demo/pom.xml" afterDir="false" />
+      <change beforePath="$PROJECT_DIR$/../springboot-demo/src/main/java/com/sf/mq/MyRabbitListener.java" beforeDir="false" afterPath="$PROJECT_DIR$/../springboot-demo/src/main/java/com/sf/mq/rabbitmq/MyRabbitListener.java" afterDir="false" />
+      <change beforePath="$PROJECT_DIR$/../springboot-demo/src/main/java/com/sf/mq/anno/Param.java" beforeDir="false" afterPath="$PROJECT_DIR$/../springboot-demo/src/main/java/com/sf/mq/rabbitmq/anno/Param.java" afterDir="false" />
+      <change beforePath="$PROJECT_DIR$/../springboot-demo/src/main/java/com/sf/mq/anno/ParamArr.java" beforeDir="false" afterPath="$PROJECT_DIR$/../springboot-demo/src/main/java/com/sf/mq/rabbitmq/anno/ParamArr.java" afterDir="false" />
+      <change beforePath="$PROJECT_DIR$/../springboot-demo/src/main/java/com/sf/mq/anno/TestParam.java" beforeDir="false" afterPath="$PROJECT_DIR$/../springboot-demo/src/main/java/com/sf/mq/rabbitmq/anno/TestParam.java" afterDir="false" />
+      <change beforePath="$PROJECT_DIR$/../springboot-demo/src/main/java/com/sf/mq/direct/DirectConfig.java" beforeDir="false" afterPath="$PROJECT_DIR$/../springboot-demo/src/main/java/com/sf/mq/rabbitmq/direct/DirectConfig.java" afterDir="false" />
+      <change beforePath="$PROJECT_DIR$/../springboot-demo/src/main/java/com/sf/mq/direct/DirectListener.java" beforeDir="false" afterPath="$PROJECT_DIR$/../springboot-demo/src/main/java/com/sf/mq/rabbitmq/direct/DirectListener.java" afterDir="false" />
+      <change beforePath="$PROJECT_DIR$/../springboot-demo/src/main/java/com/sf/mq/fanout/FanoutConfig.java" beforeDir="false" afterPath="$PROJECT_DIR$/../springboot-demo/src/main/java/com/sf/mq/rabbitmq/fanout/FanoutConfig.java" afterDir="false" />
+      <change beforePath="$PROJECT_DIR$/../springboot-demo/src/main/java/com/sf/mq/fanout/FanoutListener.java" beforeDir="false" afterPath="$PROJECT_DIR$/../springboot-demo/src/main/java/com/sf/mq/rabbitmq/fanout/FanoutListener.java" afterDir="false" />
+      <change beforePath="$PROJECT_DIR$/../springboot-demo/src/main/java/com/sf/mq/topic/TopicConfig.java" beforeDir="false" afterPath="$PROJECT_DIR$/../springboot-demo/src/main/java/com/sf/mq/rabbitmq/topic/TopicConfig.java" afterDir="false" />
+      <change beforePath="$PROJECT_DIR$/../springboot-demo/src/main/java/com/sf/mq/topic/TopicListener.java" beforeDir="false" afterPath="$PROJECT_DIR$/../springboot-demo/src/main/java/com/sf/mq/rabbitmq/topic/TopicListener.java" afterDir="false" />
+      <change beforePath="$PROJECT_DIR$/../springboot-demo/src/main/resources/application.yml" beforeDir="false" afterPath="$PROJECT_DIR$/../springboot-demo/src/main/resources/application.yml" afterDir="false" />
+    </list>
+    <option name="SHOW_DIALOG" value="false" />
+    <option name="HIGHLIGHT_CONFLICTS" value="true" />
+    <option name="HIGHLIGHT_NON_ACTIVE_CHANGELIST" value="false" />
+    <option name="LAST_RESOLUTION" value="IGNORE" />
+  </component>
+  <component name="FileTemplateManagerImpl">
+    <option name="RECENT_TEMPLATES">
+      <list>
+        <option value="Class" />
+      </list>
+    </option>
+  </component>
+  <component name="Git.Settings">
+    <option name="RECENT_GIT_ROOT_PATH" value="$PROJECT_DIR$/.." />
+  </component>
+  <component name="ProjectColorInfo"><![CDATA[{
+  "associatedIndex": 0
+}]]></component>
+  <component name="ProjectId" id="2eReAFaj1vPzh8sd1ADhP4vWaFS" />
+  <component name="ProjectLevelVcsManager">
+    <ConfirmationsSetting value="2" id="Add" />
+  </component>
+  <component name="ProjectViewState">
+    <option name="showLibraryContents" value="true" />
+  </component>
+  <component name="PropertiesComponent"><![CDATA[{
+  "keyToString": {
+    "Application.Consumer.executor": "Run",
+    "Application.Producer.executor": "Run",
+    "Maven. [org.apache.maven.plugins:maven-archetype-plugin:RELEASE:generate].executor": "Run",
+    "RunOnceActivity.OpenProjectViewOnStart": "true",
+    "RunOnceActivity.ShowReadmeOnStart": "true",
+    "SHARE_PROJECT_CONFIGURATION_FILES": "true",
+    "git-widget-placeholder": "master",
+    "kotlin-language-version-configured": "true",
+    "node.js.detected.package.eslint": "true",
+    "node.js.detected.package.tslint": "true",
+    "node.js.selected.package.eslint": "(autodetect)",
+    "node.js.selected.package.tslint": "(autodetect)",
+    "nodejs_package_manager_path": "npm",
+    "settings.editor.selected.configurable": "MavenSettings",
+    "vue.rearranger.settings.migration": "true"
+  }
+}]]></component>
+  <component name="RecentsManager">
+    <key name="CopyClassDialog.RECENTS_KEY">
+      <recent name="com.sf.helloworld" />
+    </key>
+  </component>
+  <component name="RunManager" selected="Application.Producer">
+    <configuration name="Consumer" type="Application" factoryName="Application" temporary="true" nameIsGenerated="true">
+      <option name="MAIN_CLASS_NAME" value="com.sf.helloworld.Consumer" />
+      <module name="kafka-demo" />
+      <extension name="coverage">
+        <pattern>
+          <option name="PATTERN" value="com.sf.helloworld.*" />
+          <option name="ENABLED" value="true" />
+        </pattern>
+      </extension>
+      <method v="2">
+        <option name="Make" enabled="true" />
+      </method>
+    </configuration>
+    <configuration name="Producer" type="Application" factoryName="Application" temporary="true" nameIsGenerated="true">
+      <option name="MAIN_CLASS_NAME" value="com.sf.helloworld.Producer" />
+      <module name="kafka-demo" />
+      <extension name="coverage">
+        <pattern>
+          <option name="PATTERN" value="com.sf.helloworld.*" />
+          <option name="ENABLED" value="true" />
+        </pattern>
+      </extension>
+      <method v="2">
+        <option name="Make" enabled="true" />
+      </method>
+    </configuration>
+    <recent_temporary>
+      <list>
+        <item itemvalue="Application.Producer" />
+        <item itemvalue="Application.Consumer" />
+      </list>
+    </recent_temporary>
+  </component>
+  <component name="SpellCheckerSettings" RuntimeDictionaries="0" Folders="0" CustomDictionaries="0" DefaultDictionary="application-level" UseSingleDictionary="true" transferred="true" />
+  <component name="TaskManager">
+    <task active="true" id="Default" summary="Default task">
+      <changelist id="1538f064-ef05-457e-908b-934abbee9af6" name="Changes" comment="" />
+      <created>1711871672348</created>
+      <option name="number" value="Default" />
+      <option name="presentableId" value="Default" />
+      <updated>1711871672348</updated>
+      <workItem from="1711871673523" duration="1977000" />
+    </task>
+    <servers />
+  </component>
+  <component name="TypeScriptGeneratedFilesManager">
+    <option name="version" value="3" />
+  </component>
+</project>

+ 24 - 0
kafka-demo/pom.xml

@@ -0,0 +1,24 @@
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <groupId>com.sf</groupId>
+    <artifactId>kafka-demo</artifactId>
+    <version>1.0-SNAPSHOT</version>
+    <packaging>jar</packaging>
+
+    <name>kafka-demo</name>
+    <url>http://maven.apache.org</url>
+
+    <properties>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka-clients</artifactId>
+            <version>3.7.0</version>
+        </dependency>
+    </dependencies>
+</project>

+ 49 - 0
kafka-demo/src/main/java/com/sf/helloworld/Consumer.java

@@ -0,0 +1,49 @@
+package com.sf.helloworld;
+
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+
+import java.net.InetAddress;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Properties;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+public class Consumer {
+
+    public static void main(String[] args) throws Exception {
+        // 准备kafka的连接参数
+        Properties properties = new Properties();
+        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
+        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group");
+        properties.put(ConsumerConfig.CLIENT_ID_CONFIG, InetAddress.getLocalHost().getHostName());
+
+        // 消费者订阅topic
+        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
+        String topic = "quickstart-events";
+//        String[] arr = new String[]{topic};
+//        consumer.subscribe(Arrays.asList(arr));
+        consumer.subscribe(Collections.singletonList(topic));
+
+        while (true) {
+            // 消费者拉取消息
+            ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
+            System.out.println("收到消息");
+            for (ConsumerRecord<String, String> record : records) {
+                System.out.println(record);
+            }
+            TimeUnit.SECONDS.sleep(1);
+        }
+    }
+}

+ 37 - 0
kafka-demo/src/main/java/com/sf/helloworld/Producer.java

@@ -0,0 +1,37 @@
+package com.sf.helloworld;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.serialization.StringSerializer;
+
+import java.util.Properties;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+
+public class Producer {
+
+    public static void main(String[] args) throws Exception{
+        // 准备kafka的连接参数
+        Properties properties = new Properties();
+        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
+        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        // 配置应答参数
+        properties.put(ProducerConfig.ACKS_CONFIG, "1");
+
+        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);
+        String topic = "quickstart-events";
+        for (int i = 1000; i < 1010; i++) {
+            String msg = "hello," + i;
+            // kafka的生产者发送消息是按照记录ProducerRecord来发送的
+            // ProducerRecord是通过topic和msg来构造的
+            ProducerRecord<String, String> record = new ProducerRecord<>(topic, msg);
+            kafkaProducer.send(record);
+            System.out.println("消息发送成功,msg=" + msg);
+            TimeUnit.SECONDS.sleep(1);
+        }
+        kafkaProducer.close();
+    }
+}