瀏覽代碼

0719 群聊系统

Qing 10 月之前
父節點
當前提交
c83a4f25bd

+ 98 - 0
springboot-demo/src/main/java/com/sf/javase/io/socket/chat/ChatClient.java

@@ -0,0 +1,98 @@
+package com.sf.javase.io.socket.chat;
+
+import lombok.SneakyThrows;
+
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.SelectableChannel;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.SocketChannel;
+import java.util.Iterator;
+import java.util.Scanner;
+import java.util.Set;
+
+// 群聊系统
+// 1、构建能实时发送消息的客户端,服务端能够监听客户端的状态(上线、下线)
+// 2、一个客户端发送消息,其他所有客户端接收消息,达到群聊效果
+// 3、一个客服端将消息发给服务端,服务端找到其他客户端,将消息广播出去
+public class ChatClient {
+
+    private SocketChannel socketChannel;
+    private Selector selector;
+
+    @SneakyThrows
+    public ChatClient(String ip, int port) {
+        selector = Selector.open();
+
+        SocketAddress socketAddress = new InetSocketAddress(ip, port);
+        socketChannel = SocketChannel.open(socketAddress);
+        socketChannel.configureBlocking(false);
+        // 作为客户端 建立连接后 要监听读事件  服务端返回的数据
+        socketChannel.register(selector, SelectionKey.OP_READ);
+        // 打印客户端所占用的端口  借以区分不同的客户端
+        System.out.println("用户" + socketChannel.getLocalAddress() + "上线了");
+    }
+
+    @SneakyThrows
+    public void sendData(String message) {
+        // 可以简化使用buffer的方式 直接将message中的字节数组通过wrap方法传入buffer
+        ByteBuffer buffer = ByteBuffer.wrap(message.getBytes());
+        socketChannel.write(buffer);
+    }
+
+    // 读取数据
+    @SneakyThrows
+    public void readData() {
+        int num = selector.select();
+        // 如果没有要处理的事件 返回
+        if (num == 0) {
+            return;
+        }
+        // 接收要执行的事件
+        Set<SelectionKey> selectionKeys = selector.selectedKeys();
+        Iterator<SelectionKey> iterator = selectionKeys.iterator();
+        while (iterator.hasNext()) {
+            SelectionKey key = iterator.next();
+            iterator.remove();
+            // 如果不是读操作 跳过
+            if (!key.isReadable()) {
+                continue;
+            }
+            SocketChannel channel = (SocketChannel) key.channel();
+            ByteBuffer buffer = ByteBuffer.allocate(1024);
+            int numRead = channel.read(buffer);
+            // 从buffer的数组中取出字节数组  然后读出多少数据 给到string构造器
+            String message = new String(buffer.array(), 0, numRead);
+            System.out.println(message);
+        }
+    }
+
+
+    public static void main(String[] args) {
+        // 通过构造器建立初始的连接
+        ChatClient client = new ChatClient("127.0.0.1", 6666);
+        // 通过另外的线程读取数据
+        new Thread(() -> {
+            // 不断的尝试进行读取数据
+            while (true) {
+                client.readData();
+                try {
+                    // 这里本身启动了一个线程 要调用当前线程的sleep
+                    Thread.currentThread().sleep(2000);
+                } catch (InterruptedException e) {
+                    throw new RuntimeException(e);
+                }
+            }
+        }).start();
+
+        // 当前的main线程接收键盘的输入发送数据
+        Scanner scanner = new Scanner(System.in);
+        while (scanner.hasNextLine()) {
+            String line = scanner.nextLine();
+            // 将当前输入的数据发送给服务端
+            client.sendData(line);
+        }
+    }
+}

+ 109 - 0
springboot-demo/src/main/java/com/sf/javase/io/socket/chat/ChatServer.java

