Çok kiracılı arka plan işlerini OpenTelemetry ile gözlemlemek
Selamlar, bu yazımda multi-tenant bir SaaS'ta arka plan işlerinin (background jobs) OpenTelemetry ile nasıl gözlemleneceğine bakacağız. Konu kâğıt üstünde basit duruyor ama bir kiracı bir gecede 200 bin satırlık export attığında, diğer kiracıların 'iş bitti mi acaba?' diye sorması işin pratik tarafı. Hadi başlayalım.
Sorun ne tam olarak?
Kuyruğa job atan birden fazla kiracı var. Worker'lar tek havuzdan çekiyorsa, büyük bir kiracı kuyruğu doldurduğunda küçük kiracıların işleri açlık (starvation) yaşar. Buradaki kritik nokta şu: sadece toplam throughput'a bakarsak hiçbir şey görmeyiz, ortalama kiracıyı saklar. Bence çözüm de zaten metric ve span'lere tenant.id etiketi koymaktan geçiyor; gerisi adil zamanlama (fair scheduling) ayarı.
Kiracı bazlı kuyruk ve enqueue tarafı
İşin başlangıcında her kiracı için ayrı bir Redis listesi tutuyoruz. jobs:<tenant>:<priority> formatı; priority opsiyonel ama ileride lazım olur.
from opentelemetry import trace, metrics
import json, time
tracer = trace.get_tracer('jobs.processor')
meter = metrics.get_meter('jobs.processor')
job_enqueued = meter.create_counter('jobs.enqueued', unit='1')
job_queue_depth = meter.create_up_down_counter('jobs.queue_depth', unit='1')
job_wait_time = meter.create_histogram('jobs.wait_time', unit='ms')
job_duration = meter.create_histogram('jobs.duration', unit='ms')
async def enqueue(redis, tenant_id, job_type, payload, priority='normal'):
with tracer.start_as_current_span('job.enqueue', attributes={
'tenant.id': tenant_id,
'job.type': job_type,
'job.priority': priority,
}) as span:
ctx = span.get_span_context()
job = {
'id': new_id(),
'tenant_id': tenant_id,
'type': job_type,
'payload': payload,
'priority': priority,
'enqueued_at': time.time(),
'trace_id': format(ctx.trace_id, '032x'),
'span_id': format(ctx.span_id, '016x'),
}
await redis.lpush(f'jobs:{tenant_id}:{priority}', json.dumps(job))
job_enqueued.add(1, {'tenant.id': tenant_id, 'job.type': job_type})
job_queue_depth.add(1, {'tenant.id': tenant_id})
return job['id']
Buradaki en kritik detay trace_id ve span_id'yi job payload'ına gömmek. Worker tarafı bunları okuyup Span Link kuracak; böylece enqueue ve process arasında saatler geçse bile Jaeger/Tempo'da iki ucu birbirine bağlayabiliyoruz. Aynı trace içinde tutmak yerine link kullanmamızın sebebi şu: bir HTTP isteği iş atıyor olabilir ve o trace'in 200 milisaniyede bitmesi normal; iş 5 dakika sonra çalıştığında ana trace'i şişirmek istemeyiz.
İşçi tarafı ve adil paylaşım
İşçi her kiracı için bir semafor tutuyor. Tek kiracı bütün worker slotlarını kapatamasın diye max_concurrent_per_tenant koyduk; bizde başlangıç için 5 yetti, sizdeki yüke göre tekrar bakın.
async def process_job(job):
enqueue_ctx = trace.SpanContext(
trace_id=int(job['trace_id'], 16),
span_id=int(job['span_id'], 16),
is_remote=True,
)
wait_ms = (time.time() - job['enqueued_at']) * 1000
with tracer.start_as_current_span(
'job.process',
links=[trace.Link(enqueue_ctx)],
attributes={
'tenant.id': job['tenant_id'],
'job.type': job['type'],
'job.wait_time_ms': wait_ms,
},
) as span:
job_wait_time.record(wait_ms, {'tenant.id': job['tenant_id']})
job_queue_depth.add(-1, {'tenant.id': job['tenant_id']})
start = time.time()
try:
await handlers[job['type']](job['payload'])
span.set_attribute('job.status', 'success')
except Exception as ex:
span.record_exception(ex)
span.set_status(trace.StatusCode.ERROR, str(ex))
finally:
job_duration.record((time.time() - start) * 1000,
{'tenant.id': job['tenant_id']})
wait_time metriği bence bu işin en değerli sinyali. Süreyi histogram olarak topladığımız için p95(jobs.wait_time) by (tenant.id) ile hangi kiracının kuyrukta beklediğini doğrudan görebilirsiniz. Throughput sayısı yanıltır, p95 yalan söylemez.
Açlığı zamanında yakalamak
Her dakika çalışan ufak bir kontrol; en eski job 5 dakikadan uzun süredir bekliyorsa kiracıyı işaretliyoruz.
async def starvation_check():
with tracer.start_as_current_span('jobs.starvation_check') as span:
starving = [t for t, s in await queue_stats()
if s['oldest_job_age_seconds'] > 300]
span.set_attribute('jobs.starving_tenant_count', len(starving))
if starving:
span.add_event('starvation', {'tenants': ','.join(starving)})
Sık karşılaşılan tuzaklar
- Tek bir trace altında zincirlemek: Enqueue ve process'i parent-child yaparsanız 5 dakikalık bir trace ortaya çıkar; çoğu backend bunu ya keser ya garip gösterir. Span Link doğrusu.
tenant.idyerine isim koymak: Kiracı adı değişebilir. ID koyun, görünen ada label olarak ayrı geçin.- Histogram yerine counter ile süre ölçmek: Ortalamadan iyi bir hikâye çıkmaz. p95/p99 lazım.
- Semafor sayısını uygulamada paylaşmak: Worker birden fazla instance ise her biri kendi semaforunu tutar. Gerçek kapasite kontrolünü Redis tarafında atomik tutmak gerekiyor.
Kapanış
Bu yazıda kiracı bazlı kuyruk, enqueue/process arasında Span Link, ve adil zamanlama için worker semaforuna baktık. Bana sorarsanız bu kurulumda en çok işe yarayan tek metric jobs.wait_time p95'i; geri kalanı bunun etrafında şekilleniyor. Umarım faydalı olur, bir sonraki yazıda görüşmek üzere.
