Grpc
grpc 数据格式¶
grpc 基于 http2 应用层协议,底层使用 tcp 连接
HEADERS (gRPC metadata)
└─ :method = POST
:path = /package.Service/Method
content-type = application/grpc
grpc-timeout = 1s
DATA (gRPC message)
└─ [5-byte header][Protobuf payload]
TRAILERS (gRPC status)
└─ grpc-status = 0 (OK)
HEADER frame 中的数据包括 grpc 的 metadata
TRAILERS frame:发送 gRPC 的状态码(如 grpc-status
)和错误信息(如 grpc-message
),也包括服务端返回的 metadata
其中,DATA frame 包括一个 header 和 protobuf 序列化后的二进制数据:
┌─────────———┬──────────────——┐
│1 byte │ 4 bytes │
│Compressed? │ Message Length │
└─────────———┴──────────────——┘
grpc 元数据¶
相当于是以整洁的 kv 格式组织起来了一些特殊的数据,方便地在 client 和 server 之间传递
概念¶
- 在处理 rpc 请求和响应过程中需要,但是又不属于具体业务的信息,可以包含一些通用的 token、请求标识等
- 元数据对 grpc 本身是不可见的,需要在程序或中间件中处理
- go 语言下使用 google.golang.org/grpc/metadata 库来操作元数据
metadata 的创建¶
第一种方法是使用函数 New
基于map[string]string
创建元数据:
另一种方法是使用Pairs
。具有相同键的值将合并到一个列表中:
md := metadata.Pairs(
"key1", "val1",
"key1", "val1-2", // "key1"的值将会是 []string{"val1", "val1-2"}
"key2", "val2",
)
所有键会自动转换为小写
metadata 传递¶
- 客户端向服务端发送的 metadata 通过 context 来传递
- 服务端向客户端发送的 metadata 通过 header 和 trailer 来传递
客户端发送 metadata¶
通过 AppendToOutgoingContext 将 k-v 追加到 context(即使 context 中还没有元数据)
// 创建带有metadata的context
ctx := metadata.AppendToOutgoingContext(ctx, "k1", "v1", "k1", "v2", "k2", "v3")
// 添加一些 metadata 到 context (e.g. in an interceptor)
ctx := metadata.AppendToOutgoingContext(ctx, "k3", "v4")
// 发起普通RPC请求
response, err := client.SomeRPC(ctx, someRequest)
AppendToOutgoingContext() 这里实际是将 metadata 通过 context 包的 WithValue 放到了 ctx 中,随后跟随 rpc 方法的调用链传递到最底层再编码到 Header Frame 中
func AppendToOutgoingContext(ctx context.Context, kv ...string) context.Context { if len(kv)%2 == 1 { panic(fmt.Sprintf("metadata: AppendToOutgoingContext got an odd number of input pairs for metadata: %d", len(kv))) } md, _ := ctx.Value(mdOutgoingKey{}).(rawMD) added := make([][]string, len(md.added)+1) copy(added, md.added) kvCopy := make([]string, 0, len(kv)) for i := 0; i < len(kv); i += 2 { kvCopy = append(kvCopy, strings.ToLower(kv[i]), kv[i+1]) } added[len(added)-1] = kvCopy return context.WithValue(ctx, mdOutgoingKey{}, rawMD{md: md.md, added: added}) }
服务端接收客户端发来的 metadata¶
func (s *server) SomeRPC(ctx context.Context, in *pb.someRequest) (*pb.someResponse, error) {
md, ok := metadata.FromIncomingContext(ctx)
// do something with metadata
}
服务端发送 metadata¶
通过 header 和 trailer 来发送
func (s *server) SomeRPC(ctx context.Context, in *pb.someRequest) (*pb.someResponse, error) {
// 创建和发送 header
header := metadata.Pairs("header-key", "val")
grpc.SendHeader(ctx, header)
// 创建和发送 trailer
trailer := metadata.Pairs("trailer-key", "val")
grpc.SetTrailer(ctx, trailer)
}
客户端接收 metadata¶
客户端可以接收的元数据包括 header 和 trailer
trailer可以用于服务器希望在处理请求后给客户端发送任何内容,例如在流式RPC中只有等所有结果都流到客户端后才能计算出负载信息,这时候就不能使用headers(header在数据之前,trailer在数据之后)。比如需要计算前面所有数据的 md5 值
var header, trailer metadata.MD // 声明存储header和trailer的变量
r, err := client.SomeRPC(
ctx,
someRequest,
grpc.Header(&header), // 将会接收header
grpc.Trailer(&trailer), // 将会接收trailer
)
func Header(md *metadata.MD) CallOption { return HeaderCallOption{HeaderAddr: md} } // HeaderCallOption is a CallOption for collecting response header metadata. // The metadata field will be populated *after* the RPC completes. type HeaderCallOption struct { HeaderAddr *metadata.MD } func (o HeaderCallOption) before(*callInfo) error { return nil } func (o HeaderCallOption) after(_ *callInfo, attempt *csAttempt) { *o.HeaderAddr, _ = attempt.transportStream.Header() }
这里的 grpc.Header(&header) 实际上是传递了一个 CallOption 接口作为 rpc 调用的参数。具体地,grpc.Header() 将需要填充的 header 的地址作为实现了 CallOption 接口的结构体的成员值。
后续在 调用接口方法的时候会为这个地址处的内容赋值
一种设计模式:一个结构体将一个指针作为成员变量,并且绑定一个为成员变量赋值的方法;后续通过调用这个成员方法来修改对应内存地址处的值(指针的灵活性得以体现)
grpc 拦截器¶
目前探讨非流式的拦截器
- 预处理
- 调用中
- 调用后
通过基于函数类实现自己的 intercepter 方法并注册到框架中,框架会自动根据拦截器链调用 interceptor 方法
在链式拦截器中,每个拦截器都必须显式调用
invoker
(客户端)或handler
(服务端),框架的调用是通过嵌套闭包实现的(后一个 interceptor 会成为前一个的 invoker)(洋葱模型)
客户端¶
方法签名:
func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, invoker UnaryInvoker, opts ...CallOption) error
这里 grpc 提供的 UnaryInvoker 为函数类型,定义为:
type UnaryInvoker func(
ctx context.Context,
method string, // 方法名(如 "/service/method")
req, reply interface{}, // 请求和响应结构体
cc *ClientConn, // 客户端底层连接
opts ...CallOption, // options
) error
注册:
conn, err := grpc.NewClient("127.0.0.1:8972",
grpc.WithUnaryInterceptor(unaryInterceptor), // With 返回一个参数为选项表实例的函数
)
服务端¶
方法签名:
func(ctx context.Context, req interface{}, info *UnaryServerInfo, handler UnaryHandler) (resp interface{}, err error)
其中,UnaryHandler 定义为:
在请求的 processUnaryRPC() 这里 handler 已经通过闭包在内部设置好了需要调用的 service 的具体方法
注册:
gRPC-Gateway¶
一个 protoc 插件,读取 gRPC 服务定义(远程方法、参数类型),生成一个反向代理服务器,将 RESTful JSON API 转换为 grpc 协议的请求,并返回 RESTful 响应
需要引入依赖
并下载 gRPC-Gateway 插件,来支持 stub 代码的生成
需要修改 proto 文件,添加 http 相关注释
service Greeter {
rpc SayHello (HelloRequest) returns (HelloReply) {
// 为 rpc 方法添加 google.api.http 注释
option (google.api.http) = {
post: "/v1/example/echo"
body: "*"
};
}
}
protoc -I=pb --go_out=pb --go_opt=paths=source_relative --go-grpc_out=pb --go-grpc_opt=paths=source_relative --grpc-gateway_out=pb --grpc-gateway_opt=paths=source_relative hello.proto
随后需要在 server 端的 main.go 中启动一个 gateway client 代理连接 rpc 服务,并监听新的端口上的 http 请求
lis, _ := net.Listen("tcp", ":8972")
s := grpc.NewServer(
grpc.UnaryInterceptor(unaryInterceptor),
)
pb.RegisterGreeterServer(s, &service{})
go func() {
log.Fatalln(s.Serve(lis)) // 异步启动 grpc 服务端
}()
// 和客户端创建 ClientConn 的方式一致
conn, _ := grpc.NewClient(
"127.0.0.1:8972", // 连接到本地 gRPC 服务
grpc.WithTransportCredentials(insecure.NewCredentials()),
)
gwmux := runtime.NewServeMux()
// 将 http handler 的处理 forward 到 conn 的 rpc 客户端中
pb.RegisterGreeterHandler(context.Background(), gwmux, conn)
// 对外开放 8090 端口
gwServer := &http.Server{
Addr: ":8090",
Handler: gwmux,
}
- ServeMux 是 HTTP 服务器的路由器/分发器,负责将不同的 HTTP 请求路径分发到对应的处理函数,根据请求的 URL 路径决定由哪个处理器来处理
- grpc.NewClient 是 grpc 新版的创建空闲连接(只有在真正发起 rpc 请求时才会调用 dail)的方法,其返回的 ClientConn 类型的 conn 本质上(嵌入了)是一个 socket
值得注意的是,这里会把 http 请求 header中的字段解码到服务端 interceptor 的 context 中
名称解析与负载均衡¶
名称解析¶
grpc 默认采用 DNS 解析服务进程 ip 地址
底层会调用进行 DNS 解析
来获取地址
默认情况下,gRPC-Go 的
dnsResolver
会周期性地重新解析 DNS 记录,以捕获服务端 IP 的变化(如 Kubernetes Pod 重启导致 IP 变更)。使用 time.Ticker 来 30s 检查一次
负载均衡¶
grpc 中的负载均衡是以请求为单位的(一个 client 多次发送请求发送到不同的服务)
grpc 默认实现了列表第一和 RR 策略
附录¶
http 的演进:¶
- 1.0 -> 1.1
- http1.0 不是长连接
- http1.1 默认 Connection:keep-alive,增加了 pipeline 技术:第二次,第三次请求不需要等待第一次请求的返回再发出
-
1.1 -> 2.0
- http1.1 存在队头阻塞:请求 1 的响应如果没有准备好,请求 2,3 的响应即使准备好了,也需要延迟发送
-
http2.0:通过在应用层划分帧,给每个帧一个 streamID,可以实现乱序组装
这里说的是同一个 Stream 的帧可以分块发送(同一个 StreamID 下各个数据块的传输顺序是固定的),且不同 Stream 的帧可以交替传输(通过 StreamID 来分辨属于不同请求 / 响应对的数据块)
一对请求 / 响应共用同一个 StreamID,但是传输方向不同
http2 帧格式¶
每个 HTTP/2 帧的二进制格式(共 9 字节头部 + 可变长度负载):
Type 规定的帧类型下,header / data 等在同一个 streamid 下分块传输
settings frame 用于控制连接
trailer 帧是一种特殊的 header 帧,设置
END_HEADERS
、END_STREAM
字段 | 长度 | 说明 |
---|---|---|
Length | 3 字节 | 负载(Payload)的长度(不包括头部),最大 16KB(2^14 - 1)。 |
Type | 1 字节 | 帧类型(如 HEADERS 、DATA 、SETTINGS 等)。 |
Flags | 1 字节 | 帧的标志位(如 END_HEADERS 、END_STREAM ),用于控制帧的行为。 |
Stream ID | 4 字节 | 流的唯一标识符(0 表示连接控制帧 SETTINGS )。 |
Payload | 可变长度 | 实际数据(如 HTTP 头部、请求体等)。 |
grpc 使用到的 http2 特性¶
- 客户端同时发起多个 UnaryInvoke 时,可以共用一个 tcp 连接(在 net/http2 包中利用 stream ID 分辨)
- 使用 HTTP/2 将数据分解为更小的二进制帧(
HEADERS
帧和DATA
帧) 的机制 - 利用 HTTP2 的 HPACK 压缩请求/响应头的机制,将 metadata (原始为 Content-Type: application/grpc)压缩后进行传输