Observatory: Health Checking and Outbound Monitoring
The Observatory subsystem probes outbound handlers to determine their health and latency. Balancers use this data to select the best outbound for each connection. Xray provides two observatory implementations: the standard Observer and the burst-mode BurstObserver.
Architecture Overview
```mermaid
flowchart TD
    subgraph Observatory
        O[Observer] --> P[probe via HTTP GET]
        P --> S[OutboundStatus list]
    end
    subgraph BurstObservatory
        BO[BurstObserver] --> HP[HealthPing]
        HP --> HPR[HealthPingRTTS per tag]
        HPR --> BS[OutboundStatus with stats]
    end
    subgraph Integration
        S --> Bal[Balancer]
        BS --> Bal
        Bal --> OH[Outbound Handler selection]
    end
    subgraph gRPC_API[gRPC API]
        CMD[ObservatoryService] --> O
        CMD --> BO
    end
```
Standard Observer
File: app/observatory/observer.go
The standard observer probes each selected outbound by making HTTP GET requests through it.
```go
type Observer struct {
	config     *Config
	ctx        context.Context
	statusLock sync.Mutex
	status     []*OutboundStatus
	finished   *done.Instance
	ohm        outbound.Manager
	dispatcher routing.Dispatcher
}
```
Probe Mechanism
The probe() method creates an HTTP client that routes through a specific outbound handler:
```go
func (o *Observer) probe(outbound string) ProbeResult {
	// Collector for transport-level errors raised while dialing (see Error Collection).
	errorCollectorForRequest := &errorCollector{}
	httpTransport := http.Transport{
		DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) {
			dest, err := v2net.ParseDestination(network + ":" + addr)
			if err != nil {
				return nil, err
			}
			trackedCtx := session.TrackedConnectionError(o.ctx, errorCollectorForRequest)
			return tagged.Dialer(trackedCtx, o.dispatcher, dest, outbound)
		},
		TLSHandshakeTimeout: 5 * time.Second,
	}
	httpClient := &http.Client{
		Transport: &httpTransport,
		Timeout:   5 * time.Second,
		CheckRedirect: func(req *http.Request, via []*http.Request) error {
			return http.ErrUseLastResponse
		},
	}
	probeURL := "https://www.google.com/generate_204"
	if o.config.ProbeUrl != "" {
		probeURL = o.config.ProbeUrl
	}
	startTime := time.Now()
	response, err := httpClient.Get(probeURL)
	GETTime := time.Since(startTime)
	if err != nil {
		errorMessage := "the outbound failed the probe: " + err.Error()
		return ProbeResult{Alive: false, LastErrorReason: errorMessage}
	}
	response.Body.Close()
	return ProbeResult{Alive: true, Delay: GETTime.Milliseconds()}
}
```
Key details:
- Uses `tagged.Dialer()` to route the probe connection through the specific outbound
- Error tracking via `session.TrackedConnectionError()` captures underlying connection failures
- Does not follow redirects (`ErrUseLastResponse`)
- Default probe URL: `https://www.google.com/generate_204`
- Timeout: 5 seconds for both the TLS handshake and the overall request
Background Loop
```go
func (o *Observer) background() {
	for !o.finished.Done() {
		hs, ok := o.ohm.(outbound.HandlerSelector)
		if !ok {
			// The outbound manager cannot enumerate handlers; nothing to probe.
			return
		}
		outbounds := hs.Select(o.config.SubjectSelector)

		sleepTime := time.Second * 10
		if o.config.ProbeInterval != 0 {
			sleepTime = time.Duration(o.config.ProbeInterval)
		}

		if !o.config.EnableConcurrency {
			// Sequential: probe one at a time, sleeping between probes.
			for _, v := range outbounds {
				result := o.probe(v)
				o.updateStatusForResult(v, &result)
				time.Sleep(sleepTime)
			}
		} else {
			// Concurrent: probe all at once, wait for all, then sleep.
			ch := make(chan struct{}, len(outbounds))
			for _, v := range outbounds {
				go func(v string) {
					result := o.probe(v)
					o.updateStatusForResult(v, &result)
					ch <- struct{}{}
				}(v)
			}
			for range outbounds {
				<-ch
			}
			time.Sleep(sleepTime)
		}
	}
}
```
The `SubjectSelector` is a list of tag patterns used with `outbound.HandlerSelector.Select()` to match outbound handler tags (prefix matching is supported).
Status Tracking
```go
type OutboundStatus struct {
	Alive           bool
	Delay           int64 // milliseconds
	LastErrorReason string
	OutboundTag     string
	LastSeenTime    int64 // unix timestamp
	LastTryTime     int64 // unix timestamp
}
```
When a probe fails, `Delay` is set to 99999999 (effectively infinity for sorting purposes).
Burst Observer
File: app/observatory/burst/burstobserver.go
The burst observer provides more sophisticated health checking with statistical analysis (average, deviation, min, max, fail rate).
```go
type Observer struct {
	config     *Config
	ctx        context.Context
	statusLock sync.Mutex
	hp         *HealthPing
	finished   *done.Instance
	ohm        outbound.Manager
}
```
HealthPing
File: app/observatory/burst/healthping.go
The core health checking engine:
```go
type HealthPing struct {
	ctx         context.Context
	dispatcher  routing.Dispatcher
	access      sync.Mutex
	ticker      *time.Ticker
	tickerClose chan struct{}
	Settings    *HealthPingSettings
	Results     map[string]*HealthPingRTTS
}
```
Settings:
| Setting | Default | Description |
|---|---|---|
| Destination | `https://connectivitycheck.gstatic.com/generate_204` | Probe URL |
| Interval | 1 minute | Time between sampling rounds |
| SamplingCount | 10 | Number of samples per round |
| Timeout | 5 seconds | Per-probe timeout |
| HttpMethod | HEAD | HTTP method |
| Connectivity | (empty) | URL to check whether the network is up |
Sampling Strategy
The scheduler runs on a ticker with interval = Interval * SamplingCount:
```go
func (h *HealthPing) StartScheduler(selector func() ([]string, error)) {
	interval := h.Settings.Interval * time.Duration(h.Settings.SamplingCount)
	ticker := time.NewTicker(interval)
	// Initial check, run immediately.
	go func() {
		if tags, err := selector(); err == nil {
			h.Check(tags)
		}
	}()
	// Periodic checks on every tick.
	go func() {
		for {
			go func() {
				tags, err := selector()
				if err != nil {
					return
				}
				h.doCheck(tags, interval, h.Settings.SamplingCount)
				h.Cleanup(tags)
			}()
			<-ticker.C
		}
	}()
}
```
The `doCheck()` method distributes probes randomly within the sampling period:
```go
func (h *HealthPing) doCheck(tags []string, duration time.Duration, rounds int) {
	for _, tag := range tags {
		client := newPingClient(h.ctx, h.dispatcher, h.Settings.Destination, h.Settings.Timeout, tag)
		for i := 0; i < rounds; i++ {
			// Schedule each probe at a random offset within the sampling window.
			delay := time.Duration(dice.RollInt63n(int64(duration)))
			time.AfterFunc(delay, func() {
				rtt, err := client.MeasureDelay(h.Settings.HttpMethod)
				// ... report rtt / err for this tag ...
			})
		}
	}
}
```
This random distribution prevents all probes from hitting the network simultaneously.
Ping Client
File: app/observatory/burst/ping.go
```go
func newHTTPClient(ctxv context.Context, dispatcher routing.Dispatcher, handler string, timeout time.Duration) *http.Client {
	tr := &http.Transport{
		DisableKeepAlives: true,
		DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) {
			dest, err := net.ParseDestination(network + ":" + addr)
			if err != nil {
				return nil, err
			}
			return tagged.Dialer(ctxv, dispatcher, dest, handler)
		},
	}
	return &http.Client{
		Transport: tr,
		Timeout:   timeout,
		CheckRedirect: func(req *http.Request, via []*http.Request) error {
			return http.ErrUseLastResponse
		},
	}
}
```
Note: `DisableKeepAlives: true` ensures each probe creates a fresh connection.
Connectivity Check
Before marking a probe as failed, the system can optionally check if the network itself is down:
```go
func (h *HealthPing) checkConnectivity() bool {
	if h.Settings.Connectivity == "" {
		return true
	}
	tester := newDirectPingClient(h.Settings.Connectivity, h.Settings.Timeout)
	_, err := tester.MeasureDelay(h.Settings.HttpMethod)
	return err == nil
}
```
If the network is down, the probe result is discarded (RTT = 0, not stored).
Statistical Analysis
File: app/observatory/burst/healthping_result.go
The HealthPingRTTS maintains a circular buffer of ping results with time-based validity:
```go
type HealthPingRTTS struct {
	idx      int              // current write position
	cap      int              // buffer capacity
	validity time.Duration    // max age of a result
	rtts     []*pingRTT       // circular buffer
	stats    *HealthPingStats // cached statistics
}
```
Statistics computed:
```go
type HealthPingStats struct {
	All       int           // total valid samples
	Fail      int           // failed probes
	Deviation time.Duration // standard deviation
	Average   time.Duration // mean RTT
	Max       time.Duration // maximum RTT
	Min       time.Duration // minimum RTT
}
```
For nodes with only one sample, the standard deviation is estimated as `Average / 2` to prevent them from being unfairly preferred over nodes with more data.
gRPC API
File: app/observatory/command/command.go
```go
type service struct {
	observatory extension.Observatory
}

func (s *service) GetOutboundStatus(ctx context.Context, request *GetOutboundStatusRequest) (*GetOutboundStatusResponse, error) {
	resp, err := s.observatory.GetObservation(ctx)
	if err != nil {
		return nil, err
	}
	retdata := resp.(*observatory.ObservationResult)
	return &GetOutboundStatusResponse{Status: retdata}, nil
}
```
Both observer implementations satisfy the `extension.Observatory` interface:
```go
type Observatory interface {
	GetObservation(ctx context.Context) (proto.Message, error)
}
```
Error Collection
File: app/observatory/explainErrors.go
```go
type errorCollector struct {
	errors *errors.Error
}

func (e *errorCollector) SubmitError(err error) {
	if e.errors == nil {
		e.errors = errors.New("underlying connection error").Base(err)
	} else {
		e.errors = e.errors.Base(errors.New("underlying connection error").Base(err))
	}
}
```
The error collector chains connection errors for diagnostic logging. It is passed through `session.TrackedConnectionError()` so that low-level transport failures are captured.
Configuration
Standard Observatory:
```json
{
  "observatory": {
    "subjectSelector": ["proxy_"],
    "probeUrl": "https://www.google.com/generate_204",
    "probeInterval": "10s",
    "enableConcurrency": true
  }
}
```
Burst Observatory:
```json
{
  "burstObservatory": {
    "subjectSelector": ["proxy_"],
    "pingConfig": {
      "destination": "https://connectivitycheck.gstatic.com/generate_204",
      "interval": "1m",
      "samplingCount": 10,
      "timeout": "5s",
      "httpMethod": "HEAD",
      "connectivity": "https://connectivitycheck.gstatic.com/generate_204"
    }
  }
}
```
Implementation Notes
- Both observers register via `common.RegisterConfig()` and are instantiated when their protobuf config appears in the `App` list.
- The `extension.ObservatoryType()` return value ensures only one observatory can be active at a time (standard or burst, not both).
- The standard observer's `updateStatus()` method is currently a no-op for removing stale entries (marked with a TODO comment).
- The burst observer's `Cleanup()` method removes results for outbound tags that are no longer in the selector output, handling dynamic handler removal.
- Health check validity in burst mode is `Interval * SamplingCount * 2` -- double the sampling period to account for the random distribution of probes.
- The `rttFailed` constant (defined in the burst package) represents a probe failure and is stored as a sentinel value in the circular buffer, counting toward the `Fail` statistic.
- Both observers are started lazily: they only begin probing when `Start()` is called and `SubjectSelector` is non-empty.