المرصد: فحص الحالة الصحية ومراقبة المعالجات الصادرة
يقوم نظام المرصد الفرعي بفحص المعالجات الصادرة لتحديد حالتها الصحية وزمن الاستجابة. تستخدم الموازنات هذه البيانات لاختيار أفضل معالج صادر لكل اتصال. يوفر Xray تنفيذين للمرصد: المرصد القياسي Observer والمرصد بالوضع الاندفاعي BurstObserver.
نظرة عامة على البنية
flowchart TD
subgraph Observatory
O[Observer] --> P[probe via HTTP GET]
P --> S[OutboundStatus list]
end
subgraph BurstObservatory
BO[BurstObserver] --> HP[HealthPing]
HP --> HPR[HealthPingRTTS per tag]
HPR --> BS[OutboundStatus with stats]
end
subgraph Integration
S --> Bal[Balancer]
BS --> Bal
Bal --> OH[Outbound Handler selection]
end
subgraph gRPC API
CMD[ObservatoryService] --> O
CMD --> BO
endالمرصد القياسي
الملف: app/observatory/observer.go
يفحص المرصد القياسي كل معالج صادر مُحدد عن طريق إرسال طلبات HTTP GET من خلاله.
type Observer struct {
config *Config
ctx context.Context
statusLock sync.Mutex
status []*OutboundStatus
finished *done.Instance
ohm outbound.Manager
dispatcher routing.Dispatcher
}آلية الفحص
تقوم دالة probe() بإنشاء عميل HTTP يوجّه الاتصال عبر معالج صادر محدد:
func (o *Observer) probe(outbound string) ProbeResult {
httpTransport := http.Transport{
DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) {
dest, _ := v2net.ParseDestination(network + ":" + addr)
trackedCtx := session.TrackedConnectionError(o.ctx, errorCollectorForRequest)
conn, err := tagged.Dialer(trackedCtx, o.dispatcher, dest, outbound)
return conn, err
},
TLSHandshakeTimeout: 5 * time.Second,
}
httpClient := &http.Client{
Transport: &httpTransport,
Timeout: 5 * time.Second,
CheckRedirect: func(req *http.Request, via []*http.Request) error {
return http.ErrUseLastResponse
},
}
probeURL := "https://www.google.com/generate_204"
if o.config.ProbeUrl != "" {
probeURL = o.config.ProbeUrl
}
startTime := time.Now()
response, err := httpClient.Do(req)
GETTime := time.Since(startTime)
if err != nil {
return ProbeResult{Alive: false, LastErrorReason: errorMessage}
}
return ProbeResult{Alive: true, Delay: GETTime.Milliseconds()}
}التفاصيل الرئيسية:
- يستخدم
tagged.Dialer()لتوجيه اتصال الفحص عبر المعالج الصادر المحدد - تتبع الأخطاء عبر
session.TrackedConnectionError()يلتقط أخطاء الاتصال الأساسية - لا يتبع عمليات إعادة التوجيه (
ErrUseLastResponse) - عنوان URL الافتراضي للفحص:
https://www.google.com/generate_204 - مهلة الانتظار: 5 ثوانٍ لكل من مصافحة TLS والطلب الإجمالي
الحلقة الخلفية
func (o *Observer) background() {
for !o.finished.Done() {
hs, _ := o.ohm.(outbound.HandlerSelector)
outbounds := hs.Select(o.config.SubjectSelector)
sleepTime := time.Second * 10
if o.config.ProbeInterval != 0 {
sleepTime = time.Duration(o.config.ProbeInterval)
}
if !o.config.EnableConcurrency {
// Sequential: probe one at a time with sleep between
for _, v := range outbounds {
result := o.probe(v)
o.updateStatusForResult(v, &result)
time.Sleep(sleepTime)
}
} else {
// Concurrent: probe all at once, wait for all
ch := make(chan struct{}, len(outbounds))
for _, v := range outbounds {
go func(v string) {
result := o.probe(v)
o.updateStatusForResult(v, &result)
ch <- struct{}{}
}(v)
}
for range outbounds { <-ch }
time.Sleep(sleepTime)
}
}
}SubjectSelector هي قائمة من أنماط الوسوم تُستخدم مع outbound.HandlerSelector.Select() لمطابقة وسوم المعالجات الصادرة (يدعم المطابقة بالبادئة).
تتبع الحالة
type OutboundStatus struct {
Alive bool
Delay int64 // milliseconds
LastErrorReason string
OutboundTag string
LastSeenTime int64 // unix timestamp
LastTryTime int64 // unix timestamp
}عند فشل الفحص، يُعيَّن التأخير إلى 99999999 (ما يعادل اللانهاية عمليًا لأغراض الترتيب).
المرصد الاندفاعي
الملف: app/observatory/burst/burstobserver.go
يوفر المرصد الاندفاعي فحصًا صحيًا أكثر تطورًا مع تحليل إحصائي (المتوسط، الانحراف المعياري، الحد الأدنى، الحد الأقصى، معدل الفشل).
type Observer struct {
config *Config
ctx context.Context
statusLock sync.Mutex
hp *HealthPing
finished *done.Instance
ohm outbound.Manager
}HealthPing
الملف: app/observatory/burst/healthping.go
محرك فحص الحالة الصحية الأساسي:
type HealthPing struct {
ctx context.Context
dispatcher routing.Dispatcher
access sync.Mutex
ticker *time.Ticker
tickerClose chan struct{}
Settings *HealthPingSettings
Results map[string]*HealthPingRTTS
}الإعدادات:
| الإعداد | القيمة الافتراضية | الوصف |
|---|---|---|
Destination | https://connectivitycheck.gstatic.com/generate_204 | عنوان URL للفحص |
Interval | دقيقة واحدة | الفترة بين جولات أخذ العينات |
SamplingCount | 10 | عدد العينات لكل جولة |
Timeout | 5 ثوانٍ | مهلة الانتظار لكل فحص |
HttpMethod | HEAD | طريقة HTTP |
Connectivity | (فارغ) | عنوان URL للتحقق من توفر الشبكة |
استراتيجية أخذ العينات
يعمل المُجدوِل على مؤقت بفترة = Interval * SamplingCount:
func (h *HealthPing) StartScheduler(selector func() ([]string, error)) {
interval := h.Settings.Interval * time.Duration(h.Settings.SamplingCount)
ticker := time.NewTicker(interval)
// Initial check
go func() { h.Check(tags) }()
// Periodic checks
go func() {
for {
go func() {
h.doCheck(tags, interval, h.Settings.SamplingCount)
h.Cleanup(tags)
}()
<-ticker.C
}
}()
}تقوم دالة doCheck() بتوزيع الفحوصات عشوائيًا ضمن فترة أخذ العينات:
func (h *HealthPing) doCheck(tags []string, duration time.Duration, rounds int) {
for _, tag := range tags {
client := newPingClient(h.ctx, h.dispatcher, h.Settings.Destination, h.Settings.Timeout, tag)
for i := 0; i < rounds; i++ {
delay := time.Duration(dice.RollInt63n(int64(duration)))
time.AfterFunc(delay, func() {
delay, err := client.MeasureDelay(h.Settings.HttpMethod)
// report result
})
}
}
}هذا التوزيع العشوائي يمنع جميع الفحوصات من الوصول إلى الشبكة في الوقت نفسه.
عميل الفحص
الملف: app/observatory/burst/ping.go
func newHTTPClient(ctxv context.Context, dispatcher routing.Dispatcher, handler string, timeout time.Duration) *http.Client {
tr := &http.Transport{
DisableKeepAlives: true,
DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) {
dest, _ := net.ParseDestination(network + ":" + addr)
return tagged.Dialer(ctxv, dispatcher, dest, handler)
},
}
return &http.Client{
Transport: tr,
Timeout: timeout,
CheckRedirect: func(req *http.Request, via []*http.Request) error {
return http.ErrUseLastResponse
},
}
}ملاحظة: DisableKeepAlives: true يضمن أن كل فحص ينشئ اتصالاً جديدًا.
فحص الاتصال بالشبكة
قبل تصنيف فحص على أنه فاشل، يمكن للنظام اختياريًا التحقق مما إذا كانت الشبكة نفسها معطّلة:
func (h *HealthPing) checkConnectivity() bool {
if h.Settings.Connectivity == "" { return true }
tester := newDirectPingClient(h.Settings.Connectivity, h.Settings.Timeout)
_, err := tester.MeasureDelay(h.Settings.HttpMethod)
return err == nil
}إذا كانت الشبكة معطّلة، يتم تجاهل نتيجة الفحص (RTT = 0، ولا يتم تخزينها).
التحليل الإحصائي
الملف: app/observatory/burst/healthping_result.go
يحتفظ HealthPingRTTS بمخزن دائري لنتائج الفحص مع صلاحية زمنية:
type HealthPingRTTS struct {
idx int // current write position
cap int // buffer capacity
validity time.Duration // max age of a result
rtts []*pingRTT // circular buffer
stats *HealthPingStats // cached statistics
}الإحصائيات المحسوبة:
type HealthPingStats struct {
All int // total valid samples
Fail int // failed probes
Deviation time.Duration // standard deviation
Average time.Duration // mean RTT
Max time.Duration // maximum RTT
Min time.Duration // minimum RTT
}بالنسبة للعقد التي تحتوي على عينة واحدة فقط، يُقدَّر الانحراف المعياري بـ Average / 2 لمنع تفضيلها بشكل غير عادل على العقد التي تحتوي على بيانات أكثر.
واجهة gRPC API
الملف: app/observatory/command/command.go
type service struct {
observatory extension.Observatory
}
func (s *service) GetOutboundStatus(ctx context.Context, request *GetOutboundStatusRequest) (*GetOutboundStatusResponse, error) {
resp, _ := s.observatory.GetObservation(ctx)
retdata := resp.(*observatory.ObservationResult)
return &GetOutboundStatusResponse{Status: retdata}, nil
}كلا تنفيذي المرصد يلبيان واجهة extension.Observatory:
type Observatory interface {
GetObservation(ctx context.Context) (proto.Message, error)
}تجميع الأخطاء
الملف: app/observatory/explainErrors.go
type errorCollector struct {
errors *errors.Error
}
func (e *errorCollector) SubmitError(err error) {
if e.errors == nil {
e.errors = errors.New("underlying connection error").Base(err)
} else {
e.errors = e.errors.Base(errors.New("underlying connection error").Base(err))
}
}يقوم مُجمّع الأخطاء بربط أخطاء الاتصال لأغراض التسجيل التشخيصي. يتم تمريره عبر session.TrackedConnectionError() بحيث يتم التقاط أخطاء طبقة النقل الأساسية.
الإعدادات
المرصد القياسي:
{
"observatory": {
"subjectSelector": ["proxy_"],
"probeUrl": "https://www.google.com/generate_204",
"probeInterval": "10s",
"enableConcurrency": true
}
}المرصد الاندفاعي:
{
"burstObservatory": {
"subjectSelector": ["proxy_"],
"pingConfig": {
"destination": "https://connectivitycheck.gstatic.com/generate_204",
"interval": "1m",
"samplingCount": 10,
"timeout": "5s",
"httpMethod": "HEAD",
"connectivity": "https://connectivitycheck.gstatic.com/generate_204"
}
}
}ملاحظات التنفيذ
كلا المرصدين يُسجَّلان عبر
common.RegisterConfig()ويتم إنشاؤهما عندما يظهر إعداد protobuf الخاص بهما في قائمةApp.القيمة المُرجعة من
extension.ObservatoryType()تضمن أنه يمكن تنشيط مرصد واحد فقط في كل مرة (قياسي أو اندفاعي، وليس كلاهما).دالة
updateStatus()في المرصد القياسي حاليًا لا تقوم بأي عمل فيما يخص إزالة الإدخالات القديمة (مُعلّمة بتعليق TODO).دالة
Cleanup()في المرصد الاندفاعي تزيل النتائج لوسوم المعالجات الصادرة التي لم تعد موجودة في مخرجات المُحدّد، مما يعالج الإزالة الديناميكية للمعالجات.صلاحية فحص الحالة الصحية في الوضع الاندفاعي هي
Interval * SamplingCount * 2-- ضعف فترة أخذ العينات لمراعاة التوزيع العشوائي للفحوصات.ثابت
rttFailed(المُعرَّف في حزمة burst) يمثل فشل الفحص ويُخزَّن كقيمة حارسة في المخزن الدائري، ويُحسب ضمن إحصائيةFail.كلا المرصدين يبدآن بشكل كسول: يبدأ الفحص فقط عند استدعاء
Start()وعندما يكونSubjectSelectorغير فارغ.