跳转至

Grpc

grpc 数据格式

grpc 基于 http2 应用层协议,底层使用 tcp 连接

HEADERS (gRPC metadata)
  └─ :method = POST
     :path = /package.Service/Method
     content-type = application/grpc
     grpc-timeout = 1s
DATA (gRPC message)
  └─ [5-byte header][Protobuf payload]
TRAILERS (gRPC status)
  └─ grpc-status = 0 (OK)

HEADER frame 中的数据包括 grpc 的 metadata

TRAILERS frame:发送 gRPC 的状态码(如 grpc-status)和错误信息(如 grpc-message),也包括服务端返回的 metadata

其中,DATA frame 包括一个 header 和 protobuf 序列化后的二进制数据:

┌─────────———┬──────────────——┐
│1 byte      │ 4 bytes        │
│Compressed? │ Message Length │
└─────────———┴──────────────——┘

grpc 元数据

相当于是以整洁的 kv 格式组织起来了一些特殊的数据,方便地在 client 和 server 之间传递

概念

  • 在处理 rpc 请求和响应过程中需要,但是又不属于具体业务的信息,可以包含一些通用的 token、请求标识等
  • 元数据对 grpc 本身是不可见的,需要在程序或中间件中处理
  • go 语言下使用 google.golang.org/grpc/metadata 库来操作元数据
// metadata 的类型定义
type MD map[string][]string

metadata 的创建

第一种方法是使用函数 New 基于map[string]string 创建元数据:

md := metadata.New(map[string]string{"key1": "val1", "key2": "val2"})

另一种方法是使用Pairs。具有相同键的值将合并到一个列表中:

md := metadata.Pairs(
    "key1", "val1",
    "key1", "val1-2", // "key1"的值将会是 []string{"val1", "val1-2"}
    "key2", "val2",
)

所有键会自动转换为小写

metadata 传递

  • 客户端向服务端发送的 metadata 通过 context 来传递
  • 服务端向客户端发送的 metadata 通过 header 和 trailer 来传递
客户端发送 metadata

通过 AppendToOutgoingContext 将 k-v 追加到 context(即使 context 中还没有元数据)

// 创建带有metadata的context
ctx := metadata.AppendToOutgoingContext(ctx, "k1", "v1", "k1", "v2", "k2", "v3")

// 添加一些 metadata 到 context (e.g. in an interceptor)
ctx := metadata.AppendToOutgoingContext(ctx, "k3", "v4")

// 发起普通RPC请求
response, err := client.SomeRPC(ctx, someRequest)

AppendToOutgoingContext() 这里实际是将 metadata 通过 context 包的 WithValue 放到了 ctx 中,随后跟随 rpc 方法的调用链传递到最底层再编码到 Header Frame 中

func AppendToOutgoingContext(ctx context.Context, kv ...string) context.Context {
  if len(kv)%2 == 1 {
      panic(fmt.Sprintf("metadata: AppendToOutgoingContext got an odd number of input pairs for metadata: %d", len(kv)))
  }
  md, _ := ctx.Value(mdOutgoingKey{}).(rawMD)
  added := make([][]string, len(md.added)+1)
  copy(added, md.added)
  kvCopy := make([]string, 0, len(kv))
  for i := 0; i < len(kv); i += 2 {
      kvCopy = append(kvCopy, strings.ToLower(kv[i]), kv[i+1])
  }
  added[len(added)-1] = kvCopy
  return context.WithValue(ctx, mdOutgoingKey{}, rawMD{md: md.md, added: added})
}

服务端接收客户端发来的 metadata
func (s *server) SomeRPC(ctx context.Context, in *pb.someRequest) (*pb.someResponse, error) {
    md, ok := metadata.FromIncomingContext(ctx)
    // do something with metadata
}

服务端发送 metadata

通过 header 和 trailer 来发送

func (s *server) SomeRPC(ctx context.Context, in *pb.someRequest) (*pb.someResponse, error) {
    // 创建和发送 header
    header := metadata.Pairs("header-key", "val")
    grpc.SendHeader(ctx, header)
    // 创建和发送 trailer
    trailer := metadata.Pairs("trailer-key", "val")
    grpc.SetTrailer(ctx, trailer)
}

客户端接收 metadata

客户端可以接收的元数据包括 header 和 trailer

