当前位置: 首页 > news >正文

由浅入深Netty协议设计与解析

目录

  • 1 为什么需要协议?
  • 2 redis 协议举例
  • 3 http 协议举例
  • 4 自定义协议要素
    • 4.1 编解码器
    • 4.2 什么时候可以加 @Sharable


1 为什么需要协议?

在这里插入图片描述

TCP/IP 中消息传输基于流的方式,没有边界。

协议的目的就是划定消息的边界,制定通信双方要共同遵守的通信规则

例如:在网络上传输

下雨天留客天留我不留

是中文一句著名的无标点符号句子,在没有标点符号情况下,这句话有数种拆解方式,而意思却是完全不同,所以常被用作讲述标点符号的重要性

一种解读

下雨天留客,天留,我不留

另一种解读

下雨天,留客天,留我不?留

如何设计协议呢?其实就是给网络传输的信息加上“标点符号”。但通过分隔符来断句不是很好,因为分隔符本身如果用于传输,那么必须加以区分。因此,下面一种协议较为常用

定长字节表示内容长度 + 实际内容

例如,假设一个中文字符长度为 3,按照上述协议的规则,发送信息方式如下,就不会被接收方弄错意思了

0f下雨天留客06天留09我不留

小故事

很久很久以前,一位私塾先生到一家任教。双方签订了一纸协议:“无鸡鸭亦可无鱼肉亦可白菜豆腐不可少不得束修金”。此后,私塾先生虽然认真教课,但主人家则总是给私塾先生以白菜豆腐为菜,丝毫未见鸡鸭鱼肉的款待。私塾先生先是很不解,可是后来也就想通了:主人把鸡鸭鱼肉的钱都会换为束修金的,也罢。至此双方相安无事。

年关将至,一个学年段亦告结束。私塾先生临行时,也不见主人家为他交付束修金,遂与主家理论。然主家亦振振有词:“有协议为证——无鸡鸭亦可,无鱼肉亦可,白菜豆腐不可少,不得束修金。这白纸黑字明摆着的,你有什么要说的呢?”

私塾先生据理力争:“协议是这样的——无鸡,鸭亦可;无鱼,肉亦可;白菜豆腐不可,少不得束修金。”

双方唇枪舌战,你来我往,真个是不亦乐乎!

这里的束修金,也作“束脩”,应当是泛指教师应当得到的报酬

2 redis 协议举例

