Система pipe и буферов
Системы pipe и буферов составляют основу передачи данных в Xray-core. Каждый байт, проходящий через прокси, проходит через эти абстракции.
Буфер (common/buf/)
Структура буфера
// common/buf/buffer.go
type Buffer struct {
v []byte // нижележащий байтовый срез
start int32 // курсор чтения
end int32 // курсор записи
ownership ownership // managed, unmanaged или bytespool
UDP *net.Destination // назначение для каждого UDP-пакета
}Ключевые свойства:
- Стандартный размер: 8192 байта (8 КБ)
- Пулированные: Управляемые буферы переиспользуются через
sync.Pool - На основе курсоров:
start..endопределяет активное окно данных - Метаданные UDP: Поле
UDPсодержит назначение для каждого пакета при UDP-проксировании
Пул буферов
const Size = 8192
var pool = bytespool.GetPool(Size)
func New() *Buffer {
buf := pool.Get().([]byte)
return &Buffer{v: buf[:Size]}
}
func (b *Buffer) Release() {
if cap(b.v) == Size {
pool.Put(b.v)
}
}Для более крупных выделений bytespool предоставляет пулы на основе классов размеров:
// common/bytespool/pool.go
// Размеры пулов: 1K, 2K, 4K, 8K, 16K, 32K, 64K, 128K, ...
func Alloc(size int32) []byte // выделение из пула
func Free(b []byte) // возврат в пулMultiBuffer
MultiBuffer — это срез *Buffer, используемый для пакетных операций:
type MultiBuffer []*Buffer
func (mb MultiBuffer) Len() int32 // общее количество байтов во всех буферах
func (mb MultiBuffer) IsEmpty() bool
func (mb MultiBuffer) Copy(b []byte) int // копирование в плоский байтовый срезИнтерфейсы Reader и Writer
type Reader interface {
ReadMultiBuffer() (MultiBuffer, error)
}
type Writer interface {
WriteMultiBuffer(MultiBuffer) error
}
type TimeoutReader interface {
Reader
ReadMultiBufferTimeout(time.Duration) (MultiBuffer, error)
}buf.Copy — основной цикл передачи
Почти вся передача данных использует buf.Copy():
// common/buf/copy.go
func Copy(reader Reader, writer Writer, options ...CopyOption) error {
for {
buffer, err := reader.ReadMultiBuffer()
if !buffer.IsEmpty() {
for _, opt := range options {
opt(&optHolder) // например, UpdateActivity(timer)
}
if werr := writer.WriteMultiBuffer(buffer); werr != nil {
return werr
}
}
if err != nil {
return err // io.EOF = нормальное завершение
}
}
}Опции копирования:
UpdateActivity(timer)— сброс таймаута бездействия при каждой передачеCountSize(counter)— подсчёт переданных байтов
BufferedWriter
BufferedWriter группирует мелкие записи:
type BufferedWriter struct {
writer Writer
buffer *Buffer
buffered bool
}
func (w *BufferedWriter) WriteMultiBuffer(mb MultiBuffer) error {
if w.buffered {
// Накопление в буфере
// Сброс при заполнении буфера
} else {
// Прямая передача нижележащему writer
}
}
func (w *BufferedWriter) SetBuffered(b bool) error {
if !b && w.buffer != nil {
// Сброс буферизованных данных
}
}Используется при кодировании заголовков протоколов: заголовок буферизуется, а затем сбрасывается вместе с первыми данными полезной нагрузки, чтобы они были отправлены в одном TCP-сегменте.
Pipe (transport/pipe/)
Pipe соединяет входящий и исходящий обработчики с буферизацией, учитывающей обратное давление.
Создание pipe
// transport/pipe/pipe.go
func New(opts ...Option) (*Reader, *Writer) {
p := &pipe{
readSignal: signal.NewNotifier(),
writeSignal: signal.NewNotifier(),
done: done.New(),
errChan: make(chan error, 1),
option: pipeOption{limit: -1},
}
return &Reader{pipe: p}, &Writer{pipe: p}
}Опции:
WithSizeLimit(limit)— порог обратного давления в байтах (из политики)WithoutSizeLimit()— неограниченный буфер (-1)DiscardOverflow()— отбрасывать записи при заполнении вместо блокировки
Внутреннее состояние pipe
// transport/pipe/impl.go
type pipe struct {
readSignal *signal.Notifier // сигнал reader о доступности данных
writeSignal *signal.Notifier // сигнал writer о наличии свободного места
done *done.Instance // pipe закрыт
errChan chan error // ошибка закрытия
option pipeOption
data buf.MultiBuffer // буферизованные данные
state int32 // active, closed, errored
}Путь записи
func (p *pipe) WriteMultiBuffer(mb buf.MultiBuffer) error {
for {
// Проверка, закрыт ли pipe
if p.done.Done() { return io.ErrClosedPipe }
// Проверка ограничения по размеру
if p.option.limit >= 0 && p.data.Len() >= p.option.limit {
// Блокировка: ожидание, пока reader потребит данные
// ИЛИ отбрасывание при установленном DiscardOverflow
p.writeSignal.Wait()
continue
}
// Добавление в буфер
p.data = append(p.data, mb...)
p.readSignal.Signal() // пробуждение reader
return nil
}
}Путь чтения
func (p *pipe) ReadMultiBuffer() (buf.MultiBuffer, error) {
for {
if !p.data.IsEmpty() {
mb := p.data
p.data = nil
p.writeSignal.Signal() // пробуждение заблокированного writer
return mb, nil
}
if p.done.Done() {
return nil, io.EOF
}
p.readSignal.Wait() // блокировка до появления данных
}
}
func (p *pipe) ReadMultiBufferTimeout(d time.Duration) (buf.MultiBuffer, error) {
// То же самое, но с таймаутом через select + timer
}Поток обратного давления
sequenceDiagram
participant Inbound as Входящий прокси
participant PW as Pipe Writer
participant Pipe as Буфер pipe
participant PR as Pipe Reader
participant Outbound as Исходящий прокси
Inbound->>PW: WriteMultiBuffer(data)
PW->>Pipe: Добавление в буфер
alt Буфер ниже лимита
PW->>PR: Сигнал (данные доступны)
PR->>Outbound: ReadMultiBuffer()
Outbound->>Outbound: Пересылка на удалённый сервер
PR->>PW: Сигнал (место доступно)
else Буфер на лимите
PW->>PW: Блокировка (ожидание места)
Note over PW: Обратное давление!<br/>Входящий замедляется
PR->>Outbound: ReadMultiBuffer()
PR->>PW: Сигнал (место доступно)
PW->>PW: Возобновление записи
endОграничения размера pipe (политика)
Размеры буферов задаются системой политик:
// features/policy/policy.go
type Buffer struct {
PerConnection int32 // ограничение размера pipe на соединение
}
// Настраивается для каждого уровня пользователя:
// Уровень 0: 10240 байт (10 КБ)
// Уровень 1+: настраиваемыйИз контекста:
func OptionsFromContext(ctx context.Context) []Option {
bp := policy.BufferPolicyFromContext(ctx)
if bp.PerConnection >= 0 {
return []Option{WithSizeLimit(bp.PerConnection)}
}
return []Option{WithoutSizeLimit()}
}Transport Link
transport.Link объединяет reader и writer в пару:
// transport/link.go
type Link struct {
Reader buf.Reader
Writer buf.Writer
}Диспетчер создаёт два связанных Link:
InboundLink: OutboundLink:
Reader = downlinkPipeReader Reader = uplinkPipeReader
Writer = uplinkPipeWriter Writer = downlinkPipeWriterДанные, записанные в InboundLink.Writer, читаются из OutboundLink.Reader (восходящий канал). Данные, записанные в OutboundLink.Writer, читаются из InboundLink.Reader (нисходящий канал).
Система сигналов (common/signal/)
Notifier
Неблокирующий сигнал для пробуждения горутин:
type Notifier struct {
c chan struct{}
}
func (n *Notifier) Signal() {
select {
case n.c <- struct{}{}: // сигнал отправлен
default: // уже просигнализировано, пропускаем
}
}
func (n *Notifier) Wait() <-chan struct{} {
return n.c
}ActivityTimer
Отслеживает активность соединения для таймаута бездействия:
func CancelAfterInactivity(ctx context.Context, cancel func(), timeout time.Duration) *ActivityTimer
func (t *ActivityTimer) SetTimeout(timeout time.Duration) // изменение таймаута
// Вызывается из buf.Copy с опцией UpdateActivityПри каждой передаче данных UpdateActivity сбрасывает таймер. Если активность отсутствует в течение заданного времени, вызывается cancel(), завершающий обе горутины (загрузка + скачивание).
Заметки по реализации
Пулирование буферов необходимо: Без него давление на сборщик мусора от миллионов выделений по 8 КБ снижает производительность. Используйте аналог
sync.Pool.MultiBuffer обеспечивает пакетный ввод/вывод: Чтение/запись нескольких блоков за раз сокращает накладные расходы на системные вызовы. Один вызов
ReadMultiBuffer()может вернуть несколько буферов.Обратное давление предотвращает OOM: Без ограничений размера pipe быстрый отправитель + медленный получатель приводят к неограниченному росту памяти. Лимит по умолчанию в 10 КБ намеренно мал.
На основе сигналов, а не каналов:
Notifier— это неблокирующий сигнал (семантика trySignal). Это предотвращает утечки горутин при гонке обеих сторон за отправку сигнала.Buffer.UDP критичен для UDP: Каждый буфер может нести различный адрес назначения. Так XUDP и TUN обрабатывают маршрутизацию для каждого пакета.
BufferedWriter для группировки заголовков: Всегда буферизуйте заголовок протокола + первую полезную нагрузку вместе. Отправка их по отдельности раскрывает размер заголовка для анализа трафика.