trailer可以用于服务器希望在处理请求后给客户端发送任何内容,例如在流式RPC中只有等所有结果都流到客户端后才能计算出负载信息,这时候就不能使用headers(header在数据之前,trailer在数据之后)。比如需要计算前面所有数据的 md5 值

http trailer

var header, trailer metadata.MD // 声明存储header和trailer的变量
r, err := client.SomeRPC(
    ctx,
    someRequest,
    grpc.Header(&header),    // 将会接收header
    grpc.Trailer(&trailer),  // 将会接收trailer
)
func Header(md *metadata.MD) CallOption {
    return HeaderCallOption{HeaderAddr: md}
}

// HeaderCallOption is a CallOption for collecting response header metadata.
// The metadata field will be populated *after* the RPC completes.
type HeaderCallOption struct {
    HeaderAddr *metadata.MD
}

func (o HeaderCallOption) before(*callInfo) error { return nil }
func (o HeaderCallOption) after(_ *callInfo, attempt *csAttempt) {
    *o.HeaderAddr, _ = attempt.transportStream.Header()
   }

这里的 grpc.Header(&header) 实际上是传递了一个 CallOption 接口作为 rpc 调用的参数。具体地,grpc.Header() 将需要填充的 header 的地址作为实现了 CallOption 接口的结构体的成员值。

后续在 调用接口方法的时候会为这个地址处的内容赋值

一种设计模式:一个结构体将一个指针作为成员变量,并且绑定一个为成员变量赋值的方法;后续通过调用这个成员方法来修改对应内存地址处的值(指针的灵活性得以体现)

grpc 拦截器

目前探讨非流式的拦截器

  • 预处理
  • 调用中
  • 调用后

通过基于函数类实现自己的 intercepter 方法并注册到框架中,框架会自动根据拦截器链调用 interceptor 方法

在链式拦截器中,每个拦截器都必须显式调用 invoker(客户端)或 handler(服务端),框架的调用是通过嵌套闭包实现的(后一个 interceptor 会成为前一个的 invoker)(洋葱模型)

客户端

方法签名:

func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, invoker UnaryInvoker, opts ...CallOption) error

这里 grpc 提供的 UnaryInvoker 为函数类型,定义为:

type UnaryInvoker func(
    ctx context.Context,     
    method string,            // 方法名(如 "/service/method")
    req, reply interface{},   // 请求和响应结构体
    cc *ClientConn,          // 客户端底层连接
    opts ...CallOption,      // options
) error

注册:

conn, err := grpc.NewClient("127.0.0.1:8972",
    grpc.WithUnaryInterceptor(unaryInterceptor), // With 返回一个参数为选项表实例的函数
)

服务端

方法签名:

func(ctx context.Context, req interface{}, info *UnaryServerInfo, handler UnaryHandler) (resp interface{}, err error)

其中,UnaryHandler 定义为:

type UnaryHandler func(ctx context.Context, req interface{}) (interface{}, error)

在请求的 processUnaryRPC() 这里 handler 已经通过闭包在内部设置好了需要调用的 service 的具体方法

handler := func(ctx, req interface{}) (interface{}, error) {
    return srv.(GreeterServer).SayHello(ctx, req.(*HelloReq))
}

注册:

s := grpc.NewServer(
    grpc.UnaryInterceptor(unaryInterceptor),
)

gRPC-Gateway

一个 protoc 插件,读取 gRPC 服务定义(远程方法、参数类型),生成一个反向代理服务器,将 RESTful JSON API 转换为 grpc 协议的请求,并返回 RESTful 响应

image-20250614210924504

需要引入依赖

google
└── api
    ├── annotations.proto
    └── http.proto

并下载 gRPC-Gateway 插件,来支持 stub 代码的生成

go install github.com/grpc-ecosystem/grpc-gateway/v2/protoc-gen-grpc-gateway@v2

需要修改 proto 文件,添加 http 相关注释

service Greeter {
  rpc SayHello (HelloRequest) returns (HelloReply) {
    // 为 rpc 方法添加 google.api.http 注释
    option (google.api.http) = {
      post: "/v1/example/echo"
      body: "*"
    };
  }
}
protoc -I=pb --go_out=pb --go_opt=paths=source_relative --go-grpc_out=pb --go-grpc_opt=paths=source_relative --grpc-gateway_out=pb --grpc-gateway_opt=paths=source_relative hello.proto

