Skip to content

Observatory: проверка состояния и мониторинг исходящих соединений

Подсистема Observatory проверяет обработчики исходящих соединений для определения их работоспособности и задержки. Балансировщики используют эти данные для выбора лучшего outbound для каждого соединения. Xray предоставляет две реализации Observatory: стандартный Observer и BurstObserver (пакетный режим).

Обзор архитектуры

mermaid
flowchart TD
    subgraph Observatory
        O[Observer] --> P[probe via HTTP GET]
        P --> S[OutboundStatus list]
    end

    subgraph BurstObservatory
        BO[BurstObserver] --> HP[HealthPing]
        HP --> HPR[HealthPingRTTS per tag]
        HPR --> BS[OutboundStatus with stats]
    end

    subgraph Integration
        S --> Bal[Balancer]
        BS --> Bal
        Bal --> OH[Outbound Handler selection]
    end

    subgraph gRPC API
        CMD[ObservatoryService] --> O
        CMD --> BO
    end

Стандартный Observer

Файл: app/observatory/observer.go

Стандартный Observer проверяет каждый выбранный outbound, выполняя HTTP GET запросы через него.

go
type Observer struct {
    config *Config
    ctx    context.Context

    statusLock sync.Mutex
    status     []*OutboundStatus

    finished *done.Instance

    ohm        outbound.Manager
    dispatcher routing.Dispatcher
}

Механизм проверки

Метод probe() создаёт HTTP-клиент, маршрутизирующий запросы через конкретный обработчик исходящих соединений:

go
func (o *Observer) probe(outbound string) ProbeResult {
    httpTransport := http.Transport{
        DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) {
            dest, _ := v2net.ParseDestination(network + ":" + addr)
            trackedCtx := session.TrackedConnectionError(o.ctx, errorCollectorForRequest)
            conn, err := tagged.Dialer(trackedCtx, o.dispatcher, dest, outbound)
            return conn, err
        },
        TLSHandshakeTimeout: 5 * time.Second,
    }
    httpClient := &http.Client{
        Transport: &httpTransport,
        Timeout:   5 * time.Second,
        CheckRedirect: func(req *http.Request, via []*http.Request) error {
            return http.ErrUseLastResponse
        },
    }

    probeURL := "https://www.google.com/generate_204"
    if o.config.ProbeUrl != "" {
        probeURL = o.config.ProbeUrl
    }

    startTime := time.Now()
    response, err := httpClient.Do(req)
    GETTime := time.Since(startTime)

    if err != nil {
        return ProbeResult{Alive: false, LastErrorReason: errorMessage}
    }
    return ProbeResult{Alive: true, Delay: GETTime.Milliseconds()}
}

Ключевые детали:

  • Используется tagged.Dialer() для маршрутизации проверочного соединения через конкретный outbound
  • Отслеживание ошибок через session.TrackedConnectionError() фиксирует низкоуровневые сбои соединения
  • Не следует за перенаправлениями (ErrUseLastResponse)
  • URL проверки по умолчанию: https://www.google.com/generate_204
  • Таймаут: 5 секунд как для TLS-рукопожатия, так и для всего запроса

Фоновый цикл

go
func (o *Observer) background() {
    for !o.finished.Done() {
        hs, _ := o.ohm.(outbound.HandlerSelector)
        outbounds := hs.Select(o.config.SubjectSelector)

        sleepTime := time.Second * 10
        if o.config.ProbeInterval != 0 {
            sleepTime = time.Duration(o.config.ProbeInterval)
        }

        if !o.config.EnableConcurrency {
            // Последовательно: по одной проверке с паузой между ними
            for _, v := range outbounds {
                result := o.probe(v)
                o.updateStatusForResult(v, &result)
                time.Sleep(sleepTime)
            }
        } else {
            // Параллельно: все проверки одновременно, ожидание завершения всех
            ch := make(chan struct{}, len(outbounds))
            for _, v := range outbounds {
                go func(v string) {
                    result := o.probe(v)
                    o.updateStatusForResult(v, &result)
                    ch <- struct{}{}
                }(v)
            }
            for range outbounds { <-ch }
            time.Sleep(sleepTime)
        }
    }
}

