Skip to content

Система pipe и буферов

Системы pipe и буферов составляют основу передачи данных в Xray-core. Каждый байт, проходящий через прокси, проходит через эти абстракции.

Буфер (common/buf/)

Структура буфера

go
// common/buf/buffer.go
type Buffer struct {
    v         []byte          // нижележащий байтовый срез
    start     int32           // курсор чтения
    end       int32           // курсор записи
    ownership ownership       // managed, unmanaged или bytespool
    UDP       *net.Destination // назначение для каждого UDP-пакета
}

Ключевые свойства:

  • Стандартный размер: 8192 байта (8 КБ)
  • Пулированные: Управляемые буферы переиспользуются через sync.Pool
  • На основе курсоров: start..end определяет активное окно данных
  • Метаданные UDP: Поле UDP содержит назначение для каждого пакета при UDP-проксировании

Пул буферов

go
const Size = 8192
var pool = bytespool.GetPool(Size)

func New() *Buffer {
    buf := pool.Get().([]byte)
    return &Buffer{v: buf[:Size]}
}

func (b *Buffer) Release() {
    if cap(b.v) == Size {
        pool.Put(b.v)
    }
}

Для более крупных выделений bytespool предоставляет пулы на основе классов размеров:

go
// common/bytespool/pool.go
// Размеры пулов: 1K, 2K, 4K, 8K, 16K, 32K, 64K, 128K, ...
func Alloc(size int32) []byte  // выделение из пула
func Free(b []byte)            // возврат в пул

MultiBuffer

MultiBuffer — это срез *Buffer, используемый для пакетных операций:

go
type MultiBuffer []*Buffer

func (mb MultiBuffer) Len() int32      // общее количество байтов во всех буферах
func (mb MultiBuffer) IsEmpty() bool
func (mb MultiBuffer) Copy(b []byte) int // копирование в плоский байтовый срез

Интерфейсы Reader и Writer

go
type Reader interface {
    ReadMultiBuffer() (MultiBuffer, error)
}

type Writer interface {
    WriteMultiBuffer(MultiBuffer) error
}

type TimeoutReader interface {
    Reader
    ReadMultiBufferTimeout(time.Duration) (MultiBuffer, error)
}

buf.Copy — основной цикл передачи

Почти вся передача данных использует buf.Copy():

go
// common/buf/copy.go
func Copy(reader Reader, writer Writer, options ...CopyOption) error {
    for {
        buffer, err := reader.ReadMultiBuffer()
        if !buffer.IsEmpty() {
            for _, opt := range options {
                opt(&optHolder)  // например, UpdateActivity(timer)
            }
            if werr := writer.WriteMultiBuffer(buffer); werr != nil {
                return werr
            }
        }
        if err != nil {
            return err  // io.EOF = нормальное завершение
        }
    }
}

Опции копирования:

  • UpdateActivity(timer) — сброс таймаута бездействия при каждой передаче
  • CountSize(counter) — подсчёт переданных байтов

BufferedWriter

BufferedWriter группирует мелкие записи:

go
type BufferedWriter struct {
    writer  Writer
    buffer  *Buffer
    buffered bool
}

func (w *BufferedWriter) WriteMultiBuffer(mb MultiBuffer) error {
    if w.buffered {
        // Накопление в буфере
        // Сброс при заполнении буфера
    } else {
        // Прямая передача нижележащему writer
    }
}

func (w *BufferedWriter) SetBuffered(b bool) error {
    if !b && w.buffer != nil {
        // Сброс буферизованных данных
    }
}

Используется при кодировании заголовков протоколов: заголовок буферизуется, а затем сбрасывается вместе с первыми данными полезной нагрузки, чтобы они были отправлены в одном TCP-сегменте.

Pipe (transport/pipe/)

Pipe соединяет входящий и исходящий обработчики с буферизацией, учитывающей обратное давление.

Создание pipe

go
// transport/pipe/pipe.go
func New(opts ...Option) (*Reader, *Writer) {
    p := &pipe{
        readSignal:  signal.NewNotifier(),
        writeSignal: signal.NewNotifier(),
        done:        done.New(),
        errChan:     make(chan error, 1),
        option: pipeOption{limit: -1},
    }
    return &Reader{pipe: p}, &Writer{pipe: p}
}

Опции:

  • WithSizeLimit(limit) — порог обратного давления в байтах (из политики)
  • WithoutSizeLimit() — неограниченный буфер (-1)
  • DiscardOverflow() — отбрасывать записи при заполнении вместо блокировки

Внутреннее состояние pipe

go
// transport/pipe/impl.go
type pipe struct {
    readSignal  *signal.Notifier  // сигнал reader о доступности данных
    writeSignal *signal.Notifier  // сигнал writer о наличии свободного места
    done        *done.Instance    // pipe закрыт
    errChan     chan error         // ошибка закрытия
    option      pipeOption
    data        buf.MultiBuffer   // буферизованные данные
    state       int32             // active, closed, errored
}

Путь записи

