流量统计
Xray 的统计子系统提供基于计数器的流量测量、用户在线追踪以及用于实时事件监控的发布/订阅通道。它与分发器(Dispatcher)集成,透明地按用户、按 inbound 和按 outbound 测量流量。
架构
flowchart TD
subgraph 统计管理器
M[Manager] --> C[Counters 映射表]
M --> OM[OnlineMaps 映射表]
M --> CH[Channels 映射表]
end
subgraph 分发器集成
D[DefaultDispatcher] --> SSW[SizeStatWriter]
SSW --> C
D --> OI[OnlineMap.AddIP]
OI --> OM
end
subgraph gRPC API
SS[StatsService] --> M
end统计管理器
文件: app/stats/stats.go
Manager 是所有统计对象的中央注册表:
type Manager struct {
access sync.RWMutex
counters map[string]*Counter
onlineMap map[string]*OnlineMap
channels map[string]*Channel
running bool
}它实现了 stats.Manager 接口,提供以下方法:
| 方法 | 说明 |
|---|---|
RegisterCounter(name) | 创建新的命名计数器 |
UnregisterCounter(name) | 移除计数器 |
GetCounter(name) | 按名称查找计数器 |
VisitCounters(visitor) | 使用回调遍历所有计数器 |
RegisterOnlineMap(name) | 创建新的命名在线映射表 |
GetOnlineMap(name) | 查找在线映射表 |
GetAllOnlineUsers() | 返回所有有活跃 IP 的用户 |
RegisterChannel(name) | 创建新的发布/订阅通道 |
GetChannel(name) | 查找通道 |
Counter
文件: app/stats/counter.go
无锁原子计数器:
type Counter struct {
value int64
}
func (c *Counter) Value() int64 {
return atomic.LoadInt64(&c.value)
}
func (c *Counter) Set(newValue int64) int64 {
return atomic.SwapInt64(&c.value, newValue)
}
func (c *Counter) Add(delta int64) int64 {
return atomic.AddInt64(&c.value, delta)
}Set() 返回旧值(可通过 Set(0) 实现"读取并重置"操作)。 Add() 返回加法后的新值。
SizeStatWriter
文件: app/dispatcher/stats.go
数据面与统计系统之间的桥梁:
type SizeStatWriter struct {
Counter stats.Counter
Writer buf.Writer
}
func (w *SizeStatWriter) WriteMultiBuffer(mb buf.MultiBuffer) error {
w.Counter.Add(int64(mb.Len()))
return w.Writer.WriteMultiBuffer(mb)
}它包装了一个 buf.Writer,在转发数据前将每个 MultiBuffer 的字节数累加到计数器中。这是零拷贝拦截——数据本身不会被修改。
分发器集成
文件: app/dispatcher/default.go
DefaultDispatcher.getLink() 方法连接统计系统:
func (d *DefaultDispatcher) getLink(ctx context.Context) (*transport.Link, *transport.Link) {
uplinkReader, uplinkWriter := pipe.New(opt...)
downlinkReader, downlinkWriter := pipe.New(opt...)
inboundLink := &transport.Link{Reader: downlinkReader, Writer: uplinkWriter}
outboundLink := &transport.Link{Reader: uplinkReader, Writer: downlinkWriter}
if user != nil && len(user.Email) > 0 {
p := d.policy.ForLevel(user.Level)
if p.Stats.UserUplink {
name := "user>>>" + user.Email + ">>>traffic>>>uplink"
if c, _ := stats.GetOrRegisterCounter(d.stats, name); c != nil {
inboundLink.Writer = &SizeStatWriter{Counter: c, Writer: inboundLink.Writer}
}
}
if p.Stats.UserDownlink {
name := "user>>>" + user.Email + ">>>traffic>>>downlink"
if c, _ := stats.GetOrRegisterCounter(d.stats, name); c != nil {
outboundLink.Writer = &SizeStatWriter{Counter: c, Writer: outboundLink.Writer}
}
}
if p.Stats.UserOnline {
name := "user>>>" + user.Email + ">>>online"
if om, _ := stats.GetOrRegisterOnlineMap(d.stats, name); om != nil {
om.AddIP(sessionInbounds.Source.Address.String())
}
}
}
}计数器命名规范
计数器遵循以 >>> 为分隔符的层级命名规范:
| 模式 | 示例 | 方向 |
|---|---|---|
user>>>{email}>>>traffic>>>uplink | user>>>user1@example.com>>>traffic>>>uplink | 客户端 -> Xray |
user>>>{email}>>>traffic>>>downlink | user>>>user1@example.com>>>traffic>>>downlink | Xray -> 客户端 |
user>>>{email}>>>online | user>>>user1@example.com>>>online | 在线状态(OnlineMap) |
inbound>>>{tag}>>>traffic>>>uplink | inbound>>>vmess-in>>>traffic>>>uplink | 按 inbound 上行 |
inbound>>>{tag}>>>traffic>>>downlink | inbound>>>vmess-in>>>traffic>>>downlink | 按 inbound 下行 |
outbound>>>{tag}>>>traffic>>>uplink | outbound>>>freedom>>>traffic>>>uplink | 按 outbound 上行 |
outbound>>>{tag}>>>traffic>>>downlink | outbound>>>freedom>>>traffic>>>downlink | 按 outbound 下行 |
WrapLink
WrapLink() 函数为 DispatchLink() 提供类似功能(供反向代理和其他内部调用者使用):
func WrapLink(ctx context.Context, policyManager policy.Manager, statsManager stats.Manager, link *transport.Link) *transport.Link {
link.Reader = &buf.TimeoutWrapperReader{Reader: link.Reader}
if user != nil && len(user.Email) > 0 {
if p.Stats.UserUplink {
link.Reader.(*buf.TimeoutWrapperReader).Counter = c
}
if p.Stats.UserDownlink {
link.Writer = &SizeStatWriter{Counter: c, Writer: link.Writer}
}
// 在线追踪...
}
return link
}注意不对称性:在 WrapLink 中,上行流量通过 reader(TimeoutWrapperReader.Counter)追踪;而在 getLink 中,上行流量通过 writer(SizeStatWriter)追踪。这是因为两种路径中的链路方向是反转的。
OnlineMap
文件: app/stats/online_map.go
追踪指定用户当前连接的 IP 地址:
type OnlineMap struct {
ipList map[string]time.Time
access sync.RWMutex
lastCleanup time.Time
cleanupPeriod time.Duration // 10 秒
}关键行为:
AddIP(ip)添加或刷新 IP 的时间戳,跳过127.0.0.1RemoveExpiredIPs()移除超过 20 秒的条目- 清理通过惰性触发:仅当调用
AddIP()且距上次清理已超过 10 秒时才执行 Count()返回当前活跃 IP 数量List()返回所有活跃 IP 字符串
20 秒过期意味着只有在过去 20 秒内有活跃流量发送的 IP 才被视为"在线"。
Channel(发布/订阅)
文件: app/stats/channel.go
用于实时事件广播的发布-订阅通道:
type Channel struct {
channel chan channelMessage
subscribers []chan interface{}
access sync.RWMutex
closed chan struct{}
blocking bool
bufferSize int
subsLimit int
}发布模式:
- 阻塞模式: 如果通道缓冲区已满,
Publish()将阻塞等待消费者 - 非阻塞模式: 如果缓冲区已满,启动 goroutine 异步投递
广播模式:
- 阻塞模式: 向每个订阅者广播时,如果订阅者通道已满则阻塞
- 非阻塞模式: 如果订阅者通道已满,为每个订阅者启动 goroutine
默认通道为非阻塞模式,缓冲区大小为 64。
生命周期:
func (c *Channel) Start() error {
c.closed = make(chan struct{})
go func() {
for {
select {
case pub := <-c.channel:
for _, sub := range c.Subscribers() {
pub.broadcastNonBlocking(sub)
}
case <-c.closed:
// 取消所有订阅并关闭
return
}
}
}()
}基于策略的激活
统计收集由策略系统控制。每个用户等级包含以下设置:
type Stats struct {
UserUplink bool
UserDownlink bool
UserOnline bool
}配置中必须存在 stats 配置段(即使为空)才能启用统计管理器:
{
"stats": {},
"policy": {
"levels": {
"0": {
"statsUserUplink": true,
"statsUserDownlink": true,
"statsUserOnline": true
}
}
}
}查询统计信息
通过 gRPC StatsService:
# 获取单个计数器
grpcurl -d '{"name": "user>>>user1@example.com>>>traffic>>>uplink"}' \
localhost:10085 xray.app.stats.command.StatsService/GetStats
# 获取并重置计数器
grpcurl -d '{"name": "...", "reset": true}' \
localhost:10085 xray.app.stats.command.StatsService/GetStats
# 按模式查询
grpcurl -d '{"pattern": "user>>>user1"}' \
localhost:10085 xray.app.stats.command.StatsService/QueryStats
# 获取在线用户数
grpcurl -d '{"name": "user>>>user1@example.com>>>online"}' \
localhost:10085 xray.app.stats.command.StatsService/GetStatsOnline
# 获取所有在线用户
grpcurl localhost:10085 xray.app.stats.command.StatsService/GetAllOnlineUsers实现要点
Counter 完全使用
sync/atomic操作——读写无需互斥锁。这意味着计数器操作是无锁的,在大多数架构上只需一条 CAS 指令,速度极快。GetOrRegisterCounter()辅助函数(位于features/stats)在计数器不存在时创建它,避免分发器在检查和注册计数器之间出现竞态条件。Manager.GetAllOnlineUsers()遍历所有在线映射表并返回至少有一个未过期 IP 的用户。它使用写锁(而非读锁),因为IpTimeMap()可能在内部触发清理操作。流量计数器会无限累加,直到通过
Set(0)显式重置。gRPC 的GetStats配合reset: true可以原子性地读取并重置。SizeStatWriter统计MultiBuffer中的总字节数(所有缓冲区长度之和),这代表 Xray 内部缓冲后、线路编码前的载荷大小。在线映射表是按用户而非按连接的。来自同一 IP 的多个连接会更新同一时间戳。20 秒过期是一种启发式策略:如果用户的连接空闲超过 20 秒,将从在线列表中移除。