SubjectSelector — это список шаблонов тегов, используемых с outbound.HandlerSelector.Select() для сопоставления тегов обработчиков исходящих соединений (поддерживается сопоставление по префиксу).

Отслеживание состояния

go
type OutboundStatus struct {
    Alive           bool
    Delay           int64   // миллисекунды
    LastErrorReason string
    OutboundTag     string
    LastSeenTime    int64   // unix-метка времени
    LastTryTime     int64   // unix-метка времени
}

При неудачной проверке задержка устанавливается в 99999999 (фактически бесконечность для сортировки).

Burst Observer

Файл: app/observatory/burst/burstobserver.go

Burst Observer обеспечивает более сложную проверку состояния со статистическим анализом (среднее значение, отклонение, минимум, максимум, частота ошибок).

go
type Observer struct {
    config     *Config
    ctx        context.Context
    statusLock sync.Mutex
    hp         *HealthPing
    finished   *done.Instance
    ohm        outbound.Manager
}

HealthPing

Файл: app/observatory/burst/healthping.go

Основной движок проверки работоспособности:

go
type HealthPing struct {
    ctx         context.Context
    dispatcher  routing.Dispatcher
    access      sync.Mutex
    ticker      *time.Ticker
    tickerClose chan struct{}
    Settings    *HealthPingSettings
    Results     map[string]*HealthPingRTTS
}

Настройки:

ПараметрЗначение по умолчаниюОписание
Destinationhttps://connectivitycheck.gstatic.com/generate_204URL проверки
Interval1 минутаВремя между раундами проверок
SamplingCount10Количество замеров за раунд
Timeout5 секундТаймаут одной проверки
HttpMethodHEADHTTP-метод
Connectivity(пусто)URL для проверки доступности сети

Стратегия выборки

Планировщик работает по таймеру с интервалом = Interval * SamplingCount:

go
func (h *HealthPing) StartScheduler(selector func() ([]string, error)) {
    interval := h.Settings.Interval * time.Duration(h.Settings.SamplingCount)
    ticker := time.NewTicker(interval)
    // Начальная проверка
    go func() { h.Check(tags) }()
    // Периодические проверки
    go func() {
        for {
            go func() {
                h.doCheck(tags, interval, h.Settings.SamplingCount)
                h.Cleanup(tags)
            }()
            <-ticker.C
        }
    }()
}

Метод doCheck() распределяет проверки случайным образом в пределах периода выборки:

go
func (h *HealthPing) doCheck(tags []string, duration time.Duration, rounds int) {
    for _, tag := range tags {
        client := newPingClient(h.ctx, h.dispatcher, h.Settings.Destination, h.Settings.Timeout, tag)
        for i := 0; i < rounds; i++ {
            delay := time.Duration(dice.RollInt63n(int64(duration)))
            time.AfterFunc(delay, func() {
                delay, err := client.MeasureDelay(h.Settings.HttpMethod)
                // отправка результата
            })
        }
    }
}

Такое случайное распределение предотвращает одновременную нагрузку всех проверок на сеть.

Ping-клиент

Файл: app/observatory/burst/ping.go

go
func newHTTPClient(ctxv context.Context, dispatcher routing.Dispatcher, handler string, timeout time.Duration) *http.Client {
    tr := &http.Transport{
        DisableKeepAlives: true,
        DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) {
            dest, _ := net.ParseDestination(network + ":" + addr)
            return tagged.Dialer(ctxv, dispatcher, dest, handler)
        },
    }
    return &http.Client{
        Transport: tr,
        Timeout:   timeout,
        CheckRedirect: func(req *http.Request, via []*http.Request) error {
            return http.ErrUseLastResponse
        },
    }
}

Примечание: DisableKeepAlives: true гарантирует, что каждая проверка создаёт новое соединение.

Проверка связности

Перед тем как отметить проверку как неудачную, система может дополнительно проверить доступность самой сети:

go
func (h *HealthPing) checkConnectivity() bool {
    if h.Settings.Connectivity == "" { return true }
    tester := newDirectPingClient(h.Settings.Connectivity, h.Settings.Timeout)
    _, err := tester.MeasureDelay(h.Settings.HttpMethod)
    return err == nil
}

