Skip to content

Статистика трафика

Подсистема статистики Xray обеспечивает измерение трафика на основе счётчиков, отслеживание пользователей онлайн и каналы публикации/подписки для мониторинга событий в реальном времени. Она интегрируется с диспетчером для прозрачного измерения трафика по пользователям, по входящим и по исходящим обработчикам.

Архитектура

mermaid
flowchart TD
    subgraph Stats Manager
        M[Manager] --> C[Counters map]
        M --> OM[OnlineMaps map]
        M --> CH[Channels map]
    end

    subgraph Dispatcher Integration
        D[DefaultDispatcher] --> SSW[SizeStatWriter]
        SSW --> C
        D --> OI[OnlineMap.AddIP]
        OI --> OM
    end

    subgraph gRPC API
        SS[StatsService] --> M
    end

Stats Manager

Файл: app/stats/stats.go

Manager — это центральный реестр всех объектов статистики:

go
type Manager struct {
    access    sync.RWMutex
    counters  map[string]*Counter
    onlineMap map[string]*OnlineMap
    channels  map[string]*Channel
    running   bool
}

Он реализует интерфейс stats.Manager со следующими методами:

МетодОписание
RegisterCounter(name)Создать новый именованный счётчик
UnregisterCounter(name)Удалить счётчик
GetCounter(name)Найти счётчик по имени
VisitCounters(visitor)Перебрать все счётчики с помощью callback-функции
RegisterOnlineMap(name)Создать новую именованную карту онлайн-пользователей
GetOnlineMap(name)Найти карту онлайн-пользователей
GetAllOnlineUsers()Вернуть всех пользователей с активными IP
RegisterChannel(name)Создать новый канал публикации/подписки
GetChannel(name)Найти канал

Counter

Файл: app/stats/counter.go

Атомарный счётчик без блокировок:

go
type Counter struct {
    value int64
}

func (c *Counter) Value() int64 {
    return atomic.LoadInt64(&c.value)
}

func (c *Counter) Set(newValue int64) int64 {
    return atomic.SwapInt64(&c.value, newValue)
}

func (c *Counter) Add(delta int64) int64 {
    return atomic.AddInt64(&c.value, delta)
}

Set() возвращает старое значение (полезно для операций «получить и сбросить» через Set(0)). Add() возвращает новое значение после сложения.

SizeStatWriter

Файл: app/dispatcher/stats.go

Мост между плоскостью данных и системой статистики:

go
type SizeStatWriter struct {
    Counter stats.Counter
    Writer  buf.Writer
}

func (w *SizeStatWriter) WriteMultiBuffer(mb buf.MultiBuffer) error {
    w.Counter.Add(int64(mb.Len()))
    return w.Writer.WriteMultiBuffer(mb)
}

Он оборачивает buf.Writer и добавляет количество байт каждого MultiBuffer к своему счётчику перед передачей данных. Это перехват без копирования — данные не модифицируются.

Интеграция с диспетчером

Файл: app/dispatcher/default.go

Метод DefaultDispatcher.getLink() подключает статистику:

go
func (d *DefaultDispatcher) getLink(ctx context.Context) (*transport.Link, *transport.Link) {
    uplinkReader, uplinkWriter := pipe.New(opt...)
    downlinkReader, downlinkWriter := pipe.New(opt...)

    inboundLink := &transport.Link{Reader: downlinkReader, Writer: uplinkWriter}
    outboundLink := &transport.Link{Reader: uplinkReader, Writer: downlinkWriter}

    if user != nil && len(user.Email) > 0 {
        p := d.policy.ForLevel(user.Level)

        if p.Stats.UserUplink {
            name := "user>>>" + user.Email + ">>>traffic>>>uplink"
            if c, _ := stats.GetOrRegisterCounter(d.stats, name); c != nil {
                inboundLink.Writer = &SizeStatWriter{Counter: c, Writer: inboundLink.Writer}
            }
        }

        if p.Stats.UserDownlink {
            name := "user>>>" + user.Email + ">>>traffic>>>downlink"
            if c, _ := stats.GetOrRegisterCounter(d.stats, name); c != nil {
                outboundLink.Writer = &SizeStatWriter{Counter: c, Writer: outboundLink.Writer}
            }
        }

        if p.Stats.UserOnline {
            name := "user>>>" + user.Email + ">>>online"
            if om, _ := stats.GetOrRegisterOnlineMap(d.stats, name); om != nil {
                om.AddIP(sessionInbounds.Source.Address.String())
            }
        }
    }
}

Конвенция именования счётчиков

Счётчики следуют иерархической конвенции с разделителем >>>:

ШаблонПримерНаправление
user>>>{email}>>>traffic>>>uplinkuser>>>user1@example.com>>>traffic>>>uplinkКлиент -> Xray
user>>>{email}>>>traffic>>>downlinkuser>>>user1@example.com>>>traffic>>>downlinkXray -> Клиент
user>>>{email}>>>onlineuser>>>user1@example.com>>>onlineСтатус онлайн (OnlineMap)
inbound>>>{tag}>>>traffic>>>uplinkinbound>>>vmess-in>>>traffic>>>uplinkUplink по inbound
inbound>>>{tag}>>>traffic>>>downlinkinbound>>>vmess-in>>>traffic>>>downlinkDownlink по inbound
outbound>>>{tag}>>>traffic>>>uplinkoutbound>>>freedom>>>traffic>>>uplinkUplink по outbound
outbound>>>{tag}>>>traffic>>>downlinkoutbound>>>freedom>>>traffic>>>downlinkDownlink по outbound

Функция WrapLink() выполняет аналогичную задачу для DispatchLink() (используется обратным прокси и другими внутренними вызывающими):

