day1 服务端与消息编码
文章目录
- 消息的序列化与反序列化
- 通信过程
- 服务端的实现
- main 函数(一个简易的客户端)
本文代码地址:
本文是7天用Go从零实现RPC框架GeeRPC
的第一篇。
- 使用
encoding/gob
实现消息的编解码(序列化与反序列化) - 实现一个简易的服务端,仅接受消息,不处理,代码约
200
行
消息的序列化与反序列化
一个典型的 RPC
调用如下:
err = client.Call("Arith.Multiply", args, &reply)
客户端发送的请求包括服务名 Arith
,方法名 Multiply
,参数 args
三个,服务端的响应包括错误 error
,返回值 reply
2
个。我们将请求和响应中的参数和返回值抽象为 body
,剩余的信息放在 header
中,那么就可以抽象出数据结构 Header
:
day1-codec/codec/codec.go
package codecimport "io"type Header struct {ServiceMethod string // format "Service.Method"Seq uint64 // sequence number chosen by clientError string
}
ServiceMethod
是服务名和方法名,通常与Go
语言中的结构体和方法相映射。Seq
是请求的序号,也可以认为是某个请求的ID
,用来区分不同的请求。Error
是错误信息,客户端置为空,服务端如果如果发生错误,将错误信息置于Error
中。
我们将和消息编解码相关的代码都放到 codec
子目录中,在此之前,还需要在根目录下使用 go mod init geerpc
初始化项目,方便后续子 package
之间的引用。
进一步,抽象出对消息体进行编解码的接口 Codec
,抽象出接口是为了实现不同的 Codec
实例:
type Codec interface {io.CloserReadHeader(*Header) errorReadBody(interface{}) errorWrite(*Header, interface{}) error
}
紧接着,抽象出 Codec
的构造函数,客户端和服务端可以通过 Codec
的 Type
得到构造函数,从而创建 Codec
实例。这部分代码和工厂模式类似,与工厂模式不同的是,返回的是构造函数,而非实例。
type NewCodecFunc func(io.ReadWriteCloser) Codectype Type stringconst (GobType Type = "application/gob"JsonType Type = "application/json" // not implemented
)var NewCodecFuncMap map[Type]NewCodecFuncfunc init() {NewCodecFuncMap = make(map[Type]NewCodecFunc)NewCodecFuncMap[GobType] = NewGobCodec
}
我们定义了 2
种 Codec
,Gob
和 Json
,但是实际代码中只实现了 Gob
一种,事实上,2
者的实现非常接近,甚至只需要把 gob
换成 json
即可。
首先定义 GobCodec
结构体,这个结构体由四部分构成,conn
是由构建函数传入,通常是通过 TCP
或者 Unix
建立 socket
时得到的链接实例,dec
和 enc
对应 gob
的 Decoder
和 Encoder
,buf
是为了防止阻塞而创建的带缓冲的 Writer
,一般这么做能提升性能。
day1-codec/codec/gob.go
package codecimport ("bufio""encoding/gob""io""log"
)type GobCodec struct {conn io.ReadWriteCloserbuf *bufio.Writerdec *gob.Decoderenc *gob.Encoder
}var _ Codec = (*GobCodec)(nil)func NewGobCodec(conn io.ReadWriteCloser) Codec {buf := bufio.NewWriter(conn)return &GobCodec{conn: conn,buf: buf,dec: gob.NewDecoder(conn),enc: gob.NewEncoder(buf),}
}
在
Go
语言中,json.NewDecoder
和json.Unmarshal
都用于将JSON
数据解析为Go
中的数据结构,但它们有一些区别:
json.NewDecoder
是通过创建一个Decoder
对象,从一个io.Reader
(如os.Stdin
、文件、网络连接等)中读取JSON
数据并进行解码。json.Unmarshal
则是直接将JSON
数据(以字节切片[]byte
或者字符串的形式)解析并映射到指定的数据结构。使用场景上,如果数据是从一个输入流中读取,通常使用
json.NewDecoder
;如果已经有了JSON
数据的字节切片或字符串,使用json.Unmarshal
会更方便。json.NewEncoder
和json.Marshal
同理。
接着实现 ReadHeader
、ReadBody
、Write
和 Close
方法。
func (c *GobCodec) ReadHeader(h *Header) error {return c.dec.Decode(h)
}func (c *GobCodec) ReadBody(body interface{}) error {return c.dec.Decode(body)
}func (c *GobCodec) Write(h *Header, body interface{}) (err error) {defer func() {_ = c.buf.Flush()if err != nil {_ = c.Close()}}()if err := c.enc.Encode(h); err != nil {log.Println("rpc codec: gob error encoding header:", err)return err}if err := c.enc.Encode(body); err != nil {log.Println("rpc codec: gob error encoding body:", err)return err}return nil
}func (c *GobCodec) Close() error {return c.conn.Close()
}
通信过程
客户端与服务端的通信需要协商一些内容,例如 HTTP
报文,分为header
和 body
2
部分,body
的格式和长度通过 header
中的 Content-Type
和 Content-Length
指定,服务端通过解析 header
就能够知道如何从 body
中读取需要的信息。对于 RPC
协议来说,这部分协商是需要自主设计的。为了提升性能,一般在报文的最开始会规划固定的字节,来协商相关的信息。比如第1
个字节用来表示序列化方式,第2
个字节表示压缩方式,第3-6
字节表示 header
的长度,7-10
字节表示 body
的长度。
对于 GeeRPC
来说,目前需要协商的唯一一项内容是消息的编解码方式。我们将这部分信息,放到结构体 Option
中承载。目前,已经进入到服务端的实现阶段了。
day1-codec/server.go
package geerpcconst MagicNumber = 0x3bef5ctype Option struct {MagicNumber int // MagicNumber marks this's a geerpc requestCodecType codec.Type // client may choose different Codec to encode body
}var DefaultOption = &Option{MagicNumber: MagicNumber,CodecType: codec.GobType,
}
一般来说,涉及协议协商的这部分信息,需要设计固定的字节来传输的。但是为了实现上更简单,GeeRPC
客户端固定采用 JSON
编码 Option
,后续的 header
和 body
的编码方式由 Option
中的 CodeType
指定,服务端首先使用 JSON
解码 Option
,然后通过 Option
的 CodeType
解码剩余的内容。即报文将以这样的形式发送:
| Option{MagicNumber: xxx, CodecType: xxx} | Header{ServiceMethod ...} | Body interface{} |
| <------ 固定 JSON 编码 ------> | <------- 编码方式由 CodeType 决定 ------->|
在一次连接中,Option
固定在报文的最开始,Header
和 Body
可以有多个,即报文可能是这样的。
| Option | Header1 | Body1 | Header2 | Body2 | ...
服务端的实现
通信过程已经定义清楚了,那么服务端的实现就比较直接了。
day1-codec/server.go
// Server represents an RPC Server.
type Server struct{}// NewServer returns a new Server.
func NewServer() *Server {return &Server{}
}// DefaultServer is the default instance of *Server.
var DefaultServer = NewServer()// Accept accepts connections on the listener and serves requests
// for each incoming connection.
func (server *Server) Accept(lis net.Listener) {for {conn, err := lis.Accept()if err != nil {log.Println("rpc server: accept error:", err)return}go server.ServeConn(conn)}
}// Accept accepts connections on the listener and serves requests
// for each incoming connection.
func Accept(lis net.Listener) { DefaultServer.Accept(lis) }
- 首先定义了结构体
Server
,没有任何的成员字段。 - 实现了
Accept
方式,net.Listener
作为参数,for
循环等待socket
连接建立,并开启子协程处理,处理过程交给了ServerConn
方法。 DefaultServer
是一个默认的Server
实例,主要为了用户使用方便。
如果想启动服务,过程是非常简单的,传入 listener
即可,tcp
协议和 unix
协议都支持。
lis, _ := net.Listen("tcp", ":9999")
geerpc.Accept(lis)
ServeConn
的实现就和之前讨论的通信过程紧密相关了,首先使用 json.NewDecoder
反序列化得到 Option
实例,检查 MagicNumber
和 CodeType
的值是否正确。然后根据 CodeType
得到对应的消息编解码器,接下来的处理交给 serverCodec
。
// ServeConn runs the server on a single connection.
// ServeConn blocks, serving the connection until the client hangs up.
func (server *Server) ServeConn(conn io.ReadWriteCloser) {defer func() { _ = conn.Close() }()var opt Optionif err := json.NewDecoder(conn).Decode(&opt); err != nil {log.Println("rpc server: options error: ", err)return}if opt.MagicNumber != MagicNumber {log.Printf("rpc server: invalid magic number %x", opt.MagicNumber)return}f := codec.NewCodecFuncMap[opt.CodecType]if f == nil {log.Printf("rpc server: invalid codec type %s", opt.CodecType)return}server.serveCodec(f(conn))
}// invalidRequest is a placeholder for response argv when error occurs
var invalidRequest = struct{}{}func (server *Server) serveCodec(cc codec.Codec) {sending := new(sync.Mutex) // make sure to send a complete responsewg := new(sync.WaitGroup) // wait until all request are handledfor {req, err := server.readRequest(cc)if err != nil {if req == nil {break // it's not possible to recover, so close the connection}req.h.Error = err.Error()server.sendResponse(cc, req.h, invalidRequest, sending)continue}wg.Add(1)go server.handleRequest(cc, req, sending, wg)}wg.Wait()_ = cc.Close()
}
serveCodec
的过程非常简单。主要包含三个阶段
- 读取请求
readRequest
- 处理请求
handleRequest
- 回复请求
sendResponse
之前提到过,在一次连接中,允许接收多个请求,即多个 request header
和 request body
,因此这里使用了for
无限制地等待请求的到来,直到发生错误(例如连接被关闭,接收到的报文有问题等),这里需要注意的点有三个:
handleRequest
使用了协程并发执行请求。- 处理请求是并发的,但是回复请求的报文必须是逐个发送的,并发容易导致多个回复报文交织在一起,客户端无法解析。在这里使用锁(
sending
)保证。 - 尽力而为,只有在
header
解析失败时,才终止循环。
// request stores all information of a call
type request struct {h *codec.Header // header of requestargv, replyv reflect.Value // argv and replyv of request
}func (server *Server) readRequestHeader(cc codec.Codec) (*codec.Header, error) {var h codec.Headerif err := cc.ReadHeader(&h); err != nil {if err != io.EOF && err != io.ErrUnexpectedEOF {log.Println("rpc server: read header error:", err)}return nil, err}return &h, nil
}func (server *Server) readRequest(cc codec.Codec) (*request, error) {h, err := server.readRequestHeader(cc)if err != nil {return nil, err}req := &request{h: h}// TODO: now we don't know the type of request argv// day 1, just suppose it's stringreq.argv = reflect.New(reflect.TypeOf(""))if err = cc.ReadBody(req.argv.Interface()); err != nil {log.Println("rpc server: read argv err:", err)}return req, nil
}func (server *Server) sendResponse(cc codec.Codec, h *codec.Header, body interface{}, sending *sync.Mutex) {sending.Lock()defer sending.Unlock()if err := cc.Write(h, body); err != nil {log.Println("rpc server: write response error:", err)}
}func (server *Server) handleRequest(cc codec.Codec, req *request, sending *sync.Mutex, wg *sync.WaitGroup) {// TODO, should call registered rpc methods to get the right replyv// day 1, just print argv and send a hello messagedefer wg.Done()log.Println(req.h, req.argv.Elem())req.replyv = reflect.ValueOf(fmt.Sprintf("geerpc resp %d", req.h.Seq))server.sendResponse(cc, req.h, req.replyv.Interface(), sending)
}
目前还不能判断 body
的类型,因此在readRequest
和 handleRequest
中,day1
将 body
作为字符串处理。接收到请求,打印 header
,并回复 geerpc resp ${req.h.Seq}
。这一部分后续再实现。
main 函数(一个简易的客户端)
day1
的内容就到此为止了,在这里我们已经实现了一个消息的编解码器 GobCodec
,并且客户端与服务端实现了简单的协议交换(protocol exchange
),即允许客户端使用不同的编码方式。同时实现了服务端的雏形,建立连接,读取、处理并回复客户端的请求。
接下来,我们就在 main
函数中看看如何使用刚实现的 GeeRPC
吧。
day1-codec/main/main.go
package mainimport ("encoding/json""fmt""geerpc""geerpc/codec""log""net""time"
)func startServer(addr chan string) {// pick a free portl, err := net.Listen("tcp", ":0")if err != nil {log.Fatal("network error:", err)}log.Println("start rpc server on", l.Addr())addr <- l.Addr().String()geerpc.Accept(l)
}func main() {addr := make(chan string)go startServer(addr)// in fact, following code is like a simple geerpc clientconn, _ := net.Dial("tcp", <-addr)defer func() { _ = conn.Close() }()time.Sleep(time.Second)// send options_ = json.NewEncoder(conn).Encode(geerpc.DefaultOption)cc := codec.NewGobCodec(conn)// send request & receive responsefor i := 0; i < 5; i++ {h := &codec.Header{ServiceMethod: "Foo.Sum",Seq: uint64(i),}_ = cc.Write(h, fmt.Sprintf("geerpc req %d", h.Seq))_ = cc.ReadHeader(h)var reply string_ = cc.ReadBody(&reply)log.Println("reply:", reply)}
}
- 在
startServer
中使用了信道addr
,确保服务端端口监听成功,客户端再发起请求。 - 客户端首先发送
Option
进行协议交换,接下来发送消息头h := &codec.Header{}
,和消息体geerpc req ${h.Seq}
。 - 最后解析服务端的响应
reply
,并打印出来。
执行结果如下:
start rpc server on [::]:63662
&{Foo.Sum 0 } geerpc req 0
reply: geerpc resp 0
&{Foo.Sum 1 } geerpc req 1
reply: geerpc resp 1
&{Foo.Sum 2 } geerpc req 2
reply: geerpc resp 2
&{Foo.Sum 3 } geerpc req 3
reply: geerpc resp 3
&{Foo.Sum 4 } geerpc req 4
reply: geerpc resp 4
原文链接:https://geektutu.com/post/geerpc-day1.html
相关文章:
day1 服务端与消息编码
文章目录 消息的序列化与反序列化通信过程服务端的实现main 函数(一个简易的客户端) 本文代码地址: 本文是7天用Go从零实现RPC框架GeeRPC的第一篇。 使用 encoding/gob 实现消息的编解码(序列化与反序列化)实现一个简易的服务端,仅接受消息,…...

