Pipe & Buffer System
The pipe and buffer systems form the backbone of data transfer in Xray-core. Every byte passing through the proxy flows through these abstractions.
Buffer (common/buf/)
Buffer Structure
```go
// common/buf/buffer.go
type Buffer struct {
    v         []byte           // underlying byte slice
    start     int32            // read cursor
    end       int32            // write cursor
    ownership ownership        // managed, unmanaged, or bytespool
    UDP       *net.Destination // per-packet UDP destination
}
```

Key properties:
- Standard size: 8192 bytes (8KB)
- Pooled: managed buffers are recycled via `sync.Pool`
- Cursor-based: `start..end` defines the active data window
- UDP metadata: the `UDP` field carries the per-packet destination for UDP proxying
Buffer Pool
```go
const Size = 8192

var pool = bytespool.GetPool(Size)

func New() *Buffer {
    buf := pool.Get().([]byte)
    return &Buffer{v: buf[:Size]}
}

func (b *Buffer) Release() {
    if cap(b.v) == Size {
        pool.Put(b.v)
    }
}
```

For larger allocations, `bytespool` provides size-class-based pools:
```go
// common/bytespool/pool.go
// Pool sizes: 1K, 2K, 4K, 8K, 16K, 32K, 64K, 128K, ...
func Alloc(size int32) []byte // allocate from pool
func Free(b []byte)           // return to pool
```

MultiBuffer
A `MultiBuffer` is a slice of `*Buffer`, used for batch operations:

```go
type MultiBuffer []*Buffer

func (mb MultiBuffer) Len() int32        // total bytes across all buffers
func (mb MultiBuffer) IsEmpty() bool
func (mb MultiBuffer) Copy(b []byte) int // copy into a flat byte slice
```

Reader & Writer Interfaces
```go
type Reader interface {
    ReadMultiBuffer() (MultiBuffer, error)
}

type Writer interface {
    WriteMultiBuffer(MultiBuffer) error
}

type TimeoutReader interface {
    Reader
    ReadMultiBufferTimeout(time.Duration) (MultiBuffer, error)
}
```

buf.Copy — The Core Transfer Loop
Almost all data transfer goes through `buf.Copy()`:

```go
// common/buf/copy.go (simplified)
func Copy(reader Reader, writer Writer, options ...CopyOption) error {
    var handler copyHandler
    for _, opt := range options {
        opt(&handler) // e.g., UpdateActivity(timer), CountSize(counter)
    }
    for {
        buffer, err := reader.ReadMultiBuffer()
        if !buffer.IsEmpty() {
            for _, onData := range handler.onData {
                onData(buffer) // per-transfer hooks
            }
            if werr := writer.WriteMultiBuffer(buffer); werr != nil {
                return werr
            }
        }
        if err != nil {
            return err // io.EOF = normal completion
        }
    }
}
```

Copy options:
- `UpdateActivity(timer)` — reset the idle timeout on each transfer
- `CountSize(counter)` — count bytes transferred
BufferedWriter
BufferedWriter batches small writes:
```go
type BufferedWriter struct {
    writer   Writer
    buffer   *Buffer
    buffered bool
}

func (w *BufferedWriter) WriteMultiBuffer(mb MultiBuffer) error {
    if w.buffered {
        // Accumulate into w.buffer; flush when the buffer fills.
    } else {
        // Pass through directly to the underlying writer.
    }
    // ...
}

func (w *BufferedWriter) SetBuffered(b bool) error {
    if !b && w.buffer != nil {
        // Flush buffered data.
    }
    // ...
}
```

This is used during protocol header encoding: the header is buffered, then flushed together with the first data payload, so both go out in a single TCP segment.
Pipe (transport/pipe/)
The pipe connects inbound and outbound with backpressure-aware buffering.
Creating a Pipe
```go
// transport/pipe/pipe.go
func New(opts ...Option) (*Reader, *Writer) {
    p := &pipe{
        readSignal:  signal.NewNotifier(),
        writeSignal: signal.NewNotifier(),
        done:        done.New(),
        errChan:     make(chan error, 1),
        option:      pipeOption{limit: -1},
    }
    return &Reader{pipe: p}, &Writer{pipe: p}
}
```

Options:
- `WithSizeLimit(limit)` — backpressure threshold in bytes (from policy)
- `WithoutSizeLimit()` — unlimited buffer (-1)
- `DiscardOverflow()` — drop writes when the pipe is full instead of blocking
Internal Pipe State
```go
// transport/pipe/impl.go
type pipe struct {
    readSignal  *signal.Notifier // signals the reader that data is available
    writeSignal *signal.Notifier // signals the writer that space is available
    done        *done.Instance   // pipe closed
    errChan     chan error       // close error
    option      pipeOption
    data        buf.MultiBuffer  // buffered data
    state       int32            // active, closed, errored
}
```

