Skip to content

Observatory: Health Checking and Outbound Monitoring

The Observatory subsystem probes outbound handlers to determine their health and latency. Balancers use this data to select the best outbound for each connection. Xray provides two observatory implementations: the standard Observer and the burst-mode BurstObserver.

Architecture Overview

mermaid
flowchart TD
    subgraph Observatory
        O[Observer] --> P[probe via HTTP GET]
        P --> S[OutboundStatus list]
    end

    subgraph BurstObservatory
        BO[BurstObserver] --> HP[HealthPing]
        HP --> HPR[HealthPingRTTS per tag]
        HPR --> BS[OutboundStatus with stats]
    end

    subgraph Integration
        S --> Bal[Balancer]
        BS --> Bal
        Bal --> OH[Outbound Handler selection]
    end

    subgraph gRPC API
        CMD[ObservatoryService] --> O
        CMD --> BO
    end

Standard Observer

File: app/observatory/observer.go

The standard observer probes each selected outbound by making HTTP GET requests through it.

go
type Observer struct {
    config *Config
    ctx    context.Context

    statusLock sync.Mutex
    status     []*OutboundStatus

    finished *done.Instance

    ohm        outbound.Manager
    dispatcher routing.Dispatcher
}

Probe Mechanism

The probe() method creates an HTTP client that routes through a specific outbound handler:

go
func (o *Observer) probe(outbound string) ProbeResult {
    httpTransport := http.Transport{
        DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) {
            dest, _ := v2net.ParseDestination(network + ":" + addr)
            trackedCtx := session.TrackedConnectionError(o.ctx, errorCollectorForRequest)
            conn, err := tagged.Dialer(trackedCtx, o.dispatcher, dest, outbound)
            return conn, err
        },
        TLSHandshakeTimeout: 5 * time.Second,
    }
    httpClient := &http.Client{
        Transport: &httpTransport,
        Timeout:   5 * time.Second,
        CheckRedirect: func(req *http.Request, via []*http.Request) error {
            return http.ErrUseLastResponse
        },
    }

    probeURL := "https://www.google.com/generate_204"
    if o.config.ProbeUrl != "" {
        probeURL = o.config.ProbeUrl
    }

    startTime := time.Now()
    response, err := httpClient.Do(req)
    GETTime := time.Since(startTime)

    if err != nil {
        return ProbeResult{Alive: false, LastErrorReason: errorMessage}
    }
    return ProbeResult{Alive: true, Delay: GETTime.Milliseconds()}
}

Key details:

  • Uses tagged.Dialer() to route the probe connection through the specific outbound
  • Error tracking via session.TrackedConnectionError() captures underlying connection failures
  • Does not follow redirects (ErrUseLastResponse)
  • Default probe URL: https://www.google.com/generate_204
  • Timeout: 5 seconds for both TLS handshake and overall request

Background Loop

go
func (o *Observer) background() {
    for !o.finished.Done() {
        hs, _ := o.ohm.(outbound.HandlerSelector)
        outbounds := hs.Select(o.config.SubjectSelector)

        sleepTime := time.Second * 10
        if o.config.ProbeInterval != 0 {
            sleepTime = time.Duration(o.config.ProbeInterval)
        }

        if !o.config.EnableConcurrency {
            // Sequential: probe one at a time with sleep between
            for _, v := range outbounds {
                result := o.probe(v)
                o.updateStatusForResult(v, &result)
                time.Sleep(sleepTime)
            }
        } else {
            // Concurrent: probe all at once, wait for all
            ch := make(chan struct{}, len(outbounds))
            for _, v := range outbounds {
                go func(v string) {
                    result := o.probe(v)
                    o.updateStatusForResult(v, &result)
                    ch <- struct{}{}
                }(v)
            }
            for range outbounds { <-ch }
            time.Sleep(sleepTime)
        }
    }
}

The SubjectSelector is a list of tag patterns used with outbound.HandlerSelector.Select() to match outbound handler tags (supports prefix matching).

Status Tracking

go
type OutboundStatus struct {
    Alive           bool
    Delay           int64   // milliseconds
    LastErrorReason string
    OutboundTag     string
    LastSeenTime    int64   // unix timestamp
    LastTryTime     int64   // unix timestamp
}

When a probe fails, delay is set to 99999999 (effectively infinity for sorting).

Burst Observer

File: app/observatory/burst/burstobserver.go

The burst observer provides more sophisticated health checking with statistical analysis (average, deviation, min, max, fail rate).

go
type Observer struct {
    config     *Config
    ctx        context.Context
    statusLock sync.Mutex
    hp         *HealthPing
    finished   *done.Instance
    ohm        outbound.Manager
}

HealthPing

File: app/observatory/burst/healthping.go

The core health checking engine:

go
type HealthPing struct {
    ctx         context.Context
    dispatcher  routing.Dispatcher
    access      sync.Mutex
    ticker      *time.Ticker
    tickerClose chan struct{}
    Settings    *HealthPingSettings
    Results     map[string]*HealthPingRTTS
}

Settings:

SettingDefaultDescription
Destinationhttps://connectivitycheck.gstatic.com/generate_204Probe URL
Interval1 minuteTime between sampling rounds
SamplingCount10Number of samples per round
Timeout5 secondsPer-probe timeout
HttpMethodHEADHTTP method
Connectivity(empty)URL to check if network is up

Sampling Strategy

The scheduler runs on a ticker with interval = Interval * SamplingCount:

