当前位置: 首页 > news >正文

WebSocket消息推送

创建WebSocket工具类

package org.jmis.riskassess.config;import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
@Component
@ServerEndpoint(value = "/message-service")
public class WebSocketUtil {private static final ConcurrentHashMap<String, Session> sessions = new ConcurrentHashMap<>();private static final Logger logger = LoggerFactory.getLogger(WebSocketUtil.class);public static void pushMessage(String userId, String message) {Session session = sessions.get(userId);if (session != null && session.isOpen()) {try {session.getBasicRemote().sendText(message);} catch (IOException e) {logger.error("Failed to send message to userId: " + userId, e);}} else {// 会话失效,从会话集合中移除sessions.remove(userId, session);
//			logger.warn("Session is invalid for userId: " + userId + ", removing from sessions");}}@OnOpenpublic void onOpen(Session session, EndpointConfig config) {Map<String, List<String>> queryParams = session.getRequestParameterMap();String userId = queryParams.get("userId").get(0);sessions.put(userId, session);logger.info("WebSocket opened: " + session.getId() + ", userId: " + userId);}@OnClosepublic void onClose(Session session) {String closedSessionId = session.getId();sessions.entrySet().removeIf(entry -> entry.getValue().getId().equals(closedSessionId));logger.info("WebSocket closed: " + closedSessionId);}@OnMessagepublic void onMessage(String message, Session session) {String userId = (String) session.getUserProperties().get("userId");if (userId == null) {session.getUserProperties().put("userId", message);logger.info("User ID saved: " + message);}}public static int getSessionCount() {return sessions.size();}public static int getOpenConnectionCount() {int openConnectionCount = 0;for (Session session : sessions.values()) {if (session.isOpen()) {openConnectionCount++;}}return openConnectionCount;}public static List<Session> getOpenConnections() {List<Session> openConnections = new ArrayList<>();for (Session session : sessions.values()) {if (session.isOpen()) {openConnections.add(session);}}return openConnections;}
}

创建WebSocket配置文件

package org.jmis.riskassess.config;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.web.socket.server.standard.ServerEndpointExporter;@Configuration
public class WebSocketConfig {@Beanpublic ServerEndpointExporter serverEndpointExporter() {return new ServerEndpointExporter();}
}

业务逻辑

package org.jmis.riskassess.safeSystemDataTask;import cn.hutool.core.collection.CollectionUtil;
import com.alibaba.fastjson.JSONObject;
import com.baomidou.mybatisplus.core.conditions.Wrapper;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.query.QueryWrapper;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.jmis.riskassess.config.WebSocketUtil;
import org.jmis.riskassess.entity.Message;
import org.jmis.riskassess.entity.MessageUser;
import org.jmis.riskassess.pojo.MessageInfo;
import org.jmis.riskassess.pojo.MineFoundation;
import org.jmis.riskassess.service.IMessageService;
import org.jmis.riskassess.service.IMessageUserService;
import org.jmis.riskassess.vo.AlarmRealtimeVO;
import org.jmis.riskassess.vo.DzAlarmAcceptVO;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
import org.springjmis.core.tool.utils.Func;
import org.springjmis.core.tool.utils.ObjectUtil;import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;@Component
public class MessagePushTask {@Autowiredprivate WebSocketUtil webSocketUtil;@Autowiredprivate IMessageUserService messageUserService;@Autowiredprivate IMessageService messageService;@Scheduled(fixedDelay = 10000) // 每10秒执行一次public void pushUnreadMessages() {// 查询未读消息的用户以及消息内容List<Map<String,Object>> userIds = messageUserService.findUsersWithUnreadMessages();// 推送未读消息给相应的用户for (Map<String, Object> user : userIds) {Long idLong = (Long) user.get("id");String id = String.valueOf(idLong);String userId = user.get("user_id").toString();Object isImportant= user.get("is_important");String messageName = (String) user.get("message_name");String details = (String) user.get("details");String messageIdLong = user.get("message_id").toString();String messageId = String.valueOf(messageIdLong);String type = (String) user.get("type");// 构造推送消息的内容JSONObject pushMessage = new JSONObject();pushMessage.put("messageId", messageId);pushMessage.put("messageName", messageName);pushMessage.put("details", details);pushMessage.put("userId", userId);pushMessage.put("type", type);pushMessage.put("isImportant", isImportant);pushMessage.put("id", id);// 将推送消息转换为JSON字符串String jsonMessage = pushMessage.toJSONString();// 推送消息给用户WebSocketUtil.pushMessage(userId, jsonMessage);}}//甲烷报警推送@Scheduled(fixedDelay = 10000) // 每10秒执行一次public void alarmMessages() {// 查询未读消息的用户以及消息内容List<AlarmRealtimeVO> alarmRealtimeVOS = messageUserService.alarmMessages();// 推送未读消息给相应的用户for (AlarmRealtimeVO realtimeVO : alarmRealtimeVOS) {String id = realtimeVO.getAlarmid();String userId = realtimeVO.getUserId();String messageName = "超限报警";String details = "报警类型:"+realtimeVO.getAlarmType()+","+"测点类型:"+realtimeVO.getSensorname()+","+"报警地点:"+realtimeVO.getNodeplace()+","+"报警开始时间:"+realtimeVO.getStartTime()+","+"报警持续时长:"+realtimeVO.getTimeLong()+","+"报警最大值:"+realtimeVO.getMaxdata();saveMessage(id,messageName,details,"jkAlarm");saveMessageUser(id,userId);}}//矿井超员报警推送@Scheduled(fixedDelay = 10000) // 每10秒执行一次public void mineOverAlarmMessages() {// 查询未读消息的用户以及消息内容List<MessageInfo> messageInfos = messageUserService.mineOverAlarmMessages();// 推送未读消息给相应的用户for (MessageInfo realtimeVO : messageInfos) {saveMessage(realtimeVO.getId(),realtimeVO.getMessageName(),realtimeVO.getDetails(),"ryAlarm");saveMessageUser(realtimeVO.getId(),realtimeVO.getUserId());}}//地震报警推送@Scheduled(fixedDelay = 10000) // 每10秒执行一次public void dzAlarmMessages() {// 查询未读消息的用户以及消息内容List<MessageInfo> messageInfos = messageUserService.dzAlarmMessages();// 推送未读消息给相应的用户for (MessageInfo realtimeVO : messageInfos) {String id = realtimeVO.getId();List<String> list = Func.toStrList(realtimeVO.getUserId());String messageName = "地震报警";String details = realtimeVO.getDetails();saveMessage(id,messageName,details,"dzAlarm");for (String s : list) {saveMessageUser(id,s);}}}//天气报警推送@Scheduled(fixedDelay = 10000) // 每10秒执行一次public void tqAlarmMessages() {// 查询未读消息的用户以及消息内容List<MessageInfo> tqAlarmMessages = messageUserService.tqAlarmMessages();// 存入message表和messageUser表for (MessageInfo realtimeVO : tqAlarmMessages) {String id = realtimeVO.getId();List<String> list = Func.toStrList(realtimeVO.getUserId());String messageName = realtimeVO.getMessageName();String details = realtimeVO.getDetails();saveMessage(id,messageName,details,"tqAlarm");for (String s : list) {saveMessageUser(id,s);}}}public void saveMessage(String id ,String messageName,String details,String type){List<String> list = messageService.messageIdList();if (!list.contains(id)){Message message=new Message();message.setMessageId(id);message.setMessageName(messageName);message.setType(type);message.setDetails(details);messageService.save(message);}}public void saveMessageUser(String id,String userId){List<MessageUser> messageUserServiceOne= messageUserService.getMessageUser(id,userId);if (CollectionUtil.isEmpty(messageUserServiceOne)) {MessageUser messageUser = new MessageUser();messageUser.setUserId(userId);messageUser.setMessageId(id);messageUserService.save(messageUser);}}}

相关文章:

WebSocket消息推送

创建WebSocket工具类 package org.jmis.riskassess.config;import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component;import javax.websocket.*; import javax.websocket.server.PathParam; import javax.websocket.serve…...

二维码智慧门牌管理系统:让城市管理更智能、便捷

文章目录 前言一、二维码智慧门牌管理系统的特点二、数据集约化与规范化三、管理智能化与长效化四、标识规范化与易维护五、服务多元化与便捷化 前言 随着城市化进程的加速&#xff0c;城市管理面临着越来越多的挑战。为了解决地名地址管理交织错综、地名地址支撑政府管理成效…...

React动态添加标签组件

背景 在前端开发的过程中,一些表单的输入经常需要输入多个内容,如果采用一个输入框逗号分隔的方式,展示起来不是很清晰,一般需要采用标签的方式 需求 可以指定空状态时的标题设置标签颜色每个标签的最大长度(字符数)接口传递的时候的分隔标记(是用逗号,还是其他)直接处理表单,不…...

[Linux]套接字通信

摘于https://subingwen.cn,作者:苏丙榅 侵删 文章目录 1. 套接字-socket1.1 概念1.2 网络协议1.3 socket编程1.3.1 字节序1.3.2 IP地址转换1.3.3 sockaddr 数据结构1.3.4 套接字函数 1.4 TCP通信流程1.4.1 服务器端通信流程1.4.2 客户端的通信流程 1.5 扩展阅读1.5.1 初始化套…...

MySQL的故事——MySQL架构与历史

MySQL架构与历史 文章目录 MySQL架构与历史一、MySQL逻辑架构二、并发控制三、事务四、多版本并发控制(MVCC) 一、MySQL逻辑架构 第一层&#xff1a;连接处理、授权认证、安全等等 第二层&#xff1a;查询解析、分析、优化、缓存以及所有的内置函数。包含跨存储引擎的功能&…...

手写Mybatis:第12章-完善ORM框架,增删改查操作

文章目录 一、目标&#xff1a;完善增删改查二、设计&#xff1a;完善增删改查三、实现&#xff1a;完善增删改查3.1 工程结构3.2 完善增删改查类图3.3 扩展解析元素3.4 新增执行方法3.4.1 执行器接口添加update3.4.2 执行器抽象基类3.4.3 简单执行器 3.5 语句处理器实现3.5.1 …...

【1】DDR---容量计算

1、容量计算 density&#xff1a;芯片容量&#xff0c;bit为单位 depth&#xff1a;地址空间&#xff0c; width&#xff1a;数据位宽 densitydepth*width 2、三星DDR 4Gbit&#xff08;总容量&#xff09;256M&#xff08;地址空间&#xff09;*16&#xff08;位宽&#xff…...

YashanDB:潜心实干,数据库核心技术突破没有捷径可走

都说数据库是三大基础软件中的一块硬骨头&#xff0c;技术门槛高、研发周期长、工程要求高&#xff0c;市场长期被几大巨头所把持。 因此&#xff0c;实现突破一直是中国数据库产业的夙愿。自上个世纪80年代起&#xff0c;中国数据库产业走过艰辛坎坷的四十余载&#xff0c;终…...

Talk | ICCV‘23南洋理工大学博士后李祥泰:面向统一高效的视频分割方法设计

本期为TechBeat人工智能社区第528期线上Talk&#xff01; 北京时间9月6日(周三)20:00&#xff0c;南洋理工大学博士后研究员—李祥泰的Talk已准时在TechBeat人工智能社区开播&#xff01; 他与大家分享的主题是: “面向统一高效的视频分割方法设计”&#xff0c;他分享了其在视…...

怎样把英语视频字幕翻译成中文

我们知道&#xff0c;随着中外文化交流日益频繁&#xff0c;视频翻译作为一种重要的跨文化交流方式&#xff0c;也越来越受到重视。那么&#xff0c;怎样把英语视频翻译成中文&#xff0c;北京视频翻译哪里比较专业&#xff1f; 据了解&#xff0c;视频翻译是直接将一种语言的音…...

智慧铁路:机车整备场数字孪生

机车整备场是铁路运输系统中的重要组成部分&#xff0c;它承担着机车的维修、保养和整备工作&#xff0c;对保障铁路运输的运维和安全起着至关重要的作用。 随着铁路运输的发展、机车技术的不断进步&#xff0c;以及数字化转型的不断推进&#xff0c;数字孪生技术在机车整备场…...

ImageSharp.Web实战:轻松搭建高效图片服务

很多情况下&#xff0c;在开发如PC、H5、小程序等综合平台的时候&#xff0c;图片的展示是个比较头疼的问题。尤其是有会员功能&#xff0c;会员可以上传图片的平台&#xff0c;更是一件麻烦事。平台展示图片的地方&#xff0c;尺寸是定义好的。但用户不配合&#xff0c;上传的…...

端口扫描-安全体系-网络安全技术和协议

端口扫描-安全体系-网络安全技术和协议 端口扫描信息安全的保证体系和评估方法网络安全技术网络攻击和威胁(重要)网络安全协议 端口扫描 全TCP连接:三次握手 半打开式扫描:前两次握手 FIN扫描:不用建立TCP连接 第三方扫描: 拒绝服务攻击有: 同步包风暴ICMP攻击SNMP攻击 都是修改…...

C# wpf 实现截屏框热键截屏功能

wpf截屏系列 第一章 使用GDI实现截屏 第二章 使用DockPanel制作截屏框 第三章 实现截屏框热键截屏&#xff08;本章&#xff09; 第四章 实现截屏框实时截屏 第五章 使用ffmpeg命令行实现录屏 文章目录 wpf截屏系列前言一、实现步骤1、响应热键2、截屏显示&#xff08;1&#…...

springboot + activiti实现activiti微服务化

概述 本文介绍如何将springbootactiviti进行整合,并配合eureka,zuul和feign实现activiti的微服务化,将流程控制和业务逻辑分离. 并实现了几个比较特殊的功能,比如时间段委托(某人请假或出差,出差时间内,所有待办交给被委托人处理),比如节点的无限级加签功能(流程本身有不确定性…...

c语言练习41:深入理解字符串函数strlen strcpy strcat

深入理解字符串函数strlen strcpy strcat 模拟实现&#xff1a;”strlen strcpy strcat strlen strcat: #define _CRT_SECURE_NO_WARNINGS #include<stdio.h> #include<assert.h> strlen 1.通过指针移动模拟 //int my_strlen(char* str) { // size_t c…...

Vue3+Vue-i18n+I18N ALLY+VSCODE 自动翻译多国语言

ps: 效果图放前面,符合的往下看&#xff0c;不符合的出门右转&#xff0c;希望多多点赞评论支持。 三种语言模式&#xff0c;分别是中文、英文、日文 批量翻译 最后的结果 配置vue-i18n 1、下载安装vue-i18n&#xff0c;9以上的版本。 2、创建对应文件夹 3、对应文件夹中代…...

idea意外退出mac

目录 问题描述 解决过程 问题描述 mac上的idea我很久没用了&#xff0c;之前用的时候还是发布新版的开源项目&#xff0c;这几天再用的时候&#xff0c;就出现了idea意外退出的问题&#xff0c;我上网查找了很久&#xff0c;对于我的问题都没有很好的解决。 解决过程 在寻求…...

百度智能云千帆大模型平台2.0来了!从大模型到生产力落地的怪兽级平台!!

目录 前言 最佳算力效能为企业降低门槛 最多大模型&#xff0c;最多数据集为企业保驾护航 企业级安全对于企业来说是硬性要求 前言 普通人或许感知不明显&#xff0c;但是对于企业而言&#xff0c;身处AI时代&#xff0c;是否选择投资大模型&#xff0c;是否拥抱人工智能…...

k8s nfs-client 添加挂载参数 —— 筑梦之路

背景介绍 为什么要使用noresvport参数挂载NAS&#xff1f;不重新挂载会有什么后果&#xff1f; 如果发生网络切换或者后端服务的HA倒换&#xff0c;小概率会造成NFS文件系统阻塞&#xff0c;那就可能需要几分钟时间连接才会自动恢复&#xff0c;极端情况下甚至需要重启ECS才能恢…...

利用ngx_stream_return_module构建简易 TCP/UDP 响应网关

一、模块概述 ngx_stream_return_module 提供了一个极简的指令&#xff1a; return <value>;在收到客户端连接后&#xff0c;立即将 <value> 写回并关闭连接。<value> 支持内嵌文本和内置变量&#xff08;如 $time_iso8601、$remote_addr 等&#xff09;&a…...

JavaScript 中的 ES|QL:利用 Apache Arrow 工具

作者&#xff1a;来自 Elastic Jeffrey Rengifo 学习如何将 ES|QL 与 JavaScript 的 Apache Arrow 客户端工具一起使用。 想获得 Elastic 认证吗&#xff1f;了解下一期 Elasticsearch Engineer 培训的时间吧&#xff01; Elasticsearch 拥有众多新功能&#xff0c;助你为自己…...

PPT|230页| 制造集团企业供应链端到端的数字化解决方案:从需求到结算的全链路业务闭环构建

制造业采购供应链管理是企业运营的核心环节&#xff0c;供应链协同管理在供应链上下游企业之间建立紧密的合作关系&#xff0c;通过信息共享、资源整合、业务协同等方式&#xff0c;实现供应链的全面管理和优化&#xff0c;提高供应链的效率和透明度&#xff0c;降低供应链的成…...

学校招生小程序源码介绍

基于ThinkPHPFastAdminUniApp开发的学校招生小程序源码&#xff0c;专为学校招生场景量身打造&#xff0c;功能实用且操作便捷。 从技术架构来看&#xff0c;ThinkPHP提供稳定可靠的后台服务&#xff0c;FastAdmin加速开发流程&#xff0c;UniApp则保障小程序在多端有良好的兼…...

如何理解 IP 数据报中的 TTL?

目录 前言理解 前言 面试灵魂一问&#xff1a;说说对 IP 数据报中 TTL 的理解&#xff1f;我们都知道&#xff0c;IP 数据报由首部和数据两部分组成&#xff0c;首部又分为两部分&#xff1a;固定部分和可变部分&#xff0c;共占 20 字节&#xff0c;而即将讨论的 TTL 就位于首…...

2023赣州旅游投资集团

单选题 1.“不登高山&#xff0c;不知天之高也&#xff1b;不临深溪&#xff0c;不知地之厚也。”这句话说明_____。 A、人的意识具有创造性 B、人的认识是独立于实践之外的 C、实践在认识过程中具有决定作用 D、人的一切知识都是从直接经验中获得的 参考答案: C 本题解…...

laravel8+vue3.0+element-plus搭建方法

创建 laravel8 项目 composer create-project --prefer-dist laravel/laravel laravel8 8.* 安装 laravel/ui composer require laravel/ui 修改 package.json 文件 "devDependencies": {"vue/compiler-sfc": "^3.0.7","axios": …...

听写流程自动化实践,轻量级教育辅助

随着智能教育工具的发展&#xff0c;越来越多的传统学习方式正在被数字化、自动化所优化。听写作为语文、英语等学科中重要的基础训练形式&#xff0c;也迎来了更高效的解决方案。 这是一款轻量但功能强大的听写辅助工具。它是基于本地词库与可选在线语音引擎构建&#xff0c;…...

Python基于历史模拟方法实现投资组合风险管理的VaR与ES模型项目实战

说明&#xff1a;这是一个机器学习实战项目&#xff08;附带数据代码文档&#xff09;&#xff0c;如需数据代码文档可以直接到文章最后关注获取。 1.项目背景 在金融市场日益复杂和波动加剧的背景下&#xff0c;风险管理成为金融机构和个人投资者关注的核心议题之一。VaR&…...

【无标题】路径问题的革命性重构:基于二维拓扑收缩色动力学模型的零点隧穿理论

路径问题的革命性重构&#xff1a;基于二维拓扑收缩色动力学模型的零点隧穿理论 一、传统路径模型的根本缺陷 在经典正方形路径问题中&#xff08;图1&#xff09;&#xff1a; mermaid graph LR A((A)) --- B((B)) B --- C((C)) C --- D((D)) D --- A A -.- C[无直接路径] B -…...