@@ -0,0 +1,109 @@
+package com.sf.javase.io.socket.chat;
+
+import lombok.SneakyThrows;
+
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.*;
+import java.util.Iterator;
+import java.util.Set;
+
+// 服务端
+public class ChatServer {
+
+    // 服务端通道
+    private ServerSocketChannel serverSocketChannel;
+    private Selector selector;
+
+    @SneakyThrows
+    public ChatServer(int port) {
+        selector = Selector.open();
+
+        serverSocketChannel = ServerSocketChannel.open();
+        serverSocketChannel.socket().bind(new InetSocketAddress(port));
+        serverSocketChannel.configureBlocking(false);
+        // 服务端 要接收客户端连接事件
+        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
+    }
+
+    @SneakyThrows
+    public void listenClient() {
+        System.out.println("服务端启动监听");
+        while (true) {
+            int select = selector.select();
+            if (select == 0) continue;
+            Set<SelectionKey> selectionKeys = selector.selectedKeys();
+            Iterator<SelectionKey> iterator = selectionKeys.iterator();
+            while (iterator.hasNext()) {
+                SelectionKey key = iterator.next();
+                iterator.remove();
+                if (key.isAcceptable()) {
+                    SocketChannel clientChannel = serverSocketChannel.accept();
+                    clientChannel.configureBlocking(false);
+                    clientChannel.register(selector, SelectionKey.OP_READ);
+                    // getRemoteAddress() 是客户端的ip地址+端口号
+                    System.out.println("用户" + clientChannel.getRemoteAddress() + "上线了");
+                    continue;
+                }
+                if (key.isReadable()) {
+                    readData(key);
+                }
+            }
+        }
+    }
+
+    private void readData(SelectionKey key) {
+        // 如果客户端建立连接之后 服务关闭了
+        // 此时SelectionKey存在 但无法获取通道
+        SocketChannel channel = null;
+        try {
+            channel = (SocketChannel) key.channel();
+            ByteBuffer byteBuffer = ByteBuffer.allocate(1024);
+            int read = channel.read(byteBuffer);
+            if (read > 0) {
+                String msg = new String(byteBuffer.array(), 0, read);
+                // 打印消息 及消息来源
+                System.out.println(msg + " from " + channel.socket().getRemoteSocketAddress());
+
+                // 再将消息转发给其他客户端(排除发送数据的客户端)
+                sendToOthers(msg, channel);
+            }
+
+        } catch (Exception e) {
+            // 用户下线了
+            System.out.println("用户" + channel.socket().getRemoteSocketAddress() + "下线了");
+            // 注销注册关系
+            key.cancel();
+            try {
+                channel.close();
+            } catch (IOException ex) {
+                throw new RuntimeException(ex);
+            }
+        }
+
+    }
+
+    // 发送给其他客户端
+    // 接收参数为 要发送的消息 和 发送消息的自身客户端
+    @SneakyThrows
+    private void sendToOthers(String msg,SocketChannel selfChannel) {
+        // 获取所有的通道 可以找到所有的在线用户
+        Set<SelectionKey> keys = selector.keys();
+        for (SelectionKey key : keys) {
+            SelectableChannel channel = key.channel();
+            // SocketChannel才是客户端 找到所有客户端 排除发送消息的客户端
+            if (channel instanceof SocketChannel && channel != selfChannel) {
+                SocketChannel socketChannel = (SocketChannel) channel;
+                ByteBuffer byteBuffer = ByteBuffer.wrap(msg.getBytes());
+                socketChannel.write(byteBuffer);
+            }
+        }
+    }
+
+    public static void main(String[] args) {
+        ChatServer server = new ChatServer(6666);
+        // 监听客户端
+        server.listenClient();
+    }
+}

+ 20 - 0
springboot-demo/src/main/java/com/sf/javase/io/socket/io/Client.java

