亲宝软件园·资讯

展开

spring中websocket定时任务实现实时推送

C~LOVER 人气:0

有时候业务要求websocket连接后,服务端实时每隔一段时间就将数据推送给客户端进行响应,这时就需要websocket+定时任务一起来实现实时推送数据给客户端了。
使用的定时任务方式为spring的TaskScheduler对象实现任务调度。

TaskScheduler定时任务实现

TaskScheduler接口提供了多种调度方法来实现运行任务的执行。

public interface TaskScheduler {
 
 	//通过触发器来决定task是否执行
    ScheduledFuture schedule(Runnable task, Trigger trigger); 
 
 	//在starttime的时候执行一次
    ScheduledFuture schedule(Runnable task, Date startTime);  
    ScheduledFuture schedule(Runnable task, Instant startTime); 
 
 	//从starttime开始每个period时间段执行一次task
    ScheduledFuture scheduleAtFixedRate(Runnable task, Instant startTime, Duration period); 
    ScheduledFuture scheduleAtFixedRate(Runnable task, Date startTime, long period); 
 
 	//每隔period执行一次
    ScheduledFuture scheduleAtFixedRate(Runnable task, long period);  
    ScheduledFuture scheduleAtFixedRate(Runnable task, Duration period);  
 
 	//从startTime开始每隔delay长时间执行一次
    ScheduledFuture scheduleWithFixedDelay(Runnable task, Date startTime, long delay); 
 
 	//每隔delay时间执行一次
    ScheduledFuture scheduleWithFixedDelay(Runnable task, long delay); 
}

简单测试一下

import cn.hutool.core.date.DateUtil;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.context.annotation.Bean;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.Trigger;
import org.springframework.scheduling.support.CronTrigger;
import org.springframework.stereotype.Component;

/**
 * The type Task scheduler test.
 *
 * @author yjj
 * @version 1.0
 * @since 2022 -12-28 15:45:17
 */
@Slf4j
@Component
@RequiredArgsConstructor
public class TaskSchedulerTest {

    private final TaskScheduler taskScheduler;

    @Bean
    public void test() {
        //每隔3秒执行一次
        Trigger trigger = new CronTrigger("0/3 * * * * *");
        //每隔1秒执行一次
        //Trigger trigger1 = new PeriodicTrigger(1, TimeUnit.SECONDS);
        taskScheduler.schedule(new MyThread(), trigger);
    }

    private class MyThread implements Runnable {
        @Override
        public void run() {
            log.info("定时执行线程名称=【{}】,执行时间=【{}】", Thread.currentThread().getName(), DateUtil.date());
        }
    }

}

效果就是每个3秒执行一次

在这里插入图片描述

websocket+定时任务实时推送

实现的业务需求如下:客户端连上来以后就每隔3秒向客户端实时推送消息。有关websocket的实现见文章websocket简单实现

TestWebsocket.java

import cn.hutool.core.collection.CollUtil;
import cn.hutool.core.date.DateUtil;
import cn.hutool.core.map.MapUtil;
import cn.hutool.core.text.CharSequenceUtil;
import cn.hutool.json.JSONUtil;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.Trigger;
import org.springframework.scheduling.support.CronTrigger;
import org.springframework.stereotype.Component;
import org.springframework.web.socket.*;

import java.io.IOException;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ScheduledFuture;

/**
 * 测试websocket
 *
 * @author yjj
 * @version 1.0
 * @since 2022 -12-28 14:55:29
 */
@Slf4j
@Component
@RequiredArgsConstructor
public class TestWebsocket implements WebSocketHandler {

    protected static final CopyOnWriteArrayList<WebSocketSession> WEB_SOCKET_SESSIONS = new CopyOnWriteArrayList<>();

    /**
     * 定时任务集合
     */
    Map<String, ScheduledFuture<?>> stringScheduledFutureMap = new ConcurrentHashMap<>();

    /**
     * taskScheduler
     */
    private final TaskScheduler taskScheduler;

    /**
     * 建立连接后操作
     *
     * @param session 连接session信息
     * @throws Exception exception
     */
    @Override
    public void afterConnectionEstablished(WebSocketSession session) throws Exception {
        sendMessage("连接成功~~~~~~,sessionId=" + session.getId());
        WEB_SOCKET_SESSIONS.add(session);
        //设置定时任务,每隔3s执行一次
        Trigger trigger = new CronTrigger("0/3 * * * * *");
        //开启一个定时任务
        ScheduledFuture<?> schedule = taskScheduler.schedule(new CustomizeTask(session.getId()), trigger);
        //根据session连接id定时任务线程存到map中
        stringScheduledFutureMap.put(session.getId(), schedule);
    }

    private class CustomizeTask implements Runnable {
        private final String sessionId;

        CustomizeTask(String sessionId) {
            this.sessionId = sessionId;
        }