NioEventLoopGroup worker = new NioEventLoopGroup();
byte[] LINE = {13, 10};
try {Bootstrap bootstrap = new Bootstrap();bootstrap.channel(NioSocketChannel.class);bootstrap.group(worker);bootstrap.handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) {ch.pipeline().addLast(new LoggingHandler());ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {// 会在连接 channel 建立成功后,会触发 active 事件@Overridepublic void channelActive(ChannelHandlerContext ctx) {set(ctx);get(ctx);}private void get(ChannelHandlerContext ctx) {ByteBuf buf = ctx.alloc().buffer();buf.writeBytes("*2".getBytes());buf.writeBytes(LINE);buf.writeBytes("$3".getBytes());buf.writeBytes(LINE);buf.writeBytes("get".getBytes());buf.writeBytes(LINE);buf.writeBytes("$3".getBytes());buf.writeBytes(LINE);buf.writeBytes("aaa".getBytes());buf.writeBytes(LINE);ctx.writeAndFlush(buf);}private void set(ChannelHandlerContext ctx) {ByteBuf buf = ctx.alloc().buffer();buf.writeBytes("*3".getBytes());buf.writeBytes(LINE);buf.writeBytes("$3".getBytes());buf.writeBytes(LINE);buf.writeBytes("set".getBytes());buf.writeBytes(LINE);buf.writeBytes("$3".getBytes());buf.writeBytes(LINE);buf.writeBytes("aaa".getBytes());buf.writeBytes(LINE);buf.writeBytes("$3".getBytes());buf.writeBytes(LINE);buf.writeBytes("bbb".getBytes());buf.writeBytes(LINE);ctx.writeAndFlush(buf);}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ByteBuf buf = (ByteBuf) msg;System.out.println(buf.toString(Charset.defaultCharset()));}});}});ChannelFuture channelFuture = bootstrap.connect("localhost", 6379).sync();channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {log.error("client error", e);
} finally {worker.shutdownGracefully();
}

3 http 协议举例

NioEventLoopGroup boss = new NioEventLoopGroup();
NioEventLoopGroup worker = new NioEventLoopGroup();
try {ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.channel(NioServerSocketChannel.class);serverBootstrap.group(boss, worker);serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));ch.pipeline().addLast(new HttpServerCodec());ch.pipeline().addLast(new SimpleChannelInboundHandler<HttpRequest>() {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, HttpRequest msg) throws Exception {// 获取请求log.debug(msg.uri());// 返回响应DefaultFullHttpResponse response =new DefaultFullHttpResponse(msg.protocolVersion(), HttpResponseStatus.OK);byte[] bytes = "<h1>Hello, world!</h1>".getBytes();response.headers().setInt(CONTENT_LENGTH, bytes.length);response.content().writeBytes(bytes);// 写回响应ctx.writeAndFlush(response);}});/*ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {log.debug("{}", msg.getClass());if (msg instanceof HttpRequest) { // 请求行,请求头} else if (msg instanceof HttpContent) { //请求体}}});*/}});ChannelFuture channelFuture = serverBootstrap.bind(8080).sync();channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {log.error("server error", e);
} finally {boss.shutdownGracefully();worker.shutdownGracefully();
}

4 自定义协议要素

  • 魔数,用来在第一时间判定是否是无效数据包
  • 版本号,可以支持协议的升级
  • 序列化算法,消息正文到底采用哪种序列化反序列化方式,可以由此扩展,例如:json、protobuf、hessian、jdk
  • 指令类型,是登录、注册、单聊、群聊… 跟业务相关
  • 请求序号,为了双工通信,提供异步能力
  • 正文长度
  • 消息正文

4.1 编解码器

根据上面的要素,设计一个登录请求消息和登录响应消息,并使用 Netty 完成收发

@Slf4j
public class MessageCodec extends ByteToMessageCodec<Message> {@Overrideprotected void encode(ChannelHandlerContext ctx, Message msg, ByteBuf out) throws Exception {// 1. 4 字节的魔数out.writeBytes(new byte[]{1, 2, 3, 4});// 2. 1 字节的版本,out.writeByte(1);// 3. 1 字节的序列化方式 jdk 0 , json 1out.writeByte(0);// 4. 1 字节的指令类型out.writeByte(msg.getMessageType());// 5. 4 个字节out.writeInt(msg.getSequenceId());// 无意义,对齐填充out.writeByte(0xff);// 6. 获取内容的字节数组ByteArrayOutputStream bos = new ByteArrayOutputStream();ObjectOutputStream oos = new ObjectOutputStream(bos);oos.writeObject(msg);byte[] bytes = bos.toByteArray();// 7. 长度out.writeInt(bytes.length);// 8. 写入内容out.writeBytes(bytes);}@Overrideprotected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {int magicNum = in.readInt();byte version = in.readByte();byte serializerType = in.readByte();byte messageType = in.readByte();int sequenceId = in.readInt();in.readByte();int length = in.readInt();byte[] bytes = new byte[length];in.readBytes(bytes, 0, length);ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes));Message message = (Message) ois.readObject();log.debug("{}, {}, {}, {}, {}, {}", magicNum, version, serializerType, messageType, sequenceId, length);log.debug("{}", message);out.add(message);}
}

测试

EmbeddedChannel channel = new EmbeddedChannel(new LoggingHandler(),new LengthFieldBasedFrameDecoder(1024, 12, 4, 0, 0),new MessageCodec()
);
// encode
LoginRequestMessage message = new LoginRequestMessage("zhangsan", "123", "张三");
//        channel.writeOutbound(message);
// decode
ByteBuf buf = ByteBufAllocator.DEFAULT.buffer();
new MessageCodec().encode(null, message, buf);ByteBuf s1 = buf.slice(0, 100);
ByteBuf s2 = buf.slice(100, buf.readableBytes() - 100);
s1.retain(); // 引用计数 2
channel.writeInbound(s1); // release 1
channel.writeInbound(s2);

解读

在这里插入图片描述

4.2 什么时候可以加 @Sharable

  • 当 handler 不保存状态时,就可以安全地在多线程下被共享
  • 但要注意对于编解码器类,不能继承 ByteToMessageCodec 或 CombinedChannelDuplexHandler 父类,他们的构造方法对 @Sharable 有限制
  • 如果能确保编解码器不会保存状态,可以继承 MessageToMessageCodec 父类
@Slf4j
@ChannelHandler.Sharable
/*** 必须和 LengthFieldBasedFrameDecoder 一起使用,确保接到的 ByteBuf 消息是完整的*/
public class MessageCodecSharable extends MessageToMessageCodec<ByteBuf, Message> {@Overrideprotected void encode(ChannelHandlerContext ctx, Message msg, List<Object> outList) throws Exception {ByteBuf out = ctx.alloc().buffer();// 1. 4 字节的魔数out.writeBytes(new byte[]{1, 2, 3, 4});// 2. 1 字节的版本,out.writeByte(1);// 3. 1 字节的序列化方式 jdk 0 , json 1out.writeByte(0);// 4. 1 字节的指令类型out.writeByte(msg.getMessageType());// 5. 4 个字节out.writeInt(msg.getSequenceId());// 无意义,对齐填充out.writeByte(0xff);// 6. 获取内容的字节数组ByteArrayOutputStream bos = new ByteArrayOutputStream();ObjectOutputStream oos = new ObjectOutputStream(bos);oos.writeObject(msg);byte[] bytes = bos.toByteArray();// 7. 长度out.writeInt(bytes.length);// 8. 写入内容out.writeBytes(bytes);outList.add(out);}@Overrideprotected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {int magicNum = in.readInt();byte version = in.readByte();byte serializerType = in.readByte();byte messageType = in.readByte();int sequenceId = in.readInt();in.readByte();int length = in.readInt();byte[] bytes = new byte[length];in.readBytes(bytes, 0, length);ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes));Message message = (Message) ois.readObject();log.debug("{}, {}, {}, {}, {}, {}", magicNum, version, serializerType, messageType, sequenceId, length);log.debug("{}", message);out.add(message);}
}

相关文章:

由浅入深Netty协议设计与解析

目录 1 为什么需要协议&#xff1f;2 redis 协议举例3 http 协议举例4 自定义协议要素4.1 编解码器4.2 什么时候可以加 Sharable 1 为什么需要协议&#xff1f; TCP/IP 中消息传输基于流的方式&#xff0c;没有边界。 协议的目的就是划定消息的边界&#xff0c;制定通信双方要…...

iptables防火墙(1)

iptables防火墙 一、iptables概述二、netfilter/iptables 关系三、四表五链1.四表2.五链 四、规则链之间的匹配顺序五、规则链内的匹配顺序六、iptables安装与配置七、常用的控制类型八、常用的管理选项九、规则命令1.添加新规则2.查看规则列表3.设置默认策略4.删除规则5.清空规…...

第九章 Productions最佳实践 - Productions开发的最佳实践

文章目录 第九章 Productions最佳实践 - Productions开发的最佳实践Productions开发的最佳实践项目目标项目交付文档 第九章 Productions最佳实践 - Productions开发的最佳实践 Productions开发的最佳实践 本章是一个总体概述&#xff0c;旨在帮助团队成员为从事生产项目做好…...

RocketMQ 怎么实现的消息负载均衡以及怎么能够保证消息被顺序消费

一、RocketMQ 怎么实现的消息负载均衡 RocketMQ是一种开源的分布式消息中间件&#xff0c;它使用了一种称为消息负载均衡的机制来实现消息的分发和消费的负载均衡。RocketMQ的消息负载均衡主要是通过以下两个方面实现的&#xff1a; 消息队列分组&#xff08;Message Queue G…...

【随笔记】全志 T507 PF4 引脚无法被正常设置为中断模式的问题分析

相关信息 硬件平台&#xff1a;全志T507 系统版本&#xff1a;Android 10 / Linux 4.9.170 问题描述&#xff1a;PF4 无法通过标准接口设置为中断模式&#xff0c;而 PF1、PF2、PF3、PF5 正常可用。 分析过程 一开始以为是引脚被其它驱动占用引起&#xff0c;或者该引脚不具…...

人手一个 Midjourney,StableStudio 重磅开源!

人手一个 Midjourney&#xff0c;StableStudio 重磅开源&#xff01; Stability AI 公司在上个月 19 号推出了 Alpha 版本 StableLM 大语言模型&#xff0c;包含了 30 亿和 70 亿参数&#xff0c;并且支持商用。如今他们再次推出了 AI 图像生成平台 StableStudio&#xff0c;这…...

iptables防火墙(2)

iptables防火墙&#xff08;2&#xff09; 一、SNATSNAT应用环境SNAT原理SNAT转换前条件扩展 二、DNATDNAT应用环境DNAT原理DNAT转换前提条件扩展 三、防火墙规则的备份和还原导出&#xff08;备份&#xff09;所有表的规则导入&#xff08;还原&#xff09;规则 一、SNAT SNA…...

Windows和Kali上使用proxychains代理流量

Windows和Kali上使用proxychains代理流量 PS. 本文演示都是在kali进行的&#xff0c;如有出入还请联系我哦1. Linux(Debian)1.1. 检查一下是否有proxychains1.2 修改config文件 2. Linux(Debian)安装proxychians43. Windows3.1 下载3.2 配置 4. Windows下的配置5. 测试 PS. 写这…...

KEYSIGHT MSOS204A 2GHZ 4通道DSOS204A高清晰度示波器

KEYSIGHT是德DSOS204A/MSOS204A高清晰度示波器 附加功能&#xff1a; 2 GHz 带宽&#xff08;可升级&#xff09; 4 个模拟通道和 16 个数字通道 最大存储深度&#xff1a;800 Mpts&#xff08;2 通道&#xff09;&#xff0c;400 Mpts&#xff08;4 通道&#xff09; 最大…...

最新Java适配商城系统

城前端功能展示 商城移动端 后端基于SpringBoot 研发&#xff0c;前端使用 Vue、uniapp开发 前后端分离&#xff0c;支持分布式部署&#xff0c;支持Docker&#xff0c;各个API独立&#xff0c;并且有独立的消费者 api不需要单独部署&#xff0c;只需启动一个jar包就可以正…...

【KVM虚拟化】· virsh管理命令

目录 &#x1f341;libvirt架构概述 &#x1f341;使用virsh管理虚拟机 &#x1f342;常用命令总结 &#x1f341;kvm基本功能管理 &#x1f342;帮助命令 &#x1f342;KVM的配置文件存放目录 &#x1f342;查看虚拟机状态 &#x1f342;虚拟机关机与开机 &#x1f342;强制虚…...

JS Es6中判断b数组对象是否有跟a数组对象相同的数值(例如:id),有的话就过滤掉

如下[数组]对象a和b let a[{id:1,value:this},{id:2,value:is}] let b[{id:1,value:hello},{id:3,value:world}]filter() 方法创建一个新的数组&#xff0c;新数组中的元素是通过检查指定数组中符合条件的所有元素。 some() 方法用于检测数组中的元素是否满足指定条件&#x…...

python获取某电商平台口红数据并制作词云

目录标题 前言开发环境:模块使用数据来源分析代码展示获取数据制作词云 尾语 &#x1f49d; 前言 嗨喽~大家好呀&#xff0c;这里是魔王呐 ❤ ~! 开发环境: Python 3.8 Pycharm 模块使用 requests jieba 结巴分词 wordcloud 词云 第三方模块安装&#xff1a; win R 输…...

阿里成立AIDC,用“增长”解题国际化

随着阿里巴巴集团2023财年年报的披露&#xff0c;AIDC也随即浮出了水面。 AIDC是阿里国际数字商业集团的英文简称&#xff0c;AIDC即Alibaba International Digital Commerce。阿里是在5月18日公布的截至2023年3月31日的2023财年Q4及全年财报&#xff0c;财报数据之外&#xff…...

全面理解:在计算机科学中同步、异步、并行、并发,他们之间到底有什么区别,如果正确更好的区分它们?

同步&#xff0c;异步&#xff0c;并行&#xff0c;并发的基础概念 在计算机中同步的基础概念 在计算机科学中&#xff0c;同步&#xff08;Synchronization&#xff09;是指在多个过程或线程中&#xff0c;它们的执行在时间上是有序的。换句话说&#xff0c;要执行一个特定的…...

9、Ray核心框架介绍

9、Ray核心框架介绍 导航 1.简介和背景 2.Ray的基本概念和核心组件 3.分布式任务调度和依赖管理 4.对象存储和数据共享 5.Actor模型和并发编程 6.Ray的高级功能和扩展性 7.使用Ray构建分布式应用程序的案例研究 8.Ray社区和资源 9.核心框架介绍 10.扩展1...

【华为OD机试python】工单调度策略【 2023 Q1 A卷|100分】

华为OD机试- 题目列表 2023Q1 点这里!! 2023华为OD机试-刷题指南 点这里!! ■ 题目描述 当小区通信设备上报警时,系统会自动生成待处理的工单, 工单调度系统需要根据不同的策略,调度外线工程师(FME)上站去修复工单对应的问题。 根据与运营商签订的合同,不同严重程度…...

[论文阅读72]Parameter-Efficient Transfer Learning for NLP

1. 基本信息 题目论文作者与单位来源年份Parameter-Efficient Transfer Learning for NLPNeil Houlsby等Google Research&#xff0c;雅盖隆大学-波兰PMLR2019 Houlsby N, Giurgiu A, Jastrzebski S, et al. Parameter-efficient transfer learning for NLP[C]//Internationa…...

0基础转行居然拿到9.5K!尘埃深处是繁花,强者从未停下!

人总是越长大越胆小&#xff0c;很多事情不敢做&#xff0c;以后就更不敢做了。 为梦想颠簸的人有很多&#xff0c;可能不差你这个&#xff0c;不如意的时候我们都会想要放弃&#xff0c;但是生活不是一个点&#xff0c;它是一条长长的线&#xff0c;唯有行动&#xff0c;才能摆…...

软考初级程序员上午单选题(13)

1、下列不能兼作输入设备和输出设备的是______。 A&#xff0e;可擦除型光盘 B&#xff0e;软盘 C&#xff0e;硬盘 D&#xff0e;键盘 2、文件型计算机病毒主要感染______。 A&#xff0e;.TXT文件 B&#xff0e;.GIF文件 C&#xff0e;.EXE文件 D&#xff0e;.MP3文件 3、_…...

【Python】 -- 趣味代码 - 小恐龙游戏

文章目录 文章目录 00 小恐龙游戏程序设计框架代码结构和功能游戏流程总结01 小恐龙游戏程序设计02 百度网盘地址00 小恐龙游戏程序设计框架 这段代码是一个基于 Pygame 的简易跑酷游戏的完整实现,玩家控制一个角色(龙)躲避障碍物(仙人掌和乌鸦)。以下是代码的详细介绍:…...

多模态2025:技术路线“神仙打架”,视频生成冲上云霄

文&#xff5c;魏琳华 编&#xff5c;王一粟 一场大会&#xff0c;聚集了中国多模态大模型的“半壁江山”。 智源大会2025为期两天的论坛中&#xff0c;汇集了学界、创业公司和大厂等三方的热门选手&#xff0c;关于多模态的集中讨论达到了前所未有的热度。其中&#xff0c;…...

如何将联系人从 iPhone 转移到 Android

从 iPhone 换到 Android 手机时&#xff0c;你可能需要保留重要的数据&#xff0c;例如通讯录。好在&#xff0c;将通讯录从 iPhone 转移到 Android 手机非常简单&#xff0c;你可以从本文中学习 6 种可靠的方法&#xff0c;确保随时保持连接&#xff0c;不错过任何信息。 第 1…...

解决本地部署 SmolVLM2 大语言模型运行 flash-attn 报错

出现的问题 安装 flash-attn 会一直卡在 build 那一步或者运行报错 解决办法 是因为你安装的 flash-attn 版本没有对应上&#xff0c;所以报错&#xff0c;到 https://github.com/Dao-AILab/flash-attention/releases 下载对应版本&#xff0c;cu、torch、cp 的版本一定要对…...

ABAP设计模式之---“简单设计原则(Simple Design)”

“Simple Design”&#xff08;简单设计&#xff09;是软件开发中的一个重要理念&#xff0c;倡导以最简单的方式实现软件功能&#xff0c;以确保代码清晰易懂、易维护&#xff0c;并在项目需求变化时能够快速适应。 其核心目标是避免复杂和过度设计&#xff0c;遵循“让事情保…...

视频行为标注工具BehaviLabel(源码+使用介绍+Windows.Exe版本)

前言&#xff1a; 最近在做行为检测相关的模型&#xff0c;用的是时空图卷积网络&#xff08;STGCN&#xff09;&#xff0c;但原有kinetic-400数据集数据质量较低&#xff0c;需要进行细粒度的标注&#xff0c;同时粗略搜了下已有开源工具基本都集中于图像分割这块&#xff0c…...

Aspose.PDF 限制绕过方案:Java 字节码技术实战分享(仅供学习)

Aspose.PDF 限制绕过方案&#xff1a;Java 字节码技术实战分享&#xff08;仅供学习&#xff09; 一、Aspose.PDF 简介二、说明&#xff08;⚠️仅供学习与研究使用&#xff09;三、技术流程总览四、准备工作1. 下载 Jar 包2. Maven 项目依赖配置 五、字节码修改实现代码&#…...

嵌入式学习笔记DAY33(网络编程——TCP)

一、网络架构 C/S &#xff08;client/server 客户端/服务器&#xff09;&#xff1a;由客户端和服务器端两个部分组成。客户端通常是用户使用的应用程序&#xff0c;负责提供用户界面和交互逻辑 &#xff0c;接收用户输入&#xff0c;向服务器发送请求&#xff0c;并展示服务…...

安宝特案例丨Vuzix AR智能眼镜集成专业软件,助力卢森堡医院药房转型,赢得辉瑞创新奖

在Vuzix M400 AR智能眼镜的助力下&#xff0c;卢森堡罗伯特舒曼医院&#xff08;the Robert Schuman Hospitals, HRS&#xff09;凭借在无菌制剂生产流程中引入增强现实技术&#xff08;AR&#xff09;创新项目&#xff0c;荣获了2024年6月7日由卢森堡医院药剂师协会&#xff0…...

华为OD机考-机房布局

import java.util.*;public class DemoTest5 {public static void main(String[] args) {Scanner in new Scanner(System.in);// 注意 hasNext 和 hasNextLine 的区别while (in.hasNextLine()) { // 注意 while 处理多个 caseSystem.out.println(solve(in.nextLine()));}}priv…...