OmniStream Docs
  • Panduan Pengguna
  • Developer
  • API Reference
Developer Hub
Pendahuluan
    Overview DeveloperArsitektur LayananQuickstart
Autentikasi
Model Data
Webhook
WebSocket
Self-Hosting
Error & Rate Limit
Pendahuluan

Arsitektur Layanan

Arsitektur Layanan

OmniStream adalah sistem event-driven yang memproses pesan omnichannel melalui pipeline: webhook masuk → Kafka → processor → database + Redis pub/sub → WebSocket → browser; dan arah balik untuk pesan keluar.

Halaman ini menggambarkan topologi layanan, port, dependensi infrastruktur, dan bagaimana event mengalir dari satu ujung ke ujung lainnya.

Diagram topologi

Code
INBOUND WhatsApp webhook ─┐ Messenger webhook ─┤ Instagram webhook ─┤→ webhook-ingestor (:WEBHOOK_INGESTOR_PORT) ──┐ Email webhook ─┘ │ ↓ Kafka: channel.inbound.raw (InboundRawEvent.channel = ...) ↓ chat-engine (single binary, routes by channel) ↓ PG + Mongo + Redis pub/sub ↓ ws-server (:3002) ← Redis subscriber ↓ Browser (WebSocket) OUTBOUND Browser → frontend (:4000) → api-gateway (:3000) → Kafka channel.outbound.job ↓ message-sender (single binary, routes by channel) ↓ Meta Graph API / SMTP

Catatan topologi (runtime aktual, bukan per-saluran):

  • Webhook-ingestor adalah satu binary yang menyajikan empat route /webhook/{whatsapp, messenger, instagram, email} + /health dari satu port yang ditentukan oleh env WEBHOOK_INGESTOR_PORT. Tidak ada binary terpisah webhook-ingestor-whatsapp, webhook-ingestor-instagram, dll. — itu nama historis.
  • Chat-engine adalah satu binary yang berlangganan satu topik Kafka channel.inbound.raw dan men-dispatch ke parser per saluran (crates/chat-engine/src/parser/{whatsapp,messenger,instagram,email}.rs) berdasarkan field channel pada InboundRawEvent.
  • Message-sender adalah satu binary yang berlangganan channel.outbound.job dan men-dispatch ke modul pengirim per saluran (crates/message-sender/src/{whatsapp,messenger,instagram,email}.rs).

Layanan dan port (dev)

LayananPortProtokolKeterangan
api-gateway3000HTTPREST API + JWT + RBAC + scheduler + webhook dispatcher
webhook-ingestorWEBHOOK_INGESTOR_PORT (3001 di compose dev)HTTPSatu binary, empat route /webhook/{whatsapp,messenger,instagram,email} + /health, verifikasi HMAC, produksi ke channel.inbound.raw
ws-server3002WebSocketReal-time event delivery ke browser
frontend (SvelteKit)4000HTTPUI browser
Mailpit SMTP1025SMTPRelay development
Mailpit Web UI8025HTTPInspeksi email dev

chat-engine dan message-sender adalah konsumer Kafka tanpa port HTTP — mereka tidak mengekspos endpoint dan tidak perlu port bound. Mereka bertahan selama proses Kafka subscribe mereka aktif.

Infrastruktur

Infrastruktur dependensi berjalan sebagai container via docker-compose.yml:

KomponenKegunaan
PostgreSQLData relasional — ~25 tabel inti: agents, contacts, conversations, messages (metadata), integrations, campaigns, sla_policies, dll.
MongoDBmessages collection (payload lengkap), webhook_audit (raw payload untuk replay). UUID disimpan sebagai BinData(0).
Redis4 channel pub/sub untuk routing real-time antar layanan
RedpandaKafka-compatible; tidak pakai Apache Kafka karena ukuran image & operabilitas
MinIOS3-compatible object storage untuk file upload
MailpitRelay SMTP + web UI untuk uji coba email di dev

Topik Kafka

OmniStream memakai 3 topik aktif — satu inbound, satu outbound, satu event pemrosesan:

TopikKegunaan
channel.inbound.rawSatu topik penerima untuk semua saluran. Payload mentah dibungkus InboundRawEvent { event_id, timestamp, channel: String, integration_account_id, payload }. Field channel (whatsapp | messenger | instagram | email) dipakai chat-engine untuk dispatch ke parser yang sesuai.
channel.outbound.jobSatu topik job untuk semua saluran. message-sender berlangganan topik ini dan merutekan job ke modul {whatsapp,messenger,instagram,email}.rs berdasarkan isi OutboundMessageJob.channel.
chat.message.processedEvent pesan yang sudah diproses oleh chat-engine (dipublikasi pasca-parse untuk konsumen hilir seperti analitik).

Catatan partisi: jumlah partisi diatur di konfigurasi Redpanda dan tidak dipatok per saluran — karena ada satu topik bersama, semua saluran berbagi pool partisi yang sama. Atur KAFKA_INBOUND_PARTITIONS / KAFKA_OUTBOUND_PARTITIONS sesuai beban produksi.

:::warning Drift CLAUDE.md → realitas kode CLAUDE.md masih mendaftarkan tiga topik inbound per-saluran (channel.inbound.{whatsapp,instagram,email}) dan tiga topik outbound per-saluran. Itu sudah tidak lagi benar — runtime memakai satu topik inbound channel.inbound.raw dan satu topik outbound channel.outbound.job seperti didokumentasikan di atas. Bukti: crates/webhook-ingestor/src/routes.rs:182,294,406,512 (4 handler memproduksi ke topik yang sama) dan crates/message-sender/src/main.rs:164 (subscribe topik tunggal). Drift ini dicatat di SCOPE.md sebagai D6. :::