Если сеть недоступна, результат проверки отбрасывается (RTT = 0, не сохраняется).

Статистический анализ

Файл: app/observatory/burst/healthping_result.go

HealthPingRTTS поддерживает кольцевой буфер результатов проверок с ограничением по времени:

go
type HealthPingRTTS struct {
    idx      int              // текущая позиция записи
    cap      int              // ёмкость буфера
    validity time.Duration    // максимальный возраст результата
    rtts     []*pingRTT       // кольцевой буфер
    stats    *HealthPingStats // кэшированная статистика
}

Вычисляемая статистика:

go
type HealthPingStats struct {
    All       int            // всего валидных замеров
    Fail      int            // неудачных проверок
    Deviation time.Duration  // стандартное отклонение
    Average   time.Duration  // среднее RTT
    Max       time.Duration  // максимальное RTT
    Min       time.Duration  // минимальное RTT
}

Для узлов с единственным замером стандартное отклонение оценивается как Average / 2, чтобы предотвратить их несправедливое предпочтение перед узлами с большим объёмом данных.

gRPC API

Файл: app/observatory/command/command.go

go
type service struct {
    observatory extension.Observatory
}

func (s *service) GetOutboundStatus(ctx context.Context, request *GetOutboundStatusRequest) (*GetOutboundStatusResponse, error) {
    resp, _ := s.observatory.GetObservation(ctx)
    retdata := resp.(*observatory.ObservationResult)
    return &GetOutboundStatusResponse{Status: retdata}, nil
}

Обе реализации Observer удовлетворяют интерфейсу extension.Observatory:

go
type Observatory interface {
    GetObservation(ctx context.Context) (proto.Message, error)
}

Сбор ошибок

Файл: app/observatory/explainErrors.go

go
type errorCollector struct {
    errors *errors.Error
}

func (e *errorCollector) SubmitError(err error) {
    if e.errors == nil {
        e.errors = errors.New("underlying connection error").Base(err)
    } else {
        e.errors = e.errors.Base(errors.New("underlying connection error").Base(err))
    }
}

Коллектор ошибок выстраивает цепочку ошибок соединения для диагностического логирования. Он передаётся через session.TrackedConnectionError(), чтобы фиксировать низкоуровневые сбои транспорта.

Конфигурация

Стандартный Observatory:

json
{
    "observatory": {
        "subjectSelector": ["proxy_"],
        "probeUrl": "https://www.google.com/generate_204",
        "probeInterval": "10s",
        "enableConcurrency": true
    }
}

Burst Observatory:

json
{
    "burstObservatory": {
        "subjectSelector": ["proxy_"],
        "pingConfig": {
            "destination": "https://connectivitycheck.gstatic.com/generate_204",
            "interval": "1m",
            "samplingCount": 10,
            "timeout": "5s",
            "httpMethod": "HEAD",
            "connectivity": "https://connectivitycheck.gstatic.com/generate_204"
        }
    }
}

Заметки по реализации

  • Оба Observer регистрируются через common.RegisterConfig() и создаются, когда их protobuf-конфигурация появляется в списке App.

  • Возвращаемое значение extension.ObservatoryType() гарантирует, что одновременно может быть активен только один Observatory (стандартный или пакетный, но не оба).

  • Метод updateStatus() стандартного Observer в настоящее время не выполняет удаление устаревших записей (отмечен комментарием TODO).

  • Метод Cleanup() Burst Observer удаляет результаты для тегов outbound, которые больше не соответствуют выходным данным селектора, обрабатывая динамическое удаление обработчиков.

  • Срок действия проверки работоспособности в пакетном режиме составляет Interval * SamplingCount * 2 — вдвое больше периода выборки для учёта случайного распределения проверок.

  • Константа rttFailed (определена в пакете burst) представляет неудачную проверку и хранится как сигнальное значение в кольцевом буфере, учитываясь в статистике Fail.

  • Оба Observer запускаются лениво: они начинают проверки только когда вызывается Start() и SubjectSelector не пуст.

Технический анализ для целей повторной реализации.