@@ -0,0 +1,20 @@
+package com.sf.javase.io.socket.io;
+
+import lombok.SneakyThrows;
+
+import java.io.OutputStream;
+import java.net.Socket;
+
+public class Client {
+
+    @SneakyThrows
+    public static void main(String[] args) {
+        Socket socket = new Socket("127.0.0.1",1234);
+        OutputStream outputStream = socket.getOutputStream();
+        String message = "Hello Socket111!";
+        outputStream.write(message.getBytes());
+        // 资源都要关闭
+        outputStream.close();
+        socket.close();
+    }
+}

+ 92 - 0
springboot-demo/src/main/java/com/sf/javase/io/socket/io/Server.java

@@ -0,0 +1,92 @@
+package com.sf.javase.io.socket.io;
+
+import lombok.SneakyThrows;
+
+import java.io.InputStream;
+import java.net.ServerSocket;
+import java.net.Socket;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+public class Server {
+
+    @SneakyThrows
+    public static void main(String[] args) {
+        // 服务端占用端口 客户端连接端口
+        // windows 查看占用端口情况 netstat -aon|findstr "1234"
+        // mac 查看占用端口情况 sudo lsof -i tcp:1234
+        ServerSocket serverSocket = new ServerSocket(1234);
+
+        // 在实际项目开发中,用线程池来管理线程
+        // 大部分是自定义线程池的参数 少部分时候使用已有的线程池
+        // 数组-Arrays  集合-Collections 线程池-Executors
+        // 是根据需要创建新线程的线程池
+        ExecutorService threadPool = Executors.newCachedThreadPool();
+        // 起到的效果是 来一个任务就启动一个非核心线程执行
+        // 参数分别为,核心线程数=0,非核心线程数int最大值,空闲时间60s,不存储数据的阻塞队列
+        // new ThreadPoolExecutor(0, Integer.MAX_VALUE,60L, TimeUnit.SECONDS,new SynchronousQueue<Runnable>())
+
+        while (true) {
+            // 等待客户端连接  阻塞等待
+            Socket socket = serverSocket.accept();
+            // 将接受到客户端之后的逻辑 放在线程池中处理 达到异步的效果
+            // 将处理socket的任务 封装到Runnable的run方法中
+            // 线程池会启动一个线程来执行此任务
+
+            // 使用匿名内部类 来简化对类的声明
+//            threadPool.execute(new Runnable() {
+//                @Override
+//                public void run() {
+//                    handle(socket);
+//                }
+//            });
+
+            // 使用lambda表达式 简化对类及方法的声明
+            threadPool.execute(() -> {
+                handle(socket);
+            });
+
+//            MyTask task = new MyTask(socket);
+//            threadPool.execute(task);
+
+        }
+    }
+
+    @SneakyThrows
+    public static void handle(Socket socket) {
+        // 打印线程的名字 pool-1-thread-1
+        System.out.println(Thread.currentThread().getName());
+        // 接收Socket的输入流
+        InputStream inputStream = socket.getInputStream();
+        // 使用字节数组 接收数据
+        byte[] bytes = new byte[1024];
+        // 将输入流中的内容读出并写入字节数组
+        int read = inputStream.read(bytes);
+        // 根据读出来的字节数组 接收为字符串
+        String message = new String(bytes, 0, read);
+        System.out.println(message);
+    }
+}
+
+
+//class MyTask implements Runnable {
+//    private Socket socket;
+//
+//    public MyTask(Socket socket) {
+//        this.socket = socket;
+//    }
+//
+//    @SneakyThrows
+//    @Override
+//    public void run() {
+//        // 接收Socket的输入流
+//        InputStream inputStream = socket.getInputStream();
+//        // 使用字节数组 接收数据
+//        byte[] bytes = new byte[1024];
+//        // 将输入流中的内容读出并写入字节数组
+//        int read = inputStream.read(bytes);
+//        // 根据读出来的字节数组 接收为字符串
+//        String message = new String(bytes, 0, read);
+//        System.out.println(message);
+//    }
+//}

