PostgreSQL background worker yazmak
Selamlar, bu yazımda PostgreSQL'in pek de günlük konuşmaya girmeyen ama bilindiğinde işi gerçekten kolaylaştıran bir köşesine bakacağız: background worker'lar. Yani veritabanı sunucusunun yanında postmaster tarafından fork edilip kendi başına iş yapan yardımcı süreçler. Lafı çok uzatmadan başlayayım.
İhtiyaç şuradan çıkıyor: zamanlanmış bir bakım işi, asenkron bir job kuyruğu, bir agregasyon, belki kendi minik replikasyonunuz - bunları cron'a ya da uygulama tarafına atmak yerine doğrudan PostgreSQL içinde çalıştırmak istediğiniz bir gün geliyor. İşte o gün karşımıza bgworker API'si çıkıyor. Bence bu API'yi öğrenmenin en iyi yolu, küçük bir iskeletle başlayıp parça parça eklemek; ben de öyle anlatacağım.
Background worker nedir?
Background worker (Türkçe karşılığıyla arka plan işçisi), postmaster'ın fork ettiği, istemci bağlantısı tutmayan, kendi bellek bağlamına sahip ayrı bir process. İsterse shared memory'ye dokunabiliyor, isterse bir veritabanına bağlanıp SPI üzerinden SQL çalıştırabiliyor. Yani normal bir backend'in yapabildiği şeylerin çoğunu yapabiliyor, ama tetiklenmesi bir client connection'a bağlı değil.
Üç tip başlangıç zamanı var: BgWorkerStart_PostmasterStart (en erken, DB yok henüz), BgWorkerStart_ConsistentState (recovery sırasında, read-only erişim) ve BgWorkerStart_RecoveryFinished (full DB erişimi). Çoğu pratik senaryoda sonuncusu doğru tercih.
Worker'ı kaydetmek
Worker'lar paylaşımlı kütüphane (shared library) olarak yüklenir ve _PG_init içinde kendilerini kaydeder. Aşağıdaki gibi bir iskelet tipik:
#include "postgres.h"
#include "postmaster/bgworker.h"
#include "fmgr.h"
PG_MODULE_MAGIC;
void _PG_init(void);
PGDLLEXPORT void my_worker_main(Datum main_arg);
void
_PG_init(void)
{
BackgroundWorker worker;
memset(&worker, 0, sizeof(BackgroundWorker));
snprintf(worker.bgw_name, BGW_MAXLEN, "queue_runner");
snprintf(worker.bgw_type, BGW_MAXLEN, "queue_runner");
snprintf(worker.bgw_library_name, BGW_MAXLEN, "queue_ext");
snprintf(worker.bgw_function_name, BGW_MAXLEN, "my_worker_main");
worker.bgw_flags = BGWORKER_SHMEM_ACCESS |
BGWORKER_BACKEND_DATABASE_CONNECTION;
worker.bgw_start_time = BgWorkerStart_RecoveryFinished;
worker.bgw_restart_time = 10;
worker.bgw_main_arg = (Datum) 0;
worker.bgw_notify_pid = 0;
RegisterBackgroundWorker(&worker);
}
bgw_restart_time'ı 10 saniyeye çektik; crash olursa postmaster bizi 10 saniye sonra geri kaldırıyor. Tek seferlik bir worker yazıyorsanız BGW_NEVER_RESTART daha doğru.
Ana döngü ve sinyaller
Worker'ın main fonksiyonu klasik 'sinyal kur, latch'te bekle, iş yap' döngüsünü takip eder. sleep(N) çağırmak yasak gibi düşünün - postmaster ölünce uyanmazsınız. Doğrusu WaitLatch:
static volatile sig_atomic_t got_sigterm = false;
static void
worker_sigterm(SIGNAL_ARGS)
{
int save_errno = errno;
got_sigterm = true;
SetLatch(MyLatch);
errno = save_errno;
}
void
my_worker_main(Datum main_arg)
{
pqsignal(SIGTERM, worker_sigterm);
BackgroundWorkerUnblockSignals();
BackgroundWorkerInitializeConnection("appdb", NULL, 0);
while (!got_sigterm)
{
int rc = WaitLatch(MyLatch,
WL_LATCH_SET | WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
10 * 1000L,
PG_WAIT_EXTENSION);
ResetLatch(MyLatch);
if (rc & WL_TIMEOUT)
perform_scheduled_task();
}
proc_exit(0);
}
WL_EXIT_ON_PM_DEATH küçük ama hayati bir bayrak: postmaster ölürse worker da düzgünce çıkıyor, ortada zombie kalmıyor.
SPI ile iş yapmak
Worker içinden SQL çalıştırmak için Server Programming Interface (SPI) kullanılır. Tipik bir transaction sarmalı:
SetCurrentStatementStartTimestamp();
StartTransactionCommand();
SPI_connect();
PushActiveSnapshot(GetTransactionSnapshot());
int ret = SPI_execute(
"UPDATE jobs SET status='running' "
"WHERE id IN (SELECT id FROM jobs "
" WHERE status='pending' ORDER BY created_at "
" LIMIT 100 FOR UPDATE SKIP LOCKED) "
"RETURNING id",
false, 0);
if (ret != SPI_OK_UPDATE_RETURNING)
ereport(ERROR, (errmsg("queue fetch failed: %d", ret)));
SPI_finish();
PopActiveSnapshot();
CommitTransactionCommand();
FOR UPDATE SKIP LOCKED burada altın değerinde - birden fazla worker aynı kuyruğu çekerken kimse kimseyi beklemiyor.
Sık karşılaşılan tuzaklar
sleepveyapg_sleepçağırmak: Worker bu sırada SIGTERM'i göremez, postmaster ölümünden haberi olmaz. Tek doğru yolWaitLatch.- Shared memory'ye lock'suz dokunmak:
LWLockolmadan paylaşımlı struct'a yazarsanız çoklu worker senaryosunda şeytan üçgenine girersiniz. Yazma içinLW_EXCLUSIVE, okuma içinLW_SHARED. - PG_TRY/PG_CATCH unutmak: Bir SPI hatası worker'ı düşürürse
bgw_restart_timedevreye girer ama crash log'unuz şişer. Hata yakalayıpEmitErrorReport+FlushErrorStateile devam etmek çok daha sağlıklı. - GUC tanımlamamak:
naptime,max_items,enabledgibi parametreleriDefineCustomIntVariableile tanımlamazsanız her ayar değişikliği için kod derlemek zorunda kalırsınız.
Doğrulama
Worker derlenip yüklendikten sonra çalıştığını şu sorguyla kontrol edebilirsiniz:
SELECT pid, backend_type, state, query
FROM pg_stat_activity
WHERE backend_type = 'queue_runner';
Bir satır dönüyor ve state mantıklıysa hayattadır. pgstat_report_activity çağrısını koymayı unutmazsanız query sütununda o anki adımı bile görürsünüz.
Kapanış
Background worker'lar PostgreSQL'i 'sadece bir veritabanı' olmaktan çıkarıp küçük bir uygulama platformuna dönüştüren parçalardan biri. Şahsi kanaatim, basit bir cron iş için bunu yazmak abartı; ama job kuyruğu, gerçek zamanlı agregasyon ya da custom replikasyon gibi senaryolarda yazılan kod miktarına göre kazandırdığı operasyonel sadelik tartışılmaz. Umarım faydalı olur, bir sonraki yazıda görüşmek üzere.
