Arsitektur Layanan
Arsitektur Layanan
OmniStream adalah sistem event-driven yang memproses pesan omnichannel melalui pipeline: webhook masuk → Kafka → processor → database + Redis pub/sub → WebSocket → browser; dan arah balik untuk pesan keluar.
Halaman ini menggambarkan topologi layanan, port, dependensi infrastruktur, dan bagaimana event mengalir dari satu ujung ke ujung lainnya.
Diagram topologi
Code
Catatan topologi (runtime aktual, bukan per-saluran):
- Webhook-ingestor adalah satu binary yang menyajikan empat route
/webhook/{whatsapp, messenger, instagram, email}+/healthdari satu port yang ditentukan oleh envWEBHOOK_INGESTOR_PORT. Tidak ada binary terpisahwebhook-ingestor-whatsapp,webhook-ingestor-instagram, dll. — itu nama historis. - Chat-engine adalah satu binary yang berlangganan satu topik Kafka
channel.inbound.rawdan men-dispatch ke parser per saluran (crates/chat-engine/src/parser/{whatsapp,messenger,instagram,email}.rs) berdasarkan fieldchannelpadaInboundRawEvent. - Message-sender adalah satu binary yang berlangganan
channel.outbound.jobdan men-dispatch ke modul pengirim per saluran (crates/message-sender/src/{whatsapp,messenger,instagram,email}.rs).
Layanan dan port (dev)
| Layanan | Port | Protokol | Keterangan |
|---|---|---|---|
api-gateway | 3000 | HTTP | REST API + JWT + RBAC + scheduler + webhook dispatcher |
webhook-ingestor | WEBHOOK_INGESTOR_PORT (3001 di compose dev) | HTTP | Satu binary, empat route /webhook/{whatsapp,messenger,instagram,email} + /health, verifikasi HMAC, produksi ke channel.inbound.raw |
ws-server | 3002 | WebSocket | Real-time event delivery ke browser |
frontend (SvelteKit) | 4000 | HTTP | UI browser |
| Mailpit SMTP | 1025 | SMTP | Relay development |
| Mailpit Web UI | 8025 | HTTP | Inspeksi email dev |
chat-engine dan message-sender adalah konsumer Kafka tanpa port HTTP — mereka tidak mengekspos endpoint dan tidak perlu port bound. Mereka bertahan selama proses Kafka subscribe mereka aktif.
Infrastruktur
Infrastruktur dependensi berjalan sebagai container via docker-compose.yml:
| Komponen | Kegunaan |
|---|---|
| PostgreSQL | Data relasional — ~25 tabel inti: agents, contacts, conversations, messages (metadata), integrations, campaigns, sla_policies, dll. |
| MongoDB | messages collection (payload lengkap), webhook_audit (raw payload untuk replay). UUID disimpan sebagai BinData(0). |
| Redis | 4 channel pub/sub untuk routing real-time antar layanan |
| Redpanda | Kafka-compatible; tidak pakai Apache Kafka karena ukuran image & operabilitas |
| MinIO | S3-compatible object storage untuk file upload |
| Mailpit | Relay SMTP + web UI untuk uji coba email di dev |
Topik Kafka
OmniStream memakai 3 topik aktif — satu inbound, satu outbound, satu event pemrosesan:
| Topik | Kegunaan |
|---|---|
channel.inbound.raw | Satu topik penerima untuk semua saluran. Payload mentah dibungkus InboundRawEvent { event_id, timestamp, channel: String, integration_account_id, payload }. Field channel (whatsapp | messenger | instagram | email) dipakai chat-engine untuk dispatch ke parser yang sesuai. |
channel.outbound.job | Satu topik job untuk semua saluran. message-sender berlangganan topik ini dan merutekan job ke modul {whatsapp,messenger,instagram,email}.rs berdasarkan isi OutboundMessageJob.channel. |
chat.message.processed | Event pesan yang sudah diproses oleh chat-engine (dipublikasi pasca-parse untuk konsumen hilir seperti analitik). |
Catatan partisi: jumlah partisi diatur di konfigurasi Redpanda dan tidak dipatok per saluran — karena ada satu topik bersama, semua saluran berbagi pool partisi yang sama. Atur KAFKA_INBOUND_PARTITIONS / KAFKA_OUTBOUND_PARTITIONS sesuai beban produksi.
:::warning Drift CLAUDE.md → realitas kode
CLAUDE.md masih mendaftarkan tiga topik inbound per-saluran (channel.inbound.{whatsapp,instagram,email}) dan tiga topik outbound per-saluran. Itu sudah tidak lagi benar — runtime memakai satu topik inbound channel.inbound.raw dan satu topik outbound channel.outbound.job seperti didokumentasikan di atas. Bukti: crates/webhook-ingestor/src/routes.rs:182,294,406,512 (4 handler memproduksi ke topik yang sama) dan crates/message-sender/src/main.rs:164 (subscribe topik tunggal). Drift ini dicatat di SCOPE.md sebagai D6.
:::
Channel Redis pub/sub
ws-server berlangganan 4 channel Redis. chat-engine, api-gateway, dan scheduler mempublikasikan ke mereka:
| Channel | Publisher | Subscriber | Muatan |
|---|---|---|---|
conversation_updates | chat-engine, api-gateway | ws-server, webhook_dispatcher | Event percakapan (pesan baru, status, transfer) |
typing_indicators | ws-server (dari client) | ws-server (broadcast) | Indikator mengetik, dikirim via event client WebSocket |
agent_presence | ws-server | ws-server | Agent online/offline, dipublikasikan saat first/last connection |
sla_breaches | scheduler | ws-server | Event pelanggaran SLA, dipantau supervisor |
Database strategy
PostgreSQL menyimpan semua data relasional terstruktur: agents, contacts, conversations, conversation_notes, conversation_transfers, quick_replies, agent_favorite_replies, wa_templates, campaigns, campaign_recipients, integrations, activity_logs, divisions, agent_divisions, scheduled_messages, csat_surveys, sla_policies, sla_breach_logs, outgoing_webhooks, webhook_deliveries. 41 file migrasi di migrations/ auto-apply saat startup melalui omni-common (via sqlx::migrate!). Verifikasi: find migrations -name "*.sql" | wc -l. (CLAUDE.md masih menuliskan angka 22 — drift D5 di SCOPE.md.)
MongoDB menyimpan data dengan throughput tinggi atau skema longgar: koleksi messages (semua pesan chat, dengan payload lengkap) dan webhook_audit (payload webhook mentah untuk replay).
Redis murni untuk pub/sub event real-time — OmniStream tidak memakai Redis sebagai cache kunci-nilai persisten.
Tugas background di api-gateway
api-gateway adalah satu-satunya layanan yang menjalankan scheduler background:
- Scheduler (
crates/api-gateway/src/scheduler.rs) — polling setiap 30 detik untuk:- Kampanye yang jadwal kirimnya sudah tiba
- Pesan terjadwal (
scheduled_messages) - Pelanggaran SLA (kirim ke channel
sla_breaches) - Pengantaran webhook pending (
webhook_deliveries)
- Webhook dispatcher (
crates/api-gateway/src/webhook_dispatcher.rs) — berlanggananconversation_updates, memetakan event ke tipe webhook (message.received,message.sent,message.status,conversation.created,conversation.resolved,conversation.assigned), mengantrikanwebhook_deliveriesdengan payload ditandatangani HMAC + exponential backoff.
Alur request end-to-end
Alur inbound
- Meta/SendGrid mengirim
POST /webhook/<channel>ke binarywebhook-ingestor(satu proses, empat route). - Ingestor memverifikasi tanda tangan (
X-Hub-Signature-256untuk Meta atauX-Email-Webhook-Signatureuntuk email) menggunakan modulsignaturelokal (crates/webhook-ingestor/src/signature.rs). - Payload dibungkus
InboundRawEvent(denganchannel: "whatsapp" | "messenger" | "instagram" | "email") lalu diproduksi ke satu topik Kafkachannel.inbound.raw; audit mentah ditulis ke MongoDBwebhook_audit(fire-and-forget). chat-enginemengkonsumsichannel.inbound.raw, membaca fieldchannel, dan men-dispatch ke parser yang cocok dicrates/chat-engine/src/parser/{whatsapp,messenger,instagram,email}.rs. Parser mem-upsert ke PostgreSQL (contacts, conversations) dan MongoDB (messages), lalu publikasiconversation_updateske Redis.ws-servermenerima event Redis dan merutekan ke koneksi WebSocket pelanggan (supervisor/admin → broadcast; agent → hanya percakapan yang assigned).- Secara paralel,
webhook_dispatcherdiapi-gatewaymemetakan event menjadimessage.receiveddan mengantrikan kewebhook_deliveriesuntuk dikirim ke webhook pihak ketiga.
Alur outbound
- Browser memanggil
POST /api/messagesviaapi-gateway. api-gatewaymemvalidasi JWT, RBAC, dan memproduksi job ke satu topik Kafkachannel.outbound.job(payloadOutboundMessageJob { channel, ... }).message-sender(satu binary) berlanggananchannel.outbound.job, membaca fieldchannelpada job, dan men-dispatch ke modul{whatsapp,messenger,instagram,email}.rs. Setiap modul membaca secret terkini dariintegrations(hot-reload 30 detik), dan memanggil Meta Graph API atau mengirim via SMTP.- Respons dicatat ke MongoDB dan
conversation_updatesditerbitkan kembali supaya WebSocket dan webhook keluar ikut mendapatkan statusmessage.sent.
Auth & RBAC singkat
3 peran: admin, supervisor, agent. Middleware JWT di crates/api-gateway/src/middleware/auth.rs mengekstrak token dari header Authorization: Bearer atau cookie access_token dan memasukkan AgentClaims ke request extensions. Detail peran dan matriks izin ada di RBAC.