Skip to content

数据包的旅程

本页追踪一个连接在 Xray-core 中的完整生命周期,从客户端连接的那一刻到数据到达远程服务器。

概述

mermaid
flowchart TB
    Client([客户端应用]) -->|"连接到<br/>监听端口"| Listener

    subgraph Inbound["入站 (app/proxyman/inbound)"]
        Listener["internet.Listener<br/>(TCP Hub)"]
        Worker["tcpWorker / udpWorker"]
        Proxy["proxy.Inbound.Process()<br/>(VLESS/VMess/Trojan/...)"]
    end

    subgraph Core["核心流水线"]
        Dispatcher["DefaultDispatcher.Dispatch()"]
        Sniff["嗅探器<br/>(HTTP/TLS/QUIC/FakeDNS)"]
        Router["Router.PickRoute()"]
    end

    subgraph Outbound["出站 (app/proxyman/outbound)"]
        OHandler["outbound.Handler.Dispatch()"]
        Mux["Mux ClientManager<br/>(如果启用了多路复用)"]
        OProxy["proxy.Outbound.Process()<br/>(VLESS/Freedom/...)"]
        Transport["internet.Dialer.Dial()<br/>(TCP/WS/gRPC/...)"]
    end

    Listener -->|stat.Connection| Worker
    Worker -->|"构建上下文并调用"| Proxy
    Proxy -->|"dispatcher.Dispatch(ctx, dest)"| Dispatcher
    Dispatcher --> Sniff
    Sniff --> Router
    Router -->|出站标签| OHandler
    OHandler --> Mux
    Mux --> OProxy
    OProxy --> Transport
    Transport -->|"加密连接"| Server([远程/目标服务器])

阶段 1:连接接受

