FastAPI ile WebSocket bağlantıları nasıl kurulur?

Selamlar, bu yazımda FastAPI ile WebSocket bağlantılarını nasıl kuracağımıza bakacağız. Konu uzun gibi gözükür ama aslında sırayla gidersek hiç de göründüğü kadar zor değil. Hadi başlayalım.

WebSocket, istemci ile sunucu arasında çift yönlü ve sürekli açık bir kanal kurar. HTTP'nin aksine her istek için yeni bağlantı açıp kapatmıyoruz; bir kere el sıkışılıyor, sonra her iki taraf da istediği zaman veri itiyor. Sohbet uygulamaları, canlı bildirimler, dashboard güncellemeleri, ortak doküman editörleri - hepsinde WebSocket görüyoruz.

WebSocket nedir?

WebSocket (Türkçe karşılığıyla web soketi), tek bir TCP bağlantısı üzerinden tam çift yönlü (full-duplex) iletişim sağlayan bir protokol. Polling ya da long-polling gibi numaralara gerek kalmıyor; sunucu istemciye anında push edebiliyor. FastAPI, altta Starlette'i kullandığı için WebSocket desteği zaten kutudan çıkıyor.

Basit bir echo endpoint

Hadi minimum bir örnekle başlayalım. Aşağıdaki endpoint istemciden gelen mesajı alıp aynısını geri yolluyor:

from fastapi import FastAPI, WebSocket, WebSocketDisconnect

app = FastAPI()

@app.websocket('/ws')
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    try:
        while True:
            data = await websocket.receive_text()
            await websocket.send_text(f'Echo: {data}')
    except WebSocketDisconnect:
        print('Istemci ayrildi')

Burada accept() el sıkışmayı tamamlıyor. while True döngüsü bağlantı koptuğu ana kadar dönüyor; istemci kapatınca WebSocketDisconnect fırlıyor ve biz de düzgünce çıkıyoruz. Bunu unutursanız sunucu loglarınız sürekli izole exception'larla dolar, bence en başta yakalamayı alışkanlık edinin.

Çoklu istemci için bağlantı yöneticisi

Tek istemci tamam, ama gerçek hayatta onlarca, yüzlerce bağlantıyı bir arada tutmamız gerekiyor. Bu yüzden bir ConnectionManager sınıfı yazıyoruz:

from fastapi import WebSocket
from typing import Dict, Set

class ConnectionManager:
    def __init__(self):
        self.active: Dict[str, WebSocket] = {}
        self.rooms: Dict[str, Set[str]] = {}

    async def connect(self, ws: WebSocket, client_id: str):
        await ws.accept()
        self.active[client_id] = ws

    def disconnect(self, client_id: str):
        self.active.pop(client_id, None)
        for room in self.rooms.values():
            room.discard(client_id)

    async def broadcast(self, message: str, exclude: str = None):
        for cid, conn in self.active.items():
            if cid != exclude:
                await conn.send_text(message)

    def join_room(self, client_id: str, room: str):
        self.rooms.setdefault(room, set()).add(client_id)

manager = ConnectionManager()

Buradaki kritik nokta şu: disconnect çağrıldığında istemciyi sadece sözlükten silmek yetmiyor, üye olduğu odalardan da çıkarmamız lazım. Aksi halde 'hayalet' kullanıcılar oda listesinde kalır ve broadcast sırasında zaten kapanmış bir socket'e mesaj göndermeye çalışırız.

Kimlik doğrulama

WebSocket bağlantısını accept() çağırmadan önce doğrulamak şart. Aksi halde herhangi biri endpoint'inize bağlanıp el sıkışmayı tamamlatabilir. JWT ile basit bir örnek:

from fastapi import WebSocket, Query
from jose import jwt, JWTError

@app.websocket('/ws')
async def ws_auth(websocket: WebSocket, token: str = Query(...)):
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=['HS256'])
        user_id = payload.get('sub')
    except JWTError:
        await websocket.close(code=4001)
        return

    await websocket.accept()
    await manager.connect(websocket, user_id)

Token'ı query string'den almak en pratik yol; tarayıcıdan gelen WebSocket isteğinde özel header eklemek hâlâ sancılı. 4001 bizim seçtiğimiz custom kapanış kodu - 4000-4999 aralığı uygulamaya özel kullanım için ayrılmış.

Heartbeat ile bağlantıyı canlı tutmak

Aradaki proxy ya da load balancer, uzun süre veri akmayan bağlantıları sessizce kapatabilir. Çözüm: belli aralıklarla ping göndermek.

import asyncio

async def heartbeat(ws: WebSocket, interval: int = 30):
    while True:
        try:
            await asyncio.sleep(interval)
            await ws.send_json({'type': 'ping'})
        except Exception:
            break

@app.websocket('/ws/{client_id}')
async def ws_with_heartbeat(websocket: WebSocket, client_id: str):
    await websocket.accept()
    task = asyncio.create_task(heartbeat(websocket))
    try:
        while True:
            data = await websocket.receive_json()
            if data.get('type') == 'pong':
                continue
    except WebSocketDisconnect:
        task.cancel()

task.cancel() satırını atlamak çok yapılan bir hata; bağlantı koptuğunda heartbeat task'ı arka planda dönmeye devam ederse her sefer bir kaynak sızıntısı yaşarsınız.

Sık karşılaşılan hatalar

  • accept() öncesi doğrulama yapmamak: Bağlantı zaten kuruldu, kötü niyetli istemci sunucu kaynağınızı tüketmeye başladı bile. Token'ı accept() çağrısından önce kontrol edin.
  • WebSocketDisconnect yakalamamak: Her istemci ayrılışında stack trace görürsünüz. Bağlantı koptuğunda bu istisna normaldir, hata değil.
  • Yatay ölçeklemede tek instance varsayımı: Birden fazla pod çalıştığında her biri kendi bağlantı listesini tutar. A pod'undaki kullanıcı, B pod'undakine mesaj atamaz. Çözüm: Redis pub/sub ile mesajları tüm instance'lara dağıtmak.
  • Heartbeat task'ını iptal etmemek: Yukarıda gördüğümüz gibi her kopuk bağlantı arkada bir asyncio task bırakır; bellek yavaş yavaş şişer.

Kapanış

Bu yazıda FastAPI ile WebSocket'in temellerini, bağlantı yöneticisini, kimlik doğrulamayı ve heartbeat'i konuştuk. Bana sorarsanız ilk projenizde Redis pub/sub'a hemen atlamayın; tek instance ile başlayın, ihtiyaç doğunca ölçeklersiniz. Umarım faydalı olur, bir sonraki yazıda görüşmek üzere.