go
func WrapLink(ctx context.Context, policyManager policy.Manager, statsManager stats.Manager, link *transport.Link) *transport.Link {
    link.Reader = &buf.TimeoutWrapperReader{Reader: link.Reader}

    if user != nil && len(user.Email) > 0 {
        if p.Stats.UserUplink {
            link.Reader.(*buf.TimeoutWrapperReader).Counter = c
        }
        if p.Stats.UserDownlink {
            link.Writer = &SizeStatWriter{Counter: c, Writer: link.Writer}
        }
        // Отслеживание онлайн...
    }
    return link
}

Обратите внимание на асимметрию: для WrapLink uplink отслеживается на стороне чтения (через TimeoutWrapperReader.Counter), тогда как для getLink uplink отслеживается на стороне записи (через SizeStatWriter). Это связано с тем, что направления канала инвертированы между двумя путями.

OnlineMap

Файл: app/stats/online_map.go

Отслеживает, какие IP-адреса в данный момент подключены для данного пользователя:

go
type OnlineMap struct {
    ipList        map[string]time.Time
    access        sync.RWMutex
    lastCleanup   time.Time
    cleanupPeriod time.Duration  // 10 секунд
}

Ключевое поведение:

  • AddIP(ip) добавляет или обновляет метку времени IP-адреса. Пропускает 127.0.0.1.
  • RemoveExpiredIPs() удаляет записи старше 20 секунд
  • Очистка запускается лениво: только при вызове AddIP() и когда прошло 10+ секунд с последней очистки
  • Count() возвращает количество текущих активных IP-адресов
  • List() возвращает все активные IP-адреса в виде строк

20-секундный срок истечения означает, что IP считается «онлайн» только если он активно отправлял трафик в течение последних 20 секунд.

Channel (публикация/подписка)

Файл: app/stats/channel.go

Канал публикации/подписки для трансляции событий в реальном времени:

go
type Channel struct {
    channel     chan channelMessage
    subscribers []chan interface{}
    access      sync.RWMutex
    closed      chan struct{}
    blocking    bool
    bufferSize  int
    subsLimit   int
}

Режимы публикации:

  • Блокирующий: Publish() блокируется, если буфер канала полон (ожидает потребителя)
  • Неблокирующий: Если буфер полон, запускается горутина для асинхронной доставки

Режимы трансляции:

  • Блокирующий: Трансляция каждому подписчику блокируется, если канал подписчика полон
  • Неблокирующий: Если канал подписчика полон, запускается горутина для каждого подписчика

По умолчанию канал неблокирующий с размером буфера 64.

Жизненный цикл:

go
func (c *Channel) Start() error {
    c.closed = make(chan struct{})
    go func() {
        for {
            select {
            case pub := <-c.channel:
                for _, sub := range c.Subscribers() {
                    pub.broadcastNonBlocking(sub)
                }
            case <-c.closed:
                // Отписка всех и закрытие
                return
            }
        }
    }()
}

Активация на основе политик

Сбор статистики управляется системой политик. Для каждого уровня пользователя есть настройки:

go
type Stats struct {
    UserUplink   bool
    UserDownlink bool
    UserOnline   bool
}

Секция конфигурации stats должна присутствовать (даже пустая) для включения менеджера статистики:

json
{
    "stats": {},
    "policy": {
        "levels": {
            "0": {
                "statsUserUplink": true,
                "statsUserDownlink": true,
                "statsUserOnline": true
            }
        }
    }
}

Запрос статистики

Через gRPC StatsService:

bash
# Получить отдельный счётчик
grpcurl -d '{"name": "user>>>user1@example.com>>>traffic>>>uplink"}' \
    localhost:10085 xray.app.stats.command.StatsService/GetStats

# Получить и сбросить счётчик
grpcurl -d '{"name": "...", "reset": true}' \
    localhost:10085 xray.app.stats.command.StatsService/GetStats

# Запрос по шаблону
grpcurl -d '{"pattern": "user>>>user1"}' \
    localhost:10085 xray.app.stats.command.StatsService/QueryStats

# Получить количество онлайн-пользователей
grpcurl -d '{"name": "user>>>user1@example.com>>>online"}' \
    localhost:10085 xray.app.stats.command.StatsService/GetStatsOnline

# Получить всех онлайн-пользователей
grpcurl localhost:10085 xray.app.stats.command.StatsService/GetAllOnlineUsers

Заметки по реализации

  • Счётчики используют исключительно операции sync/atomic — без блокировок мьютексами для чтения/записи. Это означает, что операции со счётчиками не требуют блокировок и чрезвычайно быстры (одна инструкция CAS на большинстве архитектур).

  • Вспомогательная функция GetOrRegisterCounter()features/stats) создаёт счётчик, если он не существует, избегая состояния гонки между проверкой наличия счётчика диспетчером и его регистрацией.

  • Manager.GetAllOnlineUsers() перебирает все карты онлайн-пользователей и возвращает пользователей, у которых есть хотя бы один неистёкший IP. Используется блокировка на запись (не на чтение), потому что IpTimeMap() может запускать внутреннюю очистку.

  • Счётчики трафика накапливают байты бесконечно до явного сброса через Set(0). gRPC GetStats с reset: true атомарно читает и сбрасывает значение.

  • SizeStatWriter подсчитывает общее количество байт в MultiBuffer (сумма длин всех буферов), что представляет размер полезной нагрузки после внутренней буферизации Xray, но до кодирования для передачи по сети.

  • Карты онлайн-пользователей привязаны к пользователю, а не к соединению. Множественные соединения с одного IP обновляют одну и ту же метку времени. 20-секундный срок истечения — это эвристика: если соединение пользователя неактивно 20 секунд, он удаляется из списка онлайн.

Технический анализ для целей повторной реализации.