由浅入深Netty协议设计与解析
目录
- 1 为什么需要协议?
- 2 redis 协议举例
- 3 http 协议举例
- 4 自定义协议要素
- 4.1 编解码器
- 4.2 什么时候可以加 @Sharable
1 为什么需要协议?

TCP/IP 中消息传输基于流的方式,没有边界。
协议的目的就是划定消息的边界,制定通信双方要共同遵守的通信规则
例如:在网络上传输
下雨天留客天留我不留
是中文一句著名的无标点符号句子,在没有标点符号情况下,这句话有数种拆解方式,而意思却是完全不同,所以常被用作讲述标点符号的重要性
一种解读
下雨天留客,天留,我不留
另一种解读
下雨天,留客天,留我不?留
如何设计协议呢?其实就是给网络传输的信息加上“标点符号”。但通过分隔符来断句不是很好,因为分隔符本身如果用于传输,那么必须加以区分。因此,下面一种协议较为常用
定长字节表示内容长度 + 实际内容
例如,假设一个中文字符长度为 3,按照上述协议的规则,发送信息方式如下,就不会被接收方弄错意思了
0f下雨天留客06天留09我不留
小故事
很久很久以前,一位私塾先生到一家任教。双方签订了一纸协议:“无鸡鸭亦可无鱼肉亦可白菜豆腐不可少不得束修金”。此后,私塾先生虽然认真教课,但主人家则总是给私塾先生以白菜豆腐为菜,丝毫未见鸡鸭鱼肉的款待。私塾先生先是很不解,可是后来也就想通了:主人把鸡鸭鱼肉的钱都会换为束修金的,也罢。至此双方相安无事。
年关将至,一个学年段亦告结束。私塾先生临行时,也不见主人家为他交付束修金,遂与主家理论。然主家亦振振有词:“有协议为证——无鸡鸭亦可,无鱼肉亦可,白菜豆腐不可少,不得束修金。这白纸黑字明摆着的,你有什么要说的呢?”
私塾先生据理力争:“协议是这样的——无鸡,鸭亦可;无鱼,肉亦可;白菜豆腐不可,少不得束修金。”
双方唇枪舌战,你来我往,真个是不亦乐乎!
这里的束修金,也作“束脩”,应当是泛指教师应当得到的报酬
2 redis 协议举例
NioEventLoopGroup worker = new NioEventLoopGroup();
byte[] LINE = {13, 10};
try {Bootstrap bootstrap = new Bootstrap();bootstrap.channel(NioSocketChannel.class);bootstrap.group(worker);bootstrap.handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) {ch.pipeline().addLast(new LoggingHandler());ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {// 会在连接 channel 建立成功后,会触发 active 事件@Overridepublic void channelActive(ChannelHandlerContext ctx) {set(ctx);get(ctx);}private void get(ChannelHandlerContext ctx) {ByteBuf buf = ctx.alloc().buffer();buf.writeBytes("*2".getBytes());buf.writeBytes(LINE);buf.writeBytes("$3".getBytes());buf.writeBytes(LINE);buf.writeBytes("get".getBytes());buf.writeBytes(LINE);buf.writeBytes("$3".getBytes());buf.writeBytes(LINE);buf.writeBytes("aaa".getBytes());buf.writeBytes(LINE);ctx.writeAndFlush(buf);}private void set(ChannelHandlerContext ctx) {ByteBuf buf = ctx.alloc().buffer();buf.writeBytes("*3".getBytes());buf.writeBytes(LINE);buf.writeBytes("$3".getBytes());buf.writeBytes(LINE);buf.writeBytes("set".getBytes());buf.writeBytes(LINE);buf.writeBytes("$3".getBytes());buf.writeBytes(LINE);buf.writeBytes("aaa".getBytes());buf.writeBytes(LINE);buf.writeBytes("$3".getBytes());buf.writeBytes(LINE);buf.writeBytes("bbb".getBytes());buf.writeBytes(LINE);ctx.writeAndFlush(buf);}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ByteBuf buf = (ByteBuf) msg;System.out.println(buf.toString(Charset.defaultCharset()));}});}});ChannelFuture channelFuture = bootstrap.connect("localhost", 6379).sync();channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {log.error("client error", e);
} finally {worker.shutdownGracefully();
}
3 http 协议举例
NioEventLoopGroup boss = new NioEventLoopGroup();
NioEventLoopGroup worker = new NioEventLoopGroup();
try {ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.channel(NioServerSocketChannel.class);serverBootstrap.group(boss, worker);serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));ch.pipeline().addLast(new HttpServerCodec());ch.pipeline().addLast(new SimpleChannelInboundHandler<HttpRequest>() {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, HttpRequest msg) throws Exception {// 获取请求log.debug(msg.uri());// 返回响应DefaultFullHttpResponse response =new DefaultFullHttpResponse(msg.protocolVersion(), HttpResponseStatus.OK);byte[] bytes = "<h1>Hello, world!</h1>".getBytes();response.headers().setInt(CONTENT_LENGTH, bytes.length);response.content().writeBytes(bytes);// 写回响应ctx.writeAndFlush(response);}});/*ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {log.debug("{}", msg.getClass());if (msg instanceof HttpRequest) { // 请求行,请求头} else if (msg instanceof HttpContent) { //请求体}}});*/}});ChannelFuture channelFuture = serverBootstrap.bind(8080).sync();channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {log.error("server error", e);
} finally {boss.shutdownGracefully();worker.shutdownGracefully();
}
4 自定义协议要素
- 魔数,用来在第一时间判定是否是无效数据包
- 版本号,可以支持协议的升级
- 序列化算法,消息正文到底采用哪种序列化反序列化方式,可以由此扩展,例如:json、protobuf、hessian、jdk
- 指令类型,是登录、注册、单聊、群聊… 跟业务相关
- 请求序号,为了双工通信,提供异步能力
- 正文长度
- 消息正文
4.1 编解码器
根据上面的要素,设计一个登录请求消息和登录响应消息,并使用 Netty 完成收发
@Slf4j
public class MessageCodec extends ByteToMessageCodec<Message> {@Overrideprotected void encode(ChannelHandlerContext ctx, Message msg, ByteBuf out) throws Exception {// 1. 4 字节的魔数out.writeBytes(new byte[]{1, 2, 3, 4});// 2. 1 字节的版本,out.writeByte(1);// 3. 1 字节的序列化方式 jdk 0 , json 1out.writeByte(0);// 4. 1 字节的指令类型out.writeByte(msg.getMessageType());// 5. 4 个字节out.writeInt(msg.getSequenceId());// 无意义,对齐填充out.writeByte(0xff);// 6. 获取内容的字节数组ByteArrayOutputStream bos = new ByteArrayOutputStream();ObjectOutputStream oos = new ObjectOutputStream(bos);oos.writeObject(msg);byte[] bytes = bos.toByteArray();// 7. 长度out.writeInt(bytes.length);// 8. 写入内容out.writeBytes(bytes);}@Overrideprotected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {int magicNum = in.readInt();byte version = in.readByte();byte serializerType = in.readByte();byte messageType = in.readByte();int sequenceId = in.readInt();in.readByte();int length = in.readInt();byte[] bytes = new byte[length];in.readBytes(bytes, 0, length);ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes));Message message = (Message) ois.readObject();log.debug("{}, {}, {}, {}, {}, {}", magicNum, version, serializerType, messageType, sequenceId, length);log.debug("{}", message);out.add(message);}
}
测试
EmbeddedChannel channel = new EmbeddedChannel(new LoggingHandler(),new LengthFieldBasedFrameDecoder(1024, 12, 4, 0, 0),new MessageCodec()
);
// encode
LoginRequestMessage message = new LoginRequestMessage("zhangsan", "123", "张三");
// channel.writeOutbound(message);
// decode
ByteBuf buf = ByteBufAllocator.DEFAULT.buffer();
new MessageCodec().encode(null, message, buf);ByteBuf s1 = buf.slice(0, 100);
ByteBuf s2 = buf.slice(100, buf.readableBytes() - 100);
s1.retain(); // 引用计数 2
channel.writeInbound(s1); // release 1
channel.writeInbound(s2);
解读

4.2 什么时候可以加 @Sharable
- 当 handler 不保存状态时,就可以安全地在多线程下被共享
- 但要注意对于编解码器类,不能继承 ByteToMessageCodec 或 CombinedChannelDuplexHandler 父类,他们的构造方法对 @Sharable 有限制
- 如果能确保编解码器不会保存状态,可以继承 MessageToMessageCodec 父类
@Slf4j
@ChannelHandler.Sharable
/*** 必须和 LengthFieldBasedFrameDecoder 一起使用,确保接到的 ByteBuf 消息是完整的*/
public class MessageCodecSharable extends MessageToMessageCodec<ByteBuf, Message> {@Overrideprotected void encode(ChannelHandlerContext ctx, Message msg, List<Object> outList) throws Exception {ByteBuf out = ctx.alloc().buffer();// 1. 4 字节的魔数out.writeBytes(new byte[]{1, 2, 3, 4});// 2. 1 字节的版本,out.writeByte(1);// 3. 1 字节的序列化方式 jdk 0 , json 1out.writeByte(0);// 4. 1 字节的指令类型out.writeByte(msg.getMessageType());// 5. 4 个字节out.writeInt(msg.getSequenceId());// 无意义,对齐填充out.writeByte(0xff);// 6. 获取内容的字节数组ByteArrayOutputStream bos = new ByteArrayOutputStream();ObjectOutputStream oos = new ObjectOutputStream(bos);oos.writeObject(msg);byte[] bytes = bos.toByteArray();// 7. 长度out.writeInt(bytes.length);// 8. 写入内容out.writeBytes(bytes);outList.add(out);}@Overrideprotected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {int magicNum = in.readInt();byte version = in.readByte();byte serializerType = in.readByte();byte messageType = in.readByte();int sequenceId = in.readInt();in.readByte();int length = in.readInt();byte[] bytes = new byte[length];in.readBytes(bytes, 0, length);ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes));Message message = (Message) ois.readObject();log.debug("{}, {}, {}, {}, {}, {}", magicNum, version, serializerType, messageType, sequenceId, length);log.debug("{}", message);out.add(message);}
}
相关文章:
由浅入深Netty协议设计与解析
目录 1 为什么需要协议?2 redis 协议举例3 http 协议举例4 自定义协议要素4.1 编解码器4.2 什么时候可以加 Sharable 1 为什么需要协议? TCP/IP 中消息传输基于流的方式,没有边界。 协议的目的就是划定消息的边界,制定通信双方要…...
iptables防火墙(1)
iptables防火墙 一、iptables概述二、netfilter/iptables 关系三、四表五链1.四表2.五链 四、规则链之间的匹配顺序五、规则链内的匹配顺序六、iptables安装与配置七、常用的控制类型八、常用的管理选项九、规则命令1.添加新规则2.查看规则列表3.设置默认策略4.删除规则5.清空规…...
第九章 Productions最佳实践 - Productions开发的最佳实践
文章目录 第九章 Productions最佳实践 - Productions开发的最佳实践Productions开发的最佳实践项目目标项目交付文档 第九章 Productions最佳实践 - Productions开发的最佳实践 Productions开发的最佳实践 本章是一个总体概述,旨在帮助团队成员为从事生产项目做好…...
RocketMQ 怎么实现的消息负载均衡以及怎么能够保证消息被顺序消费
一、RocketMQ 怎么实现的消息负载均衡 RocketMQ是一种开源的分布式消息中间件,它使用了一种称为消息负载均衡的机制来实现消息的分发和消费的负载均衡。RocketMQ的消息负载均衡主要是通过以下两个方面实现的: 消息队列分组(Message Queue G…...
【随笔记】全志 T507 PF4 引脚无法被正常设置为中断模式的问题分析
相关信息 硬件平台:全志T507 系统版本:Android 10 / Linux 4.9.170 问题描述:PF4 无法通过标准接口设置为中断模式,而 PF1、PF2、PF3、PF5 正常可用。 分析过程 一开始以为是引脚被其它驱动占用引起,或者该引脚不具…...
人手一个 Midjourney,StableStudio 重磅开源!
人手一个 Midjourney,StableStudio 重磅开源! Stability AI 公司在上个月 19 号推出了 Alpha 版本 StableLM 大语言模型,包含了 30 亿和 70 亿参数,并且支持商用。如今他们再次推出了 AI 图像生成平台 StableStudio,这…...
iptables防火墙(2)
iptables防火墙(2) 一、SNATSNAT应用环境SNAT原理SNAT转换前条件扩展 二、DNATDNAT应用环境DNAT原理DNAT转换前提条件扩展 三、防火墙规则的备份和还原导出(备份)所有表的规则导入(还原)规则 一、SNAT SNA…...
Windows和Kali上使用proxychains代理流量
Windows和Kali上使用proxychains代理流量 PS. 本文演示都是在kali进行的,如有出入还请联系我哦1. Linux(Debian)1.1. 检查一下是否有proxychains1.2 修改config文件 2. Linux(Debian)安装proxychians43. Windows3.1 下载3.2 配置 4. Windows下的配置5. 测试 PS. 写这…...
KEYSIGHT MSOS204A 2GHZ 4通道DSOS204A高清晰度示波器
KEYSIGHT是德DSOS204A/MSOS204A高清晰度示波器 附加功能: 2 GHz 带宽(可升级) 4 个模拟通道和 16 个数字通道 最大存储深度:800 Mpts(2 通道),400 Mpts(4 通道) 最大…...
最新Java适配商城系统
城前端功能展示 商城移动端 后端基于SpringBoot 研发,前端使用 Vue、uniapp开发 前后端分离,支持分布式部署,支持Docker,各个API独立,并且有独立的消费者 api不需要单独部署,只需启动一个jar包就可以正…...
【KVM虚拟化】· virsh管理命令
目录 🍁libvirt架构概述 🍁使用virsh管理虚拟机 🍂常用命令总结 🍁kvm基本功能管理 🍂帮助命令 🍂KVM的配置文件存放目录 🍂查看虚拟机状态 🍂虚拟机关机与开机 🍂强制虚…...
JS Es6中判断b数组对象是否有跟a数组对象相同的数值(例如:id),有的话就过滤掉
如下[数组]对象a和b let a[{id:1,value:this},{id:2,value:is}] let b[{id:1,value:hello},{id:3,value:world}]filter() 方法创建一个新的数组,新数组中的元素是通过检查指定数组中符合条件的所有元素。 some() 方法用于检测数组中的元素是否满足指定条件&#x…...
python获取某电商平台口红数据并制作词云
目录标题 前言开发环境:模块使用数据来源分析代码展示获取数据制作词云 尾语 💝 前言 嗨喽~大家好呀,这里是魔王呐 ❤ ~! 开发环境: Python 3.8 Pycharm 模块使用 requests jieba 结巴分词 wordcloud 词云 第三方模块安装: win R 输…...
阿里成立AIDC,用“增长”解题国际化
随着阿里巴巴集团2023财年年报的披露,AIDC也随即浮出了水面。 AIDC是阿里国际数字商业集团的英文简称,AIDC即Alibaba International Digital Commerce。阿里是在5月18日公布的截至2023年3月31日的2023财年Q4及全年财报,财报数据之外ÿ…...
全面理解:在计算机科学中同步、异步、并行、并发,他们之间到底有什么区别,如果正确更好的区分它们?
同步,异步,并行,并发的基础概念 在计算机中同步的基础概念 在计算机科学中,同步(Synchronization)是指在多个过程或线程中,它们的执行在时间上是有序的。换句话说,要执行一个特定的…...
9、Ray核心框架介绍
9、Ray核心框架介绍 导航 1.简介和背景 2.Ray的基本概念和核心组件 3.分布式任务调度和依赖管理 4.对象存储和数据共享 5.Actor模型和并发编程 6.Ray的高级功能和扩展性 7.使用Ray构建分布式应用程序的案例研究 8.Ray社区和资源 9.核心框架介绍 10.扩展1...
【华为OD机试python】工单调度策略【 2023 Q1 A卷|100分】
华为OD机试- 题目列表 2023Q1 点这里!! 2023华为OD机试-刷题指南 点这里!! ■ 题目描述 当小区通信设备上报警时,系统会自动生成待处理的工单, 工单调度系统需要根据不同的策略,调度外线工程师(FME)上站去修复工单对应的问题。 根据与运营商签订的合同,不同严重程度…...
[论文阅读72]Parameter-Efficient Transfer Learning for NLP
1. 基本信息 题目论文作者与单位来源年份Parameter-Efficient Transfer Learning for NLPNeil Houlsby等Google Research,雅盖隆大学-波兰PMLR2019 Houlsby N, Giurgiu A, Jastrzebski S, et al. Parameter-efficient transfer learning for NLP[C]//Internationa…...
0基础转行居然拿到9.5K!尘埃深处是繁花,强者从未停下!
人总是越长大越胆小,很多事情不敢做,以后就更不敢做了。 为梦想颠簸的人有很多,可能不差你这个,不如意的时候我们都会想要放弃,但是生活不是一个点,它是一条长长的线,唯有行动,才能摆…...
软考初级程序员上午单选题(13)
1、下列不能兼作输入设备和输出设备的是______。 A.可擦除型光盘 B.软盘 C.硬盘 D.键盘 2、文件型计算机病毒主要感染______。 A..TXT文件 B..GIF文件 C..EXE文件 D..MP3文件 3、_…...
中兴新支点NewStartOS初体验:从激活到日常使用,聊聊这个国产Linux桌面的真实感受
中兴新支点NewStartOS深度体验:一个技术爱好者的真实使用笔记第一次启动中兴新支点NewStartOS时,那个简洁的登录界面就给我留下了不错的印象。作为一个长期在Windows和macOS之间切换的用户,这次尝试国产Linux桌面系统,更像是一次充…...
YOLO训练前数据检查必备:一个脚本批量转换LabelImg的txt标签并可视化核对
YOLO训练前数据检查实战:批量转换与可视化核验脚本开发指南 在计算机视觉项目的实际落地过程中,数据质量往往比模型架构更能决定最终效果的上限。许多团队花费大量时间调整超参数和网络结构,却忽略了最基础的标注数据验证环节。当使用LabelIm…...
Python之encode-cli包语法、参数和实际应用案例
Python encode-cli包完整使用指南 encode-cli 是Python生态中轻量、高效的命令行编码/解码工具包,专注于提供主流编码格式的快速转换,支持命令行直接调用,无需编写复杂Python代码,适用于数据加密、文本转码、URL处理、Base64转换等…...
Elsevier-Tracker:5分钟打造您的学术论文审稿进度监控系统
Elsevier-Tracker:5分钟打造您的学术论文审稿进度监控系统 【免费下载链接】Elsevier-Tracker 项目地址: https://gitcode.com/gh_mirrors/el/Elsevier-Tracker 在科研工作者的日常中,论文审稿进度追踪常常成为消耗时间与精力的隐形负担。每天反…...
基于A2A协议将智能体注册到Nacos3.x
1.配置和简介Nacos3.x比Nacos2.x多了可以注册智能体的功能。配置密钥,32位即可启动分为集群模式和单机模式,单机模式下,默认存储在derby下。2.智能体注册中心:AgentScope也是自带注册中心的,叫AgentScopeA2aServer。现…...
别再乱用Bool和Enum了!用UE5的Gameplay Tags重构你的角色状态机(GAS避坑指南)
别再乱用Bool和Enum了!用UE5的Gameplay Tags重构你的角色状态机(GAS避坑指南)当你的ARPG角色同时陷入眩晕、灼烧和减速状态时,传统状态机往往会暴露出致命缺陷——布尔值互相覆盖、枚举组合爆炸、条件判断嵌套成灾。而UE5的Gamepl…...
SuperMap iDesktop中BIM模型缓存生成全攻略:从性能调优到Web端流畅加载的避坑指南
SuperMap iDesktop中BIM模型缓存生成全攻略:从性能调优到Web端流畅加载的避坑指南 当你在深夜加班处理一个大型商业综合体的BIM模型时,iDesktop突然闪退,进度条停留在87%——这种崩溃瞬间是否似曾相识?作为经历过数十个大型BIM项目…...
洛雪音乐音源:从零到一的音乐聚合解决方案实战指南
洛雪音乐音源:从零到一的音乐聚合解决方案实战指南 【免费下载链接】lxmusic- lxmusic(洛雪音乐)全网最新最全音源 项目地址: https://gitcode.com/gh_mirrors/lx/lxmusic- 你是否曾经为了找到一首歌而辗转于多个音乐平台?是否因为音质选择有限而…...
Lumafly:跨平台空洞骑士模组管理器,智能依赖解析与一站式管理解决方案
Lumafly:跨平台空洞骑士模组管理器,智能依赖解析与一站式管理解决方案 【免费下载链接】Lumafly A cross platform mod manager for Hollow Knight written in Avalonia. 项目地址: https://gitcode.com/gh_mirrors/lu/Lumafly Lumafly是一款基于…...
LPCM框架:芯片设计自动化的机器学习新范式
1. LPCM框架概述:芯片设计自动化的新范式在半导体行业持续面临"摩尔定律"放缓的背景下,LPCM(Large Processor Chip Model)框架代表了一种突破性的芯片设计方法论。这个框架本质上是一个融合了多模态机器学习与强化学习的…...