Channel Redis pub/sub

ws-server berlangganan 4 channel Redis. chat-engine, api-gateway, dan scheduler mempublikasikan ke mereka:

ChannelPublisherSubscriberMuatan
conversation_updateschat-engine, api-gatewayws-server, webhook_dispatcherEvent percakapan (pesan baru, status, transfer)
typing_indicatorsws-server (dari client)ws-server (broadcast)Indikator mengetik, dikirim via event client WebSocket
agent_presencews-serverws-serverAgent online/offline, dipublikasikan saat first/last connection
sla_breachesschedulerws-serverEvent pelanggaran SLA, dipantau supervisor

Database strategy

PostgreSQL menyimpan semua data relasional terstruktur: agents, contacts, conversations, conversation_notes, conversation_transfers, quick_replies, agent_favorite_replies, wa_templates, campaigns, campaign_recipients, integrations, activity_logs, divisions, agent_divisions, scheduled_messages, csat_surveys, sla_policies, sla_breach_logs, outgoing_webhooks, webhook_deliveries. 41 file migrasi di migrations/ auto-apply saat startup melalui omni-common (via sqlx::migrate!). Verifikasi: find migrations -name "*.sql" | wc -l. (CLAUDE.md masih menuliskan angka 22 — drift D5 di SCOPE.md.)

MongoDB menyimpan data dengan throughput tinggi atau skema longgar: koleksi messages (semua pesan chat, dengan payload lengkap) dan webhook_audit (payload webhook mentah untuk replay).

Redis murni untuk pub/sub event real-time — OmniStream tidak memakai Redis sebagai cache kunci-nilai persisten.

Tugas background di api-gateway

api-gateway adalah satu-satunya layanan yang menjalankan scheduler background:

  • Scheduler (crates/api-gateway/src/scheduler.rs) — polling setiap 30 detik untuk:
    • Kampanye yang jadwal kirimnya sudah tiba
    • Pesan terjadwal (scheduled_messages)
    • Pelanggaran SLA (kirim ke channel sla_breaches)
    • Pengantaran webhook pending (webhook_deliveries)
  • Webhook dispatcher (crates/api-gateway/src/webhook_dispatcher.rs) — berlangganan conversation_updates, memetakan event ke tipe webhook (message.received, message.sent, message.status, conversation.created, conversation.resolved, conversation.assigned), mengantrikan webhook_deliveries dengan payload ditandatangani HMAC + exponential backoff.

Alur request end-to-end

Alur inbound

  1. Meta/SendGrid mengirim POST /webhook/<channel> ke binary webhook-ingestor (satu proses, empat route).
  2. Ingestor memverifikasi tanda tangan (X-Hub-Signature-256 untuk Meta atau X-Email-Webhook-Signature untuk email) menggunakan modul signature lokal (crates/webhook-ingestor/src/signature.rs).
  3. Payload dibungkus InboundRawEvent (dengan channel: "whatsapp" | "messenger" | "instagram" | "email") lalu diproduksi ke satu topik Kafka channel.inbound.raw; audit mentah ditulis ke MongoDB webhook_audit (fire-and-forget).
  4. chat-engine mengkonsumsi channel.inbound.raw, membaca field channel, dan men-dispatch ke parser yang cocok di crates/chat-engine/src/parser/{whatsapp,messenger,instagram,email}.rs. Parser mem-upsert ke PostgreSQL (contacts, conversations) dan MongoDB (messages), lalu publikasi conversation_updates ke Redis.
  5. ws-server menerima event Redis dan merutekan ke koneksi WebSocket pelanggan (supervisor/admin → broadcast; agent → hanya percakapan yang assigned).
  6. Secara paralel, webhook_dispatcher di api-gateway memetakan event menjadi message.received dan mengantrikan ke webhook_deliveries untuk dikirim ke webhook pihak ketiga.

Alur outbound

  1. Browser memanggil POST /api/messages via api-gateway.
  2. api-gateway memvalidasi JWT, RBAC, dan memproduksi job ke satu topik Kafka channel.outbound.job (payload OutboundMessageJob { channel, ... }).
  3. message-sender (satu binary) berlangganan channel.outbound.job, membaca field channel pada job, dan men-dispatch ke modul {whatsapp,messenger,instagram,email}.rs. Setiap modul membaca secret terkini dari integrations (hot-reload 30 detik), dan memanggil Meta Graph API atau mengirim via SMTP.
  4. Respons dicatat ke MongoDB dan conversation_updates diterbitkan kembali supaya WebSocket dan webhook keluar ikut mendapatkan status message.sent.

Auth & RBAC singkat

3 peran: admin, supervisor, agent. Middleware JWT di crates/api-gateway/src/middleware/auth.rs mengekstrak token dari header Authorization: Bearer atau cookie access_token dan memasukkan AgentClaims ke request extensions. Detail peran dan matriks izin ada di RBAC.

Last modified on June 8, 2026
Overview DeveloperQuickstart
On this page
  • Diagram topologi
  • Layanan dan port (dev)
  • Infrastruktur
  • Topik Kafka
  • Channel Redis pub/sub
  • Database strategy
  • Tugas background di api-gateway
  • Alur request end-to-end
    • Alur inbound
    • Alur outbound
  • Auth & RBAC singkat