Skip to content

Observatory:健康检查与出站监控

Observatory 子系统通过探测出站 handler 来判断其健康状况和延迟。负载均衡器(Balancer)利用这些数据为每个连接选择最佳的出站。Xray 提供两种 Observatory 实现:标准 Observer 和突发模式 BurstObserver

架构概览

mermaid
flowchart TD
    subgraph Observatory
        O[Observer] --> P[通过 HTTP GET 探测]
        P --> S[OutboundStatus 列表]
    end

    subgraph BurstObservatory
        BO[BurstObserver] --> HP[HealthPing]
        HP --> HPR[每个标签的 HealthPingRTTS]
        HPR --> BS[包含统计信息的 OutboundStatus]
    end

    subgraph 集成
        S --> Bal[Balancer]
        BS --> Bal
        Bal --> OH[Outbound Handler 选择]
    end

    subgraph gRPC API
        CMD[ObservatoryService] --> O
        CMD --> BO
    end

标准 Observer

文件: app/observatory/observer.go

标准 Observer 通过对每个选定的出站发起 HTTP GET 请求来进行探测。

go
type Observer struct {
    config *Config
    ctx    context.Context

    statusLock sync.Mutex
    status     []*OutboundStatus

    finished *done.Instance

    ohm        outbound.Manager
    dispatcher routing.Dispatcher
}

探测机制

probe() 方法创建一个通过指定 outbound handler 路由的 HTTP 客户端:

go
func (o *Observer) probe(outbound string) ProbeResult {
    httpTransport := http.Transport{
        DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) {
            dest, _ := v2net.ParseDestination(network + ":" + addr)
            trackedCtx := session.TrackedConnectionError(o.ctx, errorCollectorForRequest)
            conn, err := tagged.Dialer(trackedCtx, o.dispatcher, dest, outbound)
            return conn, err
        },
        TLSHandshakeTimeout: 5 * time.Second,
    }
    httpClient := &http.Client{
        Transport: &httpTransport,
        Timeout:   5 * time.Second,
        CheckRedirect: func(req *http.Request, via []*http.Request) error {
            return http.ErrUseLastResponse
        },
    }

    probeURL := "https://www.google.com/generate_204"
    if o.config.ProbeUrl != "" {
        probeURL = o.config.ProbeUrl
    }

    startTime := time.Now()
    response, err := httpClient.Do(req)
    GETTime := time.Since(startTime)

    if err != nil {
        return ProbeResult{Alive: false, LastErrorReason: errorMessage}
    }
    return ProbeResult{Alive: true, Delay: GETTime.Milliseconds()}
}

