Skip to content

管道与缓冲区系统

管道和缓冲区系统构成了 Xray-core 数据传输的基础。通过代理的每个字节都经过这些抽象层。

缓冲区(common/buf/

缓冲区结构

go
// common/buf/buffer.go
type Buffer struct {
    v         []byte          // 底层字节切片
    start     int32           // 读游标
    end       int32           // 写游标
    ownership ownership       // managed、unmanaged 或 bytespool
    UDP       *net.Destination // 逐包 UDP 目标地址
}

关键特性:

  • 标准大小:8192 字节(8KB)
  • 池化:托管缓冲区通过 sync.Pool 回收
  • 基于游标start..end 定义有效数据窗口
  • UDP 元数据UDP 字段携带逐包的目标地址,用于 UDP 代理

缓冲区池

go
const Size = 8192
var pool = bytespool.GetPool(Size)

func New() *Buffer {
    buf := pool.Get().([]byte)
    return &Buffer{v: buf[:Size]}
}

func (b *Buffer) Release() {
    if cap(b.v) == Size {
        pool.Put(b.v)
    }
}

对于更大的分配,bytespool 提供基于大小等级的池:

go
// common/bytespool/pool.go
// 池大小:1K、2K、4K、8K、16K、32K、64K、128K、...
func Alloc(size int32) []byte  // 从池中分配
func Free(b []byte)            // 归还到池中

MultiBuffer

MultiBuffer*Buffer 的切片,用于批量操作:

go
type MultiBuffer []*Buffer

func (mb MultiBuffer) Len() int32      // 所有缓冲区的总字节数
func (mb MultiBuffer) IsEmpty() bool
func (mb MultiBuffer) Copy(b []byte) int // 复制到扁平字节切片

Reader 和 Writer 接口

go
type Reader interface {
    ReadMultiBuffer() (MultiBuffer, error)
}

type Writer interface {
    WriteMultiBuffer(MultiBuffer) error
}

type TimeoutReader interface {
    Reader
    ReadMultiBufferTimeout(time.Duration) (MultiBuffer, error)
}

buf.Copy — 核心传输循环

几乎所有数据传输都使用 buf.Copy()

go
// common/buf/copy.go
func Copy(reader Reader, writer Writer, options ...CopyOption) error {
    for {
        buffer, err := reader.ReadMultiBuffer()
        if !buffer.IsEmpty() {
            for _, opt := range options {
                opt(&optHolder)  // 例如 UpdateActivity(timer)
            }
            if werr := writer.WriteMultiBuffer(buffer); werr != nil {
                return werr
            }
        }
        if err != nil {
            return err  // io.EOF = 正常完成
        }
    }
}

复制选项:

  • UpdateActivity(timer) — 每次传输时重置空闲超时
  • CountSize(counter) — 统计传输的字节数

BufferedWriter

BufferedWriter 批量处理小写入:

go
type BufferedWriter struct {
    writer  Writer
    buffer  *Buffer
    buffered bool
}

func (w *BufferedWriter) WriteMultiBuffer(mb MultiBuffer) error {
    if w.buffered {
        // 在缓冲区中累积
        // 缓冲区满时刷新
    } else {
        // 直接传递给底层写入器
    }
}

func (w *BufferedWriter) SetBuffered(b bool) error {
    if !b && w.buffer != nil {
        // 刷新缓冲的数据
    }
}

这在协议头编码时使用:先缓冲头部,然后连同第一个数据载荷一起刷新,确保它们在同一个 TCP 段中发送。

管道(transport/pipe/

管道通过具有背压感知的缓冲连接入站和出站。

创建管道

go
// transport/pipe/pipe.go
func New(opts ...Option) (*Reader, *Writer) {
    p := &pipe{
        readSignal:  signal.NewNotifier(),
        writeSignal: signal.NewNotifier(),
        done:        done.New(),
        errChan:     make(chan error, 1),
        option: pipeOption{limit: -1},
    }
    return &Reader{pipe: p}, &Writer{pipe: p}
}

选项:

  • WithSizeLimit(limit) — 背压阈值(字节),来自策略配置
  • WithoutSizeLimit() — 无限制缓冲(-1)
  • DiscardOverflow() — 缓冲区满时丢弃写入而非阻塞

管道内部状态

go
// transport/pipe/impl.go
type pipe struct {
    readSignal  *signal.Notifier  // 通知读取器有数据可用
    writeSignal *signal.Notifier  // 通知写入器有空间可用
    done        *done.Instance    // 管道已关闭
    errChan     chan error         // 关闭错误
    option      pipeOption
    data        buf.MultiBuffer   // 缓冲的数据
    state       int32             // 活跃、已关闭、出错
}

写入路径

go
func (p *pipe) WriteMultiBuffer(mb buf.MultiBuffer) error {
    for {
        // 检查管道是否已关闭
        if p.done.Done() { return io.ErrClosedPipe }

        // 检查大小限制
        if p.option.limit >= 0 && p.data.Len() >= p.option.limit {
            // 阻塞:等待读取器消费数据
            // 或者如果设置了 DiscardOverflow 则丢弃
            p.writeSignal.Wait()
            continue
        }

        // 追加到缓冲区
        p.data = append(p.data, mb...)
        p.readSignal.Signal()  // 唤醒读取器
        return nil
    }
}

读取路径

go
func (p *pipe) ReadMultiBuffer() (buf.MultiBuffer, error) {
    for {
        if !p.data.IsEmpty() {
            mb := p.data
            p.data = nil
            p.writeSignal.Signal()  // 唤醒阻塞的写入器
            return mb, nil
        }
        if p.done.Done() {
            return nil, io.EOF
        }
        p.readSignal.Wait()  // 阻塞直到有数据可用
    }
}

func (p *pipe) ReadMultiBufferTimeout(d time.Duration) (buf.MultiBuffer, error) {
    // 相同逻辑但通过 select + timer 实现超时
}

背压流程

mermaid
sequenceDiagram
    participant Inbound as 入站代理
    participant PW as 管道写入器
    participant Pipe as 管道缓冲区
    participant PR as 管道读取器
    participant Outbound as 出站代理

    Inbound->>PW: WriteMultiBuffer(data)
    PW->>Pipe: 追加到缓冲区

    alt 缓冲区低于限制
        PW->>PR: 信号(有数据可用)
        PR->>Outbound: ReadMultiBuffer()
        Outbound->>Outbound: 转发到远程
        PR->>PW: 信号(有空间可用)
    else 缓冲区达到限制
        PW->>PW: 阻塞(等待空间)
        Note over PW: 背压!<br/>入站减速
        PR->>Outbound: ReadMultiBuffer()
        PR->>PW: 信号(有空间可用)
        PW->>PW: 恢复写入
    end

管道大小限制(策略)

缓冲区大小来自策略系统:

go
// features/policy/policy.go
type Buffer struct {
    PerConnection int32  // 每连接的管道大小限制
}

// 按用户等级配置:
// Level 0:10240 字节(10KB)
// Level 1+:可配置

从上下文获取:

go
func OptionsFromContext(ctx context.Context) []Option {
    bp := policy.BufferPolicyFromContext(ctx)
    if bp.PerConnection >= 0 {
        return []Option{WithSizeLimit(bp.PerConnection)}
    }
    return []Option{WithoutSizeLimit()}
}

传输链路

transport.Link 将读取器和写入器配对:

go
// transport/link.go
type Link struct {
    Reader buf.Reader
    Writer buf.Writer
}

分发器创建两个相互关联的 Link:

InboundLink:                      OutboundLink:
  Reader = downlinkPipeReader       Reader = uplinkPipeReader
  Writer = uplinkPipeWriter         Writer = downlinkPipeWriter

写入 InboundLink.Writer 的数据从 OutboundLink.Reader 读取(上行)。 写入 OutboundLink.Writer 的数据从 InboundLink.Reader 读取(下行)。

信号系统(common/signal/

Notifier

用于唤醒 goroutine 的非阻塞信号:

go
type Notifier struct {
    c chan struct{}
}

func (n *Notifier) Signal() {
    select {
    case n.c <- struct{}{}: // 信号已发送
    default: // 已有信号,跳过
    }
}

func (n *Notifier) Wait() <-chan struct{} {
    return n.c
}

ActivityTimer

跟踪连接活动以实现空闲超时:

go
func CancelAfterInactivity(ctx context.Context, cancel func(), timeout time.Duration) *ActivityTimer

func (t *ActivityTimer) SetTimeout(timeout time.Duration)  // 修改超时时间
// 通过 buf.Copy 的 UpdateActivity 选项调用

每次数据传输时,UpdateActivity 会重置计时器。如果在超时时间内没有活动,则调用 cancel(),终止两个 goroutine(上行 + 下行)。

实现说明

  1. 缓冲区池化至关重要:没有它,数百万次 8KB 分配带来的 GC 压力会严重影响性能。需要使用 sync.Pool 或等效机制。

  2. MultiBuffer 实现批量 I/O:一次读写多个数据块可以减少系统调用开销。单次 ReadMultiBuffer() 调用可能返回多个缓冲区。

  3. 背压防止 OOM:如果没有管道大小限制,快速发送方 + 慢速接收方会导致内存无限增长。默认的 10KB 限制是有意设置得较小的。

  4. 基于信号 vs 基于通道Notifier 是非阻塞信号(trySignal 语义)。这避免了双方竞争发送信号时的 goroutine 泄漏。

  5. Buffer.UDP 对 UDP 至关重要:每个缓冲区可以携带不同的目标地址。XUDP 和 TUN 就是通过这种方式实现逐包路由的。

  6. BufferedWriter 用于头部批处理:始终将协议头和首个载荷数据缓冲在一起。分开发送会向流量分析暴露头部大小。

用于重新实现目的的技术分析。