Honker - ส่วนขยายที่เพิ่มเซแมนติก Postgres NOTIFY/LISTEN ให้กับ SQLite
(github.com/russellromney)- ผ่าน ส่วนขยาย SQLite และ language binding หลายภาษา ทำให้สามารถจัดการ durable pub/sub, คิวงาน, และสตรีมอีเวนต์ภายในไฟล์
.dbเดียวกันได้ โดยไม่ต้องมี client polling หรือ daemon·broker แยกต่างหาก notify(),stream(),queue()ทั้งหมดจะถูก บันทึกภายในทรานแซกชันของผู้เรียก และจะ commit หรือ rollback ไปพร้อมกับการเขียนข้อมูลทางธุรกิจ ช่วยลดปัญหา dual-write- การปลุกข้ามโปรเซสทำงานด้วยการตรวจสอบ
PRAGMA **data_version**ทุก 1ms โดยตั้งเป้าให้มี latency ระดับมิลลิวินาทีเลขหลักเดียว และมีต้นทุนการ query ต่ำมาก - คิวงานรองรับการ retry, priority, delayed execution, dead-letter, scheduler, named lock, rate limiting และสตรีมรองรับ การส่งมอบแบบ at-least-once พร้อมเก็บ offset แยกตามผู้บริโภค
- เป็นแนวทางที่รวมแอปพลิเคชันและงาน async ไว้ใน ไฟล์ฐานข้อมูลเดียว สำหรับสภาพแวดล้อมที่ใช้ SQLite เป็นที่เก็บข้อมูลหลัก เพื่อลดความซับซ้อนในการปฏิบัติการ โดย API ยังอยู่ในสถานะ Experimental
ภาพรวม
- เพิ่มพฤติกรรม
NOTIFY/LISTENแบบ Postgres ให้กับ SQLite ด้วย ส่วนขยาย SQLite และ language binding หลายภาษา ทำให้สามารถจัดการ durable pub/sub, คิวงาน, และสตรีมอีเวนต์ภายในไฟล์.dbเดียวกันได้ โดยไม่ต้องมี client polling หรือ daemon·broker แยกต่างหาก - อิงจาก on-disk layout ที่กำหนดครั้งเดียวด้วย Rust โดย Python, Node, Bun, Ruby, Go, Elixir และ C++ binding ทั้งหมดถูกจัดให้เป็นโครงสร้างที่ครอบ loadable extension เดียวกันแบบบาง ๆ
- แทนที่ application-level polling ด้วยวิธีอ่านฐานข้อมูลทุก 1ms โดยต้นทุนของการอ่าน
PRAGMA data_versionอยู่ในระดับไมโครวินาทีเลขหลักเดียว และการส่งการแจ้งเตือนข้ามโปรเซสอยู่ในระดับมิลลิวินาทีเลขหลักเดียว - หากใช้ SQLite เป็นที่เก็บข้อมูลหลัก ก็สามารถ commit หรือ rollback การเขียนข้อมูลทางธุรกิจ และการนำงานเข้าคิวในทรานแซกชันเดียวกันได้ ช่วยลดการต้องดูแล datastore แยกและปัญหา dual-write
- API ยังอยู่ในสถานะ Experimental และอาจมีการเปลี่ยนแปลงได้
- ระบุไว้อย่างชัดเจนว่าหากใช้งาน Postgres อยู่แล้ว การใช้
pg_notify, pg-boss, Oban อาจเหมาะสมกว่า
ฟีเจอร์หลัก
- มีทั้ง notify/listen ข้ามโปรเซส, คิวงานที่มี retry, priority, delayed execution, dead-letter table และ durable stream ที่มี offset แยกตามผู้บริโภค ภายในไฟล์
.dbเดียวกัน - การส่งทั้งหมดสามารถ ผูกแบบอะตอมมิก กับการเขียนข้อมูลทางธุรกิจได้ ทำให้ commit หรือ rollback ไปพร้อมกัน
- เวลาในการตอบสนองข้ามโปรเซสอยู่ในระดับมิลลิวินาทีเลขหลักเดียว และยังมี handler timeout, การ retry แบบ exponential backoff, delayed jobs, task expiration, named lock, rate limiting รวมอยู่ด้วย
- รองรับ scheduler แบบ leader election และ periodic task สไตล์ crontab รวมถึงการเก็บผลลัพธ์ของ task แบบ opt-in
enqueueจะคืนค่า id, worker จะบันทึกค่าที่คืนกลับมา และผู้เรียกสามารถรอผลลัพธ์ได้ด้วยqueue.wait_result(id)
- มาในรูปแบบ SQLite loadable extension ทำให้ SQLite client ใด ๆ ก็สามารถอ่านตารางเดียวกันได้
- ทำงานได้แม้ภายในการเชื่อมต่อ SQLite ที่ ORM เป็นเจ้าของ และ คู่มือ ORM ครอบคลุมการเชื่อมต่อกับ SQLAlchemy, SQLModel, Django, Drizzle, Kysely, sqlx, GORM, ActiveRecord, Ecto
- ในทางกลับกันก็ระบุ ขอบเขตที่ตั้งใจไม่รองรับ ไว้อย่างชัดเจน
- ไม่รองรับ task pipeline, chain, group, chord
- ไม่รองรับ multi-writer replication
- ไม่รองรับ workflow orchestration แบบ DAG
เริ่มต้นอย่างรวดเร็ว
-
คิว Python
- เปิดฐานข้อมูลด้วย
honker.open("app.db")และรับคิวด้วยdb.queue("emails")เพื่อใส่งานและประมวลผลงานได้ - ภายในบล็อก
with db.transaction() as tx:หากทำทั้งการ INSERT คำสั่งซื้อและemails.enqueue(..., tx=tx)ร่วมกัน การเขียนคำสั่งซื้อและการใส่งานอีเมลจะถูกรวมอยู่ในทรานแซกชันเดียวกัน - worker จะดึงงานทีละรายการในรูปแบบ
async for job in emails.claim("worker-1"):และเมื่อสำเร็จให้ใช้job.ack()หากล้มเหลวให้จัดการด้วยjob.retry(delay_s=60, error=str(e)) claim()เป็น asynchronous iterator และภายในจะเรียกclaim_batch(worker_id, 1)ในแต่ละรอบการวนซ้ำ- จะถูกปลุกขึ้นเมื่อมี commit ใดๆ ในฐานข้อมูล และจะถอยกลับไปใช้ paranoia poll ทุก 5 วินาทีเฉพาะเมื่อ commit watcher ไม่สามารถทำงานได้เท่านั้น
- สำหรับงานแบบแบตช์ได้แยกให้ใช้
claim_batch(worker_id, n)และqueue.ack_batch(ids, worker_id)โดยตรง และ visibility เริ่มต้นคือ 300 วินาที
- เปิดฐานข้อมูลด้วย
-
Task Python
- หากใช้ดีคอเรเตอร์
@emails.task(retries=3, timeout_s=30)การเรียกฟังก์ชันจะถูกเปลี่ยนเป็นการใส่งานเข้าคิวโดยตรงและคืนค่าTaskResult - ฝั่งที่เรียกใช้สามารถใช้งานแบบ
send_email("alice@example.com", "Hi")และรอผลจากการทำงานของ worker ได้ด้วยr.get(timeout=10) - worker สามารถรันเป็นโปรเซสแยกหรือ in-process ได้ เช่น
python -m honker worker myapp.tasks:db --queue=emails --concurrency=4 - ชื่ออัตโนมัติคือ
{module}.{qualname}และในสภาพแวดล้อม production แนะนำให้ใช้ชื่อแบบกำหนดเองอย่างชัดเจน เช่น@emails.task(name="...")เพื่อป้องกันไม่ให้ pending job กลายเป็น orphan เพราะมีการเปลี่ยนชื่อ - periodic task ใช้รูปแบบ
@emails.periodic_task(crontab("0 3 * * *")) - ตัวอย่างโดยละเอียดอยู่ใน
packages/honker/examples/tasks.py
- หากใช้ดีคอเรเตอร์
-
สตรีม Python
db.stream("user-events")ให้ durable pub/sub และสามารถทำ business UPDATE กับstream.publish(..., tx=tx)ภายในทรานแซกชันเดียวกันได้- หาก subscribe ด้วย
async for event in stream.subscribe(consumer="dashboard"):ระบบจะ replay แถวหลังจาก offset ที่บันทึกไว้ แล้วจากนั้นจะสลับไปเป็นการส่งแบบเรียลไทม์ที่อิงกับ commit - offset ของ named consumer แต่ละตัวจะถูกเก็บไว้ในตาราง
_honker_stream_consumers - การบันทึก offset อัตโนมัติจะเกิดขึ้นโดยปริยายเพียงทุก 1000 อีเวนต์หรือทุก 1 วินาที เพื่อไม่ให้ไปกระแทก single-writer slot มากเกินไปแม้ในปริมาณงานสูง
- สามารถปรับได้ด้วย
save_every_n=และsave_every_s=และหากตั้งทั้งคู่เป็น 0 จะปิดการบันทึกอัตโนมัติและเรียกstream.save_offset(consumer, offset, tx=tx)ได้โดยตรง - หากเกิด crash ระบบจะใช้โมเดล at-least-once โดยอีเวนต์ in-flight หลัง offset ที่ flush ล่าสุดจะถูกส่งซ้ำอีกครั้ง
-
Python notify
- สามารถ subscribe ephemeral pub/sub ได้ด้วย
async for n in db.listen("orders"):และส่งการแจ้งเตือนภายในทรานแซกชันได้ด้วยtx.notify("orders", {"id": 42}) - listener จะเริ่มเกาะจากตำแหน่ง
MAX(id)ปัจจุบัน จึงไม่ replay ประวัติในอดีต - หากต้องการ durable replay ต้องใช้
db.stream() - ตาราง notifications จะไม่ถูกล้างอัตโนมัติ ดังนั้นควรเรียก
db.prune_notifications(older_than_s=…, max_keep=…)จากงานตามกำหนดเวลา - payload ของ task ต้องเป็น JSON ที่ถูกต้อง และ Python writer กับ Node reader สามารถใช้ช่องเดียวกันร่วมกันได้
- สามารถ subscribe ephemeral pub/sub ได้ด้วย
-
Node.js
- ใน Node binding ก็ใช้ความสามารถเดียวกันผ่านแพตเทิร์น
open('app.db'),db.transaction(),tx.notify(...),db.listen('orders') - การเขียน business และ notify จะถูกรวมอยู่ใน commit เดียวกัน และ listen จะถูกปลุกจาก commit ใดๆ ของฐานข้อมูลก่อนกรองตาม channel
- ใน Node binding ก็ใช้ความสามารถเดียวกันผ่านแพตเทิร์น
-
SQLite extension
- หลัง
.load ./libhonker_extให้เริ่มต้นด้วยSELECT honker_bootstrap();และสามารถใช้ฟังก์ชัน SQL เพียงอย่างเดียวเพื่อใช้งานคิว ล็อก rate limit scheduler สตรีม และการบันทึกผลลัพธ์ได้ - มีฟังก์ชันอย่าง
honker_claim_batch,honker_ack_batch,honker_sweep_expired,honker_lock_acquire,honker_rate_limit_try,honker_scheduler_tick,honker_stream_publish,honker_stream_read_since,honker_result_save - Python binding และ extension ใช้
_honker_live,_honker_dead,_honker_notificationsร่วมกัน ดังนั้นงานที่ภาษาอื่นใส่ผ่าน extension ก็สามารถถูกดึงโดย Python worker ได้ - ความเข้ากันได้ของสคีมาถูกตรึงไว้ใน
tests/test_extension_interop.py
- หลัง
การออกแบบ
- รีโพซิทอรีนี้รวม
honkerSQLite loadable extension และ binding สำหรับ Python, Node, Rust, Go, Ruby, Bun, Elixir ไว้ด้วยกัน - มุ่งเป้าไปที่แอปพลิเคชันที่ใช้ SQLite เป็นที่เก็บข้อมูลหลัก และเน้นการย้าย package logic ไปไว้ใน SQLite extension เพื่อให้หลายภาษาและหลายเฟรมเวิร์กใช้งานในรูปแบบที่คล้ายกันได้
- primitive หลักมีสามอย่าง
notify()ซึ่งเป็น ephemeral pub/substream()ซึ่งเป็น durable pub/sub ที่มี offset แยกตามผู้บริโภคqueue()ซึ่งเป็นคิวงานแบบ at-least-once
- primitive ทั้งสามนี้จะถูกบันทึกเป็น INSERT ภายในทรานแซกชันของผู้เรียกทั้งหมด ทำให้การส่งงานและการเขียน business commit พร้อมกันหรือ rollback พร้อมกัน
- เป้าหมายคือทำให้เกิดพฤติกรรมคล้าย
NOTIFY/LISTENโดยไม่ต้อง polling ในระดับแอปพลิเคชัน เพื่อให้ได้เวลาในการตอบสนองที่รวดเร็ว - หากใช้ไฟล์ SQLite เดิมต่อไป ทุก commit ในฐานข้อมูลจะปลุก worker ขึ้นมา และ trigger ส่วนใหญ่อาจจบลงด้วยการอ่านข้อความหรือคิวแล้วไม่ต้องประมวลผลจริงจนได้ผลลัพธ์ว่าง
- overtriggering แบบนี้เป็น tradeoff ที่ตั้งใจเลือก เพื่อให้ได้พฤติกรรมที่ใกล้เคียง push และเวลาในการตอบสนองที่รวดเร็ว
ค่าเริ่มต้นที่แนะนำ: WAL
- language binding จะใช้
journal_mode = WALเป็นค่าเริ่มต้น ซึ่งให้โครงสร้าง reader พร้อมกันหลายตัวและ writer เดียว, การทำfsyncbatching ที่มีประสิทธิภาพ, และการตั้งค่าwal_autocheckpoint = 10000 - โหมดอื่นอย่าง DELETE, TRUNCATE, MEMORY ก็ใช้งานได้เช่นกัน โดยการตรวจจับ commit อาศัย
PRAGMA data_versionที่เพิ่มขึ้นในทุก journal mode - สิ่งที่เสียไปในโหมด non-WAL มีเพียงคุณสมบัติ เขียนระหว่างที่มีการอ่านพร้อมกัน เท่านั้น โดย correctness และการ wake ข้ามโปรเซสเองไม่ได้พึ่งพา WAL
- ทั้งระบบประกอบด้วยไฟล์
.dbเดียว และเมื่อเปิด WAL อาจมี sidecar เพิ่มเป็น.db-wal,.db-shm - claim จัดการด้วย
UPDATE … RETURNINGเพียงครั้งเดียวผ่าน partial index ส่วน ack จัดการด้วยDELETEเพียงครั้งเดียว - ไม่ว่า journal mode ใด ในช่วงเวลาเดียวกันจะมี writer ได้เพียงหนึ่งตัว และข้อดีด้านการอ่านพร้อมกันเป็นสิ่งที่ WAL มอบให้
PRAGMA data_versionจะเพิ่มขึ้นทุกครั้งที่มี commit และ checkpoint จึงรองรับกรณีอย่าง WAL truncation, การสร้างและลบไฟล์ journal, หรือการนำขนาดเดิมกลับมาใช้ซ้ำได้อย่างถูกต้อง- SQLite ไม่มี wire protocol จึงไม่สามารถทำ server push ได้ และผู้บริโภคต้องเริ่มอ่านเอง
- สัญญาณ wake คือการเพิ่มขึ้นของ counter
- จากนั้นการดึงข้อมูลจริงจะทำด้วย
SELECT
- เนื่องจาก transaction มีต้นทุนต่ำ จึงบันทึก jobs, events, notifications ไว้ภายในบล็อก
with db.transaction()ที่ผู้เรียกเปิดอยู่ในลักษณะเดียวกับ outbox pattern - ใช้
PRAGMA data_versionแทนวิธีดูขนาดไฟล์ WAL·mtime ด้วยstat(2)หรือ kernel watcher อย่างFSEvents·inotify·kqueuedata_versionคือ monotonic counter ที่ SQLite เพิ่มให้เมื่อมี commit จากการเชื่อมต่อใดก็ตาม- จัดการ WAL truncation, clock skew, และ transaction ที่ rollback ได้อย่างถูกต้อง
- kernel watcher บน macOS จะพลาดการเขียนจากโปรเซสเดียวกัน และ
stat(2)ที่อิง(size, mtime)อาจพลาด commit เมื่อ WAL ถูก truncate แล้วโตกลับมาที่ขนาดเดิม - ทำงานเหมือนกันบน Linux, macOS, Windows และมีต้นทุน CPU ต่ำมากที่ความละเอียดระดับ 1ms
- ระบุว่าต้นทุนต่อ query อยู่ที่ราว 3.5µs หรือรวมประมาณ 3.5ms/sec ที่ 1kHz
- โมเดลล็อกของ SQLite ตั้งอยู่บนสมมติฐาน single machine, single writer และหากสองเซิร์ฟเวอร์เขียนลง
.dbเดียวกันบน NFS จะเกิดความเสียหาย- กรณีนี้ต้องใช้ file-level sharding หรือย้ายไป Postgres
สถาปัตยกรรม
-
เส้นทาง wake
- แต่ละ
Databaseจะมี PRAGMA poll thread หนึ่งตัวเพื่อ querydata_versionทุก 1ms - เมื่อ counter เปลี่ยน จะ fan-out tick ไปยัง bounded channel ของ subscriber แต่ละราย
- แต่ละ subscriber จะรัน
SELECT … WHERE id > last_seenโดยใช้ partial index แล้วคืนค่าแถวใหม่ก่อนกลับไปรออีกครั้ง - ถึงจะมี subscriber 100 คนก็ใช้ poll thread เพียง 1 ตัวได้
- listener ที่ idle จะไม่รัน SQL query ใด ๆ เลย
- ต้นทุนยาม idle มีเพียง query
PRAGMA data_versionหนึ่งครั้งต่อ 1ms ต่อฐานข้อมูล และจำนวน listener เพิ่มขึ้นได้แทบฟรีเพราะโครงสร้างนี้ใช้การอ่าน SQLite counter SharedWalWatcherของhonker-coreเป็นเจ้าของ poll thread และทำ fan-out ผ่านช่องทาง boundedSyncSender<()>ตาม subscriber id- ทุกการเรียก
db.wal_events()จะลงทะเบียน subscriber และเมื่อ handle ที่ส่งกลับมาถูกDropก็จะยกเลิกการสมัครโดยอัตโนมัติ - เมื่อ listener ถูก drop จะเกิด
rx.recv() -> Errใน bridge thread จากนั้นจะ cleanup และจบการทำงาน
- แต่ละ
-
สคีมาคิว
_honker_liveเก็บแถวที่มีสถานะ pending และ processing- partial index อยู่ในรูป
(queue, priority DESC, run_at, id) WHERE state IN ('pending','processing') - claim เกิดจาก
UPDATE … RETURNINGครั้งเดียวผ่าน index นี้ - ack คือ
DELETEครั้งเดียว - แถวที่เกินขีดจำกัดการ retry จะถูกย้ายไป
_honker_deadและจะไม่ถูกสแกนอีกในเส้นทาง claim - ด้วย partial index บน state ทำให้ claim hot path ถูกจำกัดด้วย ขนาดของ working set ไม่ใช่ขนาดของ history ทั้งหมด
- แม้จะมี dead row 100k แถว ความเร็ว claim ก็ยังเท่ากับคิวที่ไม่มี dead row
-
Claim iterator
async for job in q.claim(id)จะเรียกclaim_batch(id, 1)ซ้ำ ๆ และส่งงานออกมาทีละรายการJob.ack()คือDELETEเดี่ยวภายใน transaction ของตัวเอง โดยค่าที่คืนมาจะเป็นTrueถ้า claim ยังมีผลอยู่ และเป็นFalseถ้า visibility window ผ่านไปแล้วและ worker อื่นเข้ามาได้งานนั้นอีกครั้ง- จะถูกปลุกเมื่อมี database commit จากโปรเซสใดก็ตาม และ paranoia poll ทุก 5 วินาทีคือ fallback เพียงอย่างเดียว
- งานแบบ batch ควรใช้
claim_batch(worker_id, n)และqueue.ack_batch(ids, worker_id)โดยตรง - ไลบรารีไม่ได้ซ่อน batch ไว้หลัง iterator เพื่อให้จัดการต้นทุน transaction และพฤติกรรม at-most-once visibility ได้ชัดเจนขึ้น
-
การผูกกับ transaction
notify()คือ SQL scalar function ที่ลงทะเบียนบน writer connection- มันจะ INSERT ลง
_honker_notificationsภายใต้ transaction ที่ผู้เรียกเปิดอยู่ queue.enqueue(…, tx=tx)และstream.publish(…, tx=tx)ก็ทำงานแบบเดียวกัน- หากเกิด rollback งาน, event, notification ก็จะหายไปพร้อมกัน
- นี่คือ transactional outbox pattern ที่มีมาให้ในตัว ซึ่งจัดการ business write และ side effect enqueue ไปพร้อมกันได้โดยไม่ต้องติดตั้งไลบรารีแยก
- ไม่มี dispatch table หรือ dispatcher process แยกต่างหาก และ row ของ side effect เองก็คือแถวที่ถูก commit ซึ่งโปรเซสใดก็ตามที่เฝ้าฐานข้อมูลอยู่สามารถหยิบไปได้ภายในราว 1ms
-
over-triggering ที่เร็วกว่าการ polling
- การเปลี่ยนแปลงของ
data_versionจะปลุก subscriber ทั้งหมดของDatabaseนั้น โดยไม่ได้ปลุกเฉพาะช่องที่ถูก commit แบบเลือกเจาะจง - ต้นทุนของการถูกปลุกผิดคือ
SELECTแบบมี index เพียงครั้งเดียวในระดับไมโครวินาที - ในทางกลับกัน หากพลาดเป้าหมายที่ควรปลุก จะกลายเป็นบั๊กด้าน correctness ที่เงียบและตรวจจับยาก
- การกรองตามช่องจะจัดการในเส้นทาง
SELECTไม่ใช่ในขั้น trigger notification - SQLite สามารถจัดการ แพตเทิร์นที่มี query เล็กจำนวนมาก ได้อย่างมีประสิทธิภาพ
- การเปลี่ยนแปลงของ
-
นโยบายการเก็บรักษา
- งานในคิวจะคงอยู่จนกว่าจะ ack และถ้าเกินขีดจำกัดการ retry จะถูกย้ายไป
_honker_dead - event ใน stream จะถูกเก็บไว้ และ named consumer แต่ละรายจะติดตาม offset ของตนเอง
- notify เป็นแบบ fire-and-forget และไม่มีการล้างอัตโนมัติ
- นโยบายการเก็บรักษาเป็นสิ่งที่ผู้เรียกเลือกเองในแต่ละ primitive และต้องเรียก
db.prune_notifications(older_than_s=…, max_keep=…)โดยตรง - แนวทางนี้ไม่ซ่อนอยู่หลังค่าเริ่มต้นของไลบรารี แต่ทำให้นโยบาย retention ปรากฏชัดในโค้ดของผู้เรียก
- งานในคิวจะคงอยู่จนกว่าจะ ack และถ้าเกินขีดจำกัดการ retry จะถูกย้ายไป
การกู้คืนจากข้อขัดข้อง
- การ rollback จะลบ jobs, events และ notifications ทั้งหมดไปพร้อมกับการเขียนเชิงธุรกิจตามคุณสมบัติ ACID ของ SQLite
- ปลอดภัยแม้จะโดน
SIGKILLระหว่างทรานแซ็กชัน และเมื่อ open ครั้งถัดไป atomic commit rollback ของ SQLite จะไม่ทิ้ง stale state เอาไว้- การใช้ WAL หรือ rollback journal ขึ้นอยู่กับ journal mode
- การตรวจสอบทำใน
tests/test_crash_recovery.pyโดยปิด subprocess ก่อน COMMIT แล้วตรวจPRAGMA integrity_check == 'ok'และยืนยัน flow ของ notify ใหม่
- หาก worker ตายระหว่างทำงาน หลัง
visibility_timeout_sผ่านไป worker อื่นจะมา claim งานนั้นใหม่- ค่าเริ่มต้นคือ 300 วินาที
attemptsจะเพิ่มขึ้น- หากเกินค่าเริ่มต้นของ
max_attemptsที่ 3 ครั้ง แถวนั้นจะถูกย้ายไป_honker_dead
- listener ที่ออฟไลน์อยู่ระหว่าง prune จะพลาด event ที่ถูกลบไป และหากต้องการ durable replay ควรใช้
db.stream()ที่เก็บ offset แยกตาม consumer
การเชื่อมกับเว็บเฟรมเวิร์ก
- ไม่มีปลั๊กอินสำหรับเฟรมเวิร์กโดยเฉพาะ และเลือกแนวทางเชื่อมต่อด้วย glue code เพียงไม่กี่บรรทัดเพราะ API มีขนาดเล็ก
- ใน FastAPI มีตัวอย่างให้รัน worker loop ตอน startup และระหว่างประมวลผล request ให้ทำ business write กับ queue enqueue ภายในทรานแซ็กชันเดียวกัน
- SSE endpoint สามารถสร้างบน
db.listen(channel)หรือdb.stream(name).subscribe(...)ได้ในรูปแบบasync def stream(...): yield f"data: ...\n\n"โดยใช้โค้ดราว 30 บรรทัด - ใน Django และ Flask แนะนำให้รัน worker เป็นโปรเซส CLI แยกต่างหากในรูปแบบเดียวกับ Celery หรือ RQ
การใช้ ORM
- ในการเชื่อมต่อผ่าน ORM ให้ load
libhonker_extและเรียก SQL function ภายในทรานแซ็กชันของ ORM เอง เพื่อให้ enqueue ถูก commit แบบอะตอมมิกพร้อมกับ business write - ตัวอย่างของ SQLAlchemy จะโหลด extension ใน connect event แล้วรัน
SELECT honker_bootstrap()จากนั้นภายในทรานแซ็กชันs.begin()จะเรียกทั้งการ INSERT model และSELECT honker_enqueue(...)ร่วมกัน - worker จะรันเป็นโปรเซสแยกโดยใช้
honker.open("app.db")และ commit watcher จะตื่นขึ้นเมื่อมี commit จากการเชื่อมต่อใดก็ตามที่ใช้ไฟล์เดียวกัน - คู่มือ Using with an ORM ครอบคลุมการเชื่อมกับ Django, SQLModel, Drizzle, Kysely, sqlx, GORM, ActiveRecord, Ecto รวมถึงแพตเทิร์น wrapper
TypedQueue[T]สำหรับ SQLModel/Pydantic และข้อควรระวังเกี่ยวกับ Prisma
ประสิทธิภาพ
- ระบุว่าสามารถประมวลผลข้อความได้ระดับหลายพันรายการต่อวินาทีบนโน้ตบุ๊กสมัยใหม่
- ความหน่วงของการปลุกข้ามโปรเซสถูกจำกัดด้วย 1ms poll cadence โดยค่ามัธยฐานบน M-series อยู่ที่ราว 1~2ms
- การวัดบนฮาร์ดแวร์จริงสามารถทำได้ด้วย
bench/wake_latency_bench.pyและbench/real_bench.py
การตั้งค่าสำหรับพัฒนา
-
โครงสร้างที่เก็บโค้ด
honker-core/: Rustrlibที่ทุก binding ใช้ร่วมกัน รวมอยู่ในรีโพนี้และเผยแพร่บน crates.io ด้วยhonker-extension/:cdylibสำหรับ SQLite loadable extension รวมอยู่ในรีโพนี้และเผยแพร่บน crates.io ด้วยpackages/honker/: แพ็กเกจ Python ที่มี PyO3cdylibพร้อม Queue, Stream, Outbox และ Schedulerpackages/honker-node/: binding สำหรับ Node.js และเป็น git submodulepackages/honker-rs/: ergonomic wrapper สำหรับ Rust และเป็น git submodulepackages/honker-go/: binding สำหรับ Go และเป็น git submodulepackages/honker-ruby/: binding สำหรับ Ruby และเป็น git submodulepackages/honker-bun/: binding สำหรับ Bun และเป็น git submodulepackages/honker-ex/: binding สำหรับ Elixir และเป็น git submodulepackages/honker-cpp/: binding สำหรับ C++ และเป็น git submoduletests/: ไดเรกทอรี integration test ข้ามแพ็กเกจbench/: ไดเรกทอรีเบนช์มาร์กsite/: เว็บไซต์honker.devพัฒนาด้วย Astro และเป็น git submodule- รีโพของแต่ละ binding ถูกเผยแพร่แยกบน PyPI, npm, crates.io, Hex, RubyGems เป็นต้น ส่วนฐานร่วมอย่าง
honker-coreและhonker-extensionถูกรวมไว้โดยตรงในรีโพนี้ - ตอน clone ต้องใช้
git clone --recursiveหรือgit submodule update --init --recursive
การทดสอบและ coverage
make testจะรันการทดสอบของ Rust, Python และ Node ตามค่าเริ่มต้น และใช้เวลาราว 10 วินาทีในเส้นทางแบบเร็วmake test-python-slowรวมการทดสอบ soak และ real-time cron โดยใช้เวลาราว 2 นาทีmake test-allจะรันการทดสอบทั้งหมดรวมถึง slow markmake buildจะทำทั้ง PyO3maturin developและการ build loadable extension- สามารถรันเบนช์มาร์กได้ด้วย
python bench/wake_latency_bench.py --samples 500,python bench/real_bench.py --workers 4 --enqueuers 2 --seconds 15,python bench/ext_bench.py - ใช้
make install-coverage-depsเพื่อติดตั้งเครื่องมือ coverage โดยจะติดตั้งcoverage.pyและcargo-llvm-cov make coverageจะสร้าง HTML report สองชุดไว้ในcoverage/ส่วนmake coverage-pythonจะสร้างรายงานสำหรับฝั่ง Python และmake coverage-rustจะสร้างรายงานตาม Rust unit test ของhonker-core- ระบุว่า coverage ของ Python สำหรับ
packages/honker/อยู่ที่ประมาณ 92% - coverage ของ Rust สะท้อนเฉพาะ
cargo testและหลายเส้นทางในhonker_ops.rsถูกรันผ่าน Python test suite เท่านั้น จึงไม่ถูกรวมในรายงาน Rust - การรวม cross-language coverage ผ่านการ merge ข้อมูล LLVM profile ที่ข้ามขอบเขต PyO3 ทำได้ยาก และยังถูกเลื่อนไว้ก่อน
ใบอนุญาต
- ใช้ใบอนุญาต Apache 2.0
- ดูรายละเอียดได้ที่ LICENSE
1 ความคิดเห็น
ความคิดเห็นจาก Hacker News
ผมเป็นคนทำสิ่งนี้เอง Honker เพิ่ม cross-process NOTIFY/LISTEN ให้ SQLite ทำให้ส่งอีเวนต์แบบ push ด้วย latency ระดับเลขหลักเดียวของ ms ได้ โดยใช้แค่ไฟล์ SQLite เดิม ไม่ต้องมี daemon หรือ broker
SQLite ไม่มีเซิร์ฟเวอร์แบบ Postgres ดังนั้นแทนที่จะคิวรีเป็นช่วง ๆ หัวใจสำคัญคือย้ายแหล่ง polling ไปเป็น
stat(2)แบบเบากับไฟล์ WAL SQLite มีประสิทธิภาพอยู่แล้วแม้จะยิงคิวรีเล็ก ๆ จำนวนมาก (https://www.sqlite.org/np1queryprob.html) จึงอาจไม่ถึงกับเป็นการอัปเกรดครั้งใหญ่ แต่ก็น่าสนใจตรงที่แค่เฝ้าดู WAL และเรียกฟังก์ชันของ SQLite ก็พอ ทำให้ไม่ผูกกับภาษาใดภาษาหนึ่งนอกจากนี้ยังมี ephemeral pub/sub, durable work queue ที่มี retry และ dead-letter, และ event stream ที่มี offset แยกต่อ consumer ทั้งสามอย่างนี้เป็น row ในไฟล์
.dbของแอปเดิม จึง commit แบบอะตอมมิก ไปพร้อมกับการเขียน business logic ได้ และถ้า rollback ทั้งคู่ก็หายไปพร้อมกันเดิมมันชื่อ litenotify/joblite แต่พอซื้อ
honker.devแบบขำ ๆ ไว้แล้วก็มานึกได้ว่าชื่ออย่าง Oban, pg-boss, Huey, RabbitMQ, Celery, Sidekiq ต่างก็ฟังตลก ๆ กันทั้งนั้น เลยใช้ชื่อนี้ไปเลย หวังว่าจะมีประโยชน์หรืออย่างน้อยก็ทำให้ขำได้ และคำเตือนว่าเป็นซอฟต์แวร์ระยะอัลฟายังคงใช้เหมือนเดิมในฝั่งอย่าง Java/Go/Clojure/C# นั้น SQLite ก็เป็น single writer อยู่แล้ว ดังนั้นให้แอปพลิเคชันเป็นคนจัดการ writer นั้นเอง แล้วใช้ concurrent queue ระดับภาษาเพื่อรู้ว่ามีการเขียนอะไรเกิดขึ้นและปลุกเฉพาะ thread ที่เกี่ยวข้อง น่าจะง่ายและสะอาดกว่ามาก
ถึงอย่างนั้น การใช้ WAL แบบสร้างสรรค์เช่นนี้ก็น่าสนุก และสำหรับภาษาอย่าง Python/JS/TS/Ruby ที่ concurrency แบบอิง process พบได้บ่อย ก็ดูเข้ากับกลไก notify แบบนี้ไม่น้อย
stat()ทุก 1ms ก็ยังถูกกว่าที่คิดมากบนฮาร์ดแวร์ของผม ใช้เวลาไม่ถึง 1μs ต่อครั้ง ดังนั้น polling ระดับนี้ใช้ CPU ไม่ถึง 0.1%
PRAGMA data_versionน่าจะดีกว่าstat(2)ไหมhttps://sqlite.org/pragma.html#pragma_data_version
ถ้าเป็น C API ก็ยังมี
SQLITE_FCNTL_DATA_VERSIONที่ตรงกว่าอีกhttps://sqlite.org/c3ref/c_fcntl_begin_atomic_write.html#sqlitefcntldataversion
สงสัยว่าสามารถใช้สิ่งนี้เป็น persistent message stream แบบ Kafka ขนาดเบา ได้ไหม เช่นสำหรับ topic หนึ่ง ๆ จะ replay ข้อความทั้งหมดทั้งย้อนหลังและแบบเรียลไทม์ตั้งแต่ timestamp ที่กำหนดได้หรือไม่
คงพอเลียนแบบด้วย polling แบบ pub/sub ได้ แต่ก็คงไม่ใช่วิธีที่เหมาะที่สุดอย่างที่บอก
ถ้าเก็บตำแหน่งการอ่าน ชื่อ queue และ filter เอาไว้ แทนที่จะปลุกทุก subscription thread ทุกครั้งที่
stat(2)เปลี่ยนแล้วให้แต่ละตัวทำ SELECT แบบ N=1 เอง polling thread อาจทำEvents INNER JOIN Subscribersแล้วปลุกเฉพาะ subscriber ที่ match จริง ๆ ได้ขอบคุณสำหรับ feedback ผมเปิด PR ที่รวมข้อเสนอเหล่านั้นแล้ว
https://github.com/russellromney/honker/pulls/1
ตอนนี้เปลี่ยนเป็น โครงสร้าง polling 3 ชั้น:
PRAGMA data_versionทุก 1ms,statทุก 100ms, และการ reconnect เมื่อเกิดข้อผิดพลาดPRAGMA data_versionทุก 1ms แทนการตรวจจับการเปลี่ยนขนาด/mtime แบบstatเดิม มันเป็น commit counter ของ SQLite เองจึงเป็น monotonic ไม่ได้รับผลจาก clock skew และจัดการ WAL truncation หรือ rollback ได้ถูกต้องด้วย เป็น nonblocking query ที่ใช้เวลาประมาณ 3µs และเปลี่ยนเพราะเรื่อง ความถูกต้อง ไม่ใช่เพราะประสิทธิภาพ จริง ๆ แล้วช้ากว่านิดหน่อยด้วยซ้ำ ความเสี่ยงเรื่อง truncation ก็สมจริงกว่าที่คิดจากที่ทดสอบ C API อย่าง
SQLITE_FCNTL_DATA_VERSIONใช้งานข้าม connection ไม่ได้ ตอนนี้จึงยังต้องยอมรับต้นทุนจากการผ่าน VFS layer อยู่ และถือว่าเป็น tradeoff ที่เลือกอย่างชัดเจนdata_versionล้มเหลว ก็จะลอง reconnect โดยสมมติกรณีอย่างดิสก์ขัดข้องชั่วคราว, NFS สะดุด, หรือ connection เสีย และเพื่อความปลอดภัยก็จะปลุก subscriber ด้วยstatเปรียบเทียบ(dev, ino)กับค่าตอน startup เพื่อจับ การสลับไฟล์ เช่น atomic rename, litestream restore, หรือ volume remount เพราะdata_versionจะตาม fd ที่เปิดอยู่ ทำให้ต่อให้ไฟล์เปลี่ยนไปแล้วก็ยังมอง inode เดิมอยู่และจับกรณีนี้ไม่ได้ทำให้ Honker ดีขึ้นมาก และผมเองก็ได้เรียนรู้เยอะ
ขอโปรโมตนิดหนึ่ง ใน PostgreSQL 19 ที่กำลังจะมานั้น LISTEN/NOTIFY ได้รับการปรับแต่งให้สเกลดีขึ้นมากสำหรับ selective signaling
เป็นแพตช์ที่มุ่งไปที่กรณีที่ backend จำนวนมากกำลัง listen คนละ channel กัน
https://git.postgresql.org/gitweb/?p=postgresql.git;a=commitdiff;h=282b1cde9
สงสัยว่าถ้าจะเฝ้าดูการเปลี่ยน WAL ด้วย inotify หรือ wrapper ข้ามแพลตฟอร์มโดยไม่ใช้ polling จะได้ไหม
ส่วน
statนั้นใช้งานได้ทุกที่เฉย ๆสิ่งที่น่าสนใจกว่า IPC แยกต่างหากคือมัน commit แบบอะตอมมิกกับข้อมูลธุรกิจ ได้
การส่งข้อความภายนอกมักมีปัญหาแบบ "แจ้งเตือนไปแล้วแต่ทรานแซกชัน rollback" อยู่เสมอ และเรื่องแบบนี้จะรกเร็วมาก
สิ่งที่ผมสงสัยอย่างหนึ่งคือ WAL checkpoint ตอนที่ SQLite truncate WAL กลับไปเป็น 0 การ polling ด้วย
stat()จัดการกรณีนั้นได้ถูกต้องหรือไม่ รู้สึกเหมือนอาจมีช่วงที่พลาดอีเวนต์ได้เมื่อก่อนผมเคยเจอปัญหากับ Postgres+SQS ที่ trigger ส่ง enqueue ออกไปก่อนที่อีก connection จะมองเห็น commit เลยต้องมานั่งเพิ่ม retry logic, เพิ่ม polling ฝั่ง worker แล้วสุดท้ายก็ย้าย enqueue เข้าไปอยู่ในทรานแซกชัน ซึ่งพอทำแบบนั้นก็เท่ากับกลับมาสร้างสิ่งที่ Honker ทำอยู่ใหม่ด้วยชิ้นส่วนที่มากกว่าเดิม
บั๊กประเภท "ส่ง notification ไปแล้วแต่ row ยังไม่ commit" มักจะเงียบและขึ้นกับจังหวะเวลา ทำให้ตามหายากมากจริง ๆ
แต่ตอนนี้ยังไม่มีเทสต์สำหรับส่วนนี้ จึงต้องตรวจสอบเพิ่มอีกหน่อย เป็นประเด็นที่ดี เดี๋ยวจะตามดู
ขอบคุณ
ตอนนี้มีแอปเล็ก ๆ ที่ใช้ SQLite มากขึ้นเรื่อย ๆ และส่วนใหญ่ก็ต้องการ queue กับ scheduler
ผมเองเคยหมุนของหลายอย่างดู แต่ก็มักรู้สึกเสียดายความสง่างามของ โซลูชันสาย Postgres อยู่เสมอ
อันนี้ตั้งใจว่าจะลองใช้ทันที
ถ้าเจอปัญหาก็ฝากเปิด PR หรือ issue ใน repo ได้เลย
ตรงนี้ทำให้อยากใช้ kqueue/FSEvents แต่เท่าที่เข้าใจ Darwin จะทิ้งการแจ้งเตือนจาก process เดียวกัน
ถ้า publisher กับ listener อยู่ใน process เดียวกัน บางครั้ง listener จะไม่ตื่นเลย ทำให้ตามหาปัญหาค่อนข้างเละ
statpolling แม้จะดูไม่สวย แต่สุดท้ายดูเหมือนเป็นวิธีที่ใช้งานได้จริงทุกที่ผมก็สงสัยเหมือนกันว่าตอน WAL checkpoint แล้วไฟล์เล็กลงอีกครั้ง จะมี wakeup เกิดขึ้นหรือเปล่า หรือ poller กรองการลดลงของขนาดไฟล์ทิ้ง
VNODE event ของ kqueue จะถูกส่งถ้า process นั้นมีสิทธิ์เข้าถึงไฟล์ และไม่มี filter อะไรที่ตัดทิ้งเพราะเป็น process เดียวกัน
ผมจะลองเช็กดูแล้วกลับมาบอกอีกที
เจ๋งมาก อยากรู้ว่าตอนรับโหลดหนัก ๆ คอขวดหลักคือ throughput การเขียนของ SQLite หรือว่าเป็น ชั้น WAL notification กันแน่
และก็ขึ้นอยู่กับ journal mode กับ synchronous mode มากด้วย
ส่วน notification ไม่ว่าจะเป็นวิธี
stat(2)เดิมหรือแบบใหม่ที่อิงPRAGMAก็มีต้นทุนต่ำมาก ในคอมเมนต์อื่นก็มีการบอกว่าstat(2)อยู่ราว ๆ 1µsเป็นโปรเจ็กต์ที่ดีมาก ผมเองก็กำลังทำของที่ดัน SQLite ไปไกลกว่าการใช้งานทั่วไปมาก
เห็นคนอื่นสำรวจว่า SQLite ไปได้ไกลแค่ไหนจริง ๆ ก็รู้สึกมีกำลังใจ
สงสัยว่าสามารถรวมเข้ากับกรณีที่ใช้ SQLAlchemy ได้หรือไม่
จากหน้าตาในตอนนี้เหมือนมันพยายามสร้าง DB connection เอง