OmniStream Docs
  • Panduan Pengguna
  • Developer
  • API Reference
Developer Hub
Pendahuluan
Autentikasi
Model Data
    Ikhtisar Model DataSchema PostgreSQLKoleksi MongoDBRedis Pub/SubModel AgentModel ContactModel ConversationModel MessageRoles & Permissions
Webhook
WebSocket
Self-Hosting
Error & Rate Limit
Model Data

Redis Pub/Sub

Redis Pub/Sub

OmniStream memakai Redis tidak sebagai cache, melainkan sebagai message bus realtime. Event yang harus sampai ke browser user dipublikasikan di channel pub/sub Redis; ws-server subscribe ke keempat channel dan me-routing-kan ke WebSocket klien yang relevan.

Ada empat channel aktif:

  1. conversation_updates
  2. typing_indicators
  3. agent_presence
  4. sla_breaches

Semua channel bersumber pada konvensi redis::cmd("PUBLISH") dari service publisher, dan ws-server men-subscribe-nya di crates/ws-server/src/subscriber.rs:16-44:

Code
Subscribed to Redis channels 'conversation_updates', 'typing_indicators', 'agent_presence', and 'sla_breaches'

Channel conversation_updates

Publisher: chat-engine (setiap channel), api-gateway (untuk event user-action seperti assign/transfer/resolve).

Subscriber: ws-server.

Payload schema: ConversationUpdateEvent di crates/omni-common/src/models/events.rs:64:

Code
pub struct ConversationUpdateEvent { pub event_type: ConversationEventType, pub conversation_id: Uuid, pub contact_id: Uuid, pub assigned_agent_id: Option<Uuid>, pub message: Option<MessagePreview>, pub timestamp: DateTime<Utc>, } pub enum ConversationEventType { NewMessage, StatusChanged, AgentAssigned, ConversationCreated, Transferred, }

MessagePreview sebagai opsional sub-struct membawa cukup data untuk preview di Inbox tanpa me-load pesan penuh:

Code
pub struct MessagePreview { pub id: String, pub direction: String, pub msg_type: String, pub content_preview: String, pub status: Option<String>, // untuk status_changed pub external_id: Option<String>, // mis. wamid pub created_at: DateTime<Utc>, }

Routing di ws-server: Event NewMessage dan StatusChanged dikirim ke agent yang ditugaskan (assigned_agent_id) plus semua supervisor/admin yang sedang online. Event AgentAssigned dan Transferred dikirim ke agent lama dan baru sekaligus supaya kedua sisi UI refresh.

Kenapa ada juga di webhook dispatcher: api-gateway lewat webhook_dispatcher juga subscribe ke channel yang sama untuk memicu outgoing webhook ke endpoint customer — lihat developer/webhook/outgoing.

Channel typing_indicators

Publisher: ws-server (diterima dari client message WebSocket, lihat developer/websocket/client-messages).

Subscriber: ws-server (broadcast ke semua koneksi).

Payload schema: JSON sederhana, tidak ada struct formal — event diteruskan apa adanya oleh broadcast handler (crates/ws-server/src/subscriber.rs:98-106):

Code
{ "event_type": "typing_start", "conversation_id": "550e8400-e29b-41d4-a716-446655440000", "agent_id": "...", "timestamp": "2026-04-11T10:30:00Z" }

Frontend mem-filter berdasar conversation_id dan menyembunyikan event dari sender itu sendiri (supaya user tidak melihat indikator ketik milik dirinya sendiri).

Typing indicator adalah satu-satunya channel yang di-broadcast ke semua koneksi WebSocket. Volume-nya rendah (setiap agent hanya me-ngetik saat sedang aktif) jadi overhead broadcast-nya marginal. Kalau di masa depan jumlah agent simultan naik signifikan, ini jadi kandidat pertama untuk re-design ke channel per-conversation.

Channel agent_presence

Publisher: ws-server (saat WebSocket connection open/close).

Subscriber: ws-server (broadcast ke semua agent).

Payload schema: AgentPresenceEvent di crates/omni-common/src/models/events.rs:105:

Code
pub struct AgentPresenceEvent { pub event_type: String, // "agent_online" | "agent_offline" pub agent_id: Uuid, pub agent_name: String, pub is_online: bool, pub timestamp: DateTime<Utc>, }

Frontend memakai event ini untuk menampilkan dot hijau/abu-abu di daftar agent di halaman User Management dan di sidebar Inbox (saat memilih siapa yang akan menerima transfer).

Kenapa publish-nya di ws-server dan bukan api-gateway? Karena kondisi "online/offline" otoritatifnya adalah apakah ada koneksi WebSocket terbuka, bukan row database. Kalau dua tab browser terbuka dan satu ditutup, ws-server masih menganggap agent online sampai tab terakhir tertutup.

Channel sla_breaches

Publisher: api-gateway/src/scheduler.rs::publish_breach_event (scheduler.rs:422-444).

Subscriber: ws-server — kirim ke assigned agent + semua supervisor/admin (subscriber.rs:107 dst.).

Payload schema: SlaBreachEvent di crates/omni-common/src/models/sla.rs (struct internal, tidak di-expose di events.rs):

Code
{ "event_type": "sla_breach", "conversation_id": "...", "policy_id": "...", "policy_name": "WhatsApp Priority Support", "breach_type": "first_response", "breached_at": "2026-04-11T10:35:00Z", "deadline_at": "2026-04-11T10:30:00Z", "assigned_agent_id": "...", "contact_name": "..." }

Kapan dipublish: Scheduler polling di api-gateway berjalan setiap 30 detik. Untuk setiap sla_policies aktif, ia me-load semua open conversation yang match (prioritas match: channel+division(4) > division(2) > channel(2) > default(1), logic di scheduler.rs::find_policy), lalu mengecek apakah deadline first_response atau resolution sudah lewat tanpa entri di sla_breach_logs. Breach baru → insert ke sla_breach_logs + publish.

Detail halaman admin: panduan/admin/kebijakan-sla. Detail implementasi scheduler: crates/api-gateway/src/scheduler.rs.

Cara menghubungkan ke browser

Browser tidak bicara dengan Redis secara langsung. Alur lengkap:

Code
chat-engine / api-gateway scheduler ↓ PUBLISH Redis channel ↓ SUBSCRIBE ws-server (:3002) ↓ filter + route WebSocket ke browser agent tertentu ↓ onmessage Svelte store → reactive UI

Detail koneksi WebSocket dari sisi client ada di developer/websocket/koneksi dan bentuk event yang diterima browser ada di developer/websocket/events.

Konfigurasi dan debugging

Variabel lingkungan:

  • REDIS_URL — wajib (mis. redis://redis:6379). Dipakai oleh publisher maupun subscriber. Tidak ada default.

Debug cepat dari VPS:

TerminalCode
# Lihat publish rate per channel (butuh container redis running) docker compose exec redis redis-cli MONITOR | grep -E "PUBLISH (conversation_updates|typing_indicators|agent_presence|sla_breaches)" # Subscribe manual untuk menangkap payload asli docker compose exec redis redis-cli SUBSCRIBE conversation_updates # Cek jumlah subscriber aktif per channel docker compose exec redis redis-cli PUBSUB NUMSUB conversation_updates typing_indicators agent_presence sla_breaches

Kalau NUMSUB menunjukkan 0 untuk salah satu channel, artinya ws-server tidak tersambung — cek log ws-server untuk pesan "Starting Redis subscriber for channels ..." yang seharusnya muncul saat boot.

Referensi

  • Skema event: crates/omni-common/src/models/events.rs
  • Publisher: crates/chat-engine/, crates/api-gateway/src/scheduler.rs, crates/api-gateway/src/webhook_dispatcher.rs
  • Subscriber: crates/ws-server/src/subscriber.rs:16-124
  • WebSocket routing: crates/ws-server/src/handler.rs
Last modified on June 8, 2026
Koleksi MongoDBModel Agent
On this page
  • Channel conversation_updates
  • Channel typing_indicators
  • Channel agent_presence
  • Channel sla_breaches
  • Cara menghubungkan ke browser
  • Konfigurasi dan debugging
  • Referensi
Rust
Rust
JSON
Rust
JSON