Observatory: проверка состояния и мониторинг исходящих соединений
Подсистема Observatory проверяет обработчики исходящих соединений для определения их работоспособности и задержки. Балансировщики используют эти данные для выбора лучшего outbound для каждого соединения. Xray предоставляет две реализации Observatory: стандартный Observer и BurstObserver (пакетный режим).
Обзор архитектуры
flowchart TD
subgraph Observatory
O[Observer] --> P[probe via HTTP GET]
P --> S[OutboundStatus list]
end
subgraph BurstObservatory
BO[BurstObserver] --> HP[HealthPing]
HP --> HPR[HealthPingRTTS per tag]
HPR --> BS[OutboundStatus with stats]
end
subgraph Integration
S --> Bal[Balancer]
BS --> Bal
Bal --> OH[Outbound Handler selection]
end
subgraph gRPC API
CMD[ObservatoryService] --> O
CMD --> BO
endСтандартный Observer
Файл: app/observatory/observer.go
Стандартный Observer проверяет каждый выбранный outbound, выполняя HTTP GET запросы через него.
type Observer struct {
config *Config
ctx context.Context
statusLock sync.Mutex
status []*OutboundStatus
finished *done.Instance
ohm outbound.Manager
dispatcher routing.Dispatcher
}Механизм проверки
Метод probe() создаёт HTTP-клиент, маршрутизирующий запросы через конкретный обработчик исходящих соединений:
func (o *Observer) probe(outbound string) ProbeResult {
httpTransport := http.Transport{
DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) {
dest, _ := v2net.ParseDestination(network + ":" + addr)
trackedCtx := session.TrackedConnectionError(o.ctx, errorCollectorForRequest)
conn, err := tagged.Dialer(trackedCtx, o.dispatcher, dest, outbound)
return conn, err
},
TLSHandshakeTimeout: 5 * time.Second,
}
httpClient := &http.Client{
Transport: &httpTransport,
Timeout: 5 * time.Second,
CheckRedirect: func(req *http.Request, via []*http.Request) error {
return http.ErrUseLastResponse
},
}
probeURL := "https://www.google.com/generate_204"
if o.config.ProbeUrl != "" {
probeURL = o.config.ProbeUrl
}
startTime := time.Now()
response, err := httpClient.Do(req)
GETTime := time.Since(startTime)
if err != nil {
return ProbeResult{Alive: false, LastErrorReason: errorMessage}
}
return ProbeResult{Alive: true, Delay: GETTime.Milliseconds()}
}Ключевые детали:
- Используется
tagged.Dialer()для маршрутизации проверочного соединения через конкретный outbound - Отслеживание ошибок через
session.TrackedConnectionError()фиксирует низкоуровневые сбои соединения - Не следует за перенаправлениями (
ErrUseLastResponse) - URL проверки по умолчанию:
https://www.google.com/generate_204 - Таймаут: 5 секунд как для TLS-рукопожатия, так и для всего запроса
Фоновый цикл
func (o *Observer) background() {
for !o.finished.Done() {
hs, _ := o.ohm.(outbound.HandlerSelector)
outbounds := hs.Select(o.config.SubjectSelector)
sleepTime := time.Second * 10
if o.config.ProbeInterval != 0 {
sleepTime = time.Duration(o.config.ProbeInterval)
}
if !o.config.EnableConcurrency {
// Последовательно: по одной проверке с паузой между ними
for _, v := range outbounds {
result := o.probe(v)
o.updateStatusForResult(v, &result)
time.Sleep(sleepTime)
}
} else {
// Параллельно: все проверки одновременно, ожидание завершения всех
ch := make(chan struct{}, len(outbounds))
for _, v := range outbounds {
go func(v string) {
result := o.probe(v)
o.updateStatusForResult(v, &result)
ch <- struct{}{}
}(v)
}
for range outbounds { <-ch }
time.Sleep(sleepTime)
}
}
}SubjectSelector — это список шаблонов тегов, используемых с outbound.HandlerSelector.Select() для сопоставления тегов обработчиков исходящих соединений (поддерживается сопоставление по префиксу).
Отслеживание состояния
type OutboundStatus struct {
Alive bool
Delay int64 // миллисекунды
LastErrorReason string
OutboundTag string
LastSeenTime int64 // unix-метка времени
LastTryTime int64 // unix-метка времени
}При неудачной проверке задержка устанавливается в 99999999 (фактически бесконечность для сортировки).
Burst Observer
Файл: app/observatory/burst/burstobserver.go
Burst Observer обеспечивает более сложную проверку состояния со статистическим анализом (среднее значение, отклонение, минимум, максимум, частота ошибок).
type Observer struct {
config *Config
ctx context.Context
statusLock sync.Mutex
hp *HealthPing
finished *done.Instance
ohm outbound.Manager
}HealthPing
Файл: app/observatory/burst/healthping.go
Основной движок проверки работоспособности:
type HealthPing struct {
ctx context.Context
dispatcher routing.Dispatcher
access sync.Mutex
ticker *time.Ticker
tickerClose chan struct{}
Settings *HealthPingSettings
Results map[string]*HealthPingRTTS
}Настройки:
| Параметр | Значение по умолчанию | Описание |
|---|---|---|
Destination | https://connectivitycheck.gstatic.com/generate_204 | URL проверки |
Interval | 1 минута | Время между раундами проверок |
SamplingCount | 10 | Количество замеров за раунд |
Timeout | 5 секунд | Таймаут одной проверки |
HttpMethod | HEAD | HTTP-метод |
Connectivity | (пусто) | URL для проверки доступности сети |
Стратегия выборки
Планировщик работает по таймеру с интервалом = Interval * SamplingCount:
func (h *HealthPing) StartScheduler(selector func() ([]string, error)) {
interval := h.Settings.Interval * time.Duration(h.Settings.SamplingCount)
ticker := time.NewTicker(interval)
// Начальная проверка
go func() { h.Check(tags) }()
// Периодические проверки
go func() {
for {
go func() {
h.doCheck(tags, interval, h.Settings.SamplingCount)
h.Cleanup(tags)
}()
<-ticker.C
}
}()
}Метод doCheck() распределяет проверки случайным образом в пределах периода выборки:
func (h *HealthPing) doCheck(tags []string, duration time.Duration, rounds int) {
for _, tag := range tags {
client := newPingClient(h.ctx, h.dispatcher, h.Settings.Destination, h.Settings.Timeout, tag)
for i := 0; i < rounds; i++ {
delay := time.Duration(dice.RollInt63n(int64(duration)))
time.AfterFunc(delay, func() {
delay, err := client.MeasureDelay(h.Settings.HttpMethod)
// отправка результата
})
}
}
}Такое случайное распределение предотвращает одновременную нагрузку всех проверок на сеть.
Ping-клиент
Файл: app/observatory/burst/ping.go
func newHTTPClient(ctxv context.Context, dispatcher routing.Dispatcher, handler string, timeout time.Duration) *http.Client {
tr := &http.Transport{
DisableKeepAlives: true,
DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) {
dest, _ := net.ParseDestination(network + ":" + addr)
return tagged.Dialer(ctxv, dispatcher, dest, handler)
},
}
return &http.Client{
Transport: tr,
Timeout: timeout,
CheckRedirect: func(req *http.Request, via []*http.Request) error {
return http.ErrUseLastResponse
},
}
}Примечание: DisableKeepAlives: true гарантирует, что каждая проверка создаёт новое соединение.
Проверка связности
Перед тем как отметить проверку как неудачную, система может дополнительно проверить доступность самой сети:
func (h *HealthPing) checkConnectivity() bool {
if h.Settings.Connectivity == "" { return true }
tester := newDirectPingClient(h.Settings.Connectivity, h.Settings.Timeout)
_, err := tester.MeasureDelay(h.Settings.HttpMethod)
return err == nil
}Если сеть недоступна, результат проверки отбрасывается (RTT = 0, не сохраняется).
Статистический анализ
Файл: app/observatory/burst/healthping_result.go
HealthPingRTTS поддерживает кольцевой буфер результатов проверок с ограничением по времени:
type HealthPingRTTS struct {
idx int // текущая позиция записи
cap int // ёмкость буфера
validity time.Duration // максимальный возраст результата
rtts []*pingRTT // кольцевой буфер
stats *HealthPingStats // кэшированная статистика
}Вычисляемая статистика:
type HealthPingStats struct {
All int // всего валидных замеров
Fail int // неудачных проверок
Deviation time.Duration // стандартное отклонение
Average time.Duration // среднее RTT
Max time.Duration // максимальное RTT
Min time.Duration // минимальное RTT
}Для узлов с единственным замером стандартное отклонение оценивается как Average / 2, чтобы предотвратить их несправедливое предпочтение перед узлами с большим объёмом данных.
gRPC API
Файл: app/observatory/command/command.go
type service struct {
observatory extension.Observatory
}
func (s *service) GetOutboundStatus(ctx context.Context, request *GetOutboundStatusRequest) (*GetOutboundStatusResponse, error) {
resp, _ := s.observatory.GetObservation(ctx)
retdata := resp.(*observatory.ObservationResult)
return &GetOutboundStatusResponse{Status: retdata}, nil
}Обе реализации Observer удовлетворяют интерфейсу extension.Observatory:
type Observatory interface {
GetObservation(ctx context.Context) (proto.Message, error)
}Сбор ошибок
Файл: app/observatory/explainErrors.go
type errorCollector struct {
errors *errors.Error
}
func (e *errorCollector) SubmitError(err error) {
if e.errors == nil {
e.errors = errors.New("underlying connection error").Base(err)
} else {
e.errors = e.errors.Base(errors.New("underlying connection error").Base(err))
}
}Коллектор ошибок выстраивает цепочку ошибок соединения для диагностического логирования. Он передаётся через session.TrackedConnectionError(), чтобы фиксировать низкоуровневые сбои транспорта.
Конфигурация
Стандартный Observatory:
{
"observatory": {
"subjectSelector": ["proxy_"],
"probeUrl": "https://www.google.com/generate_204",
"probeInterval": "10s",
"enableConcurrency": true
}
}Burst Observatory:
{
"burstObservatory": {
"subjectSelector": ["proxy_"],
"pingConfig": {
"destination": "https://connectivitycheck.gstatic.com/generate_204",
"interval": "1m",
"samplingCount": 10,
"timeout": "5s",
"httpMethod": "HEAD",
"connectivity": "https://connectivitycheck.gstatic.com/generate_204"
}
}
}Заметки по реализации
Оба Observer регистрируются через
common.RegisterConfig()и создаются, когда их protobuf-конфигурация появляется в спискеApp.Возвращаемое значение
extension.ObservatoryType()гарантирует, что одновременно может быть активен только один Observatory (стандартный или пакетный, но не оба).Метод
updateStatus()стандартного Observer в настоящее время не выполняет удаление устаревших записей (отмечен комментарием TODO).Метод
Cleanup()Burst Observer удаляет результаты для тегов outbound, которые больше не соответствуют выходным данным селектора, обрабатывая динамическое удаление обработчиков.Срок действия проверки работоспособности в пакетном режиме составляет
Interval * SamplingCount * 2— вдвое больше периода выборки для учёта случайного распределения проверок.Константа
rttFailed(определена в пакете burst) представляет неудачную проверку и хранится как сигнальное значение в кольцевом буфере, учитываясь в статистикеFail.Оба Observer запускаются лениво: они начинают проверки только когда вызывается
Start()иSubjectSelectorне пуст.