Traffic Statistics

Xray's stats subsystem provides counter-based traffic measurement, user online tracking, and pub/sub channels for real-time event monitoring. It integrates with the dispatcher to transparently measure traffic per user, per inbound, and per outbound.

Architecture

```mermaid
flowchart TD
    subgraph Stats Manager
        M[Manager] --> C[Counters map]
        M --> OM[OnlineMaps map]
        M --> CH[Channels map]
    end

    subgraph Dispatcher Integration
        D[DefaultDispatcher] --> SSW[SizeStatWriter]
        SSW --> C
        D --> OI[OnlineMap.AddIP]
        OI --> OM
    end

    subgraph gRPC API
        SS[StatsService] --> M
    end
```

Stats Manager

File: app/stats/stats.go

The Manager is the central registry for all statistics objects:

```go
type Manager struct {
    access    sync.RWMutex
    counters  map[string]*Counter
    onlineMap map[string]*OnlineMap
    channels  map[string]*Channel
    running   bool
}
```

It implements stats.Manager with methods:

| Method | Description |
|---|---|
| RegisterCounter(name) | Create a new named counter |
| UnregisterCounter(name) | Remove a counter |
| GetCounter(name) | Look up a counter by name |
| VisitCounters(visitor) | Iterate all counters with a callback |
| RegisterOnlineMap(name) | Create a new named online map |
| GetOnlineMap(name) | Look up an online map |
| GetAllOnlineUsers() | Return all users with active IPs |
| RegisterChannel(name) | Create a new pub/sub channel |
| GetChannel(name) | Look up a channel |
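To make the registry role concrete, here is a minimal sketch of the counter half of such a manager, assuming a plain map guarded by a sync.RWMutex as in the struct above. NewManager, the duplicate-name error, and the visitor's stop-on-false return value are assumptions of this sketch, not confirmed behaviour of Xray's Manager.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"sync/atomic"
)

// Counter is a simplified stand-in for the stats counter type.
type Counter struct{ value int64 }

func (c *Counter) Add(delta int64) int64 { return atomic.AddInt64(&c.value, delta) }
func (c *Counter) Value() int64 { return atomic.LoadInt64(&c.value) }

// Manager is a hypothetical, trimmed-down registry keyed by counter name.
type Manager struct {
	access   sync.RWMutex
	counters map[string]*Counter
}

func NewManager() *Manager {
	return &Manager{counters: make(map[string]*Counter)}
}

// RegisterCounter creates a named counter; in this sketch a duplicate name is an error.
func (m *Manager) RegisterCounter(name string) (*Counter, error) {
	m.access.Lock()
	defer m.access.Unlock()
	if _, found := m.counters[name]; found {
		return nil, errors.New("counter already registered: " + name)
	}
	c := new(Counter)
	m.counters[name] = c
	return c, nil
}

// UnregisterCounter removes a counter if present.
func (m *Manager) UnregisterCounter(name string) {
	m.access.Lock()
	defer m.access.Unlock()
	delete(m.counters, name)
}

// GetCounter returns nil when the name is unknown.
func (m *Manager) GetCounter(name string) *Counter {
	m.access.RLock()
	defer m.access.RUnlock()
	return m.counters[name]
}

// VisitCounters calls visitor for each counter until it returns false.
func (m *Manager) VisitCounters(visitor func(name string, c *Counter) bool) {
	m.access.RLock()
	defer m.access.RUnlock()
	for name, c := range m.counters {
		if !visitor(name, c) {
			break
		}
	}
}

func main() {
	m := NewManager()
	up, _ := m.RegisterCounter("user>>>a@example.com>>>traffic>>>uplink")
	up.Add(1500)
	if _, err := m.RegisterCounter("user>>>a@example.com>>>traffic>>>uplink"); err != nil {
		fmt.Println("duplicate rejected:", err)
	}
	m.VisitCounters(func(name string, c *Counter) bool {
		fmt.Println(name, c.Value())
		return true // keep visiting
	})
}
```

Lookups take only the read lock, so many dispatch goroutines can resolve counters concurrently; registration and removal serialize on the write lock.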

Counter

File: app/stats/counter.go

A lock-free atomic counter:

```go
import "sync/atomic"

type Counter struct {
    value int64
}

func (c *Counter) Value() int64 {
    return atomic.LoadInt64(&c.value)
}

func (c *Counter) Set(newValue int64) int64 {
    return atomic.SwapInt64(&c.value, newValue)
}

func (c *Counter) Add(delta int64) int64 {
    return atomic.AddInt64(&c.value, delta)
}
```

Set() returns the old value (useful for "get and reset" operations via Set(0)). Add() returns the new value after adding.
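Because Set() is an atomic swap, Set(0) can harvest a counter while other goroutines keep calling Add() without losing any bytes: each delta lands in exactly one harvested value or in the final remainder. The sketch below demonstrates this; the collect helper and its parameters are hypothetical and exist only for the demonstration.

```go
package main

import (
	"fmt"
	"runtime"
	"sync"
	"sync/atomic"
)

type Counter struct{ value int64 }

func (c *Counter) Add(delta int64) int64 { return atomic.AddInt64(&c.value, delta) }
func (c *Counter) Set(v int64) int64 { return atomic.SwapInt64(&c.value, v) }

// collect runs writers that Add concurrently while a poller repeatedly
// harvests with Set(0). It returns the harvested total and what is left
// after all writers finish.
func collect(writers, writesEach int, chunk int64) (harvested, remainder int64) {
	var c Counter
	var wg sync.WaitGroup
	for i := 0; i < writers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for j := 0; j < writesEach; j++ {
				c.Add(chunk)
			}
		}()
	}
	done := make(chan struct{})
	go func() { wg.Wait(); close(done) }()
	for {
		select {
		case <-done:
			return harvested, c.Set(0) // final get-and-reset
		default:
			harvested += c.Set(0) // read and reset in one atomic swap
			runtime.Gosched()
		}
	}
}

func main() {
	h, r := collect(8, 1000, 1500)
	fmt.Println(h + r) // prints 12000000
}
```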

SizeStatWriter

File: app/dispatcher/stats.go

The bridge between the data plane and the stats system:

```go
type SizeStatWriter struct {
    Counter stats.Counter
    Writer  buf.Writer
}

func (w *SizeStatWriter) WriteMultiBuffer(mb buf.MultiBuffer) error {
    w.Counter.Add(int64(mb.Len()))
    return w.Writer.WriteMultiBuffer(mb)
}
```

It wraps a buf.Writer and adds the byte count of every MultiBuffer to its counter before forwarding the data. This is zero-copy interception: the MultiBuffer is passed through unmodified.

Dispatcher Integration

File: app/dispatcher/default.go

The DefaultDispatcher.getLink() method wires up statistics:

```go
func (d *DefaultDispatcher) getLink(ctx context.Context) (*transport.Link, *transport.Link) {
    opt := pipe.OptionsFromContext(ctx)
    uplinkReader, uplinkWriter := pipe.New(opt...)
    downlinkReader, downlinkWriter := pipe.New(opt...)

    // The inbound handler writes client data into the uplink pipe and reads
    // responses from the downlink pipe; the outbound handler does the reverse.
    inboundLink := &transport.Link{Reader: downlinkReader, Writer: uplinkWriter}
    outboundLink := &transport.Link{Reader: uplinkReader, Writer: downlinkWriter}

    // The authenticated user (if any) travels in the inbound session context.
    sessionInbound := session.InboundFromContext(ctx)
    var user *protocol.MemoryUser
    if sessionInbound != nil {
        user = sessionInbound.User
    }

    if user != nil && len(user.Email) > 0 {
        p := d.policy.ForLevel(user.Level)

        if p.Stats.UserUplink {
            name := "user>>>" + user.Email + ">>>traffic>>>uplink"
            if c, _ := stats.GetOrRegisterCounter(d.stats, name); c != nil {
                inboundLink.Writer = &SizeStatWriter{Counter: c, Writer: inboundLink.Writer}
            }
        }

        if p.Stats.UserDownlink {
            name := "user>>>" + user.Email + ">>>traffic>>>downlink"
            if c, _ := stats.GetOrRegisterCounter(d.stats, name); c != nil {
                outboundLink.Writer = &SizeStatWriter{Counter: c, Writer: outboundLink.Writer}
            }
        }

        if p.Stats.UserOnline {
            name := "user>>>" + user.Email + ">>>online"
            if om, _ := stats.GetOrRegisterOnlineMap(d.stats, name); om != nil {
                om.AddIP(sessionInbound.Source.Address.String())
            }
        }
    }

    return inboundLink, outboundLink
}
```

Counter Naming Convention

Counters follow a hierarchical >>> delimiter convention:

| Pattern | Example | Meaning |
|---|---|---|
| user>>>{email}>>>traffic>>>uplink | user>>>user1@example.com>>>traffic>>>uplink | Client -> Xray |
| user>>>{email}>>>traffic>>>downlink | user>>>user1@example.com>>>traffic>>>downlink | Xray -> Client |
| user>>>{email}>>>online | user>>>user1@example.com>>>online | Online status (OnlineMap) |
| inbound>>>{tag}>>>traffic>>>uplink | inbound>>>vmess-in>>>traffic>>>uplink | Per-inbound uplink |
| inbound>>>{tag}>>>traffic>>>downlink | inbound>>>vmess-in>>>traffic>>>downlink | Per-inbound downlink |
| outbound>>>{tag}>>>traffic>>>uplink | outbound>>>freedom>>>traffic>>>uplink | Per-outbound uplink |
| outbound>>>{tag}>>>traffic>>>downlink | outbound>>>freedom>>>traffic>>>downlink | Per-outbound downlink |
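Since names are flat strings, building and parsing them is plain string work. A sketch with two hypothetical helpers, trafficCounterName and parseCounterName, that follow the table's four-part traffic shape:

```go
package main

import (
	"fmt"
	"strings"
)

const sep = ">>>"

// trafficCounterName builds names such as "user>>>{email}>>>traffic>>>uplink".
// kind is "user", "inbound" or "outbound"; id is the email or tag.
func trafficCounterName(kind, id, direction string) string {
	return strings.Join([]string{kind, id, "traffic", direction}, sep)
}

// parseCounterName splits a traffic counter name back into its parts.
// It reports ok=false for names without the four-part traffic shape.
func parseCounterName(name string) (kind, id, direction string, ok bool) {
	parts := strings.Split(name, sep)
	if len(parts) != 4 || parts[2] != "traffic" {
		return "", "", "", false
	}
	return parts[0], parts[1], parts[3], true
}

func main() {
	fmt.Println(trafficCounterName("user", "user1@example.com", "uplink"))
	kind, id, dir, ok := parseCounterName("outbound>>>freedom>>>traffic>>>downlink")
	fmt.Println(kind, id, dir, ok) // prints outbound freedom downlink true
	_, _, _, ok = parseCounterName("user>>>user1@example.com>>>online")
	fmt.Println(ok) // prints false: online maps are not traffic counters
}
```

Parsing assumes neither the email nor the tag contains >>>; such a name would not split into exactly four parts and would be rejected.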

The WrapLink() function serves a similar purpose for DispatchLink() (used by reverse proxy and other internal callers):

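```go
func WrapLink(ctx context.Context, policyManager policy.Manager, statsManager stats.Manager, link *transport.Link) *transport.Link {
    sessionInbound := session.InboundFromContext(ctx)
    var user *protocol.MemoryUser
    if sessionInbound != nil {
        user = sessionInbound.User
    }

    link.Reader = &buf.TimeoutWrapperReader{Reader: link.Reader}

    if user != nil && len(user.Email) > 0 {
        p := policyManager.ForLevel(user.Level)
        if p.Stats.UserUplink {
            name := "user>>>" + user.Email + ">>>traffic>>>uplink"
            if c, _ := stats.GetOrRegisterCounter(statsManager, name); c != nil {
                // Uplink is counted as the outbound reads the caller's data.
                link.Reader.(*buf.TimeoutWrapperReader).Counter = c
            }
        }
        if p.Stats.UserDownlink {
            name := "user>>>" + user.Email + ">>>traffic>>>downlink"
            if c, _ := stats.GetOrRegisterCounter(statsManager, name); c != nil {
                link.Writer = &SizeStatWriter{Counter: c, Writer: link.Writer}
            }
        }
        // Online tracking...
    }
    return link
}
```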

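Note the asymmetry: in WrapLink(), uplink is counted on the reader (via TimeoutWrapperReader.Counter), whereas in getLink() it is counted on the writer (via SizeStatWriter). getLink() creates a pipe pair, and the inbound handler pushes client data into the uplink pipe through inboundLink.Writer, so wrapping that writer sees every uplink byte. WrapLink() creates no pipes: the caller's link is handed to the outbound handler as-is, and its Reader already yields the client's data, so uplink has to be counted where it is read. Downlink is counted on a writer in both paths.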

OnlineMap

File: app/stats/online_map.go

Tracks which IPs are currently connected for a given user:

```go
type OnlineMap struct {
    ipList        map[string]time.Time
    access        sync.RWMutex
    lastCleanup   time.Time
    cleanupPeriod time.Duration  // 10 seconds
}
```

Key behavior:

  • AddIP(ip) adds or refreshes an IP's timestamp. Skips 127.0.0.1.
  • RemoveExpiredIPs() removes entries older than 20 seconds
  • Cleanup is triggered lazily: only when AddIP() is called and 10+ seconds have elapsed since last cleanup
  • Count() returns the number of currently active IPs
  • List() returns all active IP strings

Because AddIP() is called from getLink(), that is, once per dispatched connection rather than per packet, the 20-second expiry means an IP counts as "online" only if it has opened a connection within the last 20 seconds.

Channel (Pub/Sub)

File: app/stats/channel.go

A publish-subscribe channel for real-time event broadcasting:

```go
type Channel struct {
    channel     chan channelMessage
    subscribers []chan interface{}
    access      sync.RWMutex
    closed      chan struct{}
    blocking    bool
    bufferSize  int
    subsLimit   int
}
```

Publish modes:

  • Blocking: Publish() blocks if the channel buffer is full (waits for consumer)
  • Non-blocking: If buffer is full, spawns a goroutine to deliver asynchronously

Broadcast modes:

  • Blocking: Broadcast to each subscriber blocks if subscriber's channel is full
  • Non-blocking: If subscriber's channel is full, spawns a goroutine per subscriber

The default channel is non-blocking with buffer size 64.

Lifecycle:

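```go
func (c *Channel) Start() error {
    c.closed = make(chan struct{}) // reset the close signal
    go func() {
        for {
            select {
            case pub := <-c.channel: // a published message arrived
                for _, sub := range c.Subscribers() { // snapshot of current subscribers
                    // Non-blocking mode shown; blocking mode uses a blocking send.
                    pub.broadcastNonBlocking(sub)
                }
            case <-c.closed:
                // Unsubscribe all subscribers, close their channels, and exit
                return
            }
        }
    }()
    return nil
}
```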
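A compact, self-contained sketch of this lifecycle: a single goroutine fans published messages out to a snapshot of subscribers and, when closed, closes every subscriber channel. The broadcaster type and its methods are hypothetical, and it uses blocking publish and broadcast only to stay short.

```go
package main

import (
	"fmt"
	"sync"
)

type broadcaster struct {
	access      sync.RWMutex
	queue       chan interface{}
	subscribers []chan interface{}
	closed      chan struct{}
	stopped     chan struct{}
}

func newBroadcaster(bufferSize int) *broadcaster {
	return &broadcaster{queue: make(chan interface{}, bufferSize)}
}

func (b *broadcaster) Subscribe(bufferSize int) chan interface{} {
	ch := make(chan interface{}, bufferSize)
	b.access.Lock()
	b.subscribers = append(b.subscribers, ch)
	b.access.Unlock()
	return ch
}

// snapshot copies the subscriber list so fan-out does not hold the lock.
func (b *broadcaster) snapshot() []chan interface{} {
	b.access.RLock()
	defer b.access.RUnlock()
	return append([]chan interface{}(nil), b.subscribers...)
}

func (b *broadcaster) Publish(msg interface{}) { b.queue <- msg }

func (b *broadcaster) Start() error {
	b.closed = make(chan struct{})
	b.stopped = make(chan struct{})
	go func() {
		defer close(b.stopped)
		for {
			select {
			case msg := <-b.queue:
				for _, sub := range b.snapshot() {
					sub <- msg
				}
			case <-b.closed:
				b.access.Lock()
				for _, sub := range b.subscribers {
					close(sub) // subscribers observe the shutdown
				}
				b.subscribers = nil
				b.access.Unlock()
				return
			}
		}
	}()
	return nil
}

// Close signals the loop and waits until all subscribers are closed.
func (b *broadcaster) Close() error {
	close(b.closed)
	<-b.stopped
	return nil
}

func main() {
	b := newBroadcaster(64)
	s1, s2 := b.Subscribe(4), b.Subscribe(4)
	_ = b.Start()
	b.Publish("conn opened")
	fmt.Println(<-s1, <-s2)
	_ = b.Close()
	_, open := <-s1
	fmt.Println(open) // prints false
}
```

Because select picks randomly among ready cases, messages still queued when Close() is called may be dropped; a re-implementation that must drain the queue should do so in the close branch.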

Policy-Based Activation

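Stats collection is controlled by the policy system. Per-user objects are enabled per user level with these settings (the inbound>>> and outbound>>> counters are instead switched on by the policy's system-level settings, such as statsInboundUplink and statsOutboundDownlink):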

```go
type Stats struct {
    UserUplink   bool
    UserDownlink bool
    UserOnline   bool
}
```

The stats config section must be present in the configuration (even empty) to enable the stats manager:

```json
{
    "stats": {},
    "policy": {
        "levels": {
            "0": {
                "statsUserUplink": true,
                "statsUserDownlink": true,
                "statsUserOnline": true
            }
        }
    }
}
```
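The activation rules (the stats section must exist, then per-level flags decide which user objects are created) can be checked against a config like the one above. The Go types and the countersFor function below are hypothetical and model only these keys; they are not Xray's config structs.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strconv"
)

// levelPolicy mirrors only the three per-level keys from the config above.
type levelPolicy struct {
	StatsUserUplink   bool `json:"statsUserUplink"`
	StatsUserDownlink bool `json:"statsUserDownlink"`
	StatsUserOnline   bool `json:"statsUserOnline"`
}

type config struct {
	Stats  *json.RawMessage `json:"stats"` // nil when the "stats" key is absent
	Policy struct {
		Levels map[string]levelPolicy `json:"levels"`
	} `json:"policy"`
}

// countersFor lists the stats object names a user at the given level would get.
func countersFor(raw []byte, email string, level uint32) ([]string, error) {
	var cfg config
	if err := json.Unmarshal(raw, &cfg); err != nil {
		return nil, err
	}
	if cfg.Stats == nil {
		return nil, nil // no stats section: no stats manager, nothing counted
	}
	// A level missing from the map yields the zero value: all flags false.
	p := cfg.Policy.Levels[strconv.FormatUint(uint64(level), 10)]
	prefix := "user>>>" + email + ">>>"
	var names []string
	if p.StatsUserUplink {
		names = append(names, prefix+"traffic>>>uplink")
	}
	if p.StatsUserDownlink {
		names = append(names, prefix+"traffic>>>downlink")
	}
	if p.StatsUserOnline {
		names = append(names, prefix+"online")
	}
	return names, nil
}

func main() {
	raw := []byte(`{"stats": {}, "policy": {"levels": {"0": {
		"statsUserUplink": true, "statsUserDownlink": true, "statsUserOnline": true}}}}`)
	names, err := countersFor(raw, "user1@example.com", 0)
	if err != nil {
		panic(err)
	}
	for _, n := range names {
		fmt.Println(n)
	}
}
```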

Querying Statistics

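Via the gRPC StatsService, which must be enabled in the api configuration section. The examples assume the API listens without TLS on localhost:10085, hence -plaintext; grpcurl also needs either gRPC server reflection on the API or the Xray .proto files passed with -proto: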

```bash
# Get a single counter
grpcurl -plaintext -d '{"name": "user>>>user1@example.com>>>traffic>>>uplink"}' \
    localhost:10085 xray.app.stats.command.StatsService/GetStats

# Get and reset a counter
grpcurl -plaintext -d '{"name": "...", "reset": true}' \
    localhost:10085 xray.app.stats.command.StatsService/GetStats

# Query by pattern
grpcurl -plaintext -d '{"pattern": "user>>>user1"}' \
    localhost:10085 xray.app.stats.command.StatsService/QueryStats

# Get online user count
grpcurl -plaintext -d '{"name": "user>>>user1@example.com>>>online"}' \
    localhost:10085 xray.app.stats.command.StatsService/GetStatsOnline

# Get all online users
grpcurl -plaintext localhost:10085 xray.app.stats.command.StatsService/GetAllOnlineUsers
```
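The QueryStats example filters counters by a pattern. Assuming the pattern is matched as a plain substring of the counter name (an assumption of this sketch, not confirmed by the text above), the query and its reset behaviour can be modelled as follows; queryStats is hypothetical.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
	"sync/atomic"
)

// queryStats returns the value of every counter whose name contains pattern.
// With reset=true each matching value is read and zeroed in one atomic swap.
func queryStats(counters map[string]*int64, pattern string, reset bool) map[string]int64 {
	out := make(map[string]int64)
	for name, v := range counters {
		if !strings.Contains(name, pattern) {
			continue
		}
		if reset {
			out[name] = atomic.SwapInt64(v, 0)
		} else {
			out[name] = atomic.LoadInt64(v)
		}
	}
	return out
}

func main() {
	a, b, c := int64(100), int64(250), int64(7)
	counters := map[string]*int64{
		"user>>>user1@example.com>>>traffic>>>uplink":  &a,
		"user>>>user10@example.com>>>traffic>>>uplink": &b,
		"inbound>>>vmess-in>>>traffic>>>uplink":        &c,
	}
	res := queryStats(counters, "user>>>user1", true)
	names := make([]string, 0, len(res))
	for n := range res {
		names = append(names, n)
	}
	sort.Strings(names)
	fmt.Println(names) // both user1 and user10 match
	fmt.Println(atomic.LoadInt64(&a), atomic.LoadInt64(&c))
}
```

Under substring matching, the pattern user>>>user1 also matches user10@example.com; including the trailing delimiter (user>>>user1@example.com>>>) narrows the match to one user.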

Implementation Notes

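  • Counter values are read and updated exclusively with sync/atomic operations, without a mutex. Each operation is lock-free and typically compiles to a single atomic instruction (for example LOCK XADD for Add() and XCHG for Set() on x86-64). Only looking a counter up by name goes through the Manager's RWMutex.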

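  • The GetOrRegisterCounter() helper (in features/stats) returns the existing counter or registers a new one, so the dispatcher needs no check-then-register logic of its own. Because getLink() discards the helper's error and skips counting whenever the returned counter is nil, a re-implementation should make get-or-create atomic; otherwise two simultaneous first connections for the same user could race and one of them would go uncounted.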

  • Manager.GetAllOnlineUsers() iterates all online maps and returns users who have at least one non-expired IP. It uses a write lock (not read lock) because IpTimeMap() may trigger cleanup internally.

  • Traffic counters accumulate bytes indefinitely until explicitly reset via Set(0). The gRPC GetStats with reset: true atomically reads and resets.

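  • The SizeStatWriter counts the total bytes in a MultiBuffer (the sum of all buffer lengths). This is the payload as it travels between the inbound and outbound handlers, after one side's protocol decoding and before the other side's encoding, so protocol headers, encryption, and transport overhead are not included.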

  • Online maps are per user, not per connection. Multiple connections from the same IP refresh the same timestamp. The 20-second expiry is a heuristic: an IP that has not opened a new connection for 20 seconds becomes eligible for removal at the next cleanup, even if an older long-lived connection from it is still transferring data.

Technical analysis for re-implementation purposes.