Traffic Statistics
Xray's stats subsystem provides counter-based traffic measurement, user online tracking, and pub/sub channels for real-time event monitoring. It integrates with the dispatcher to transparently measure traffic per user, per inbound, and per outbound.
Architecture
flowchart TD
    subgraph Stats Manager
        M[Manager] --> C[Counters map]
        M --> OM[OnlineMaps map]
        M --> CH[Channels map]
    end
    subgraph Dispatcher Integration
        D[DefaultDispatcher] --> SSW[SizeStatWriter]
        SSW --> C
        D --> OI[OnlineMap.AddIP]
        OI --> OM
    end
    subgraph gRPC API
        SS[StatsService] --> M
    end

Stats Manager
File: app/stats/stats.go
The Manager is the central registry for all statistics objects:
type Manager struct {
	access    sync.RWMutex
	counters  map[string]*Counter
	onlineMap map[string]*OnlineMap
	channels  map[string]*Channel
	running   bool
}

It implements stats.Manager with methods:
| Method | Description |
|---|---|
| RegisterCounter(name) | Create a new named counter |
| UnregisterCounter(name) | Remove a counter |
| GetCounter(name) | Look up a counter by name |
| VisitCounters(visitor) | Iterate all counters with a callback |
| RegisterOnlineMap(name) | Create a new named online map |
| GetOnlineMap(name) | Look up an online map |
| GetAllOnlineUsers() | Return all users with active IPs |
| RegisterChannel(name) | Create a new pub/sub channel |
| GetChannel(name) | Look up a channel |
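
To make the registry pattern in the table above concrete, here is a minimal sketch of a name-to-counter map guarded by a read/write mutex. The `registry` and `counter` types and the `counterNames` helper are hypothetical stand-ins written for this example; they are not Xray's actual Manager, and error handling is reduced to the duplicate-name case.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
	"sync/atomic"
)

// counter is a stand-in for the Counter type described in this document.
type counter struct{ value int64 }

func (c *counter) Add(delta int64) int64 { return atomic.AddInt64(&c.value, delta) }
func (c *counter) Value() int64          { return atomic.LoadInt64(&c.value) }

// registry is a hypothetical, reduced stats manager: a map of named
// counters protected by a read/write mutex.
type registry struct {
	access   sync.RWMutex
	counters map[string]*counter
}

func newRegistry() *registry {
	return &registry{counters: make(map[string]*counter)}
}

// RegisterCounter creates a named counter and fails if the name is taken.
func (r *registry) RegisterCounter(name string) (*counter, error) {
	r.access.Lock()
	defer r.access.Unlock()
	if _, found := r.counters[name]; found {
		return nil, fmt.Errorf("counter %q already registered", name)
	}
	c := new(counter)
	r.counters[name] = c
	return c, nil
}

// GetCounter returns the counter for name, or nil if none is registered.
func (r *registry) GetCounter(name string) *counter {
	r.access.RLock()
	defer r.access.RUnlock()
	return r.counters[name]
}

// UnregisterCounter removes a counter; a missing name is a no-op.
func (r *registry) UnregisterCounter(name string) {
	r.access.Lock()
	defer r.access.Unlock()
	delete(r.counters, name)
}

// VisitCounters calls visitor for each counter until it returns false.
func (r *registry) VisitCounters(visitor func(name string, c *counter) bool) {
	r.access.RLock()
	defer r.access.RUnlock()
	for name, c := range r.counters {
		if !visitor(name, c) {
			return
		}
	}
}

// counterNames collects all registered names in sorted order.
func counterNames(r *registry) []string {
	var names []string
	r.VisitCounters(func(name string, _ *counter) bool {
		names = append(names, name)
		return true
	})
	sort.Strings(names)
	return names
}

func main() {
	r := newRegistry()
	up, _ := r.RegisterCounter("inbound>>>api>>>traffic>>>uplink")
	_, _ = r.RegisterCounter("inbound>>>api>>>traffic>>>downlink")
	up.Add(512)
	fmt.Println(counterNames(r))
	fmt.Println(r.GetCounter("inbound>>>api>>>traffic>>>uplink").Value())
}
```

Lookups and visits take only the read lock, so they can run concurrently; only registration and removal need exclusive access.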
Counter
File: app/stats/counter.go
A lock-free atomic counter:
type Counter struct {
	value int64
}

func (c *Counter) Value() int64 {
	return atomic.LoadInt64(&c.value)
}

func (c *Counter) Set(newValue int64) int64 {
	return atomic.SwapInt64(&c.value, newValue)
}

func (c *Counter) Add(delta int64) int64 {
	return atomic.AddInt64(&c.value, delta)
}

Set() returns the old value (useful for "get and reset" operations via Set(0)). Add() returns the new value after adding.
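
The "get and reset" idiom can be tried in isolation. The sketch below repeats the Counter definition so that it compiles on its own; the `drain` and `countConcurrently` helpers are hypothetical names added for the example.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// Counter repeats the definition above so this example is self-contained.
type Counter struct{ value int64 }

func (c *Counter) Value() int64             { return atomic.LoadInt64(&c.value) }
func (c *Counter) Set(newValue int64) int64 { return atomic.SwapInt64(&c.value, newValue) }
func (c *Counter) Add(delta int64) int64    { return atomic.AddInt64(&c.value, delta) }

// drain returns everything counted since the previous drain and resets the
// counter to zero in one atomic swap, so no concurrent Add is lost.
func drain(c *Counter) int64 {
	return c.Set(0)
}

// countConcurrently issues perWorker one-byte increments from each of
// workers goroutines and waits for all of them to finish.
func countConcurrently(c *Counter, workers, perWorker int) {
	var wg sync.WaitGroup
	for i := 0; i < workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for j := 0; j < perWorker; j++ {
				c.Add(1)
			}
		}()
	}
	wg.Wait()
}

func main() {
	c := &Counter{}
	countConcurrently(c, 8, 1000)
	fmt.Println(drain(c))  // 8000
	fmt.Println(c.Value()) // 0
}
```

Because the read and the reset are a single swap, an increment that races with `drain` lands either in the returned value or in the next interval, never in neither.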
SizeStatWriter
File: app/dispatcher/stats.go
The bridge between the data plane and the stats system:
type SizeStatWriter struct {
	Counter stats.Counter
	Writer  buf.Writer
}

func (w *SizeStatWriter) WriteMultiBuffer(mb buf.MultiBuffer) error {
	w.Counter.Add(int64(mb.Len()))
	return w.Writer.WriteMultiBuffer(mb)
}

It wraps a buf.Writer and adds the byte count of every MultiBuffer to its counter before forwarding the data. This is a zero-copy interception: the data is not modified.
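
The same wrapping idea can be sketched with the standard library. This example assumes `io.Writer` in place of Xray's buf.Writer and a local `byteCounter` in place of stats.Counter; `countingWriter` and `writeCounted` are hypothetical names used only here.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"sync/atomic"
)

// byteCounter is a stand-in for stats.Counter.
type byteCounter struct{ n int64 }

func (c *byteCounter) Add(delta int64) int64 { return atomic.AddInt64(&c.n, delta) }
func (c *byteCounter) Value() int64          { return atomic.LoadInt64(&c.n) }

// countingWriter mirrors the SizeStatWriter idea on top of io.Writer:
// record the size first, then forward the untouched slice.
type countingWriter struct {
	counter *byteCounter
	writer  io.Writer
}

func (w *countingWriter) Write(p []byte) (int, error) {
	// Counted before forwarding, so a failed write is still counted.
	w.counter.Add(int64(len(p)))
	return w.writer.Write(p)
}

// writeCounted pushes each chunk through a countingWriter and reports the
// counted byte total alongside what actually reached the sink.
func writeCounted(chunks ...string) (int64, string) {
	var sink bytes.Buffer
	c := &byteCounter{}
	w := &countingWriter{counter: c, writer: &sink}
	for _, chunk := range chunks {
		if _, err := io.WriteString(w, chunk); err != nil {
			panic(err)
		}
	}
	return c.Value(), sink.String()
}

func main() {
	n, out := writeCounted("hello ", "world")
	fmt.Println(n, out) // 11 hello world
}
```

The wrapper is transparent to both sides: the caller still sees an ordinary writer, and the sink receives exactly the bytes it would have received without it.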
Dispatcher Integration
File: app/dispatcher/default.go
The DefaultDispatcher.getLink() method wires up statistics:
func (d *DefaultDispatcher) getLink(ctx context.Context) (*transport.Link, *transport.Link) {
	// Excerpt: opt, user, and sessionInbounds are derived from ctx (setup elided).
	uplinkReader, uplinkWriter := pipe.New(opt...)
	downlinkReader, downlinkWriter := pipe.New(opt...)
	// The inbound side writes uplink data and reads downlink data.
	inboundLink := &transport.Link{Reader: downlinkReader, Writer: uplinkWriter}
	// The outbound side reads uplink data and writes downlink data.
	outboundLink := &transport.Link{Reader: uplinkReader, Writer: downlinkWriter}
	if user != nil && len(user.Email) > 0 {
		p := d.policy.ForLevel(user.Level)
		if p.Stats.UserUplink {
			name := "user>>>" + user.Email + ">>>traffic>>>uplink"
			if c, _ := stats.GetOrRegisterCounter(d.stats, name); c != nil {
				inboundLink.Writer = &SizeStatWriter{Counter: c, Writer: inboundLink.Writer}
			}
		}
		if p.Stats.UserDownlink {
			name := "user>>>" + user.Email + ">>>traffic>>>downlink"
			if c, _ := stats.GetOrRegisterCounter(d.stats, name); c != nil {
				outboundLink.Writer = &SizeStatWriter{Counter: c, Writer: outboundLink.Writer}
			}
		}
		if p.Stats.UserOnline {
			name := "user>>>" + user.Email + ">>>online"
			if om, _ := stats.GetOrRegisterOnlineMap(d.stats, name); om != nil {
				om.AddIP(sessionInbounds.Source.Address.String())
			}
		}
	}
	return inboundLink, outboundLink
}

Counter Naming Convention
Counters follow a hierarchical >>> delimiter convention:
| Pattern | Example | Direction |
|---|---|---|
| user>>>{email}>>>traffic>>>uplink | user>>>user1@example.com>>>traffic>>>uplink | Client -> Xray |
| user>>>{email}>>>traffic>>>downlink | user>>>user1@example.com>>>traffic>>>downlink | Xray -> Client |
| user>>>{email}>>>online | user>>>user1@example.com>>>online | Online status (OnlineMap) |
| inbound>>>{tag}>>>traffic>>>uplink | inbound>>>vmess-in>>>traffic>>>uplink | Per-inbound uplink |
| inbound>>>{tag}>>>traffic>>>downlink | inbound>>>vmess-in>>>traffic>>>downlink | Per-inbound downlink |
| outbound>>>{tag}>>>traffic>>>uplink | outbound>>>freedom>>>traffic>>>uplink | Per-outbound uplink |
| outbound>>>{tag}>>>traffic>>>downlink | outbound>>>freedom>>>traffic>>>downlink | Per-outbound downlink |
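
Tools that consume these counters often need to build or split such names. The following sketch shows one way to do that; `userTrafficName`, `parseCounterName`, and the `counterName` struct are hypothetical helpers for illustration and are not part of Xray. It assumes that neither the email nor the tag contains the `>>>` delimiter.

```go
package main

import (
	"fmt"
	"strings"
)

const sep = ">>>"

// userTrafficName builds "user>>>{email}>>>traffic>>>{direction}".
func userTrafficName(email, direction string) string {
	return strings.Join([]string{"user", email, "traffic", direction}, sep)
}

// counterName holds the segments of a parsed name. Direction is empty for
// three-segment names such as "user>>>{email}>>>online".
type counterName struct {
	Scope     string // "user", "inbound", or "outbound"
	ID        string // email or tag
	Kind      string // "traffic" or "online"
	Direction string // "uplink", "downlink", or ""
}

// parseCounterName splits a name on the delimiter and reports whether it
// had one of the two shapes listed in the table above.
func parseCounterName(name string) (counterName, bool) {
	parts := strings.Split(name, sep)
	switch len(parts) {
	case 4:
		return counterName{parts[0], parts[1], parts[2], parts[3]}, true
	case 3:
		return counterName{Scope: parts[0], ID: parts[1], Kind: parts[2]}, true
	}
	return counterName{}, false
}

func main() {
	fmt.Println(userTrafficName("user1@example.com", "uplink"))
	n, ok := parseCounterName("outbound>>>freedom>>>traffic>>>downlink")
	fmt.Println(n.Scope, n.ID, n.Kind, n.Direction, ok)
}
```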
WrapLink
The WrapLink() function serves a similar purpose for DispatchLink() (used by reverse proxy and other internal callers):
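func WrapLink(ctx context.Context, policyManager policy.Manager, statsManager stats.Manager, link *transport.Link) *transport.Link {
	// Excerpt: user is resolved from the inbound session carried in ctx (lookup elided).
	link.Reader = &buf.TimeoutWrapperReader{Reader: link.Reader}
	if user != nil && len(user.Email) > 0 {
		p := policyManager.ForLevel(user.Level)
		if p.Stats.UserUplink {
			name := "user>>>" + user.Email + ">>>traffic>>>uplink"
			if c, _ := stats.GetOrRegisterCounter(statsManager, name); c != nil {
				link.Reader.(*buf.TimeoutWrapperReader).Counter = c
			}
		}
		if p.Stats.UserDownlink {
			name := "user>>>" + user.Email + ">>>traffic>>>downlink"
			if c, _ := stats.GetOrRegisterCounter(statsManager, name); c != nil {
				link.Writer = &SizeStatWriter{Counter: c, Writer: link.Writer}
			}
		}
		// Online tracking...
	}
	return link
}

Note the asymmetry: for WrapLink, uplink is tracked on the reader (via TimeoutWrapperReader.Counter), while for getLink, uplink is tracked on the writer (via SizeStatWriter). This is because the link directions are inverted between the two paths.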
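
Counting on the reader or on the writer gives the same total for a given stream; which one is used only depends on which end of the data path the code has in hand. The sketch below illustrates this with standard-library interfaces. `countingReader`, `countingWriter`, and `copyCounted` are hypothetical names, and `io.Reader`/`io.Writer` stand in for Xray's buffer types.

```go
package main

import (
	"fmt"
	"io"
	"strings"
	"sync/atomic"
)

// countingReader adds the size of every successful read to n.
type countingReader struct {
	n *int64
	r io.Reader
}

func (c *countingReader) Read(p []byte) (int, error) {
	n, err := c.r.Read(p)
	atomic.AddInt64(c.n, int64(n))
	return n, err
}

// countingWriter adds the size of every write to n before forwarding it.
type countingWriter struct {
	n *int64
	w io.Writer
}

func (c *countingWriter) Write(p []byte) (int, error) {
	atomic.AddInt64(c.n, int64(len(p)))
	return c.w.Write(p)
}

// copyCounted streams payload through a counting reader into a counting
// writer and returns what each side observed.
func copyCounted(payload string) (readSide, writeSide int64) {
	src := &countingReader{n: &readSide, r: strings.NewReader(payload)}
	dst := &countingWriter{n: &writeSide, w: io.Discard}
	if _, err := io.Copy(dst, src); err != nil {
		panic(err)
	}
	return readSide, writeSide
}

func main() {
	r, w := copyCounted("uplink payload")
	fmt.Println(r, w) // 14 14
}
```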
OnlineMap
File: app/stats/online_map.go
Tracks which IPs are currently connected for a given user:
type OnlineMap struct {
	ipList        map[string]time.Time
	access        sync.RWMutex
	lastCleanup   time.Time
	cleanupPeriod time.Duration // 10 seconds
}

Key behavior:
- AddIP(ip) adds or refreshes an IP's timestamp. Skips 127.0.0.1.
- RemoveExpiredIPs() removes entries older than 20 seconds
- Cleanup is triggered lazily: only when AddIP() is called and 10+ seconds have elapsed since last cleanup
- Count() returns the number of currently active IPs
- List() returns all active IP strings
The 20-second expiry means an IP is considered "online" only if AddIP() has refreshed its timestamp within the last 20 seconds. In the dispatcher code shown earlier, that refresh happens when a link is set up for the user.
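
The lazy-cleanup behavior described above can be sketched as follows. This is a simplified model, not Xray's implementation: the `onlineMap` type, its `expiry` field, and the injectable `now` clock are assumptions added so the timing can be exercised without sleeping, and `simulate` is a hypothetical driver.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// onlineMap is a reduced model of the structure described above.
type onlineMap struct {
	access        sync.Mutex
	ipList        map[string]time.Time
	lastCleanup   time.Time
	cleanupPeriod time.Duration
	expiry        time.Duration
	now           func() time.Time // injectable clock (an addition for testing)
}

func newOnlineMap(now func() time.Time) *onlineMap {
	return &onlineMap{
		ipList:        make(map[string]time.Time),
		lastCleanup:   now(),
		cleanupPeriod: 10 * time.Second,
		expiry:        20 * time.Second,
		now:           now,
	}
}

// AddIP records or refreshes ip, skipping loopback, and runs a cleanup
// pass if at least cleanupPeriod has passed since the previous one.
func (m *onlineMap) AddIP(ip string) {
	if ip == "127.0.0.1" {
		return
	}
	m.access.Lock()
	defer m.access.Unlock()
	t := m.now()
	m.ipList[ip] = t
	if t.Sub(m.lastCleanup) >= m.cleanupPeriod {
		for addr, seen := range m.ipList {
			if t.Sub(seen) > m.expiry {
				delete(m.ipList, addr)
			}
		}
		m.lastCleanup = t
	}
}

// Count returns the number of stored entries. In this sketch, stale
// entries remain counted until the next cleanup pass removes them.
func (m *onlineMap) Count() int {
	m.access.Lock()
	defer m.access.Unlock()
	return len(m.ipList)
}

// simulate adds one IP, advances the clock by gap, adds a second IP, and
// returns the resulting count.
func simulate(gap time.Duration) int {
	clock := time.Unix(0, 0)
	m := newOnlineMap(func() time.Time { return clock })
	m.AddIP("10.0.0.1")
	m.AddIP("127.0.0.1") // skipped
	clock = clock.Add(gap)
	m.AddIP("10.0.0.2")
	return m.Count()
}

func main() {
	fmt.Println(simulate(25 * time.Second)) // 1: the first IP expired
	fmt.Println(simulate(5 * time.Second))  // 2: both IPs are still fresh
}
```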
Channel (Pub/Sub)
File: app/stats/channel.go
A publish-subscribe channel for real-time event broadcasting:
type Channel struct {
	channel     chan channelMessage
	subscribers []chan interface{}
	access      sync.RWMutex
	closed      chan struct{}
	blocking    bool
	bufferSize  int
	subsLimit   int
}

Publish modes:
- Blocking: Publish() blocks if the channel buffer is full (waits for consumer)
- Non-blocking: If buffer is full, spawns a goroutine to deliver asynchronously
Broadcast modes:
- Blocking: Broadcast to each subscriber blocks if subscriber's channel is full
- Non-blocking: If subscriber's channel is full, spawns a goroutine per subscriber
The default channel is non-blocking with buffer size 64.
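
The non-blocking mode boils down to "try a direct send, and fall back to a goroutine when the buffer is full". Here is a minimal sketch of that pattern; `sendNonBlocking` and `runDemo` are hypothetical names, and the sketch leaves out details a real channel would need, such as aborting pending deliveries when the channel is closed.

```go
package main

import (
	"fmt"
	"sync"
)

// sendNonBlocking tries to deliver msg without blocking the caller. If the
// buffer is full, delivery is handed to a goroutine that waits for room.
// It reports whether the message was delivered immediately.
func sendNonBlocking(ch chan<- interface{}, msg interface{}, wg *sync.WaitGroup) bool {
	select {
	case ch <- msg:
		return true
	default:
		wg.Add(1)
		go func() {
			defer wg.Done()
			ch <- msg // blocks in the background until a receiver makes room
		}()
		return false
	}
}

// runDemo sends two messages into a buffer of size one and returns whether
// each was delivered immediately, plus the messages in arrival order.
func runDemo() (first, second bool, received []interface{}) {
	ch := make(chan interface{}, 1)
	var wg sync.WaitGroup
	first = sendNonBlocking(ch, "a", &wg)  // buffer has room
	second = sendNonBlocking(ch, "b", &wg) // buffer is full, so deferred
	received = append(received, <-ch)
	received = append(received, <-ch)
	wg.Wait()
	return first, second, received
}

func main() {
	first, second, received := runDemo()
	fmt.Println(first, second, received) // true false [a b]
}
```

The trade-off is that the publisher never stalls, but each deferred delivery costs a goroutine, and ordering between several deferred messages is not guaranteed.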
Lifecycle:
func (c *Channel) Start() error {
	c.closed = make(chan struct{})
	go func() {
		for {
			select {
			case pub := <-c.channel: // a message was published
				for _, sub := range c.Subscribers() {
					// Simplified: only the non-blocking broadcast path is shown.
					pub.broadcastNonBlocking(sub)
				}
			case <-c.closed: // the channel was closed
				// Unsubscribe all and close
				return
			}
		}
	}()
	return nil
}

Policy-Based Activation
Stats collection is controlled by the policy system. Each user level has settings:
type Stats struct {
	UserUplink   bool
	UserDownlink bool
	UserOnline   bool
}

The stats config section must be present in the configuration (even empty) to enable the stats manager:
{
  "stats": {},
  "policy": {
    "levels": {
      "0": {
        "statsUserUplink": true,
        "statsUserDownlink": true,
        "statsUserOnline": true
      }
    }
  }
}

Querying Statistics
Via the gRPC StatsService:
# Note: -plaintext assumes the API listener is served without TLS.
# Get a single counter
grpcurl -plaintext -d '{"name": "user>>>user1@example.com>>>traffic>>>uplink"}' \
  localhost:10085 xray.app.stats.command.StatsService/GetStats
# Get and reset a counter
grpcurl -plaintext -d '{"name": "...", "reset": true}' \
  localhost:10085 xray.app.stats.command.StatsService/GetStats
# Query by pattern
grpcurl -plaintext -d '{"pattern": "user>>>user1"}' \
  localhost:10085 xray.app.stats.command.StatsService/QueryStats
# Get online user count
grpcurl -plaintext -d '{"name": "user>>>user1@example.com>>>online"}' \
  localhost:10085 xray.app.stats.command.StatsService/GetStatsOnline
# Get all online users
grpcurl -plaintext localhost:10085 xray.app.stats.command.StatsService/GetAllOnlineUsers

Implementation Notes
- Counters use sync/atomic operations exclusively, with no mutex locking for reads or writes. Counter operations are therefore lock-free and cheap: each call is a single atomic load, swap, or add rather than a lock acquisition.
- The GetOrRegisterCounter() helper (in features/stats) creates a counter if it doesn't exist, avoiding race conditions between the dispatcher checking for a counter and registering one.
- Manager.GetAllOnlineUsers() iterates all online maps and returns users who have at least one non-expired IP. It uses a write lock (not a read lock) because IpTimeMap() may trigger cleanup internally.
- Traffic counters accumulate bytes indefinitely until explicitly reset via Set(0). The gRPC GetStats call with reset: true reads and resets atomically.
- The SizeStatWriter counts the total bytes in a MultiBuffer (the sum of all buffer lengths), which represents the payload size after Xray's internal buffering but before wire encoding.
- Online maps are per-user, not per-connection. Multiple connections from the same IP update the same timestamp. The 20-second expiry is a heuristic: an IP whose timestamp has not been refreshed for 20 seconds is dropped from the online list at the next cleanup.
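
One way to obtain the "get or register" guarantee mentioned in these notes is to hold a single lock across both the lookup and the creation. The sketch below shows that variant under stated assumptions: `registry`, `counter`, `getOrRegister`, and `concurrentRegister` are hypothetical names, and this is not the code of the helper in features/stats, which may be structured differently.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// counter is a stand-in for the Counter type described in this document.
type counter struct{ value int64 }

func (c *counter) Add(delta int64) int64 { return atomic.AddInt64(&c.value, delta) }
func (c *counter) Value() int64          { return atomic.LoadInt64(&c.value) }

// registry is a hypothetical name -> counter map.
type registry struct {
	access   sync.Mutex
	counters map[string]*counter
}

// getOrRegister returns the existing counter for name or creates it. The
// lock covers both the check and the insert, so two concurrent callers
// always end up sharing one counter.
func (r *registry) getOrRegister(name string) *counter {
	r.access.Lock()
	defer r.access.Unlock()
	if c, found := r.counters[name]; found {
		return c
	}
	c := new(counter)
	r.counters[name] = c
	return c
}

// concurrentRegister has n goroutines each fetch the same named counter
// and add 100 to it, then returns the number of counters created and the
// final value.
func concurrentRegister(n int) (int, int64) {
	const name = "user>>>user1@example.com>>>traffic>>>uplink"
	r := &registry{counters: make(map[string]*counter)}
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			r.getOrRegister(name).Add(100)
		}()
	}
	wg.Wait()
	total := r.getOrRegister(name).Value()
	return len(r.counters), total
}

func main() {
	created, total := concurrentRegister(16)
	fmt.Println(created, total) // 1 1600
}
```

The lock is only taken on the lookup path; once a caller holds the counter, every subsequent Add is a plain atomic operation.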