管道与缓冲区系统
管道和缓冲区系统构成了 Xray-core 数据传输的基础。通过代理的每个字节都经过这些抽象层。
缓冲区(common/buf/)
缓冲区结构
// common/buf/buffer.go
type Buffer struct {
v []byte // 底层字节切片
start int32 // 读游标
end int32 // 写游标
ownership ownership // managed、unmanaged 或 bytespool
UDP *net.Destination // 逐包 UDP 目标地址
}关键特性:
- 标准大小:8192 字节(8KB)
- 池化:托管缓冲区通过
sync.Pool回收 - 基于游标:
start..end定义有效数据窗口 - UDP 元数据:
UDP字段携带逐包的目标地址,用于 UDP 代理
缓冲区池
const Size = 8192
var pool = bytespool.GetPool(Size)
func New() *Buffer {
buf := pool.Get().([]byte)
return &Buffer{v: buf[:Size]}
}
func (b *Buffer) Release() {
if cap(b.v) == Size {
pool.Put(b.v)
}
}对于更大的分配,bytespool 提供基于大小等级的池:
// common/bytespool/pool.go
// 池大小:1K、2K、4K、8K、16K、32K、64K、128K、...
func Alloc(size int32) []byte // 从池中分配
func Free(b []byte) // 归还到池中MultiBuffer
MultiBuffer 是 *Buffer 的切片,用于批量操作:
type MultiBuffer []*Buffer
func (mb MultiBuffer) Len() int32 // 所有缓冲区的总字节数
func (mb MultiBuffer) IsEmpty() bool
func (mb MultiBuffer) Copy(b []byte) int // 复制到扁平字节切片Reader 和 Writer 接口
type Reader interface {
ReadMultiBuffer() (MultiBuffer, error)
}
type Writer interface {
WriteMultiBuffer(MultiBuffer) error
}
type TimeoutReader interface {
Reader
ReadMultiBufferTimeout(time.Duration) (MultiBuffer, error)
}buf.Copy — 核心传输循环
几乎所有数据传输都使用 buf.Copy():
// common/buf/copy.go
func Copy(reader Reader, writer Writer, options ...CopyOption) error {
for {
buffer, err := reader.ReadMultiBuffer()
if !buffer.IsEmpty() {
for _, opt := range options {
opt(&optHolder) // 例如 UpdateActivity(timer)
}
if werr := writer.WriteMultiBuffer(buffer); werr != nil {
return werr
}
}
if err != nil {
return err // io.EOF = 正常完成
}
}
}复制选项:
UpdateActivity(timer)— 每次传输时重置空闲超时CountSize(counter)— 统计传输的字节数
BufferedWriter
BufferedWriter 批量处理小写入:
type BufferedWriter struct {
writer Writer
buffer *Buffer
buffered bool
}
func (w *BufferedWriter) WriteMultiBuffer(mb MultiBuffer) error {
if w.buffered {
// 在缓冲区中累积
// 缓冲区满时刷新
} else {
// 直接传递给底层写入器
}
}
func (w *BufferedWriter) SetBuffered(b bool) error {
if !b && w.buffer != nil {
// 刷新缓冲的数据
}
}这在协议头编码时使用:先缓冲头部,然后连同第一个数据载荷一起刷新,确保它们在同一个 TCP 段中发送。
管道(transport/pipe/)
管道通过具有背压感知的缓冲连接入站和出站。
创建管道
// transport/pipe/pipe.go
func New(opts ...Option) (*Reader, *Writer) {
p := &pipe{
readSignal: signal.NewNotifier(),
writeSignal: signal.NewNotifier(),
done: done.New(),
errChan: make(chan error, 1),
option: pipeOption{limit: -1},
}
return &Reader{pipe: p}, &Writer{pipe: p}
}选项:
WithSizeLimit(limit)— 背压阈值(字节),来自策略配置WithoutSizeLimit()— 无限制缓冲(-1)DiscardOverflow()— 缓冲区满时丢弃写入而非阻塞
管道内部状态
// transport/pipe/impl.go
type pipe struct {
readSignal *signal.Notifier // 通知读取器有数据可用
writeSignal *signal.Notifier // 通知写入器有空间可用
done *done.Instance // 管道已关闭
errChan chan error // 关闭错误
option pipeOption
data buf.MultiBuffer // 缓冲的数据
state int32 // 活跃、已关闭、出错
}写入路径
func (p *pipe) WriteMultiBuffer(mb buf.MultiBuffer) error {
for {
// 检查管道是否已关闭
if p.done.Done() { return io.ErrClosedPipe }
// 检查大小限制
if p.option.limit >= 0 && p.data.Len() >= p.option.limit {
// 阻塞:等待读取器消费数据
// 或者如果设置了 DiscardOverflow 则丢弃
p.writeSignal.Wait()
continue
}
// 追加到缓冲区
p.data = append(p.data, mb...)
p.readSignal.Signal() // 唤醒读取器
return nil
}
}读取路径
func (p *pipe) ReadMultiBuffer() (buf.MultiBuffer, error) {
for {
if !p.data.IsEmpty() {
mb := p.data
p.data = nil
p.writeSignal.Signal() // 唤醒阻塞的写入器
return mb, nil
}
if p.done.Done() {
return nil, io.EOF
}
p.readSignal.Wait() // 阻塞直到有数据可用
}
}
func (p *pipe) ReadMultiBufferTimeout(d time.Duration) (buf.MultiBuffer, error) {
// 相同逻辑但通过 select + timer 实现超时
}背压流程
sequenceDiagram
participant Inbound as 入站代理
participant PW as 管道写入器
participant Pipe as 管道缓冲区
participant PR as 管道读取器
participant Outbound as 出站代理
Inbound->>PW: WriteMultiBuffer(data)
PW->>Pipe: 追加到缓冲区
alt 缓冲区低于限制
PW->>PR: 信号(有数据可用)
PR->>Outbound: ReadMultiBuffer()
Outbound->>Outbound: 转发到远程
PR->>PW: 信号(有空间可用)
else 缓冲区达到限制
PW->>PW: 阻塞(等待空间)
Note over PW: 背压!<br/>入站减速
PR->>Outbound: ReadMultiBuffer()
PR->>PW: 信号(有空间可用)
PW->>PW: 恢复写入
end管道大小限制(策略)
缓冲区大小来自策略系统:
// features/policy/policy.go
type Buffer struct {
PerConnection int32 // 每连接的管道大小限制
}
// 按用户等级配置:
// Level 0:10240 字节(10KB)
// Level 1+:可配置从上下文获取:
func OptionsFromContext(ctx context.Context) []Option {
bp := policy.BufferPolicyFromContext(ctx)
if bp.PerConnection >= 0 {
return []Option{WithSizeLimit(bp.PerConnection)}
}
return []Option{WithoutSizeLimit()}
}传输链路
transport.Link 将读取器和写入器配对:
// transport/link.go
type Link struct {
Reader buf.Reader
Writer buf.Writer
}分发器创建两个相互关联的 Link:
InboundLink: OutboundLink:
Reader = downlinkPipeReader Reader = uplinkPipeReader
Writer = uplinkPipeWriter Writer = downlinkPipeWriter写入 InboundLink.Writer 的数据从 OutboundLink.Reader 读取(上行)。 写入 OutboundLink.Writer 的数据从 InboundLink.Reader 读取(下行)。
信号系统(common/signal/)
Notifier
用于唤醒 goroutine 的非阻塞信号:
type Notifier struct {
c chan struct{}
}
func (n *Notifier) Signal() {
select {
case n.c <- struct{}{}: // 信号已发送
default: // 已有信号,跳过
}
}
func (n *Notifier) Wait() <-chan struct{} {
return n.c
}ActivityTimer
跟踪连接活动以实现空闲超时:
func CancelAfterInactivity(ctx context.Context, cancel func(), timeout time.Duration) *ActivityTimer
func (t *ActivityTimer) SetTimeout(timeout time.Duration) // 修改超时时间
// 通过 buf.Copy 的 UpdateActivity 选项调用每次数据传输时,UpdateActivity 会重置计时器。如果在超时时间内没有活动,则调用 cancel(),终止两个 goroutine(上行 + 下行)。
实现说明
缓冲区池化至关重要:没有它,数百万次 8KB 分配带来的 GC 压力会严重影响性能。需要使用
sync.Pool或等效机制。MultiBuffer 实现批量 I/O:一次读写多个数据块可以减少系统调用开销。单次
ReadMultiBuffer()调用可能返回多个缓冲区。背压防止 OOM:如果没有管道大小限制,快速发送方 + 慢速接收方会导致内存无限增长。默认的 10KB 限制是有意设置得较小的。
基于信号 vs 基于通道:
Notifier是非阻塞信号(trySignal 语义)。这避免了双方竞争发送信号时的 goroutine 泄漏。Buffer.UDP 对 UDP 至关重要:每个缓冲区可以携带不同的目标地址。XUDP 和 TUN 就是通过这种方式实现逐包路由的。
BufferedWriter 用于头部批处理:始终将协议头和首个载荷数据缓冲在一起。分开发送会向流量分析暴露头部大小。