Skip to content

流量统计

Xray 的统计子系统提供基于计数器的流量测量、用户在线追踪以及用于实时事件监控的发布/订阅通道。它与分发器(Dispatcher)集成,透明地按用户、按 inbound 和按 outbound 测量流量。

架构

mermaid
flowchart TD
    subgraph 统计管理器
        M[Manager] --> C[Counters 映射表]
        M --> OM[OnlineMaps 映射表]
        M --> CH[Channels 映射表]
    end

    subgraph 分发器集成
        D[DefaultDispatcher] --> SSW[SizeStatWriter]
        SSW --> C
        D --> OI[OnlineMap.AddIP]
        OI --> OM
    end

    subgraph gRPC API
        SS[StatsService] --> M
    end

统计管理器

文件: app/stats/stats.go

Manager 是所有统计对象的中央注册表:

go
type Manager struct {
    access    sync.RWMutex
    counters  map[string]*Counter
    onlineMap map[string]*OnlineMap
    channels  map[string]*Channel
    running   bool
}

它实现了 stats.Manager 接口,提供以下方法:

方法说明
RegisterCounter(name)创建新的命名计数器
UnregisterCounter(name)移除计数器
GetCounter(name)按名称查找计数器
VisitCounters(visitor)使用回调遍历所有计数器
RegisterOnlineMap(name)创建新的命名在线映射表
GetOnlineMap(name)查找在线映射表
GetAllOnlineUsers()返回所有有活跃 IP 的用户
RegisterChannel(name)创建新的发布/订阅通道
GetChannel(name)查找通道

Counter

文件: app/stats/counter.go

无锁原子计数器:

go
type Counter struct {
    value int64
}

func (c *Counter) Value() int64 {
    return atomic.LoadInt64(&c.value)
}

func (c *Counter) Set(newValue int64) int64 {
    return atomic.SwapInt64(&c.value, newValue)
}

func (c *Counter) Add(delta int64) int64 {
    return atomic.AddInt64(&c.value, delta)
}

Set() 返回旧值(可通过 Set(0) 实现"读取并重置"操作)。 Add() 返回加法后的新值。

SizeStatWriter

文件: app/dispatcher/stats.go

数据面与统计系统之间的桥梁:

go
type SizeStatWriter struct {
    Counter stats.Counter
    Writer  buf.Writer
}

func (w *SizeStatWriter) WriteMultiBuffer(mb buf.MultiBuffer) error {
    w.Counter.Add(int64(mb.Len()))
    return w.Writer.WriteMultiBuffer(mb)
}

它包装了一个 buf.Writer,在转发数据前将每个 MultiBuffer 的字节数累加到计数器中。这是零拷贝拦截——数据本身不会被修改。

分发器集成

文件: app/dispatcher/default.go

DefaultDispatcher.getLink() 方法连接统计系统:

go
func (d *DefaultDispatcher) getLink(ctx context.Context) (*transport.Link, *transport.Link) {
    uplinkReader, uplinkWriter := pipe.New(opt...)
    downlinkReader, downlinkWriter := pipe.New(opt...)

    inboundLink := &transport.Link{Reader: downlinkReader, Writer: uplinkWriter}
    outboundLink := &transport.Link{Reader: uplinkReader, Writer: downlinkWriter}

    if user != nil && len(user.Email) > 0 {
        p := d.policy.ForLevel(user.Level)

        if p.Stats.UserUplink {
            name := "user>>>" + user.Email + ">>>traffic>>>uplink"
            if c, _ := stats.GetOrRegisterCounter(d.stats, name); c != nil {
                inboundLink.Writer = &SizeStatWriter{Counter: c, Writer: inboundLink.Writer}
            }
        }

        if p.Stats.UserDownlink {
            name := "user>>>" + user.Email + ">>>traffic>>>downlink"
            if c, _ := stats.GetOrRegisterCounter(d.stats, name); c != nil {
                outboundLink.Writer = &SizeStatWriter{Counter: c, Writer: outboundLink.Writer}
            }
        }

        if p.Stats.UserOnline {
            name := "user>>>" + user.Email + ">>>online"
            if om, _ := stats.GetOrRegisterOnlineMap(d.stats, name); om != nil {
                om.AddIP(sessionInbounds.Source.Address.String())
            }
        }
    }
}

计数器命名规范

计数器遵循以 >>> 为分隔符的层级命名规范:

模式示例方向
user>>>{email}>>>traffic>>>uplinkuser>>>user1@example.com>>>traffic>>>uplink客户端 -> Xray
user>>>{email}>>>traffic>>>downlinkuser>>>user1@example.com>>>traffic>>>downlinkXray -> 客户端
user>>>{email}>>>onlineuser>>>user1@example.com>>>online在线状态(OnlineMap)
inbound>>>{tag}>>>traffic>>>uplinkinbound>>>vmess-in>>>traffic>>>uplink按 inbound 上行
inbound>>>{tag}>>>traffic>>>downlinkinbound>>>vmess-in>>>traffic>>>downlink按 inbound 下行
outbound>>>{tag}>>>traffic>>>uplinkoutbound>>>freedom>>>traffic>>>uplink按 outbound 上行
outbound>>>{tag}>>>traffic>>>downlinkoutbound>>>freedom>>>traffic>>>downlink按 outbound 下行

