Springboot netty 聊天室
Baker_Streets 人气:0一、项目的创建
新建Spring项目:
选择JDK版本:
选择Spring Web:
项目名称和位置的设置:
二、代码编写
导入.jar包:
gson: https://search.maven.org/artifact/com.google.code.gson/gson/2.8.9/jar
DemoApplication:
package com.example.demo; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.context.ConfigurableApplicationContext; import org.springframework.core.env.Environment; import java.net.InetAddress; import java.net.UnknownHostException; @SpringBootApplication public class DemoApplication { public static void main(String[] args) throws UnknownHostException { ConfigurableApplicationContext application = SpringApplication.run(DemoApplication.class, args); Environment env = application.getEnvironment(); String host = InetAddress.getLocalHost().getHostAddress(); String port = env.getProperty("server.port"); System.out.println("[----------------------------------------------------------]"); System.out.println("聊天室启动成功!点击进入:\t http://" + host + ":" + port); System.out.println("[----------------------------------------------------------"); WebSocketServer.inst().run(53134); } }
User:
package com.example.demo; import java.util.Objects; public class User { public String id; public String nickname; public User(String id, String nickname) { super(); this.id = id; this.nickname = nickname; } public String getId() { return id; } public void setId(String id) { this.id = id; } public String getNickname() { return nickname; } public void setNickname(String nickname) { this.nickname = nickname; } @Override public boolean equals(Object o) { if (this == o) return true; if (o == null || getClass() != o.getClass()) return false; User user = (User) o; return id.equals(user.getId()); } @Override public int hashCode() { return Objects.hash(id); } public String getUid() { return id; } }
SessionGroup:
package com.example.demo; import com.google.gson.Gson; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.group.ChannelGroup; import io.netty.channel.group.ChannelGroupFuture; import io.netty.channel.group.DefaultChannelGroup; import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; import io.netty.util.concurrent.ImmediateEventExecutor; import org.springframework.util.StringUtils; import java.util.Iterator; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; public final class SessionGroup { private static SessionGroup singleInstance = new SessionGroup(); // 组的映射 private ConcurrentHashMap<String, ChannelGroup> groupMap = new ConcurrentHashMap<>(); public static SessionGroup inst() { return singleInstance; } public void shutdownGracefully() { Iterator<ChannelGroup> groupIterator = groupMap.values().iterator(); while (groupIterator.hasNext()) { ChannelGroup group = groupIterator.next(); group.close(); } } public void sendToOthers(Map<String, String> result, SocketSession s) { // 获取组 ChannelGroup group = groupMap.get(s.getGroup()); if (null == group) { return; } Gson gson=new Gson(); String json = gson.toJson(result); // 自己发送的消息不返回给自己 // Channel channel = s.getChannel(); // 从组中移除通道 // group.remove(channel); ChannelGroupFuture future = group.writeAndFlush(new TextWebSocketFrame(json)); future.addListener(f -> { System.out.println("完成发送:"+json); // group.add(channel);//发送消息完毕重新添加。 }); } public void addSession(SocketSession session) { String groupName = session.getGroup(); if (StringUtils.isEmpty(groupName)) { // 组为空,直接返回 return; } ChannelGroup group = groupMap.get(groupName); if (null == group) { group = new DefaultChannelGroup(ImmediateEventExecutor.INSTANCE); groupMap.put(groupName, group); } group.add(session.getChannel()); } /** * 关闭连接, 关闭前发送一条通知消息 */ public void closeSession(SocketSession session, String echo) { ChannelFuture sendFuture = session.getChannel().writeAndFlush(new TextWebSocketFrame(echo)); sendFuture.addListener(new ChannelFutureListener() { public void operationComplete(ChannelFuture future) { System.out.println("关闭连接:"+echo); future.channel().close(); } }); } /** * 关闭连接 */ public void closeSession(SocketSession session) { ChannelFuture sendFuture = session.getChannel().close(); sendFuture.addListener(new ChannelFutureListener() { public void operationComplete(ChannelFuture future) { System.out.println("发送所有完成:"+session.getUser().getNickname()); } }); } /** * 发送消息 * @param ctx 上下文 * @param msg 待发送的消息 */ public void sendMsg(ChannelHandlerContext ctx, String msg) { ChannelFuture sendFuture = ctx.writeAndFlush(new TextWebSocketFrame(msg)); sendFuture.addListener(f -> {//发送监听 System.out.println("对所有发送完成:"+msg); }); } }
SocketSession:
package com.example.demo; import io.netty.channel.Channel; import io.netty.channel.ChannelHandlerContext; import io.netty.util.AttributeKey; import java.util.HashMap; import java.util.Map; import java.util.UUID; public class SocketSession { public static final AttributeKey<SocketSession> SESSION_KEY = AttributeKey.valueOf("SESSION_KEY"); /** * 用户实现服务端会话管理的核心 */ // 通道 private Channel channel; // 用户 private User user; // session唯一标示 private final String sessionId; private String group; /** * session中存储的session 变量属性值 */ private Map<String, Object> map = new HashMap<String, Object>(); public SocketSession(Channel channel) {//注意传入参数channel。不同客户端会有不同channel this.channel = channel; this.sessionId = buildNewSessionId(); channel.attr(SocketSession.SESSION_KEY).set(this); } // 反向导航 public static SocketSession getSession(ChannelHandlerContext ctx) {//注意ctx,不同的客户端会有不同ctx Channel channel = ctx.channel(); return channel.attr(SocketSession.SESSION_KEY).get(); } // 反向导航 public static SocketSession getSession(Channel channel) { return channel.attr(SocketSession.SESSION_KEY).get(); } public String getId() { return sessionId; } private static String buildNewSessionId() { String uuid = UUID.randomUUID().toString(); return uuid.replaceAll("-", ""); } public synchronized void set(String key, Object value) { map.put(key, value); } public synchronized <T> T get(String key) { return (T) map.get(key); } public boolean isValid() { return getUser() != null ? true : false; } public User getUser() { return user; } public void setUser(User user) { this.user = user; } public String getGroup() { return group; } public void setGroup(String group) { this.group = group; } public Channel getChannel() { return channel; } }
WebSocketServer:
package com.example.demo; import io.netty.bootstrap.ServerBootstrap; import io.netty.channel.Channel; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.EventLoopGroup; import io.netty.channel.nio.NioEventLoopGroup; import io.netty.channel.socket.SocketChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.handler.codec.http.HttpObjectAggregator; import io.netty.handler.codec.http.HttpServerCodec; import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler; import io.netty.handler.codec.http.websocketx.extensions.compression.WebSocketServerCompressionHandler; import io.netty.handler.stream.ChunkedWriteHandler; import io.netty.handler.timeout.IdleStateHandler; import java.util.concurrent.TimeUnit; public class WebSocketServer { private static WebSocketServer wbss; private static final int READ_IDLE_TIME_OUT = 60; // 读超时 private static final int WRITE_IDLE_TIME_OUT = 0;// 写超时 private static final int ALL_IDLE_TIME_OUT = 0; // 所有超时 public static WebSocketServer inst() { return wbss = new WebSocketServer(); } public void run(int port) { EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class) .childHandler(new ChannelInitializer <SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); // Netty自己的http解码器和编码器,报文级别 HTTP请求的解码和编码 pipeline.addLast(new HttpServerCodec()); // ChunkedWriteHandler 是用于大数据的分区传输 // 主要用于处理大数据流,比如一个1G大小的文件如果你直接传输肯定会撑暴jvm内存的; // 增加之后就不用考虑这个问题了 pipeline.addLast(new ChunkedWriteHandler()); // HttpObjectAggregator 是完全的解析Http消息体请求用的 // 把多个消息转换为一个单一的完全FullHttpRequest或是FullHttpResponse, // 原因是HTTP解码器会在每个HTTP消息中生成多个消息对象HttpRequest/HttpResponse,HttpContent,LastHttpContent pipeline.addLast(new HttpObjectAggregator(64 * 1024)); // WebSocket数据压缩 pipeline.addLast(new WebSocketServerCompressionHandler()); // WebSocketServerProtocolHandler是配置websocket的监听地址/协议包长度限制 pipeline.addLast(new WebSocketServerProtocolHandler("/ws", null, true, 10 * 1024)); // 当连接在60秒内没有接收到消息时,就会触发一个 IdleStateEvent 事件, // 此事件被 HeartbeatHandler 的 userEventTriggered 方法处理到 pipeline.addLast( new IdleStateHandler(READ_IDLE_TIME_OUT, WRITE_IDLE_TIME_OUT, ALL_IDLE_TIME_OUT, TimeUnit.SECONDS)); // WebSocketServerHandler、TextWebSocketFrameHandler 是自定义逻辑处理器, pipeline.addLast(new WebSocketTextHandler()); } }); Channel ch = b.bind(port).syncUninterruptibly().channel(); ch.closeFuture().syncUninterruptibly(); // 返回与当前Java应用程序关联的运行时对象 Runtime.getRuntime().addShutdownHook(new Thread() { @Override public void run() { SessionGroup.inst().shutdownGracefully(); bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); } }); } }
WebSocketTextHandler:
package com.example.demo; import com.google.gson.Gson; import com.google.gson.reflect.TypeToken; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.SimpleChannelInboundHandler; import io.netty.handler.codec.http.websocketx.TextWebSocketFrame; import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler; import io.netty.handler.timeout.IdleState; import io.netty.handler.timeout.IdleStateEvent; import java.util.HashMap; import java.util.Map; public class WebSocketTextHandler extends SimpleChannelInboundHandler<TextWebSocketFrame> { //@Override protected void channelRead(ChannelHandlerContext ctx, TextWebSocketFrame msg) throws Exception { SocketSession session = SocketSession.getSession(ctx); TypeToken<HashMap<String, String>> typeToken = new TypeToken<HashMap<String, String>>() { }; Gson gson=new Gson(); java.util.Map<String,String> map = gson.fromJson(msg.text(), typeToken.getType()); User user = null; switch (map.get("type")) { case "msg": Map<String, String> result = new HashMap<>(); user = session.getUser(); result.put("type", "msg"); result.put("msg", map.get("msg")); result.put("sendUser", user.getNickname()); SessionGroup.inst().sendToOthers(result, session); break; case "init": String room = map.get("room"); session.setGroup(room); String nick = map.get("nick"); user = new User(session.getId(), nick); session.setUser(user); SessionGroup.inst().addSession(session); break; } } @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { // 是否握手成功,升级为 Websocket 协议 if (evt == WebSocketServerProtocolHandler.ServerHandshakeStateEvent.HANDSHAKE_COMPLETE) { // 握手成功,移除 HttpRequestHandler,因此将不会接收到任何消息 // 并把握手成功的 Channel 加入到 ChannelGroup 中 new SocketSession(ctx.channel()); } else if (evt instanceof IdleStateEvent) { IdleStateEvent stateEvent = (IdleStateEvent) evt; if (stateEvent.state() == IdleState.READER_IDLE) { System.out.println("bb22"); } } else { super.userEventTriggered(ctx, evt); } } @Override protected void messageReceived(ChannelHandlerContext channelHandlerContext, TextWebSocketFrame textWebSocketFrame) throws Exception { } }
之后项目外创建一个test.html:
<!DOCTYPE HTML> <html> <head> <meta http-equiv="Content-Type" content="text/html; charset=utf-8"/> <title>群聊天室</title> <style type="text/css"> body { margin-right:50px; margin-left:50px; } .ddois { position: fixed; left: 120px; bottom: 30px; } </style> </head> <body> 群名:<input type="text" id="room" name="group" placeholder="请输入群"> <br /><br /> 昵称:<input type="text" id="nick" name="name" placeholder="请输入昵称"> <br /><br /> <button type="button" onclick="enter()">进入聊天群</button> <br /><br /> <div id="message"></div> <br /><br /> <div class="ddois"> <textarea name="send" id="text" rows="10" cols="30" placeholder="输入发送消息"></textarea> <br /><br /> <button type="button" onclick="send()">发送</button> </div> <script type="text/javascript"> var webSocket; if (window.WebSocket) { webSocket = new WebSocket("ws://localhost:53134/ws"); } else { alert("抱歉,您的浏览器不支持WebSocket协议!"); } //连通之后的回调事件 webSocket.onopen = function() { console.log("已经连通了websocket"); // setMessageInnerHTML("已经连通了websocket"); }; //连接发生错误的回调方法 webSocket.onerror = function(event){ console.log("出错了"); // setMessageInnerHTML("连接失败"); }; //连接关闭的回调方法 webSocket.onclose = function(){ console.log("连接已关闭..."); } //接收到消息的回调方法 webSocket.onmessage = function(event){ console.log("bbdds"); var data = JSON.parse(event.data) var msg = data.msg; var nick = data.sendUser; switch(data.type){ case 'init': console.log("mmll"); break; case 'msg': console.log("bblld"); setMessageInnerHTML(nick+": "+msg); break; default: break; } } function enter(){ var map = new Map(); var nick=document.getElementById('nick').value; var room=document.getElementById('room').value; map.set("type","init"); map.set("nick",nick); console.log(room); map.set("room",room); var message = Map2Json(map); webSocket.send(message); } function send() { var msg = document.getElementById('text').value; var nick = document.getElementById('nick').value; console.log("1:"+msg); if (msg != null && msg != ""){ var map = new Map(); map.set("type","msg"); map.set("msg",msg); var map2json=Map2Json(map); if (map2json.length < 8000){ console.log("4:"+map2json); webSocket.send(map2json); }else { console.log("文本太长了,少写一点吧
加载全部内容
- 猜你喜欢
- 用户评论