Python ve EventStoreDB ile Event Sourcing
Merhabalar, bu yazıda Python ile EventStoreDB üstünde event-sourced bir aggregate yazmanın temellerine bakacağız. Klasik CRUD'a alışkın gözle bakınca event sourcing biraz tuhaf gelebilir; çünkü 'şimdiki durum'u değil, 'durum nasıl oluştu'yu kaydediyoruz. Lafı uzatmadan başlayalım.
Event sourcing nedir, neden EventStoreDB?
Event sourcing, sistemin durumunu değil; o duruma yol açan olayların (event) sırasını kayıt altına alma fikri. Bir orders tablosunda satırı UPDATE etmek yerine, OrderCreated, OrderItemAdded gibi değişmez (immutable) olayları bir log'a (stream) append ediyorsun. Şimdiki durumu istediğinde bu olayları sırayla uygulayıp (replay) hesaplıyorsun.
Avantajı: tam audit trail, geçmişe dönük sorgular, yeni projection'lar için mevcut veriyi taşımadan yeni read model üretebilme. Dezavantaj da var: schema evolution daha çetrefilli, eventual consistency ile barışık olmak gerekiyor. EventStoreDB ise optimistic concurrency, persistent subscription ve built-in projection'ı kutudan çıkar çıkmaz veren bir veritabanı. Bence Kafka'yı 'event store' diye zorlayan kurulumlar yerine, gerçek bir event store'a ihtiyacın varsa direkt buradan başlamak daha sağlıklı.
Kurulum ve bağlantı
Lokalde hızlıca kaldırmak için Docker yeter. Insecure modda çalıştırıyoruz, dev için bu kafi:
docker run -d --name eventstore \
-p 2113:2113 \
-e EVENTSTORE_INSECURE=true \
-e EVENTSTORE_RUN_PROJECTIONS=All \
eventstore/eventstore:latest
Sonra Python tarafında resmi client'ı kuruyoruz:
pip install esdbclient
Bağlantı için minimum şu yeter:
from esdbclient import EventStoreDBClient
client = EventStoreDBClient(uri='esdb://localhost:2113?tls=false')
Domain event'leri ve aggregate
Domain event'lerini geçmiş zamanda isimlendiriyoruz: OrderCreated, OrderItemAdded. frozen=True dataclass'lar event'leri immutable yapıyor, biz de aggregate üstünde _apply ile state'i güncelliyoruz:
from dataclasses import dataclass, field
from datetime import datetime
from uuid import UUID, uuid4
@dataclass(frozen=True)
class OrderCreated:
order_id: UUID = field(default_factory=uuid4)
customer_id: str = ''
occurred_at: datetime = field(default_factory=datetime.utcnow)
@dataclass(frozen=True)
class OrderItemAdded:
order_id: UUID = field(default_factory=uuid4)
product_id: str = ''
quantity: int = 1
unit_price: float = 0.0
occurred_at: datetime = field(default_factory=datetime.utcnow)
Aggregate ise iş kurallarını uygulayıp event üretiyor. Burada kritik nokta şu: command çağrıldığında önce kuralı kontrol et, kuraldan geçerse event'i oluştur, sonra _apply ile in-memory state'i güncelle ve _uncommitted_events listesine ekle. Diske yazma kararı buraya değil, repository'ye ait.
class Order:
def __init__(self, order_id=None):
self.id = order_id or uuid4()
self.items: dict = {}
self.status = 'draft'
self._uncommitted = []
self._version = 0
def add_item(self, product_id, quantity, unit_price):
if self.status != 'draft':
raise ValueError('Sadece draft sipariste degisiklik yapilabilir')
if quantity <= 0:
raise ValueError('Adet pozitif olmali')
event = OrderItemAdded(
order_id=self.id,
product_id=product_id,
quantity=quantity,
unit_price=unit_price,
)
self._apply(event)
self._uncommitted.append(event)
def _apply(self, event):
handler = getattr(self, f'_on_{type(event).__name__}', None)
if handler:
handler(event)
self._version += 1
append_to_stream ve optimistic concurrency
Kaydetme tarafına gelince, repository'nin işi event'leri stream'e append etmek. Stream adlandırması için order-<id> gibi tip-prefix'li bir konvansiyon işine yarar; çünkü ileride subscribe_to_all ile filter çekerken kategoriye göre süzmek için bu prefix işe yarıyor.
Buradaki kritik nokta optimistic concurrency. Aggregate'i yüklerken kaç event okuduğunu hatırlıyorsun (_version). Yazarken current_version parametresine bekledigin pozisyonu veriyorsun. Başka biri sen okuduktan sonra yazdıysa EventStoreDB WrongCurrentVersion hatasıyla reddediyor. Bu sayede 'lost update' problemi yaşamıyorsun.
from esdbclient import NewEvent, StreamState
import json
def save(self, order):
events = order._uncommitted
if not events:
return
stream = f'order-{order.id}'
new_events = [
NewEvent(
type=type(e).__name__,
data=json.dumps(e.to_dict()).encode('utf-8'),
)
for e in events
]
expected = (
StreamState.NO_STREAM
if order._version - len(events) == 0
else order._version - len(events) - 1
)
self.client.append_to_stream(
stream_name=stream,
events=new_events,
current_version=expected,
)
order._uncommitted.clear()
Yüklemek için read_stream (ya da bazı sürümlerde get_stream) kullanıyoruz; gelen RecordedEvent'leri tek tek _apply ediyoruz. Stream yoksa None dönmek genelde temiz bir API.
Projection mantığı
'Listede aktif siparişler' sorgusunu her seferinde tüm stream'leri replay ederek yapamazsın. Bu yüzden read model (projection) tutuyoruz. Python tarafında çoğu zaman subscribe_to_all ile bir worker yazıp PostgreSQL'e ya da Redis'e materialize etmek pratik. Şahsi tercihim Postgres; çökerse from_position ile son işlenen pozisyondan devam edebiliyorsun.
Genel akış:
EventStoreDB stream --> subscribe_to_all (filter: order-)
|
v
Worker: handle_event(type, data)
|
v
Postgres: order_summary tablosu
Worker'ın içinde event tipine göre yönlendirme yap, idempotent yaz (aynı event_id iki kez gelirse upsert ya da ON CONFLICT DO NOTHING).
Sık karşılaşılan tuzaklar
- Event isminde fiil çekimi yanlış:
CreateOrderdeğilOrderCreated. Event geçmişte olmuş bir şeyi temsil eder; emir kipi command için. Karıştırırsan ekipte herkes farklı yazmaya başlıyor. - Event içine 'şu anki state'i koymak:
OrderItemAddediçinde toplam tutarı yazma. Toplam türetilmiş bir değer; ileride hesaplama mantığı değişirse eski event'leri yeniden yorumlayamazsın. Sadece o anda gerçekten gerçekleşen ham veriyi koy. - Schema versiyonlamayı baştan düşünmemek: İlk gün tek alan eklemek 30 saniye, iki yıl sonra 14 alan değiştirmek aylar sürer. Event'lerin metadata'sına bir
schema_versionkoy, deserializer tarafında upcaster'lar yaz. - Optimistic concurrency'i atlamak: 'Pek çakışma olmaz canım' diyerek
current_versionvermezsen, paralel iki request aynı siparişi farklı şekilde değiştirebilir ve ikisi de yazılır. Bir keresinde bunu görmüştüm; debug etmek epey acı. - Aggregate'i devasa yapmak: Tüm
Customer'ı tek aggregate yapıp içine adresleri, siparişleri, ödemeleri tıkıştırmak. Stream çok uzar, replay yavaşlar, write contention artar. Aggregate sınırı = transactional consistency sınırı, küçük tut. - Projection'ı senkron yazmak: Aggregate save'in içinde projection'ı da güncellemeye kalkma. Eventual consistency'yi kabul et, projection'ı subscription üzerinden ayrı worker yapsın.
Doğrulama
Her şey çalışıyor mu, hızlıca bakalım:
order = Order()
order.add_item('prod-1', 2, 29.99)
repo.save(order)
loaded = repo.load(order.id)
assert len(loaded.items) == 1
assert loaded._version == 1
EventStoreDB UI'a http://localhost:2113 üstünden bakabilirsin; order-<id> stream'ini açınca event'leri tek tek görmen lazım. Görmüyorsan büyük ihtimalle current_version yanlış geçildi ve append sessizce reddedildi - exception fırlatıyor, log'a düşüyor mu bir kontrol et.
Kapanış
Bu yazıda Python ile EventStoreDB üstünde minimum bir event-sourced aggregate kurduk; append_to_stream, replay, projection ve optimistic concurrency gibi parçaların nasıl bir araya geldiğine baktık. Bana sorarsanız event sourcing her CRUD uygulamasının ihtiyacı değil; ama audit, temporal sorgu ve event-driven entegrasyon gerçekten gerekiyorsa elindeki en sağlam araçlardan biri. Umarım faydalı olur, bir sonraki yazıda görüşmek üzere.
