Java NIO多人聊天室
RivenDong 人气:0
1. 服务器端代码
ChatServer类:
package nio.test.server;

import java.io.Closeable;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.*;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Set;

/**
 * Single-threaded, selector-based chat server.
 *
 * <p>Accepts client connections on one listening port and relays every message
 * received from a client to all other connected clients, prefixed with the
 * sender's name. A client that sends the exact text {@code quit} is disconnected
 * after that message has been relayed.
 */
public class ChatServer {

    // NOTE(review): main() below starts the server on 6666, not on this default.
    private static final int DEFAULT_PORT = 8888;
    private static final String QUIT = "quit";
    private static final int BUFFER = 1024;

    /** Listening channel on which new connections are accepted. */
    private ServerSocketChannel serverSocketChannel;
    private Selector selector;
    /** Scratch buffer for incoming bytes; only touched by the selector thread. */
    private final ByteBuffer byteBufferReader = ByteBuffer.allocate(BUFFER);
    /** Charset used for both decoding received bytes and encoding relayed text. */
    private final Charset charset = StandardCharsets.UTF_8;
    private final int port;

    /** Creates a server that listens on {@code DEFAULT_PORT}. */
    public ChatServer() {
        this(DEFAULT_PORT);
    }

    /**
     * Creates a server that listens on the given port.
     *
     * @param port TCP port to bind when the server is started
     */
    public ChatServer(int port) {
        this.port = port;
    }

    /**
     * Binds the listening socket and runs the selection loop until an
     * {@link IOException} escapes from it; all channels are closed on the way out.
     */
    private void start() {
        try {
            serverSocketChannel = ServerSocketChannel.open();
            serverSocketChannel.configureBlocking(false);
            serverSocketChannel.socket().bind(new InetSocketAddress(port));

            selector = Selector.open();
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
            System.out.println("启动服务器,监听端口:" + port + "...");

            while (true) {
                selector.select();
                // selectedKeys holds every key whose channel became ready in this round.
                Set<SelectionKey> selectionKeys = selector.selectedKeys();
                for (SelectionKey key : selectionKeys) {
                    handles(key);
                }
                // The selector never removes keys from this set itself.
                selectionKeys.clear();
            }
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            shutdown();
        }
    }

    /**
     * Closes every registered channel, the selector and the listening channel.
     * Closing the selector alone only deregisters channels; it does not close them.
     */
    private void shutdown() {
        if (selector != null && selector.isOpen()) {
            for (SelectionKey key : selector.keys()) {
                close(key.channel());
            }
        }
        close(selector);
        close(serverSocketChannel);
    }

    /**
     * Dispatches one ready key: accepts a new client, or reads and relays a message.
     *
     * @param key the selection key reported ready by the selector
     * @throws IOException if accepting or registering a new client fails; failures
     *         on an already-connected client are handled by dropping that client
     */
    private void handles(SelectionKey key) throws IOException {
        // A key may have been cancelled earlier in the same round (peer dropped while relaying).
        if (!key.isValid()) {
            return;
        }

        if (key.isAcceptable()) {
            ServerSocketChannel server = (ServerSocketChannel) key.channel();
            SocketChannel client = server.accept();
            if (client == null) {
                // Non-blocking accept() returns null when no connection is pending.
                return;
            }
            client.configureBlocking(false);
            client.register(selector, SelectionKey.OP_READ);
            System.out.println(getClientName(client) + "已连接");
        } else if (key.isReadable()) {
            SocketChannel client = (SocketChannel) key.channel();
            String fwdMsg;
            try {
                fwdMsg = receive(client);
            } catch (IOException e) {
                // A broken connection must only drop this client, not stop the server.
                disconnect(key);
                return;
            }

            if (fwdMsg == null) {
                // End of stream: the client closed its side.
                disconnect(key);
                return;
            }
            if (fwdMsg.isEmpty()) {
                return;
            }

            forwardMessage(client, fwdMsg);
            if (readyToQuit(fwdMsg)) {
                disconnect(key);
            }
        }
    }

    /**
     * Stops watching a client and closes its channel.
     *
     * @param key the key of the client channel to drop
     */
    private void disconnect(SelectionKey key) {
        SocketChannel client = (SocketChannel) key.channel();
        // Resolve the name before closing the channel.
        String name = getClientName(client);
        key.cancel();
        close(client);
        System.out.println(name + "已断开");
    }

    /**
     * Relays a message to every connected client except the sender.
     *
     * <p>The text is encoded once and written straight from the encoded buffer, so
     * its length is not limited by {@code BUFFER}. A peer whose write fails is
     * disconnected and the remaining peers are still served.
     *
     * @param client the sending client, excluded from the recipients
     * @param fwdMsg the message text to relay
     */
    private void forwardMessage(SocketChannel client, String fwdMsg) {
        ByteBuffer encoded = charset.encode(getClientName(client) + ":" + fwdMsg);

        for (SelectionKey key : selector.keys()) {
            Channel connectedClient = key.channel();
            if (!(connectedClient instanceof SocketChannel)
                    || !key.isValid()
                    || client.equals(connectedClient)) {
                continue;
            }

            // Independent position/limit per recipient over the same encoded bytes.
            ByteBuffer out = encoded.duplicate();
            try {
                // NOTE(review): on a non-blocking channel this loop spins while the
                // peer's send buffer is full; kept from the original design.
                while (out.hasRemaining()) {
                    ((SocketChannel) connectedClient).write(out);
                }
            } catch (IOException e) {
                disconnect(key);
            }
        }
    }

