Skip to content

Commander: gRPC API для управления в реальном времени

Commander — это встроенный gRPC-сервер Xray, предоставляющий API для управления в реальном времени. Он позволяет динамически изменять конфигурацию, получать статистику и осуществлять мониторинг системы без перезапуска процесса.

Архитектура

mermaid
flowchart TD
    subgraph Commander
        C[Commander] --> GS[grpc.Server]
        GS --> HS[HandlerService]
        GS --> SS[StatsService]
        GS --> RS[RoutingService]
        GS --> LS[LoggerService]
        GS --> OS[ObservatoryService]
        GS --> RF[ReflectionService]
    end

    subgraph Transport
        GS -->|option A| OL[OutboundListener]
        OL --> OH[Outbound Handler]
        OH --> RT[Xray Routing]

        GS -->|option B| TL[TCP Listener]
        TL --> NW[Direct network]
    end

    subgraph Client
        CLI[gRPC Client] --> RT
        CLI2[gRPC Client] --> NW
    end

Ядро Commander

Файл: app/commander/commander.go

go
type Commander struct {
    sync.Mutex
    server   *grpc.Server
    services []Service
    ohm      outbound.Manager
    tag      string
    listen   string
}

Интерфейс Service

go
// app/commander/service.go
type Service interface {
    Register(*grpc.Server)
}

Каждая реализация gRPC-сервиса должна удовлетворять этому интерфейсу для регистрации своих обработчиков.

Инициализация

go
func NewCommander(ctx context.Context, config *Config) (*Commander, error) {
    c := &Commander{tag: config.Tag, listen: config.Listen}

    core.RequireFeatures(ctx, func(om outbound.Manager) {
        c.ohm = om
    })

    for _, rawConfig := range config.Service {
        config, _ := rawConfig.GetInstance()             // TypedMessage -> proto.Message
        rawService, _ := common.CreateObject(ctx, config) // proto.Message -> Service
        service, _ := rawService.(Service)
        c.services = append(c.services, service)
    }
}

Каждый сервис создаётся из его protobuf-конфигурации через глобальный реестр конфигураций (common.CreateObject).

Запуск: два режима транспорта

go
func (c *Commander) Start() error {
    c.server = grpc.NewServer()
    for _, service := range c.services {
        service.Register(c.server)
    }

    if len(c.listen) > 0 {
        // Режим прямого TCP-прослушивания
        l, _ := net.Listen("tcp", c.listen)
        go c.server.Serve(l)
        return nil
    }

    // Режим Outbound-прослушивания (через маршрутизацию Xray)
    listener := &OutboundListener{
        buffer: make(chan net.Conn, 4),
        done:   done.New(),
    }
    go c.server.Serve(listener)
    c.ohm.RemoveHandler(context.Background(), c.tag)
    return c.ohm.AddHandler(context.Background(), &Outbound{
        tag:      c.tag,
        listener: listener,
    })
}

Режим 1 — прямой TCP (поле listen задано): Открывает реальный TCP-сокет. Проще, но доступен из сети.

Режим 2 — обработчик Outbound (по умолчанию): Создаёт виртуальный OutboundListener и регистрирует обработчик Outbound. gRPC-клиенты подключаются через систему inbound/маршрутизации Xray.

OutboundListener

Файл: app/commander/outbound.go

Реализация net.Listener на основе канала соединений:

go
type OutboundListener struct {
    buffer chan net.Conn  // ёмкость: 4
    done   *done.Instance
}

func (l *OutboundListener) Accept() (net.Conn, error) {
    select {
    case <-l.done.Wait():
        return nil, errors.New("listen closed")
    case c := <-l.buffer:
        return c, nil
    }
}

Обработчик Outbound

Преобразует транспортные каналы Xray в net.Conn для gRPC-сервера:

go
type Outbound struct {
    tag      string
    listener *OutboundListener
    access   sync.RWMutex
    closed   bool
}

func (co *Outbound) Dispatch(ctx context.Context, link *transport.Link) {
    closeSignal := done.New()
    c := cnc.NewConnection(
        cnc.ConnectionInputMulti(link.Writer),
        cnc.ConnectionOutputMulti(link.Reader),
        cnc.ConnectionOnClose(closeSignal),
    )
    co.listener.add(c)
    <-closeSignal.Wait()  // Блокировка до закрытия соединения
}

Доступные сервисы

StatsService

Файл: app/stats/command/command.go

go
type statsServer struct {
    stats     feature_stats.Manager
    startTime time.Time
}

gRPC-методы:

МетодОписание
GetStats(name, reset)Получить значение отдельного счётчика с возможностью сброса
QueryStats(pattern, reset)Запросить все счётчики, соответствующие шаблону подстроки
GetSysStats()Системная статистика: аптайм, горутины, выделение памяти, GC
GetStatsOnline(name)Получить количество онлайн-пользователей для карты онлайн-пользователей
GetStatsOnlineIpList(name)Получить список IP с метками времени для карты онлайн-пользователей
GetAllOnlineUsers()Список всех пользователей с активными IP

