Python ile Redis Pub/Sub Kanalları

Selamlar, bu yazımda Redis'in Pub/Sub mekanizmasını Python tarafından nasıl kullanacağımıza bakacağız. Konu basit gibi görünür, gerçekten de ilk publisher/subscriber satırını yazana kadar basittir; ama arka plan thread'i, pattern subscription ve mesaj kayıp senaryoları işin içine girince biraz dikkat ister. Hadi sırayla gidelim.

Pub/Sub aslinda ne is yapar?

Pub/Sub (yayıncı/abone) modeli, mesajı gönderen tarafın kime gittiğini bilmediği, alıcı tarafın da gönderici hakkında bir fikri olmadığı bir mesajlaşma kalıbıdır. Yayıncı bir kanala (channel) publish der; o anda o kanalı dinleyen kim varsa mesajı alır. O an dinleyen kimse yoksa mesaj uçar gider, hiçbir yerde tutulmaz.

Bu davranış kulağa zayıf gibi gelebilir ama bilerek böyle. Anlık bildirim, canlı yayın sayacı, dashboard fan-out gibi senaryolarda 'kayıp olursa olsun, geç gelmesin' tercihi mantıklıdır. Garanti istiyorsanız zaten yanlış aracı seçmişsiniz, en aşağıda Streams'a geleceğim.

Temel publisher ve subscriber

Önce yayıncı tarafı. redis-py ile bağlanıp publish çağrısı yapıyoruz, dönüş değeri o anda kanalı dinleyen abone sayısı:

import redis

r = redis.Redis(host='localhost', port=6379, decode_responses=True)

count = r.publish('haber:spor', 'Final maci sona erdi')
print(f'{count} aboneye iletildi')

Abone tarafında pubsub() nesnesi üzerinden bir veya birden fazla kanala abone oluyoruz, sonra listen() ile bloklayıcı bir döngüye giriyoruz:

import redis

r = redis.Redis(host='localhost', port=6379, decode_responses=True)
pubsub = r.pubsub()
pubsub.subscribe('haber:spor', 'haber:teknoloji')

for message in pubsub.listen():
    if message['type'] == 'message':
        print(f"[{message['channel']}] {message['data']}")

listen() size sadece veri mesajları değil, abonelik onayı (subscribe, psubscribe) gibi kontrol mesajları da gönderir. Bu yüzden type == 'message' kontrolü zorunlu, atlamayın.

Pattern subscription

Onlarca kanalı tek tek yazmak istemiyorsanız psubscribe ile glob pattern kullanabilirsiniz. Mesela tüm haber kanallarını yakalamak için:

import redis

r = redis.Redis(host='localhost', port=6379, decode_responses=True)
pubsub = r.pubsub()
pubsub.psubscribe('haber:*')

for message in pubsub.listen():
    if message['type'] == 'pmessage':
        print(f"{message['pattern']} -> {message['channel']}: {message['data']}")

Dikkat: pattern aboneliklerinde mesaj tipi message değil pmessage olarak gelir. Aynı abone hem subscribe hem psubscribe yaptıysa, eşleşen bir mesaj iki ayrı tipte iki kez teslim edilebilir; uygulama tarafında buna hazır olun.

Arka plan thread'inde dinleme

Gerçek bir uygulamada for message in pubsub.listen() döngüsünü ana thread'i kilitlemeden çalıştırmak istersiniz. redis-py'nin run_in_thread yardımcısı tam bunu yapıyor; siz callback fonksiyonunuzu kanala bağlıyorsunuz, kütüphane arka planda thread açıp mesajları size gönderiyor:

import redis
import time

r = redis.Redis(host='localhost', port=6379, decode_responses=True)

def handle(message):
    if message['type'] == 'message':
        print(f"alindi: {message['data']}")

pubsub = r.pubsub()
pubsub.subscribe(**{'bildirim:1001': handle})

thread = pubsub.run_in_thread(sleep_time=0.01)
print('arka planda dinleniyor')

time.sleep(5)
thread.stop()
pubsub.close()

sleep_time thread'in iki poll arası ne kadar bekleyeceğini söyler; çok düşük tutarsanız CPU yer, çok yüksek tutarsanız mesaj gecikir. 0.01 çoğu yük için makul. Programı kapatırken thread.stop() ve ardından pubsub.close() çağırmayı unutmayın, aksi halde Redis tarafında asılı bağlantılar bırakırsınız.

Sik karsilasilan tuzaklar

  • At-most-once oldugunu unutmak: Pub/Sub mesajı diske yazmaz, kuyruğa koymaz. Subscriber bağlı değilken atılan mesaj kaybolur. 'Bir kez bile kaçırırsam olmaz' diyorsanız Streams'e geçin.
  • Tek baglantida hem publish hem subscribe denemek: subscribe çağrısından sonra o bağlantı normal komut kabul etmez. Yayıncı ve abone için iki ayrı Redis istemcisi kullanın.
  • type == 'message' kontrolunu atlamak: Abonelik onay mesajlarını veri mesajı sanıp data alanını parse etmeye çalışırsanız tip hatası alırsınız.
  • thread.stop() cagrisini unutmak: Process kapanırken bile arka plan thread'i bağlantıyı düzgün bırakmaz; Redis tarafında CLIENT LIST çıktısında ölü subscriber'lar birikir.

Pub/Sub mi, Streams mi?

Bence ayrım net: anlık ve kayıp tolere edilebiliyorsa Pub/Sub, kalıcılık ve consumer group istiyorsanız Streams. Streams diske yazar, offset tutar, XREADGROUP ile birden fazla consumer arasında iş bölüştürür; ama API biraz daha ağırdır. Şahsi kanaatim, dashboard ve canlı bildirim için Pub/Sub yeter, sipariş veya ödeme olayı gibi kaybedilmemesi gereken şeyler için doğrudan Streams ile başlayın, sonra geri dönmek zorunda kalmayın.

Kapanis

Bu yazıda redis-py ile temel publisher/subscriber yazmayı, pattern aboneliklerini ve arka plan thread'inde mesaj dinlemeyi ele aldık. Pub/Sub yerinde kullanıldığında çok hafif ve hızlıdır, yanlış yerde kullanıldığında ise sessiz mesaj kayıplarıyla başınızı ağrıtır. Umarım faydalı olur, bir sonraki yazıda görüşmek üzere.