Consul
Consul Agent¶
agent 是 consul 集群的基本组成单元,其为运行在每台机器上的 consul 守护进程。通过运行 consul 的二进制文件来启动,可以分别以 client 模式和 agent 模式启动
在 client agent 上(也就是通过本机)可以将当前机器上的服务通过 http 请求
local service_id = "gateway_server-" .. my_ip .. "-80"
local service_name = "gateway_server"
local consul_url = "http://127.0.0.1:8500/v1/agent/service/register"
local service = {
ID = service_id,
Name = service_name,
Address = my_ip,
Port = 80,
Check = {
HTTP = "http://" .. my_ip .. ":80/echo", -- health check ip and port
Interval = "5s",
Timeout = "3s",
DeregisterCriticalServiceAfter = "1m",
},
}
local request_body = json.encode(service)
local httpc = require("resty.http").new()
local res, err = httpc:request_uri(consul_url, {
method = "PUT",
body = request_body,
headers = {
["Content-Type"] = "application/json",
},
})
注册到 consul server 中。并且,client agent 也只支持注销本机注册的实例,而不支持注销集群中的其他实例(会返回 Unknown service ID )
两种运行模式:
-
Server Agent:服务端代理(也就是 consul servers)
-
通过 Raft 共识算法维护一个分布式键值存储,所有数据(包括服务注册信息、KV 数据、ACL 规则等)会持久化到磁盘(通常是 /consul/data 文件夹下),并在 Server 节点间同步
这种持久化存储的策略使得 consul 集群在重启时,可以在(通常是 /var/lib/consul 目录下)中拿到持久化到磁盘中的注册信息。
-
-
Client Agent:客户端代理(也就是 consul clients)
- 负责将请求从服务转发给 server agent,维护本地服务注册信息,并进行健康检查;主要作为一个通信工具,和 server 进行 http 交互,封装了各种 Consul API 的调用
-
另一方面 consul 采用 pull 的方式,client 还会定期接收 server 的健康检查。例如 hashicorp 的 consul api 提供的 Register 的 api,服务实例向 server 注册后,server 根据配置中的 interval 定期向 client 所在机器的服务端口发送 tcp 检查
采用 pull 模式的优势为,可以直接性地去验证服务端口的可达性
check := &api.AgentServiceCheck{ Interval: serviceOptions.Interval, Timeout: serviceOptions.Timeout, TCP: fmt.Sprintf("%s:%s", host, port), TLSSkipVerify: TLSSkipVerify, DeregisterCriticalServiceAfter: serviceOptions.DeregisterCriticalServiceAfter, } return r.opts.client.Agent().ServiceRegister(&api.AgentServiceRegistration{ Kind: api.ServiceKindTypical, ID: genAgentServiceId(service, host, port), Name: service, Port: pt, Address: host, Check: check, Tags: serviceOptions.Tags, Meta: serviceOptions.Meta, Weights: &api.AgentWeights{ Passing: serviceOptions.Weight, Warning: serviceOptions.Weight, }, })
gateway 服务也是在本机上有一个 client 代理,用于和 leader 通信。也就是说,通过访问本机的 8500 端口,可以查看当前节点,乃至于整个集群的信息
长轮询机制¶
与上面提到的定期健康检查不同:
- 定期的的健康检查是从服务的注册方角度出发的,由 server 定期来检查注册的服务是否可用,基于 Interval 参数进行
- 长轮询机制是从服务的调方角度出发的,由 client 主动发起,用于服务列表的实时性更新
以下为 hashicorp consul api@v1.8.1 对外提供的监听器方法,在服务节点发生变化时,会调用 Handler(一般由用户定义)来标记和处理变化
// RunWithClientAndLogger runs a watch plan using an external client and
// hclog.Logger instance. Using this, the plan's Datacenter, Token and LogOutput
// fields are ignored and the passed client is expected to be configured as
// needed.
func (p *Plan) RunWithClientAndHclog(client *consulapi.Client, logger hclog.Logger) error {
var watchLogger hclog.Logger
if logger == nil {
watchLogger = newWatchLogger(nil)
} else {
watchLogger = logger.Named(watchLoggerName)
}
p.client = client
// Loop until we are canceled
failures := 0
OUTER:
for !p.shouldStop() {
/* 关键长轮询请求 */
blockParamVal, result, err := p.Watcher(p)
// Check if we should terminate since the function
// could have blocked for a while
if p.shouldStop() {
break
}
// Handle an error in the watch function
if err != nil {
// 网络错误 / 超时的重试
// Perform an exponential backoff
failures++
if blockParamVal == nil {
p.lastParamVal = nil
} else {
p.lastParamVal = blockParamVal.Next(p.lastParamVal)
}
retry := retryInterval * time.Duration(failures*failures)
if retry > maxBackoffTime {
retry = maxBackoffTime
}
watchLogger.Error("Watch errored", "type", p.Type, "error", err, "retry", retry)
select {
case <-time.After(retry):
continue OUTER
case <-p.stopCh:
return nil
}
}
// Clear the failures
failures = 0
// If the index is unchanged do nothing
if p.lastParamVal != nil && p.lastParamVal.Equal(blockParamVal) {
continue
}
// Update the index, look for change
oldParamVal := p.lastParamVal
p.lastParamVal = blockParamVal.Next(oldParamVal)
if oldParamVal != nil && reflect.DeepEqual(p.lastResult, result) {
continue
}
/* 触发 handler */
// Handle the updated result
p.lastResult = result
// If a hybrid handler exists use that
if p.HybridHandler != nil {
p.HybridHandler(blockParamVal, result)
} else if p.Handler != nil {
idx, ok := blockParamVal.(WaitIndexVal)
if !ok {
watchLogger.Error("Handler only supports index-based " +
" watches but non index-based watch run. Skipping Handler.")
}
p.Handler(uint64(idx), result)
}
}
return nil
}
其中 p.Watcher(p) 就是使用长轮询来请求服务端的变化,具体来说:
这是一个阻塞函数,会发送一个带有版本号和超时时间的 HTTP 请求到 server agent:
server agent 会开启一个协程阻塞地维持这个连接,在检测到服务节点发生变化时,立刻返回响应(最多阻塞 wait 秒)
基于 Raft 的强一致性 (CP)¶
Raft 共识协议在 consul 的 server 端进行(与 client agent 无关),用于同一个数据中心下多个 server 的数据同步与选举
- term:标记 leader 的一个任期
- index:日志条目编号
每条日志包括 index,term,以及一条状态机指令(记录对于系统状态或变量的修改)
当 leader 选出后,对于每条客户端当请求,会将其作为日志条目,然后向所有 follower 发起 AppendEntries RPC 复制日志条目。当这条日志被复制到 2/3 以上的 follower 确认接收后,leader 提交这条日志,并通知 followers 提交
在提交完成后,就会执行状态修改
http 访问命令¶
# 启用 player 服务维护模式
curl -X PUT "http://localhost:8500/v1/agent/service/maintenance/service-id?enable=true&reason=player-service-maintenance"
# 验证 player 服务维护状态
curl http://localhost:8500/v1/agent/service/maintenance/player-172.16.0.13-8040
# 禁用 player 服务维护模式
curl -X PUT "http://localhost:8500/v1/agent/service/maintenance/service-id?enable=false"
这里的 service-id 区别于 service-name,标注了具体的服务名称