Python ve Pika ile RabbitMQ Tüketicisi Yazmak
Selamlar, bu yazımda Python tarafında RabbitMQ tüketicisi (consumer) yazarken kafamı en çok kurcalayan noktalara değineceğim. Pika resmi istemcisi gerçekten esnek ama bu esneklik insana 'acaba doğru mu yapıyorum?' dedirten cinsten. Hadi adım adım bakalım.
Mesaj kuyruğu konusu kâğıt üzerinde basit görünür: bir taraf yazar, diğer taraf okur. Gerçekte ise broker bağlantısı kopar, tüketici ortasında çöker, tek bir yavaş istek bütün worker'ları bloklar. Bu yazıda bu tuzakların büyük çoğunluğunu kapatan bir iskelet kuracağız.
RabbitMQ'yu yerelde kaldırmak
Test için lokalde Docker üzerinden ayağa kaldırmak en hızlısı. Management arayüzü de cabası, kuyrukları gözle görmek hayat kurtarır:
docker run -d --name rabbitmq-test \
-p 5672:5672 \
-p 15672:15672 \
-e RABBITMQ_DEFAULT_USER=admin \
-e RABBITMQ_DEFAULT_PASS=password \
rabbitmq:3-management
http://localhost:15672 adresinden arayüze girip kuyrukların durumunu, message rate'ini ve unacked sayısını canlı izleyebilirsiniz. Pika tarafı için ise tek bir pip install pika yetiyor.
Bağlantı yönetimi ve dayanıklılık
Pika'nın BlockingConnection nesnesi tek başına yeterli değil. Network bir an dalgalanır, broker yeniden başlar - sizin tüketicinin bunu görmezden gelmesi gerekir. Bence en sağlıklısı, bağlantıyı küçük bir sınıfa sarmak ve heartbeat'i mantıklı bir değerde tutmak:
import pika
import time
import logging
logger = logging.getLogger(__name__)
class RabbitBaglantisi:
def __init__(self, host='localhost', user='admin', password='password'):
self.parametreler = pika.ConnectionParameters(
host=host,
credentials=pika.PlainCredentials(user, password),
heartbeat=600,
blocked_connection_timeout=300,
connection_attempts=3,
retry_delay=5,
)
self._baglanti = None
self._kanal = None
def baglan(self):
# Exponential backoff ile yeniden deneme
for deneme in range(1, 6):
try:
self._baglanti = pika.BlockingConnection(self.parametreler)
logger.info('RabbitMQ baglantisi acildi')
return self._baglanti
except pika.exceptions.AMQPConnectionError as hata:
bekleme = min(2 ** deneme, 60)
logger.warning(f'Baglanti basarisiz: {hata}. {bekleme}s sonra tekrar.')
time.sleep(bekleme)
raise ConnectionError('RabbitMQ baglantisi kurulamadi')
def kanal(self):
if self._baglanti is None or self._baglanti.is_closed:
self.baglan()
if self._kanal is None or self._kanal.is_closed:
self._kanal = self._baglanti.channel()
return self._kanal
Burada dikkatinizi çekeceğim iki nokta var. Birincisi heartbeat=600: çok kısa tutarsanız meşgul tüketicilerde yanlış kopma uyarıları alırsınız. İkincisi blocked_connection_timeout: broker bellek baskısı altındayken yayını duraklatır, bu süre dolarsa siz de pes edersiniz.
Tüketici iskeleti ve ack akışı
Şimdi gerçek iş bu. Pika'da auto_ack=True çok cazip görünür ama prod'da kullanmayın - mesajı aldığınız anda silinir, siz işlerken process çökerse mesaj kaybolur. Manuel ack'in bedeli birkaç satır kod, kazancı uykusuz gece sayısı.
import json
def tuketiciyi_calistir(kanal, kuyruk_adi, isleyici):
# Dayanikli kuyruk: broker yeniden basladiginda kaybolmaz
kanal.queue_declare(queue=kuyruk_adi, durable=True)
# Bir seferde sadece bir mesaj cek
kanal.basic_qos(prefetch_count=1)
def callback(ch, method, properties, body):
try:
mesaj = json.loads(body.decode('utf-8'))
isleyici(mesaj)
ch.basic_ack(delivery_tag=method.delivery_tag)
except json.JSONDecodeError:
# Bozuk mesaji tekrar kuyruga atma, anlamsiz dongu olur
logger.error('JSON parse hatasi, mesaj DLQ adayi')
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
except Exception as hata:
logger.exception(f'Isleme hatasi: {hata}')
ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)
kanal.basic_consume(
queue=kuyruk_adi,
on_message_callback=callback,
auto_ack=False,
)
kanal.start_consuming()
Buradaki ayrım çok önemli: requeue=False mesajı diskten siler (ya da DLX varsa oraya yollar), requeue=True ise mesajı kuyruğun başına geri koyar. Bozuk JSON gibi kalıcı hatalarda requeue=True vermek demek, sonsuz bir döngü demek. Ben şahsen JSON hatası gibi 'tekrarı manasız' durumlar için requeue=False, geçici hatalar için requeue=True ayrımını net çiziyorum.
Prefetch neden bu kadar önemli?
basic_qos(prefetch_count=N) belki de tüketici performansının tek satırlık sırrıdır. Varsayılan değer sınırsızdır - yani broker, tüketicinize ne kadar mesaj sığabilirse o kadarını fırlatır. Sonuç? Bir tüketici tüm yükü çeker, diğerleri boş durur.
Genel kurallar:
- CPU yoğun işlem:
prefetch_count=1. Eşit dağılım için. - IO ağırlıklı işlem:
prefetch_count=10-50arası. Network beklerken başka mesajla ilgilenebilirsiniz. - Çok hızlı, idempotent işler:
100+da olabilir. Ama ölçmeden artırmayın.
Sık karşılaşılan hatalar
- Manuel ack'i unutmak:
auto_ack=Trueile çalışıyorsanız crash'te mesaj uçar. Production'da kapalı tutun, her zaman. - Heartbeat'i 30 saniyeye çekmek: Uzun süren mesaj işlemlerinde sahte bağlantı kopmaları yersiniz. 600 mantıklı bir varsayılandır.
- Tek thread'de uzun iş yapmak:
start_consuming()bloklar; uzun işlerde ayrı bir worker pool'a delege edin, yoksa heartbeat'i kaçırırsınız. - DLX kurmadan
requeue=Falseyapmak: Mesaj sessizce silinir. Önce dead letter exchange tanımlayın, sonra reddedin. - Connection paylaşmak ama channel paylaşmamak: Pika'da channel thread-safe değil. Her thread kendi channel'ını açsın.
Doğrulama
Yerelde test için bir publisher script'i yazıp birkaç mesaj atın, management UI'da kuyruktaki sayının azalışını izleyin:
curl -u admin:password http://localhost:15672/api/queues/%2F/orders | \
python -c 'import sys, json; d=json.load(sys.stdin); print(d[\"messages\"], d[\"messages_unacknowledged\"])'
messages_unacknowledged sayısı uzun süre artıyorsa tüketicinizde ack akışı bozuk demektir; ya callback patliyor ya da basic_ack çağrısına hiç ulaşılmıyor.
Kapanış
RabbitMQ tüketicisi yazmak ilk seferde basit, ikinci seferde zor görünür - çünkü ilk crash'ten sonra kayıp mesajları kim ödeyecek sorusu çıkar ortaya. Şahsi kanaatim, manuel ack ve düşük prefetch ile başlayıp ihtiyaç doğdukça açmak en sağlıklısı; baştan 'optimize edeyim' deyip prefetch'i 1000 yapanlar genelde geri döner. Umarım faydalı olur, bir sonraki yazıda görüşmek üzere.
