Skip to content

المرصد: فحص الحالة الصحية ومراقبة المعالجات الصادرة

يقوم نظام المرصد الفرعي بفحص المعالجات الصادرة لتحديد حالتها الصحية وزمن الاستجابة. تستخدم الموازنات هذه البيانات لاختيار أفضل معالج صادر لكل اتصال. يوفر Xray تنفيذين للمرصد: المرصد القياسي Observer والمرصد بالوضع الاندفاعي BurstObserver.

نظرة عامة على البنية

mermaid
flowchart TD
    subgraph Observatory
        O[Observer] --> P[probe via HTTP GET]
        P --> S[OutboundStatus list]
    end

    subgraph BurstObservatory
        BO[BurstObserver] --> HP[HealthPing]
        HP --> HPR[HealthPingRTTS per tag]
        HPR --> BS[OutboundStatus with stats]
    end

    subgraph Integration
        S --> Bal[Balancer]
        BS --> Bal
        Bal --> OH[Outbound Handler selection]
    end

    subgraph gRPC API
        CMD[ObservatoryService] --> O
        CMD --> BO
    end

المرصد القياسي

الملف: app/observatory/observer.go

يفحص المرصد القياسي كل معالج صادر مُحدد عن طريق إرسال طلبات HTTP GET من خلاله.

go
type Observer struct {
    config *Config
    ctx    context.Context

    statusLock sync.Mutex
    status     []*OutboundStatus

    finished *done.Instance

    ohm        outbound.Manager
    dispatcher routing.Dispatcher
}

آلية الفحص

تقوم دالة probe() بإنشاء عميل HTTP يوجّه الاتصال عبر معالج صادر محدد:

go
func (o *Observer) probe(outbound string) ProbeResult {
    httpTransport := http.Transport{
        DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) {
            dest, _ := v2net.ParseDestination(network + ":" + addr)
            trackedCtx := session.TrackedConnectionError(o.ctx, errorCollectorForRequest)
            conn, err := tagged.Dialer(trackedCtx, o.dispatcher, dest, outbound)
            return conn, err
        },
        TLSHandshakeTimeout: 5 * time.Second,
    }
    httpClient := &http.Client{
        Transport: &httpTransport,
        Timeout:   5 * time.Second,
        CheckRedirect: func(req *http.Request, via []*http.Request) error {
            return http.ErrUseLastResponse
        },
    }

    probeURL := "https://www.google.com/generate_204"
    if o.config.ProbeUrl != "" {
        probeURL = o.config.ProbeUrl
    }

    startTime := time.Now()
    response, err := httpClient.Do(req)
    GETTime := time.Since(startTime)

    if err != nil {
        return ProbeResult{Alive: false, LastErrorReason: errorMessage}
    }
    return ProbeResult{Alive: true, Delay: GETTime.Milliseconds()}
}

التفاصيل الرئيسية:

  • يستخدم tagged.Dialer() لتوجيه اتصال الفحص عبر المعالج الصادر المحدد
  • تتبع الأخطاء عبر session.TrackedConnectionError() يلتقط أخطاء الاتصال الأساسية
  • لا يتبع عمليات إعادة التوجيه (ErrUseLastResponse)
  • عنوان URL الافتراضي للفحص: https://www.google.com/generate_204
  • مهلة الانتظار: 5 ثوانٍ لكل من مصافحة TLS والطلب الإجمالي

الحلقة الخلفية

go
func (o *Observer) background() {
    for !o.finished.Done() {
        hs, _ := o.ohm.(outbound.HandlerSelector)
        outbounds := hs.Select(o.config.SubjectSelector)

        sleepTime := time.Second * 10
        if o.config.ProbeInterval != 0 {
            sleepTime = time.Duration(o.config.ProbeInterval)
        }

        if !o.config.EnableConcurrency {
            // Sequential: probe one at a time with sleep between
            for _, v := range outbounds {
                result := o.probe(v)
                o.updateStatusForResult(v, &result)
                time.Sleep(sleepTime)
            }
        } else {
            // Concurrent: probe all at once, wait for all
            ch := make(chan struct{}, len(outbounds))
            for _, v := range outbounds {
                go func(v string) {
                    result := o.probe(v)
                    o.updateStatusForResult(v, &result)
                    ch <- struct{}{}
                }(v)
            }
            for range outbounds { <-ch }
            time.Sleep(sleepTime)
        }
    }
}

SubjectSelector هي قائمة من أنماط الوسوم تُستخدم مع outbound.HandlerSelector.Select() لمطابقة وسوم المعالجات الصادرة (يدعم المطابقة بالبادئة).