部署WMS仓储管理系统项目后的注意事项
在探讨现代WMS仓储管理系统的部署与运营时,我们不得不深入剖析其背后的多维度考量与策略,以确保这一核心系统能够无缝融入并推动企业的整体供应链优化。WMS仓储管理系统作为连接仓库内部操作与外部供应链的桥梁,其重要性不言而喻,…...

跨网段 IP 地址通信故障分析
现如今计算机网络的规模和复杂性不断增加,跨网段通信成为网络运行中的常见需求。但如果设备处于不同网段且路由设置出现偏差时就会导致通信故障,严重影响网络的正常运行和数据传输。 1.跨网段通信的基本原理 跨网段通信依赖于路由器的路由功能。路由器根…...

存储引擎MySQL和InnoDB(数据库管理与高可用)
1、存储引擎 存储引擎是核心组成部分, 是构成数据库最基础最底层的部件, 利用这个部件,你的Mysql能够对数据进行查询、创建、更新、删除等操作, 也就是说,用户所输入的一系列的mysql语句,是由存储引擎来…...

探索局域网传输新境界 | 闪电藤 v2.2.7
在这个数字化时代,文件的快速、安全传输是我们日常工作中不可或缺的一部分。今天,电脑天空向大家介绍一款革命性的局域网文件传输工具——闪电藤,它将彻底改变你的文件传输体验。 🎨 界面设计 —— 极简之美 闪电藤采用极简的设…...
Tiling Window Management
我主要说一下windows版的 下面这个链接用的人比较多 GitHub - LGUG2Z/komorebi: A tiling window manager for Windows 🍉 建议搭配 GitHub - da-rth/yasb: A highly configurable cross-platform (Windows) status bar written in Python. GitHub - amnweb/ya…...
9. kubernetes资源——pv/pvc持久卷
kubernetes资源——pv/pvc持久卷 一、volume数据卷1、hostPath2、挂载nfs实现持久化 二、pv/pvc 持久卷/持久卷声明1、pv/pvc介绍2、pv/pvc的使用流程2.1 创建pv2.2 创建pvc2.3 创建pod,使用pv做持久化 一、volume数据卷 用于pod中的数据的持久化存储 支持很多的卷…...

