Python ile Apache Beam Veri Hatları Kurmak

Merhabalar, bu yazıda Python SDK üzerinden Apache Beam ile küçük bir veri hattı kurmaya bakacağız. Konu uzun bir alan, hepsini tek seferde anlatmak mümkün değil; ama temel kavramları net oturtursak ileride streaming'e geçtiğinizde de aynı zihinsel modelle ilerleyebiliyorsunuz. Hadi başlayalım.

İşler büyüdükçe ham CSV dosyalarını pandas ile işlemek bir yere kadar gidiyor. Sonra giriyor devreye 'aynı kod hem batch hem streaming çalışsın, lokalde test edeyim ama prod'da Dataflow'a atayım' isteği. Apache Beam tam burada konumlanıyor.

Apache Beam nedir?

Apache Beam, batch (toplu) ve streaming (akan) veri işlemeyi tek bir programlama modelinde birleştiren açık kaynaklı bir framework. Yani siz pipeline'ınızı bir kez yazıyorsunuz; aynı kod DirectRunner ile lokalde, Dataflow'da Google Cloud'da, ya da Flink/Spark üzerinde çalışabiliyor. Beam'in cazip tarafı bu taşınabilirlik. Runner'ı değiştiriyorsunuz, kod aynı kalıyor.

Dört temel kavramı baştan oturtmakta fayda var:

  • Pipeline: Bütün hattı temsil eden konteyner.
  • PCollection: Hatta akan dağıtık veri kümesi. Sınırlı (bounded) ya da sınırsız (unbounded) olabilir.
  • PTransform: PCollection'ları dönüştüren işlem. Map, Filter, GroupByKey gibi.
  • Runner: Pipeline'ı fiilen çalıştıran motor.

Bence bu dört kavramı aklınızda tutarsanız geri kalanı bunların türevi gibi geliyor.

Minimum bir DirectRunner örneği

Önce paketi kuralım:

pip install 'apache-beam[gcp]'

Şimdi küçük bir kelime sayım örneği yazalım. Klasik ama Beam'in ruhunu en hızlı gösteren örnek:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions(runner='DirectRunner')

with beam.Pipeline(options=options) as pipeline:
    (
        pipeline
        | 'Oku' >> beam.io.ReadFromText('input.txt')
        | 'Boluk' >> beam.FlatMap(lambda satir: satir.split())
        | 'Eslestir' >> beam.Map(lambda kelime: (kelime.lower(), 1))
        | 'Topla' >> beam.CombinePerKey(sum)
        | 'Yaz' >> beam.io.WriteToText('output')
    )

Burada | operatörü transform zincirleme için. Tırnak içindeki etiketler ('Oku', 'Boluk') zorunlu değil ama runner UI'larında bu isimleri görüyorsunuz, debug ederken hayat kurtarıyor. with bloğu bittiğinde pipeline çalıştırılıyor; DirectRunner tek makinede sonuçları üretiyor. Çıktı output-00000-of-00001 gibi parçalı dosyalara düşer çünkü Beam dağıtık dünyaya göre dizayn edilmiş.

DoFn ile özelleştirme

Map ve FlatMap işin %70'ini görür ama bazen state tutmanız, dış kaynak açmanız gerekir. O zaman DoFn sınıfı devreye girer:

class TemizleVeZenginleştir(beam.DoFn):
    def setup(self):
        # Her worker'da bir kez çalışır; DB bağlantısı, model yükleme vb.
        self.client = self._baglanti_ac()

    def process(self, element):
        if not element.get('email'):
            return
        element['email'] = element['email'].strip().lower()
        yield element

    def teardown(self):
        self.client.close()

    def _baglanti_ac(self):
        return None  # gerçek projede dış servis client'i

setup ve teardown worker yaşam döngüsüne bağlı; start_bundle / finish_bundle ise daha küçük gruplar için. Ben şahsen ilk başlarda dış servis bağlantısını process içinde açıyordum, her eleman için yeni bağlantı kurulduğu için pipeline çakılıyordu. Tecrübeyle sabit: pahalı kaynaklar her zaman setup'ta açılır.

Sık karşılaşılan tuzaklar

  • Lambda'da harici nesne kullanmak: Lambda'nın içinde dışarıdaki bir client objesini kullanırsanız Beam onu serialize edemez, runtime'da kapalı hata alırsınız. Dış kaynaklar DoFn içine taşınmalı.
  • DirectRunner'da hızlı, Dataflow'da yavaş: Lokalde tek thread çalışır; Dataflow'a atınca paralelizm devreye girer ama shuffle maliyetleri görünür olur. GroupByKey öncesi veriyi mümkün mertebe küçültmek mantıklı.
  • Sınırsız PCollection'a windowing koymamak: Streaming kaynaktan okuyorsanız ve GroupByKey kullanıyorsanız windowing tanımlamak zorunlu, yoksa Beam Group has unbounded PCollection diye atar.

Sonuç

Bu yazıda Apache Beam'in temel kavramlarını ve küçük bir Python pipeline örneğini gördük. Bana sorarsanız Beam'in en güzel yanı runner'dan bağımsız olması; lokalde DirectRunner ile test edip prod'a Dataflow ile çıkmak gerçekten konforlu bir his. Bir sonraki yazıda görüşmek üzere.