Change Streams

Real-time notifications for document changes


Overview

Change Streams allow you to watch collections for real-time changes. When documents are inserted, updated, or deleted, your application receives immediate notifications. This enables reactive applications, real-time synchronization, and event-driven architectures.

Features

Getting Started

Watch a collection for real-time changes:

use jasonisnthappy::{Database, ChangeOperation};

/// Subscribes to every change on the `users` collection, prints each event
/// from a background thread, and performs one insert to trigger an event.
///
/// Returns any error from opening the database, subscribing, or committing.
fn main() -> jasonisnthappy::Result<()> {
    let db = Database::open("my.db")?;
    let collection = db.collection("users");

    // Watch all changes in the collection
    let (handle, receiver) = collection.watch().subscribe()?;

    // Spawn a thread to listen for changes. Keep the JoinHandle so `main`
    // can wait for the thread instead of exiting before the event is printed.
    let watcher = std::thread::spawn(move || {
        // The loop ends as soon as `recv` returns an error.
        while let Ok(event) = receiver.recv() {
            match event.operation {
                ChangeOperation::Insert => {
                    println!("New document inserted: {:?}", event.document);
                }
                ChangeOperation::Update => {
                    println!("Document updated: {:?}", event.document);
                }
                ChangeOperation::Delete => {
                    println!("Document deleted: {}", event.doc_id);
                }
            }
        }
    });

    // Make some changes
    let mut tx = db.begin()?;
    let mut coll = tx.collection("users")?;
    coll.insert(serde_json::json!({"name": "Alice", "age": 30}))?;
    tx.commit()?;

    // Watcher automatically unsubscribes when handle is dropped
    drop(handle);

    // Wait for the watcher to drain pending events and finish.
    // NOTE(review): assumes unsubscribing closes the channel so `recv`
    // returns `Err` once the queue is empty — confirm, otherwise this blocks.
    watcher.join().expect("watcher thread panicked");

    Ok(())
}
from jasonisnthappy import Database, ChangeOperation
import threading

def watch_changes(receiver):
    """Print every change event read from ``receiver`` until ``recv()`` raises.

    Args:
        receiver: object whose ``recv()`` returns the next change event.

    The loop has no other exit: it stops only when ``recv()`` (or the event
    handling itself) raises. The reason is printed instead of being silently
    discarded, so a genuine failure is not mistaken for a normal shutdown.
    """
    try:
        while True:
            event = receiver.recv()
            if event.operation == ChangeOperation.INSERT:
                print(f"New document inserted: {event.document}")
            elif event.operation == ChangeOperation.UPDATE:
                print(f"Document updated: {event.document}")
            elif event.operation == ChangeOperation.DELETE:
                print(f"Document deleted: {event.doc_id}")
    except Exception as exc:
        # Presumably raised when the receiver is closed after unsubscribe;
        # the exact exception type is not visible here — TODO confirm.
        print(f"Watcher stopped: {exc}")

with Database.open("my.db") as db:
    collection = db.collection("users")

    # Watch all changes
    handle, receiver = collection.watch().subscribe()

    # Start background thread (daemon, so it can never block interpreter exit)
    watcher_thread = threading.Thread(target=watch_changes, args=(receiver,))
    watcher_thread.daemon = True
    watcher_thread.start()

    # Make some changes
    with db.begin() as tx:
        coll = tx.collection("users")
        coll.insert({"name": "Alice", "age": 30})

    # Cleanup
    handle.unsubscribe()

    # Give the watcher a bounded chance to print pending events; without this
    # the script can exit before the daemon thread has handled the insert.
    watcher_thread.join(timeout=1.0)
const { Database, ChangeOperation } = require('jasonisnthappy');

const db = Database.open('my.db');
const collection = db.collection('users');

// Subscribe to every change on the collection
const { handle, receiver } = collection.watch().subscribe();

// Consume events in the background; the loop only ends when recv() rejects
(async () => {
    try {
        for (;;) {
            const change = await receiver.recv();
            const op = change.operation;

            if (op === ChangeOperation.INSERT) {
                console.log('New document inserted:', change.document);
            } else if (op === ChangeOperation.UPDATE) {
                console.log('Document updated:', change.document);
            } else if (op === ChangeOperation.DELETE) {
                console.log('Document deleted:', change.docId);
            }
        }
    } catch (err) {
        // Receiver closed
    }
})();

// Insert one document inside a transaction to trigger an event
const tx = db.begin();
const coll = tx.collection('users');
coll.insert({ name: 'Alice', age: 30 });
tx.commit();

// Stop watching, then release the database
handle.unsubscribe();
db.close();
package main

import (
    "fmt"
    "github.com/sohzm/jasonisnthappy-go"
)

// main subscribes to every change on the "users" collection, prints each
// event from a goroutine, and performs one insert to trigger an event.
// Setup failures panic immediately instead of being discarded, so a failed
// Open does not surface later as a nil dereference.
func main() {
    db, err := jasonisnthappy.Open("my.db")
    if err != nil {
        panic(err)
    }
    defer db.Close()

    collection, err := db.Collection("users")
    if err != nil {
        panic(err)
    }

    // Watch all changes
    handle, receiver, err := collection.Watch().Subscribe()
    if err != nil {
        panic(err)
    }
    defer handle.Unsubscribe()

    // Listen for changes in goroutine
    go func() {
        for event := range receiver {
            switch event.Operation {
            case jasonisnthappy.ChangeInsert:
                fmt.Printf("New document inserted: %v\n", event.Document)
            case jasonisnthappy.ChangeUpdate:
                fmt.Printf("Document updated: %v\n", event.Document)
            case jasonisnthappy.ChangeDelete:
                fmt.Printf("Document deleted: %s\n", event.DocID)
            }
        }
    }()

    // Make some changes
    tx, err := db.Begin()
    if err != nil {
        panic(err)
    }
    coll, err := tx.Collection("users")
    if err != nil {
        panic(err)
    }
    // NOTE(review): the return values of Insert and Commit are not visible
    // here; check their errors too once the signatures are confirmed.
    coll.Insert(map[string]interface{}{"name": "Alice", "age": 30})
    tx.Commit()
}

Filtering Changes

You can filter changes using the query language, so you only receive notifications for documents matching specific criteria.

Filter by Field Value

Only watch changes for users older than 18:

let (handle, receiver) = collection
    .watch()
    .filter("age > 18")
    .subscribe()?;
handle, receiver = collection.watch().filter("age > 18").subscribe()
const { handle, receiver } = collection.watch().filter('age > 18').subscribe();
handle, receiver, _ := collection.Watch().Filter("age > 18").Subscribe()

Filter by Multiple Conditions

Watch for premium users in specific cities:

let (handle, receiver) = collection
    .watch()
    .filter("subscription is \"premium\" and (city is \"NYC\" or city is \"SF\")")
    .subscribe()?;
handle, receiver = collection.watch().filter(
    'subscription is "premium" and (city is "NYC" or city is "SF")'
).subscribe()
const { handle, receiver } = collection.watch().filter(
    'subscription is "premium" and (city is "NYC" or city is "SF")'
).subscribe();
handle, receiver, _ := collection.Watch().Filter(
    `subscription is "premium" and (city is "NYC" or city is "SF")`,
).Subscribe()

More Filter Examples

// Watch only active users
collection.watch().filter("status is \"active\"").subscribe()?;

// Watch high-value transactions
collection.watch().filter("amount > 1000").subscribe()?;

// Watch verified accounts
collection.watch().filter("verified is true").subscribe()?;

// Watch specific tags
collection.watch().filter("tags has \"urgent\"").subscribe()?;
# Watch only active users
collection.watch().filter('status is "active"').subscribe()

# Watch high-value transactions
collection.watch().filter('amount > 1000').subscribe()

# Watch verified accounts
collection.watch().filter('verified is true').subscribe()

# Watch specific tags
collection.watch().filter('tags has "urgent"').subscribe()
// Watch only active users
collection.watch().filter('status is "active"').subscribe();

// Watch high-value transactions
collection.watch().filter('amount > 1000').subscribe();

// Watch verified accounts
collection.watch().filter('verified is true').subscribe();

// Watch specific tags
collection.watch().filter('tags has "urgent"').subscribe();
// Watch only active users
collection.Watch().Filter(`status is "active"`).Subscribe()

// Watch high-value transactions
collection.Watch().Filter("amount > 1000").Subscribe()

// Watch verified accounts
collection.Watch().Filter("verified is true").Subscribe()

// Watch specific tags
collection.Watch().Filter(`tags has "urgent"`).Subscribe()

Important Notes on Filtering

Change Events

Event Structure

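Each change event contains:

- collection: the name of the collection the change occurred in.
- operation: the kind of change, one of Insert, Update, or Delete.
- doc_id: the ID of the affected document.
- document: the inserted or updated document; null for Delete events.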

Operation Types

Insert

Triggered when a new document is created.

{
  "collection": "users",
  "operation": "Insert",
  "doc_id": "doc123",
  "document": {
    "_id": "doc123",
    "name": "Alice",
    "age": 30
  }
}

Update

Triggered when a document is modified.

{
  "collection": "users",
  "operation": "Update",
  "doc_id": "doc123",
  "document": {
    "_id": "doc123",
    "name": "Alice",
    "age": 31
  }
}

Delete

Triggered when a document is removed.

{
  "collection": "users",
  "operation": "Delete",
  "doc_id": "doc123",
  "document": null
}

Advanced Examples

Multiple Watchers

// Watch for all changes
let (handle1, receiver1) = collection.watch().subscribe()?;

// Watch only for high-priority changes
let (handle2, receiver2) = collection
    .watch()
    .filter("priority is \"high\"")
    .subscribe()?;

// Both watchers receive events independently

Real-time Dashboard

use jasonisnthappy::{Database, ChangeOperation};
use std::sync::{Arc, Mutex};

// Shared state for dashboard: the documents of users currently online.
// NOTE(review): assumes `event.document` is `Option<serde_json::Value>` and
// `event.doc_id` is a `String` — confirm against the crate's `ChangeEvent`.
let active_users = Arc::new(Mutex::new(Vec::<serde_json::Value>::new()));
let active_users_clone = Arc::clone(&active_users);

let db = Database::open("my.db")?;
let collection = db.collection("users");

// Watch for user status changes
let (handle, receiver) = collection
    .watch()
    .filter("status is \"online\"")
    .subscribe()?;

// Update dashboard in background
std::thread::spawn(move || {
    while let Ok(event) = receiver.recv() {
        let mut users = active_users_clone.lock().unwrap();

        match event.operation {
            ChangeOperation::Insert | ChangeOperation::Update => {
                if let Some(doc) = event.document {
                    println!("User online: {:?}", doc);
                    // Replace any earlier copy so repeated updates to the
                    // same user do not pile up as duplicates.
                    users.retain(|u| {
                        u.get("_id").and_then(|id| id.as_str()) != Some(event.doc_id.as_str())
                    });
                    users.push(doc);
                }
            }
            ChangeOperation::Delete => {
                println!("User offline: {}", event.doc_id);
                // Compare by borrowed id: moving `doc_id` out inside the
                // closure (as `.into()` did) is rejected by the compiler.
                users.retain(|u| {
                    u.get("_id").and_then(|id| id.as_str()) != Some(event.doc_id.as_str())
                });
            }
        }
    }
});

Data Synchronization

// Mirror every change on the collection to an external system
let (handle, receiver) = collection.watch().subscribe()?;

std::thread::spawn(move || {
    loop {
        // Stop mirroring as soon as receiving fails
        let event = match receiver.recv() {
            Ok(received) => received,
            Err(_) => break,
        };

        match event.operation {
            // New document: push it to the remote
            ChangeOperation::Insert => {
                sync_to_remote("insert", &event.document);
            }
            // Changed document: push the update to the remote
            ChangeOperation::Update => {
                sync_to_remote("update", &event.document);
            }
            // Removed document: only its id is forwarded
            ChangeOperation::Delete => {
                sync_to_remote("delete", &event.doc_id);
            }
        }
    }
});

Event Logging

// Append one line per change event to `changes.log`.
let (handle, receiver) = collection.watch().subscribe()?;
// The file is moved into the logging thread and used nowhere else, so it
// needs no `Arc<Mutex<..>>` wrapper.
let mut log_file = File::create("changes.log")?;

std::thread::spawn(move || {
    while let Ok(event) = receiver.recv() {
        let written = writeln!(
            log_file,
            "[{}] {} {} in {}",
            chrono::Utc::now(),
            event.operation,
            event.doc_id,
            event.collection
        );
        // Logging stays best-effort, but a failed write is reported rather
        // than silently dropped.
        if let Err(err) = written {
            eprintln!("failed to write change log entry: {}", err);
        }
    }
});

Best Practices

Resource Management

Performance

Common Patterns

Temporary Watching

{
    // `handle` lives until the end of this scope, which bounds the subscription.
    let (handle, receiver) = collection.watch().subscribe()?;

    // Process up to ten events
    for _ in 0..10 {
        match receiver.recv() {
            Ok(event) => {
                process_event(event);
            }
            // Stop instead of retrying a receiver that has failed.
            // NOTE(review): assumes a recv error is permanent (channel
            // closed) — confirm.
            Err(_) => break,
        }
    }

    // Handle dropped automatically here
}

Manual Unsubscribe

let (handle, receiver) = collection.watch().subscribe()?;

// ... use receiver ...

// Explicitly unsubscribe
handle.unsubscribe();

Graceful Shutdown

// Flag shared with the worker; cleared to request shutdown.
let running = Arc::new(AtomicBool::new(true));
let running_clone = running.clone();

let (handle, receiver) = collection.watch().subscribe()?;

// Keep the JoinHandle so shutdown can wait for the worker to finish.
let worker = std::thread::spawn(move || {
    while running_clone.load(Ordering::Relaxed) {
        // The timeout bounds how long the worker waits before it re-checks
        // the flag.
        if let Ok(event) = receiver.recv_timeout(Duration::from_millis(100)) {
            process_event(event);
        }
    }
});

// Later: signal shutdown
running.store(false, Ordering::Relaxed);
handle.unsubscribe();

// Wait for the worker to see the cleared flag and exit, so an event that is
// still being processed is not cut off.
worker.join().expect("watcher thread panicked");