2024西安铁一中集训DAY27 ---- 模拟赛((bfs,dp) + 整体二分 + 线段树合并 + (扫描线 + 线段树))
文章目录 前言时间安排及成绩题解A. 倒水(bfs dp)B. 让他们连通(整体二分 按秩合并并查集 / kruskal重构树)C. 通信网络(线段树合并 二分)D. 3SUM(扫描线 线段树) 前言 T1没做出…...

STM32F401VET6 PROTEUS8 ILI9341 驱动显示及仿真
stm32cubemx新建工程代码,并生成工程 设置gpio 设置SPI 其他的参考stm32默认设置 然后编辑驱动代码 ili9341.h #ifndef ILI9341_H #define ILI9341_H#include <stdbool.h> #include <stdint.h>#include "glcdfont.h" #include "stm32…...

抖音视频素材网站有哪些?非常好用的5个抖音视频素材库分享
在打造引人入胜的抖音视频时,选择高品质的视频素材至关重要。优选的素材不仅能够显著提升视频的吸引力,还能让你的作品在众多视频中突出重围。对于抖音创作者而言,让我们探索一些备受推崇的视频素材平台,帮助你制作出既专业又引人…...

【数据结构】链式二叉树的实现和思路分析及二叉树OJ
【数据结构】链式二叉树的实现和思路分析及二叉树OJ 🔥个人主页:大白的编程日记 🔥专栏:数据结构 文章目录 【数据结构】链式二叉树的实现和思路分析及二叉树OJ前言一.链式二叉树的定义及结构二.链式二叉树的遍历2.1前序遍历2.2中…...