随后需要在 server 端的 main.go 中启动一个 gateway client 代理连接 rpc 服务,并监听新的端口上的 http 请求

lis, _ := net.Listen("tcp", ":8972")
s := grpc.NewServer(
    grpc.UnaryInterceptor(unaryInterceptor),
)
pb.RegisterGreeterServer(s, &service{})

go func() {
    log.Fatalln(s.Serve(lis)) // 异步启动 grpc 服务端
}()

// 和客户端创建 ClientConn 的方式一致
conn, _ := grpc.NewClient(
    "127.0.0.1:8972", // 连接到本地 gRPC 服务
    grpc.WithTransportCredentials(insecure.NewCredentials()),
)

gwmux := runtime.NewServeMux()
// 将 http handler 的处理 forward 到 conn 的 rpc 客户端中
pb.RegisterGreeterHandler(context.Background(), gwmux, conn)

// 对外开放 8090 端口
gwServer := &http.Server{
    Addr:    ":8090",
    Handler: gwmux,
}
  • ServeMux 是 HTTP 服务器的路由器/分发器,负责将不同的 HTTP 请求路径分发到对应的处理函数,根据请求的 URL 路径决定由哪个处理器来处理
  • grpc.NewClient 是 grpc 新版的创建空闲连接(只有在真正发起 rpc 请求时才会调用 dail)的方法,其返回的 ClientConn 类型的 conn 本质上(嵌入了)是一个 socket

值得注意的是,这里会把 http 请求 header中的字段解码到服务端 interceptor 的 context 中

名称解析与负载均衡

名称解析

grpc 默认采用 DNS 解析服务进程 ip 地址

grpc.Dial("dns:///example:50051")

底层会调用进行 DNS 解析

addrs, err := net.LookupHost("example") // 获取 A/AAAA 记录

来获取地址

默认情况下,gRPC-Go 的 dnsResolver 会周期性地重新解析 DNS 记录,以捕获服务端 IP 的变化(如 Kubernetes Pod 重启导致 IP 变更)。使用 time.Ticker 来 30s 检查一次

负载均衡

grpc 中的负载均衡是以请求为单位的(一个 client 多次发送请求发送到不同的服务)

grpc 默认实现了列表第一和 RR 策略

附录

http 的演进:

  • 1.0 -> 1.1
    • http1.0 不是长连接
    • http1.1 默认 Connection:keep-alive,增加了 pipeline 技术:第二次,第三次请求不需要等待第一次请求的返回再发出
  • 1.1 -> 2.0

    • http1.1 存在队头阻塞:请求 1 的响应如果没有准备好,请求 2,3 的响应即使准备好了,也需要延迟发送
    • http2.0:通过在应用层划分帧,给每个帧一个 streamID,可以实现乱序组装

      这里说的是同一个 Stream 的帧可以分块发送(同一个 StreamID 下各个数据块的传输顺序是固定的),且不同 Stream 的帧可以交替传输(通过 StreamID 来分辨属于不同请求 / 响应对的数据块)

      一对请求 / 响应共用同一个 StreamID,但是传输方向不同

http2 帧格式

每个 HTTP/2 帧的二进制格式(共 9 字节头部 + 可变长度负载):

Type 规定的帧类型下,header / data 等在同一个 streamid 下分块传输

settings frame 用于控制连接

trailer 帧是一种特殊的 header 帧,设置 END_HEADERSEND_STREAM

字段 长度 说明
Length 3 字节 负载(Payload)的长度(不包括头部),最大 16KB(2^14 - 1)。
Type 1 字节 帧类型(如 HEADERSDATASETTINGS 等)。
Flags 1 字节 帧的标志位(如 END_HEADERSEND_STREAM),用于控制帧的行为。
Stream ID 4 字节 流的唯一标识符(0 表示连接控制帧 SETTINGS)。
Payload 可变长度 实际数据(如 HTTP 头部、请求体等)。

grpc 使用到的 http2 特性

  • 客户端同时发起多个 UnaryInvoke 时,可以共用一个 tcp 连接(在 net/http2 包中利用 stream ID 分辨)
  • 使用 HTTP/2 将数据分解为更小的二进制帧(HEADERS 帧和 DATA 帧) 的机制
  • 利用 HTTP2 的 HPACK 压缩请求/响应头的机制,将 metadata (原始为 Content-Type: application/grpc)压缩后进行传输