由浅入深Netty协议设计与解析
目录
- 1 为什么需要协议?
- 2 redis 协议举例
- 3 http 协议举例
- 4 自定义协议要素
- 4.1 编解码器
- 4.2 什么时候可以加 @Sharable
1 为什么需要协议?
TCP/IP 中消息传输基于流的方式,没有边界。
协议的目的就是划定消息的边界,制定通信双方要共同遵守的通信规则
例如:在网络上传输
下雨天留客天留我不留
是中文一句著名的无标点符号句子,在没有标点符号情况下,这句话有数种拆解方式,而意思却是完全不同,所以常被用作讲述标点符号的重要性
一种解读
下雨天留客,天留,我不留
另一种解读
下雨天,留客天,留我不?留
如何设计协议呢?其实就是给网络传输的信息加上“标点符号”。但通过分隔符来断句不是很好,因为分隔符本身如果用于传输,那么必须加以区分。因此,下面一种协议较为常用
定长字节表示内容长度 + 实际内容
例如,假设一个中文字符长度为 3,按照上述协议的规则,发送信息方式如下,就不会被接收方弄错意思了
0f下雨天留客06天留09我不留
小故事
很久很久以前,一位私塾先生到一家任教。双方签订了一纸协议:“无鸡鸭亦可无鱼肉亦可白菜豆腐不可少不得束修金”。此后,私塾先生虽然认真教课,但主人家则总是给私塾先生以白菜豆腐为菜,丝毫未见鸡鸭鱼肉的款待。私塾先生先是很不解,可是后来也就想通了:主人把鸡鸭鱼肉的钱都会换为束修金的,也罢。至此双方相安无事。
年关将至,一个学年段亦告结束。私塾先生临行时,也不见主人家为他交付束修金,遂与主家理论。然主家亦振振有词:“有协议为证——无鸡鸭亦可,无鱼肉亦可,白菜豆腐不可少,不得束修金。这白纸黑字明摆着的,你有什么要说的呢?”
私塾先生据理力争:“协议是这样的——无鸡,鸭亦可;无鱼,肉亦可;白菜豆腐不可,少不得束修金。”
双方唇枪舌战,你来我往,真个是不亦乐乎!
这里的束修金,也作“束脩”,应当是泛指教师应当得到的报酬
2 redis 协议举例
NioEventLoopGroup worker = new NioEventLoopGroup();
byte[] LINE = {13, 10};
try {Bootstrap bootstrap = new Bootstrap();bootstrap.channel(NioSocketChannel.class);bootstrap.group(worker);bootstrap.handler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) {ch.pipeline().addLast(new LoggingHandler());ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {// 会在连接 channel 建立成功后,会触发 active 事件@Overridepublic void channelActive(ChannelHandlerContext ctx) {set(ctx);get(ctx);}private void get(ChannelHandlerContext ctx) {ByteBuf buf = ctx.alloc().buffer();buf.writeBytes("*2".getBytes());buf.writeBytes(LINE);buf.writeBytes("$3".getBytes());buf.writeBytes(LINE);buf.writeBytes("get".getBytes());buf.writeBytes(LINE);buf.writeBytes("$3".getBytes());buf.writeBytes(LINE);buf.writeBytes("aaa".getBytes());buf.writeBytes(LINE);ctx.writeAndFlush(buf);}private void set(ChannelHandlerContext ctx) {ByteBuf buf = ctx.alloc().buffer();buf.writeBytes("*3".getBytes());buf.writeBytes(LINE);buf.writeBytes("$3".getBytes());buf.writeBytes(LINE);buf.writeBytes("set".getBytes());buf.writeBytes(LINE);buf.writeBytes("$3".getBytes());buf.writeBytes(LINE);buf.writeBytes("aaa".getBytes());buf.writeBytes(LINE);buf.writeBytes("$3".getBytes());buf.writeBytes(LINE);buf.writeBytes("bbb".getBytes());buf.writeBytes(LINE);ctx.writeAndFlush(buf);}@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {ByteBuf buf = (ByteBuf) msg;System.out.println(buf.toString(Charset.defaultCharset()));}});}});ChannelFuture channelFuture = bootstrap.connect("localhost", 6379).sync();channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {log.error("client error", e);
} finally {worker.shutdownGracefully();
}
3 http 协议举例
NioEventLoopGroup boss = new NioEventLoopGroup();
NioEventLoopGroup worker = new NioEventLoopGroup();
try {ServerBootstrap serverBootstrap = new ServerBootstrap();serverBootstrap.channel(NioServerSocketChannel.class);serverBootstrap.group(boss, worker);serverBootstrap.childHandler(new ChannelInitializer<SocketChannel>() {@Overrideprotected void initChannel(SocketChannel ch) throws Exception {ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));ch.pipeline().addLast(new HttpServerCodec());ch.pipeline().addLast(new SimpleChannelInboundHandler<HttpRequest>() {@Overrideprotected void channelRead0(ChannelHandlerContext ctx, HttpRequest msg) throws Exception {// 获取请求log.debug(msg.uri());// 返回响应DefaultFullHttpResponse response =new DefaultFullHttpResponse(msg.protocolVersion(), HttpResponseStatus.OK);byte[] bytes = "<h1>Hello, world!</h1>".getBytes();response.headers().setInt(CONTENT_LENGTH, bytes.length);response.content().writeBytes(bytes);// 写回响应ctx.writeAndFlush(response);}});/*ch.pipeline().addLast(new ChannelInboundHandlerAdapter() {@Overridepublic void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {log.debug("{}", msg.getClass());if (msg instanceof HttpRequest) { // 请求行,请求头} else if (msg instanceof HttpContent) { //请求体}}});*/}});ChannelFuture channelFuture = serverBootstrap.bind(8080).sync();channelFuture.channel().closeFuture().sync();
} catch (InterruptedException e) {log.error("server error", e);
} finally {boss.shutdownGracefully();worker.shutdownGracefully();
}
4 自定义协议要素
- 魔数,用来在第一时间判定是否是无效数据包
- 版本号,可以支持协议的升级
- 序列化算法,消息正文到底采用哪种序列化反序列化方式,可以由此扩展,例如:json、protobuf、hessian、jdk
- 指令类型,是登录、注册、单聊、群聊… 跟业务相关
- 请求序号,为了双工通信,提供异步能力
- 正文长度
- 消息正文
4.1 编解码器
根据上面的要素,设计一个登录请求消息和登录响应消息,并使用 Netty 完成收发
@Slf4j
public class MessageCodec extends ByteToMessageCodec<Message> {@Overrideprotected void encode(ChannelHandlerContext ctx, Message msg, ByteBuf out) throws Exception {// 1. 4 字节的魔数out.writeBytes(new byte[]{1, 2, 3, 4});// 2. 1 字节的版本,out.writeByte(1);// 3. 1 字节的序列化方式 jdk 0 , json 1out.writeByte(0);// 4. 1 字节的指令类型out.writeByte(msg.getMessageType());// 5. 4 个字节out.writeInt(msg.getSequenceId());// 无意义,对齐填充out.writeByte(0xff);// 6. 获取内容的字节数组ByteArrayOutputStream bos = new ByteArrayOutputStream();ObjectOutputStream oos = new ObjectOutputStream(bos);oos.writeObject(msg);byte[] bytes = bos.toByteArray();// 7. 长度out.writeInt(bytes.length);// 8. 写入内容out.writeBytes(bytes);}@Overrideprotected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {int magicNum = in.readInt();byte version = in.readByte();byte serializerType = in.readByte();byte messageType = in.readByte();int sequenceId = in.readInt();in.readByte();int length = in.readInt();byte[] bytes = new byte[length];in.readBytes(bytes, 0, length);ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes));Message message = (Message) ois.readObject();log.debug("{}, {}, {}, {}, {}, {}", magicNum, version, serializerType, messageType, sequenceId, length);log.debug("{}", message);out.add(message);}
}
测试
EmbeddedChannel channel = new EmbeddedChannel(new LoggingHandler(),new LengthFieldBasedFrameDecoder(1024, 12, 4, 0, 0),new MessageCodec()
);
// encode
LoginRequestMessage message = new LoginRequestMessage("zhangsan", "123", "张三");
// channel.writeOutbound(message);
// decode
ByteBuf buf = ByteBufAllocator.DEFAULT.buffer();
new MessageCodec().encode(null, message, buf);ByteBuf s1 = buf.slice(0, 100);
ByteBuf s2 = buf.slice(100, buf.readableBytes() - 100);
s1.retain(); // 引用计数 2
channel.writeInbound(s1); // release 1
channel.writeInbound(s2);
解读
4.2 什么时候可以加 @Sharable
- 当 handler 不保存状态时,就可以安全地在多线程下被共享
- 但要注意对于编解码器类,不能继承 ByteToMessageCodec 或 CombinedChannelDuplexHandler 父类,他们的构造方法对 @Sharable 有限制
- 如果能确保编解码器不会保存状态,可以继承 MessageToMessageCodec 父类
@Slf4j
@ChannelHandler.Sharable
/*** 必须和 LengthFieldBasedFrameDecoder 一起使用,确保接到的 ByteBuf 消息是完整的*/
public class MessageCodecSharable extends MessageToMessageCodec<ByteBuf, Message> {@Overrideprotected void encode(ChannelHandlerContext ctx, Message msg, List<Object> outList) throws Exception {ByteBuf out = ctx.alloc().buffer();// 1. 4 字节的魔数out.writeBytes(new byte[]{1, 2, 3, 4});// 2. 1 字节的版本,out.writeByte(1);// 3. 1 字节的序列化方式 jdk 0 , json 1out.writeByte(0);// 4. 1 字节的指令类型out.writeByte(msg.getMessageType());// 5. 4 个字节out.writeInt(msg.getSequenceId());// 无意义,对齐填充out.writeByte(0xff);// 6. 获取内容的字节数组ByteArrayOutputStream bos = new ByteArrayOutputStream();ObjectOutputStream oos = new ObjectOutputStream(bos);oos.writeObject(msg);byte[] bytes = bos.toByteArray();// 7. 长度out.writeInt(bytes.length);// 8. 写入内容out.writeBytes(bytes);outList.add(out);}@Overrideprotected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {int magicNum = in.readInt();byte version = in.readByte();byte serializerType = in.readByte();byte messageType = in.readByte();int sequenceId = in.readInt();in.readByte();int length = in.readInt();byte[] bytes = new byte[length];in.readBytes(bytes, 0, length);ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes));Message message = (Message) ois.readObject();log.debug("{}, {}, {}, {}, {}, {}", magicNum, version, serializerType, messageType, sequenceId, length);log.debug("{}", message);out.add(message);}
}
相关文章:

由浅入深Netty协议设计与解析
目录 1 为什么需要协议?2 redis 协议举例3 http 协议举例4 自定义协议要素4.1 编解码器4.2 什么时候可以加 Sharable 1 为什么需要协议? TCP/IP 中消息传输基于流的方式,没有边界。 协议的目的就是划定消息的边界,制定通信双方要…...
iptables防火墙(1)
iptables防火墙 一、iptables概述二、netfilter/iptables 关系三、四表五链1.四表2.五链 四、规则链之间的匹配顺序五、规则链内的匹配顺序六、iptables安装与配置七、常用的控制类型八、常用的管理选项九、规则命令1.添加新规则2.查看规则列表3.设置默认策略4.删除规则5.清空规…...
第九章 Productions最佳实践 - Productions开发的最佳实践
文章目录 第九章 Productions最佳实践 - Productions开发的最佳实践Productions开发的最佳实践项目目标项目交付文档 第九章 Productions最佳实践 - Productions开发的最佳实践 Productions开发的最佳实践 本章是一个总体概述,旨在帮助团队成员为从事生产项目做好…...
RocketMQ 怎么实现的消息负载均衡以及怎么能够保证消息被顺序消费
一、RocketMQ 怎么实现的消息负载均衡 RocketMQ是一种开源的分布式消息中间件,它使用了一种称为消息负载均衡的机制来实现消息的分发和消费的负载均衡。RocketMQ的消息负载均衡主要是通过以下两个方面实现的: 消息队列分组(Message Queue G…...

【随笔记】全志 T507 PF4 引脚无法被正常设置为中断模式的问题分析
相关信息 硬件平台:全志T507 系统版本:Android 10 / Linux 4.9.170 问题描述:PF4 无法通过标准接口设置为中断模式,而 PF1、PF2、PF3、PF5 正常可用。 分析过程 一开始以为是引脚被其它驱动占用引起,或者该引脚不具…...

人手一个 Midjourney,StableStudio 重磅开源!
人手一个 Midjourney,StableStudio 重磅开源! Stability AI 公司在上个月 19 号推出了 Alpha 版本 StableLM 大语言模型,包含了 30 亿和 70 亿参数,并且支持商用。如今他们再次推出了 AI 图像生成平台 StableStudio,这…...

iptables防火墙(2)
iptables防火墙(2) 一、SNATSNAT应用环境SNAT原理SNAT转换前条件扩展 二、DNATDNAT应用环境DNAT原理DNAT转换前提条件扩展 三、防火墙规则的备份和还原导出(备份)所有表的规则导入(还原)规则 一、SNAT SNA…...

Windows和Kali上使用proxychains代理流量
Windows和Kali上使用proxychains代理流量 PS. 本文演示都是在kali进行的,如有出入还请联系我哦1. Linux(Debian)1.1. 检查一下是否有proxychains1.2 修改config文件 2. Linux(Debian)安装proxychians43. Windows3.1 下载3.2 配置 4. Windows下的配置5. 测试 PS. 写这…...

KEYSIGHT MSOS204A 2GHZ 4通道DSOS204A高清晰度示波器
KEYSIGHT是德DSOS204A/MSOS204A高清晰度示波器 附加功能: 2 GHz 带宽(可升级) 4 个模拟通道和 16 个数字通道 最大存储深度:800 Mpts(2 通道),400 Mpts(4 通道) 最大…...

最新Java适配商城系统
城前端功能展示 商城移动端 后端基于SpringBoot 研发,前端使用 Vue、uniapp开发 前后端分离,支持分布式部署,支持Docker,各个API独立,并且有独立的消费者 api不需要单独部署,只需启动一个jar包就可以正…...

【KVM虚拟化】· virsh管理命令
目录 🍁libvirt架构概述 🍁使用virsh管理虚拟机 🍂常用命令总结 🍁kvm基本功能管理 🍂帮助命令 🍂KVM的配置文件存放目录 🍂查看虚拟机状态 🍂虚拟机关机与开机 🍂强制虚…...
JS Es6中判断b数组对象是否有跟a数组对象相同的数值(例如:id),有的话就过滤掉
如下[数组]对象a和b let a[{id:1,value:this},{id:2,value:is}] let b[{id:1,value:hello},{id:3,value:world}]filter() 方法创建一个新的数组,新数组中的元素是通过检查指定数组中符合条件的所有元素。 some() 方法用于检测数组中的元素是否满足指定条件&#x…...

python获取某电商平台口红数据并制作词云
目录标题 前言开发环境:模块使用数据来源分析代码展示获取数据制作词云 尾语 💝 前言 嗨喽~大家好呀,这里是魔王呐 ❤ ~! 开发环境: Python 3.8 Pycharm 模块使用 requests jieba 结巴分词 wordcloud 词云 第三方模块安装: win R 输…...

阿里成立AIDC,用“增长”解题国际化
随着阿里巴巴集团2023财年年报的披露,AIDC也随即浮出了水面。 AIDC是阿里国际数字商业集团的英文简称,AIDC即Alibaba International Digital Commerce。阿里是在5月18日公布的截至2023年3月31日的2023财年Q4及全年财报,财报数据之外ÿ…...
全面理解:在计算机科学中同步、异步、并行、并发,他们之间到底有什么区别,如果正确更好的区分它们?
同步,异步,并行,并发的基础概念 在计算机中同步的基础概念 在计算机科学中,同步(Synchronization)是指在多个过程或线程中,它们的执行在时间上是有序的。换句话说,要执行一个特定的…...
9、Ray核心框架介绍
9、Ray核心框架介绍 导航 1.简介和背景 2.Ray的基本概念和核心组件 3.分布式任务调度和依赖管理 4.对象存储和数据共享 5.Actor模型和并发编程 6.Ray的高级功能和扩展性 7.使用Ray构建分布式应用程序的案例研究 8.Ray社区和资源 9.核心框架介绍 10.扩展1...
【华为OD机试python】工单调度策略【 2023 Q1 A卷|100分】
华为OD机试- 题目列表 2023Q1 点这里!! 2023华为OD机试-刷题指南 点这里!! ■ 题目描述 当小区通信设备上报警时,系统会自动生成待处理的工单, 工单调度系统需要根据不同的策略,调度外线工程师(FME)上站去修复工单对应的问题。 根据与运营商签订的合同,不同严重程度…...

[论文阅读72]Parameter-Efficient Transfer Learning for NLP
1. 基本信息 题目论文作者与单位来源年份Parameter-Efficient Transfer Learning for NLPNeil Houlsby等Google Research,雅盖隆大学-波兰PMLR2019 Houlsby N, Giurgiu A, Jastrzebski S, et al. Parameter-efficient transfer learning for NLP[C]//Internationa…...
0基础转行居然拿到9.5K!尘埃深处是繁花,强者从未停下!
人总是越长大越胆小,很多事情不敢做,以后就更不敢做了。 为梦想颠簸的人有很多,可能不差你这个,不如意的时候我们都会想要放弃,但是生活不是一个点,它是一条长长的线,唯有行动,才能摆…...

软考初级程序员上午单选题(13)
1、下列不能兼作输入设备和输出设备的是______。 A.可擦除型光盘 B.软盘 C.硬盘 D.键盘 2、文件型计算机病毒主要感染______。 A..TXT文件 B..GIF文件 C..EXE文件 D..MP3文件 3、_…...

调用支付宝接口响应40004 SYSTEM_ERROR问题排查
在对接支付宝API的时候,遇到了一些问题,记录一下排查过程。 Body:{"datadigital_fincloud_generalsaas_face_certify_initialize_response":{"msg":"Business Failed","code":"40004","sub_msg…...

Xshell远程连接Kali(默认 | 私钥)Note版
前言:xshell远程连接,私钥连接和常规默认连接 任务一 开启ssh服务 service ssh status //查看ssh服务状态 service ssh start //开启ssh服务 update-rc.d ssh enable //开启自启动ssh服务 任务二 修改配置文件 vi /etc/ssh/ssh_config //第一…...
leetcodeSQL解题:3564. 季节性销售分析
leetcodeSQL解题:3564. 季节性销售分析 题目: 表:sales ---------------------- | Column Name | Type | ---------------------- | sale_id | int | | product_id | int | | sale_date | date | | quantity | int | | price | decimal | -…...
CRMEB 中 PHP 短信扩展开发:涵盖一号通、阿里云、腾讯云、创蓝
目前已有一号通短信、阿里云短信、腾讯云短信扩展 扩展入口文件 文件目录 crmeb\services\sms\Sms.php 默认驱动类型为:一号通 namespace crmeb\services\sms;use crmeb\basic\BaseManager; use crmeb\services\AccessTokenServeService; use crmeb\services\sms\…...

【JVM】Java虚拟机(二)——垃圾回收
目录 一、如何判断对象可以回收 (一)引用计数法 (二)可达性分析算法 二、垃圾回收算法 (一)标记清除 (二)标记整理 (三)复制 (四ÿ…...
【Android】Android 开发 ADB 常用指令
查看当前连接的设备 adb devices 连接设备 adb connect 设备IP 断开已连接的设备 adb disconnect 设备IP 安装应用 adb install 安装包的路径 卸载应用 adb uninstall 应用包名 查看已安装的应用包名 adb shell pm list packages 查看已安装的第三方应用包名 adb shell pm list…...

群晖NAS如何在虚拟机创建飞牛NAS
套件中心下载安装Virtual Machine Manager 创建虚拟机 配置虚拟机 飞牛官网下载 https://iso.liveupdate.fnnas.com/x86_64/trim/fnos-0.9.2-863.iso 群晖NAS如何在虚拟机创建飞牛NAS - 个人信息分享...

MacOS下Homebrew国内镜像加速指南(2025最新国内镜像加速)
macos brew国内镜像加速方法 brew install 加速formula.jws.json下载慢加速 🍺 最新版brew安装慢到怀疑人生?别怕,教你轻松起飞! 最近Homebrew更新至最新版,每次执行 brew 命令时都会自动从官方地址 https://formulae.…...
【安全篇】金刚不坏之身:整合 Spring Security + JWT 实现无状态认证与授权
摘要 本文是《Spring Boot 实战派》系列的第四篇。我们将直面所有 Web 应用都无法回避的核心问题:安全。文章将详细阐述认证(Authentication) 与授权(Authorization的核心概念,对比传统 Session-Cookie 与现代 JWT(JS…...

论文阅读:Matting by Generation
今天介绍一篇关于 matting 抠图的文章,抠图也算是计算机视觉里面非常经典的一个任务了。从早期的经典算法到如今的深度学习算法,已经有很多的工作和这个任务相关。这两年 diffusion 模型很火,大家又开始用 diffusion 模型做各种 CV 任务了&am…...