go
func (p *pipe) WriteMultiBuffer(mb buf.MultiBuffer) error {
    for {
        // Проверка, закрыт ли pipe
        if p.done.Done() { return io.ErrClosedPipe }

        // Проверка ограничения по размеру
        if p.option.limit >= 0 && p.data.Len() >= p.option.limit {
            // Блокировка: ожидание, пока reader потребит данные
            // ИЛИ отбрасывание при установленном DiscardOverflow
            p.writeSignal.Wait()
            continue
        }

        // Добавление в буфер
        p.data = append(p.data, mb...)
        p.readSignal.Signal()  // пробуждение reader
        return nil
    }
}

Путь чтения

go
func (p *pipe) ReadMultiBuffer() (buf.MultiBuffer, error) {
    for {
        if !p.data.IsEmpty() {
            mb := p.data
            p.data = nil
            p.writeSignal.Signal()  // пробуждение заблокированного writer
            return mb, nil
        }
        if p.done.Done() {
            return nil, io.EOF
        }
        p.readSignal.Wait()  // блокировка до появления данных
    }
}

func (p *pipe) ReadMultiBufferTimeout(d time.Duration) (buf.MultiBuffer, error) {
    // То же самое, но с таймаутом через select + timer
}

Поток обратного давления

mermaid
sequenceDiagram
    participant Inbound as Входящий прокси
    participant PW as Pipe Writer
    participant Pipe as Буфер pipe
    participant PR as Pipe Reader
    participant Outbound as Исходящий прокси

    Inbound->>PW: WriteMultiBuffer(data)
    PW->>Pipe: Добавление в буфер

    alt Буфер ниже лимита
        PW->>PR: Сигнал (данные доступны)
        PR->>Outbound: ReadMultiBuffer()
        Outbound->>Outbound: Пересылка на удалённый сервер
        PR->>PW: Сигнал (место доступно)
    else Буфер на лимите
        PW->>PW: Блокировка (ожидание места)
        Note over PW: Обратное давление!<br/>Входящий замедляется
        PR->>Outbound: ReadMultiBuffer()
        PR->>PW: Сигнал (место доступно)
        PW->>PW: Возобновление записи
    end

Ограничения размера pipe (политика)

Размеры буферов задаются системой политик:

go
// features/policy/policy.go
type Buffer struct {
    PerConnection int32  // ограничение размера pipe на соединение
}

// Настраивается для каждого уровня пользователя:
// Уровень 0: 10240 байт (10 КБ)
// Уровень 1+: настраиваемый

Из контекста:

go
func OptionsFromContext(ctx context.Context) []Option {
    bp := policy.BufferPolicyFromContext(ctx)
    if bp.PerConnection >= 0 {
        return []Option{WithSizeLimit(bp.PerConnection)}
    }
    return []Option{WithoutSizeLimit()}
}

transport.Link объединяет reader и writer в пару:

go
// transport/link.go
type Link struct {
    Reader buf.Reader
    Writer buf.Writer
}

Диспетчер создаёт два связанных Link:

InboundLink:                      OutboundLink:
  Reader = downlinkPipeReader       Reader = uplinkPipeReader
  Writer = uplinkPipeWriter         Writer = downlinkPipeWriter

Данные, записанные в InboundLink.Writer, читаются из OutboundLink.Reader (восходящий канал). Данные, записанные в OutboundLink.Writer, читаются из InboundLink.Reader (нисходящий канал).

Система сигналов (common/signal/)

Notifier

Неблокирующий сигнал для пробуждения горутин:

go
type Notifier struct {
    c chan struct{}
}

func (n *Notifier) Signal() {
    select {
    case n.c <- struct{}{}: // сигнал отправлен
    default: // уже просигнализировано, пропускаем
    }
}

func (n *Notifier) Wait() <-chan struct{} {
    return n.c
}

ActivityTimer

Отслеживает активность соединения для таймаута бездействия:

go
func CancelAfterInactivity(ctx context.Context, cancel func(), timeout time.Duration) *ActivityTimer

func (t *ActivityTimer) SetTimeout(timeout time.Duration)  // изменение таймаута
// Вызывается из buf.Copy с опцией UpdateActivity

При каждой передаче данных UpdateActivity сбрасывает таймер. Если активность отсутствует в течение заданного времени, вызывается cancel(), завершающий обе горутины (загрузка + скачивание).

Заметки по реализации

  1. Пулирование буферов необходимо: Без него давление на сборщик мусора от миллионов выделений по 8 КБ снижает производительность. Используйте аналог sync.Pool.

  2. MultiBuffer обеспечивает пакетный ввод/вывод: Чтение/запись нескольких блоков за раз сокращает накладные расходы на системные вызовы. Один вызов ReadMultiBuffer() может вернуть несколько буферов.

  3. Обратное давление предотвращает OOM: Без ограничений размера pipe быстрый отправитель + медленный получатель приводят к неограниченному росту памяти. Лимит по умолчанию в 10 КБ намеренно мал.

  4. На основе сигналов, а не каналов: Notifier — это неблокирующий сигнал (семантика trySignal). Это предотвращает утечки горутин при гонке обеих сторон за отправку сигнала.

  5. Buffer.UDP критичен для UDP: Каждый буфер может нести различный адрес назначения. Так XUDP и TUN обрабатывают маршрутизацию для каждого пакета.

  6. BufferedWriter для группировки заголовков: Всегда буферизуйте заголовок протокола + первую полезную нагрузку вместе. Отправка их по отдельности раскрывает размер заголовка для анализа трафика.

Технический анализ для целей повторной реализации.