تتبع الحالة

go
type OutboundStatus struct {
    Alive           bool
    Delay           int64   // milliseconds
    LastErrorReason string
    OutboundTag     string
    LastSeenTime    int64   // unix timestamp
    LastTryTime     int64   // unix timestamp
}

عند فشل الفحص، يُعيَّن التأخير إلى 99999999 (ما يعادل اللانهاية عمليًا لأغراض الترتيب).

المرصد الاندفاعي

الملف: app/observatory/burst/burstobserver.go

يوفر المرصد الاندفاعي فحصًا صحيًا أكثر تطورًا مع تحليل إحصائي (المتوسط، الانحراف المعياري، الحد الأدنى، الحد الأقصى، معدل الفشل).

go
type Observer struct {
    config     *Config
    ctx        context.Context
    statusLock sync.Mutex
    hp         *HealthPing
    finished   *done.Instance
    ohm        outbound.Manager
}

HealthPing

الملف: app/observatory/burst/healthping.go

محرك فحص الحالة الصحية الأساسي:

go
type HealthPing struct {
    ctx         context.Context
    dispatcher  routing.Dispatcher
    access      sync.Mutex
    ticker      *time.Ticker
    tickerClose chan struct{}
    Settings    *HealthPingSettings
    Results     map[string]*HealthPingRTTS
}

الإعدادات:

الإعدادالقيمة الافتراضيةالوصف
Destinationhttps://connectivitycheck.gstatic.com/generate_204عنوان URL للفحص
Intervalدقيقة واحدةالفترة بين جولات أخذ العينات
SamplingCount10عدد العينات لكل جولة
Timeout5 ثوانٍمهلة الانتظار لكل فحص
HttpMethodHEADطريقة HTTP
Connectivity(فارغ)عنوان URL للتحقق من توفر الشبكة

استراتيجية أخذ العينات

يعمل المُجدوِل على مؤقت بفترة = Interval * SamplingCount:

go
func (h *HealthPing) StartScheduler(selector func() ([]string, error)) {
    interval := h.Settings.Interval * time.Duration(h.Settings.SamplingCount)
    ticker := time.NewTicker(interval)
    // Initial check
    go func() { h.Check(tags) }()
    // Periodic checks
    go func() {
        for {
            go func() {
                h.doCheck(tags, interval, h.Settings.SamplingCount)
                h.Cleanup(tags)
            }()
            <-ticker.C
        }
    }()
}

تقوم دالة doCheck() بتوزيع الفحوصات عشوائيًا ضمن فترة أخذ العينات:

go
func (h *HealthPing) doCheck(tags []string, duration time.Duration, rounds int) {
    for _, tag := range tags {
        client := newPingClient(h.ctx, h.dispatcher, h.Settings.Destination, h.Settings.Timeout, tag)
        for i := 0; i < rounds; i++ {
            delay := time.Duration(dice.RollInt63n(int64(duration)))
            time.AfterFunc(delay, func() {
                delay, err := client.MeasureDelay(h.Settings.HttpMethod)
                // report result
            })
        }
    }
}

هذا التوزيع العشوائي يمنع جميع الفحوصات من الوصول إلى الشبكة في الوقت نفسه.

عميل الفحص

الملف: app/observatory/burst/ping.go

go
func newHTTPClient(ctxv context.Context, dispatcher routing.Dispatcher, handler string, timeout time.Duration) *http.Client {
    tr := &http.Transport{
        DisableKeepAlives: true,
        DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) {
            dest, _ := net.ParseDestination(network + ":" + addr)
            return tagged.Dialer(ctxv, dispatcher, dest, handler)
        },
    }
    return &http.Client{
        Transport: tr,
        Timeout:   timeout,
        CheckRedirect: func(req *http.Request, via []*http.Request) error {
            return http.ErrUseLastResponse
        },
    }
}

ملاحظة: DisableKeepAlives: true يضمن أن كل فحص ينشئ اتصالاً جديدًا.

فحص الاتصال بالشبكة

قبل تصنيف فحص على أنه فاشل، يمكن للنظام اختياريًا التحقق مما إذا كانت الشبكة نفسها معطّلة:

go
func (h *HealthPing) checkConnectivity() bool {
    if h.Settings.Connectivity == "" { return true }
    tester := newDirectPingClient(h.Settings.Connectivity, h.Settings.Timeout)
    _, err := tester.MeasureDelay(h.Settings.HttpMethod)
    return err == nil
}

