【netty从入门到放弃】netty转发tcp数据到多客户端
目录
- 创建数据库表
- xml
- 实体类
- 启动类
- 线程类
- 客户端代码
- handle
- controller类
- 缓存tcp链接
接到一个需求,需要实现转发通讯模块tcp数据其他的服务器,也就是转发tcp数据到多客户端
任务拆解:
- 首先需要建立多客户端,每个客户端有一个独立的clientId和对应的tcp通道对应
- 能动态的根据clientId关闭对应的转发任务
- 停止服务的时候,需要断开所有的客户端连接,减少开销
- 客户端需要实现断线重连(考虑到断网的清空)
注意:本篇文章是只是实现转发操作,不支持转发的服务器,反向控制设备,需要做特殊处理,如果大家感兴趣,给我留言
下面我们根据我们头脑风暴的结果,来想办法实现上面的过程
创建数据库表
CREATE TABLE `station_message_transmit` (`id` bigint(32) NOT NULL COMMENT '主键',`station_id` int(11) NOT NULL COMMENT '站点id',`host` varchar(50) DEFAULT NULL COMMENT '主机ip',`port` int(11) DEFAULT NULL COMMENT '端口',`create_by` varchar(64) DEFAULT NULL COMMENT '创建人',`create_time` datetime DEFAULT NULL COMMENT '创建时间',`update_by` varchar(64) DEFAULT NULL COMMENT '修改人',`update_time` datetime DEFAULT NULL COMMENT '创建时间',PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
- 所有的转发数据,都是基于单个站点(单个设备)
- id是唯一的,后续会通过该id绑定tcp通道,来实现发数据,关闭连接等操作
xml
<?xml version="1.0"?>
<projectxsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.0.5.RELEASE</version><relativePath /></parent><groupId>boot.base.tcp.client</groupId><artifactId>boot-example-base-tcp-client-2.0.5</artifactId><version>0.0.1-SNAPSHOT</version><name>boot-example-base-tcp-client-2.0.5</name><url>http://maven.apache.org</url><properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding><java.version>1.8</java.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>io.netty</groupId><artifactId>netty-all</artifactId></dependency><dependency><groupId>io.springfox</groupId><artifactId>springfox-swagger2</artifactId><version>2.9.2</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.20</version><scope>provided</scope></dependency></dependencies><build><plugins><!-- 打包成一个可执行jar --><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId><executions><execution><goals><goal>repackage</goal></goals></execution></executions></plugin></plugins></build>
</project>
- 所需要的依赖,这里只是实现一个简单的demo,来实践一下,我的设想是否能实现。
实体类
- yml配置文件就不需要配置,一切从简,默认的端口是8080
package com.test;import lombok.Data;/*** @author wu* @version 1.0* @date 2023/10/18 16:39*/
@Data
public class StationMessageTransmit {/** 唯一编号 */private Long id;/** 站点id */private Integer stationId;/** 主机ip */private String host;/** 端口 */private Integer port;
}
启动类
package com.test;
import io.netty.channel.ChannelHandlerContext;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextClosedEvent;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.context.event.ContextStartedEvent;
import org.springframework.context.event.ContextStoppedEvent;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.annotation.EnableScheduling;import java.util.ArrayList;
import java.util.List;
import java.util.Map;/*** wu*/
@SpringBootApplication
@EnableAsync
@EnableScheduling
public class BootNettyClientApplication implements CommandLineRunner, ApplicationListener {public static void main( String[] args ) {SpringApplication app = new SpringApplication(BootNettyClientApplication.class);app.run(args);System.out.println( "Hello World!" );}@Async@Overridepublic void run(String... args) throws Exception {StationMessageTransmit tran = new StationMessageTransmit();tran.setId(1L);tran.setHost("192.168.10.128");tran.setPort(5000);tran.setStationId(13);StationMessageTransmit tran1 = new StationMessageTransmit();tran1.setId(2L);tran1.setHost("192.168.10.128");tran1.setPort(5001);tran1.setStationId(13);List<StationMessageTransmit> traces = new ArrayList<StationMessageTransmit>();traces.add(tran);traces.add(tran1);for (StationMessageTransmit trace : traces) {BootNettyClientThread thread = new BootNettyClientThread(trace);thread.start();}}@Overridepublic void onApplicationEvent(ApplicationEvent applicationEvent) {if(applicationEvent instanceof ContextClosedEvent){System.out.println("应用关闭事件");for (Map.Entry<String, ChannelHandlerContext> entry : BootNettyClientGroupCache.groupMapCache.entrySet()) {ChannelHandlerContext channelHandlerContext = entry.getValue();if(channelHandlerContext != null){System.out.println("关闭链接:"+entry.getKey());channelHandlerContext.close();}}}else if(applicationEvent instanceof ContextRefreshedEvent){System.out.println("应用刷新事件");}else if(applicationEvent instanceof ContextStartedEvent){System.out.println("应用开启事件");}else if(applicationEvent instanceof ContextStoppedEvent){System.out.println("应用停止事件");}}
}
-
run方法里面主要干的活,是一个伪代码,模拟从数据拿数据,再初始化创建多个客户端。
-
onApplicationEvent方法主要是监控服务停止的事件,这是考虑到,tcp是长链接,跟其他服务器链接是一直没有中断,会存在多次重建连接的问题,所以需要再关闭事件中,关闭所有的tcp客户端连接
线程类
package com.test;/**** netty 客户端* wu*/
public class BootNettyClientThread extends Thread {private StationMessageTransmit trace;public BootNettyClientThread(StationMessageTransmit trace){this.trace = trace;}@Overridepublic void run() {try {new BootNettyClient().connect(trace);} catch (Exception e) {throw new RuntimeException(e);}}
}
- 传实体类,主要是为了保证clientId和通道保证对应
客户端代码
package com.test;import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;/**** netty 客户端* wu*/
public class BootNettyClient {private EventLoopGroup group;public void connect(StationMessageTransmit trace) throws Exception{/*** 客户端的NIO线程组**/group = new NioEventLoopGroup();try {/*** Bootstrap 是一个启动NIO服务的辅助启动类 客户端的*/Bootstrap bootstrap = new Bootstrap();bootstrap = bootstrap.group(group);bootstrap = bootstrap.channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true);/*** 设置 I/O处理类,主要用于网络I/O事件,记录日志,编码、解码消息*/bootstrap = bootstrap.handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {socketChannel.pipeline().addLast(new LineBasedFrameDecoder(1024 * 1024));socketChannel.pipeline().addLast(new StringDecoder());socketChannel.pipeline().addLast(new TcpHandler(trace));}});/*** 连接服务端*/ChannelFuture future = bootstrap.connect(trace.getHost(), trace.getPort()).sync();if(future.isSuccess()) {System.out.println("netty client start success="+trace.toString());/*** 等待连接端口关闭*/future.channel().closeFuture().sync();}} finally {/*** 退出,释放资源*/group.shutdownGracefully().sync();}}}
handle
package com.test;import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.EventLoop;/**** I/O数据读写处理类* wu*/
@ChannelHandler.Sharable
public class TcpHandler extends ChannelInboundHandlerAdapter{private static ScheduledExecutorService SCHEDULED_EXECUTOR = Executors.newScheduledThreadPool(5);private StationMessageTransmit trace;public TcpHandler(StationMessageTransmit trace){this.trace = trace;}/*** 从服务端收到新的数据时,这个方法会在收到消息时被调用*/@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception, IOException {if(msg == null){return;}System.out.println("channelRead:read msg:"+msg.toString());//回应服务端//ctx.write("I got server message thanks server!");}/*** 从服务端收到新的数据、读取完成时调用*/@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) throws IOException {System.out.println("channelReadComplete");ctx.flush();}/*** 当出现 Throwable 对象才会被调用,即当 Netty 由于 IO 错误或者处理器在处理事件时抛出的异常时*/@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws IOException {System.out.println("exceptionCaught");cause.printStackTrace();ctx.close();//抛出异常,断开与客户端的连接}/*** 客户端与服务端第一次建立连接时 执行*/@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception, IOException {super.channelActive(ctx);InetSocketAddress inSocket = (InetSocketAddress) ctx.channel().remoteAddress();String clientIp = inSocket.getAddress().getHostAddress();System.out.println("服务器ip:"+clientIp+",clientId:"+trace.getId());BootNettyClientGroupCache.save(trace.getId().toString(), ctx);}/*** 客户端与服务端 断连时 执行*/@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception, IOException {super.channelInactive(ctx);InetSocketAddress inSocket = (InetSocketAddress) ctx.channel().remoteAddress();String clientIp = inSocket.getAddress().getHostAddress();ctx.close(); //断开连接时,必须关闭,否则造成资源浪费System.out.println("channelInactive:"+clientIp);//客户端重连//reset();}/*** 客户端重连*/public void reset(){//增加一个伪代码,从服务器查询id对应的转发数据是否存在,不存在,则不继续运行转发任务SCHEDULED_EXECUTOR.schedule(() -> {try {System.err.println("服务端链接不上,开始重连操作...");new BootNettyClient().connect(trace);} catch (Exception e) {e.printStackTrace();}}, 3, TimeUnit.SECONDS);}}
- reset方法是为了实现客户端重连,3秒钟调用一次
- channelInactive方法,客户端和服务器断开连接时会触发
- channelActive方法,客户端和服务器建立连接时,需要实现client和通道的绑定关系,方便后续回写数据
controller类
package com.test;import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import java.util.Map;/*** wu*/
@RestController
public class BootNettyClientController {/*** 给所有客户端发送消息* @param content* @return*/@PostMapping("/reportAllClientDataToServer")public String reportAllClientDataToServer(@RequestParam(name="content", required = true) String content) {for (Map.Entry<String, ChannelHandlerContext> entry : BootNettyClientGroupCache.groupMapCache.entrySet()) {ChannelHandlerContext ctx = entry.getValue();ctx.writeAndFlush(Unpooled.buffer().writeBytes(content.getBytes()));}return "ok";}/*** 停止指定的客户端* @param code* @return* @throws InterruptedException*/@PostMapping("/stopStationByCode")public String downDataToClient(@RequestParam(name="code", required = true) String code) throws InterruptedException {ChannelHandlerContext ctx = BootNettyClientGroupCache.get(code);ctx.close();BootNettyClientGroupCache.remove(code);return "success";}}
- 主要是提供两个测试方法,可以通过apifox调试工具进行模拟请求
缓存tcp链接
package com.test;import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.EventLoopGroup;import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;/*** wu*/
public class BootNettyClientGroupCache {/*** 存放所有的连接,key是转发id,value是对应的数据*/public static volatile Map<String, ChannelHandlerContext> groupMapCache = new ConcurrentHashMap<String, ChannelHandlerContext>();public static void add(String code, ChannelHandlerContext group){groupMapCache.put(code,group);}public static ChannelHandlerContext get(String code){return groupMapCache.get(code);}public static void remove(String code){groupMapCache.remove(code);}public static void save(String code, ChannelHandlerContext channel) {if(groupMapCache.get(code) == null) {add(code,channel);}}}
- 存放所有的通道
相关文章:
【netty从入门到放弃】netty转发tcp数据到多客户端
目录 创建数据库表xml实体类启动类线程类客户端代码handlecontroller类缓存tcp链接 接到一个需求,需要实现转发通讯模块tcp数据其他的服务器,也就是转发tcp数据到多客户端 任务拆解: 首先需要建立多客户端,每个客户端有一个独立的clientId和…...
Linux | gdb的基本使用
目录 前言 一、调试文件的生成 二、调试指令 1、选择调试文件 2、查看代码 3、运行代码 4、断点 5、打印与常显示 6、其他 总结 前言 前面我们学习了如何使用gcc/g来进行对代码进行编译,本章我们将使用gdb来对代码进行调试,学习本章的前提是有…...
C++之this指针
前言 C中对象模型和this指针是面向对象编程中的重要概念。对象模型描述了对象在内存中的布局和行为,包括成员变量、成员函数的存储方式和访问权限。this指针是一个隐含的指针,指向当前对象的地址,用于在成员函数中引用当前对象的成员变量和成…...
大模型,重构自动驾驶
文|刘俊宏 编|王一粟 大模型如何重构自动驾驶?答案已经逐渐露出水面。 “在大数据、大模型为特征,以数据驱动为开发模式的自动驾驶3.0时代,自动驾驶大模型将在车端、云端上实现一个统一的端到端的平台管理。”毫末智…...
Jmeter执行接口自动化测试-如何初始化清空旧数据
需求分析: 每次执行完自动化测试,我们不会执行删除接口把数据删除,而需要留着手工测试,此时会导致下次执行测试有旧数据我们手工可能也会新增数据,导致下次执行自动化测试有旧数据 下面介绍两种清空数据的方法 一、通过…...
dashboard报错 错误:无法获取网络列表、dashboard报错 错误:无法获取云主机列表 解决流程
文章目录 错误说明dashboard上报错底层命令报错查看日志message日志httpd报错日志错误日志分析开始解决测试底层命令dashboard错误说明 dashboard上报错 首先,dashboard上无论是管理员还是其他项目,均无法获取云主机和网络信息,具体报错如下...
C语言中的3种注释方法
C语言中的3种注释方法 2021年8月28日星期六席锦 在用C语言编程时,常用的注释方式有如下几种: (1)单行注释 // … (2)多行注释 /* … */ (3)条件编译注释 #if 0…#endif (1)(2)在入门教程中比较常见。 对于(1) 【单行注释 // …】,注释只能显示…...
20款VS Code实用插件推荐
前言: VS Code是一个轻量级但功能强大的源代码编辑器,轻量级指的是下载下来的VS Code其实就是一个简单的编辑器,强大指的是支持多种语言的环境插件拓展,也正是因为这种支持插件式安装环境开发让VS Code成为了开发语言工具中的霸主…...
攻防世界web篇-robots
打开网址后,发现是一个空白页面的网页 但是,这个题目是robots,所以就联想到robots.txt这个目录,于是我就试了一下 注意:这里有个php的文件,这个应该就是一个目录文件 当输入后,直接回车&#…...
6 个可解锁部分 GPT-4 功能的 Chrome 扩展(无需支付 ChatGPT Plus 费用)
在过去的几个月里,我广泛探索了 ChatGPT 的所有可用插件。在此期间,我发现了一些令人惊叹的插件,它们改进了我使用 ChatGPT 的方式,但现在,我将透露一些您需要了解的内容。 借助 Chrome 扩展程序,所有 Chat…...
centos 7.9 安装sshpass
1.作用 sshpass是一个用于非交互式SSH密码验证的实用程序。它可以用于自动输入密码以进行SSH登录,从而简化了自动化脚本和批处理作业中的SSH连接过程。 sshpass命令可以与ssh命令一起使用,通过在命令行中提供密码参数来执行远程命令。以下是一个示例命…...
CompletableFuture多任务异步,获取返回值,汇总结果
线程池异步的基础知识 详情见:https://blog.csdn.net/sinat_32502451/article/details/133039624 线程池执行多任务,获取返回值 线程池的 submit()方法,可以提交任务,并返回 Future接口。 而 future.get(),可以获取…...
Linux上Qt和Opencv人脸识别项目学习路线(嵌入式/C++)
本文将介绍Linux人脸识别项目的开发流程, 只作简略介绍所需知识点及大致流程。 注:若需详细教程请联系作者(见文末)。 一、基本开发环境搭建 1.1 安装虚拟机Ubuntu 虚拟机采用的是VMware,需要下载VMware安装包、ubuntu系统镜像…...
spring 源码阅读之@Configuration解析
Configuration解析 Configuration注解用于标识一个类是配置类,用于声明和组织Bean定义,首先Configuration本身也是一个Component,在其注解定义上标有Component Target(ElementType.TYPE) Retention(RetentionPolicy.RUNTIME) Documented Co…...
Java Web 33道面试题汇总
更多面试合集在:https://javaxiaobear.cn 1、http 的长连接和短连接? HTTP 协议有 HTTP/1.0 版本和 HTTP/1.1 版本。HTTP1.1 默认保持长连接(HTTP persistent connection,也翻译为持久连接),数据传输完成了保持 TCP 连接不断开(不发 RST 包、不四次握手),等待在同域名…...
设计模式记录
设计模式 抽象工厂模式单例模式要实现一个单例,需要关注的点有下面几个: 抽象工厂模式 PHP工厂模式是一种可扩展、可维护和可重复使用的方法,旨在提供通用接口,用于创建对象。工厂模式的主要组成部分包括抽象工厂、具体工厂、抽象产品和具体产…...
Java设计模式之亨元模式(Flyweight Pattern)
亨元模式(Flyweight Pattern)是一种结构型设计模式,旨在通过共享对象来最大限度地减少内存使用和提高性能。该模式适用于需要创建大量相似对象的情况,其中许多对象具有相同的状态。通过共享相同的状态,亨元模式可以减少…...
正点原子嵌入式linux驱动开发——Linux中断
不管是单片机裸机实验还是Linux下的驱动实验,中断都是频繁使用的功能,在裸机中使用中断需要做一大堆的工作,比如配置寄存器,使能IRQ等等。但是Linux内核提供了完善的中断框架,只需要申请中断,然后注册中断处…...
基于Jaya优化算法的电力系统最优潮流研究(Matlab代码实现)
💥💥💞💞欢迎来到本博客❤️❤️💥💥 🏆博主优势:🌞🌞🌞博客内容尽量做到思维缜密,逻辑清晰,为了方便读者。 ⛳️座右铭&a…...
Write-Ahead Log(PostgreSQL 14 Internals翻译版)
日志 如果发生停电、操作系统错误或数据库服务器崩溃等故障,RAM中的所有内容都将丢失;只有写入磁盘的数据才会被保留。要在故障后启动服务器,必须恢复数据一致性。如果磁盘本身已损坏,则必须通过备份恢复来解决相同的问题。 理论…...
铭豹扩展坞 USB转网口 突然无法识别解决方法
当 USB 转网口扩展坞在一台笔记本上无法识别,但在其他电脑上正常工作时,问题通常出在笔记本自身或其与扩展坞的兼容性上。以下是系统化的定位思路和排查步骤,帮助你快速找到故障原因: 背景: 一个M-pard(铭豹)扩展坞的网卡突然无法识别了,扩展出来的三个USB接口正常。…...
华为云AI开发平台ModelArts
华为云ModelArts:重塑AI开发流程的“智能引擎”与“创新加速器”! 在人工智能浪潮席卷全球的2025年,企业拥抱AI的意愿空前高涨,但技术门槛高、流程复杂、资源投入巨大的现实,却让许多创新构想止步于实验室。数据科学家…...
日语学习-日语知识点小记-构建基础-JLPT-N4阶段(33):にする
日语学习-日语知识点小记-构建基础-JLPT-N4阶段(33):にする 1、前言(1)情况说明(2)工程师的信仰2、知识点(1) にする1,接续:名词+にする2,接续:疑问词+にする3,(A)は(B)にする。(2)復習:(1)复习句子(2)ために & ように(3)そう(4)にする3、…...
QMC5883L的驱动
简介 本篇文章的代码已经上传到了github上面,开源代码 作为一个电子罗盘模块,我们可以通过I2C从中获取偏航角yaw,相对于六轴陀螺仪的yaw,qmc5883l几乎不会零飘并且成本较低。 参考资料 QMC5883L磁场传感器驱动 QMC5883L磁力计…...
鸿蒙中用HarmonyOS SDK应用服务 HarmonyOS5开发一个医院挂号小程序
一、开发准备 环境搭建: 安装DevEco Studio 3.0或更高版本配置HarmonyOS SDK申请开发者账号 项目创建: File > New > Create Project > Application (选择"Empty Ability") 二、核心功能实现 1. 医院科室展示 /…...
django filter 统计数量 按属性去重
在Django中,如果你想要根据某个属性对查询集进行去重并统计数量,你可以使用values()方法配合annotate()方法来实现。这里有两种常见的方法来完成这个需求: 方法1:使用annotate()和Count 假设你有一个模型Item,并且你想…...
将对透视变换后的图像使用Otsu进行阈值化,来分离黑色和白色像素。这句话中的Otsu是什么意思?
Otsu 是一种自动阈值化方法,用于将图像分割为前景和背景。它通过最小化图像的类内方差或等价地最大化类间方差来选择最佳阈值。这种方法特别适用于图像的二值化处理,能够自动确定一个阈值,将图像中的像素分为黑色和白色两类。 Otsu 方法的原…...
数据库分批入库
今天在工作中,遇到一个问题,就是分批查询的时候,由于批次过大导致出现了一些问题,一下是问题描述和解决方案: 示例: // 假设已有数据列表 dataList 和 PreparedStatement pstmt int batchSize 1000; // …...
C++ Visual Studio 2017厂商给的源码没有.sln文件 易兆微芯片下载工具加开机动画下载。
1.先用Visual Studio 2017打开Yichip YC31xx loader.vcxproj,再用Visual Studio 2022打开。再保侟就有.sln文件了。 易兆微芯片下载工具加开机动画下载 ExtraDownloadFile1Info.\logo.bin|0|0|10D2000|0 MFC应用兼容CMD 在BOOL CYichipYC31xxloaderDlg::OnIni…...
Mobile ALOHA全身模仿学习
一、题目 Mobile ALOHA:通过低成本全身远程操作学习双手移动操作 传统模仿学习(Imitation Learning)缺点:聚焦与桌面操作,缺乏通用任务所需的移动性和灵活性 本论文优点:(1)在ALOHA…...
