Aggregation Pipeline

Process and transform data through multiple stages


Overview

Aggregation pipelines allow you to process and transform documents through a series of stages. Each stage performs an operation on the data and passes the results to the next stage.

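Common use cases include:

- Filtering documents before aggregating them
- Grouping documents by a field value
- Computing per-group statistics such as counts, sums, averages, minimums, and maximums
- Sorting, limiting, and paging through aggregated results
- Building reports such as user statistics by city or salary totals by department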

Basic Example

Count active users and compute their average age per city:

// Count active users per city and compute their average age.
// `?` propagates any error returned by `execute()`.
let results = users.aggregate()
    .match_("status is \"active\"")
    .group_by("city")
    .count("total")
    .avg("age", "avg_age")
    .execute()?;

// Print one summary line per city group.
for result in results {
    println!("{}: {} users, avg age {:.1}",
        result["city"],
        result["total"],
        result["avg_age"]
    );
}
# Count active users per city and compute their average age.
# Each dict is one stage; stages run in list order.
results = coll.aggregate([
    {"match": "status = 'active'"},
    {"group_by": "city"},
    {"count": "total"},
    {
        "avg": {
            "field": "age",
            "output": "avg_age"
        }
    }
])

# Print one summary line per city group.
for result in results:
    print(f"{result['city']}: {result['total']} users, avg age {result['avg_age']:.1f}")
// Count active users per city and compute their average age.
const results = await users.aggregate()
    .match("status is 'active'")
    .groupBy('city')
    .count('total')
    .avg('age', 'avg_age')
    .execute();

// Print one summary line per city group.
results.forEach(({ city, total, avg_age: avgAge }) => {
    console.log(`${city}: ${total} users, avg age ${avgAge.toFixed(1)}`);
});
// Count active users per city and compute their average age.
results, err := users.Aggregate().
    Match("status is \"active\"").
    GroupBy("city").
    Count("total").
    Avg("age", "avg_age").
    Execute()
if err != nil {
    // Execute returns an error value alongside the results; check it
    // before touching results. Replace panic with your own handling.
    panic(err)
}

// Print one summary line per city group.
for _, result := range results {
    fmt.Printf("%s: %v users, avg age %.1f\n",
        result["city"],
        result["total"],
        result["avg_age"], // trailing comma required: ")" is on the next line
    )
}

Pipeline Stages

Aggregation pipelines process documents through stages in order:

// Each chained call appends one stage; stages run in the order chained.
let results = users.aggregate()
    .match_("age > 18")                    // Filter documents
    .group_by("city")                      // Group by city
    .count("user_count")                   // Count per group
    .avg("age", "avg_age")                 // Average age per group
    .sum("salary", "total_salary")         // Sum salaries per group
    .min("age", "youngest")                // Minimum age per group
    .max("age", "oldest")                  // Maximum age per group
    .sort("user_count", false)             // Sort by count descending
    .limit(10)                             // Top 10 cities
    .execute()?;                           // Run the pipeline; `?` propagates errors
# Each dict is one stage; stages run in list order.
results = coll.aggregate([
    {"match": "age > 18"},                                   # Filter documents
    {"group_by": "city"},                                    # Group by city
    {"count": "user_count"},                                 # Count per group
    {"avg": {"field": "age", "output": "avg_age"}},          # Average age per group
    {"sum": {"field": "salary", "output": "total_salary"}},  # Sum salaries per group
    {"min": {"field": "age", "output": "youngest"}},         # Minimum age per group
    {"max": {"field": "age", "output": "oldest"}},           # Maximum age per group
    {"sort": {"field": "user_count", "asc": False}},         # Sort by count descending
    {"limit": 10}                                            # Top 10 cities
])
// Each chained call appends one stage; stages run in the order chained.
const results = await users.aggregate()
    .match('age > 18')                 // Filter documents
    .groupBy('city')                   // Group by city
    .count('user_count')               // Count per group
    .avg('age', 'avg_age')             // Average age per group
    .sum('salary', 'total_salary')     // Sum salaries per group
    .min('age', 'youngest')            // Minimum age per group
    .max('age', 'oldest')              // Maximum age per group
    .sort('user_count', false)         // Sort by count descending
    .limit(10)                         // Top 10 cities
    .execute();
// Each chained call appends one stage; stages run in the order chained.
// Check err before using results.
results, err := users.Aggregate().
    Match("age > 18").                 // Filter documents
    GroupBy("city").                   // Group by city
    Count("user_count").               // Count per group
    Avg("age", "avg_age").             // Average age per group
    Sum("salary", "total_salary").     // Sum salaries per group
    Min("age", "youngest").            // Minimum age per group
    Max("age", "oldest").              // Maximum age per group
    Sort("user_count", false).         // Sort by count descending
    Limit(10).                         // Top 10 cities
    Execute()

Match (Filter)

Filter documents with a query-language expression before they are grouped and aggregated:

// Filter adult active users before grouping, so only matching
// documents are counted in each city.
let results = users.aggregate()
    .match_("age >= 18 and status is \"active\"")
    .group_by("city")
    .count("total")
    .execute()?;
# Filter adult active users before grouping, so only matching
# documents are counted in each city.
results = coll.aggregate([
    {"match": "age >= 18 and status = 'active'"},
    {"group_by": "city"},
    {"count": "total"}
])
// Filter adult active users before grouping, so only matching
// documents are counted in each city.
const results = await users.aggregate()
    .match("age >= 18 and status is 'active'")
    .groupBy('city')
    .count('total')
    .execute();
// Filter adult active users before grouping, so only matching
// documents are counted in each city. Check err before using results.
results, err := users.Aggregate().
    Match("age >= 18 and status is \"active\"").
    GroupBy("city").
    Count("total").
    Execute()

Group By

Group documents by one or more field values:

// Group by single field: one result per distinct city,
// with its document count stored under "total".
let results = users.aggregate()
    .group_by("city")
    .count("total")
    .execute()?;
# Group by single field: one result per distinct city,
# with its document count stored under "total".
results = coll.aggregate([
    {"group_by": "city"},
    {"count": "total"}
])
// Group by single field: one result per distinct city,
// with its document count stored under "total".
const results = await users.aggregate()
    .groupBy('city')
    .count('total')
    .execute();
// Group by single field: one result per distinct city,
// with its document count stored under "total". Check err before using results.
results, err := users.Aggregate().
    GroupBy("city").
    Count("total").
    Execute()

Aggregators

Count

Count documents in each group:

// Count the documents in each city group; the count is stored under "user_count".
let results = users.aggregate()
    .group_by("city")
    .count("user_count")
    .execute()?;
# Count the documents in each city group; the count is stored under "user_count".
results = coll.aggregate([
    {"group_by": "city"},
    {"count": "user_count"}
])
// Count the documents in each city group; the count is stored under "user_count".
const results = await users.aggregate()
    .groupBy('city')
    .count('user_count')
    .execute();
// Count the documents in each city group; the count is stored under "user_count".
// Check err before using results.
results, err := users.Aggregate().
    GroupBy("city").
    Count("user_count").
    Execute()

Sum

Sum the values of a numeric field in each group:

// Sum "salary" within each city group; the total is stored under "total_salary".
let results = users.aggregate()
    .group_by("city")
    .sum("salary", "total_salary")
    .execute()?;
# Sum "salary" within each city group; the total is stored under "total_salary".
results = coll.aggregate([
    {"group_by": "city"},
    {"sum": {"field": "salary", "output": "total_salary"}}
])
// Sum "salary" within each city group; the total is stored under "total_salary".
const results = await users.aggregate()
    .groupBy('city')
    .sum('salary', 'total_salary')
    .execute();
// Sum "salary" within each city group; the total is stored under "total_salary".
// Check err before using results.
results, err := users.Aggregate().
    GroupBy("city").
    Sum("salary", "total_salary").
    Execute()

Average (Avg)

Calculate the average of a numeric field in each group:

// Average "age" within each city group; the result is stored under "avg_age".
let results = users.aggregate()
    .group_by("city")
    .avg("age", "avg_age")
    .execute()?;
# Average "age" within each city group; the result is stored under "avg_age".
results = coll.aggregate([
    {"group_by": "city"},
    {"avg": {"field": "age", "output": "avg_age"}}
])
// Average "age" within each city group; the result is stored under "avg_age".
const results = await users.aggregate()
    .groupBy('city')
    .avg('age', 'avg_age')
    .execute();
// Average "age" within each city group; the result is stored under "avg_age".
// Check err before using results.
results, err := users.Aggregate().
    GroupBy("city").
    Avg("age", "avg_age").
    Execute()

Min & Max

Find the minimum and maximum values of a field in each group:

// Smallest and largest "age" in each city group, stored as "youngest" and "oldest".
let results = users.aggregate()
    .group_by("city")
    .min("age", "youngest")
    .max("age", "oldest")
    .execute()?;
# Smallest and largest "age" in each city group, stored as "youngest" and "oldest".
results = coll.aggregate([
    {"group_by": "city"},
    {"min": {"field": "age", "output": "youngest"}},
    {"max": {"field": "age", "output": "oldest"}}
])
// Smallest and largest "age" in each city group, stored as "youngest" and "oldest".
const results = await users.aggregate()
    .groupBy('city')
    .min('age', 'youngest')
    .max('age', 'oldest')
    .execute();
// Smallest and largest "age" in each city group, stored as "youngest" and "oldest".
// Check err before using results.
results, err := users.Aggregate().
    GroupBy("city").
    Min("age", "youngest").
    Max("age", "oldest").
    Execute()

Sort & Limit

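Sort results, limit how many are returned, and skip results. Because stages run in order, the examples below keep the top 10 cities and then skip the first 5 of those, returning the cities ranked 6 to 10: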

// Stages run in order: sort, keep the top 10 groups, then skip the first
// 5 of those, so the result holds the cities ranked 6-10.
let results = users.aggregate()
    .group_by("city")
    .count("total")
    .sort("total", false)  // false = descending
    .limit(10)             // Keep the top 10
    .skip(5)               // Then skip the first 5 of those
    .execute()?;
# Stages run in order: sort, keep the top 10 groups, then skip the first
# 5 of those, so the result holds the cities ranked 6-10.
results = coll.aggregate([
    {"group_by": "city"},
    {"count": "total"},
    {"sort": {"field": "total", "asc": False}},  # asc False = descending
    {"limit": 10},                               # Keep the top 10
    {"skip": 5}                                  # Then skip the first 5 of those
])
// Stages run in order: sort, keep the top 10 groups, then skip the first
// 5 of those, so the result holds the cities ranked 6-10.
const results = await users.aggregate()
    .groupBy('city')
    .count('total')
    .sort('total', false)  // false = descending
    .limit(10)             // Keep the top 10
    .skip(5)               // Then skip the first 5 of those
    .execute();
// Stages run in order: sort, keep the top 10 groups, then skip the first
// 5 of those, so the result holds the cities ranked 6-10.
// Check err before using results.
results, err := users.Aggregate().
    GroupBy("city").
    Count("total").
    Sort("total", false).  // false = descending
    Limit(10).             // Keep the top 10
    Skip(5).               // Then skip the first 5 of those
    Execute()

Project Fields

Select or exclude specific fields from results:

// The second argument to `project` selects the mode:
// `false` keeps only the listed fields, `true` removes them.
let results = users.aggregate()
    .group_by("city")
    .count("total")
    .project(&["city", "total"], false)  // Include only these fields
    .execute()?;

// Or exclude fields
let results = users.aggregate()
    .group_by("city")
    .count("total")
    .project(&["_internal"], true)  // Exclude these fields
    .execute()?;
# Include only these fields: a "project" stage keeps just the listed keys
results = coll.aggregate([
    {"group_by": "city"},
    {"count": "total"},
    {"project": ["city", "total"]}
])

# Or exclude fields: an "exclude" stage removes the listed keys
results = coll.aggregate([
    {"group_by": "city"},
    {"count": "total"},
    {"exclude": ["_internal"]}
])
// Include only these fields (second argument false = keep only the listed fields)
const results = await users.aggregate()
    .groupBy('city')
    .count('total')
    .project(['city', 'total'], false)
    .execute();

// Or exclude fields (second argument true = remove the listed fields)
const results = await users.aggregate()
    .groupBy('city')
    .count('total')
    .project(['_internal'], true)
    .execute();
// Include only these fields (second argument false = keep only the listed fields)
results, err := users.Aggregate().
    GroupBy("city").
    Count("total").
    Project([]string{"city", "total"}, false).
    Execute()

// Or exclude fields (second argument true = remove the listed fields).
// `=` rather than `:=` because results and err are already declared above.
results, err = users.Aggregate().
    GroupBy("city").
    Count("total").
    Project([]string{"_internal"}, true).
    Execute()

Complete Examples

User Statistics by City

// Per-city user counts and average ages, sorted by user count (descending).
let city_stats = users.aggregate()
    .group_by("city")
    .count("users")
    .avg("age", "avg_age")
    .sort("users", false)
    .execute()?;

// Print one summary line per city.
for stat in city_stats {
    println!("{}: {} users, avg age {:.1}",
        stat["city"], stat["users"], stat["avg_age"]);
}
# Per-city user counts and average ages, sorted by user count (descending).
city_stats = coll.aggregate([
    {"group_by": "city"},
    {"count": "users"},
    {"avg": {"field": "age", "output": "avg_age"}},
    {"sort": {"field": "users", "asc": False}}
])

# Print one summary line per city.
for stat in city_stats:
    print(f"{stat['city']}: {stat['users']} users, avg age {stat['avg_age']:.1f}")
// Per-city user counts and average ages, sorted by user count (descending).
const cityStats = await users.aggregate()
    .groupBy('city')
    .count('users')
    .avg('age', 'avg_age')
    .sort('users', false)
    .execute();

// Print one summary line per city.
cityStats.forEach(({ city, users: userCount, avg_age: avgAge }) => {
    console.log(`${city}: ${userCount} users, avg age ${avgAge.toFixed(1)}`);
});
// Per-city user counts and average ages, sorted by user count (descending).
cityStats, err := users.Aggregate().
    GroupBy("city").
    Count("users").
    Avg("age", "avg_age").
    Sort("users", false).
    Execute()
if err != nil {
    // Execute returns an error value alongside the results; check it
    // before touching cityStats. Replace panic with your own handling.
    panic(err)
}

// Print one summary line per city.
for _, stat := range cityStats {
    fmt.Printf("%s: %v users, avg age %.1f\n",
        stat["city"], stat["users"], stat["avg_age"])
}

Salary Report by Department

// Payroll summary for active employees: per-department salary total,
// average and headcount, sorted by total salary (descending).
let results = employees.aggregate()
    .match_("active is true")
    .group_by("department")
    .sum("salary", "total_salary")
    .avg("salary", "avg_salary")
    .count("employee_count")
    .sort("total_salary", false)
    .execute()?;

// `$` is a literal character here; `{:.2}` formats with two decimal places.
for dept in results {
    println!("Department: {}", dept["department"]);
    println!("  Employees: {}", dept["employee_count"]);
    println!("  Total: ${:.2}", dept["total_salary"]);
    println!("  Average: ${:.2}", dept["avg_salary"]);
}
# Payroll summary for active employees: per-department salary total,
# average and headcount, sorted by total salary (descending).
results = coll.aggregate([
    {"match": "active = true"},
    {"group_by": "department"},
    {"sum": {"field": "salary", "output": "total_salary"}},
    {"avg": {"field": "salary", "output": "avg_salary"}},
    {"count": "employee_count"},
    {"sort": {"field": "total_salary", "asc": False}}
])

# Print a short report block per department.
for dept in results:
    print(f"Department: {dept['department']}")
    print(f"  Employees: {dept['employee_count']}")
    print(f"  Total: ${dept['total_salary']:.2f}")
    print(f"  Average: ${dept['avg_salary']:.2f}")
// Payroll summary for active employees: per-department salary total,
// average and headcount, sorted by total salary (descending).
const results = await employees.aggregate()
    .match('active is true')
    .groupBy('department')
    .sum('salary', 'total_salary')
    .avg('salary', 'avg_salary')
    .count('employee_count')
    .sort('total_salary', false)
    .execute();

// Print a short report block per department; "$$" renders a literal "$"
// followed by the interpolated value.
results.forEach(({
    department,
    employee_count: employeeCount,
    total_salary: totalSalary,
    avg_salary: avgSalary,
}) => {
    console.log(`Department: ${department}`);
    console.log(`  Employees: ${employeeCount}`);
    console.log(`  Total: $${totalSalary.toFixed(2)}`);
    console.log(`  Average: $${avgSalary.toFixed(2)}`);
});
// Payroll summary for active employees: per-department salary total,
// average and headcount, sorted by total salary (descending).
results, err := employees.Aggregate().
    Match("active is true").
    GroupBy("department").
    Sum("salary", "total_salary").
    Avg("salary", "avg_salary").
    Count("employee_count").
    Sort("total_salary", false).
    Execute()
if err != nil {
    // Execute returns an error value alongside the results; check it
    // before touching results. Replace panic with your own handling.
    panic(err)
}

// Print a short report block per department.
for _, dept := range results {
    fmt.Printf("Department: %s\n", dept["department"])
    fmt.Printf("  Employees: %v\n", dept["employee_count"])
    fmt.Printf("  Total: $%.2f\n", dept["total_salary"])
    fmt.Printf("  Average: $%.2f\n", dept["avg_salary"])
}