TCP Worker(app/proxyman/inbound/worker.go

当 TCP 连接到达时,tcpWorker.callback() 方法被触发:

go
func (w *tcpWorker) callback(conn stat.Connection) {
    ctx, cancel := context.WithCancel(w.ctx)
    sid := session.NewID()
    ctx = c.ContextWithID(ctx, sid)

    // 构建出站元数据
    outbounds := []*session.Outbound{{}}

    // 对于透明代理:获取原始目标地址
    if w.recvOrigDest {
        switch getTProxyType(w.stream) {
        case internet.SocketConfig_Redirect:
            dest, _ = tcp.GetOriginalDestination(conn)
        case internet.SocketConfig_TProxy:
            dest = net.DestinationFromAddr(conn.LocalAddr())
        }
        outbounds[0].Target = dest
    }
    ctx = session.ContextWithOutbounds(ctx, outbounds)

    // 附加入站元数据
    ctx = session.ContextWithInbound(ctx, &session.Inbound{
        Source:  net.DestinationFromAddr(conn.RemoteAddr()),
        Gateway: net.TCPDestination(w.address, w.port),
        Tag:     w.tag,
        Conn:    conn,
    })

    // 附加嗅探配置
    content := new(session.Content)
    content.SniffingRequest = ... // 来自配置
    ctx = session.ContextWithContent(ctx, content)

    // 移交给协议处理器
    w.proxy.Process(ctx, net.Network_TCP, conn, w.dispatcher)
}

此处设置的关键上下文值:

  • session.Inbound — 来源地址、入站标签、原始连接
  • session.Outbound — 目标地址(为 TProxy/redirect 填充)
  • session.Content — 嗅探配置

UDP Worker

对于 UDP,udpWorker 以不同方式处理数据包:

  • 使用 udp.Dispatcher 管理 UDP "连接"(以来源地址为键)
  • 每个唯一来源获得一个虚拟连接,通过代理进行分发
  • 基于超时的空闲 UDP 会话清理

阶段 2:协议处理(入站)

每个代理协议都实现了 proxy.Inbound 接口:

go
type Inbound interface {
    Network() []net.Network
    Process(ctx context.Context, network net.Network,
        conn stat.Connection, dispatcher routing.Dispatcher) error
}

协议处理器:

  1. conn 中读取并解码协议头
  2. 提取目标地址(地址 + 端口)
  3. 验证用户身份(如适用)
  4. 调用 dispatcher.Dispatch(ctx, destination) 获取管道对
  5. conn 和管道之间双向复制数据

示例:VLESS 入站(简化版)

go
func (h *Handler) Process(ctx, network, connection, dispatch) error {
    // 读取首字节
    first := buf.FromBytes(make([]byte, buf.Size))
    first.ReadFrom(connection)

    // 解码 VLESS 头部
    userSentID, request, requestAddons, err :=
        encoding.DecodeRequestHeader(first, reader, h.validator)

    // 在上下文中设置用户
    ctx = session.ContextWithInbound(ctx, &session.Inbound{
        User: user,
        ...
    })

    // 分发到路由
    link, _ := dispatch.Dispatch(ctx, request.Destination())

    // 双向复制
    // 上行:connection → link.Writer(到出站)
    // 下行:link.Reader → connection(到客户端)
    task.Run(ctx, requestDone, responseDone)
}

阶段 3:分发

DefaultDispatcher.Dispatch() 是中央枢纽(app/dispatcher/default.go):

go
func (d *DefaultDispatcher) Dispatch(ctx, destination) (*transport.Link, error) {
    // 在出站元数据中设置目标
    ob.OriginalTarget = destination
    ob.Target = destination

    // 创建管道对
    inbound, outbound := d.getLink(ctx)

    if sniffingRequest.Enabled {
        go func() {
            // 用缓存包装读取器
            cReader := &cachedReader{reader: outbound.Reader}
            outbound.Reader = cReader

            // 嗅探首字节
            result, err := sniffer(ctx, cReader, ...)

            // 如果嗅探匹配则覆盖目标
            if d.shouldOverride(ctx, result, ...) {
                destination.Address = net.ParseAddress(result.Domain())
                ob.Target = destination // 或 RouteOnly 时设置 ob.RouteTarget
            }

            d.routedDispatch(ctx, outbound, destination)
        }()
    } else {
        go d.routedDispatch(ctx, outbound, destination)
    }

    return inbound, nil  // 返回给入站代理
}

管道对

getLink() 创建两个相互关联的管道对:

客户端 ←→ [InboundLink] ←→ 管道 ←→ [OutboundLink] ←→ 服务器

InboundLink:                    OutboundLink:
  Reader = downlinkReader         Reader = uplinkReader
  Writer = uplinkWriter           Writer = downlinkWriter

客户端写入 → uplinkWriter → uplinkReader → 服务器读取
服务器写入 → downlinkWriter → downlinkReader → 客户端读取

如果启用了统计功能,会插入 SizeStatWriter 包装器来统计字节数。

阶段 4:路由

routedDispatch() 选择出站处理器:

go
func (d *DefaultDispatcher) routedDispatch(ctx, link, destination) {
    // 1. 检查强制出站标签(来自平台/API)
    if forcedTag := session.GetForcedOutboundTagFromContext(ctx); forcedTag != "" {
        handler = d.ohm.GetHandler(forcedTag)
    }
    // 2. 请求路由器选择路由
    else if route, err := d.router.PickRoute(routingCtx); err == nil {
        handler = d.ohm.GetHandler(route.GetOutboundTag())
    }
    // 3. 回退到默认出站
    else {
        handler = d.ohm.GetDefaultHandler()
    }

    // 分发到选定的出站
    handler.Dispatch(ctx, link)
}

路由器按顺序评估规则(参见路由引擎)。

阶段 5:出站处理

出站处理器(app/proxyman/outbound/handler.go

出站处理器包装器:

go
func (h *Handler) Dispatch(ctx, link) {
    // 检查多路复用
    if h.mux != nil && shouldUseMux(ctx) {
        h.mux.Dispatch(ctx, link)
        return
    }

    // 直接代理处理
    h.proxy.Process(ctx, link, h)  // h 实现了 internet.Dialer
}

传输层拨号

proxy.Process() 调用 dialer.Dial(ctx, dest) 时:

  1. 查找出站的流设置
  2. 选择传输层拨号器(TCP/WS/gRPC 等)
  3. 建立原始连接
  4. 应用安全层(TLS/REALITY/无)
  5. 返回 stat.Connection

出站代理处理

出站代理编码其协议并复制数据:

go
func (h *Handler) Process(ctx, link, dialer) error {
    // 拨号传输连接
    conn, _ := dialer.Dial(ctx, serverAddress)

    // 编码协议头
    encoding.EncodeRequestHeader(conn, request, addons)

    // 双向复制
    // 上行:link.Reader → conn(到服务器)
    // 下行:conn → link.Writer(通过管道到客户端)
    task.Run(ctx, postRequest, getResponse)
}

完整时序图

mermaid
sequenceDiagram
    participant C as 客户端
    participant TW as tcpWorker
    participant PI as 入站代理
    participant D as 分发器
    participant R as 路由器
    participant PO as 出站代理
    participant T as 传输层
    participant S as 远程服务器

    C->>TW: TCP 连接
    TW->>TW: 构建会话上下文
    TW->>PI: Process(ctx, conn, dispatcher)
    PI->>PI: 解码协议头
    PI->>D: Dispatch(ctx, destination)
    D->>D: 创建管道对
    D-->>PI: 返回 inboundLink
    Note over D: 异步 goroutine:
    D->>D: 嗅探首字节
    D->>R: PickRoute(ctx)
    R-->>D: 出站标签
    D->>PO: handler.Dispatch(ctx, outboundLink)
    PO->>T: dialer.Dial(ctx, server)
    T->>S: 传输连接 + TLS
    T-->>PO: conn
    PO->>S: 编码头部 + 载荷

    par 上行(客户端 → 服务器)
        PI->>D: pipe.Write(客户端数据)
        D->>PO: pipe.Read → conn.Write
    and 下行(服务器 → 客户端)
        S->>PO: conn.Read
        PO->>D: pipe.Write(服务器数据)
        D->>PI: pipe.Read → conn.Write
        PI->>C: 响应数据
    end

实现说明

重新实现时,关键部分包括:

  1. 会话上下文 — 携带所有元数据;必须贯穿每个调用
  2. 管道对 — 入站和出站之间的异步桥梁;需要支持背压
  3. 嗅探 — 必须在路由之前对首字节进行检测;缓存已消费的字节
  4. 双向复制 — 两个 goroutine(上行 + 下行),共享取消机制
  5. 活动计时器 — 每次数据传输时重置;空闲超时后触发关闭

task.Run(ctx, postRequest, task.OnSuccess(getResponse, task.Close(writer))) 模式被广泛使用:先运行上行,成功后启动下行,下行结束时关闭写入器。

用于重新实现目的的技术分析。