跳转至

Consul

image-20250625102610395

Consul Agent

agent 是 consul 集群的基本组成单元,其为运行在每台机器上的 consul 守护进程。通过运行 consul 的二进制文件来启动,可以分别以 client 模式和 agent 模式启动

在 client agent 上(也就是通过本机)可以将当前机器上的服务通过 http 请求

local service_id = "gateway_server-" .. my_ip .. "-80"
local service_name = "gateway_server"
local consul_url = "http://127.0.0.1:8500/v1/agent/service/register"
local service = {
    ID = service_id,
    Name = service_name,
    Address = my_ip,
    Port = 80,
    Check = {
        HTTP = "http://" .. my_ip .. ":80/echo", -- health check ip and port
        Interval = "5s",
        Timeout = "3s",
        DeregisterCriticalServiceAfter = "1m",
    },
}
local request_body = json.encode(service)
local httpc = require("resty.http").new()
local res, err = httpc:request_uri(consul_url, {
    method = "PUT",
    body = request_body,
    headers = {
        ["Content-Type"] = "application/json",
    },
})

注册到 consul server 中。并且,client agent 也只支持注销本机注册的实例,而不支持注销集群中的其他实例(会返回 Unknown service ID )

curl -X PUT http://127.0.0.1:8500/v1/agent/service/deregister/SERVICE_ID

两种运行模式:

  • Server Agent:服务端代理(也就是 consul servers)

    • 通过 Raft 共识算法维护一个分布式键值存储,所有数据(包括服务注册信息、KV 数据、ACL 规则等)会持久化到磁盘(通常是 /consul/data 文件夹下),并在 Server 节点间同步

      这种持久化存储的策略使得 consul 集群在重启时,可以在(通常是 /var/lib/consul 目录下)中拿到持久化到磁盘中的注册信息。

  • Client Agent:客户端代理(也就是 consul clients)

    • 负责将请求从服务转发给 server agent,维护本地服务注册信息,并进行健康检查;主要作为一个通信工具,和 server 进行 http 交互,封装了各种 Consul API 的调用
    • 另一方面 consul 采用 pull 的方式,client 还会定期接收 server 的健康检查。例如 hashicorp 的 consul api 提供的 Register 的 api,服务实例向 server 注册后,server 根据配置中的 interval 定期向 client 所在机器的服务端口发送 tcp 检查

      采用 pull 模式的优势为,可以直接性地去验证服务端口的可达性

      check := &api.AgentServiceCheck{
          Interval:                       serviceOptions.Interval,
          Timeout:                        serviceOptions.Timeout,
          TCP:                            fmt.Sprintf("%s:%s", host, port),
          TLSSkipVerify:                  TLSSkipVerify,
          DeregisterCriticalServiceAfter: serviceOptions.DeregisterCriticalServiceAfter,
      }
      return r.opts.client.Agent().ServiceRegister(&api.AgentServiceRegistration{
          Kind:    api.ServiceKindTypical,
          ID:      genAgentServiceId(service, host, port),
          Name:    service,
          Port:    pt,
          Address: host,
          Check:   check,
          Tags:    serviceOptions.Tags,
          Meta:    serviceOptions.Meta,
          Weights: &api.AgentWeights{
              Passing: serviceOptions.Weight,
              Warning: serviceOptions.Weight,
          },
      })
      

gateway 服务也是在本机上有一个 client 代理,用于和 leader 通信。也就是说,通过访问本机的 8500 端口,可以查看当前节点,乃至于整个集群的信息

长轮询机制

与上面提到的定期健康检查不同:

  • 定期的的健康检查是从服务的注册方角度出发的,由 server 定期来检查注册的服务是否可用,基于 Interval 参数进行
  • 长轮询机制是从服务的调方角度出发的,由 client 主动发起,用于服务列表的实时性更新

以下为 hashicorp consul api@v1.8.1 对外提供的监听器方法,在服务节点发生变化时,会调用 Handler(一般由用户定义)来标记和处理变化

