【netty从入门到放弃】netty转发tcp数据到多客户端
目录
- 创建数据库表
- xml
- 实体类
- 启动类
- 线程类
- 客户端代码
- handle
- controller类
- 缓存tcp链接
接到一个需求,需要实现转发通讯模块tcp数据其他的服务器,也就是转发tcp数据到多客户端
任务拆解:
- 首先需要建立多客户端,每个客户端有一个独立的clientId和对应的tcp通道对应
- 能动态的根据clientId关闭对应的转发任务
- 停止服务的时候,需要断开所有的客户端连接,减少开销
- 客户端需要实现断线重连(考虑到断网的清空)
注意:本篇文章是只是实现转发操作,不支持转发的服务器,反向控制设备,需要做特殊处理,如果大家感兴趣,给我留言
下面我们根据我们头脑风暴的结果,来想办法实现上面的过程
创建数据库表
CREATE TABLE `station_message_transmit` (`id` bigint(32) NOT NULL COMMENT '主键',`station_id` int(11) NOT NULL COMMENT '站点id',`host` varchar(50) DEFAULT NULL COMMENT '主机ip',`port` int(11) DEFAULT NULL COMMENT '端口',`create_by` varchar(64) DEFAULT NULL COMMENT '创建人',`create_time` datetime DEFAULT NULL COMMENT '创建时间',`update_by` varchar(64) DEFAULT NULL COMMENT '修改人',`update_time` datetime DEFAULT NULL COMMENT '创建时间',PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
- 所有的转发数据,都是基于单个站点(单个设备)
- id是唯一的,后续会通过该id绑定tcp通道,来实现发数据,关闭连接等操作
xml
<?xml version="1.0"?>
<projectxsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"xmlns="http://maven.apache.org/POM/4.0.0"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"><modelVersion>4.0.0</modelVersion><parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.0.5.RELEASE</version><relativePath /></parent><groupId>boot.base.tcp.client</groupId><artifactId>boot-example-base-tcp-client-2.0.5</artifactId><version>0.0.1-SNAPSHOT</version><name>boot-example-base-tcp-client-2.0.5</name><url>http://maven.apache.org</url><properties><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding><java.version>1.8</java.version></properties><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>io.netty</groupId><artifactId>netty-all</artifactId></dependency><dependency><groupId>io.springfox</groupId><artifactId>springfox-swagger2</artifactId><version>2.9.2</version></dependency><dependency><groupId>org.projectlombok</groupId><artifactId>lombok</artifactId><version>1.18.20</version><scope>provided</scope></dependency></dependencies><build><plugins><!-- 打包成一个可执行jar --><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifactId><executions><execution><goals><goal>repackage</goal></goals></execution></executions></plugin></plugins></build>
</project>
- 所需要的依赖,这里只是实现一个简单的demo,来实践一下,我的设想是否能实现。
实体类
- yml配置文件就不需要配置,一切从简,默认的端口是8080
package com.test;import lombok.Data;/*** @author wu* @version 1.0* @date 2023/10/18 16:39*/
@Data
public class StationMessageTransmit {/** 唯一编号 */private Long id;/** 站点id */private Integer stationId;/** 主机ip */private String host;/** 端口 */private Integer port;
}
启动类
package com.test;
import io.netty.channel.ChannelHandlerContext;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ApplicationEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.context.event.ContextClosedEvent;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.context.event.ContextStartedEvent;
import org.springframework.context.event.ContextStoppedEvent;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.annotation.EnableScheduling;import java.util.ArrayList;
import java.util.List;
import java.util.Map;/*** wu*/
@SpringBootApplication
@EnableAsync
@EnableScheduling
public class BootNettyClientApplication implements CommandLineRunner, ApplicationListener {public static void main( String[] args ) {SpringApplication app = new SpringApplication(BootNettyClientApplication.class);app.run(args);System.out.println( "Hello World!" );}@Async@Overridepublic void run(String... args) throws Exception {StationMessageTransmit tran = new StationMessageTransmit();tran.setId(1L);tran.setHost("192.168.10.128");tran.setPort(5000);tran.setStationId(13);StationMessageTransmit tran1 = new StationMessageTransmit();tran1.setId(2L);tran1.setHost("192.168.10.128");tran1.setPort(5001);tran1.setStationId(13);List<StationMessageTransmit> traces = new ArrayList<StationMessageTransmit>();traces.add(tran);traces.add(tran1);for (StationMessageTransmit trace : traces) {BootNettyClientThread thread = new BootNettyClientThread(trace);thread.start();}}@Overridepublic void onApplicationEvent(ApplicationEvent applicationEvent) {if(applicationEvent instanceof ContextClosedEvent){System.out.println("应用关闭事件");for (Map.Entry<String, ChannelHandlerContext> entry : BootNettyClientGroupCache.groupMapCache.entrySet()) {ChannelHandlerContext channelHandlerContext = entry.getValue();if(channelHandlerContext != null){System.out.println("关闭链接:"+entry.getKey());channelHandlerContext.close();}}}else if(applicationEvent instanceof ContextRefreshedEvent){System.out.println("应用刷新事件");}else if(applicationEvent instanceof ContextStartedEvent){System.out.println("应用开启事件");}else if(applicationEvent instanceof ContextStoppedEvent){System.out.println("应用停止事件");}}
}
-
run方法里面主要干的活,是一个伪代码,模拟从数据拿数据,再初始化创建多个客户端。
-
onApplicationEvent方法主要是监控服务停止的事件,这是考虑到,tcp是长链接,跟其他服务器链接是一直没有中断,会存在多次重建连接的问题,所以需要再关闭事件中,关闭所有的tcp客户端连接
线程类
package com.test;/**** netty 客户端* wu*/
public class BootNettyClientThread extends Thread {private StationMessageTransmit trace;public BootNettyClientThread(StationMessageTransmit trace){this.trace = trace;}@Overridepublic void run() {try {new BootNettyClient().connect(trace);} catch (Exception e) {throw new RuntimeException(e);}}
}
- 传实体类,主要是为了保证clientId和通道保证对应
客户端代码
package com.test;import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.LineBasedFrameDecoder;
import io.netty.handler.codec.string.StringDecoder;/**** netty 客户端* wu*/
public class BootNettyClient {private EventLoopGroup group;public void connect(StationMessageTransmit trace) throws Exception{/*** 客户端的NIO线程组**/group = new NioEventLoopGroup();try {/*** Bootstrap 是一个启动NIO服务的辅助启动类 客户端的*/Bootstrap bootstrap = new Bootstrap();bootstrap = bootstrap.group(group);bootstrap = bootstrap.channel(NioSocketChannel.class).option(ChannelOption.TCP_NODELAY, true);/*** 设置 I/O处理类,主要用于网络I/O事件,记录日志,编码、解码消息*/bootstrap = bootstrap.handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel socketChannel) throws Exception {socketChannel.pipeline().addLast(new LineBasedFrameDecoder(1024 * 1024));socketChannel.pipeline().addLast(new StringDecoder());socketChannel.pipeline().addLast(new TcpHandler(trace));}});/*** 连接服务端*/ChannelFuture future = bootstrap.connect(trace.getHost(), trace.getPort()).sync();if(future.isSuccess()) {System.out.println("netty client start success="+trace.toString());/*** 等待连接端口关闭*/future.channel().closeFuture().sync();}} finally {/*** 退出,释放资源*/group.shutdownGracefully().sync();}}}
handle
package com.test;import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.EventLoop;/**** I/O数据读写处理类* wu*/
@ChannelHandler.Sharable
public class TcpHandler extends ChannelInboundHandlerAdapter{private static ScheduledExecutorService SCHEDULED_EXECUTOR = Executors.newScheduledThreadPool(5);private StationMessageTransmit trace;public TcpHandler(StationMessageTransmit trace){this.trace = trace;}/*** 从服务端收到新的数据时,这个方法会在收到消息时被调用*/@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception, IOException {if(msg == null){return;}System.out.println("channelRead:read msg:"+msg.toString());//回应服务端//ctx.write("I got server message thanks server!");}/*** 从服务端收到新的数据、读取完成时调用*/@Overridepublic void channelReadComplete(ChannelHandlerContext ctx) throws IOException {System.out.println("channelReadComplete");ctx.flush();}/*** 当出现 Throwable 对象才会被调用,即当 Netty 由于 IO 错误或者处理器在处理事件时抛出的异常时*/@Overridepublic void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws IOException {System.out.println("exceptionCaught");cause.printStackTrace();ctx.close();//抛出异常,断开与客户端的连接}/*** 客户端与服务端第一次建立连接时 执行*/@Overridepublic void channelActive(ChannelHandlerContext ctx) throws Exception, IOException {super.channelActive(ctx);InetSocketAddress inSocket = (InetSocketAddress) ctx.channel().remoteAddress();String clientIp = inSocket.getAddress().getHostAddress();System.out.println("服务器ip:"+clientIp+",clientId:"+trace.getId());BootNettyClientGroupCache.save(trace.getId().toString(), ctx);}/*** 客户端与服务端 断连时 执行*/@Overridepublic void channelInactive(ChannelHandlerContext ctx) throws Exception, IOException {super.channelInactive(ctx);InetSocketAddress inSocket = (InetSocketAddress) ctx.channel().remoteAddress();String clientIp = inSocket.getAddress().getHostAddress();ctx.close(); //断开连接时,必须关闭,否则造成资源浪费System.out.println("channelInactive:"+clientIp);//客户端重连//reset();}/*** 客户端重连*/public void reset(){//增加一个伪代码,从服务器查询id对应的转发数据是否存在,不存在,则不继续运行转发任务SCHEDULED_EXECUTOR.schedule(() -> {try {System.err.println("服务端链接不上,开始重连操作...");new BootNettyClient().connect(trace);} catch (Exception e) {e.printStackTrace();}}, 3, TimeUnit.SECONDS);}}
- reset方法是为了实现客户端重连,3秒钟调用一次
- channelInactive方法,客户端和服务器断开连接时会触发
- channelActive方法,客户端和服务器建立连接时,需要实现client和通道的绑定关系,方便后续回写数据
controller类
package com.test;import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandlerContext;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import java.util.Map;/*** wu*/
@RestController
public class BootNettyClientController {/*** 给所有客户端发送消息* @param content* @return*/@PostMapping("/reportAllClientDataToServer")public String reportAllClientDataToServer(@RequestParam(name="content", required = true) String content) {for (Map.Entry<String, ChannelHandlerContext> entry : BootNettyClientGroupCache.groupMapCache.entrySet()) {ChannelHandlerContext ctx = entry.getValue();ctx.writeAndFlush(Unpooled.buffer().writeBytes(content.getBytes()));}return "ok";}/*** 停止指定的客户端* @param code* @return* @throws InterruptedException*/@PostMapping("/stopStationByCode")public String downDataToClient(@RequestParam(name="code", required = true) String code) throws InterruptedException {ChannelHandlerContext ctx = BootNettyClientGroupCache.get(code);ctx.close();BootNettyClientGroupCache.remove(code);return "success";}}
- 主要是提供两个测试方法,可以通过apifox调试工具进行模拟请求
缓存tcp链接
package com.test;import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.EventLoopGroup;import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;/*** wu*/
public class BootNettyClientGroupCache {/*** 存放所有的连接,key是转发id,value是对应的数据*/public static volatile Map<String, ChannelHandlerContext> groupMapCache = new ConcurrentHashMap<String, ChannelHandlerContext>();public static void add(String code, ChannelHandlerContext group){groupMapCache.put(code,group);}public static ChannelHandlerContext get(String code){return groupMapCache.get(code);}public static void remove(String code){groupMapCache.remove(code);}public static void save(String code, ChannelHandlerContext channel) {if(groupMapCache.get(code) == null) {add(code,channel);}}}
- 存放所有的通道
相关文章:
【netty从入门到放弃】netty转发tcp数据到多客户端
目录 创建数据库表xml实体类启动类线程类客户端代码handlecontroller类缓存tcp链接 接到一个需求,需要实现转发通讯模块tcp数据其他的服务器,也就是转发tcp数据到多客户端 任务拆解: 首先需要建立多客户端,每个客户端有一个独立的clientId和…...
Linux | gdb的基本使用
目录 前言 一、调试文件的生成 二、调试指令 1、选择调试文件 2、查看代码 3、运行代码 4、断点 5、打印与常显示 6、其他 总结 前言 前面我们学习了如何使用gcc/g来进行对代码进行编译,本章我们将使用gdb来对代码进行调试,学习本章的前提是有…...
C++之this指针
前言 C中对象模型和this指针是面向对象编程中的重要概念。对象模型描述了对象在内存中的布局和行为,包括成员变量、成员函数的存储方式和访问权限。this指针是一个隐含的指针,指向当前对象的地址,用于在成员函数中引用当前对象的成员变量和成…...
大模型,重构自动驾驶
文|刘俊宏 编|王一粟 大模型如何重构自动驾驶?答案已经逐渐露出水面。 “在大数据、大模型为特征,以数据驱动为开发模式的自动驾驶3.0时代,自动驾驶大模型将在车端、云端上实现一个统一的端到端的平台管理。”毫末智…...
Jmeter执行接口自动化测试-如何初始化清空旧数据
需求分析: 每次执行完自动化测试,我们不会执行删除接口把数据删除,而需要留着手工测试,此时会导致下次执行测试有旧数据我们手工可能也会新增数据,导致下次执行自动化测试有旧数据 下面介绍两种清空数据的方法 一、通过…...
dashboard报错 错误:无法获取网络列表、dashboard报错 错误:无法获取云主机列表 解决流程
文章目录 错误说明dashboard上报错底层命令报错查看日志message日志httpd报错日志错误日志分析开始解决测试底层命令dashboard错误说明 dashboard上报错 首先,dashboard上无论是管理员还是其他项目,均无法获取云主机和网络信息,具体报错如下...
C语言中的3种注释方法
C语言中的3种注释方法 2021年8月28日星期六席锦 在用C语言编程时,常用的注释方式有如下几种: (1)单行注释 // … (2)多行注释 /* … */ (3)条件编译注释 #if 0…#endif (1)(2)在入门教程中比较常见。 对于(1) 【单行注释 // …】,注释只能显示…...
20款VS Code实用插件推荐
前言: VS Code是一个轻量级但功能强大的源代码编辑器,轻量级指的是下载下来的VS Code其实就是一个简单的编辑器,强大指的是支持多种语言的环境插件拓展,也正是因为这种支持插件式安装环境开发让VS Code成为了开发语言工具中的霸主…...
攻防世界web篇-robots
打开网址后,发现是一个空白页面的网页 但是,这个题目是robots,所以就联想到robots.txt这个目录,于是我就试了一下 注意:这里有个php的文件,这个应该就是一个目录文件 当输入后,直接回车&#…...
6 个可解锁部分 GPT-4 功能的 Chrome 扩展(无需支付 ChatGPT Plus 费用)
在过去的几个月里,我广泛探索了 ChatGPT 的所有可用插件。在此期间,我发现了一些令人惊叹的插件,它们改进了我使用 ChatGPT 的方式,但现在,我将透露一些您需要了解的内容。 借助 Chrome 扩展程序,所有 Chat…...
centos 7.9 安装sshpass
1.作用 sshpass是一个用于非交互式SSH密码验证的实用程序。它可以用于自动输入密码以进行SSH登录,从而简化了自动化脚本和批处理作业中的SSH连接过程。 sshpass命令可以与ssh命令一起使用,通过在命令行中提供密码参数来执行远程命令。以下是一个示例命…...
CompletableFuture多任务异步,获取返回值,汇总结果
线程池异步的基础知识 详情见:https://blog.csdn.net/sinat_32502451/article/details/133039624 线程池执行多任务,获取返回值 线程池的 submit()方法,可以提交任务,并返回 Future接口。 而 future.get(),可以获取…...
Linux上Qt和Opencv人脸识别项目学习路线(嵌入式/C++)
本文将介绍Linux人脸识别项目的开发流程, 只作简略介绍所需知识点及大致流程。 注:若需详细教程请联系作者(见文末)。 一、基本开发环境搭建 1.1 安装虚拟机Ubuntu 虚拟机采用的是VMware,需要下载VMware安装包、ubuntu系统镜像…...
spring 源码阅读之@Configuration解析
Configuration解析 Configuration注解用于标识一个类是配置类,用于声明和组织Bean定义,首先Configuration本身也是一个Component,在其注解定义上标有Component Target(ElementType.TYPE) Retention(RetentionPolicy.RUNTIME) Documented Co…...
Java Web 33道面试题汇总
更多面试合集在:https://javaxiaobear.cn 1、http 的长连接和短连接? HTTP 协议有 HTTP/1.0 版本和 HTTP/1.1 版本。HTTP1.1 默认保持长连接(HTTP persistent connection,也翻译为持久连接),数据传输完成了保持 TCP 连接不断开(不发 RST 包、不四次握手),等待在同域名…...
设计模式记录
设计模式 抽象工厂模式单例模式要实现一个单例,需要关注的点有下面几个: 抽象工厂模式 PHP工厂模式是一种可扩展、可维护和可重复使用的方法,旨在提供通用接口,用于创建对象。工厂模式的主要组成部分包括抽象工厂、具体工厂、抽象产品和具体产…...
Java设计模式之亨元模式(Flyweight Pattern)
亨元模式(Flyweight Pattern)是一种结构型设计模式,旨在通过共享对象来最大限度地减少内存使用和提高性能。该模式适用于需要创建大量相似对象的情况,其中许多对象具有相同的状态。通过共享相同的状态,亨元模式可以减少…...
正点原子嵌入式linux驱动开发——Linux中断
不管是单片机裸机实验还是Linux下的驱动实验,中断都是频繁使用的功能,在裸机中使用中断需要做一大堆的工作,比如配置寄存器,使能IRQ等等。但是Linux内核提供了完善的中断框架,只需要申请中断,然后注册中断处…...
基于Jaya优化算法的电力系统最优潮流研究(Matlab代码实现)
💥💥💞💞欢迎来到本博客❤️❤️💥💥 🏆博主优势:🌞🌞🌞博客内容尽量做到思维缜密,逻辑清晰,为了方便读者。 ⛳️座右铭&a…...
Write-Ahead Log(PostgreSQL 14 Internals翻译版)
日志 如果发生停电、操作系统错误或数据库服务器崩溃等故障,RAM中的所有内容都将丢失;只有写入磁盘的数据才会被保留。要在故障后启动服务器,必须恢复数据一致性。如果磁盘本身已损坏,则必须通过备份恢复来解决相同的问题。 理论…...
国防科技大学计算机基础课程笔记02信息编码
1.机内码和国标码 国标码就是我们非常熟悉的这个GB2312,但是因为都是16进制,因此这个了16进制的数据既可以翻译成为这个机器码,也可以翻译成为这个国标码,所以这个时候很容易会出现这个歧义的情况; 因此,我们的这个国…...
练习(含atoi的模拟实现,自定义类型等练习)
一、结构体大小的计算及位段 (结构体大小计算及位段 详解请看:自定义类型:结构体进阶-CSDN博客) 1.在32位系统环境,编译选项为4字节对齐,那么sizeof(A)和sizeof(B)是多少? #pragma pack(4)st…...
苍穹外卖--缓存菜品
1.问题说明 用户端小程序展示的菜品数据都是通过查询数据库获得,如果用户端访问量比较大,数据库访问压力随之增大 2.实现思路 通过Redis来缓存菜品数据,减少数据库查询操作。 缓存逻辑分析: ①每个分类下的菜品保持一份缓存数据…...
HBuilderX安装(uni-app和小程序开发)
下载HBuilderX 访问官方网站:https://www.dcloud.io/hbuilderx.html 根据您的操作系统选择合适版本: Windows版(推荐下载标准版) Windows系统安装步骤 运行安装程序: 双击下载的.exe安装文件 如果出现安全提示&…...
css的定位(position)详解:相对定位 绝对定位 固定定位
在 CSS 中,元素的定位通过 position 属性控制,共有 5 种定位模式:static(静态定位)、relative(相对定位)、absolute(绝对定位)、fixed(固定定位)和…...
NLP学习路线图(二十三):长短期记忆网络(LSTM)
在自然语言处理(NLP)领域,我们时刻面临着处理序列数据的核心挑战。无论是理解句子的结构、分析文本的情感,还是实现语言的翻译,都需要模型能够捕捉词语之间依时序产生的复杂依赖关系。传统的神经网络结构在处理这种序列依赖时显得力不从心,而循环神经网络(RNN) 曾被视为…...
【OSG学习笔记】Day 16: 骨骼动画与蒙皮(osgAnimation)
骨骼动画基础 骨骼动画是 3D 计算机图形中常用的技术,它通过以下两个主要组件实现角色动画。 骨骼系统 (Skeleton):由层级结构的骨头组成,类似于人体骨骼蒙皮 (Mesh Skinning):将模型网格顶点绑定到骨骼上,使骨骼移动…...
鸿蒙DevEco Studio HarmonyOS 5跑酷小游戏实现指南
1. 项目概述 本跑酷小游戏基于鸿蒙HarmonyOS 5开发,使用DevEco Studio作为开发工具,采用Java语言实现,包含角色控制、障碍物生成和分数计算系统。 2. 项目结构 /src/main/java/com/example/runner/├── MainAbilitySlice.java // 主界…...
听写流程自动化实践,轻量级教育辅助
随着智能教育工具的发展,越来越多的传统学习方式正在被数字化、自动化所优化。听写作为语文、英语等学科中重要的基础训练形式,也迎来了更高效的解决方案。 这是一款轻量但功能强大的听写辅助工具。它是基于本地词库与可选在线语音引擎构建,…...
深度学习习题2
1.如果增加神经网络的宽度,精确度会增加到一个特定阈值后,便开始降低。造成这一现象的可能原因是什么? A、即使增加卷积核的数量,只有少部分的核会被用作预测 B、当卷积核数量增加时,神经网络的预测能力会降低 C、当卷…...
