Java BIO聊天程序
java硕哥 人气:0我们使用一个聊天程序来说本文的主题
1、BIO 客户端服务器通讯
public class ChatServer { public static void main(String[] args) throws IOException { ServerSocket serverSocket = new ServerSocket(9000); while (true) { try { System.out.println("聊天服务已启动,等待客户连接...."); Socket socket = serverSocket.accept(); System.out.printf("建立了与%s的连接!\n",socket.getRemoteSocketAddress()); loopReadRequest(socket); } catch (IOException e) { e.printStackTrace(); } } } public static String loopReadRequest(Socket socket) throws IOException { InputStreamReader reader = new InputStreamReader(socket.getInputStream()); StringBuilder sb = new StringBuilder(); char[] cbuf = new char[256]; // 循环读取socket的输入数据流 while (true) { // read方法,读出内容写入 char 数组,read 方法会一直阻塞 // 直到有输入内容 或 发生I/O错误 或 输入流结束(对方关闭了socket) // 正常读取时方法会返回读取的字符数,当输入流结束时(对方关闭了socket)方法返回 -1 int readed = reader.read(cbuf); SocketAddress remoteSocketAddress = socket.getRemoteSocketAddress(); // 客户端执行了socket.close() if (readed == -1) { System.out.println(remoteSocketAddress + " 断开了连接!"); reader.close(); socket.close(); break; } String readedStr = new String(cbuf, 0, readed); sb.append(readedStr); // ready()用来判断流是否可被读取,如果reader缓冲区不是空则返回true,否则返回false if (!reader.ready()) {//reader缓冲区为空,表示数据流已读完 // 数据流已读完,此时向客户端发送响应 socket.getOutputStream().write((remoteSocketAddress+"你好,"+sb+"已收到").getBytes()); System.out.println("收到内容:"+sb); // 清除sb的内容,准备接收下一个请求内容 sb.setLength(0); System.out.println("等待客户端消息...."); } } return sb.toString(); } } public class ChatClient { public static void main(String[] args) { try { Socket socket = new Socket("localhost", 9000); Scanner scanner = new Scanner(System.in); while (true) { System.out.print(">"); String line = scanner.nextLine(); if("".equals(line)){ continue; } if ("quit".equals(line)) { scanner.close(); socket.close(); break; } socket.getOutputStream().write(line.getBytes()); System.out.println(readRequest(socket)); } } catch (IOException e) { e.printStackTrace(); } } public static String readRequest(Socket socket) throws IOException { InputStreamReader reader = new InputStreamReader(socket.getInputStream()); StringBuilder sb = new StringBuilder(); char[] cbuf = new char[256]; while (true) { int readed = reader.read(cbuf); // 读出内容写入 char 数组,read 方法会一直阻塞 // 直到有输入内容 或 发生I/O错误 或 输入流结束(对方关闭了socket) // 正常读取,方法会返回读取的字符数,而当输入流结束(对方关闭了socket)则返回 -1 if (readed == -1) { System.out.println(socket.getRemoteSocketAddress() + " 断开了连接!"); reader.close(); socket.close(); break; } String readedStr = new String(cbuf, 0, readed); sb.append(readedStr); if(!reader.ready()){ break; } } return sb.toString(); } }
ChatServer与ChatClient建立了长连接,且ChatServer阻塞等待ChatClient发送消息过来,程序中 Server端只能与一个Client建立连接。程序这么写,只能实现一个客户端和服务端进行通信。
如何支持多个Client的连接呢? 使用独立的线程去读取socket
2、多线程实现单聊,群聊
单聊发送 格式:-c 对方端口号 消息内容, 群聊直接发送信息就可以了,具体发送逻辑看下面的程序
public class ChatServer { private static Map<String, Socket> connnectedSockets = new ConcurrentHashMap<>(); public static void main(String[] args) throws IOException { // 1、服务端初始化工作 ServerSocket serverSocket = new ServerSocket(9000); ExecutorService executorService = getExecutorService(); // 2、主线程- 循环阻塞接收新的连接请求 while (true) { Socket socket = serverSocket.accept(); cacheSocket(socket); // 3、一个socket对应一个读取任务,交给线程池中的线程执行 // 如果使用fixed线程池,会操作读取任务分配不到线程的情况 // 现象就是发送的消息别人收不到(暂存在Socket缓存中) executorService.submit(createLoopReadTask(socket)); } } private static Runnable createLoopReadTask(Socket socket) { return new Runnable() { public void run() { try { loopReadRequestAndRedirect(socket); } catch (IOException e) { e.printStackTrace(); } } }; } private static ExecutorService getExecutorService() { ExecutorService executorService = Executors.newCachedThreadPool(); int nThreads = Runtime.getRuntime().availableProcessors(); nThreads = 1; // 如果只设置一个线程,那么最先连接进来的客户端可以发送消息 // 因为程序阻塞读取第一个socket连接的数据流,没有其他线程资源去读后面建立的socket了 executorService = Executors.newFixedThreadPool(nThreads); return executorService; } private static void cacheSocket(Socket socket) { SocketAddress remoteSocketAddress = socket.getRemoteSocketAddress(); String[] split = remoteSocketAddress.toString().split(":"); connnectedSockets.put(split[1], socket); } public static String loopReadRequestAndRedirect(Socket socket) throws IOException { InputStreamReader reader = new InputStreamReader(socket.getInputStream()); StringBuilder sb = new StringBuilder(); char[] cbuf = new char[256]; while (true) { SocketAddress remoteSocketAddress = socket.getRemoteSocketAddress(); System.out.println(Thread.currentThread() + "执行 " + remoteSocketAddress + "发送的消息"); // 读出内容写入 char 数组,read 方法会一直阻塞 // 直到有输入内容 或 发生I/O错误 或 输入流结束(对方关闭了socket) // 正常读取时方法会返回读取的字符数,当输入流结束(对方关闭了socket)时返回 -1 int readed = reader.read(cbuf); if (readed == -1) { System.out.println(remoteSocketAddress + " 断开了连接!"); reader.close(); socket.close(); break; } String readedStr = new String(cbuf, 0, readed); sb.append(readedStr); //ready()用来判断流是否可被读取,如果reader缓冲区不是空则返回true,否则返回false boolean oneReqeustStreamReaded = !reader.ready(); if (oneReqeustStreamReaded) { String requestContent = sb.toString().trim(); String prifix = requestContent.substring(0, 2); // 单聊 if ("-c".equals(prifix)) { requestContent = requestContent.substring(3); String port = requestContent.substring(0, requestContent.indexOf(" ")); requestContent = requestContent.replaceFirst(port, ""); sendToOneSocket(connnectedSockets.get(port), requestContent); // 群聊 } else { // 向客户端发送响应 socket.getOutputStream().write(("您发送的消息-'" + sb + "' 已收到").getBytes()); sendToAllSocket(sb.toString(), socket); } sb.setLength(0); } } return sb.toString(); } /** * 发送消息给某个socket * * @param socket * @param msg */ private static void sendToOneSocket(Socket socket, String msg) { // 对于同一个socket,同一时刻只有一个线程使用它发送消息 synchronized (socket) { try { socket.getOutputStream().write(msg.getBytes("UTF-8")); } catch (IOException e) { e.printStackTrace(); } } } /** * 发送消息给所有的socket * * @param msg */ private static void sendToAllSocket(String msg, Socket selfSocket) { for (String key : connnectedSockets.keySet()) { Socket socket = connnectedSockets.get(key); if (socket.equals(selfSocket)) { continue; } sendToOneSocket(socket, msg); } } } public class ChatClient { public static void main(String[] args) throws IOException { new ChatClient().start(); } public void start() throws IOException { Socket socket = new Socket("localhost", 9000); ExecutorService executorService = Executors.newFixedThreadPool(2); Runnable readTask = new Runnable() { public void run() { try { loopReadRequest(socket); } catch (IOException e) { e.printStackTrace(); } } }; executorService.submit(readTask); Runnable sendMsgTask = new Runnable() { public void run() { try { Scanner scanner = new Scanner(System.in); while (true) { System.out.print(">"); String line = scanner.nextLine(); if ("".equals(line)) { continue; } if ("quit".equals(line)) { scanner.close(); socket.close(); break; } socket.getOutputStream().write(line.getBytes()); } } catch (IOException e) { e.printStackTrace(); } } }; executorService.submit(sendMsgTask); } public void loopReadRequest(Socket socket) throws IOException { InputStreamReader reader = new InputStreamReader(socket.getInputStream()); StringBuilder sb = new StringBuilder(); char[] cbuf = new char[256]; while (true) { int readed = reader.read(cbuf); // 读出内容写入 char 数组,read 方法会一直阻塞 // 直到有输入内容 或 发生I/O错误 或 输入流结束(对方关闭了socket) // 正常读取,方法会返回读取的字符数,而当输入流结束(对方关闭了socket)则返回 -1 if (readed == -1) { System.out.println(socket.getRemoteSocketAddress() + " 断开了连接!"); reader.close(); socket.close(); break; } String readedStr = new String(cbuf, 0, readed); sb.append(readedStr); if (!reader.ready()) { System.out.println(sb); sb.setLength(0); } } } }
加载全部内容