Java多对多网络通讯 Java实现多对多网络通讯的流程
艾萨克三毛 人气:0基本流程
客户端发送信息(指定目标客户端)至固定的一个服务端,服务端接收信息进行处理后发送至相应的客户端
通讯核心类
Socket类与流相辅相成,完成通讯。在accept方法返回了一个Socket对象后,获取socket的输入输出流,就可以接收信息或发送信息了,以一对一为例:
服务端 :
import java.io.*; import java.net.ServerSocket; import java.net.Socket; /** * @ClassName Server * @Description 服务端 * @Author issac * @Date 2021/4/13 17:26 */ public class Server { public static void main(String[] args) throws IOException { // 创建服务端套接字并指定端口 ServerSocket server = new ServerSocket(88); // 接收创建建立,返回连接创建好后服务器的socket对象 Socket socket = server.accept(); InputStreamReader reader = new InputStreamReader(socket.getInputStream()); BufferedReader bufferedReader = new BufferedReader(reader); // 获取请求 String request = bufferedReader.readLine(); System.out.println("client say:" + request); // 写到输出流传递给客户端 PrintWriter writer = new PrintWriter(socket.getOutputStream()); String line = "hello too"; writer.println(line); writer.flush(); // 关闭处理流的工具、socket套接字、服务套接字 writer.close(); bufferedReader.close(); socket.close(); server.close(); } }
客户端 :
import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; import java.io.PrintWriter; import java.net.Socket; /** * @ClassName Client * @Description 客户端 * @Author issac * @Date 2021/4/13 17:26 */ public class Client { public static void main(String[] args) throws IOException { // 创建socket连接,指明其地址和端口 Socket socket = new Socket("127.0.0.1", 88); // 获取套接字的输出流,输出hello PrintWriter writer = new PrintWriter(socket.getOutputStream()); String readLine = "Hello"; writer.println(readLine); writer.flush(); // 从套接字的输入流中获取信息 InputStreamReader reader = new InputStreamReader(socket.getInputStream()); BufferedReader bufferedReader = new BufferedReader(reader); String respond = bufferedReader.readLine(); System.out.println("server say:" + respond); bufferedReader.close(); writer.close(); socket.close(); } }
运行结果:
需要注意的是accept方法在没有连接的时候会阻塞,而导致后面的代码无法执行,在接下来的多对多通讯中需要依靠多线程来解决这个问题。
多对多代码实现
为了方便服务端和客户端对信息的处理,解析。首先定义一个消息类,定义属性分别为端口的本地地址,发送的消息内容,发送的目标地址。定义静态方法:将字符串解析为该类实例,处理消息的收发:
import com.alibaba.fastjson.JSON; import java.io.Serializable; import com.alibaba.fastjson.JSON; import java.io.*; import java.net.Socket; /** * 在网络中,所有被进行通讯的对象,都需要实现 Serializable 这个接口 * <p> * 该类,主要用于本项目例子中,socket传输的对象,请勿使用其他或字符串, * 为了后期更方便修改或者是其他操作 * * @ClassName SocketMessage * @Description TODO * @Author issac * @Date 2021/4/18 22:02 */ public class SocketMessage implements Serializable { /** * 我自己的名称 ip:port **/ private String key; /** * 我的目标 ip:port **/ private String to; /** * 发送的内容 **/ private String content; public String getKey() { return key; } public void setKey(String key) { this.key = key; } public String getTo() { return to; } public void setTo(String to) { this.to = to; } public String getContent() { return content; } public void setContent(String content) { this.content = content; } /** * 向目标客户端写出从发送者获取到的消息 */ public static void writeTargetMessage(SocketMessage message, Socket socket) throws IOException { PrintWriter writer = new PrintWriter(socket.getOutputStream()); // 统一字符串标准,以便于服务端解析 writer.println(JSON.toJSONString(message)); writer.flush(); } /** * 将输入流中接收的字符串解析为SocketMessage对象 * * @param is * @return SocketMessage * @throws Exception */ public static SocketMessage parseSocket(InputStream is) throws Exception { BufferedReader reader = new BufferedReader(new InputStreamReader(is)); String info = reader.readLine(); return parseSocketByStr(info); } /** * 将传入字符串解析为SocketMessage对象并返回 * * @param str * @return SocketMessage */ public static SocketMessage parseSocketByStr(String str) { SocketMessage socketMessage = null; try { socketMessage = JSON.parseObject(str, SocketMessage.class); } catch (Exception ex) { throw new RuntimeException("socket之间通讯不能不使用SocketMessage"); } return socketMessage; } @Override public String toString() { // 通过 阿里巴巴 的FastJson 库,将一个对象转换为 字符串 ,统一标准,以便于将字符串解析为该类 return JSON.toJSONString(this); } }
再单独定义一个服务端的消息处理类,该类用于发送消息至特定的客户端,所以定义两个属性,1.发送的消息,2.目标客户端的套接字:
import java.net.Socket; /** * @ClassName SocketMessageHandler * @Description 服务端针对客户端的消息处理器 * @Author issac * @Date 2021/4/18 22:34 */ public class SocketMessageHandler { SocketMessage sm; Socket targetSocket; public SocketMessageHandler(SocketMessage sm,Socket targetSocket) { this.sm = sm; this.targetSocket = targetSocket; } public void setSm(SocketMessage sm) { this.sm = sm; } /** * 发送消息 */ public void send() { if (this.sm == null) { return; } try { System.out.println(sm.getContent()); // 发送 SocketMessage.writeTargetMessage(sm, this.targetSocket); } catch ( Exception ex) { ex.printStackTrace(); } } }
接下来进行服务端的定义,我们的服务端需要处理多个客户端的消息,所以要定义一个容器存放客户端地址,在此之前我们已经定义了处理服务端消息的SocketMessageHandler类,因为我们的最终目的是为了处理信息,所以可以直接将SocketMessageHandler类存放至容器。我们用map来存储,而key就是客户端的地址:
import com.issac.task_05.task.msg.SocketMessage; import com.issac.task_05.task.msg.SocketMessageHandler; import java.net.ServerSocket; import java.net.Socket; import java.util.HashMap; import java.util.Map; /** * n - m: 一个服务端,同时服务多个客户端 * * @ClassName SocketServer * @Description 服务端 * @Author issac * @Date 2021/4/18 21:29 */ public class SocketServer { // 存放消息处理器 private static final Map<String, SocketMessageHandler> clientContainer = new HashMap<>(); public static void main(String[] args) { try { ServerSocket ss = new ServerSocket(8888); Socket accept; while (true) { /* 只有建立新连接时accept才会有响应而执行以下代码,否则会阻塞:客户端与服务器连接,并将已连接的客户端放入容器 */ accept = ss.accept(); SocketMessage msg = SocketMessage.parseSocket(accept.getInputStream()); // 获取信息 System.out.println("客户端建立连接:" + msg.getKey()); // 建立连接后将客户端地址存入容器 clientContainer.put(msg.getKey(), new SocketMessageHandler(msg, accept)); /* 在已经建立连接后,没有新连接,accept会处于阻塞状态,因此我们需要另外开辟一个线程来处理消息 */ new ServerThread(accept, clientContainer).start(); } } catch (Exception ex) { ex.printStackTrace(); } } }
在这里需要注意ServerSocket类的accept方法,在没有新连接的时候,该方法会阻塞,而之后的代码就无法执行了。我们在客户端与服务端连接成功之后进行消息收发的时候是没有新连接产生的,此时的阻塞导致无法进行通讯,于是乎我们需要再开辟一个线程,进行消息处理。那么我们定义一个继承Thread的消息处理类,将每次连接成功返回的套接字接收,进行信息处理。如此一来,只要有消息的传递该线程就可以进行获取:
import com.issac.task_05.task.msg.SocketMessage; import com.issac.task_05.task.msg.SocketMessageHandler; import java.io.InputStream; import java.net.Socket; import java.util.Map; /** * @ClassName ServerThread * @Description 处理信息 * @Author issac * @Date 2021/4/21 21:25 */ public class ServerThread extends Thread{ private Socket socket; InputStream inputStream; Map<String, SocketMessageHandler> clientContainer; public ServerThread(Socket socket,Map<String, SocketMessageHandler> clientContainer){ this.socket = socket; this.clientContainer = clientContainer; } public void run(){ try{ while (true){ // 将输入流中的数据解析为SocketMessage对象 inputStream = socket.getInputStream(); SocketMessage msg = SocketMessage.parseSocket(inputStream); System.out.println(msg); // 在容器中获取目标地址 SocketMessageHandler socketMessageHandler = clientContainer.get(msg.getTo()); // 设置需要传输的信息 socketMessageHandler.setSm(msg); // 传输信息 socketMessageHandler.send(); } }catch (Exception e){ e.printStackTrace(); } } }
最后就是客户端了,每个客户端所对应的服务端都相同,在客户端写一个简易的菜单,选择接收或发送消息即可:
import com.issac.task_05.task.msg.SocketMessage; import java.net.Socket; import java.util.Scanner; /** * @ClassName Client * @Description 客户端 * @Author issac * @Date 2021/4/19 21:08 */ public class Client { public static void main(String[] args) { Scanner scanner = new Scanner(System.in); Socket s = null; try { s = new Socket("localhost", 8888); // 第一次启动,创建socket,向服务器发送我是谁 SocketMessage initMsg = getSocketMsg(s.getLocalSocketAddress().toString(), null, null); System.out.println("开始与服务器建立连接: " + initMsg.toString()); SocketMessage.writeTargetMessage(initMsg, s); // 开始 循环等待 while (true) { System.out.println("===================menu====================="); System.out.println("1:发送消息"); System.out.println("2:接收消息"); int choice = scanner.nextInt(); switch (choice){ case 1: // 发送消息 String target = input("请输入您要发给谁:"); String content = input("请输入您要发送的内容:"); System.out.println(); SocketMessage afterMsg = getSocketMsg(s.getLocalSocketAddress().toString(), target, content); SocketMessage.writeTargetMessage(afterMsg, s); break; case 2: // 接收并打印消息 showRequiredMsg(s); break; default: } } } catch (Exception ex) { ex.printStackTrace(); } } /** * 根据提示输入内容 **/ public static String input(String tip) { Scanner input = new Scanner(System.in); System.out.println(tip); return input.next(); } /** * 将用户输入传递的本地地址,目标地址与传递内容转化为SocketMessage对象 * @param localSocketAddress * @param to * @param content * @return */ public static SocketMessage getSocketMsg(String localSocketAddress, String to, String content) { SocketMessage socketMessage = new SocketMessage(); // to 为null的时候,说明只是对服务器的初始 socketMessage.setKey(localSocketAddress.replaceAll("\\/", "")); socketMessage.setTo(to); socketMessage.setContent(content); return socketMessage; } /** * 接收消息并打印 * @param socket * @throws Exception */ public static void showRequiredMsg(Socket socket) throws Exception { SocketMessage socketMessage = SocketMessage.parseSocket(socket.getInputStream()); String source = socketMessage.getKey(); String content = socketMessage.getContent(); System.out.println("接收到来自《"+source+"》的信息:"+content+"\n"); } }
运行结果:
加载全部内容