netty与websockt实现聊天
配置websockt:
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;/*** websocket配置*/
@Data
@Configuration
@ConfigurationProperties(prefix = "ws")
public class WsConfig {/*** websockt服务端口,不可和web服务同一个端口*/private Integer port=8779;/*** 心跳超时时间-单位-秒*/private Integer heartTimeout = 60;/*** 默认匹配的路径*/private String url="/";
}
返回实体载体:
import com.alibaba.fastjson2.JSON;
import lombok.Data;import java.io.Serializable;@Data
public class MsgBody implements Serializable {public enum MsgType{/*** 普通文字消息*/text,/*** 图片消息*/img,/*** 文件*/file,}public enum Type{/*** 自己*/self,/*** 别人*/other,}private Type type;private MsgType msgType;/*** 消息主体*/private String msgContent;public String toJson(){return JSON.toJSONString(this);}
}
import com.alibaba.fastjson2.JSON;import java.io.Serializable;
import java.util.LinkedHashMap;/*** websocket返回载体*/
public class WsBean extends LinkedHashMap<String,Object> implements Serializable {/*** 指定调用前端的回调函数*/public enum CallbackEm{/*** 通知回调函数*/notice,/*** 收到消息的回调*/receive_msg,}public WsBean(CallbackEm callbackEm,Object data) {super();this.put("code",callbackEm.name());this.put("data",data);}public static WsBean get(CallbackEm callbackEm){return new WsBean(callbackEm,"");}public static WsBean get(CallbackEm callbackEm,Object data){return new WsBean(callbackEm,data);}public WsBean setData(Object data){this.put("data",data);return this;}public WsBean set(String key,Object value){this.put(key,value);return this;}public String toJson(){return JSON.toJSONString(this);}}
netty通道:
import cn.hutool.core.map.BiMap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelId;
import io.netty.channel.group.DefaultChannelGroup;
import io.netty.handler.codec.http.websocketx.TextWebSocketFrame;
import io.netty.util.concurrent.GlobalEventExecutor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;/*** 这个类用来处理用户和连接的关联关系*/
@Slf4j
@Component
public class NioWebSocketChannelPool {/*** 用户保持连接*/private final DefaultChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);/*** 保持连接用户对应的长连接id,此为双向绑定*/private final BiMap<String, ChannelId> bindUserMap = new BiMap<>(new ConcurrentHashMap<>());/*** 新增一个客户端通道*/public void addChannel(Channel channel){channels.add(channel);}/*** 移除一个客户端通道* @param channel*/public void removeChannel(Channel channel){String mapKey = bindUserMap.getKey(channel.id());if (mapKey != null){bindUserMap.remove(mapKey);}channels.remove(channel);}/*** 绑定用户*/public void bindUser(String userId,Channel channel){bindUserMap.put(userId,channel.id());}/*** 向用户推送消息*/public void sendToUser(String userId,WsBean data){ChannelId channelId = bindUserMap.get(userId);if (channelId != null){channels.find(channelId).writeAndFlush(new TextWebSocketFrame(data.toJson()));}}public void sendToUser(String userId,MsgBody data){ChannelId channelId = bindUserMap.get(userId);if (channelId != null){channels.find(channelId).writeAndFlush(new TextWebSocketFrame(data.toJson()));}}public BiMap<String,ChannelId> getBindUserMap(){return bindUserMap;}/*** 群发推送消息*/public void writeAndFlush(WsBean data){Set<String> onlineIds = getBindUserMap().keySet();onlineIds.forEach(userId->{ChannelId channelId = bindUserMap.get(userId);if (channelId != null){channels.find(channelId).writeAndFlush(new TextWebSocketFrame(data.toJson()));}});}}
netty处理类:
import cn.hutool.core.util.StrUtil;
import com.xx.framework.config.WsConfig;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.FullHttpRequest;
import io.netty.handler.codec.http.websocketx.*;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;import javax.annotation.Resource;
import java.util.HashMap;
import java.util.Map;/*** 处理端*/
@Slf4j
@ChannelHandler.Sharable
@Component
public class NioWebSocketHandler extends SimpleChannelInboundHandler<WebSocketFrame> {@Resourceprivate WsConfig wsConfig;@Resourceprivate NioWebSocketChannelPool webSocketChannelPool;@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception {log.info("客户端连接:{}",ctx.channel().id());webSocketChannelPool.addChannel(ctx.channel());super.channelActive(ctx);}@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception {log.info("客户端端口连接:{}",ctx.channel().id());webSocketChannelPool.removeChannel(ctx.channel());super.channelInactive(ctx);}@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) throws Exception {ctx.channel().flush();super.channelReadComplete(ctx);}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {log.info("客户端请求数据类型:{}",msg.getClass());if (msg instanceof FullHttpRequest){fullHttpRequestHandler(ctx,(FullHttpRequest)msg);}super.channelRead(ctx, msg);}/*** 处理连接请求,客户端websockt发送握手包时会执行第一次请求* @param ctx* @param request*/private void fullHttpRequestHandler(ChannelHandlerContext ctx, FullHttpRequest request) {String uri = request.getUri();Map<String,String> params = getParams(uri);log.info("客户端请求参数:{}",params);/*** 判断请求路径是否跟配置中的一致*/if (wsConfig.getUrl().equals(getBasePath(uri))){/*** 因为有可能携带了参数,导致客户端一致无法返回握手包,因此在校验通过后,重置请求路径*/request.setUri(wsConfig.getUrl());}else{ctx.close();}String userId = params.get("user_id");if (StrUtil.isBlank(userId)){log.info("用户ID为空,无法登录");return;}webSocketChannelPool.bindUser(userId,ctx.channel());}/*** 获取URI中参数以外部分路径* @param uri* @return*/private String getBasePath(String uri) {if (uri == null || uri.isEmpty()){return null;}int idx = uri.indexOf("?");if (idx == -1){return uri;}return uri.substring(0,idx);}/*** 请路径参数转换成Map对象,如果路径参数出现重复参数名,将以最后的参数值为准* @param uri* @return*/private Map<String, String> getParams(String uri) {Map<String,String> params = new HashMap<>(10);int idx = uri.indexOf("?");if (idx != -1){String[] paramsArr = uri.substring(idx+1).split("&");for (String param:paramsArr){idx = param.indexOf("=");params.put(param.substring(0,idx),param.substring(idx+1));}}return params;}/*** 客户端发送断开请求处理*/private void closeWebSocketFrameHandler(ChannelHandlerContext ctx, CloseWebSocketFrame frame){ctx.close();}/*** 创建连接之后,客户端发送的消息都会在这里处理*/private void textWebSocketFrameHandler(ChannelHandlerContext ctx, TextWebSocketFrame frame){//客户端发送过来的内容不进行业务处理,原样返回,一般不做处理
// log.info("收到客户端信息-channelId: {}, 消息内容: {}", ctx.channel().id(), frame.text());}/*** 处理客户端心跳包*/private void pingWebSocketFrameHandler(ChannelHandlerContext ctx, PingWebSocketFrame frame){ctx.channel().writeAndFlush(new PongWebSocketFrame(frame.content().retain()));}@Overrideprotected void channelRead0(ChannelHandlerContext channelHandlerContext, WebSocketFrame webSocketFrame) throws Exception {//根据请求数量类型进厂分发处理if (webSocketFrame instanceof PingWebSocketFrame){PingWebSocketFrame pingWebSocketFrame = (PingWebSocketFrame) webSocketFrame;pingWebSocketFrameHandler(channelHandlerContext,pingWebSocketFrame);}else if (webSocketFrame instanceof TextWebSocketFrame){TextWebSocketFrame textWebSocketFrame = (TextWebSocketFrame) webSocketFrame;textWebSocketFrameHandler(channelHandlerContext,textWebSocketFrame);}else if (webSocketFrame instanceof CloseWebSocketFrame){CloseWebSocketFrame closeWebSocketFrame = (CloseWebSocketFrame) webSocketFrame;closeWebSocketFrameHandler(channelHandlerContext,closeWebSocketFrame);}}
}
netty服务类:
package com.xx.framework.ws;import com.xx.framework.config.WsConfig;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.http.HttpObjectAggregator;
import io.netty.handler.codec.http.HttpServerCodec;
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
import io.netty.handler.stream.ChunkedWriteHandler;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.DisposableBean;
import org.springframework.beans.factory.InitializingBean;
import org.springframework.core.Ordered;
import org.springframework.stereotype.Component;import javax.annotation.Resource;/*** 服务端*/
@Slf4j
@Component
public class NioWebSocketServer implements InitializingBean , DisposableBean, Ordered {@Resourceprivate WsConfig wsConfig;@Resourceprivate NioWebSocketHandler nioWebSocketHandler;private EventLoopGroup bossGroup;private EventLoopGroup workGroup;private ChannelFuture channelFuture;@Overridepublic void destroy() throws Exception {log.info("shutting down netty server....");if (bossGroup != null){bossGroup.shutdownGracefully().sync();}if (workGroup != null){workGroup.shutdownGracefully().sync();}if (channelFuture != null){channelFuture.channel().closeFuture().syncUninterruptibly();}log.info("netty server shutdown");}@Overridepublic void afterPropertiesSet() throws Exception {try{bossGroup = new NioEventLoopGroup();workGroup = new NioEventLoopGroup();ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.option(ChannelOption.SO_BACKLOG,1024).group(bossGroup,workGroup).channel(NioServerSocketChannel.class).localAddress(wsConfig.getPort()).childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {ChannelPipeline pipeline = socketChannel.pipeline();pipeline.addLast(new HttpServerCodec()).addLast(new ChunkedWriteHandler()).addLast(new HttpObjectAggregator(8192)).addLast(nioWebSocketHandler).addLast(new WebSocketServerProtocolHandler(wsConfig.getUrl(),null,true));}});channelFuture = serverBootstrap.bind().sync();}finally {if (channelFuture != null && channelFuture.isSuccess()){log.info("netty server startup on port:{} (websockt)with context path '{}'",wsConfig.getPort(),"/");}else{log.info("netty server startup failed");if (bossGroup != null){bossGroup.shutdownGracefully().sync();}if (workGroup != null){workGroup.shutdownGracefully().sync();}}}}@Overridepublic int getOrder() {return 0;}
}
以上代码serviceImpl应用:
/*** 新增聊天记录** @param chatRecord 聊天记录* @return 结果*/@Overridepublic int insertChatRecord(ChatRecord chatRecord) {MsgBody msgBody = new MsgBody();if (StrUtil.isNotBlank(chatRecord.getMsgType())){if (chatRecord.getMsgType().equals(MsgTypeEnum.TEXT.getMsgType())){msgBody.setMsgType(MsgBody.MsgType.text);}else if (chatRecord.getMsgType().equals(MsgTypeEnum.IMG.getMsgType())){msgBody.setMsgType(MsgBody.MsgType.img);}}msgBody.setMsgContent(chatRecord.getMsgContent());chatRecord.setMsgContent(JSON.toJSONString(msgBody));chatRecord.setId(IdUtils.getLongId());chatRecord.setCreateTime(DateUtils.getNowDate());int insert = chatRecordMapper.insertChatRecord(chatRecord);if(insert > 0){msgBody.setType(MsgBody.Type.self);//通知浏览器用户 webSocketChannelPool.sendToUser(chatRecord.getSendUserId().toString(),msgBody);msgBody.setType(MsgBody.Type.other);webSocketChannelPool.sendToUser(chatRecord.getReceiveUserId().toString(),msgBody);}return insert;}
相关文章:
netty与websockt实现聊天
配置websockt: import lombok.Data; import org.springframework.boot.context.properties.ConfigurationProperties; import org.springframework.context.annotation.Configuration;/*** websocket配置*/ Data Configuration ConfigurationProperties(prefix &qu…...
21.2 CSS 三大特性与页面布局
1. 开发者工具修改样式 使用开发者工具修改样式, 操作步骤如下: * 1. 打开开发者工具: 在浏览器中右键点击页面, 然后选择检查或者使用快捷键(一般是 F12 或者 CtrlShiftI)来打开开发者工具.* 2. 打开样式编辑器: 在开发者工具中, 找到选项卡或面板, 一般是Elements或者Elemen…...
MySQL 特殊语法时间格式以及Greadb连接
一、时间语法 DATE_FORMAT和to_char() select to_char(now(),%Y-%m-%d %H:%i:%s) from dual; select DATE_FORMAT(now(),%Y-%m-%d %H:%i:%s) from dual; 2.to_date() 和STR_TO_DATE(#{date},%Y-%m-%d ) select to_date(now(),yyyy-mm-dd hh24:mi:ss) from dual;...
Python(.pyc)反编译:pycdc工具安装与使用
本文将介绍如何将python的.pyc文件反编译成源码,以便我们对源码的学习与改进。pycdc工具安装 下载地址: 1、Github地址:https://github.com/zrax/pycdc ,下载后需要使用CMake进行编译。 2、已下载好及编译好的地址:ht…...
山西电力市场日前价格预测【2023-08-28】
日前价格预测 预测明日(2023-08-28)山西电力市场全天平均日前电价为319.70元/MWh。其中,最高日前电价为371.80元/MWh,预计出现在19: 15。最低日前电价为278.59元/MWh,预计出现在13: 00。 价差方向预测 1: …...
python3/pip3 SSL: CERTIFICATE_VERIFY_FAILED] certificate verify failed
环境: mac os 背景: 电脑之前安装的是python3.9 , 现在升级到python3.10。 从python官网下载macos版本的python3.10 pkg。 双击安装。 程序使用aiohttp访问ebay 。 出错: aiohttp.client_exceptions.ClientConnectorCertifi…...
Python中的迭代器与生成器
文章目录 1、迭代器2、生成器3、列表推导式和生成器表达式4、enumerate() 在Python中,迭代器(Iterator)和生成器(Generator)是两种用于处理可迭代对象的重要工具。而可迭代对象包括列表,元组,字…...
简单着色器编写(下)
函数部分介绍完了,最后来介绍一下main函数中的部分。 std::string vertexShader "#version 330 core\n" "\n" "layout(location0)in vec4 position;" "\n" "void main()\n" "{\n&…...
go并发编程基础
go并发编程 1waitgroup WaitGroup就是等待所有的goroutine全部执行完毕,add方式和Down方法要配套使用 package mainimport ("fmt""sync" )func main() {var wq sync.WaitGroupwq.Add(100) //监控多少个goroutine执行结束for i: 0;i<100;…...
PHP之 导入excel表格时,获取日期时间变成浮点数
读取到的时间 float(0.20833333333333) 原格式 15:00:00 代码 if (Request::isPost()) {$file_url input(upfile); // 本地上传文件地址// 读取文件内容$local_file_url __dir__./../../../public.$file_url;// $spreadsheet new Spreadsheet();// $sheet $spreadsheet-…...
学习 Java 报表技术导入 Maven 依赖出错:jacob 无法下载、jasperreports 依赖错误
发生缘由 最近在做一个可视化项目,用到了 Java 报表技术。在跟着「黑马」课程导入 pom.xml 文件的时候提示下载依赖错误。 com.jacob 包无法下载Failed to read artifact descriptor for com.lowagie:itext:jar:2.1.7.js6 运行环境 电脑系统版本:Win…...
力扣-哈希-最长连续序列
题目 给定一个未排序的整数数组 nums ,找出数字连续的最长序列(不要求序列元素在原数组中连续)的长度。 请你设计并实现时间复杂度为 O(n) 的算法解决此问题。 示例 1: **输入:**nums [100,4,200,1,3,2] **输出&a…...
Java线程 - 详解(1)
一,创建线程 方法一:继承Thread类 class MyThread extends Thread{Overridepublic void run() {System.out.println("线程1");} }public class Test {public static void main(String[] args) {MyThread myThread new MyThread();myThread.…...
结构体-C语言(初阶)
目录 一、结构体声明 1.1 结构概念 1.2 结构声明 1.3 结构成员的类型 1.4 结构体变量的定义和初始化 二、结构体成员的访问 2.1 结构体变量访问成员 2.2 结构体指针访问指向变量的成员 三、结构体传参 一、结构体声明 1.1 结构概念 结构是一些值的集合,这些值称为…...
【网络】HTTPS的加密
目录 第一组,非对称加密第二组,非对称加密第三组,对称加密证书签名 HTTPS使用的是非对称加密加对称加密的方案 (非对称加密:公钥加/解密,私钥解/加密) (对称加密:一组对称…...
Nacos安装指南
Nacos安装指南 1.Windows安装 开发阶段采用单机安装即可。 1.1.下载安装包 在Nacos的GitHub页面,提供有下载链接,可以下载编译好的Nacos服务端或者源代码: GitHub主页:https://github.com/alibaba/nacos GitHub的Release下载…...
java-Optional 类详解
目录 前言 Optional的构造方法 Optional的相关方法介绍 isPresent用法: get用法: filter用法: orElse用法: orElseGet用法 orElseThrow用法 map用法 flatMap用法: 前言 Optional 类是java8的新特性࿰…...
sql数据库怎么备份,sql 实时备份
在当今互联网时代,数据已经成为企业的核心资产。然而,数据的安全性和完整性面临硬件问题、软件故障、人工操作错误等各种威胁。为了保证数据的安全,实时备份已经成为公司必须采取的重要措施之一。下面我们就重点介绍SQL实时备份的重要实施方法…...
RK3399平台开发系列讲解(存储篇)Linux 存储系统的 I/O 栈
平台内核版本安卓版本RK3399Linux4.4Android7.1🚀返回专栏总目录 文章目录 一、Linux 存储系统全景二、Linux 存储系统的缓存沉淀、分享、成长,让自己和他人都能有所收获!😄 📢本篇将介绍 Linux 存储系统的 I/O 原理。 一、Linux 存储系统全景 我们可以把 Linux 存储系…...
Java“牵手”天猫淘口令转换API接口数据,天猫API接口申请指南
天猫平台商品淘口令接口是开放平台提供的一种API接口,通过调用API接口,开发者可以获取天猫商品的标题、价格、库存、商品快递费用,宝贝ID,发货地,区域ID,快递费用,月销量、总销量、库存、详情描…...
Ubuntu系统下交叉编译openssl
一、参考资料 OpenSSL&&libcurl库的交叉编译 - hesetone - 博客园 二、准备工作 1. 编译环境 宿主机:Ubuntu 20.04.6 LTSHost:ARM32位交叉编译器:arm-linux-gnueabihf-gcc-11.1.0 2. 设置交叉编译工具链 在交叉编译之前&#x…...
K8S认证|CKS题库+答案| 11. AppArmor
目录 11. AppArmor 免费获取并激活 CKA_v1.31_模拟系统 题目 开始操作: 1)、切换集群 2)、切换节点 3)、切换到 apparmor 的目录 4)、执行 apparmor 策略模块 5)、修改 pod 文件 6)、…...
使用van-uploader 的UI组件,结合vue2如何实现图片上传组件的封装
以下是基于 vant-ui(适配 Vue2 版本 )实现截图中照片上传预览、删除功能,并封装成可复用组件的完整代码,包含样式和逻辑实现,可直接在 Vue2 项目中使用: 1. 封装的图片上传组件 ImageUploader.vue <te…...
Nginx server_name 配置说明
Nginx 是一个高性能的反向代理和负载均衡服务器,其核心配置之一是 server 块中的 server_name 指令。server_name 决定了 Nginx 如何根据客户端请求的 Host 头匹配对应的虚拟主机(Virtual Host)。 1. 简介 Nginx 使用 server_name 指令来确定…...
Robots.txt 文件
什么是robots.txt? robots.txt 是一个位于网站根目录下的文本文件(如:https://example.com/robots.txt),它用于指导网络爬虫(如搜索引擎的蜘蛛程序)如何抓取该网站的内容。这个文件遵循 Robots…...
select、poll、epoll 与 Reactor 模式
在高并发网络编程领域,高效处理大量连接和 I/O 事件是系统性能的关键。select、poll、epoll 作为 I/O 多路复用技术的代表,以及基于它们实现的 Reactor 模式,为开发者提供了强大的工具。本文将深入探讨这些技术的底层原理、优缺点。 一、I…...
腾讯云V3签名
想要接入腾讯云的Api,必然先按其文档计算出所要求的签名。 之前也调用过腾讯云的接口,但总是卡在签名这一步,最后放弃选择SDK,这次终于自己代码实现。 可能腾讯云翻新了接口文档,现在阅读起来,清晰了很多&…...
iview框架主题色的应用
1.下载 less要使用3.0.0以下的版本 npm install less2.7.3 npm install less-loader4.0.52./src/config/theme.js文件 module.exports {yellow: {theme-color: #FDCE04},blue: {theme-color: #547CE7} }在sass中使用theme配置的颜色主题,无需引入,直接可…...
Kubernetes 网络模型深度解析:Pod IP 与 Service 的负载均衡机制,Service到底是什么?
Pod IP 的本质与特性 Pod IP 的定位 纯端点地址:Pod IP 是分配给 Pod 网络命名空间的真实 IP 地址(如 10.244.1.2)无特殊名称:在 Kubernetes 中,它通常被称为 “Pod IP” 或 “容器 IP”生命周期:与 Pod …...
C++_哈希表
本篇文章是对C学习的哈希表部分的学习分享 相信一定会对你有所帮助~ 那咱们废话不多说,直接开始吧! 一、基础概念 1. 哈希核心思想: 哈希函数的作用:通过此函数建立一个Key与存储位置之间的映射关系。理想目标:实现…...
