Change Streams
Real-time notifications for document changes
Overview
Change Streams allow you to watch collections for real-time changes. When documents are inserted, updated, or deleted, your application receives immediate notifications. This enables reactive applications, real-time synchronization, and event-driven architectures.
Features
- Real-time: Receive notifications immediately when changes occur
- Filtered: Use the query language to watch only specific changes
- Multiple Watchers: Several watchers can observe the same collection independently
- Automatic Cleanup: A watcher is removed automatically when its handle is dropped (RAII)
- Operation Types: Distinguish between inserts, updates, and deletes
Getting Started
Watch a collection for real-time changes:
Rust:
use jasonisnthappy::{Database, ChangeOperation};

fn main() -> jasonisnthappy::Result<()> {
    let db = Database::open("my.db")?;
    let collection = db.collection("users");

    // Watch all changes in the collection
    let (handle, receiver) = collection.watch().subscribe()?;

    // Spawn a thread to listen for changes
    std::thread::spawn(move || {
        while let Ok(event) = receiver.recv() {
            match event.operation {
                ChangeOperation::Insert => {
                    println!("New document inserted: {:?}", event.document);
                }
                ChangeOperation::Update => {
                    println!("Document updated: {:?}", event.document);
                }
                ChangeOperation::Delete => {
                    println!("Document deleted: {}", event.doc_id);
                }
            }
        }
    });

    // Make some changes
    let mut tx = db.begin()?;
    let mut coll = tx.collection("users")?;
    coll.insert(serde_json::json!({"name": "Alice", "age": 30}))?;
    tx.commit()?;

    // Watcher automatically unsubscribes when handle is dropped
    drop(handle);
    Ok(())
}
Python:
from jasonisnthappy import Database, ChangeOperation
import threading

def watch_changes(receiver):
    """Background thread to watch for changes"""
    try:
        while True:
            event = receiver.recv()
            if event.operation == ChangeOperation.INSERT:
                print(f"New document inserted: {event.document}")
            elif event.operation == ChangeOperation.UPDATE:
                print(f"Document updated: {event.document}")
            elif event.operation == ChangeOperation.DELETE:
                print(f"Document deleted: {event.doc_id}")
    except Exception:
        pass  # Handle receiver closed

with Database.open("my.db") as db:
    collection = db.collection("users")

    # Watch all changes
    handle, receiver = collection.watch().subscribe()

    # Start background thread
    watcher_thread = threading.Thread(target=watch_changes, args=(receiver,))
    watcher_thread.daemon = True
    watcher_thread.start()

    # Make some changes
    with db.begin() as tx:
        coll = tx.collection("users")
        coll.insert({"name": "Alice", "age": 30})

    # Cleanup
    handle.unsubscribe()
JavaScript:
const { Database, ChangeOperation } = require('jasonisnthappy');

const db = Database.open('my.db');
const collection = db.collection('users');

// Watch all changes
const { handle, receiver } = collection.watch().subscribe();

// Listen for changes
(async () => {
  try {
    while (true) {
      const event = await receiver.recv();
      switch (event.operation) {
        case ChangeOperation.INSERT:
          console.log('New document inserted:', event.document);
          break;
        case ChangeOperation.UPDATE:
          console.log('Document updated:', event.document);
          break;
        case ChangeOperation.DELETE:
          console.log('Document deleted:', event.docId);
          break;
      }
    }
  } catch (err) {
    // Receiver closed
  }
})();

// Make some changes
const tx = db.begin();
const coll = tx.collection('users');
coll.insert({ name: 'Alice', age: 30 });
tx.commit();

// Cleanup
handle.unsubscribe();
db.close();
Go:
package main

import (
	"fmt"

	"github.com/sohzm/jasonisnthappy-go"
)

func main() {
	// Errors are ignored here for brevity; check them in real code
	db, _ := jasonisnthappy.Open("my.db")
	defer db.Close()

	collection, _ := db.Collection("users")

	// Watch all changes
	handle, receiver, _ := collection.Watch().Subscribe()
	defer handle.Unsubscribe()

	// Listen for changes in goroutine
	go func() {
		for event := range receiver {
			switch event.Operation {
			case jasonisnthappy.ChangeInsert:
				fmt.Printf("New document inserted: %v\n", event.Document)
			case jasonisnthappy.ChangeUpdate:
				fmt.Printf("Document updated: %v\n", event.Document)
			case jasonisnthappy.ChangeDelete:
				fmt.Printf("Document deleted: %s\n", event.DocID)
			}
		}
	}()

	// Make some changes
	tx, _ := db.Begin()
	coll, _ := tx.Collection("users")
	coll.Insert(map[string]interface{}{"name": "Alice", "age": 30})
	tx.Commit()
}
Filtering Changes
You can filter changes using the query language, so you only receive notifications for documents matching specific criteria.
Filter by Field Value
Only watch changes for users older than 18:
Rust:
let (handle, receiver) = collection
    .watch()
    .filter("age > 18")
    .subscribe()?;

Python:
handle, receiver = collection.watch().filter("age > 18").subscribe()

JavaScript:
const { handle, receiver } = collection.watch().filter('age > 18').subscribe();

Go:
handle, receiver, _ := collection.Watch().Filter("age > 18").Subscribe()
Filter by Multiple Conditions
Watch for premium users in specific cities:
Rust:
let (handle, receiver) = collection
    .watch()
    .filter("subscription is \"premium\" and (city is \"NYC\" or city is \"SF\")")
    .subscribe()?;

Python:
handle, receiver = collection.watch().filter(
    'subscription is "premium" and (city is "NYC" or city is "SF")'
).subscribe()

JavaScript:
const { handle, receiver } = collection.watch().filter(
  'subscription is "premium" and (city is "NYC" or city is "SF")'
).subscribe();

Go:
handle, receiver, _ := collection.Watch().Filter(
	`subscription is "premium" and (city is "NYC" or city is "SF")`,
).Subscribe()
More Filter Examples
Rust:
// Watch only active users
collection.watch().filter("status is \"active\"").subscribe()?;
// Watch high-value transactions
collection.watch().filter("amount > 1000").subscribe()?;
// Watch verified accounts
collection.watch().filter("verified is true").subscribe()?;
// Watch specific tags
collection.watch().filter("tags has \"urgent\"").subscribe()?;

Python:
# Watch only active users
collection.watch().filter('status is "active"').subscribe()
# Watch high-value transactions
collection.watch().filter('amount > 1000').subscribe()
# Watch verified accounts
collection.watch().filter('verified is true').subscribe()
# Watch specific tags
collection.watch().filter('tags has "urgent"').subscribe()

JavaScript:
// Watch only active users
collection.watch().filter('status is "active"').subscribe();
// Watch high-value transactions
collection.watch().filter('amount > 1000').subscribe();
// Watch verified accounts
collection.watch().filter('verified is true').subscribe();
// Watch specific tags
collection.watch().filter('tags has "urgent"').subscribe();

Go:
// Watch only active users
collection.Watch().Filter(`status is "active"`).Subscribe()
// Watch high-value transactions
collection.Watch().Filter("amount > 1000").Subscribe()
// Watch verified accounts
collection.Watch().Filter("verified is true").Subscribe()
// Watch specific tags
collection.Watch().Filter(`tags has "urgent"`).Subscribe()
Important Notes on Filtering
- Insert/Update Only: Filters are evaluated against insert and update events only
- Delete Operations: A watcher with a filter does not receive delete events, because a deleted document has no data to evaluate the filter against
- Query Syntax: Filters use the same query language as the find operations
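Because a filtered watcher never sees deletes, an application that needs them can subscribe without a filter and apply the condition itself. The following is a minimal sketch of that idea, not part of the library: `Event` and `MatchTracker` are hypothetical stand-ins, and the operation names are plain strings chosen for illustration. The tracker remembers which document IDs matched so that a later delete can be attributed to the filtered set.

```python
from dataclasses import dataclass
from typing import Callable, Optional


@dataclass
class Event:
    """Hypothetical stand-in for a change event (not the library's type)."""
    operation: str              # "insert", "update", or "delete"
    doc_id: str
    document: Optional[dict] = None


class MatchTracker:
    """Tracks which document IDs currently satisfy a predicate."""

    def __init__(self, predicate: Callable[[dict], bool]):
        self.predicate = predicate
        self.matching_ids = set()

    def handle(self, event: Event) -> Optional[str]:
        """Return "matched", "unmatched", "removed", or None if irrelevant."""
        if event.operation == "delete":
            # Deletes carry no document, so fall back to the remembered IDs.
            if event.doc_id in self.matching_ids:
                self.matching_ids.discard(event.doc_id)
                return "removed"
            return None
        if event.document is not None and self.predicate(event.document):
            self.matching_ids.add(event.doc_id)
            return "matched"
        if event.doc_id in self.matching_ids:
            # The document was updated and no longer satisfies the predicate.
            self.matching_ids.discard(event.doc_id)
            return "unmatched"
        return None


# Equivalent in spirit to the filter "age > 18", applied client-side.
tracker = MatchTracker(lambda doc: doc.get("age", 0) > 18)
results = [
    tracker.handle(Event("insert", "a", {"age": 30})),
    tracker.handle(Event("insert", "b", {"age": 10})),
    tracker.handle(Event("delete", "b")),
    tracker.handle(Event("delete", "a")),
]
```

The trade-off is that an unfiltered watcher receives every event for the collection, so this approach gives up the reduction in traffic that a source-side filter provides.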
Change Events
Event Structure
Each change event contains:
- collection: Name of the collection
- operation: Type of change (Insert, Update, Delete)
- doc_id: Document ID
- document: Full document data (None for Delete)
Operation Types
Insert
Triggered when a new document is created.
{
  "collection": "users",
  "operation": "Insert",
  "doc_id": "doc123",
  "document": {
    "_id": "doc123",
    "name": "Alice",
    "age": 30
  }
}
Update
Triggered when a document is modified.
{
  "collection": "users",
  "operation": "Update",
  "doc_id": "doc123",
  "document": {
    "_id": "doc123",
    "name": "Alice",
    "age": 31
  }
}
Delete
Triggered when a document is removed.
{
  "collection": "users",
  "operation": "Delete",
  "doc_id": "doc123",
  "document": null
}
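If events are handled in their JSON form, as in the payloads above, it can help to validate them into a small typed record before dispatching. The sketch below assumes exactly the four fields and three operation names shown in this section; `ChangeEvent` and `parse_event` are hypothetical helpers written for illustration, not library types.

```python
import json
from dataclasses import dataclass
from typing import Optional

VALID_OPERATIONS = ("Insert", "Update", "Delete")


@dataclass(frozen=True)
class ChangeEvent:
    """Hypothetical typed view of a change event payload."""
    collection: str
    operation: str
    doc_id: str
    document: Optional[dict]


def parse_event(raw: str) -> ChangeEvent:
    """Parse a JSON payload and check the invariants described above."""
    data = json.loads(raw)
    operation = data["operation"]
    if operation not in VALID_OPERATIONS:
        raise ValueError(f"unknown operation: {operation!r}")
    document = data.get("document")
    # Insert and Update carry the full document; Delete carries none.
    if operation != "Delete" and document is None:
        raise ValueError(f"{operation} event is missing its document")
    return ChangeEvent(
        collection=data["collection"],
        operation=operation,
        doc_id=data["doc_id"],
        document=document,
    )


delete_event = parse_event(
    '{"collection": "users", "operation": "Delete",'
    ' "doc_id": "doc123", "document": null}'
)
update_event = parse_event(
    '{"collection": "users", "operation": "Update", "doc_id": "doc123",'
    ' "document": {"_id": "doc123", "name": "Alice", "age": 31}}'
)
```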
Advanced Examples
Multiple Watchers
// Watch for all changes
let (handle1, receiver1) = collection.watch().subscribe()?;

// Watch only for high-priority changes
let (handle2, receiver2) = collection
    .watch()
    .filter("priority is \"high\"")
    .subscribe()?;

// Both watchers receive events independently
Real-time Dashboard
use jasonisnthappy::{Database, ChangeOperation};
use std::sync::{Arc, Mutex};

// Shared state for dashboard
let active_users = Arc::new(Mutex::new(Vec::new()));
let active_users_clone = active_users.clone();

let db = Database::open("my.db")?;
let collection = db.collection("users");

// Watch for user status changes
let (handle, receiver) = collection
    .watch()
    .filter("status is \"online\"")
    .subscribe()?;

// Update dashboard in background
std::thread::spawn(move || {
    while let Ok(event) = receiver.recv() {
        let mut users = active_users_clone.lock().unwrap();
        match event.operation {
            ChangeOperation::Insert | ChangeOperation::Update => {
                if let Some(doc) = event.document {
                    println!("User online: {:?}", doc);
                    users.push(doc);
                }
            }
            ChangeOperation::Delete => {
                // Filtered watchers do not receive delete events, so this
                // arm only runs if the filter above is removed.
                println!("User offline: {}", event.doc_id);
                // Assumes documents are serde_json::Value and doc_id is a String
                let id = serde_json::Value::from(event.doc_id.clone());
                users.retain(|u| u.get("_id") != Some(&id));
            }
        }
    }
});
Data Synchronization
// Sync changes to external system
// sync_to_remote is a placeholder for your own integration code
let (handle, receiver) = collection.watch().subscribe()?;

std::thread::spawn(move || {
    while let Ok(event) = receiver.recv() {
        match event.operation {
            ChangeOperation::Insert => {
                // Sync new document to remote
                sync_to_remote("insert", &event.document);
            }
            ChangeOperation::Update => {
                // Sync update to remote
                sync_to_remote("update", &event.document);
            }
            ChangeOperation::Delete => {
                // Sync deletion to remote
                sync_to_remote("delete", &event.doc_id);
            }
        }
    }
});
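What `sync_to_remote` does is left to the application. One reasonable design, sketched here under the assumption that the remote side can be treated as a key-value store keyed by document ID, is to apply inserts and updates as upserts and to make deletes tolerant of missing keys. That way a replayed or duplicated event leaves the replica unchanged. `apply_event` and the lowercase operation strings are hypothetical names chosen to mirror the calls above.

```python
from typing import Optional


def apply_event(replica: dict, operation: str, doc_id: str,
                document: Optional[dict] = None) -> None:
    """Apply one change event to an in-memory replica, idempotently."""
    if operation in ("insert", "update"):
        if document is None:
            raise ValueError(f"{operation} event requires a document")
        # Upsert: inserting twice or updating a missing key both converge.
        replica[doc_id] = document
    elif operation == "delete":
        # pop with a default makes a repeated delete a no-op.
        replica.pop(doc_id, None)
    else:
        raise ValueError(f"unknown operation: {operation!r}")


replica = {}
apply_event(replica, "insert", "doc123", {"name": "Alice", "age": 30})
apply_event(replica, "update", "doc123", {"name": "Alice", "age": 31})
apply_event(replica, "update", "doc123", {"name": "Alice", "age": 31})  # replay
after_update = dict(replica)
apply_event(replica, "delete", "doc123")
apply_event(replica, "delete", "doc123")  # replayed delete is harmless
```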
Event Logging
use std::fs::File;
use std::io::Write;
use std::sync::{Arc, Mutex};

let (handle, receiver) = collection.watch().subscribe()?;
let log_file = Arc::new(Mutex::new(File::create("changes.log")?));

std::thread::spawn(move || {
    while let Ok(event) = receiver.recv() {
        let mut file = log_file.lock().unwrap();
        // One line per event: timestamp, operation, document ID, collection
        writeln!(
            file,
            "[{}] {} {} in {}",
            chrono::Utc::now(),
            event.operation,
            event.doc_id,
            event.collection
        ).ok();
    }
});
Best Practices
Resource Management
- Drop Handles: Always drop or unsubscribe handles when done to free resources
- Use RAII: Leverage automatic cleanup by letting handles go out of scope
- Limit Watchers: Create only the watchers you need on a collection, since each watcher receives its events independently
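In Python there is no drop-based cleanup, so a context manager is one way to get the same guarantee that a handle is always released. The sketch below relies only on the `handle.unsubscribe()` call shown in Getting Started; the `subscription` helper and the `FakeHandle` class used to exercise it are hypothetical and not part of the library.

```python
from contextlib import contextmanager


@contextmanager
def subscription(subscribe):
    """Yield a receiver and always unsubscribe its handle afterwards.

    `subscribe` is a zero-argument callable returning (handle, receiver).
    """
    handle, receiver = subscribe()
    try:
        yield receiver
    finally:
        # Runs on normal exit and when the body raises.
        handle.unsubscribe()


class FakeHandle:
    """Test double that records whether unsubscribe() was called."""

    def __init__(self):
        self.unsubscribed = False

    def unsubscribe(self):
        self.unsubscribed = True


fake = FakeHandle()
try:
    with subscription(lambda: (fake, [])) as receiver:
        raise RuntimeError("processing failed")
except RuntimeError:
    pass
```

With a real collection, usage would look like `with subscription(lambda: collection.watch().subscribe()) as receiver:`.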
Performance
- Use Filters: Filter at the source to reduce unnecessary events
- Background Threads: Process events in separate threads/workers
- Buffering: Consider buffering events if processing is slow
- Error Handling: Handle receiver errors gracefully when channel closes
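As an illustration of the buffering advice, the sketch below places a bounded `queue.Queue` between the thread that receives events and the code that processes them, and drains it in batches. The queue stands in for whatever the watcher thread forwards; `drain_batch` is a hypothetical helper, and the batch size and timeout are arbitrary values.

```python
import queue


def drain_batch(buf: queue.Queue, max_items: int, timeout: float) -> list:
    """Wait up to `timeout` seconds for one item, then take up to
    `max_items` in total without blocking further."""
    batch = []
    try:
        batch.append(buf.get(timeout=timeout))
    except queue.Empty:
        return batch
    while len(batch) < max_items:
        try:
            batch.append(buf.get_nowait())
        except queue.Empty:
            break
    return batch


# A bounded buffer applies back-pressure: put() blocks once it is full.
buf = queue.Queue(maxsize=100)
for i in range(5):
    buf.put(i)

first = drain_batch(buf, max_items=3, timeout=0.1)
second = drain_batch(buf, max_items=3, timeout=0.1)
third = drain_batch(buf, max_items=3, timeout=0.01)
```

Batching lets a slow consumer amortize per-event costs such as a database write or a network round trip, at the price of slightly delayed processing.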
Common Patterns
Temporary Watching
{
    let (handle, receiver) = collection.watch().subscribe()?;

    // Process some events
    for _ in 0..10 {
        if let Ok(event) = receiver.recv() {
            process_event(event);
        }
    }
    // Handle dropped automatically here
}
Manual Unsubscribe
let (handle, receiver) = collection.watch().subscribe()?;
// ... use receiver ...
// Explicitly unsubscribe
handle.unsubscribe();
Graceful Shutdown
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::time::Duration;

let running = Arc::new(AtomicBool::new(true));
let running_clone = running.clone();
let (handle, receiver) = collection.watch().subscribe()?;

std::thread::spawn(move || {
    // Poll with a timeout so the flag is re-checked at least every 100 ms
    while running_clone.load(Ordering::Relaxed) {
        if let Ok(event) = receiver.recv_timeout(Duration::from_millis(100)) {
            process_event(event);
        }
    }
});

// Later: signal shutdown
running.store(false, Ordering::Relaxed);
handle.unsubscribe();
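The same shutdown pattern can be sketched in Python with a `threading.Event` as the stop flag and a timed `queue.Queue.get` as the polling receive. This document only shows a blocking `recv()` for the Python receiver, so a standard-library queue is used here as a stand-in; `run_worker` is a hypothetical name.

```python
import queue
import threading


def run_worker(events: queue.Queue, stop: threading.Event, processed: list):
    """Process events until `stop` is set, polling with a short timeout."""
    while not stop.is_set():
        try:
            event = events.get(timeout=0.05)
        except queue.Empty:
            continue  # nothing arrived; re-check the stop flag
        processed.append(event)
        events.task_done()


events = queue.Queue()
stop = threading.Event()
processed = []

worker = threading.Thread(target=run_worker, args=(events, stop, processed))
worker.start()

events.put("event-1")
events.put("event-2")
events.join()          # wait until both events have been processed

# Signal shutdown and wait for the worker to notice the flag.
stop.set()
worker.join(timeout=2)
```

The timeout bounds how long shutdown can take: the worker notices the flag within one polling interval even when no events arrive.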