go
func (h *HealthPing) StartScheduler(selector func() ([]string, error)) {
    interval := h.Settings.Interval * time.Duration(h.Settings.SamplingCount)
    ticker := time.NewTicker(interval)
    // Initial check
    go func() { h.Check(tags) }()
    // Periodic checks
    go func() {
        for {
            go func() {
                h.doCheck(tags, interval, h.Settings.SamplingCount)
                h.Cleanup(tags)
            }()
            <-ticker.C
        }
    }()
}

The doCheck() method distributes probes randomly within the sampling period:

go
func (h *HealthPing) doCheck(tags []string, duration time.Duration, rounds int) {
    for _, tag := range tags {
        client := newPingClient(h.ctx, h.dispatcher, h.Settings.Destination, h.Settings.Timeout, tag)
        for i := 0; i < rounds; i++ {
            delay := time.Duration(dice.RollInt63n(int64(duration)))
            time.AfterFunc(delay, func() {
                delay, err := client.MeasureDelay(h.Settings.HttpMethod)
                // report result
            })
        }
    }
}

This random distribution prevents all probes from hitting the network simultaneously.

Ping Client

File: app/observatory/burst/ping.go

go
func newHTTPClient(ctxv context.Context, dispatcher routing.Dispatcher, handler string, timeout time.Duration) *http.Client {
    tr := &http.Transport{
        DisableKeepAlives: true,
        DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) {
            dest, _ := net.ParseDestination(network + ":" + addr)
            return tagged.Dialer(ctxv, dispatcher, dest, handler)
        },
    }
    return &http.Client{
        Transport: tr,
        Timeout:   timeout,
        CheckRedirect: func(req *http.Request, via []*http.Request) error {
            return http.ErrUseLastResponse
        },
    }
}

Note: DisableKeepAlives: true ensures each probe creates a fresh connection.

Connectivity Check

Before marking a probe as failed, the system can optionally check if the network itself is down:

go
func (h *HealthPing) checkConnectivity() bool {
    if h.Settings.Connectivity == "" { return true }
    tester := newDirectPingClient(h.Settings.Connectivity, h.Settings.Timeout)
    _, err := tester.MeasureDelay(h.Settings.HttpMethod)
    return err == nil
}

If the network is down, the probe result is discarded (RTT = 0, not stored).

Statistical Analysis

File: app/observatory/burst/healthping_result.go

The HealthPingRTTS maintains a circular buffer of ping results with time-based validity:

go
type HealthPingRTTS struct {
    idx      int              // current write position
    cap      int              // buffer capacity
    validity time.Duration    // max age of a result
    rtts     []*pingRTT       // circular buffer
    stats    *HealthPingStats // cached statistics
}

Statistics computed:

go
type HealthPingStats struct {
    All       int            // total valid samples
    Fail      int            // failed probes
    Deviation time.Duration  // standard deviation
    Average   time.Duration  // mean RTT
    Max       time.Duration  // maximum RTT
    Min       time.Duration  // minimum RTT
}

For nodes with only 1 sample, standard deviation is estimated as Average / 2 to prevent them from being unfairly preferred over nodes with more data.

gRPC API

File: app/observatory/command/command.go

go
type service struct {
    observatory extension.Observatory
}

func (s *service) GetOutboundStatus(ctx context.Context, request *GetOutboundStatusRequest) (*GetOutboundStatusResponse, error) {
    resp, _ := s.observatory.GetObservation(ctx)
    retdata := resp.(*observatory.ObservationResult)
    return &GetOutboundStatusResponse{Status: retdata}, nil
}

Both Observer implementations satisfy the extension.Observatory interface:

go
type Observatory interface {
    GetObservation(ctx context.Context) (proto.Message, error)
}

Error Collection

File: app/observatory/explainErrors.go

go
type errorCollector struct {
    errors *errors.Error
}

func (e *errorCollector) SubmitError(err error) {
    if e.errors == nil {
        e.errors = errors.New("underlying connection error").Base(err)
    } else {
        e.errors = e.errors.Base(errors.New("underlying connection error").Base(err))
    }
}

The error collector chains connection errors for diagnostic logging. It's passed through session.TrackedConnectionError() so that low-level transport failures are captured.

Configuration

Standard Observatory:

json
{
    "observatory": {
        "subjectSelector": ["proxy_"],
        "probeUrl": "https://www.google.com/generate_204",
        "probeInterval": "10s",
        "enableConcurrency": true
    }
}

Burst Observatory:

json
{
    "burstObservatory": {
        "subjectSelector": ["proxy_"],
        "pingConfig": {
            "destination": "https://connectivitycheck.gstatic.com/generate_204",
            "interval": "1m",
            "samplingCount": 10,
            "timeout": "5s",
            "httpMethod": "HEAD",
            "connectivity": "https://connectivitycheck.gstatic.com/generate_204"
        }
    }
}

Implementation Notes

  • Both observers register via common.RegisterConfig() and are instantiated when their protobuf config appears in the App list.

  • The extension.ObservatoryType() return value ensures only one observatory can be active at a time (standard or burst, not both).

  • The standard observer's updateStatus() method is currently a no-op for removing stale entries (marked with a TODO comment).

  • The burst observer's Cleanup() method removes results for outbound tags that are no longer in the selector output, handling dynamic handler removal.

  • Health check validity in burst mode is Interval * SamplingCount * 2 -- double the sampling period to account for random distribution of probes.

  • The rttFailed constant (defined in the burst package) represents a probe failure and is stored as a sentinel value in the circular buffer, counting toward the Fail statistic.

  • Both observers are started lazily: they only begin probing when Start() is called and SubjectSelector is non-empty.

Technical analysis for re-implementation purposes.