当前位置: 首页 > news >正文

Spring Boot+Netty

因工作中需要给第三方屏幕厂家下发广告,音频,图片等内容,对方提供TCP接口于是我使用Netty长链接进行数据传输

1.添加依赖

 <!--  netty依赖--><dependency><groupId>io.netty</groupId><artifactId>netty-all</artifactId></dependency>   

2.创建Netty服务

@Slf4j
@Component
public class NettyServer {public void start(InetSocketAddress address) {//配置服务端的NIO线程组EventLoopGroup bossGroup = new NioEventLoopGroup();EventLoopGroup workerGroup = new NioEventLoopGroup();try {// 绑定线程池,编码解码//服务端接受连接的队列长度,如果队列已满,客户端连接将被拒绝ServerBootstrap bootstrap = new ServerBootstrap().group(bossGroup, workerGroup)// 指定Channel.channel(NioServerSocketChannel.class)//使用指定的端口设置套接字地址.localAddress(address)//使用自定义处理类.childHandler(new NettyServerChannelInitializer())//服务端可连接队列数,对应TCP/IP协议listen函数中backlog参数.option(ChannelOption.SO_BACKLOG, 128)//保持长连接,2小时无数据激活心跳机制.childOption(ChannelOption.SO_KEEPALIVE, true)//将小的数据包包装成更大的帧进行传送,提高网络的负载.childOption(ChannelOption.TCP_NODELAY, true);// 绑定端口,开始接收进来的连接ChannelFuture future = bootstrap.bind(address).sync();if (future.isSuccess()) {log.info("netty服务器开始监听端口:{}",address.getPort());}//关闭channel和块,直到它被关闭future.channel().closeFuture().sync();} catch (Exception e) {e.printStackTrace();bossGroup.shutdownGracefully();workerGroup.shutdownGracefully();}}
}

3.创建Socket配置类(也可以直接在步骤2中写死)

1.在配置文件中

socket:# 监听端口 8090port: 8090#ip地址host: 0.0.0.0
#  host: 192.168.31.2@Setter
@Getter
@ToString
@Component
@Configuration
@PropertySource("classpath:application.yml")
@ConfigurationProperties(prefix = "socket")
public class SocketProperties {private Integer port;private String host;}

4.在springboot 启动类中启用Netty服务

@SpringBootApplication
public class Application implements CommandLineRunner {public static void main(String[] args) {SpringApplication application = new SpringApplication(Application.class);application.setApplicationStartup(new BufferingApplicationStartup(2048));application.run(args);}@Resourceprivate NettyServer nettyServer;@Resourceprivate SocketProperties socketProperties;@Overridepublic void run(String... args) {InetSocketAddress address = new InetSocketAddress(socketProperties.getHost(),socketProperties.getPort());nettyServer.start(address);}}

5.创建字符解析器,解析收到的消息

/*** 功能描述: 服务端初始化,客户端与服务器端连接一旦创建,这个类中方法就会被回调,设置出站编码器和入站解码器**/
public class NettyServerChannelInitializer extends ChannelInitializer<SocketChannel> {@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {ChannelPipeline pipeline = socketChannel.pipeline();//接收消息格式,使用自定义解析数据格式
//        pipeline.addLast("decoder",new MyDecoder());//发送消息格式,使用自定义解析数据格式
//        pipeline.addLast("encoder",new MyEncoder());pipeline.addLast(new StringEncoder(CharsetUtil.UTF_8));//针对客户端,如果在1分钟时没有想服务端发送写心跳(ALL),则主动断开//如果是读空闲或者写空闲,不处理,这里根据自己业务考虑使用pipeline.addLast(new IdleStateHandler(0,0,90, TimeUnit.SECONDS));//自定义的空闲检测pipeline.addLast(new NettyServerHandler());}
}

6.创建Handler 类处理消息

/*** 功能描述: netty服务端处理类*/@Slf4j
@Component
public class NettyServerHandler extends ChannelInboundHandlerAdapter {/*** 功能描述: 有客户端连接服务器会触发此函数** @param ctx 通道* @return void*/@Overridepublic void channelActive(ChannelHandlerContext ctx) {InetSocketAddress insocket = (InetSocketAddress) ctx.channel().remoteAddress();String clientIp = insocket.getAddress().getHostAddress();int clientPort = insocket.getPort();//获取连接通道唯一标识ChannelId channelId = ctx.channel().id();//如果map中不包含此连接,就保存连接if (ChannelMap.getChannelMap().containsKey(channelId)) {log.info("客户端:{},是连接状态,连接通道数量:{} ", channelId, ChannelMap.getChannelMap().size());} else {//保存连接ChannelMap.addChannel(channelId, ctx.channel());log.info("客户端:{},连接netty服务器[IP:{}-->PORT:{}]", channelId, clientIp, clientPort);log.info("连接通道数量: {}", ChannelMap.getChannelMap().size());}}/*** 功能描述: 有客户端终止连接服务器会触发此函数* @param ctx 通道处理程序上下文* @return void*/@Overridepublic void channelInactive(ChannelHandlerContext ctx) {InetSocketAddress inSocket = (InetSocketAddress) ctx.channel().remoteAddress();String clientIp = inSocket.getAddress().getHostAddress();ChannelId channelId = ctx.channel().id();//包含此客户端才去删除if (ChannelMap.getChannelMap().containsKey(channelId)) {//删除连接ChannelMap.getChannelMap().remove(channelId);log.info("客户端:{},断开netty服务器[IP:{}-->PORT:{}]", channelId, clientIp, inSocket.getPort());log.info("连接通道数量: " + ChannelMap.getChannelMap().size());}}@Transactional@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ByteBuf buf = (ByteBuf) msg;ByteBuf rebuf = Unpooled.buffer();RedisUtils.setChannelId(ctx.channel().id().toString(), ctx.channel().id());// 读取帧头标识byte frameHeader = buf.readByte();if (frameHeader != 0x7E) {byte[] data = ByteBufUtil.getBytes(buf);String hex = bytesToHex(data);buf.release();String content = ((ByteBuf) msg).toString(Charset.defaultCharset());}  // 读取消息帧类型else {byte messageType = buf.readByte();// 读取帧尾标识if (buf.isReadable()) {// 读取校验值byte checksum = buf.readByte();byte frameTail = buf.readByte();}}buf.release();}/*** 功能描述: 服务端给客户端发送消息** @param channelId 连接通道唯一id* @param msg       需要发送的消息内容* @return void*/public void channelWrite(ChannelId channelId, Object msg) throws Exception {Channel channel = ChannelMap.getChannelMap().get(channelId);if (channel == null) {log.info("通道:{},不存在", channelId);return;}if (msg == null || msg == "") {log.info("服务端响应空的消息");return;}//将客户端的信息直接返回写入ctxchannel.write(msg);//刷新缓存区channel.flush();}@Overridepublic void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {String socketString = ctx.channel().remoteAddress().toString();if (evt instanceof IdleStateEvent) {IdleStateEvent event = (IdleStateEvent) evt;if (event.state() == IdleState.READER_IDLE) {log.info("Client:{},READER_IDLE 读超时", socketString);Channel channel = ctx.channel();ChannelId id = channel.id();// 超时未收到心跳包,更新设备状态为离线// todo 更新设备状态ctx.disconnect();ChannelMap.removeChannelByName(id);} else if (event.state() == IdleState.WRITER_IDLE) {log.info("Client:{}, WRITER_IDLE 写超时", socketString);ctx.disconnect();Channel channel = ctx.channel();ChannelId id = channel.id();ChannelMap.removeChannelByName(id);} else if (event.state() == IdleState.ALL_IDLE) {log.info("Client:{},ALL_IDLE 总超时", socketString);Channel channel = ctx.channel();ChannelId id = channel.id();// 超时未收到心跳包,更新设备状态为离线// todo 更新设备状态ctx.disconnect();ChannelMap.removeChannelByName(id);}}}/*** 功能描述: 发生异常会触发此函数** @param ctx   通道处理程序上下文* @param cause 异常* @return void*/@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {ctx.close();log.info("{}:发生了错误,此连接被关闭。此时连通数量:{}", ctx.channel().id(), ChannelMap.getChannelMap().size());}}

ChannelMap类

/*** 功能描述: 管理通道Map类**/
public class ChannelMap {/*** 管理一个全局map,保存连接进服务端的通道数量*/private static final ConcurrentHashMap<ChannelId, Channel> CHANNEL_MAP = new ConcurrentHashMap<>(128);public static ConcurrentHashMap<ChannelId, Channel> getChannelMap() {return CHANNEL_MAP;}/***  获取指定name的channel*/public static Channel getChannelByName(ChannelId channelId){if(CollectionUtils.isEmpty(CHANNEL_MAP)){return null;}return CHANNEL_MAP.get(channelId);}/***  将通道中的消息推送到每一个客户端*/public static boolean pushNewsToAllClient(String obj){if(CollectionUtils.isEmpty(CHANNEL_MAP)){return false;}for(ChannelId channelId: CHANNEL_MAP.keySet()) {Channel channel = CHANNEL_MAP.get(channelId);channel.writeAndFlush(new TextWebSocketFrame(obj));}return true;}/***  将channel和对应的name添加到ConcurrentHashMap*/public static void addChannel(ChannelId channelId,Channel channel){CHANNEL_MAP.put(channelId,channel);}/***  移除掉name对应的channel*/public static boolean removeChannelByName(ChannelId channelId){if(CHANNEL_MAP.containsKey(channelId)){CHANNEL_MAP.remove(channelId);return true;}return false;}}

相关文章:

Spring Boot+Netty

因工作中需要给第三方屏幕厂家下发广告&#xff0c;音频&#xff0c;图片等内容&#xff0c;对方提供TCP接口于是我使用Netty长链接进行数据传输 1.添加依赖 <!-- netty依赖--><dependency><groupId>io.netty</groupId><artifactId>netty-all&…...

LCR 023. 相交链表

一.题目&#xff1a; LCR 023. 相交链表 - 力扣&#xff08;LeetCode&#xff09; 二.我的原始解法-无&#xff1a; 三.其他人的正确及好的解法&#xff0c;力扣解法参考&#xff1a; 哈希表法及双指针法&#xff1a;LCR 023. 相交链表 - 力扣&#xff08;LeetCode&#xff0…...

Linux命令行下载工具

1. curl 1.1. 介绍 curl是一个功能强大的命令行工具&#xff0c;用于在各种网络协议下传输数据。它支持多种协议&#xff0c;包括但不限于 HTTP、HTTPS、FTP、FTPS、SCP、SFTP、SMTP、POP3、IMAP 等&#xff0c;这使得它在网络数据交互场景中有广泛的应用。curl可以模拟浏览器…...

期末复习-Hadoop名词解释+简答题纯享版

目录 一、名称解释&#xff08;8选5&#xff09; 1.什么是大数据 2.大数据的5V特征 3.什么是SSH 4.HDFS&#xff08;p32&#xff09; 5.名称节点 6.数据节点 7.元数据 8.倒排索引 9.单点故障 10.高可用 11.数据仓库 二、简答题 1.简述Hadoop的优点及其含义 2.简述…...

嵌入式Linux无窗口系统下搭建 Qt 开发环境

嵌入式Linux无窗口系统下搭建 Qt 开发环境 本文将介绍如何在树莓派的嵌入式 Linux 环境下&#xff0c;搭建 Qt 开发环境&#xff0c;实现无窗口系统模式&#xff08;framebuffer&#xff09;下的图形程序开发。 1. 安装 Qt 环境 接下来&#xff0c;安装核心 Qt 开发库以及与 …...

C#基础教程

1. C# 基础语法和操作符 C# 中的运算符优先级 namespace OperatorsAppl {class Program7{static void Main(string[] args){int a 20; // 定义变量aint b 10; // 定义变量bint c 15; // 定义变量cint d 5; // 定义变量dint e; // 定义变量e// 演示运算符优先级&…...

Alibaba EasyExcel 导入导出全家桶

一、阿里巴巴EasyExcel的优势 首先说下EasyExcel相对 Apache poi的优势&#xff1a; EasyExcel也是阿里研发在poi基础上做了封装&#xff0c;改进产物。它替开发者做了注解列表解析&#xff0c;表格填充等一系列代码编写工作&#xff0c;并将此抽象成通用和可扩展的框架。相对p…...

Spring Cloud + MyBatis Plus + GraphQL 完整示例

Spring Cloud MyBatis Plus GraphQL 完整示例 1、创建Spring Boot子项目1.1 配置POM&#xff0c;添加必要的依赖1.2 配置MyBatis-Plus 2、集成GraphQL2.1 定义schema.graphqls2.2 添加GraphQL解析器2.3 配置schame文件配置 3、访问测试3.1 查询测试&#xff08;演示&#xff…...

uni-app简洁的移动端登录注册界面

非常简洁的登录、注册界面模板&#xff0c;使用uni-app编写&#xff0c;直接复制粘贴即可&#xff0c;无任何引用&#xff0c;全部公开。 废话不多说&#xff0c;代码如下&#xff1a; login.vue文件 <template><view class"content"><view class&quo…...

LongVU:用于长视频语言理解的空间时间自适应压缩

晚上闲暇时间看到一种用于长视频语言理解的空间时间自适应压缩机制的研究工作LongVU&#xff0c;主要内容包括&#xff1a; 背景与挑战&#xff1a;多模态大语言模型&#xff08;MLLMs&#xff09;在视频理解和分析方面取得了进展&#xff0c;但处理长视频仍受限于LLM的上下文长…...

Elasticsearch数据迁移(快照)

1. 数据条件 一台原始es服务器&#xff08;192.168.xx.xx&#xff09;&#xff0c;数据迁移后的目标服务器&#xff08;10.2.xx.xx&#xff09;。 2台服务器所处环境&#xff1a; centos7操作系统&#xff0c; elasticsearch-7.3.0。 2. 为原始es服务器数据创建快照 修改elas…...

Linux Cgroup学习笔记

文章目录 Cgroup(Control Group)引言简介Cgroup v1通用接口文件blkio子系统cpu子系统cpuacct子系统cpuset子系统devices子系统freezer子系统hugetlb子系统memory子系统net_cls子系统net_prio子系统perf_event子系统pids子系统misc子系统 Cgroup V2基础操作组织进程和线程popula…...

百问FB显示开发图像处理 - PNG图像处理

2.3 PNG图像处理 2.3.1 PNG文件格式和libpng编译 ​ 跟JPEG文件格式一样&#xff0c;PNG也是一种使用了算法压缩后的图像格式&#xff0c;与JPEG不同&#xff0c;PNG使用从LZ77派生的无损数据压缩算法。对于PNG文件格式&#xff0c;也有相应的开源工具libpng。 libpng库可从…...

【JavaWeb后端学习笔记】MySQL多表查询(内连接、外连接、子查询)

MySQL 多表查询 1、连接查询1.1 内连接1.2 外连接 2、子查询2.1 标量子查询2.2 列子查询2.3 行子查询2.4 表子查询 3、多表查询案例 多表查询有两大类&#xff1a;连接查询和子查询。 连接查询又分为隐式/显式内连接和左/右外连接。 子查询又分为标量子查询、列子查询、行子查询…...

RocketMQ 过滤消息 基于tag过滤和SQL过滤

RocketMQ 过滤消息分为两种&#xff0c;一种tag过滤&#xff0c;另外一种是复杂的sql过滤。 tag过滤 首先创建producer然后启动&#xff0c;在这里创建了字符串的数组tags。字符串数组里面放置了多个字符串&#xff0c;然后去发送15条消息。 15条消息随着i的增长&#xff0c;…...

element-ui 基本样式的一些更改【持续更新】

1、 去除el-tabs的底部灰色横线 ::v-deep .el-tabs__nav-wrap::after {height: 0px;}2、el-table设置表头颜色 <el-table:data"tableData":header-cell-style"{background:#F7F8FA,color:#4E5869}"><el-table-columnlabel"序号"type&qu…...

element-ui radio和checkbox禁用时不置灰还是原来不禁用时的样式

把要紧用的内容加上一个class"notEdit-page" z注意要在style里面写不能加上scoped /*//checkBox自定义禁用样式*//*//checkBox自定义禁用样式*/ .notEdit-page.el-checkbox__input.is-disabled.is-checked.el-checkbox__inner::after {border-color: #fff; } .notEdi…...

第一部分:基础知识 6. 函数 --[MySQL轻松入门教程]

MySQL 提供了丰富的内置函数&#xff0c;涵盖了字符串处理、数值计算、日期时间操作、聚合分析以及控制流等多个方面。这些函数可以帮助用户更高效地进行数据查询和处理。 1.字符串函数 MySQL 提供了丰富的字符串函数来帮助用户处理和操作字符串数据。下面是一些常用的 MySQL…...

【蓝桥杯每日一题】扫雷

扫雷 知识点 2024-12-3 蓝桥杯每日一题 扫雷 dfs &#xff08;bfs也是可行的&#xff09; 题目大意 在一个二维平面上放置这N个炸雷&#xff0c;每个炸雷的信息有$(x_i,y_i,r_i) $&#xff0c;前两个是坐标信息&#xff0c;第三个是爆炸半径。然后会输入M个排雷火箭&#xff0…...

【算法】棋盘覆盖问题源代码及精简版

目录 一、题目 二、样例 三、示例代码 四、精简代码 五、总结 对于棋盘覆盖问题的解答和优化。 一、题目 输入格式&#xff1a; 第一行&#xff0c;一个整数n&#xff08;棋盘n*n&#xff0c;n确保是2的幂次&#xff0c;n<64&#xff09; 第二行&#xff0c;两个整数…...

ESP32读取DHT11温湿度数据

芯片&#xff1a;ESP32 环境&#xff1a;Arduino 一、安装DHT11传感器库 红框的库&#xff0c;别安装错了 二、代码 注意&#xff0c;DATA口要连接在D15上 #include "DHT.h" // 包含DHT库#define DHTPIN 15 // 定义DHT11数据引脚连接到ESP32的GPIO15 #define D…...

python爬虫——气象数据爬取

一、导入库与全局配置 python 运行 import json import datetime import time import requests from sqlalchemy import create_engine import csv import pandas as pd作用&#xff1a; 引入数据解析、网络请求、时间处理、数据库操作等所需库。requests&#xff1a;发送 …...

LUA+Reids实现库存秒杀预扣减 记录流水 以及自己的思考

目录 lua脚本 记录流水 记录流水的作用 流水什么时候删除 我们在做库存扣减的时候&#xff0c;显示基于Lua脚本和Redis实现的预扣减 这样可以在秒杀扣减的时候保证操作的原子性和高效性 lua脚本 // ... 已有代码 ...Overridepublic InventoryResponse decrease(Inventor…...

StarRocks 全面向量化执行引擎深度解析

StarRocks 全面向量化执行引擎深度解析 StarRocks 的向量化执行引擎是其高性能的核心设计&#xff0c;相比传统行式处理引擎&#xff08;如MySQL&#xff09;&#xff0c;性能可提升 5-10倍。以下是分层拆解&#xff1a; 1. 向量化 vs 传统行式处理 维度行式处理向量化处理数…...

CVE-2023-25194源码分析与漏洞复现(Kafka JNDI注入)

漏洞概述 漏洞名称&#xff1a;Apache Kafka Connect JNDI注入导致的远程代码执行漏洞 CVE编号&#xff1a;CVE-2023-25194 CVSS评分&#xff1a;8.8 影响版本&#xff1a;Apache Kafka 2.3.0 - 3.3.2 修复版本&#xff1a;≥ 3.4.0 漏洞类型&#xff1a;反序列化导致的远程代…...

Qt学习及使用_第1部分_认识Qt---Qt开发基本流程

前言 学以致用,通过QT框架的学习,一边实践,一边探索编程的方方面面. 参考书:<Qt 6 C开发指南>(以下称"本书") 标识说明:概念用粗体倾斜.重点内容用(加粗黑体)---重点内容(红字)---重点内容(加粗红字), 本书原话内容用深蓝色标识,比较重要的内容用加粗倾…...

Spring是如何实现无代理对象的循环依赖

无代理对象的循环依赖 什么是循环依赖解决方案实现方式测试验证 引入代理对象的影响创建代理对象问题分析 源码见&#xff1a;mini-spring 什么是循环依赖 循环依赖是指在对象创建过程中&#xff0c;两个或多个对象相互依赖&#xff0c;导致创建过程陷入死循环。以下通过一个简…...

学习 Hooks【Plan - June - Week 2】

一、React API React 提供了丰富的核心 API&#xff0c;用于创建组件、管理状态、处理副作用、优化性能等。本文档总结 React 常用的 API 方法和组件。 1. React 核心 API React.createElement(type, props, …children) 用于创建 React 元素&#xff0c;JSX 会被编译成该函数…...

第6章:Neo4j数据导入与导出

在实际应用中&#xff0c;数据的导入与导出是使用Neo4j的重要环节。无论是初始数据加载、系统迁移还是数据备份&#xff0c;都需要高效可靠的数据传输机制。本章将详细介绍Neo4j中的各种数据导入与导出方法&#xff0c;帮助读者掌握不同场景下的最佳实践。 6.1 数据导入策略 …...

论文笔记:Large Language Models for Next Point-of-Interest Recommendation

SIGIR 2024 1 intro 传统的基于数值的POI推荐方法在处理上下文信息时存在两个主要限制 需要将异构的LBSN数据转换为数字&#xff0c;这可能导致上下文信息的固有含义丢失仅依赖于统计和人为设计来理解上下文信息&#xff0c;缺乏对上下文信息提供的语义概念的理解 ——>使用…...