Observatory:健康检查与出站监控
Observatory 子系统通过探测出站 handler 来判断其健康状况和延迟。负载均衡器(Balancer)利用这些数据为每个连接选择最佳的出站。Xray 提供两种 Observatory 实现:标准 Observer 和突发模式 BurstObserver。
架构概览
flowchart TD
subgraph Observatory
O[Observer] --> P[通过 HTTP GET 探测]
P --> S[OutboundStatus 列表]
end
subgraph BurstObservatory
BO[BurstObserver] --> HP[HealthPing]
HP --> HPR[每个标签的 HealthPingRTTS]
HPR --> BS[包含统计信息的 OutboundStatus]
end
subgraph 集成
S --> Bal[Balancer]
BS --> Bal
Bal --> OH[Outbound Handler 选择]
end
subgraph gRPC API
CMD[ObservatoryService] --> O
CMD --> BO
end标准 Observer
文件: app/observatory/observer.go
标准 Observer 通过对每个选定的出站发起 HTTP GET 请求来进行探测。
type Observer struct {
config *Config
ctx context.Context
statusLock sync.Mutex
status []*OutboundStatus
finished *done.Instance
ohm outbound.Manager
dispatcher routing.Dispatcher
}探测机制
probe() 方法创建一个通过指定 outbound handler 路由的 HTTP 客户端:
func (o *Observer) probe(outbound string) ProbeResult {
httpTransport := http.Transport{
DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) {
dest, _ := v2net.ParseDestination(network + ":" + addr)
trackedCtx := session.TrackedConnectionError(o.ctx, errorCollectorForRequest)
conn, err := tagged.Dialer(trackedCtx, o.dispatcher, dest, outbound)
return conn, err
},
TLSHandshakeTimeout: 5 * time.Second,
}
httpClient := &http.Client{
Transport: &httpTransport,
Timeout: 5 * time.Second,
CheckRedirect: func(req *http.Request, via []*http.Request) error {
return http.ErrUseLastResponse
},
}
probeURL := "https://www.google.com/generate_204"
if o.config.ProbeUrl != "" {
probeURL = o.config.ProbeUrl
}
startTime := time.Now()
response, err := httpClient.Do(req)
GETTime := time.Since(startTime)
if err != nil {
return ProbeResult{Alive: false, LastErrorReason: errorMessage}
}
return ProbeResult{Alive: true, Delay: GETTime.Milliseconds()}
}关键细节:
- 使用
tagged.Dialer()将探测连接路由到指定的 outbound - 通过
session.TrackedConnectionError()进行错误追踪,捕获底层连接故障 - 不跟随重定向(
ErrUseLastResponse) - 默认探测 URL:
https://www.google.com/generate_204 - 超时时间:TLS 握手和整体请求均为 5 秒
后台循环
func (o *Observer) background() {
for !o.finished.Done() {
hs, _ := o.ohm.(outbound.HandlerSelector)
outbounds := hs.Select(o.config.SubjectSelector)
sleepTime := time.Second * 10
if o.config.ProbeInterval != 0 {
sleepTime = time.Duration(o.config.ProbeInterval)
}
if !o.config.EnableConcurrency {
// 顺序模式:逐个探测,每次间隔休眠
for _, v := range outbounds {
result := o.probe(v)
o.updateStatusForResult(v, &result)
time.Sleep(sleepTime)
}
} else {
// 并发模式:同时探测所有出站,等待全部完成
ch := make(chan struct{}, len(outbounds))
for _, v := range outbounds {
go func(v string) {
result := o.probe(v)
o.updateStatusForResult(v, &result)
ch <- struct{}{}
}(v)
}
for range outbounds { <-ch }
time.Sleep(sleepTime)
}
}
}SubjectSelector 是一组标签模式,与 outbound.HandlerSelector.Select() 配合使用,用于匹配 outbound handler 的标签(支持前缀匹配)。
状态跟踪
type OutboundStatus struct {
Alive bool
Delay int64 // 毫秒
LastErrorReason string
OutboundTag string
LastSeenTime int64 // Unix 时间戳
LastTryTime int64 // Unix 时间戳
}当探测失败时,延迟设置为 99999999(在排序中相当于无穷大)。
Burst Observer
文件: app/observatory/burst/burstobserver.go
Burst Observer 提供更精细的健康检查,包含统计分析功能(平均值、标准差、最小值、最大值、失败率)。
type Observer struct {
config *Config
ctx context.Context
statusLock sync.Mutex
hp *HealthPing
finished *done.Instance
ohm outbound.Manager
}HealthPing
文件: app/observatory/burst/healthping.go
核心健康检查引擎:
type HealthPing struct {
ctx context.Context
dispatcher routing.Dispatcher
access sync.Mutex
ticker *time.Ticker
tickerClose chan struct{}
Settings *HealthPingSettings
Results map[string]*HealthPingRTTS
}配置项:
| 配置项 | 默认值 | 说明 |
|---|---|---|
Destination | https://connectivitycheck.gstatic.com/generate_204 | 探测 URL |
Interval | 1 分钟 | 采样轮次间隔 |
SamplingCount | 10 | 每轮采样次数 |
Timeout | 5 秒 | 单次探测超时 |
HttpMethod | HEAD | HTTP 方法 |
Connectivity | (空) | 用于检查网络是否可用的 URL |
采样策略
调度器基于定时器运行,间隔 = Interval * SamplingCount:
func (h *HealthPing) StartScheduler(selector func() ([]string, error)) {
interval := h.Settings.Interval * time.Duration(h.Settings.SamplingCount)
ticker := time.NewTicker(interval)
// 初始检查
go func() { h.Check(tags) }()
// 周期性检查
go func() {
for {
go func() {
h.doCheck(tags, interval, h.Settings.SamplingCount)
h.Cleanup(tags)
}()
<-ticker.C
}
}()
}doCheck() 方法在采样周期内随机分布探测请求:
func (h *HealthPing) doCheck(tags []string, duration time.Duration, rounds int) {
for _, tag := range tags {
client := newPingClient(h.ctx, h.dispatcher, h.Settings.Destination, h.Settings.Timeout, tag)
for i := 0; i < rounds; i++ {
delay := time.Duration(dice.RollInt63n(int64(duration)))
time.AfterFunc(delay, func() {
delay, err := client.MeasureDelay(h.Settings.HttpMethod)
// 上报结果
})
}
}
}这种随机分布可以避免所有探测请求同时冲击网络。
Ping 客户端
文件: app/observatory/burst/ping.go
func newHTTPClient(ctxv context.Context, dispatcher routing.Dispatcher, handler string, timeout time.Duration) *http.Client {
tr := &http.Transport{
DisableKeepAlives: true,
DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) {
dest, _ := net.ParseDestination(network + ":" + addr)
return tagged.Dialer(ctxv, dispatcher, dest, handler)
},
}
return &http.Client{
Transport: tr,
Timeout: timeout,
CheckRedirect: func(req *http.Request, via []*http.Request) error {
return http.ErrUseLastResponse
},
}
}注意:DisableKeepAlives: true 确保每次探测都创建新的连接。
网络连通性检查
在将探测标记为失败之前,系统可以选择性地检查网络本身是否可用:
func (h *HealthPing) checkConnectivity() bool {
if h.Settings.Connectivity == "" { return true }
tester := newDirectPingClient(h.Settings.Connectivity, h.Settings.Timeout)
_, err := tester.MeasureDelay(h.Settings.HttpMethod)
return err == nil
}如果网络不可用,探测结果将被丢弃(RTT = 0,不存储)。
统计分析
文件: app/observatory/burst/healthping_result.go
HealthPingRTTS 维护一个带有时间有效性的环形缓冲区来存储 ping 结果:
type HealthPingRTTS struct {
idx int // 当前写入位置
cap int // 缓冲区容量
validity time.Duration // 结果最大有效时长
rtts []*pingRTT // 环形缓冲区
stats *HealthPingStats // 缓存的统计数据
}计算的统计指标:
type HealthPingStats struct {
All int // 有效样本总数
Fail int // 探测失败次数
Deviation time.Duration // 标准差
Average time.Duration // 平均 RTT
Max time.Duration // 最大 RTT
Min time.Duration // 最小 RTT
}对于只有 1 个样本的节点,标准差估算为 Average / 2,以防止其因数据不足而被不合理地优先选择。
gRPC API
文件: app/observatory/command/command.go
type service struct {
observatory extension.Observatory
}
func (s *service) GetOutboundStatus(ctx context.Context, request *GetOutboundStatusRequest) (*GetOutboundStatusResponse, error) {
resp, _ := s.observatory.GetObservation(ctx)
retdata := resp.(*observatory.ObservationResult)
return &GetOutboundStatusResponse{Status: retdata}, nil
}两种 Observer 实现都满足 extension.Observatory 接口:
type Observatory interface {
GetObservation(ctx context.Context) (proto.Message, error)
}错误收集
文件: app/observatory/explainErrors.go
type errorCollector struct {
errors *errors.Error
}
func (e *errorCollector) SubmitError(err error) {
if e.errors == nil {
e.errors = errors.New("underlying connection error").Base(err)
} else {
e.errors = e.errors.Base(errors.New("underlying connection error").Base(err))
}
}错误收集器将连接错误链式串联以便诊断日志记录。它通过 session.TrackedConnectionError() 传递,从而捕获底层传输层的故障。
配置
标准 Observatory:
{
"observatory": {
"subjectSelector": ["proxy_"],
"probeUrl": "https://www.google.com/generate_204",
"probeInterval": "10s",
"enableConcurrency": true
}
}Burst Observatory:
{
"burstObservatory": {
"subjectSelector": ["proxy_"],
"pingConfig": {
"destination": "https://connectivitycheck.gstatic.com/generate_204",
"interval": "1m",
"samplingCount": 10,
"timeout": "5s",
"httpMethod": "HEAD",
"connectivity": "https://connectivitycheck.gstatic.com/generate_204"
}
}
}实现要点
两种 Observer 均通过
common.RegisterConfig()注册,当其 Protobuf 配置出现在App列表中时被实例化。extension.ObservatoryType()的返回值确保同一时间只能有一个 Observatory 处于活跃状态(标准或突发,不能同时启用)。标准 Observer 的
updateStatus()方法目前对移除过期条目的处理是空操作(标记了 TODO 注释)。Burst Observer 的
Cleanup()方法会移除不再被选择器匹配的 outbound 标签的结果,以处理 handler 动态移除的情况。Burst 模式下健康检查的有效期为
Interval * SamplingCount * 2——采样周期的两倍,以适应探测的随机分布。rttFailed常量(定义在 burst 包中)表示探测失败,作为哨兵值存储在环形缓冲区中,计入Fail统计。两种 Observer 均为延迟启动:仅在
Start()被调用且SubjectSelector非空时才开始探测。