+ 49 - 0
springboot-demo/src/main/java/com/sf/javase/io/socket/nio/NioClient.java

@@ -0,0 +1,49 @@
+package com.sf.javase.io.socket.nio;
+
+import lombok.SneakyThrows;
+
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
+
+public class NioClient {
+
+    @SneakyThrows
+    public static void main(String[] args) {
+        // 创建/打开一个客户端通道
+        SocketChannel socketChannel = SocketChannel.open();
+        // 客户端需要建立和服务端的链接 是写数据 通过链接将数据传送给服务端
+        SocketAddress address = new InetSocketAddress("127.0.0.1", 4321);
+        // 通道连接地址
+        socketChannel.connect(address);
+
+        // 写数据
+        ByteBuffer writeBuffer = ByteBuffer.allocate(128);
+        String message = "Hello NIO Socket";
+        // 将要发送消息的字节数组写入到缓冲区
+        writeBuffer.put(message.getBytes());
+        // 写完成
+        writeBuffer.flip();
+        socketChannel.write(writeBuffer);
+
+        // 从服务端接收返回的数据
+        // 建立读数据的缓冲区
+        ByteBuffer readBuffer = ByteBuffer.allocate(128);
+        socketChannel.read(readBuffer);
+        // 读完成 刷新
+        readBuffer.flip();
+
+        // 从readBuffer取出放到stringbuffer中
+        StringBuffer stringBuffer = new StringBuffer();
+        // 判断是否还有数据
+        while (readBuffer.hasRemaining()) {
+            // 取出使用get() 返回byte 强转成char 放入stringbuffer中
+            stringBuffer.append((char) readBuffer.get());
+        }
+        // 打印服务端返回的数据
+        System.out.println("server: " + stringBuffer);
+        // 关闭
+        socketChannel.close();
+    }
+}

+ 49 - 0
springboot-demo/src/main/java/com/sf/javase/io/socket/nio/NioServer.java

@@ -0,0 +1,49 @@
+package com.sf.javase.io.socket.nio;
+
+import lombok.SneakyThrows;
+
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.ServerSocketChannel;
+import java.nio.channels.SocketChannel;
+
+public class NioServer {
+
+    @SneakyThrows
+    public static void main(String[] args) {
+        // 创建一个服务端通道
+        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
+
+        // 绑定ip地址和端口号
+        SocketAddress address = new InetSocketAddress("127.0.0.1", 4321);
+        serverSocketChannel.socket().bind(address);
+
+        // 接收客户端的连接
+        SocketChannel socketChannel = serverSocketChannel.accept();
+
+        // 返回数据 写数据给客户端
+        ByteBuffer writeBuffer = ByteBuffer.allocate(128);
+        writeBuffer.put("Hello Client, I'm Server.".getBytes());
+        writeBuffer.flip();
+        socketChannel.write(writeBuffer);
+
+        // 通过buffer 读写数据
+        ByteBuffer readBuffer = ByteBuffer.allocate(128);
+        socketChannel.read(readBuffer);
+        readBuffer.flip();
+        // 从readBuffer取出放到stringbuffer中
+        StringBuffer stringBuffer = new StringBuffer();
+        // 判断是否还有数据
+        while (readBuffer.hasRemaining()) {
+            // 取出使用get() 返回byte 强转成char 放入stringbuffer中
+            stringBuffer.append((char) readBuffer.get());
+        }
+        // 打印服务端返回的数据
+        System.out.println("client: " + stringBuffer);
+
+        // 关闭资源
+        socketChannel.close();
+        serverSocketChannel.close();
+    }
+}

+ 101 - 0
springboot-demo/src/main/java/com/sf/javase/io/socket/nio/SelectorServer.java

