Java 实现http轮询实时返回数据接口 Java 利用DeferredResult实现http轮询实时返回数据接口
bridgeli 人气:1今天这篇文章呢,不难,其实是解答我一直以来心里的一个疑问。是这样的,之前看五八技术委员会主席沈剑老师的公众号架构师之路的一篇文章:http 如何像 tcp 一样实时的收消息,里面其中的一个方案是用 http 短连接轮询的方式实现“伪长连接”。但是对于轮询,我们的第一反应肯定是有延时,但是标题不是说的是实时吗?当然我们可以把轮询的时长缩短一些,先不说这样大部分时间的轮询调用,可能都没消息返回,造成服务器资源浪费,轮询时间再短也是有延时啊,所以难道是伪实时?反正一般消息延时个三五秒,甚至十秒八秒一分钟,大家也不会在意,只会认为对方返回慢,对不起,这是我们程序员的锅,但是 http 真的不能实现实时吗?沈剑老师提出了一种方法:首选 webim 和 webserver 之间建立一条 http 连接,专门用作消息通道,这条连接叫 http 消息连接。然后会有如下处理:
1. 没有消息到达的时候,这个 http 消息连接将被夯住,不返回,由于 http 是短连接,这个 http 消息连接最多被夯住 90 秒,就会被断开(这是浏览器或者 webserver 的行为);
2. 在 1 的情况下,如果 http 消息连接被断开,立马再发起一个 http 消息连接;
此时在在 1 和 2 的配合下,浏览器与 webserver 之间将永远有一条消息连接在,然后还有一种情况
3. 每次收到消息时,这个消息连接就能及时将消息带回浏览器页面,并且在返回后,会立马再发起一个 http 消息连接
这样就能做到使用 http 端连接轮询的方式实现了实时收消息。不过需要说明的是,其实还有一种情况:消息到达时,上一个 http 消息连接正在返回,也就是第二种情况的时候突然来了一个消息,此时没有 http 消息连接可用。虽然理论上 http 消息连接的返回是瞬时的,没有消息连接可用出现的概率极小,但是根据墨菲定律我们知道,这种情况肯定会出现,所以这种情况下我们可以将消息暂存入消息池中,下一个消息连接到达后,无需等待,直接去消息池中取消息,将将消息带回,然后立刻返回生成新的消息连接即可。
不过以上都不是今天这篇文章的重点,和今天这篇文章的标题也没有任何关系。重点是当时看了沈剑老师的这篇文章后我一直有一个疑问:第一步的时候如何夯住?总不能 sleep 吧,这多不优雅啊,由于一直以为没有遇到过类似的需求,所以这么几年来我也没深究这个问题,但是心里确实一直记着,直到前一段时间,听马士兵教育的公开课,当时再讲类似的问题的时候提到了夯住 http 的连接(具体是哪个问题,还真不记得了),虽然当时上课的老师没提怎么实现,但是评论区我问了一下,如何夯住不返回?然后有一个同学回复说,用 DeferredResult,然后下课后搜了一下资料,果然可以,如下是实现的笔记,所以这才是重点,希望对有这个疑问的同学也有一点帮助。
1. 消息返回实体类,大家可以根据实际情况,自己定义即可:
package cn.bridgeli.deferredresulttest.entity; import lombok.Data; import lombok.Getter; /** * @author bridgeli */ @Data public class DeferredResultResponse { private Integer code; private String msg; public enum Msg { TIMEOUT("超时"), FAILED("失败"), SUCCESS("成功"); @Getter private String desc; Msg(String desc) { this.desc = desc; } } }
2. controller 接口:
package cn.bridgeli.deferredresulttest.controller; import cn.bridgeli.deferredresulttest.entity.DeferredResultResponse; import cn.bridgeli.deferredresulttest.service.DeferredResultService; import org.springframework.http.HttpStatus; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestParam; import org.springframework.web.bind.annotation.RestController; import org.springframework.web.context.request.async.DeferredResult; import javax.annotation.Resource; /** * @author bridgeli */ @RestController @RequestMapping(value = "/deferred-result") public class DeferredResultController { @Resource private DeferredResultService deferredResultService; /** * 为了方便测试,简单模拟一个 * 多个请求用同一个requestId会出问题 */ private final String requestId = "test"; @GetMapping(value = "/get") public DeferredResult<DeferredResultResponse> get(@RequestParam(value = "timeout", required = false, defaultValue = "10000") Long timeout) { DeferredResult<DeferredResultResponse> deferredResult = new DeferredResult<>(timeout); deferredResultService.process(requestId, deferredResult); return deferredResult; } /** * 设置DeferredResult对象的result属性,模拟异步操作 * * @param desired * @return */ @GetMapping(value = "/result") public String settingResult(@RequestParam(value = "desired", required = false, defaultValue = "成功") String desired) { DeferredResultResponse deferredResultResponse = new DeferredResultResponse(); if (DeferredResultResponse.Msg.SUCCESS.getDesc().equals(desired)) { deferredResultResponse.setCode(HttpStatus.OK.value()); deferredResultResponse.setMsg(desired); } else { deferredResultResponse.setCode(HttpStatus.INTERNAL_SERVER_ERROR.value()); deferredResultResponse.setMsg(DeferredResultResponse.Msg.FAILED.getDesc()); } deferredResultService.settingResult(requestId, deferredResultResponse); return "Done"; } }
其中:/get 接口模拟沈剑老师说的消息连接,/result 接口模拟有一条新消息来了,然后 /get 接口会立即返回。主要注意的是 requestId,在实际项目中不能使用同一个,否则会出现问题,这个测一下就知道了,也很容易想到原因。
3. service 实现:
package cn.bridgeli.deferredresulttest.service; import cn.bridgeli.deferredresulttest.entity.DeferredResultResponse; import org.springframework.http.HttpStatus; import org.springframework.stereotype.Service; import org.springframework.web.context.request.async.DeferredResult; import java.util.Map; import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; import java.util.function.Consumer; /** * @author bridgeli */ @Service public class DeferredResultService { private Map<String, Consumer<DeferredResultResponse>> taskMap; public DeferredResultService() { taskMap = new ConcurrentHashMap<>(); } /** * 将请求id与setResult映射 * * @param requestId * @param deferredResult */ public void process(String requestId, DeferredResult<DeferredResultResponse> deferredResult) { // 请求超时的回调函数 deferredResult.onTimeout(() -> { taskMap.remove(requestId); DeferredResultResponse deferredResultResponse = new DeferredResultResponse(); deferredResultResponse.setCode(HttpStatus.REQUEST_TIMEOUT.value()); deferredResultResponse.setMsg(DeferredResultResponse.Msg.TIMEOUT.getDesc()); deferredResult.setResult(deferredResultResponse); }); Optional.ofNullable(taskMap) .filter(t -> !t.containsKey(requestId)) .orElseThrow(() -> new IllegalArgumentException(String.format("requestId=%s is existing", requestId))); taskMap.putIfAbsent(requestId, deferredResult::setResult); } /** * 这里相当于异步的操作方法 * 设置DeferredResult对象的setResult方法 * * @param requestId * @param deferredResultResponse */ public void settingResult(String requestId, DeferredResultResponse deferredResultResponse) { if (taskMap.containsKey(requestId)) { Consumer<DeferredResultResponse> deferredResultResponseConsumer = taskMap.get(requestId); // 这里相当于DeferredResult对象的setResult方法 deferredResultResponseConsumer.accept(deferredResultResponse); taskMap.remove(requestId); } } }
文章最后,我想在说明另外一个问题,我们利用 DeferredResult 实现了 http 轮询返回,其实换个思路想问题,我们是不是也实现了 http 接口延时返回?所以如果你有延时返回的需求,同样可以借助 DeferredResult 实现。
加载全部内容