Commander: gRPC API для управления в реальном времени
Commander — это встроенный gRPC-сервер Xray, предоставляющий API для управления в реальном времени. Он позволяет динамически изменять конфигурацию, получать статистику и осуществлять мониторинг системы без перезапуска процесса.
Архитектура
flowchart TD
subgraph Commander
C[Commander] --> GS[grpc.Server]
GS --> HS[HandlerService]
GS --> SS[StatsService]
GS --> RS[RoutingService]
GS --> LS[LoggerService]
GS --> OS[ObservatoryService]
GS --> RF[ReflectionService]
end
subgraph Transport
GS -->|option A| OL[OutboundListener]
OL --> OH[Outbound Handler]
OH --> RT[Xray Routing]
GS -->|option B| TL[TCP Listener]
TL --> NW[Direct network]
end
subgraph Client
CLI[gRPC Client] --> RT
CLI2[gRPC Client] --> NW
endЯдро Commander
Файл: app/commander/commander.go
type Commander struct {
sync.Mutex
server *grpc.Server
services []Service
ohm outbound.Manager
tag string
listen string
}Интерфейс Service
// app/commander/service.go
type Service interface {
Register(*grpc.Server)
}Каждая реализация gRPC-сервиса должна удовлетворять этому интерфейсу для регистрации своих обработчиков.
Инициализация
func NewCommander(ctx context.Context, config *Config) (*Commander, error) {
c := &Commander{tag: config.Tag, listen: config.Listen}
core.RequireFeatures(ctx, func(om outbound.Manager) {
c.ohm = om
})
for _, rawConfig := range config.Service {
config, _ := rawConfig.GetInstance() // TypedMessage -> proto.Message
rawService, _ := common.CreateObject(ctx, config) // proto.Message -> Service
service, _ := rawService.(Service)
c.services = append(c.services, service)
}
}Каждый сервис создаётся из его protobuf-конфигурации через глобальный реестр конфигураций (common.CreateObject).
Запуск: два режима транспорта
func (c *Commander) Start() error {
c.server = grpc.NewServer()
for _, service := range c.services {
service.Register(c.server)
}
if len(c.listen) > 0 {
// Режим прямого TCP-прослушивания
l, _ := net.Listen("tcp", c.listen)
go c.server.Serve(l)
return nil
}
// Режим Outbound-прослушивания (через маршрутизацию Xray)
listener := &OutboundListener{
buffer: make(chan net.Conn, 4),
done: done.New(),
}
go c.server.Serve(listener)
c.ohm.RemoveHandler(context.Background(), c.tag)
return c.ohm.AddHandler(context.Background(), &Outbound{
tag: c.tag,
listener: listener,
})
}Режим 1 — прямой TCP (поле listen задано): Открывает реальный TCP-сокет. Проще, но доступен из сети.
Режим 2 — обработчик Outbound (по умолчанию): Создаёт виртуальный OutboundListener и регистрирует обработчик Outbound. gRPC-клиенты подключаются через систему inbound/маршрутизации Xray.
OutboundListener
Файл: app/commander/outbound.go
Реализация net.Listener на основе канала соединений:
type OutboundListener struct {
buffer chan net.Conn // ёмкость: 4
done *done.Instance
}
func (l *OutboundListener) Accept() (net.Conn, error) {
select {
case <-l.done.Wait():
return nil, errors.New("listen closed")
case c := <-l.buffer:
return c, nil
}
}Обработчик Outbound
Преобразует транспортные каналы Xray в net.Conn для gRPC-сервера:
type Outbound struct {
tag string
listener *OutboundListener
access sync.RWMutex
closed bool
}
func (co *Outbound) Dispatch(ctx context.Context, link *transport.Link) {
closeSignal := done.New()
c := cnc.NewConnection(
cnc.ConnectionInputMulti(link.Writer),
cnc.ConnectionOutputMulti(link.Reader),
cnc.ConnectionOnClose(closeSignal),
)
co.listener.add(c)
<-closeSignal.Wait() // Блокировка до закрытия соединения
}Доступные сервисы
StatsService
Файл: app/stats/command/command.go
type statsServer struct {
stats feature_stats.Manager
startTime time.Time
}gRPC-методы:
| Метод | Описание |
|---|---|
GetStats(name, reset) | Получить значение отдельного счётчика с возможностью сброса |
QueryStats(pattern, reset) | Запросить все счётчики, соответствующие шаблону подстроки |
GetSysStats() | Системная статистика: аптайм, горутины, выделение памяти, GC |
GetStatsOnline(name) | Получить количество онлайн-пользователей для карты онлайн-пользователей |
GetStatsOnlineIpList(name) | Получить список IP с метками времени для карты онлайн-пользователей |
GetAllOnlineUsers() | Список всех пользователей с активными IP |
Метод GetSysStats предоставляет диагностику времени выполнения:
func (s *statsServer) GetSysStats(ctx context.Context, request *SysStatsRequest) (*SysStatsResponse, error) {
var rtm runtime.MemStats
runtime.ReadMemStats(&rtm)
return &SysStatsResponse{
Uptime: uint32(time.Since(s.startTime).Seconds()),
NumGoroutine: uint32(runtime.NumGoroutine()),
Alloc: rtm.Alloc,
TotalAlloc: rtm.TotalAlloc,
Sys: rtm.Sys,
Mallocs: rtm.Mallocs,
Frees: rtm.Frees,
LiveObjects: rtm.Mallocs - rtm.Frees,
NumGC: rtm.NumGC,
PauseTotalNs: rtm.PauseTotalNs,
}, nil
}Для совместимости с v2ray StatsService регистрируется дважды — под именами xray.app.stats.command.StatsService и v2ray.core.app.stats.command.StatsService:
func (s *service) Register(server *grpc.Server) {
ss := NewStatsServer(s.statsManager)
RegisterStatsServiceServer(server, ss)
vCoreDesc := StatsService_ServiceDesc
vCoreDesc.ServiceName = "v2ray.core.app.stats.command.StatsService"
server.RegisterService(&vCoreDesc, ss)
}HandlerService
Пакет: app/proxyman/command
Управление обработчиками входящих и исходящих соединений в реальном времени:
- Добавление/удаление обработчиков входящих соединений
- Добавление/удаление обработчиков исходящих соединений
- Изменение настроек обработчиков входящих соединений
RoutingService
Пакет: app/router/command
Управление маршрутизацией в реальном времени:
- Тестирование правил маршрутизации для конкретных контекстов
- Запрос таблицы маршрутизации
LoggerService
Пакет: app/log/command
- Перезапуск логгера
- Потоковая передача вывода логов
ObservatoryService
Пакет: app/observatory/command
GetOutboundStatus(): Возвращает состояние работоспособности всех наблюдаемых outbound
ReflectionService
Файл: app/commander/service.go
Включает рефлексию gRPC-сервера для инструментов:
type reflectionService struct{}
func (r reflectionService) Register(s *grpc.Server) {
reflection.Register(s)
}Конфигурация
Файл: infra/conf/api.go
{
"api": {
"tag": "api",
"listen": "127.0.0.1:10085",
"services": [
"HandlerService",
"StatsService",
"RoutingService",
"LoggerService",
"ObservatoryService",
"ReflectionService"
]
}
}Метод Build() сопоставляет имена сервисов с их protobuf-конфигурациями:
func (c *APIConfig) Build() (*commander.Config, error) {
for _, s := range c.Services {
switch strings.ToLower(s) {
case "reflectionservice":
services = append(services, serial.ToTypedMessage(&commander.ReflectionConfig{}))
case "handlerservice":
services = append(services, serial.ToTypedMessage(&handlerservice.Config{}))
case "statsservice":
services = append(services, serial.ToTypedMessage(&statsservice.Config{}))
// ... и т.д.
}
}
}При использовании режима outbound (без listen) необходимо:
- Входящий обработчик
dokodemo-doorс тегом API - Правило маршрутизации, направляющее этот inbound к outbound API
{
"inbounds": [{
"tag": "api-in",
"protocol": "dokodemo-door",
"port": 10085,
"settings": { "address": "127.0.0.1" }
}],
"routing": {
"rules": [{
"inboundTag": ["api-in"],
"outboundTag": "api"
}]
}
}Заметки по реализации
Commander регистрируется через
common.RegisterConfig((*Config)(nil), ...)и требуетoutbound.Managerиз реестра функциональных модулей.Буфер
OutboundListenerимеет ёмкость 4. Если gRPC-сервер медленно принимает соединения и 4 соединения уже в очереди, дополнительные соединения немедленно закрываются (отбрасываются).Метод
Outbound.Dispatch()блокируется на<-closeSignal.Wait()для поддержания транспортного канала активным на протяжении всего gRPC-соединения.При завершении работы Commander вызывается
c.server.Stop(), который принудительно завершает все активные gRPC-потоки. Плавного завершения нет.Создание сервисов использует универсальный путь
common.CreateObject(), который ищет тип protobuf-конфигурации в глобальном реестре. Это означает, что сервисы могут быть расширены сторонними пакетами, регистрирующими новые конфигурации.Поле
listenвключает режим прямого TCP, который проще в настройке, но обходит систему маршрутизации Xray. В этом режиме обработчик outbound не регистрируется.