关键细节:

  • 使用 tagged.Dialer() 将探测连接路由到指定的 outbound
  • 通过 session.TrackedConnectionError() 进行错误追踪,捕获底层连接故障
  • 不跟随重定向(ErrUseLastResponse
  • 默认探测 URL:https://www.google.com/generate_204
  • 超时时间:TLS 握手和整体请求均为 5 秒

后台循环

go
func (o *Observer) background() {
    for !o.finished.Done() {
        hs, _ := o.ohm.(outbound.HandlerSelector)
        outbounds := hs.Select(o.config.SubjectSelector)

        sleepTime := time.Second * 10
        if o.config.ProbeInterval != 0 {
            sleepTime = time.Duration(o.config.ProbeInterval)
        }

        if !o.config.EnableConcurrency {
            // 顺序模式:逐个探测,每次间隔休眠
            for _, v := range outbounds {
                result := o.probe(v)
                o.updateStatusForResult(v, &result)
                time.Sleep(sleepTime)
            }
        } else {
            // 并发模式:同时探测所有出站,等待全部完成
            ch := make(chan struct{}, len(outbounds))
            for _, v := range outbounds {
                go func(v string) {
                    result := o.probe(v)
                    o.updateStatusForResult(v, &result)
                    ch <- struct{}{}
                }(v)
            }
            for range outbounds { <-ch }
            time.Sleep(sleepTime)
        }
    }
}

SubjectSelector 是一组标签模式,与 outbound.HandlerSelector.Select() 配合使用,用于匹配 outbound handler 的标签(支持前缀匹配)。

状态跟踪

go
type OutboundStatus struct {
    Alive           bool
    Delay           int64   // 毫秒
    LastErrorReason string
    OutboundTag     string
    LastSeenTime    int64   // Unix 时间戳
    LastTryTime     int64   // Unix 时间戳
}

当探测失败时,延迟设置为 99999999(在排序中相当于无穷大)。

Burst Observer

文件: app/observatory/burst/burstobserver.go

Burst Observer 提供更精细的健康检查,包含统计分析功能(平均值、标准差、最小值、最大值、失败率)。

go
type Observer struct {
    config     *Config
    ctx        context.Context
    statusLock sync.Mutex
    hp         *HealthPing
    finished   *done.Instance
    ohm        outbound.Manager
}

HealthPing

文件: app/observatory/burst/healthping.go

核心健康检查引擎:

go
type HealthPing struct {
    ctx         context.Context
    dispatcher  routing.Dispatcher
    access      sync.Mutex
    ticker      *time.Ticker
    tickerClose chan struct{}
    Settings    *HealthPingSettings
    Results     map[string]*HealthPingRTTS
}

配置项:

配置项默认值说明
Destinationhttps://connectivitycheck.gstatic.com/generate_204探测 URL
Interval1 分钟采样轮次间隔
SamplingCount10每轮采样次数
Timeout5 秒单次探测超时
HttpMethodHEADHTTP 方法
Connectivity(空)用于检查网络是否可用的 URL

采样策略

调度器基于定时器运行,间隔 = Interval * SamplingCount

go
func (h *HealthPing) StartScheduler(selector func() ([]string, error)) {
    interval := h.Settings.Interval * time.Duration(h.Settings.SamplingCount)
    ticker := time.NewTicker(interval)
    // 初始检查
    go func() { h.Check(tags) }()
    // 周期性检查
    go func() {
        for {
            go func() {
                h.doCheck(tags, interval, h.Settings.SamplingCount)
                h.Cleanup(tags)
            }()
            <-ticker.C
        }
    }()
}

doCheck() 方法在采样周期内随机分布探测请求:

go
func (h *HealthPing) doCheck(tags []string, duration time.Duration, rounds int) {
    for _, tag := range tags {
        client := newPingClient(h.ctx, h.dispatcher, h.Settings.Destination, h.Settings.Timeout, tag)
        for i := 0; i < rounds; i++ {
            delay := time.Duration(dice.RollInt63n(int64(duration)))
            time.AfterFunc(delay, func() {
                delay, err := client.MeasureDelay(h.Settings.HttpMethod)
                // 上报结果
            })
        }
    }
}

这种随机分布可以避免所有探测请求同时冲击网络。

Ping 客户端

文件: app/observatory/burst/ping.go

go
func newHTTPClient(ctxv context.Context, dispatcher routing.Dispatcher, handler string, timeout time.Duration) *http.Client {
    tr := &http.Transport{
        DisableKeepAlives: true,
        DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) {
            dest, _ := net.ParseDestination(network + ":" + addr)
            return tagged.Dialer(ctxv, dispatcher, dest, handler)
        },
    }
    return &http.Client{
        Transport: tr,
        Timeout:   timeout,
        CheckRedirect: func(req *http.Request, via []*http.Request) error {
            return http.ErrUseLastResponse
        },
    }
}

注意:DisableKeepAlives: true 确保每次探测都创建新的连接。

网络连通性检查

在将探测标记为失败之前,系统可以选择性地检查网络本身是否可用:

go
func (h *HealthPing) checkConnectivity() bool {
    if h.Settings.Connectivity == "" { return true }
    tester := newDirectPingClient(h.Settings.Connectivity, h.Settings.Timeout)
    _, err := tester.MeasureDelay(h.Settings.HttpMethod)
    return err == nil
}

