亲宝软件园·资讯

展开

JAVA Netty聊天室 JAVA Netty实现聊天室+私聊功能的代码实例

-韧- 人气:0
想了解JAVA Netty实现聊天室+私聊功能的代码实例的相关内容吗,-韧-在本文为您仔细讲解JAVA Netty聊天室的相关知识和一些Code实例,欢迎阅读和指正,我们先划重点:JAVA,Netty聊天室,JAVA,Netty,私聊,下面大家一起来学习吧。

功能介绍

使用Netty框架实现聊天室功能,服务器可监控客户端上下限状态,消息转发。同时实现了点对点私聊功能。技术点我都在代码中做了备注,这里不再重复写了。希望能给想学习netty的同学一点参考。

服务器代码

服务器入口代码

package nio.test.netty.groupChat;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;

/**
 * netty群聊 服务器端
 * @author zhang
 *
 */
public class NettyChatServer {
	private int port;
	
	public NettyChatServer(int port){
		this.port = port;
	}
	
	//初始化 netty服务器
	private void init() throws Exception{
		EventLoopGroup boss = new NioEventLoopGroup(1);
		EventLoopGroup work = new NioEventLoopGroup(16);
		try {
			ServerBootstrap boot = new ServerBootstrap();
			boot.group(boss,work);
			boot.channel(NioServerSocketChannel.class);//设置boss selector建立channel使用的对象
			boot.option(ChannelOption.SO_BACKLOG, 128);//boss 等待连接的 队列长度
			boot.childOption(ChannelOption.SO_KEEPALIVE, true); //让客户端保持长期活动状态
			boot.childHandler(new ChannelInitializer<SocketChannel>() {

				@Override
				protected void initChannel(SocketChannel ch) throws Exception {
					//从channel中获取pipeline 并往里边添加Handler
					ChannelPipeline pipeline = ch.pipeline();
					pipeline.addLast("encoder",new StringEncoder());
					pipeline.addLast("decoder",new StringDecoder());
					pipeline.addLast(new ServerMessageHandler());//自定义Handler来处理消息
				}
			});
			System.out.println("服务器开始启动...");
			//绑定端口 
			ChannelFuture channelFuture = boot.bind(port).sync();
			channelFuture.addListener(new GenericFutureListener<Future<? super Void>>() {

				@Override
				public void operationComplete(Future<? super Void> future)
						throws Exception {
					if(future.isSuccess()){
						System.out.println("服务器正在启动...");
					}
					if(future.isDone()){
						System.out.println("服务器启动成功...OK");
					}
					
				}
			});
			//监听channel关闭
			channelFuture.channel().closeFuture().sync();
			channelFuture.addListener(new GenericFutureListener<Future<? super Void>>() {

				@Override
				public void operationComplete(Future<? super Void> future)
						throws Exception {
					if(future.isCancelled()){
						System.out.println("服务器正在关闭..");
					}
					if(future.isCancellable()){
						System.out.println("服务器已经关闭..OK");
					}
					
				}
			});
			
		}finally{
			boss.shutdownGracefully();
			work.shutdownGracefully();
		}
	}
	/**
	 * 启动服务器 main 函数
	 * @param args
	 * @throws Exception
	 */
	public static void main(String[] args) throws Exception {
		new NettyChatServer(9090).init();

	}

}

服务器端消息处理Handler

package nio.test.netty.groupChat;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.channel.group.ChannelGroup;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.util.concurrent.GlobalEventExecutor;

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
/**
 * 自定义 服务器端消息处理Handler
 * @author zhang
 *
 */
public class ServerMessageHandler extends SimpleChannelInboundHandler<String>{
	/**
	 * 管理全局的channel
	 * GlobalEventExecutor.INSTANCE 全局事件监听器
	 * 一旦将channel 加入 ChannelGroup 就不要用手动去
	 * 管理channel的连接失效后移除操作,他会自己移除
	 */
	private static ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
	/**
	 * 为了实现私聊功能,这里key存储用户的唯一标识,
	 * 我保存 客户端的端口号
	 * 当然 这个集合也需要自己去维护 用户的上下线 不能像 ChannelGroup那样自己去维护
	 */
	private static Map<String,Channel> all = new HashMap<String,Channel>();
	