Write Path
```go
func (p *pipe) WriteMultiBuffer(mb buf.MultiBuffer) error {
    for {
        // Check whether the pipe is closed.
        if p.done.Done() {
            return io.ErrClosedPipe
        }
        // Check the size limit.
        if p.option.limit >= 0 && p.data.Len() >= p.option.limit {
            // Block until the reader consumes data,
            // or discard if DiscardOverflow is set.
            <-p.writeSignal.Wait()
            continue
        }
        // Append to the buffer.
        p.data = append(p.data, mb...)
        p.readSignal.Signal() // wake up the reader
        return nil
    }
}
```

Read Path
```go
func (p *pipe) ReadMultiBuffer() (buf.MultiBuffer, error) {
    for {
        if !p.data.IsEmpty() {
            mb := p.data
            p.data = nil
            p.writeSignal.Signal() // wake up a blocked writer
            return mb, nil
        }
        if p.done.Done() {
            return nil, io.EOF
        }
        <-p.readSignal.Wait() // block until data is available
    }
}

func (p *pipe) ReadMultiBufferTimeout(d time.Duration) (buf.MultiBuffer, error) {
    // Same loop, but waits with a select on the signal channel and a timer.
}
```

Backpressure Flow
```mermaid
sequenceDiagram
    participant Inbound as Inbound Proxy
    participant PW as Pipe Writer
    participant Pipe as Pipe Buffer
    participant PR as Pipe Reader
    participant Outbound as Outbound Proxy
    Inbound->>PW: WriteMultiBuffer(data)
    PW->>Pipe: Append to buffer
    alt Buffer below limit
        PW->>PR: Signal (data available)
        PR->>Outbound: ReadMultiBuffer()
        Outbound->>Outbound: Forward to remote
        PR->>PW: Signal (space available)
    else Buffer at limit
        PW->>PW: Block (wait for space)
        Note over PW: Backpressure!<br/>Inbound slows down
        PR->>Outbound: ReadMultiBuffer()
        PR->>PW: Signal (space available)
        PW->>PW: Resume writing
    end
```

Pipe Size Limits (Policy)
Buffer sizes come from the policy system:
```go
// features/policy/policy.go
type Buffer struct {
    PerConnection int32 // pipe size limit per connection
}

// Configured per user level:
//   Level 0:  10240 bytes (10KB)
//   Level 1+: configurable
```

From context:

```go
func OptionsFromContext(ctx context.Context) []Option {
    bp := policy.BufferPolicyFromContext(ctx)
    if bp.PerConnection >= 0 {
        return []Option{WithSizeLimit(bp.PerConnection)}
    }
    return []Option{WithoutSizeLimit()}
}
```

Transport Link
The `transport.Link` pairs a reader and a writer:

```go
// transport/link.go
type Link struct {
    Reader buf.Reader
    Writer buf.Writer
}
```

The dispatcher creates two crosswise-linked Links:

InboundLink:
- Reader = downlinkPipeReader
- Writer = uplinkPipeWriter

OutboundLink:
- Reader = uplinkPipeReader
- Writer = downlinkPipeWriter

Data written to `InboundLink.Writer` is read from `OutboundLink.Reader` (uplink). Data written to `OutboundLink.Writer` is read from `InboundLink.Reader` (downlink).
Signal System (common/signal/)
Notifier
Non-blocking signal for waking goroutines:
```go
type Notifier struct {
    c chan struct{} // buffered with capacity 1
}

func (n *Notifier) Signal() {
    select {
    case n.c <- struct{}{}: // signal sent
    default: // already signaled, skip
    }
}

func (n *Notifier) Wait() <-chan struct{} {
    return n.c
}
```

ActivityTimer
Tracks connection activity for idle timeout:
```go
func CancelAfterInactivity(ctx context.Context, cancel func(), timeout time.Duration) *ActivityTimer

func (t *ActivityTimer) SetTimeout(timeout time.Duration) // change the timeout

// Called by buf.Copy via the UpdateActivity option.
```

On each data transfer, `UpdateActivity` resets the timer. If there is no activity for the timeout duration, `cancel()` is called, terminating both goroutines (upload and download).
Implementation Notes
- Buffer pooling is essential: without it, GC pressure from millions of 8KB allocations kills performance. Use a `sync.Pool` equivalent.
- MultiBuffer enables batch I/O: reading or writing multiple chunks at once reduces syscall overhead. A single `ReadMultiBuffer()` call may return several buffers.
- Backpressure prevents OOM: without pipe size limits, a fast sender and a slow receiver cause unbounded memory growth. The default 10KB limit is intentionally small.
- Signal-based vs. channel-based: the `Notifier` is a non-blocking signal (try-signal semantics). This avoids goroutine leaks when both sides race to signal.
- `Buffer.UDP` is critical for UDP: each buffer can carry a different destination address. This is how XUDP and TUN handle per-packet routing.
- BufferedWriter for header batching: always buffer the protocol header together with the first payload. Sending them separately reveals the header size to traffic analysis.