إذا كانت الشبكة معطّلة، يتم تجاهل نتيجة الفحص (RTT = 0، ولا يتم تخزينها).

التحليل الإحصائي

الملف: app/observatory/burst/healthping_result.go

يحتفظ HealthPingRTTS بمخزن دائري لنتائج الفحص مع صلاحية زمنية:

go
type HealthPingRTTS struct {
    idx      int              // current write position
    cap      int              // buffer capacity
    validity time.Duration    // max age of a result
    rtts     []*pingRTT       // circular buffer
    stats    *HealthPingStats // cached statistics
}

الإحصائيات المحسوبة:

go
type HealthPingStats struct {
    All       int            // total valid samples
    Fail      int            // failed probes
    Deviation time.Duration  // standard deviation
    Average   time.Duration  // mean RTT
    Max       time.Duration  // maximum RTT
    Min       time.Duration  // minimum RTT
}

بالنسبة للعقد التي تحتوي على عينة واحدة فقط، يُقدَّر الانحراف المعياري بـ Average / 2 لمنع تفضيلها بشكل غير عادل على العقد التي تحتوي على بيانات أكثر.

واجهة gRPC API

الملف: app/observatory/command/command.go

go
type service struct {
    observatory extension.Observatory
}

func (s *service) GetOutboundStatus(ctx context.Context, request *GetOutboundStatusRequest) (*GetOutboundStatusResponse, error) {
    resp, _ := s.observatory.GetObservation(ctx)
    retdata := resp.(*observatory.ObservationResult)
    return &GetOutboundStatusResponse{Status: retdata}, nil
}

كلا تنفيذي المرصد يلبيان واجهة extension.Observatory:

go
type Observatory interface {
    GetObservation(ctx context.Context) (proto.Message, error)
}

تجميع الأخطاء

الملف: app/observatory/explainErrors.go

go
type errorCollector struct {
    errors *errors.Error
}

func (e *errorCollector) SubmitError(err error) {
    if e.errors == nil {
        e.errors = errors.New("underlying connection error").Base(err)
    } else {
        e.errors = e.errors.Base(errors.New("underlying connection error").Base(err))
    }
}

يقوم مُجمّع الأخطاء بربط أخطاء الاتصال لأغراض التسجيل التشخيصي. يتم تمريره عبر session.TrackedConnectionError() بحيث يتم التقاط أخطاء طبقة النقل الأساسية.

الإعدادات

المرصد القياسي:

json
{
    "observatory": {
        "subjectSelector": ["proxy_"],
        "probeUrl": "https://www.google.com/generate_204",
        "probeInterval": "10s",
        "enableConcurrency": true
    }
}

المرصد الاندفاعي:

json
{
    "burstObservatory": {
        "subjectSelector": ["proxy_"],
        "pingConfig": {
            "destination": "https://connectivitycheck.gstatic.com/generate_204",
            "interval": "1m",
            "samplingCount": 10,
            "timeout": "5s",
            "httpMethod": "HEAD",
            "connectivity": "https://connectivitycheck.gstatic.com/generate_204"
        }
    }
}

ملاحظات التنفيذ

  • كلا المرصدين يُسجَّلان عبر common.RegisterConfig() ويتم إنشاؤهما عندما يظهر إعداد protobuf الخاص بهما في قائمة App.

  • القيمة المُرجعة من extension.ObservatoryType() تضمن أنه يمكن تنشيط مرصد واحد فقط في كل مرة (قياسي أو اندفاعي، وليس كلاهما).

  • دالة updateStatus() في المرصد القياسي حاليًا لا تقوم بأي عمل فيما يخص إزالة الإدخالات القديمة (مُعلّمة بتعليق TODO).

  • دالة Cleanup() في المرصد الاندفاعي تزيل النتائج لوسوم المعالجات الصادرة التي لم تعد موجودة في مخرجات المُحدّد، مما يعالج الإزالة الديناميكية للمعالجات.

  • صلاحية فحص الحالة الصحية في الوضع الاندفاعي هي Interval * SamplingCount * 2 -- ضعف فترة أخذ العينات لمراعاة التوزيع العشوائي للفحوصات.

  • ثابت rttFailed (المُعرَّف في حزمة burst) يمثل فشل الفحص ويُخزَّن كقيمة حارسة في المخزن الدائري، ويُحسب ضمن إحصائية Fail.

  • كلا المرصدين يبدآن بشكل كسول: يبدأ الفحص فقط عند استدعاء Start() وعندما يكون SubjectSelector غير فارغ.

تحليل تقني لأغراض إعادة التنفيذ.