Pipe & Buffer System

The pipe and buffer systems form the backbone of data transfer in Xray-core. Every byte passing through the proxy flows through these abstractions.

Buffer (common/buf/)

Buffer Structure

go
// common/buf/buffer.go
type Buffer struct {
    v         []byte          // underlying byte slice
    start     int32           // read cursor
    end       int32           // write cursor
    ownership ownership       // managed, unmanaged, or bytespool
    UDP       *net.Destination // per-packet UDP destination
}

Key properties:

  • Standard size: 8192 bytes (8KB)
  • Pooled: Managed buffers are recycled via sync.Pool
  • Cursor-based: start..end defines the active data window
  • UDP metadata: The UDP field carries per-packet destination for UDP proxying
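The cursor behavior described above can be illustrated with a small stand-alone sketch. `miniBuffer` and its methods are hypothetical names for this example, not the Xray-core API; the only assumption carried over is that `start` and `end` delimit the unread bytes.

```go
package main

import "fmt"

// miniBuffer is a hypothetical, simplified stand-in for buf.Buffer.
type miniBuffer struct {
	v     []byte
	start int32 // read cursor
	end   int32 // write cursor
}

func newMiniBuffer(size int) *miniBuffer {
	return &miniBuffer{v: make([]byte, size)}
}

// Write appends as much of p as fits after the write cursor.
func (b *miniBuffer) Write(p []byte) int {
	n := copy(b.v[b.end:], p)
	b.end += int32(n)
	return n
}

// Read copies unread bytes into p and advances the read cursor.
func (b *miniBuffer) Read(p []byte) int {
	n := copy(p, b.v[b.start:b.end])
	b.start += int32(n)
	if b.start == b.end {
		// Fully drained: rewind so the whole slice is writable again.
		b.start, b.end = 0, 0
	}
	return n
}

// Bytes returns the active window without copying.
func (b *miniBuffer) Bytes() []byte { return b.v[b.start:b.end] }

// Len reports the number of unread bytes.
func (b *miniBuffer) Len() int32 { return b.end - b.start }

func main() {
	b := newMiniBuffer(8)
	b.Write([]byte("hello"))
	head := make([]byte, 2)
	b.Read(head)
	fmt.Printf("%s %s %d\n", head, b.Bytes(), b.Len()) // he llo 3
}
```

Reading never moves data; it only advances `start`, which is why slicing the active window is cheap.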

Buffer Pool

go
const Size = 8192
var pool = bytespool.GetPool(Size)

func New() *Buffer {
    buf := pool.Get().([]byte)
    return &Buffer{v: buf[:Size]}
}

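func (b *Buffer) Release() {
    if b == nil || b.v == nil || b.ownership != managed {
        return // nothing to recycle, or the slice is not owned by this pool
    }
    p := b.v
    b.v = nil // guards against a double release
    if cap(p) == Size {
        pool.Put(p)
    }
}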
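For a re-implementation, the same recycle-on-release pattern can be sketched with only the standard library. Everything below (`slicePool`, `pooledBuffer`, `newPooled`) is a hypothetical name for illustration; storing `*[]byte` rather than `[]byte` in the pool is a design choice of this sketch that avoids allocating a slice header on every `Put`.

```go
package main

import (
	"fmt"
	"sync"
)

const size = 8192

// slicePool hands out 8 KB slices and allocates a fresh one when empty.
var slicePool = sync.Pool{
	New: func() interface{} {
		b := make([]byte, size)
		return &b
	},
}

// pooledBuffer is a hypothetical buffer whose backing slice is pooled.
type pooledBuffer struct {
	v *[]byte
}

func newPooled() *pooledBuffer {
	return &pooledBuffer{v: slicePool.Get().(*[]byte)}
}

// Release returns the backing slice to the pool. It is safe to call on
// a nil buffer and safe to call more than once.
func (b *pooledBuffer) Release() {
	if b == nil || b.v == nil {
		return
	}
	v := b.v
	b.v = nil
	if cap(*v) == size {
		slicePool.Put(v)
	}
}

func main() {
	b := newPooled()
	fmt.Println(len(*b.v))
	b.Release()
	b.Release() // no-op: the slice was already handed back
}
```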

For larger allocations, bytespool provides size-class-based pools:

go
// common/bytespool/pool.go
// Pool sizes: 2K, 8K, 32K, 128K (each size class is 4x the previous one)
func Alloc(size int32) []byte  // allocate from pool
func Free(b []byte)            // return to pool
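A size-class allocator of this kind can be sketched as follows. The class sizes, `alloc`, and `free` are assumptions made for the example and are not copied from `bytespool`; requests larger than the biggest class fall back to a plain allocation.

```go
package main

import (
	"fmt"
	"sync"
)

// classSizes lists the hypothetical size classes, smallest first.
var classSizes = []int32{2048, 8192, 32768, 131072}

var pools = make([]sync.Pool, len(classSizes))

func init() {
	for i, s := range classSizes {
		s := s // capture the per-class size for the closure
		pools[i].New = func() interface{} { return make([]byte, s) }
	}
}

// alloc returns a slice of length size backed by the smallest class
// that can hold it.
func alloc(size int32) []byte {
	for i, s := range classSizes {
		if size <= s {
			return pools[i].Get().([]byte)[:size]
		}
	}
	return make([]byte, size) // larger than every class: not pooled
}

// free hands a slice back to the largest class its capacity can serve.
// Slices smaller than the smallest class are left to the GC.
func free(b []byte) {
	c := int32(cap(b))
	b = b[:c]
	for i := len(classSizes) - 1; i >= 0; i-- {
		if c >= classSizes[i] {
			pools[i].Put(b)
			return
		}
	}
}

func main() {
	b := alloc(3000)
	fmt.Println(len(b), cap(b))
	free(b)
}
```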

MultiBuffer

A MultiBuffer is a slice of *Buffer, used for batch operations:

go
type MultiBuffer []*Buffer

func (mb MultiBuffer) Len() int32      // total bytes across all buffers
func (mb MultiBuffer) IsEmpty() bool
func (mb MultiBuffer) Copy(b []byte) int // copy to flat byte slice
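To make the semantics of these three methods concrete, here is a minimal sketch in which plain byte slices stand in for `*Buffer`. The type name `multiBuffer` is hypothetical and the method bodies are an assumption about the behavior, not the library source.

```go
package main

import "fmt"

// multiBuffer is a hypothetical stand-in for buf.MultiBuffer.
type multiBuffer [][]byte

// Len sums the lengths of all chunks.
func (mb multiBuffer) Len() int32 {
	var n int32
	for _, b := range mb {
		n += int32(len(b))
	}
	return n
}

// IsEmpty reports whether no chunk holds any data.
func (mb multiBuffer) IsEmpty() bool {
	for _, b := range mb {
		if len(b) > 0 {
			return false
		}
	}
	return true
}

// Copy flattens the chunks into dst and returns the bytes copied.
// It stops early when dst is full.
func (mb multiBuffer) Copy(dst []byte) int {
	total := 0
	for _, b := range mb {
		n := copy(dst[total:], b)
		total += n
		if n < len(b) {
			break
		}
	}
	return total
}

func main() {
	mb := multiBuffer{[]byte("GET "), []byte("/index"), []byte(".html")}
	flat := make([]byte, mb.Len())
	n := mb.Copy(flat)
	fmt.Println(n, string(flat)) // 15 GET /index.html
}
```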

Reader & Writer Interfaces

go
type Reader interface {
    ReadMultiBuffer() (MultiBuffer, error)
}

type Writer interface {
    WriteMultiBuffer(MultiBuffer) error
}

type TimeoutReader interface {
    Reader
    ReadMultiBufferTimeout(time.Duration) (MultiBuffer, error)
}

buf.Copy — The Core Transfer Loop

Almost all data transfer uses buf.Copy():

go
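// common/buf/copy.go (simplified)
func Copy(reader Reader, writer Writer, options ...CopyOption) error {
    // Options run once and register per-transfer callbacks.
    var handler copyHandler
    for _, opt := range options {
        opt(&handler) // e.g., UpdateActivity(timer)
    }
    for {
        buffer, err := reader.ReadMultiBuffer()
        if !buffer.IsEmpty() {
            for _, onData := range handler.onData {
                onData(buffer) // invoked for every non-empty read
            }
            if werr := writer.WriteMultiBuffer(buffer); werr != nil {
                return werr
            }
        }
        if err != nil {
            if errors.Is(err, io.EOF) {
                return nil // io.EOF = normal completion
            }
            return err
        }
    }
}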

Copy options:

  • UpdateActivity(timer) — reset idle timeout on each transfer
  • CountSize(counter) — count bytes transferred
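The option mechanism can be sketched in isolation. The types below (`multiBuffer`, `copyHandler`, `countSize`, `copyAll`, `sliceReader`, `collectWriter`) are hypothetical names for this example; the point is that each option registers a callback once, and the loop invokes the callbacks on every non-empty read.

```go
package main

import (
	"errors"
	"fmt"
	"io"
)

type multiBuffer [][]byte

func (mb multiBuffer) Len() int {
	n := 0
	for _, b := range mb {
		n += len(b)
	}
	return n
}

type reader interface{ ReadMultiBuffer() (multiBuffer, error) }
type writer interface{ WriteMultiBuffer(multiBuffer) error }

type copyHandler struct{ onData []func(multiBuffer) }
type copyOption func(*copyHandler)

// countSize is a hypothetical analogue of CountSize.
func countSize(total *int) copyOption {
	return func(h *copyHandler) {
		h.onData = append(h.onData, func(mb multiBuffer) { *total += mb.Len() })
	}
}

func copyAll(r reader, w writer, opts ...copyOption) error {
	var h copyHandler
	for _, opt := range opts {
		opt(&h)
	}
	for {
		mb, err := r.ReadMultiBuffer()
		if mb.Len() > 0 {
			for _, f := range h.onData {
				f(mb)
			}
			if werr := w.WriteMultiBuffer(mb); werr != nil {
				return werr
			}
		}
		if errors.Is(err, io.EOF) {
			return nil
		}
		if err != nil {
			return err
		}
	}
}

// sliceReader yields one chunk per call, then io.EOF.
type sliceReader struct{ chunks [][]byte }

func (s *sliceReader) ReadMultiBuffer() (multiBuffer, error) {
	if len(s.chunks) == 0 {
		return nil, io.EOF
	}
	mb := multiBuffer{s.chunks[0]}
	s.chunks = s.chunks[1:]
	return mb, nil
}

// collectWriter appends everything it receives to data.
type collectWriter struct{ data []byte }

func (c *collectWriter) WriteMultiBuffer(mb multiBuffer) error {
	for _, b := range mb {
		c.data = append(c.data, b...)
	}
	return nil
}

func main() {
	var total int
	r := &sliceReader{chunks: [][]byte{[]byte("abc"), []byte("de")}}
	w := &collectWriter{}
	err := copyAll(r, w, countSize(&total))
	fmt.Println(string(w.data), total, err) // abcde 5 <nil>
}
```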

BufferedWriter

BufferedWriter batches small writes:

go
type BufferedWriter struct {
    writer   Writer
    buffer   *Buffer
    buffered bool
}

func (w *BufferedWriter) WriteMultiBuffer(mb MultiBuffer) error {
    if !w.buffered {
        // Direct passthrough to the underlying writer
        return w.writer.WriteMultiBuffer(mb)
    }
    // Otherwise: accumulate mb in w.buffer and flush to w.writer
    // whenever the buffer fills up (elided here)
    return nil
}

func (w *BufferedWriter) SetBuffered(b bool) error {
    w.buffered = b
    if !b && w.buffer != nil {
        return w.Flush() // hand any buffered data to the underlying writer
    }
    return nil
}

This is used during protocol header encoding: the header is buffered and then flushed together with the first data payload, so both reach the transport in a single write and normally leave in the same TCP segment.
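The coalescing effect is easy to demonstrate with the standard `io.Writer` interface. The sketch below is an illustration under simplifying assumptions: `bufferedWriter` and `countingWriter` are hypothetical types, and the pending data lives in a growable slice instead of a fixed-size `Buffer`.

```go
package main

import (
	"bytes"
	"fmt"
	"io"
)

// countingWriter records how many Write calls reach the sink.
type countingWriter struct {
	bytes.Buffer
	calls int
}

func (c *countingWriter) Write(p []byte) (int, error) {
	c.calls++
	return c.Buffer.Write(p)
}

// bufferedWriter holds writes back while buffered is true.
type bufferedWriter struct {
	w        io.Writer
	pending  []byte
	buffered bool
}

func (b *bufferedWriter) Write(p []byte) (int, error) {
	if b.buffered {
		b.pending = append(b.pending, p...)
		return len(p), nil
	}
	return b.w.Write(p)
}

// SetBuffered(false) flushes everything accumulated so far with a
// single Write call on the underlying writer.
func (b *bufferedWriter) SetBuffered(on bool) error {
	b.buffered = on
	if on || len(b.pending) == 0 {
		return nil
	}
	_, err := b.w.Write(b.pending)
	b.pending = nil
	return err
}

func main() {
	sink := &countingWriter{}
	bw := &bufferedWriter{w: sink, buffered: true}
	bw.Write([]byte("HDR|"))    // protocol header
	bw.Write([]byte("payload")) // first data payload
	bw.SetBuffered(false)       // one combined write reaches the sink
	fmt.Println(sink.String(), sink.calls) // HDR|payload 1
}
```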

Pipe (transport/pipe/)

The pipe connects inbound and outbound with backpressure-aware buffering.

Creating a Pipe

go
// transport/pipe/pipe.go
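func New(opts ...Option) (*Reader, *Writer) {
    p := &pipe{
        readSignal:  signal.NewNotifier(),
        writeSignal: signal.NewNotifier(),
        done:        done.New(),
        errChan:     make(chan error, 1),
        option:      pipeOption{limit: -1}, // unlimited unless an option overrides it
    }
    for _, opt := range opts {
        opt(&p.option) // apply WithSizeLimit, DiscardOverflow, ...
    }
    return &Reader{pipe: p}, &Writer{pipe: p}
}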

Options:

  • WithSizeLimit(limit) — backpressure threshold in bytes (from policy)
  • WithoutSizeLimit() — unlimited buffer (-1)
  • DiscardOverflow() — drop writes when full instead of blocking

Internal Pipe State

go
// transport/pipe/impl.go
type pipe struct {
    readSignal  *signal.Notifier  // signal reader that data is available
    writeSignal *signal.Notifier  // signal writer that buffer has space
    done        *done.Instance    // pipe closed
    errChan     chan error         // close error
    option      pipeOption
    data        buf.MultiBuffer   // buffered data
    state       int32             // active, closed, errored
}

Write Path

go
func (p *pipe) WriteMultiBuffer(mb buf.MultiBuffer) error {
    // Simplified: the real code guards p.data with a mutex.
    for {
        // Check if pipe is done
        if p.done.Done() {
            return io.ErrClosedPipe
        }

        // Check size limit
        if p.option.limit >= 0 && p.data.Len() >= p.option.limit {
            if p.option.discardOverflow {
                buf.ReleaseMulti(mb) // drop the write instead of blocking
                return nil
            }
            // Block until the reader consumes data or the pipe closes
            select {
            case <-p.writeSignal.Wait():
            case <-p.done.Wait():
            }
            continue
        }

        // Append to buffer
        p.data = append(p.data, mb...)
        p.readSignal.Signal() // wake up reader
        return nil
    }
}

Read Path

go
func (p *pipe) ReadMultiBuffer() (buf.MultiBuffer, error) {
    for {
        if !p.data.IsEmpty() {
            mb := p.data
            p.data = nil
            p.writeSignal.Signal() // wake up blocked writer
            return mb, nil
        }
        if p.done.Done() {
            return nil, io.EOF
        }
        // Block until data is available or the pipe closes
        select {
        case <-p.readSignal.Wait():
        case <-p.done.Wait():
        }
    }
}

func (p *pipe) ReadMultiBufferTimeout(d time.Duration) (buf.MultiBuffer, error) {
    // Same loop, with an extra `case <-timer.C` in the select that
    // returns buf.ErrReadTimeout once d has elapsed
}

Backpressure Flow

mermaid
sequenceDiagram
    participant Inbound as Inbound Proxy
    participant PW as Pipe Writer
    participant Pipe as Pipe Buffer
    participant PR as Pipe Reader
    participant Outbound as Outbound Proxy

    Inbound->>PW: WriteMultiBuffer(data)
    PW->>Pipe: Append to buffer

    alt Buffer below limit
        PW->>PR: Signal (data available)
        PR->>Outbound: ReadMultiBuffer()
        Outbound->>Outbound: Forward to remote
        PR->>PW: Signal (space available)
    else Buffer at limit
        PW->>PW: Block (wait for space)
        Note over PW: Backpressure!<br/>Inbound slows down
        PR->>Outbound: ReadMultiBuffer()
        PR->>PW: Signal (space available)
        PW->>PW: Resume writing
    end
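The flow in the diagram can be reproduced with a small runnable model. This is a sketch under stated assumptions rather than the Xray-core implementation: `miniPipe` and `notifier` are hypothetical, there is exactly one reader and one writer, and the limit is checked before each append so a single write may overshoot it.

```go
package main

import (
	"fmt"
	"io"
	"sync"
)

// notifier is a coalescing wake-up signal with a one-slot buffer.
type notifier chan struct{}

func newNotifier() notifier { return make(notifier, 1) }

func (n notifier) signal() {
	select {
	case n <- struct{}{}:
	default: // a wake-up is already pending
	}
}

type miniPipe struct {
	mu                sync.Mutex
	data              [][]byte
	size, limit       int
	closed            bool
	readSig, writeSig notifier
}

func newMiniPipe(limit int) *miniPipe {
	return &miniPipe{limit: limit, readSig: newNotifier(), writeSig: newNotifier()}
}

// Write blocks while the queued bytes are at or above the limit.
func (p *miniPipe) Write(b []byte) error {
	for {
		p.mu.Lock()
		if p.closed {
			p.mu.Unlock()
			return io.ErrClosedPipe
		}
		if p.size < p.limit {
			p.data = append(p.data, b)
			p.size += len(b)
			p.mu.Unlock()
			p.readSig.signal()
			return nil
		}
		p.mu.Unlock()
		<-p.writeSig // backpressure: wait for the reader to drain
	}
}

// Read returns everything queued, or io.EOF once closed and drained.
func (p *miniPipe) Read() ([][]byte, error) {
	for {
		p.mu.Lock()
		if len(p.data) > 0 {
			out := p.data
			p.data, p.size = nil, 0
			p.mu.Unlock()
			p.writeSig.signal()
			return out, nil
		}
		closed := p.closed
		p.mu.Unlock()
		if closed {
			return nil, io.EOF
		}
		<-p.readSig
	}
}

func (p *miniPipe) Close() {
	p.mu.Lock()
	p.closed = true
	p.mu.Unlock()
	p.readSig.signal()
	p.writeSig.signal()
}

func main() {
	p := newMiniPipe(4)
	done := make(chan int)
	go func() {
		total := 0
		for {
			mb, err := p.Read()
			for _, b := range mb {
				total += len(b)
			}
			if err != nil {
				done <- total
				return
			}
		}
	}()
	for i := 0; i < 100; i++ {
		p.Write([]byte("abcd")) // blocks whenever 4 bytes are queued
	}
	p.Close()
	fmt.Println(<-done) // 400
}
```

Because the signals are buffered, a wake-up sent just before the other side starts waiting is not lost; a stale wake-up only causes one extra loop iteration.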

Pipe Size Limits (Policy)

Buffer sizes come from the policy system:

go
// features/policy/policy.go
type Buffer struct {
    PerConnection int32  // pipe size limit per connection
}

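// Configured per user level through the policy's bufferSize setting.
// When it is not set, the built-in default depends on the target CPU
// architecture (smaller on low-memory ARM/MIPS targets), so check the
// policy code for the value that applies to your build.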

From context:

go
func OptionsFromContext(ctx context.Context) []Option {
    bp := policy.BufferPolicyFromContext(ctx)
    if bp.PerConnection >= 0 {
        return []Option{WithSizeLimit(bp.PerConnection)}
    }
    return []Option{WithoutSizeLimit()}
}

The transport.Link pairs a reader and writer:

go
// transport/link.go
type Link struct {
    Reader buf.Reader
    Writer buf.Writer
}

The dispatcher creates two linked Links:

InboundLink:                      OutboundLink:
  Reader = downlinkPipeReader       Reader = uplinkPipeReader
  Writer = uplinkPipeWriter         Writer = downlinkPipeWriter

Data written to InboundLink.Writer is read from OutboundLink.Reader (uplink). Data written to OutboundLink.Writer is read from InboundLink.Reader (downlink).
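The cross-wiring is easiest to see in a toy model. In the sketch below, buffered Go channels stand in for the two pipes; `link` and `newLinkPair` are hypothetical names, and this is an illustration of the wiring only, not of the dispatcher code.

```go
package main

import "fmt"

// link is a hypothetical stand-in for transport.Link.
type link struct {
	Reader <-chan []byte
	Writer chan<- []byte
}

// newLinkPair creates two channels (uplink and downlink) and crosses
// them so that each side writes to what the other side reads.
func newLinkPair() (inbound, outbound link) {
	uplink := make(chan []byte, 1)   // client -> remote
	downlink := make(chan []byte, 1) // remote -> client
	inbound = link{Reader: downlink, Writer: uplink}
	outbound = link{Reader: uplink, Writer: downlink}
	return inbound, outbound
}

func main() {
	inbound, outbound := newLinkPair()

	inbound.Writer <- []byte("request")
	fmt.Println(string(<-outbound.Reader)) // request

	outbound.Writer <- []byte("response")
	fmt.Println(string(<-inbound.Reader)) // response
}
```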

Signal System (common/signal/)

Notifier

Non-blocking signal for waking goroutines:

go
type Notifier struct {
    c chan struct{}
}

func (n *Notifier) Signal() {
    select {
    case n.c <- struct{}{}: // signal sent
    default: // already signaled, skip
    }
}

func (n *Notifier) Wait() <-chan struct{} {
    return n.c
}
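A short experiment shows the coalescing behavior: several `Signal()` calls made before anyone waits leave exactly one wake-up pending. The sketch assumes the channel is created with a buffer of one slot; `newNotifier` and `drain` are helpers written for this example.

```go
package main

import "fmt"

type notifier struct {
	c chan struct{}
}

// newNotifier assumes a one-slot buffer, so one signal can be pending.
func newNotifier() *notifier {
	return &notifier{c: make(chan struct{}, 1)}
}

func (n *notifier) Signal() {
	select {
	case n.c <- struct{}{}: // signal stored
	default: // already signaled, skip
	}
}

func (n *notifier) Wait() <-chan struct{} {
	return n.c
}

// drain counts how many wake-ups are pending without blocking.
func drain(n *notifier) int {
	count := 0
	for {
		select {
		case <-n.Wait():
			count++
		default:
			return count
		}
	}
}

func main() {
	n := newNotifier()
	n.Signal()
	n.Signal()
	n.Signal() // none of these calls block
	fmt.Println(drain(n)) // 1
}
```

Callers therefore have to re-check their condition after waking up, since one wake-up may stand for several events.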

ActivityTimer

Tracks connection activity for idle timeout:

go
func CancelAfterInactivity(ctx context.Context, cancel func(), timeout time.Duration) *ActivityTimer

func (t *ActivityTimer) Update()                           // record activity; called by buf.Copy via the UpdateActivity option
func (t *ActivityTimer) SetTimeout(timeout time.Duration)  // change timeout

On each data transfer, UpdateActivity resets the timer. If there is no activity for the timeout duration, cancel() is called, which terminates both goroutines (upload and download).

Implementation Notes

  1. Buffer pooling is essential: Without it, GC pressure from millions of 8KB allocations kills performance. Use a sync.Pool equivalent.

  2. MultiBuffer enables batch I/O: Reading/writing multiple chunks at once reduces syscall overhead. A single ReadMultiBuffer() call might return several buffers.

  3. Backpressure prevents OOM: Without pipe size limits, a fast sender paired with a slow receiver causes unbounded memory growth. Keep the per-connection limit small, because total buffered memory scales with the number of concurrent connections.

  4. Signal-based vs channel-based: The Notifier is a non-blocking signal (try-send semantics): Signal() never blocks, and repeated signals coalesce into one pending wake-up. This avoids goroutines getting stuck when both sides race to signal.

  5. Buffer.UDP is critical for UDP: Each buffer can carry a different destination address. This is how XUDP and TUN handle per-packet routing.

  6. BufferedWriter for header batching: Always buffer the protocol header + first payload together. Sending them separately reveals the header size to traffic analysis.

Technical analysis for re-implementation purposes.