WrapLink() 函数为 DispatchLink() 提供类似功能(供反向代理和其他内部调用者使用):

go
func WrapLink(ctx context.Context, policyManager policy.Manager, statsManager stats.Manager, link *transport.Link) *transport.Link {
    link.Reader = &buf.TimeoutWrapperReader{Reader: link.Reader}

    if user != nil && len(user.Email) > 0 {
        if p.Stats.UserUplink {
            link.Reader.(*buf.TimeoutWrapperReader).Counter = c
        }
        if p.Stats.UserDownlink {
            link.Writer = &SizeStatWriter{Counter: c, Writer: link.Writer}
        }
        // 在线追踪...
    }
    return link
}

注意不对称性:在 WrapLink 中,上行流量通过 reader(TimeoutWrapperReader.Counter)追踪;而在 getLink 中,上行流量通过 writer(SizeStatWriter)追踪。这是因为两种路径中的链路方向是反转的。

OnlineMap

文件: app/stats/online_map.go

追踪指定用户当前连接的 IP 地址:

go
type OnlineMap struct {
    ipList        map[string]time.Time
    access        sync.RWMutex
    lastCleanup   time.Time
    cleanupPeriod time.Duration  // 10 秒
}

关键行为:

  • AddIP(ip) 添加或刷新 IP 的时间戳,跳过 127.0.0.1
  • RemoveExpiredIPs() 移除超过 20 秒的条目
  • 清理通过惰性触发:仅当调用 AddIP() 且距上次清理已超过 10 秒时才执行
  • Count() 返回当前活跃 IP 数量
  • List() 返回所有活跃 IP 字符串

20 秒过期意味着只有在过去 20 秒内有活跃流量发送的 IP 才被视为"在线"。

Channel(发布/订阅)

文件: app/stats/channel.go

用于实时事件广播的发布-订阅通道:

go
type Channel struct {
    channel     chan channelMessage
    subscribers []chan interface{}
    access      sync.RWMutex
    closed      chan struct{}
    blocking    bool
    bufferSize  int
    subsLimit   int
}

发布模式:

  • 阻塞模式: 如果通道缓冲区已满,Publish() 将阻塞等待消费者
  • 非阻塞模式: 如果缓冲区已满,启动 goroutine 异步投递

广播模式:

  • 阻塞模式: 向每个订阅者广播时,如果订阅者通道已满则阻塞
  • 非阻塞模式: 如果订阅者通道已满,为每个订阅者启动 goroutine

默认通道为非阻塞模式,缓冲区大小为 64。

生命周期:

go
func (c *Channel) Start() error {
    c.closed = make(chan struct{})
    go func() {
        for {
            select {
            case pub := <-c.channel:
                for _, sub := range c.Subscribers() {
                    pub.broadcastNonBlocking(sub)
                }
            case <-c.closed:
                // 取消所有订阅并关闭
                return
            }
        }
    }()
}

基于策略的激活

统计收集由策略系统控制。每个用户等级包含以下设置:

go
type Stats struct {
    UserUplink   bool
    UserDownlink bool
    UserOnline   bool
}

配置中必须存在 stats 配置段(即使为空)才能启用统计管理器:

json
{
    "stats": {},
    "policy": {
        "levels": {
            "0": {
                "statsUserUplink": true,
                "statsUserDownlink": true,
                "statsUserOnline": true
            }
        }
    }
}

查询统计信息

通过 gRPC StatsService:

bash
# 获取单个计数器
grpcurl -d '{"name": "user>>>user1@example.com>>>traffic>>>uplink"}' \
    localhost:10085 xray.app.stats.command.StatsService/GetStats

# 获取并重置计数器
grpcurl -d '{"name": "...", "reset": true}' \
    localhost:10085 xray.app.stats.command.StatsService/GetStats

# 按模式查询
grpcurl -d '{"pattern": "user>>>user1"}' \
    localhost:10085 xray.app.stats.command.StatsService/QueryStats

# 获取在线用户数
grpcurl -d '{"name": "user>>>user1@example.com>>>online"}' \
    localhost:10085 xray.app.stats.command.StatsService/GetStatsOnline

# 获取所有在线用户
grpcurl localhost:10085 xray.app.stats.command.StatsService/GetAllOnlineUsers

实现要点

  • Counter 完全使用 sync/atomic 操作——读写无需互斥锁。这意味着计数器操作是无锁的,在大多数架构上只需一条 CAS 指令,速度极快。

  • GetOrRegisterCounter() 辅助函数(位于 features/stats)在计数器不存在时创建它,避免分发器在检查和注册计数器之间出现竞态条件。

  • Manager.GetAllOnlineUsers() 遍历所有在线映射表并返回至少有一个未过期 IP 的用户。它使用写锁(而非读锁),因为 IpTimeMap() 可能在内部触发清理操作。

  • 流量计数器会无限累加,直到通过 Set(0) 显式重置。gRPC 的 GetStats 配合 reset: true 可以原子性地读取并重置。

  • SizeStatWriter 统计 MultiBuffer 中的总字节数(所有缓冲区长度之和),这代表 Xray 内部缓冲后、线路编码前的载荷大小。

  • 在线映射表是按用户而非按连接的。来自同一 IP 的多个连接会更新同一时间戳。20 秒过期是一种启发式策略:如果用户的连接空闲超过 20 秒,将从在线列表中移除。

用于重新实现目的的技术分析。