Python ve Pika ile RabbitMQ Tüketicisi Yazmak

Selamlar, bu yazımda Python tarafında RabbitMQ tüketicisi (consumer) yazarken kafamı en çok kurcalayan noktalara değineceğim. Pika resmi istemcisi gerçekten esnek ama bu esneklik insana 'acaba doğru mu yapıyorum?' dedirten cinsten. Hadi adım adım bakalım.

Mesaj kuyruğu konusu kâğıt üzerinde basit görünür: bir taraf yazar, diğer taraf okur. Gerçekte ise broker bağlantısı kopar, tüketici ortasında çöker, tek bir yavaş istek bütün worker'ları bloklar. Bu yazıda bu tuzakların büyük çoğunluğunu kapatan bir iskelet kuracağız.

RabbitMQ'yu yerelde kaldırmak

Test için lokalde Docker üzerinden ayağa kaldırmak en hızlısı. Management arayüzü de cabası, kuyrukları gözle görmek hayat kurtarır:

docker run -d --name rabbitmq-test \
  -p 5672:5672 \
  -p 15672:15672 \
  -e RABBITMQ_DEFAULT_USER=admin \
  -e RABBITMQ_DEFAULT_PASS=password \
  rabbitmq:3-management

http://localhost:15672 adresinden arayüze girip kuyrukların durumunu, message rate'ini ve unacked sayısını canlı izleyebilirsiniz. Pika tarafı için ise tek bir pip install pika yetiyor.

Bağlantı yönetimi ve dayanıklılık

Pika'nın BlockingConnection nesnesi tek başına yeterli değil. Network bir an dalgalanır, broker yeniden başlar - sizin tüketicinin bunu görmezden gelmesi gerekir. Bence en sağlıklısı, bağlantıyı küçük bir sınıfa sarmak ve heartbeat'i mantıklı bir değerde tutmak:

import pika
import time
import logging

logger = logging.getLogger(__name__)


class RabbitBaglantisi:
    def __init__(self, host='localhost', user='admin', password='password'):
        self.parametreler = pika.ConnectionParameters(
            host=host,
            credentials=pika.PlainCredentials(user, password),
            heartbeat=600,
            blocked_connection_timeout=300,
            connection_attempts=3,
            retry_delay=5,
        )
        self._baglanti = None
        self._kanal = None

    def baglan(self):
        # Exponential backoff ile yeniden deneme
        for deneme in range(1, 6):
            try:
                self._baglanti = pika.BlockingConnection(self.parametreler)
                logger.info('RabbitMQ baglantisi acildi')
                return self._baglanti
            except pika.exceptions.AMQPConnectionError as hata:
                bekleme = min(2 ** deneme, 60)
                logger.warning(f'Baglanti basarisiz: {hata}. {bekleme}s sonra tekrar.')
                time.sleep(bekleme)
        raise ConnectionError('RabbitMQ baglantisi kurulamadi')

    def kanal(self):
        if self._baglanti is None or self._baglanti.is_closed:
            self.baglan()
        if self._kanal is None or self._kanal.is_closed:
            self._kanal = self._baglanti.channel()
        return self._kanal

Burada dikkatinizi çekeceğim iki nokta var. Birincisi heartbeat=600: çok kısa tutarsanız meşgul tüketicilerde yanlış kopma uyarıları alırsınız. İkincisi blocked_connection_timeout: broker bellek baskısı altındayken yayını duraklatır, bu süre dolarsa siz de pes edersiniz.

Tüketici iskeleti ve ack akışı

Şimdi gerçek iş bu. Pika'da auto_ack=True çok cazip görünür ama prod'da kullanmayın - mesajı aldığınız anda silinir, siz işlerken process çökerse mesaj kaybolur. Manuel ack'in bedeli birkaç satır kod, kazancı uykusuz gece sayısı.

import json

