Статистика трафика
Подсистема статистики Xray обеспечивает измерение трафика на основе счётчиков, отслеживание пользователей онлайн и каналы публикации/подписки для мониторинга событий в реальном времени. Она интегрируется с диспетчером для прозрачного измерения трафика по пользователям, по входящим и по исходящим обработчикам.
Архитектура
flowchart TD
subgraph Stats Manager
M[Manager] --> C[Counters map]
M --> OM[OnlineMaps map]
M --> CH[Channels map]
end
subgraph Dispatcher Integration
D[DefaultDispatcher] --> SSW[SizeStatWriter]
SSW --> C
D --> OI[OnlineMap.AddIP]
OI --> OM
end
subgraph gRPC API
SS[StatsService] --> M
endStats Manager
Файл: app/stats/stats.go
Manager — это центральный реестр всех объектов статистики:
type Manager struct {
access sync.RWMutex
counters map[string]*Counter
onlineMap map[string]*OnlineMap
channels map[string]*Channel
running bool
}Он реализует интерфейс stats.Manager со следующими методами:
| Метод | Описание |
|---|---|
RegisterCounter(name) | Создать новый именованный счётчик |
UnregisterCounter(name) | Удалить счётчик |
GetCounter(name) | Найти счётчик по имени |
VisitCounters(visitor) | Перебрать все счётчики с помощью callback-функции |
RegisterOnlineMap(name) | Создать новую именованную карту онлайн-пользователей |
GetOnlineMap(name) | Найти карту онлайн-пользователей |
GetAllOnlineUsers() | Вернуть всех пользователей с активными IP |
RegisterChannel(name) | Создать новый канал публикации/подписки |
GetChannel(name) | Найти канал |
Counter
Файл: app/stats/counter.go
Атомарный счётчик без блокировок:
type Counter struct {
value int64
}
func (c *Counter) Value() int64 {
return atomic.LoadInt64(&c.value)
}
func (c *Counter) Set(newValue int64) int64 {
return atomic.SwapInt64(&c.value, newValue)
}
func (c *Counter) Add(delta int64) int64 {
return atomic.AddInt64(&c.value, delta)
}Set() возвращает старое значение (полезно для операций «получить и сбросить» через Set(0)). Add() возвращает новое значение после сложения.
SizeStatWriter
Файл: app/dispatcher/stats.go
Мост между плоскостью данных и системой статистики:
type SizeStatWriter struct {
Counter stats.Counter
Writer buf.Writer
}
func (w *SizeStatWriter) WriteMultiBuffer(mb buf.MultiBuffer) error {
w.Counter.Add(int64(mb.Len()))
return w.Writer.WriteMultiBuffer(mb)
}Он оборачивает buf.Writer и добавляет количество байт каждого MultiBuffer к своему счётчику перед передачей данных. Это перехват без копирования — данные не модифицируются.
Интеграция с диспетчером
Файл: app/dispatcher/default.go
Метод DefaultDispatcher.getLink() подключает статистику:
func (d *DefaultDispatcher) getLink(ctx context.Context) (*transport.Link, *transport.Link) {
uplinkReader, uplinkWriter := pipe.New(opt...)
downlinkReader, downlinkWriter := pipe.New(opt...)
inboundLink := &transport.Link{Reader: downlinkReader, Writer: uplinkWriter}
outboundLink := &transport.Link{Reader: uplinkReader, Writer: downlinkWriter}
if user != nil && len(user.Email) > 0 {
p := d.policy.ForLevel(user.Level)
if p.Stats.UserUplink {
name := "user>>>" + user.Email + ">>>traffic>>>uplink"
if c, _ := stats.GetOrRegisterCounter(d.stats, name); c != nil {
inboundLink.Writer = &SizeStatWriter{Counter: c, Writer: inboundLink.Writer}
}
}
if p.Stats.UserDownlink {
name := "user>>>" + user.Email + ">>>traffic>>>downlink"
if c, _ := stats.GetOrRegisterCounter(d.stats, name); c != nil {
outboundLink.Writer = &SizeStatWriter{Counter: c, Writer: outboundLink.Writer}
}
}
if p.Stats.UserOnline {
name := "user>>>" + user.Email + ">>>online"
if om, _ := stats.GetOrRegisterOnlineMap(d.stats, name); om != nil {
om.AddIP(sessionInbounds.Source.Address.String())
}
}
}
}Конвенция именования счётчиков
Счётчики следуют иерархической конвенции с разделителем >>>:
| Шаблон | Пример | Направление |
|---|---|---|
user>>>{email}>>>traffic>>>uplink | user>>>user1@example.com>>>traffic>>>uplink | Клиент -> Xray |
user>>>{email}>>>traffic>>>downlink | user>>>user1@example.com>>>traffic>>>downlink | Xray -> Клиент |
user>>>{email}>>>online | user>>>user1@example.com>>>online | Статус онлайн (OnlineMap) |
inbound>>>{tag}>>>traffic>>>uplink | inbound>>>vmess-in>>>traffic>>>uplink | Uplink по inbound |
inbound>>>{tag}>>>traffic>>>downlink | inbound>>>vmess-in>>>traffic>>>downlink | Downlink по inbound |
outbound>>>{tag}>>>traffic>>>uplink | outbound>>>freedom>>>traffic>>>uplink | Uplink по outbound |
outbound>>>{tag}>>>traffic>>>downlink | outbound>>>freedom>>>traffic>>>downlink | Downlink по outbound |
WrapLink
Функция WrapLink() выполняет аналогичную задачу для DispatchLink() (используется обратным прокси и другими внутренними вызывающими):
func WrapLink(ctx context.Context, policyManager policy.Manager, statsManager stats.Manager, link *transport.Link) *transport.Link {
link.Reader = &buf.TimeoutWrapperReader{Reader: link.Reader}
if user != nil && len(user.Email) > 0 {
if p.Stats.UserUplink {
link.Reader.(*buf.TimeoutWrapperReader).Counter = c
}
if p.Stats.UserDownlink {
link.Writer = &SizeStatWriter{Counter: c, Writer: link.Writer}
}
// Отслеживание онлайн...
}
return link
}Обратите внимание на асимметрию: для WrapLink uplink отслеживается на стороне чтения (через TimeoutWrapperReader.Counter), тогда как для getLink uplink отслеживается на стороне записи (через SizeStatWriter). Это связано с тем, что направления канала инвертированы между двумя путями.
OnlineMap
Файл: app/stats/online_map.go
Отслеживает, какие IP-адреса в данный момент подключены для данного пользователя:
type OnlineMap struct {
ipList map[string]time.Time
access sync.RWMutex
lastCleanup time.Time
cleanupPeriod time.Duration // 10 секунд
}Ключевое поведение:
AddIP(ip)добавляет или обновляет метку времени IP-адреса. Пропускает127.0.0.1.RemoveExpiredIPs()удаляет записи старше 20 секунд- Очистка запускается лениво: только при вызове
AddIP()и когда прошло 10+ секунд с последней очистки Count()возвращает количество текущих активных IP-адресовList()возвращает все активные IP-адреса в виде строк
20-секундный срок истечения означает, что IP считается «онлайн» только если он активно отправлял трафик в течение последних 20 секунд.
Channel (публикация/подписка)
Файл: app/stats/channel.go
Канал публикации/подписки для трансляции событий в реальном времени:
type Channel struct {
channel chan channelMessage
subscribers []chan interface{}
access sync.RWMutex
closed chan struct{}
blocking bool
bufferSize int
subsLimit int
}Режимы публикации:
- Блокирующий:
Publish()блокируется, если буфер канала полон (ожидает потребителя) - Неблокирующий: Если буфер полон, запускается горутина для асинхронной доставки
Режимы трансляции:
- Блокирующий: Трансляция каждому подписчику блокируется, если канал подписчика полон
- Неблокирующий: Если канал подписчика полон, запускается горутина для каждого подписчика
По умолчанию канал неблокирующий с размером буфера 64.
Жизненный цикл:
func (c *Channel) Start() error {
c.closed = make(chan struct{})
go func() {
for {
select {
case pub := <-c.channel:
for _, sub := range c.Subscribers() {
pub.broadcastNonBlocking(sub)
}
case <-c.closed:
// Отписка всех и закрытие
return
}
}
}()
}Активация на основе политик
Сбор статистики управляется системой политик. Для каждого уровня пользователя есть настройки:
type Stats struct {
UserUplink bool
UserDownlink bool
UserOnline bool
}Секция конфигурации stats должна присутствовать (даже пустая) для включения менеджера статистики:
{
"stats": {},
"policy": {
"levels": {
"0": {
"statsUserUplink": true,
"statsUserDownlink": true,
"statsUserOnline": true
}
}
}
}Запрос статистики
Через gRPC StatsService:
# Получить отдельный счётчик
grpcurl -d '{"name": "user>>>user1@example.com>>>traffic>>>uplink"}' \
localhost:10085 xray.app.stats.command.StatsService/GetStats
# Получить и сбросить счётчик
grpcurl -d '{"name": "...", "reset": true}' \
localhost:10085 xray.app.stats.command.StatsService/GetStats
# Запрос по шаблону
grpcurl -d '{"pattern": "user>>>user1"}' \
localhost:10085 xray.app.stats.command.StatsService/QueryStats
# Получить количество онлайн-пользователей
grpcurl -d '{"name": "user>>>user1@example.com>>>online"}' \
localhost:10085 xray.app.stats.command.StatsService/GetStatsOnline
# Получить всех онлайн-пользователей
grpcurl localhost:10085 xray.app.stats.command.StatsService/GetAllOnlineUsersЗаметки по реализации
Счётчики используют исключительно операции
sync/atomic— без блокировок мьютексами для чтения/записи. Это означает, что операции со счётчиками не требуют блокировок и чрезвычайно быстры (одна инструкция CAS на большинстве архитектур).Вспомогательная функция
GetOrRegisterCounter()(вfeatures/stats) создаёт счётчик, если он не существует, избегая состояния гонки между проверкой наличия счётчика диспетчером и его регистрацией.Manager.GetAllOnlineUsers()перебирает все карты онлайн-пользователей и возвращает пользователей, у которых есть хотя бы один неистёкший IP. Используется блокировка на запись (не на чтение), потому чтоIpTimeMap()может запускать внутреннюю очистку.Счётчики трафика накапливают байты бесконечно до явного сброса через
Set(0). gRPCGetStatsсreset: trueатомарно читает и сбрасывает значение.SizeStatWriterподсчитывает общее количество байт вMultiBuffer(сумма длин всех буферов), что представляет размер полезной нагрузки после внутренней буферизации Xray, но до кодирования для передачи по сети.Карты онлайн-пользователей привязаны к пользователю, а не к соединению. Множественные соединения с одного IP обновляют одну и ту же метку времени. 20-секундный срок истечения — это эвристика: если соединение пользователя неактивно 20 секунд, он удаляется из списка онлайн.