@@ -0,0 +1,101 @@
+package com.sf.javase.io.socket.nio;
+
+import lombok.SneakyThrows;
+
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.nio.ByteBuffer;
+import java.nio.channels.*;
+import java.util.Iterator;
+import java.util.Set;
+
+// 整合了监听器的服务端
+public class SelectorServer {
+
+    @SneakyThrows
+    public static void main(String[] args) {
+        // 创建一个服务端通道
+        ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
+
+        // 绑定ip地址和端口号
+        SocketAddress address = new InetSocketAddress("127.0.0.1", 4321);
+        serverSocketChannel.socket().bind(address);
+        // 通道默认是阻塞的 可以设置为非阻塞的
+        serverSocketChannel.configureBlocking(false);
+        // 打开一个选择器
+        Selector selector = Selector.open();
+        // 选择器根据不同的事件 进行不同的处理 这里是建立连接的事件
+        serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
+
+        // 使用读写buffer
+        ByteBuffer readBuffer = ByteBuffer.allocate(128);
+        ByteBuffer writeBuffer = ByteBuffer.allocate(128);
+        writeBuffer.put("hello client".getBytes());
+
+        // 管理通道 处理不同的事件
+        while (true) {
+            // 不断获取select()方法的返回值 判断是否要进行后续操作
+            int read = selector.select();
+            // 代表有多少个要处理的任务
+            if (read == 0) {
+                Thread.sleep(5000);
+                continue;
+            }
+
+            // 找出所有要处理的任务
+            // 获取到 有哪些通道要执行操作
+            Set<SelectionKey> selectionKeys = selector.selectedKeys();
+            // 接收迭代器 遍历集合
+            Iterator<SelectionKey> iterator = selectionKeys.iterator();
+            while (iterator.hasNext()) {
+                // 接收一个 移除一个
+                SelectionKey selectionKey = iterator.next();
+                iterator.remove();
+                // 根据不同的事件进行不同的处理
+                // 建立连接 - 读操作 - 写操作
+                if (selectionKey.isAcceptable()) {
+                    System.out.println("Acceptable");
+                    // 如果是建立连接的事件
+                    SocketChannel socketChannel = serverSocketChannel.accept();
+                    socketChannel.configureBlocking(false);
+                    // 服务端端口
+                    System.out.println(socketChannel.getLocalAddress());
+                    // 客户端端口
+                    System.out.println(socketChannel.getRemoteAddress());
+                    // 继续注册事件 这里注册写操作
+                    socketChannel.register(selector, SelectionKey.OP_WRITE);
+                } else if (selectionKey.isWritable()) {
+                    System.out.println("Writable");
+                    // 如果是写操作事件
+//                    SelectableChannel channel1 = selectionKey.channel();
+                    // SocketChannel 就是抽象类SelectableChannel的子类 所以做了强制转化
+                    SocketChannel channel = (SocketChannel) selectionKey.channel();
+                    writeBuffer.flip();
+                    channel.write(writeBuffer);
+                    // 注册新的事件  此时调用key的方法 注册读操作
+                    selectionKey.interestOps(SelectionKey.OP_READ);
+                } else if (selectionKey.isReadable()){
+                    System.out.println("Readable");
+                    SocketChannel channel = (SocketChannel) selectionKey.channel();
+                    readBuffer.clear();
+                    channel.read(readBuffer);
+                    readBuffer.flip();
+
+                    // 从readBuffer取出放到stringbuffer中
+                    StringBuffer stringBuffer = new StringBuffer();
+                    // 判断是否还有数据
+                    while (readBuffer.hasRemaining()) {
+                        // 取出使用get() 返回byte 强转成char 放入stringbuffer中
+                        stringBuffer.append((char) readBuffer.get());
+                    }
+                    // 打印服务端返回的数据
+                    System.out.println("client: " + stringBuffer);
+                    channel.close();
+                }else if (selectionKey.isConnectable()){
+                    System.out.println("Connectable");
+                }
+            }
+            Thread.sleep(2000);
+        }
+    }
+}