        @Override
        public void run() {
            try {
                String message = CharSequenceUtil.format("定时执行线程名称=【{}】,执行时间=【{}】", Thread.currentThread().getName(), DateUtil.date());
                sendMessage(JSONUtil.toJsonStr(message), sessionId);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    /**
     * 接收到消息后的处理
     *
     * @param session 连接session信息
     * @param message 信息
     * @throws Exception exception
     */
    @Override
    public void handleMessage(WebSocketSession session, WebSocketMessage<?> message) throws Exception {
        sendMessage("接收到的消息为=【" + message + "】,sessionId=【" + session.getId() + "】,回复消息=【你好呀!】");
    }

    /**
     * ws连接出错时调用
     *
     * @param session   session连接信息
     * @param exception exception
     * @throws Exception exception
     */
    @Override
    public void handleTransportError(WebSocketSession session, Throwable exception) throws Exception {
        if (session.isOpen()) {
            sendMessage("ws连接出错,即将关闭此session,sessionId=【" + session.getId() + "】");
            session.close();
        }
        WEB_SOCKET_SESSIONS.remove(session);
    }

    /**
     * 连接关闭后调用
     *
     * @param session     session连接信息
     * @param closeStatus 关闭状态
     * @throws Exception exception
     */
    @Override
    public void afterConnectionClosed(WebSocketSession session, CloseStatus closeStatus) throws Exception {
        if (session.isOpen()) {
            sendMessage("ws连接即将关闭此session,sessionId=【" + session.getId() + "】");
            session.close();
        }
        WEB_SOCKET_SESSIONS.remove(session);
        String sessionId = session.getId();
        ScheduledFuture<?> scheduledFuture = MapUtil.get(stringScheduledFutureMap, sessionId, ScheduledFuture.class);
        if (scheduledFuture != null) {
            //暂停对应session的开启的定时任务
            scheduledFuture.cancel(true);
            //集合移除
            stringScheduledFutureMap.remove(sessionId);
        }
    }

    /**
     * 是否支持分片消息
     */
    @Override
    public boolean supportsPartialMessages() {
        return false;
    }

    /**
     * 群发发送消息
     *
     * @param message 消息
     * @throws IOException ioException
     */
    public void sendMessage(String message) throws IOException {
        if (CollUtil.isNotEmpty(WEB_SOCKET_SESSIONS)) {
            for (WebSocketSession webSocketSession : WEB_SOCKET_SESSIONS) {
                webSocketSession.sendMessage(new TextMessage(message));
            }
        }
    }

    /**
     * 发给指定连接消息
     *
     * @param message 消息
     * @throws IOException ioException
     */
    public void sendMessage(String message, String sessionId) throws IOException {
        if (CollUtil.isNotEmpty(WEB_SOCKET_SESSIONS)) {
            for (WebSocketSession webSocketSession : WEB_SOCKET_SESSIONS) {
                if (sessionId.equals(webSocketSession.getId())) {
                    webSocketSession.sendMessage(new TextMessage(message));
                }
            }
        }
    }
}

websocket绑定URL

import com.yjj.test.websocket.TestWebsocket;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.WebSocketHandler;
import org.springframework.web.socket.config.annotation.EnableWebSocket;
import org.springframework.web.socket.config.annotation.WebSocketConfigurer;
import org.springframework.web.socket.config.annotation.WebSocketHandlerRegistry;

import javax.annotation.Resource;

/**
 * websocket配置
 *
 * @author yjj
 * @version 1.0
 * @since 2022 -12-28 15:10:11
 */
@EnableWebSocket
@Configuration
public class WebSocketConfig implements WebSocketConfigurer {

    @Resource
    private TestWebsocket testWebsocket;

    /**
     * Register {@link WebSocketHandler WebSocketHandlers} including SockJS fallback options if desired.
     *
     * @param registry
     */
    @Override
    public void registerWebSocketHandlers(WebSocketHandlerRegistry registry) {
        registry.addHandler(testWebsocket, "/test").setAllowedOrigins("*");
    }
}

websocket与定时任务同时存在时,需要加入配置定义线程池进行线程的管理

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;

/**
 * 当定时任务和websocket同时存在时报错解决
 *
 * @author yjj
 * @version 1.0
 * @since 2022 -04-28 17:35:54
 */
@Configuration
public class ScheduledConfig {

    /**
     * Schedule本身是单线程执行的
     *
     * @return the task scheduler
     */
    @Bean
    public TaskScheduler taskScheduler() {
        ThreadPoolTaskScheduler scheduling = new ThreadPoolTaskScheduler();
        scheduling.setPoolSize(20);
        return scheduling;
    }
}

效果如下
连接上以后服务每隔3秒会向客户端实时推送消息

在这里插入图片描述

 到此这篇关于spring中websocket定时任务实现实时推送的文章就介绍到这了,更多相关spring websocket实时推送内容请搜索以前的文章或继续浏览下面的相关文章希望大家以后多多支持!

加载全部内容

相关教程
猜你喜欢
用户评论