Метод GetSysStats предоставляет диагностику времени выполнения:

go
func (s *statsServer) GetSysStats(ctx context.Context, request *SysStatsRequest) (*SysStatsResponse, error) {
    var rtm runtime.MemStats
    runtime.ReadMemStats(&rtm)
    return &SysStatsResponse{
        Uptime:       uint32(time.Since(s.startTime).Seconds()),
        NumGoroutine: uint32(runtime.NumGoroutine()),
        Alloc:        rtm.Alloc,
        TotalAlloc:   rtm.TotalAlloc,
        Sys:          rtm.Sys,
        Mallocs:      rtm.Mallocs,
        Frees:        rtm.Frees,
        LiveObjects:  rtm.Mallocs - rtm.Frees,
        NumGC:        rtm.NumGC,
        PauseTotalNs: rtm.PauseTotalNs,
    }, nil
}

Для совместимости с v2ray StatsService регистрируется дважды — под именами xray.app.stats.command.StatsService и v2ray.core.app.stats.command.StatsService:

go
func (s *service) Register(server *grpc.Server) {
    ss := NewStatsServer(s.statsManager)
    RegisterStatsServiceServer(server, ss)
    vCoreDesc := StatsService_ServiceDesc
    vCoreDesc.ServiceName = "v2ray.core.app.stats.command.StatsService"
    server.RegisterService(&vCoreDesc, ss)
}

HandlerService

Пакет: app/proxyman/command

Управление обработчиками входящих и исходящих соединений в реальном времени:

  • Добавление/удаление обработчиков входящих соединений
  • Добавление/удаление обработчиков исходящих соединений
  • Изменение настроек обработчиков входящих соединений

RoutingService

Пакет: app/router/command

Управление маршрутизацией в реальном времени:

  • Тестирование правил маршрутизации для конкретных контекстов
  • Запрос таблицы маршрутизации

LoggerService

Пакет: app/log/command

  • Перезапуск логгера
  • Потоковая передача вывода логов

ObservatoryService

Пакет: app/observatory/command

  • GetOutboundStatus(): Возвращает состояние работоспособности всех наблюдаемых outbound

ReflectionService

Файл: app/commander/service.go

Включает рефлексию gRPC-сервера для инструментов:

go
type reflectionService struct{}

func (r reflectionService) Register(s *grpc.Server) {
    reflection.Register(s)
}

Конфигурация

Файл: infra/conf/api.go

json
{
    "api": {
        "tag": "api",
        "listen": "127.0.0.1:10085",
        "services": [
            "HandlerService",
            "StatsService",
            "RoutingService",
            "LoggerService",
            "ObservatoryService",
            "ReflectionService"
        ]
    }
}

Метод Build() сопоставляет имена сервисов с их protobuf-конфигурациями:

go
func (c *APIConfig) Build() (*commander.Config, error) {
    for _, s := range c.Services {
        switch strings.ToLower(s) {
        case "reflectionservice":
            services = append(services, serial.ToTypedMessage(&commander.ReflectionConfig{}))
        case "handlerservice":
            services = append(services, serial.ToTypedMessage(&handlerservice.Config{}))
        case "statsservice":
            services = append(services, serial.ToTypedMessage(&statsservice.Config{}))
        // ... и т.д.
        }
    }
}

При использовании режима outbound (без listen) необходимо:

  1. Входящий обработчик dokodemo-door с тегом API
  2. Правило маршрутизации, направляющее этот inbound к outbound API
json
{
    "inbounds": [{
        "tag": "api-in",
        "protocol": "dokodemo-door",
        "port": 10085,
        "settings": { "address": "127.0.0.1" }
    }],
    "routing": {
        "rules": [{
            "inboundTag": ["api-in"],
            "outboundTag": "api"
        }]
    }
}

Заметки по реализации

  • Commander регистрируется через common.RegisterConfig((*Config)(nil), ...) и требует outbound.Manager из реестра функциональных модулей.

  • Буфер OutboundListener имеет ёмкость 4. Если gRPC-сервер медленно принимает соединения и 4 соединения уже в очереди, дополнительные соединения немедленно закрываются (отбрасываются).

  • Метод Outbound.Dispatch() блокируется на <-closeSignal.Wait() для поддержания транспортного канала активным на протяжении всего gRPC-соединения.

  • При завершении работы Commander вызывается c.server.Stop(), который принудительно завершает все активные gRPC-потоки. Плавного завершения нет.

  • Создание сервисов использует универсальный путь common.CreateObject(), который ищет тип protobuf-конфигурации в глобальном реестре. Это означает, что сервисы могут быть расширены сторонними пакетами, регистрирующими новые конфигурации.

  • Поле listen включает режим прямого TCP, который проще в настройке, но обходит систему маршрутизации Xray. В этом режиме обработчик outbound не регистрируется.

Технический анализ для целей повторной реализации.