gRPC-Go基础(4)metadata和超时设置
文章目录
- 0. 简介
- 1. metadata
- 1.1 metadata结构
- 1.2 metadata创建
- 1.3 客户端处理metadata
- 1.4 服务端处理metadata
- 1.5 metadata的传输
- 2. 超时设置
- 2.1 客户端输出超时信息
- 2.2 服务端端接收超时信息
- 3. 小结
0. 简介
Go在多个go routine之间传递数据使用的是Go SDK提供的context包,而context的作用范围在进程内,而gRPC使用的是跨进程的网络传输,那如何实现跨进程的元数据传输呢?
1. metadata
1.1 metadata结构
metadata的简单理解,就是http 的 Header 中的 key-value 对
gRPC使用metadata在服务之间传输全局数据,metadata形式为键值对(k-v)列表,如下:
type MD map[string][]string
- k一般为字符串,v可以是字符串,也可以是二进制数据,当v是二进制数据时,k必须以-bin结尾,二进制数据会被base64编码然后传输;
- 同时,k不能以grpc-开头,因为这是为gRPC所保留;
- k中大写字符会被转化为小写;
- 如上,一个k可以对应多个v。
1.2 metadata创建
New方法
md := metadata.New(map[string]string{"k1":"v1","k2":"v2"})
Pair方法(相同的key自动合并)
md := metadata.Pairs("k1", "v1","k1", "v1.2","k2-bin", string([]byte{1, 2}),
)
k1对应的值会被自动合并为map[string][]string{k1: {v1, v1.2}}。k2-bin对应的值会被进行base64编码。
1.3 客户端处理metadata
1.3.1 发送metadata
NewOutgoingContext
md := metadata.Pairs("k1", "v1","k1", "v1.2","k2-bin", string([]byte{1, 2}),
)
ctx := metadata.NewOutgoingContext(context.Background(), md)
将新创建的 Metadata 添加到 context 中,这样会 覆盖 掉原来已有的 metadata。其实际上就是调用了context.WithValue 方法,生成了一个子context而已,这个子context中包含了传入的metadata。
// NewOutgoingContext creates a new context with outgoing md attached. If used
// in conjunction with AppendToOutgoingContext, NewOutgoingContext will
// overwrite any previously-appended metadata.
func NewOutgoingContext(ctx context.Context, md MD) context.Context {return context.WithValue(ctx, mdOutgoingKey{}, rawMD{md: md})
}
AppendToOutgoingContext
ctx = metadata.AppendToOutgoingContext(ctx, "k1", "v1", "k1", "v1.2", "k2-bin", string([]byte{1, 2}))
AppendToOutgoingContext方法将k-v对添加到已有的context中。如果对应的context没有metadata,那么就会创建一个;如果已有metadata了,那么就将数据添加到原来的metadata(推荐使用 AppendToOutgoingContext)。
1.3.2 接收metadata
目前,gRPC的客户端支持接收的metadata包括header和trailer。
一元RPC
header和trailer可以通过Header和Trailer方法,在调用gRPC方法的CallOption时传入,在函数调用结束后取出。
var header, trailer metadata.MD
res, err := grpcClient.SimpleRoute(ctx, &simplepb.SimpleRequest{Data: "I am iguochan"}, grpc.Header(&header), grpc.Trailer(&trailer))
值得注意的是,虽然流式RPC的方法调用中也有CallOption,但是这两个方法**明确仅用于一元RPC。
// Header returns a CallOptions that retrieves the header metadata
// for a unary RPC.
func Header(md *metadata.MD) CallOption {return HeaderCallOption{HeaderAddr: md}
}// Trailer returns a CallOptions that retrieves the trailer metadata
// for a unary RPC.
func Trailer(md *metadata.MD) CallOption {return TrailerCallOption{TrailerAddr: md}
}
流式RPC
所有的流式RPC都将使用以下形式接收服务端发送的header和trailer。通过接口ClientStream的Header() (metadata.MD, error)和Trailer() metadata.MD方法取出。
stream, err := grpcClient.RouteList(context.Background())// 从流中取header
header, err := stream.Header()// 从流中取出trailer
trailer := stream.Trailer()
1.4 服务端处理metadata
1.4.1 发送metadata
和客户端接收metadata一样,服务端只能发送header和trailer这两类metadata。
一元RPC
有关这些方法的具体用法可以参考go文档。
func (r *Server) SimpleRoute(ctx context.Context, request *simplepb.SimpleRequest) (*simplepb.SimpleResponse, error) {// ...md := metadata.Pairs(/* ... */)err := grpc.SendHeader(ctx, md)// ...md := metadata.Pairs(/* ... */)err = grpc.SetTrailer(ctx, md)
}
流式RPC
func (r *Server) RouteList(server simplepb.Route_RouteListServer) error {// ...md := metadata.Pairs(/* ... */)err := server.SendHeader(md)// ...md := metadata.Pairs(/* ... */)server.SetTrailer(md)// ...
}
1.4.2 接收metadata
要读取客户端发送的元数据,服务器需要从 RPC 上下文中检索它。如果是一元调用,则可以使用 RPC 处理程序的上下文。对于流式调用,服务器需要从流中获取上下文。
一元RPC
func (r *Server) SimpleRoute(ctx context.Context, request *simplepb.SimpleRequest) (*simplepb.SimpleResponse, error) {md, ok := metadata.FromIncomingContext(ctx)if ok {// do something with metadata}// ...
}
流式RPC
func (r *Server) ListValue(request *simplepb.SimpleRequest, server simplepb.Route_ListValueServer) error {md, ok := metadata.FromIncomingContext(server.Context())if ok {// do something with metadata}// ...
}
1.5 metadata的传输
以上介绍了metadata的用法,本节将介绍一下metadata在gRPC各端之间的传输机制。上面说过,在Go中,metadata就是实际上就是调用了context.WithValue 方法,生成了一个子context,那其实不管是在客户端还是服务端,都属于context的上下文传递。
1.5.1 客户端输出metadata
根据前面的文章,我们知道,用户定义好protobuf后通过protoc生成服务端的interface和客户端的桩代码(stub),在桩代码中,已经包含了客户端的实现:
func (c *routeClient) SimpleRoute(ctx context.Context, in *SimpleRequest, opts ...grpc.CallOption) (*SimpleResponse, error) {out := new(SimpleResponse)err := c.cc.Invoke(ctx, "/simplepb.Route/SimpleRoute", in, out, opts...)if err != nil {return nil, err}return out, nil
}
当发起gRPC请求后,会调用Invoke方法,最终会调用invoke方法:
func invoke(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, opts ...CallOption) error {cs, err := newClientStream(ctx, unaryStreamDesc, cc, method, opts...)if err != nil {return err}if err := cs.SendMsg(req); err != nil {return err}return cs.RecvMsg(reply)
}
invoke方法会调用newClientStream,newClientStream方法比较长,在里面会生成一个函数newStream,并调用newClientStreamWithParams:
func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (_ ClientStream, err error) {// ...var newStream = func(ctx context.Context, done func()) (iresolver.ClientStream, error) {return newClientStreamWithParams(ctx, desc, cc, method, mc, onCommit, done, opts...)}// ...
}
newClientStreamWithParams最红会选择一个transport去传输:
func newClientStreamWithParams(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, mc serviceconfig.MethodConfig, onCommit, doneFunc func(), opts ...CallOption) (_ iresolver.ClientStream, err error) {// ...// Pick the transport to use and create a new stream on the transport.// Assign cs.attempt upon success.op := func(a *csAttempt) error {if err := a.getTransport(); err != nil {return err}if err := a.newStream(); err != nil {return err}// Because this operation is always called either here (while creating// the clientStream) or by the retry code while locked when replaying// the operation, it is safe to access cs.attempt directly.cs.attempt = areturn nil}// ...
}
而gRPC选择的是HTTP2作为传输协议,所以最终a.newStream()会最终调用到HTTP2的NewStream:
// NewStream creates a stream and registers it into the transport as "active"
// streams. All non-nil errors returned will be *NewStreamError.
func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (*Stream, error) {ctx = peer.NewContext(ctx, t.getPeer())headerFields, err := t.createHeaderFields(ctx, callHdr)if err != nil {return nil, &NewStreamError{Err: err, AllowTransparentRetry: false}}
}
createHeaderFields,顾名思义,就是构建HTTP请求时的头(HEADERS Frame),其有关metadata从context到HTTP Header的转换实现如下:
func (t *http2Client) createHeaderFields(ctx context.Context, callHdr *CallHdr) ([]hpack.HeaderField, error) {// ...if md, added, ok := metadata.FromOutgoingContextRaw(ctx); ok {var k stringfor k, vv := range md {// HTTP doesn't allow you to set pseudoheaders after non pseudoheaders were set.if isReservedHeader(k) {continue}for _, v := range vv {headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})}}for _, vv := range added {for i, v := range vv {if i%2 == 0 {k = strings.ToLower(v)continue}// HTTP doesn't allow you to set pseudoheaders after non pseudoheaders were set.if isReservedHeader(k) {continue}headerFields = append(headerFields, hpack.HeaderField{Name: k, Value: encodeMetadataHeader(k, v)})}}}// ...
}
可以看到,所谓metadata,其实最终是通过HTTP2协议的头帧带入到网络的,主要存储在Binary-Header和ASCII-Header,感兴趣的同学可以参考gRPC over HTTP2。
1.5.2 服务端接收metadata
服务端通过Serve方法来启动,监听来自客户端的连接:
err = grpcServer.Serve(listener)
if err != nil {log.Fatalf("grpcServer.Serve err: %v", err)
}
Serve方法最终会调用到handleRawConn,handleRawConn会通过HTTP2去接收消息。
func (s *Server) handleRawConn(lisAddr string, rawConn net.Conn) {// ...st := s.newHTTP2Transport(rawConn)// ...go func() {s.serveStreams(st)s.removeConn(lisAddr, st)}()
}
接下来的调用流程涉及的方法是:serveStreams——>HandleStreams——>operateHeaders,最终在operateHeaders,会将所有的头文件中的传入的Header(除去默认的一些字段等)读入到mdata中,这就是服务端拿到的metadata。
func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(*Stream), traceCtx func(context.Context, string) context.Context) (fatal bool) {// ...for _, hf := range frame.Fields {switch hf.Name {// ...default:if isReservedHeader(hf.Name) && !isWhitelistedHeader(hf.Name) {break}v, err := decodeMetadataHeader(hf.Name, hf.Value)if err != nil {headerError = truelogger.Warningf("Failed to decode metadata header (%q, %q): %v", hf.Name, hf.Value, err)break}mdata[hf.Name] = append(mdata[hf.Name], v)}}// ...
}
2. 超时设置
除了一些元数据的传输,在gRPC中,我们还支持将context中的超时传递到接下来的每个服务中,甚至可以跨语言,而这是怎么做到的呢?首先我们想到是通过metadata传输,实际上并不是的。
2.1 客户端输出超时信息
其实从前面1.5.1 客户端输出metadata的最后一步,createHeaderFields方法中,就有如下处理,超时的数值会被作为key是grpc-timeout的值放在HTTP2的头中(正如备注中所言,其实在网络上的传输时间是没有被计入的,会有一定误差)。
func (t *http2Client) createHeaderFields(ctx context.Context, callHdr *CallHdr) ([]hpack.HeaderField, error) {// ...if dl, ok := ctx.Deadline(); ok {// Send out timeout regardless its value. The server can detect timeout context by itself.// TODO(mmukhi): Perhaps this field should be updated when actually writing out to the wire.timeout := time.Until(dl)headerFields = append(headerFields, hpack.HeaderField{Name: "grpc-timeout", Value: grpcutil.EncodeDuration(timeout)})}// ...
}
2.2 服务端端接收超时信息
同理,在HTTP2的server端的头处理函数operateHeaders中,也会对grpc-timeout对应的值进行处理,将其值读入到timeout中,并置上timeoutSet标志,用于后续处理。
func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(*Stream), traceCtx func(context.Context, string) context.Context) (fatal bool) {// ...var (// ...timeoutSet booltimeout time.Duration)// ...for _, hf := range frame.Fields {switch hf.Name {// ...case "grpc-timeout":timeoutSet = truevar err errorif timeout, err = decodeTimeout(hf.Value); err != nil {headerError = true}}}// ...if timeoutSet {s.ctx, s.cancel = context.WithTimeout(t.ctx, timeout)} else {s.ctx, s.cancel = context.WithCancel(t.ctx)}// ...
}
3. 小结
根据以上分析,我们可以发现,不管是metadata还是超时,都是通过HTTP2的头帧传入到对侧的。
相关文章:
gRPC-Go基础(4)metadata和超时设置
文章目录 0. 简介1. metadata1.1 metadata结构1.2 metadata创建1.3 客户端处理metadata1.4 服务端处理metadata1.5 metadata的传输 2. 超时设置2.1 客户端输出超时信息2.2 服务端端接收超时信息 3. 小结 0. 简介 Go在多个go routine之间传递数据使用的是Go SDK提供的context包…...
语言模型:从n-gram到神经网络的演进
目录 1 前言2 语言模型的两个任务2.1 自然语言理解2.2 自然语言生成 3 n-gram模型4 神经网络语言模型5 结语 1 前言 语言模型是自然语言处理领域中的关键技术之一,它致力于理解和生成人类语言。从最初的n-gram模型到如今基于神经网络的深度学习模型,语言…...
docker compose 部署 grafana + loki + vector 监控kafka消息
Centos7 随笔记录记录 docker compose 统一管理 granfana loki vector 监控kafka 信息。 当然如果仅仅是想通过 Grafana 监控kafka,推荐使用 Grafana Prometheus 通过JMX监控kafka 目录 1. 目录结构 2. 前提已安装Docker-Compose 3. docker-compose 自定义服…...
kubeadm创建k8s集群
kubeadm来快速的搭建一个k8s集群: 二进制搭建适合大集群,50台以上。 kubeadm更适合中下企业的业务集群。 部署框架 master192.168.10.10dockerkubelet kubeadm kubectl flannelnode1192.168.10.20dockerkubelet kubeadm kubectl flannelnode2192.168.1…...
鸿蒙开发之android对比开发《基础知识》
基于华为鸿蒙未来可能不再兼容android应用,推出鸿蒙开发系列文档,帮助android开发人员快速上手鸿蒙应用开发。 1. 鸿蒙使用什么基础语言开发? ArkTS是鸿蒙生态的应用开发语言。它在保持TypeScript(简称TS)基本语法风…...
2702 高级打字机
因为Undo操作只能撤销Type操作,所以Undo x 实际上就是删除文章末尾x个字母。用一个栈即可解决(每个字母最多进出一次)。 这种情况下只需要设计一个合理的数据结构依次执行操作即可。 版本树:Undo x撤销最近的x次修改操作…...
yolov5旋转目标检测-遥感图像检测-无人机旋转目标检测-附代码和原理
综述 为了解决旋转目标检测问题,研究者们提出了多种方法和算法。以下是一些常见的旋转目标检测方法: 基于滑动窗口的方法:在图像上以不同的尺度和角度滑动窗口,通过分类器判断窗口中是否存在目标。这种方法简单直观,…...
Qt学习:Qt的意义安装Qt
Qt 的简介 QT 是一个跨平台的 C图形用户界面应用程序框架。它为程序开发者提供图形界面所需的所有功能。它是完全面向对象的,很容易扩展,并且允许真正地组件编程。 支持平台 xP 、 Vista、Win7、win8、win2008、win10Windows . Unix/Linux: Ubuntu 等…...
Anylogic Pro 8.8.x for Mac / for Linux Crack
Digital twins – a step towards a digital enterprise AnyLogic是唯一一个支持创建模拟模型的方法的模拟建模工具:面向过程(离散事件)、系统动态和代理,以及它们的任何组合。AnyLogic提供的建模语言的独特性、灵活性和强大性使…...
ROS无人机初始化GPS定位漂移误差,确保无人机稳定飞行
引言: 由于GPS在室外漂移的误差比较大,在长时间静止后启动,程序发布的位置可能已经和预期的位置相差较大,导致无法完成任务,尤其是气压计的数据不准,可能会导致无人机不能起飞或者一飞冲天。本文主要是在进…...
k8s网络类型
k8s中的通信模式: pod内部之间容器与容器之间的通信。 在同一个pod中的容器共享资源和网络,使用同一个网络命名空间。可以直接通信的。 同一个node节点之内,不同pod之间的通信。 每一个pod都有一个全局的真实的IP地址,同一个n…...
Seata 中封装了四种分布式事务模式,分别是: AT 模式, TCC 模式, Saga 模式, XA 模式,
文章目录 seata概述Seata 中封装了四种分布式事务模式,分别是:AT 模式,TCC 模式,Saga 模式,XA 模式, 今天我们来聊聊seata seata 概述 在微服务架构下,由于数据库和应用服务的拆分,…...
为什么设计制造行业需要数据加密?
设计制造行业是一个涉及多种技术、工艺、材料和产品的广泛领域,它对经济和社会的发展有着重要的影响。然而,随着数字化、智能化和网络化的发展,设计制造行业也面临着越来越多的数据安全风险,如数据泄露、数据篡改、数据窃取等。这…...
查看ios app运行日志
摘要 本文介绍了一款名为克魔助手的iOS应用日志查看工具,该工具可以方便地查看iPhone设备上应用和系统运行时的实时日志和奔溃日志。同时还提供了奔溃日志分析查看模块,可以对苹果奔溃日志进行符号化、格式化和分析,极大地简化了开发者的调试…...
怎么卸载macOS上的爱思助手如何卸载macOS上的logitech g hub,如何卸载顽固macOS应用
1.在App Store里下载Cleaner One Pro (注意,不需要订阅付费!!!白嫖基础功能就完全够了!!!) 2.运行软件,在左侧目录中选择“应用程序管理”,然后点…...
侦探IP“去推理化”:《名侦探柯南》剧场版走过26年
2023年贺岁档,柯南剧场版的第26部《黑铁的鱼影》如期上映。 这部在日本狂卷票房128亿日元的作品,被誉为有史以来柯南剧场版在商业成绩上最好的一部。 但该作在4月份日本还未上映前,就于国内陷入了巨大的争议。 试映内容里,灰原…...
图论 经典例题
1 拓扑排序 对有向图的节点排序,使得对于每一条有向边 U-->V U都出现在V之前 *有环无法拓扑排序 indegree[], nxs[];//前者表示节点 i 的入度,后者表示节点 i 指向的节点 queue [] for i in range(n):if indege[i] 0: queue.add(i)// 入度为0的节…...
Oracle数据updater如何回滚
1.查询update语句执行的时间节点 ; select t.FIRST_LOAD_TIME, t.SQL_TEXT from v$sqlarea t where to_char(t.FIRST_LOAD_TIME) > 2023-03-19/17:00:00 order by t.FIRST_LOAD_TIME desc;开启表的行迁移 alter table test enable row movement;3.回滚表数据到…...
redis开启密码验证
开启密码验证 (1)配置文件中设置 redis.conf文件里面配置requirepass参数,redis认证密码:foobared,然后重启redis服务 ./redis-cli 127.0.0.1:6379> 127.0.0.1:6379> 127.0.0.1:6379> CONFIG SET requi…...
一种删除 KubeSphere 中一直卡在 Terminating 的 Namespace--KubeSphere Logging System的简单方法
文章目录 一、问题提出二、删除方法1,获取kubesphere-logging-syste的详细信息json文件2,编辑kubesphere-logging-system.json3,执行清理命令 三、检查结果 一、问题提出 在使用 KubeSphere 的时候发现有一个日志服务KubeSphere Logging Sys…...
超短脉冲激光自聚焦效应
前言与目录 强激光引起自聚焦效应机理 超短脉冲激光在脆性材料内部加工时引起的自聚焦效应,这是一种非线性光学现象,主要涉及光学克尔效应和材料的非线性光学特性。 自聚焦效应可以产生局部的强光场,对材料产生非线性响应,可能…...
AtCoder 第409场初级竞赛 A~E题解
A Conflict 【题目链接】 原题链接:A - Conflict 【考点】 枚举 【题目大意】 找到是否有两人都想要的物品。 【解析】 遍历两端字符串,只有在同时为 o 时输出 Yes 并结束程序,否则输出 No。 【难度】 GESP三级 【代码参考】 #i…...
剑指offer20_链表中环的入口节点
链表中环的入口节点 给定一个链表,若其中包含环,则输出环的入口节点。 若其中不包含环,则输出null。 数据范围 节点 val 值取值范围 [ 1 , 1000 ] [1,1000] [1,1000]。 节点 val 值各不相同。 链表长度 [ 0 , 500 ] [0,500] [0,500]。 …...
【SQL学习笔记1】增删改查+多表连接全解析(内附SQL免费在线练习工具)
可以使用Sqliteviz这个网站免费编写sql语句,它能够让用户直接在浏览器内练习SQL的语法,不需要安装任何软件。 链接如下: sqliteviz 注意: 在转写SQL语法时,关键字之间有一个特定的顺序,这个顺序会影响到…...
Module Federation 和 Native Federation 的比较
前言 Module Federation 是 Webpack 5 引入的微前端架构方案,允许不同独立构建的应用在运行时动态共享模块。 Native Federation 是 Angular 官方基于 Module Federation 理念实现的专为 Angular 优化的微前端方案。 概念解析 Module Federation (模块联邦) Modul…...
Maven 概述、安装、配置、仓库、私服详解
目录 1、Maven 概述 1.1 Maven 的定义 1.2 Maven 解决的问题 1.3 Maven 的核心特性与优势 2、Maven 安装 2.1 下载 Maven 2.2 安装配置 Maven 2.3 测试安装 2.4 修改 Maven 本地仓库的默认路径 3、Maven 配置 3.1 配置本地仓库 3.2 配置 JDK 3.3 IDEA 配置本地 Ma…...
Linux --进程控制
本文从以下五个方面来初步认识进程控制: 目录 进程创建 进程终止 进程等待 进程替换 模拟实现一个微型shell 进程创建 在Linux系统中我们可以在一个进程使用系统调用fork()来创建子进程,创建出来的进程就是子进程,原来的进程为父进程。…...
c# 局部函数 定义、功能与示例
C# 局部函数:定义、功能与示例 1. 定义与功能 局部函数(Local Function)是嵌套在另一个方法内部的私有方法,仅在包含它的方法内可见。 • 作用:封装仅用于当前方法的逻辑,避免污染类作用域,提升…...
深度剖析 DeepSeek 开源模型部署与应用:策略、权衡与未来走向
在人工智能技术呈指数级发展的当下,大模型已然成为推动各行业变革的核心驱动力。DeepSeek 开源模型以其卓越的性能和灵活的开源特性,吸引了众多企业与开发者的目光。如何高效且合理地部署与运用 DeepSeek 模型,成为释放其巨大潜力的关键所在&…...
水泥厂自动化升级利器:Devicenet转Modbus rtu协议转换网关
在水泥厂的生产流程中,工业自动化网关起着至关重要的作用,尤其是JH-DVN-RTU疆鸿智能Devicenet转Modbus rtu协议转换网关,为水泥厂实现高效生产与精准控制提供了有力支持。 水泥厂设备众多,其中不少设备采用Devicenet协议。Devicen…...