项目成功秘诀:工单管理系统如何加速进程
国内外主流的10款项目工单管理系统对比:PingCode、Worktile、浪潮云工单管理系统、华为企业智能工单系统、金蝶云苍穹、紫光软件管理系统、Jira、Asana、ServiceNow、Smartsheet。 在管理日益复杂的个人项目时,找到一款能够真正符合需求的管理软件&#…...

OpenGauss和GaussDB有何不同
OpenGauss和GaussDB是两个不同的数据库产品,它们都具有高性能、高可靠性和高可扩展性等优点,但是它们之间也有一些区别和相似之处。了解它们之间的关系、区别、建议、适用场景和如何学习,对于提高技能和保持行业敏感性非常重要。本文将深入探…...

星环科技携手东华软件推出一表通报送联合解决方案
随着国家金融监督管理总局“一表通”试点工作的持续推进,星环科技携手东华软件推出了基于星环科技分布式分析型数据库ArgoDB和大数据基础平台TDH的一表通报送联合解决方案,并已在多地实施落地中得到充分验证。 星环科技与东华软件作为战略合作伙伴&…...

YOLOv10环境搭建、训练自己的目标检测数据集、实际验证和测试
1 环境搭建 1.1 在官方仓库的给定的使用python3.9版本,则使用conda创建对应虚拟环境。 conda create -n yolov10 python3.9 1.2 切换到对应虚拟环境 conda activate yolov10 1.3 在指定目录下克隆yolov10官方仓库代码 git clone https://github.com/THU-MIG/yo…...