    /**
     * Reads the bytes currently available from a client and decodes them.
     *
     * @param client the channel to read from
     * @return the decoded text (at most {@code BUFFER} bytes per call), an empty
     *         string if no bytes were available, or {@code null} if the end of the
     *         stream was reached without reading any bytes
     * @throws IOException if the read fails
     */
    private String receive(SocketChannel client) throws IOException {
        byteBufferReader.clear();
        int count;
        do {
            count = client.read(byteBufferReader);
        } while (count > 0);

        if (count < 0 && byteBufferReader.position() == 0) {
            return null;
        }
        byteBufferReader.flip();
        return charset.decode(byteBufferReader).toString();
    }

    /** Builds the display name of a client from the port reported by its socket. */
    private String getClientName(SocketChannel client) {
        return "客户端[" + client.socket().getPort() + "]";
    }

    /** Returns whether the message is exactly the quit command. */
    private boolean readyToQuit(String msg) {
        return QUIT.equals(msg);
    }

    /** Closes a resource if it is non-null, printing any {@link IOException}. */
    private void close(Closeable closeable) {
        if (closeable != null) {
            try {
                closeable.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        ChatServer chatServer = new ChatServer(6666);
        chatServer.start();
    }
}
2. 客户端代码
ChatClient类:
package nio.test.client; import java.io.Closeable; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.ClosedSelectorException; import java.nio.channels.SelectionKey; import java.nio.channels.Selector; import java.nio.channels.SocketChannel; import java.nio.charset.Charset; import java.util.Set; public class ChatClient { private static final String DEFAULT_SERVER_HOST = "127.0.0.1"; private static final int DEFAULT_SERVER_PORT = 6666; private static final String QUIT = "quit"; private static final int BUFFER = 1024; private String host; private int port; private SocketChannel client; private ByteBuffer byteBufferReader = ByteBuffer.allocate(BUFFER); private ByteBuffer byteBufferWriter = ByteBuffer.allocate(BUFFER); private Selector selector; private Charset charset = Charset.forName("UTF-8"); public ChatClient(){ this(DEFAULT_SERVER_HOST, DEFAULT_SERVER_PORT); } public ChatClient(String host, int port){ this.host = host; this.port = port; } public boolean readyToQuit(String msg){ return QUIT.equals(msg); } private void close(Closeable closeable){ if(closeable != null){ try { closeable.close(); } catch (IOException e) { e.printStackTrace(); } } } private void start(){ try { client = SocketChannel.open(); client.configureBlocking(false); selector = Selector.open(); client.register(selector, SelectionKey.OP_CONNECT); client.connect(new InetSocketAddress(host, port)); while(true){ selector.select(); Set<SelectionKey> selectionKeys = selector.selectedKeys(); for(SelectionKey key : selectionKeys){ handles(key); } selectionKeys.clear(); } } catch (IOException e) { e.printStackTrace(); } catch (ClosedSelectorException e){ //用户正常退出 }finally { close(selector); } } private void handles(SelectionKey key) throws IOException { // CONNECT事件 连接就绪事件 if(key.isConnectable()){ SocketChannel client = (SocketChannel)key.channel(); if(client.isConnectionPending()){//连接处于就绪状态 client.finishConnect(); // 处理用户的输入信息 new 
Thread(new UserInputHandler(this)).start(); } client.register(selector, SelectionKey.OP_READ); } // READ事件 服务器转发消息 else if(key.isReadable()){ SocketChannel client = (SocketChannel)key.channel(); String msg = receive(client); if(msg.isEmpty()){ // 服务器出现异常 close(selector); }else{ System.out.println(msg); } } } public void send(String msg) throws IOException { if(msg.isEmpty()){ return ; }else{ byteBufferWriter.clear(); byteBufferWriter.put(charset.encode(msg)); byteBufferWriter.flip(); while(byteBufferWriter.hasRemaining()){ client.write(byteBufferWriter); } //检查用户是否准备退出 if(readyToQuit(msg)){ close(selector); } } } private String receive(SocketChannel client) throws IOException { byteBufferReader.clear(); while(client.read(byteBufferReader) > 0); byteBufferReader.flip(); return String.valueOf(charset.decode(byteBufferReader)); } public static void main(String[] args) { ChatClient chatClient = new ChatClient(); chatClient.start(); } }
UserInputHandler类:
package nio.test.client; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; public class UserInputHandler implements Runnable{ private ChatClient chatclient; public UserInputHandler(ChatClient chatClient){ this.chatclient = chatClient; } /**r * */ @Override public void run() { try { //等待用户输入的消息 BufferedReader consoleReader = new BufferedReader( new InputStreamReader(System.in) ); while(true){ String input = consoleReader.readLine(); //向服务器发送消息 chatclient.send(input); //检查用户是否准备退出 if(chatclient.readyToQuit(input)){ break; } } } catch (IOException e) { e.printStackTrace(); } } }
3. 执行效果截图
加载全部内容