曹工杂谈:花了两天时间,写了一个netty实现的http客户端,支持同步转异步和连接池(1)--核心逻辑讲解
三国梦回 人气:1
# 背景
先说下写这个的目的,其实是好奇,dubbo是怎么实现同步转异步的,然后了解到,其依赖了请求中携带的请求id来完成这个连接复用;然后我又发现,redisson这个redis客户端,底层也是用的netty,那就比较好奇了:netty是异步的,上层是同步的,要拿结果的,同时呢,redis协议也不可能按照redisson的要求,在请求和响应里携带请求id,那,它是怎么实现同步转异步的呢,异步结果回来后,又是怎么把结果对应上的呢?
对redisson debug调试了long long time之后(你们知道的,多线程不好调试),大概理清了思路,基本就是:连接池 的思路。比如,我要访问redis:
1. 我会先去连接池里拿一个连接(其实是一个netty的socketChannel),然后用这个连接,去发起请求。
2. 上层新建一个promise(可写的future,熟悉completablefuture的可以秒懂,不熟悉的话,可以理解为一个阻塞队列,你去取东西,取不到,阻塞;生产者往队列放一个东西,你就不再阻塞了,且拿到了东西),把发送请求的任务交给下层的netty channel后,_将promise设置为netty channel的一个attribute_,然后在这个promise上阻塞等待
3. 下层的netty channel向redis 服务器发起请求
4. netty接收到redis 服务器的响应后,从channel中取到第二步设置的attribute,获取到promise,此时,相当于拿到了锁,然后打开锁,并把结果设置到promise中
5. 主线程被第四步唤醒后,拿到结果并返回。
其实问题的关键是,第二步的promise传递,要设置为channel的一个attribute,不然的话,响应回来后,也不知道把响应给谁。
理清了redisson的基本思路后,我想到了很早之前,面试oppo,二面的面试官就问了我一个问题:写过类似代理的中间件没有?(因为当时面试的是中间件部门)
然后我说没有,然后基本就凉了。
其实,中间件最主要的要求,尤其是代理这种,一方面接收请求,一方面还得作为客户端去发起请求,发起请求这一步,很容易变成性能瓶颈,不少实现里,这一步都是直接使用http client这类同步请求的工具(也是支持异步的,只是同步更常见),所以我也一直想写一个netty这种异步的客户端,同时还能同步转异步的,不能同步转异步,应用场景就比较受限了。
#实现思路
源码给懒得看文字的同学:
扯了这么多,我说下我这个http client的思路,和上面那个redisson的差不多,我这边的场景也是作为一个中间件,要访问的后端服务就几个,比如要访问http://192.168.19.102:8080下的若干服务,我这边是启动时候,就会去建一个连接池(直接配置commons pool2的池化参数,我这里配置的是,2个连接),连接池好了后,netty 的channel已经是ok的了,如下所示:
![](https://img2020.cnblogs.com/blog/519126/202003/519126-20200319090833124-1995346023.png)
这每一个长连接,是包在我们的一个核心的数据结构里的,叫NettyClient。
核心的属性,其实主要下面两个:
```java
//要连接的host和端口
private HostAndPortConfig config;
/**
* 当前使用的channel
*/
Channel channel;
```
## NettyClient的初始化
### 构造函数
构造函数如下:
```java
public NettyClient(HostAndPortConfig config) {
this.config = config;
}
@Data
@AllArgsConstructor
@NoArgsConstructor
public class HostAndPortConfig {
private String host;
private Integer port;
}
```
够简单吧,先不考虑连接池,最开始测试的时候,我就是这样,直接new对象的。
```java
public static void main(String[] args) {
HostAndPortConfig config = new HostAndPortConfig("192.168.19.102", 8080);
NettyClient client = new NettyClient(config);
client.initConnection();
NettyHttpResponse response = client.doPost("http://192.168.19.102:8080/BOL_WebService/xxxxx.do",
JSONObject.toJSONString(new Object()));
if (response == null) {
return;
}
System.out.println(response.getBody());
}
```
### 初始化连接
上面的测试代码,new完对象后,开始初始化连接。
```java
public void initConnection() {
log.info("initConnection starts...");
Bootstrap bootstrap;
//1.创建netty所需的bootstrap配置
bootstrap = createBootstrap(config);
//2.发起连接
ChannelFuture future = bootstrap.connect(config.getHost(), config.getPort());
log.info("current thread:{}", Thread.currentThread().getName());
//3.等待连接成功
boolean ret = future.awaitUninterruptibly(2000, MILLISECONDS);
boolean bIsSuccess = ret && future.isSuccess();
if (!bIsSuccess) {
//4.不成功抛异常
bIsConnectionOk = false;
log.error("host config:{}",config);
throw new RuntimeException("连接失败");
}
//5.走到这里,说明成功了,新的channle赋值给field
cleanOldChannelAndCancelReconnect(future, channel);
bIsConnectionOk = true;
}
```
这里初始化连接是直接同步等待的,如果失败,直接抛异常。第5步里,主要是把新的channel赋值给当前对象的一个field,同时,关闭旧的channle之类的。
```java
private void cleanOldChannelAndCancelReconnect(ChannelFuture future, Channel oldChannel) {
/**
* 连接成功,关闭旧的channel,再用新的channel赋值给field
*/
try {
if (oldChannel != null) {
try {
log.info("Close old netty channel " + oldChannel);
oldChannel.close();
} catch (Exception e) {
log.error("e:{}", e);
}
}
} finally {
/**
* 新channel覆盖field
*/
NettyClient.this.channel = future.channel();
NettyClient.this.bIsConnectionOk = true;
log.info("connection is ok,new channel:{}", NettyClient.this.channel);
if (NettyClient.this.scheduledFuture != null) {
log.info("cancel scheduledFuture");
NettyClient.this.scheduledFuture.cancel(true);
}
}
}
```
### netty client中,涉及的出站handler
这里说下前面的bootstrap的构造,如下:
```java
private Bootstrap createBootstrap(HostAndPortConfig config) {
Bootstrap bootstrap = new Bootstrap()
.channel(NioSocketChannel.class)
.group(NIO_EVENT_LOOP_GROUP);
bootstrap.handler(new CustomChannelInitializer(bootstrap, config, this));
bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 2000);
bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
bootstrap.option(ChannelOption.TCP_NODELAY, true);
bootstrap.option(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
return bootstrap;
}
```
handler 链,主要在CustomChannelInitializer类中。
```java
protected void initChannel(Channel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
// http客户端编解码器,包括了客户端http请求编码,http响应的解码
pipeline.addLast(new HttpClientCodec());
// 把多个HTTP请求中的数据组装成一个
pipeline.addLast(new HttpObjectAggregator(65536));
// 用于处理大数据流
pipeline.addLast(new ChunkedWriteHandler());
/**
* 重连handler
*/
pipeline.addLast(new ReconnectHandler(nettyClient));
/**
* 发送业务数据前,进行json编码
*/
pipeline.addLast(new HttpJsonRequestEncoder());
pipeline.addLast(new HttpResponseHandler());
}
```
其中,出站时(即客户端向外部write时),涉及的handler如下:
1. HttpJsonRequestEncoder,把业务对象,转变为httpRequest
2. HttpClientCodec,把第一步传给我们的httpRequest,编码为bytebuf,交给channel发送
简单说下HttpJsonRequestEncoder,这个是我自定义的:
```java
/**
* http请求发送前,使用该编码器进行编码
*
* 本来是打算在这里编码body为json,感觉没必要,直接上移到工具类了
*/
public class HttpJsonRequestEncoder extends
MessageToMessageEncoder {
final static String CHARSET_NAME = "UTF-8";
final static Charset UTF_8 = Charset.forName(CHARSET_NAME);
@Override
protected void encode(ChannelHandlerContext ctx, NettyHttpRequest nettyHttpRequest,
List
加载全部内容