def tuketiciyi_calistir(kanal, kuyruk_adi, isleyici):
    # Dayanikli kuyruk: broker yeniden basladiginda kaybolmaz
    kanal.queue_declare(queue=kuyruk_adi, durable=True)
    # Bir seferde sadece bir mesaj cek
    kanal.basic_qos(prefetch_count=1)

    def callback(ch, method, properties, body):
        try:
            mesaj = json.loads(body.decode('utf-8'))
            isleyici(mesaj)
            ch.basic_ack(delivery_tag=method.delivery_tag)
        except json.JSONDecodeError:
            # Bozuk mesaji tekrar kuyruga atma, anlamsiz dongu olur
            logger.error('JSON parse hatasi, mesaj DLQ adayi')
            ch.basic_nack(delivery_tag=method.delivery_tag, requeue=False)
        except Exception as hata:
            logger.exception(f'Isleme hatasi: {hata}')
            ch.basic_nack(delivery_tag=method.delivery_tag, requeue=True)

    kanal.basic_consume(
        queue=kuyruk_adi,
        on_message_callback=callback,
        auto_ack=False,
    )
    kanal.start_consuming()

Buradaki ayrım çok önemli: requeue=False mesajı diskten siler (ya da DLX varsa oraya yollar), requeue=True ise mesajı kuyruğun başına geri koyar. Bozuk JSON gibi kalıcı hatalarda requeue=True vermek demek, sonsuz bir döngü demek. Ben şahsen JSON hatası gibi 'tekrarı manasız' durumlar için requeue=False, geçici hatalar için requeue=True ayrımını net çiziyorum.

Prefetch neden bu kadar önemli?

basic_qos(prefetch_count=N) belki de tüketici performansının tek satırlık sırrıdır. Varsayılan değer sınırsızdır - yani broker, tüketicinize ne kadar mesaj sığabilirse o kadarını fırlatır. Sonuç? Bir tüketici tüm yükü çeker, diğerleri boş durur.

Genel kurallar:

  • CPU yoğun işlem: prefetch_count=1. Eşit dağılım için.
  • IO ağırlıklı işlem: prefetch_count=10-50 arası. Network beklerken başka mesajla ilgilenebilirsiniz.
  • Çok hızlı, idempotent işler: 100+ da olabilir. Ama ölçmeden artırmayın.

Sık karşılaşılan hatalar

  • Manuel ack'i unutmak: auto_ack=True ile çalışıyorsanız crash'te mesaj uçar. Production'da kapalı tutun, her zaman.
  • Heartbeat'i 30 saniyeye çekmek: Uzun süren mesaj işlemlerinde sahte bağlantı kopmaları yersiniz. 600 mantıklı bir varsayılandır.
  • Tek thread'de uzun iş yapmak: start_consuming() bloklar; uzun işlerde ayrı bir worker pool'a delege edin, yoksa heartbeat'i kaçırırsınız.
  • DLX kurmadan requeue=False yapmak: Mesaj sessizce silinir. Önce dead letter exchange tanımlayın, sonra reddedin.
  • Connection paylaşmak ama channel paylaşmamak: Pika'da channel thread-safe değil. Her thread kendi channel'ını açsın.

Doğrulama

Yerelde test için bir publisher script'i yazıp birkaç mesaj atın, management UI'da kuyruktaki sayının azalışını izleyin:

curl -u admin:password http://localhost:15672/api/queues/%2F/orders | \
  python -c 'import sys, json; d=json.load(sys.stdin); print(d[\"messages\"], d[\"messages_unacknowledged\"])'

messages_unacknowledged sayısı uzun süre artıyorsa tüketicinizde ack akışı bozuk demektir; ya callback patliyor ya da basic_ack çağrısına hiç ulaşılmıyor.

Kapanış

RabbitMQ tüketicisi yazmak ilk seferde basit, ikinci seferde zor görünür - çünkü ilk crash'ten sonra kayıp mesajları kim ödeyecek sorusu çıkar ortaya. Şahsi kanaatim, manuel ack ve düşük prefetch ile başlayıp ihtiyaç doğdukça açmak en sağlıklısı; baştan 'optimize edeyim' deyip prefetch'i 1000 yapanlar genelde geri döner. Umarım faydalı olur, bir sonraki yazıda görüşmek üzere.