Commander:运行时管理 gRPC API
Commander 是 Xray 内置的 gRPC 服务器,提供运行时管理 API。它支持动态配置变更、统计信息查询和系统监控,无需重启进程。
架构
flowchart TD
subgraph Commander
C[Commander] --> GS[grpc.Server]
GS --> HS[HandlerService]
GS --> SS[StatsService]
GS --> RS[RoutingService]
GS --> LS[LoggerService]
GS --> OS[ObservatoryService]
GS --> RF[ReflectionService]
end
subgraph 传输方式
GS -->|方式 A| OL[OutboundListener]
OL --> OH[Outbound Handler]
OH --> RT[Xray 路由]
GS -->|方式 B| TL[TCP 监听器]
TL --> NW[直接网络]
end
subgraph 客户端
CLI[gRPC 客户端] --> RT
CLI2[gRPC 客户端] --> NW
endCommander 核心
文件: app/commander/commander.go
type Commander struct {
sync.Mutex
server *grpc.Server
services []Service
ohm outbound.Manager
tag string
listen string
}Service 接口
// app/commander/service.go
type Service interface {
Register(*grpc.Server)
}每个 gRPC 服务实现都必须满足此接口以注册其 handler。
初始化
func NewCommander(ctx context.Context, config *Config) (*Commander, error) {
c := &Commander{tag: config.Tag, listen: config.Listen}
core.RequireFeatures(ctx, func(om outbound.Manager) {
c.ohm = om
})
for _, rawConfig := range config.Service {
config, _ := rawConfig.GetInstance() // TypedMessage -> proto.Message
rawService, _ := common.CreateObject(ctx, config) // proto.Message -> Service
service, _ := rawService.(Service)
c.services = append(c.services, service)
}
}每个服务通过全局配置注册表(common.CreateObject)从其 Protobuf 配置创建。
启动:两种传输模式
func (c *Commander) Start() error {
c.server = grpc.NewServer()
for _, service := range c.services {
service.Register(c.server)
}
if len(c.listen) > 0 {
// 直接 TCP 监听模式
l, _ := net.Listen("tcp", c.listen)
go c.server.Serve(l)
return nil
}
// Outbound 监听模式(通过 Xray 路由)
listener := &OutboundListener{
buffer: make(chan net.Conn, 4),
done: done.New(),
}
go c.server.Serve(listener)
c.ohm.RemoveHandler(context.Background(), c.tag)
return c.ohm.AddHandler(context.Background(), &Outbound{
tag: c.tag,
listener: listener,
})
}模式 1——直接 TCP(设置 listen 字段): 打开一个真实的 TCP 套接字。配置更简单但暴露在网络中。
模式 2——Outbound Handler(默认): 创建虚拟的 OutboundListener 并注册 Outbound handler。gRPC 客户端通过 Xray 的 inbound/路由系统连接。
OutboundListener
文件: app/commander/outbound.go
基于连接 channel 的 net.Listener 实现:
type OutboundListener struct {
buffer chan net.Conn // 容量:4
done *done.Instance
}
func (l *OutboundListener) Accept() (net.Conn, error) {
select {
case <-l.done.Wait():
return nil, errors.New("listen closed")
case c := <-l.buffer:
return c, nil
}
}Outbound Handler
将 Xray 传输链路转换为 net.Conn 供 gRPC 服务器使用:
type Outbound struct {
tag string
listener *OutboundListener
access sync.RWMutex
closed bool
}
func (co *Outbound) Dispatch(ctx context.Context, link *transport.Link) {
closeSignal := done.New()
c := cnc.NewConnection(
cnc.ConnectionInputMulti(link.Writer),
cnc.ConnectionOutputMulti(link.Reader),
cnc.ConnectionOnClose(closeSignal),
)
co.listener.add(c)
<-closeSignal.Wait() // 阻塞直到连接关闭
}可用服务
StatsService
文件: app/stats/command/command.go
type statsServer struct {
stats feature_stats.Manager
startTime time.Time
}gRPC 方法:
| 方法 | 说明 |
|---|---|
GetStats(name, reset) | 获取单个计数器的值,可选择性重置 |
QueryStats(pattern, reset) | 查询所有匹配子串模式的计数器 |
GetSysStats() | 系统统计:运行时间、goroutine 数、内存分配、GC |
GetStatsOnline(name) | 获取用户在线 map 的在线用户数 |
GetStatsOnlineIpList(name) | 获取在线 map 的 IP 列表及时间戳 |
GetAllOnlineUsers() | 列出所有有活跃 IP 的用户 |
GetSysStats 方法提供运行时诊断信息:
func (s *statsServer) GetSysStats(ctx context.Context, request *SysStatsRequest) (*SysStatsResponse, error) {
var rtm runtime.MemStats
runtime.ReadMemStats(&rtm)
return &SysStatsResponse{
Uptime: uint32(time.Since(s.startTime).Seconds()),
NumGoroutine: uint32(runtime.NumGoroutine()),
Alloc: rtm.Alloc,
TotalAlloc: rtm.TotalAlloc,
Sys: rtm.Sys,
Mallocs: rtm.Mallocs,
Frees: rtm.Frees,
LiveObjects: rtm.Mallocs - rtm.Frees,
NumGC: rtm.NumGC,
PauseTotalNs: rtm.PauseTotalNs,
}, nil
}为兼容 v2ray,StatsService 注册了两次——分别使用 xray.app.stats.command.StatsService 和 v2ray.core.app.stats.command.StatsService:
func (s *service) Register(server *grpc.Server) {
ss := NewStatsServer(s.statsManager)
RegisterStatsServiceServer(server, ss)
vCoreDesc := StatsService_ServiceDesc
vCoreDesc.ServiceName = "v2ray.core.app.stats.command.StatsService"
server.RegisterService(&vCoreDesc, ss)
}HandlerService
包: app/proxyman/command
运行时管理 inbound 和 outbound handler:
- 添加/移除 inbound handler
- 添加/移除 outbound handler
- 修改 inbound handler 设置
RoutingService
包: app/router/command
运行时路由管理:
- 根据指定上下文测试路由规则
- 查询路由表
LoggerService
包: app/log/command
- 重启日志记录器
- 通过流式传输跟踪日志输出
ObservatoryService
包: app/observatory/command
GetOutboundStatus():返回所有被监测出站的健康状态
ReflectionService
文件: app/commander/service.go
启用 gRPC 服务器反射以支持工具调试:
type reflectionService struct{}
func (r reflectionService) Register(s *grpc.Server) {
reflection.Register(s)
}配置
文件: infra/conf/api.go
{
"api": {
"tag": "api",
"listen": "127.0.0.1:10085",
"services": [
"HandlerService",
"StatsService",
"RoutingService",
"LoggerService",
"ObservatoryService",
"ReflectionService"
]
}
}Build() 方法将服务名称映射到对应的 Protobuf 配置:
func (c *APIConfig) Build() (*commander.Config, error) {
for _, s := range c.Services {
switch strings.ToLower(s) {
case "reflectionservice":
services = append(services, serial.ToTypedMessage(&commander.ReflectionConfig{}))
case "handlerservice":
services = append(services, serial.ToTypedMessage(&handlerservice.Config{}))
case "statsservice":
services = append(services, serial.ToTypedMessage(&statsservice.Config{}))
// ... 其他服务
}
}
}使用 outbound 模式(不设置 listen)时,需要:
- 一个标记为 API tag 的
dokodemo-doorinbound - 一条将该 inbound 导向 API outbound 的路由规则
{
"inbounds": [{
"tag": "api-in",
"protocol": "dokodemo-door",
"port": 10085,
"settings": { "address": "127.0.0.1" }
}],
"routing": {
"rules": [{
"inboundTag": ["api-in"],
"outboundTag": "api"
}]
}
}实现要点
Commander 通过
common.RegisterConfig((*Config)(nil), ...)注册自身,并从功能注册表中获取outbound.Manager。OutboundListener缓冲区容量为 4。如果 gRPC 服务器接受连接较慢且已有 4 个连接排队,额外的连接将被立即关闭(丢弃)。Outbound.Dispatch()方法通过<-closeSignal.Wait()阻塞,以在 gRPC 连接存续期间保持传输链路存活。Commander 关闭时调用
c.server.Stop(),这将强制终止所有活跃的 gRPC 流,没有优雅关闭机制。服务创建使用通用的
common.CreateObject()路径,该路径在全局注册表中查找 Protobuf 配置类型。这意味着第三方包可以通过注册新配置来扩展服务。listen字段启用直接 TCP 模式,配置更简单但绕过了 Xray 的路由系统。在此模式下不会注册 outbound handler。