Harmony Next -- 通用标题栏:高度自定义,可设置沉浸式状态,正常状态下为:左侧返回、居中标题,左中右均可自定义视图。
hm_common_title_bar OpenHarmony三方库中心仓:https://ohpm.openharmony.cn/#/cn/detail/common_title_bar 介绍 一款通用标题栏,支持高度自定义,可设置沉浸式状态,正常状态下为:左侧返回、居中标题,左…...

甄选范文“论数据分片技术及其应用”软考高级论文,系统架构设计师论文
论文真题 数据分片就是按照一定的规则,将数据集划分成相互独立、正交的数据子集,然后将数据子集分布到不同的节点上。通过设计合理的数据分片规则,可将系统中的数据分布在不同的物理数据库中,达到提升应用系统数据处理速度的目的。 请围绕“论数据分片技术及其应用”论题…...

【elementui】记录el-table设置左、右列固定时,加大滚动条宽度至使滚动条部分被固定列遮挡的解决方法
当前elementui版本:2.8.2 现象:此处el-table__body-wrapper默认的滚动条宽度为8px,我加大到10px,如果不设置fixed一切正常,设置fixed后会被遮挡一点 el-table__fixed-right::before, .el-table__fixed::before 设置…...
Python人工智能:一、语音合成和语音识别
在Python中,语音合成(Text-To-Speech, TTS)和语音识别(Speech-To-Text, STT)是两个非常重要的功能,它们在人工智能、自动化、辅助技术以及许多其他领域都有广泛的应用。下面将分别介绍这两个领域在Python中…...