// RunWithClientAndLogger runs a watch plan using an external client and
// hclog.Logger instance. Using this, the plan's Datacenter, Token and LogOutput
// fields are ignored and the passed client is expected to be configured as
// needed.
func (p *Plan) RunWithClientAndHclog(client *consulapi.Client, logger hclog.Logger) error {
    var watchLogger hclog.Logger
    if logger == nil {
        watchLogger = newWatchLogger(nil)
    } else {
        watchLogger = logger.Named(watchLoggerName)
    }

    p.client = client

    // Loop until we are canceled
    failures := 0
OUTER:
    for !p.shouldStop() {
        /* 关键长轮询请求 */
        blockParamVal, result, err := p.Watcher(p)

        // Check if we should terminate since the function
        // could have blocked for a while
        if p.shouldStop() {
            break
        }

        // Handle an error in the watch function
        if err != nil {
             // 网络错误 / 超时的重试
            // Perform an exponential backoff
            failures++
            if blockParamVal == nil {
                p.lastParamVal = nil
            } else {
                p.lastParamVal = blockParamVal.Next(p.lastParamVal)
            }
            retry := retryInterval * time.Duration(failures*failures)
            if retry > maxBackoffTime {
                retry = maxBackoffTime
            }
            watchLogger.Error("Watch errored", "type", p.Type, "error", err, "retry", retry)
            select {
            case <-time.After(retry):
                continue OUTER
            case <-p.stopCh:
                return nil
            }
        }

        // Clear the failures
        failures = 0

        // If the index is unchanged do nothing
        if p.lastParamVal != nil && p.lastParamVal.Equal(blockParamVal) {
            continue
        }

        // Update the index, look for change
        oldParamVal := p.lastParamVal
        p.lastParamVal = blockParamVal.Next(oldParamVal)
        if oldParamVal != nil && reflect.DeepEqual(p.lastResult, result) {
            continue
        }

        /* 触发 handler */
        // Handle the updated result
        p.lastResult = result
        // If a hybrid handler exists use that
        if p.HybridHandler != nil {
            p.HybridHandler(blockParamVal, result)
        } else if p.Handler != nil {
            idx, ok := blockParamVal.(WaitIndexVal)
            if !ok {
                watchLogger.Error("Handler only supports index-based " +
                    " watches but non index-based watch run. Skipping Handler.")
            }
            p.Handler(uint64(idx), result)
        }
    }
    return nil
}

其中 p.Watcher(p) 就是使用长轮询来请求服务端的变化,具体来说:

这是一个阻塞函数,会发送一个带有版本号和超时时间的 HTTP 请求到 server agent:

GET /v1/health/service/my-service?index=12345&wait=60s

server agent 会开启一个协程阻塞地维持这个连接,在检测到服务节点发生变化时,立刻返回响应(最多阻塞 wait 秒)

基于 Raft 的强一致性 (CP)

Raft 共识协议在 consul 的 server 端进行(与 client agent 无关),用于同一个数据中心下多个 server 的数据同步与选举

  • term:标记 leader 的一个任期
  • index:日志条目编号

每条日志包括 index,term,以及一条状态机指令(记录对于系统状态或变量的修改)

当 leader 选出后,对于每条客户端当请求,会将其作为日志条目,然后向所有 follower 发起 AppendEntries RPC 复制日志条目。当这条日志被复制到 2/3 以上的 follower 确认接收后,leader 提交这条日志,并通知 followers 提交

image-20250626171921643

在提交完成后,就会执行状态修改

http 访问命令

# 启用 player 服务维护模式
curl -X PUT "http://localhost:8500/v1/agent/service/maintenance/service-id?enable=true&reason=player-service-maintenance"

# 验证 player 服务维护状态
curl http://localhost:8500/v1/agent/service/maintenance/player-172.16.0.13-8040

# 禁用 player 服务维护模式
curl -X PUT "http://localhost:8500/v1/agent/service/maintenance/service-id?enable=false"

这里的 service-id 区别于 service-name,标注了具体的服务名称

# 查看本节点的所有服务
curl http://localhost:8500/v1/agent/services | jq '.'

# 查看服务的详细信息
curl http://localhost:8500/v1/agent/services | jq 'to_entries[] | {name: .key, service: .value.Service, port: .value.Port, address: .value.Address}'