如果网络不可用,探测结果将被丢弃(RTT = 0,不存储)。

统计分析

文件: app/observatory/burst/healthping_result.go

HealthPingRTTS 维护一个带有时间有效性的环形缓冲区来存储 ping 结果:

go
type HealthPingRTTS struct {
    idx      int              // 当前写入位置
    cap      int              // 缓冲区容量
    validity time.Duration    // 结果最大有效时长
    rtts     []*pingRTT       // 环形缓冲区
    stats    *HealthPingStats // 缓存的统计数据
}

计算的统计指标:

go
type HealthPingStats struct {
    All       int            // 有效样本总数
    Fail      int            // 探测失败次数
    Deviation time.Duration  // 标准差
    Average   time.Duration  // 平均 RTT
    Max       time.Duration  // 最大 RTT
    Min       time.Duration  // 最小 RTT
}

对于只有 1 个样本的节点,标准差估算为 Average / 2,以防止其因数据不足而被不合理地优先选择。

gRPC API

文件: app/observatory/command/command.go

go
type service struct {
    observatory extension.Observatory
}

func (s *service) GetOutboundStatus(ctx context.Context, request *GetOutboundStatusRequest) (*GetOutboundStatusResponse, error) {
    resp, _ := s.observatory.GetObservation(ctx)
    retdata := resp.(*observatory.ObservationResult)
    return &GetOutboundStatusResponse{Status: retdata}, nil
}

两种 Observer 实现都满足 extension.Observatory 接口:

go
type Observatory interface {
    GetObservation(ctx context.Context) (proto.Message, error)
}

错误收集

文件: app/observatory/explainErrors.go

go
type errorCollector struct {
    errors *errors.Error
}

func (e *errorCollector) SubmitError(err error) {
    if e.errors == nil {
        e.errors = errors.New("underlying connection error").Base(err)
    } else {
        e.errors = e.errors.Base(errors.New("underlying connection error").Base(err))
    }
}

错误收集器将连接错误链式串联以便诊断日志记录。它通过 session.TrackedConnectionError() 传递,从而捕获底层传输层的故障。

配置

标准 Observatory:

json
{
    "observatory": {
        "subjectSelector": ["proxy_"],
        "probeUrl": "https://www.google.com/generate_204",
        "probeInterval": "10s",
        "enableConcurrency": true
    }
}

Burst Observatory:

json
{
    "burstObservatory": {
        "subjectSelector": ["proxy_"],
        "pingConfig": {
            "destination": "https://connectivitycheck.gstatic.com/generate_204",
            "interval": "1m",
            "samplingCount": 10,
            "timeout": "5s",
            "httpMethod": "HEAD",
            "connectivity": "https://connectivitycheck.gstatic.com/generate_204"
        }
    }
}

实现要点

  • 两种 Observer 均通过 common.RegisterConfig() 注册,当其 Protobuf 配置出现在 App 列表中时被实例化。

  • extension.ObservatoryType() 的返回值确保同一时间只能有一个 Observatory 处于活跃状态(标准或突发,不能同时启用)。

  • 标准 Observer 的 updateStatus() 方法目前对移除过期条目的处理是空操作(标记了 TODO 注释)。

  • Burst Observer 的 Cleanup() 方法会移除不再被选择器匹配的 outbound 标签的结果,以处理 handler 动态移除的情况。

  • Burst 模式下健康检查的有效期为 Interval * SamplingCount * 2——采样周期的两倍,以适应探测的随机分布。

  • rttFailed 常量(定义在 burst 包中)表示探测失败,作为哨兵值存储在环形缓冲区中,计入 Fail 统计。

  • 两种 Observer 均为延迟启动:仅在 Start() 被调用且 SubjectSelector 非空时才开始探测。

用于重新实现目的的技术分析。