C/C++进阶 (8)哈希表(STL)
个人主页:仍有未知等待探索-CSDN博客 专题分栏:C 本文着重于模拟实现哈希表,并非是哈希表的使用。 实现的哈希表的底层用的是线性探测法,并非是哈希桶。 目录 一、标准库中的哈希表 1、unordered_map 2、unordered_set 二、模…...
OpenLayers 可视化之热力图
注:当前使用的是 ol 5.3.0 版本,天地图使用的key请到天地图官网申请,并替换为自己的key 热力图(Heatmap)又叫热点图,是一种通过特殊高亮显示事物密度分布、变化趋势的数据可视化技术。采用颜色的深浅来显示…...

Linux 文件类型,目录与路径,文件与目录管理
文件类型 后面的字符表示文件类型标志 普通文件:-(纯文本文件,二进制文件,数据格式文件) 如文本文件、图片、程序文件等。 目录文件:d(directory) 用来存放其他文件或子目录。 设备…...

【JVM】- 内存结构
引言 JVM:Java Virtual Machine 定义:Java虚拟机,Java二进制字节码的运行环境好处: 一次编写,到处运行自动内存管理,垃圾回收的功能数组下标越界检查(会抛异常,不会覆盖到其他代码…...

视频字幕质量评估的大规模细粒度基准
大家读完觉得有帮助记得关注和点赞!!! 摘要 视频字幕在文本到视频生成任务中起着至关重要的作用,因为它们的质量直接影响所生成视频的语义连贯性和视觉保真度。尽管大型视觉-语言模型(VLMs)在字幕生成方面…...
【决胜公务员考试】求职OMG——见面课测验1
2025最新版!!!6.8截至答题,大家注意呀! 博主码字不易点个关注吧,祝期末顺利~~ 1.单选题(2分) 下列说法错误的是:( B ) A.选调生属于公务员系统 B.公务员属于事业编 C.选调生有基层锻炼的要求 D…...
相机Camera日志分析之三十一:高通Camx HAL十种流程基础分析关键字汇总(后续持续更新中)
【关注我,后续持续新增专题博文,谢谢!!!】 上一篇我们讲了:有对最普通的场景进行各个日志注释讲解,但相机场景太多,日志差异也巨大。后面将展示各种场景下的日志。 通过notepad++打开场景下的日志,通过下列分类关键字搜索,即可清晰的分析不同场景的相机运行流程差异…...
汇编常见指令
汇编常见指令 一、数据传送指令 指令功能示例说明MOV数据传送MOV EAX, 10将立即数 10 送入 EAXMOV [EBX], EAX将 EAX 值存入 EBX 指向的内存LEA加载有效地址LEA EAX, [EBX4]将 EBX4 的地址存入 EAX(不访问内存)XCHG交换数据XCHG EAX, EBX交换 EAX 和 EB…...

论文笔记——相干体技术在裂缝预测中的应用研究
目录 相关地震知识补充地震数据的认识地震几何属性 相干体算法定义基本原理第一代相干体技术:基于互相关的相干体技术(Correlation)第二代相干体技术:基于相似的相干体技术(Semblance)基于多道相似的相干体…...

解读《网络安全法》最新修订,把握网络安全新趋势
《网络安全法》自2017年施行以来,在维护网络空间安全方面发挥了重要作用。但随着网络环境的日益复杂,网络攻击、数据泄露等事件频发,现行法律已难以完全适应新的风险挑战。 2025年3月28日,国家网信办会同相关部门起草了《网络安全…...

计算机基础知识解析:从应用到架构的全面拆解
目录 前言 1、 计算机的应用领域:无处不在的数字助手 2、 计算机的进化史:从算盘到量子计算 3、计算机的分类:不止 “台式机和笔记本” 4、计算机的组件:硬件与软件的协同 4.1 硬件:五大核心部件 4.2 软件&#…...