	private SimpleDateFormat sf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
	/**
	 * 处理收到的消息
	 */
	@Override
	protected void channelRead0(ChannelHandlerContext ctx, String msg)
			throws Exception {
		Channel channel = ctx.channel();
		/**
		 * 这里简单判断 如果内容里边包含#那么就是私聊
		 */
		if(msg.contains("#")){
			String id = msg.split("#")[0];
			String body = msg.split("#")[1];
			Channel userChannel = all.get(id);
			String key = channel.remoteAddress().toString().split(":")[1];
			userChannel.writeAndFlush(sf.format(new Date())+"\n 【用户】 "+key+" 说 : "+body);
			return;
		}
		
		//判断当前消息是不是自己发送的
		for(Channel c : channels){
			String addr = c.remoteAddress().toString();
			if(channel !=c){
				c.writeAndFlush(sf.format(new Date())+"\n 【用户】 "+addr+" 说 : "+msg);
			}else{
				c.writeAndFlush(sf.format(new Date())+"\n 【自己】 "+addr+" 说 : "+msg);
			}
		}
		
	}
	/**
	 * 建立连接以后第一个调用的方法
	 */
	@Override
	public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
		Channel channel = ctx.channel();
		String addr = channel.remoteAddress().toString();
		/**
		 * 这里 ChannelGroup 底层封装会遍历给所有的channel发送消息
		 * 
		 */
		channels.writeAndFlush(sf.format(new Date())+"\n 【用户】 "+addr+" 加入聊天室 ");
		channels.add(channel);
		String key = channel.remoteAddress().toString().split(":")[1];
		all.put(key, channel);
	}
	/**
	 * channel连接状态就绪以后调用
	 */
	@Override
	public void channelActive(ChannelHandlerContext ctx) throws Exception {
		String addr = ctx.channel().remoteAddress().toString();
		System.out.println(sf.format(new Date())+" \n【用户】 "+addr+" 上线 ");
	}
	/**
	 * channel连接状态断开后触发
	 */
	@Override
	public void channelInactive(ChannelHandlerContext ctx) throws Exception {
		String addr = ctx.channel().remoteAddress().toString();
		System.out.println(sf.format(new Date())+" \n【用户】 "+addr+" 下线 ");
		//下线移除
		String key = ctx.channel().remoteAddress().toString().split(":")[1];
		all.remove(key);
	}
	/**
	 * 连接发生异常时触发
	 */
	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
			throws Exception {
		//System.out.println("连接发生异常!");
		ctx.close();
	}
	/**
	 * 断开连接会触发该消息
	 * 同时当前channel 也会自动从ChannelGroup中被移除
	 */
	@Override
	public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
		Channel channel = ctx.channel();
		String addr = channel.remoteAddress().toString();
		/**
		 * 这里 ChannelGroup 底层封装会遍历给所有的channel发送消息
		 * 
		 */
		channels.writeAndFlush(sf.format(new Date())+"\n 【用户】 "+addr+" 离开了 ");
		//打印 ChannelGroup中的人数
		System.out.println("当前在线人数是:"+channels.size());
		System.out.println("all:"+all.size());
	}

	
}

客户端主方法代码

package nio.test.netty.groupChat;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import io.netty.util.concurrent.Future;
import io.netty.util.concurrent.GenericFutureListener;

import java.util.Scanner;

public class NettyChatClient {

	private String ip;
	
	private int port;
	
	public NettyChatClient(String ip,int port){
		this.ip = ip;
		this.port = port;
	}
	/**
	 * 初始化客户
	 */
	private void init() throws Exception{
		//创建监听事件的监听器
		EventLoopGroup work = new NioEventLoopGroup();
		try {
			Bootstrap boot = new Bootstrap();
			boot.group(work);
			boot.channel(NioSocketChannel.class);
			boot.handler(new ChannelInitializer<NioSocketChannel>() {

				@Override
				protected void initChannel(NioSocketChannel ch)
						throws Exception {
					ChannelPipeline pipeline = ch.pipeline();
					pipeline.addLast("encoder",new StringEncoder());
					pipeline.addLast("decoder",new StringDecoder());
					pipeline.addLast(new ClientMessageHandler());
					
				}
			});
			
			ChannelFuture channelFuture = boot.connect(ip, port).sync();
			channelFuture.addListener(new GenericFutureListener<Future<? super Void>>() {

				@Override
				public void operationComplete(Future<? super Void> future)
						throws Exception {
					if(future.isSuccess()){
						System.out.println("客户端启动中...");
					}
					if(future.isDone()){
						System.out.println("客户端启动成功...OK!");
					}
				}
			});
			System.out.println(channelFuture.channel().localAddress().toString());
			System.out.println("#################################################");
			System.out.println("~~~~~~~~~~~~~~端口号#消息内容~~这样可以给单独一个用户发消息~~~~~~~~~~~~~~~~~~");
			System.out.println("#################################################");
			
			/**
			 * 这里用控制台输入数据
			 */
			Channel channel = channelFuture.channel();
			//获取channel
			Scanner scanner = new Scanner(System.in);
			while(scanner.hasNextLine()){
				String str = scanner.nextLine();
				channel.writeAndFlush(str+"\n");
			}
			channelFuture.channel().closeFuture().sync();
			scanner.close();
		} finally {
			work.shutdownGracefully();
		}
	}
	
	/**
	 * 主方法入口
	 * @param args
	 * @throws Exception
	 */
	public static void main(String[] args) throws Exception{

		new NettyChatClient("127.0.0.1",9090).init();
	}

}

客户端消息处理Handler

package nio.test.netty.groupChat;

import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
/**
 * 客户点消息处理 Handler
 * @author zhang
 *
 */
public class ClientMessageHandler extends SimpleChannelInboundHandler<String> {

	/**
	 * 处理收到的消息
	 */
	@Override
	protected void channelRead0(ChannelHandlerContext ctx, String msg)
			throws Exception {
		System.out.println(msg);
		
	}
	/**
	 * 连接异常后触发
	 */
	@Override
	public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause)
			throws Exception {
		ctx.close();
		
	}
}

测试结果

启动了四个客户端 服务器端日志效果如下:


客户端一端日志:


客户端二日志:


客户端三日志:


客户端四日志:


现在在客户端四发送消息:


每个客户端都可以收到消息:




软化关闭客户端客户端三:

服务器日志:


其他客户端日志:




发送私聊消息:


这个客户端收不到消息


加载全部内容

相关教程
猜你喜欢
用户评论