Aggregation Pipeline
Process and transform data through multiple stages
Overview
Aggregation pipelines allow you to process and transform documents through a series of stages. Each stage performs an operation on the data and passes the results to the next stage.
Common use cases include:
- Computing statistics (count, sum, average, min, max)
- Grouping documents by field values
- Filtering and sorting data
- Generating reports and analytics
Basic Example
Count active users and compute their average age per city:
// Count active users per city and compute their average age.
// The filter method is spelled match_ (presumably because `match` is a Rust keyword).
let results = users.aggregate()
.match_("status is \"active\"")
.group_by("city")
.count("total")
.avg("age", "avg_age")
.execute()?;
// Each result is indexed by the group field ("city") and by the output names given above.
for result in results {
println!("{}: {} users, avg age {:.1}",
result["city"],
result["total"],
result["avg_age"]
);
}
# Count active users per city and compute their average age.
# NOTE(review): the Rust, JavaScript and Go examples write this filter as
# `status is "active"`; confirm `=` is accepted by the query language.
results = coll.aggregate([
    {"match": "status = 'active'"},
    {"group_by": "city"},
    {"count": "total"},
    {"avg": {"field": "age", "output": "avg_age"}},
])
# Each result is keyed by the group field ("city") and the output names above.
for result in results:
    print(f"{result['city']}: {result['total']} users, avg age {result['avg_age']:.1f}")
// Count active users per city and compute their average age.
const results = await users.aggregate()
.match("status is 'active'")
.groupBy('city')
.count('total')
.avg('age', 'avg_age')
.execute();
// Each result exposes the group field (city) plus one property per named output.
results.forEach(result => {
console.log(`${result.city}: ${result.total} users, avg age ${result.avg_age.toFixed(1)}`);
});
// Count active users per city and compute their average age.
results, err := users.Aggregate().
	Match("status is \"active\"").
	GroupBy("city").
	Count("total").
	Avg("age", "avg_age").
	Execute()
// Check the error: an unused err is also a compile error in Go.
if err != nil {
	panic(err)
}
for _, result := range results {
	// The trailing comma after the last argument is required: without it Go
	// inserts a semicolon at the line break and the call does not compile.
	fmt.Printf("%s: %v users, avg age %.1f\n",
		result["city"],
		result["total"],
		result["avg_age"],
	)
}
Pipeline Stages
Aggregation pipelines process documents through stages in order:
// Stages run top to bottom; each one receives the output of the previous stage.
let results = users.aggregate()
.match_("age > 18") // Filter documents
.group_by("city") // Group by city
.count("user_count") // Count per group
.avg("age", "avg_age") // Average age per group
.sum("salary", "total_salary") // Sum salaries per group
.min("age", "youngest") // Minimum age per group
.max("age", "oldest") // Maximum age per group
.sort("user_count", false) // Sort by count descending
.limit(10) // Top 10 cities
.execute()?;
# Stages run top to bottom; each one receives the output of the previous stage.
results = coll.aggregate([
{"match": "age > 18"},  # Filter documents
{"group_by": "city"},  # Group by city
{"count": "user_count"},  # Count per group
{"avg": {"field": "age", "output": "avg_age"}},  # Average age per group
{"sum": {"field": "salary", "output": "total_salary"}},  # Sum salaries per group
{"min": {"field": "age", "output": "youngest"}},  # Minimum age per group
{"max": {"field": "age", "output": "oldest"}},  # Maximum age per group
{"sort": {"field": "user_count", "asc": False}},  # Sort by count descending
{"limit": 10}  # Top 10 cities
])
// Stages run top to bottom; each one receives the output of the previous stage.
const results = await users.aggregate()
.match('age > 18') // Filter documents
.groupBy('city') // Group by city
.count('user_count') // Count per group
.avg('age', 'avg_age') // Average age per group
.sum('salary', 'total_salary') // Sum salaries per group
.min('age', 'youngest') // Minimum age per group
.max('age', 'oldest') // Maximum age per group
.sort('user_count', false) // Sort by count descending
.limit(10) // Top 10 cities
.execute();
// Stages run top to bottom; each one receives the output of the previous stage.
results, err := users.Aggregate().
Match("age > 18"). // Filter documents
GroupBy("city"). // Group by city
Count("user_count"). // Count per group
Avg("age", "avg_age"). // Average age per group
Sum("salary", "total_salary"). // Sum salaries per group
Min("age", "youngest"). // Minimum age per group
Max("age", "oldest"). // Maximum age per group
Sort("user_count", false). // Sort by count descending
Limit(10). // Top 10 cities
Execute()
Match (Filter)
Filter documents using query language syntax before aggregation:
// Filter adult active users
// match_ comes before group_by, so only matching documents are grouped.
let results = users.aggregate()
.match_("age >= 18 and status is \"active\"")
.group_by("city")
.count("total")
.execute()?;
# Filter adult active users
# The match stage comes before group_by, so only matching documents are grouped.
# NOTE(review): the other language examples write this filter as
# `status is "active"`; confirm `=` is accepted by the query language.
results = coll.aggregate([
{"match": "age >= 18 and status = 'active'"},
{"group_by": "city"},
{"count": "total"}
])
// Filter adult active users
// match comes before groupBy, so only matching documents are grouped.
const results = await users.aggregate()
.match("age >= 18 and status is 'active'")
.groupBy('city')
.count('total')
.execute();
// Filter adult active users
// Match comes before GroupBy, so only matching documents are grouped.
results, err := users.Aggregate().
Match("age >= 18 and status is \"active\"").
GroupBy("city").
Count("total").
Execute()
Group By
Group documents by one or more field values:
// Group by single field
// Each result holds one city value and, in "total", the number of documents for it.
let results = users.aggregate()
.group_by("city")
.count("total")
.execute()?;
# Group by single field
# Each result holds one city value and, in "total", the number of documents for it.
results = coll.aggregate([
{"group_by": "city"},
{"count": "total"}
])
// Group by single field
// Each result holds one city value and, in total, the number of documents for it.
const results = await users.aggregate()
.groupBy('city')
.count('total')
.execute();
// Group by single field
// Each result holds one city value and, in "total", the number of documents for it.
results, err := users.Aggregate().
GroupBy("city").
Count("total").
Execute()
Aggregators
Count
Count documents in each group:
// "user_count" receives the number of documents in each city group.
let results = users.aggregate()
.group_by("city")
.count("user_count")
.execute()?;
# "user_count" receives the number of documents in each city group.
results = coll.aggregate([
{"group_by": "city"},
{"count": "user_count"}
])
// user_count receives the number of documents in each city group.
const results = await users.aggregate()
.groupBy('city')
.count('user_count')
.execute();
// "user_count" receives the number of documents in each city group.
results, err := users.Aggregate().
GroupBy("city").
Count("user_count").
Execute()
Sum
Sum the values of a numeric field within each group:
// Arguments are (source field, output name): salaries are summed into "total_salary" per city.
let results = users.aggregate()
.group_by("city")
.sum("salary", "total_salary")
.execute()?;
# "field" is the source field and "output" the result name: salaries are summed per city.
results = coll.aggregate([
{"group_by": "city"},
{"sum": {"field": "salary", "output": "total_salary"}}
])
// Arguments are (source field, output name): salaries are summed into total_salary per city.
const results = await users.aggregate()
.groupBy('city')
.sum('salary', 'total_salary')
.execute();
// Arguments are (source field, output name): salaries are summed into "total_salary" per city.
results, err := users.Aggregate().
GroupBy("city").
Sum("salary", "total_salary").
Execute()
Average (Avg)
Calculate the average of a numeric field within each group:
// Arguments are (source field, output name): the average age per city lands in "avg_age".
let results = users.aggregate()
.group_by("city")
.avg("age", "avg_age")
.execute()?;
# "field" is the source field and "output" the result name: average age per city.
results = coll.aggregate([
{"group_by": "city"},
{"avg": {"field": "age", "output": "avg_age"}}
])
// Arguments are (source field, output name): the average age per city lands in avg_age.
const results = await users.aggregate()
.groupBy('city')
.avg('age', 'avg_age')
.execute();
// Arguments are (source field, output name): the average age per city lands in "avg_age".
results, err := users.Aggregate().
GroupBy("city").
Avg("age", "avg_age").
Execute()
Min & Max
Find the minimum and maximum values of a field within each group:
// Both aggregators read "age"; each writes to its own output name.
let results = users.aggregate()
.group_by("city")
.min("age", "youngest")
.max("age", "oldest")
.execute()?;
# Both aggregators read "age"; each writes to its own output name.
results = coll.aggregate([
{"group_by": "city"},
{"min": {"field": "age", "output": "youngest"}},
{"max": {"field": "age", "output": "oldest"}}
])
// Both aggregators read age; each writes to its own output name.
const results = await users.aggregate()
.groupBy('city')
.min('age', 'youngest')
.max('age', 'oldest')
.execute();
// Both aggregators read "age"; each writes to its own output name.
results, err := users.Aggregate().
GroupBy("city").
Min("age", "youngest").
Max("age", "oldest").
Execute()
Sort & Limit
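Sort results, skip a number of them, and limit how many are returned. Because stages run in order, place skip before limit when paging through results: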
// Stages run in order, so skip comes before limit: this returns results 6-15.
// (limit(10) followed by skip(5) would return at most 5 results.)
let results = users.aggregate()
    .group_by("city")
    .count("total")
    .sort("total", false) // false = descending
    .skip(5)              // Skip the first 5 results
    .limit(10)            // Then return at most 10
    .execute()?;
# Stages run in order, so skip comes before limit: this returns results 6-15.
# (limit 10 followed by skip 5 would return at most 5 results.)
results = coll.aggregate([
    {"group_by": "city"},
    {"count": "total"},
    {"sort": {"field": "total", "asc": False}},  # asc False = descending
    {"skip": 5},    # Skip the first 5 results
    {"limit": 10},  # Then return at most 10
])
// Stages run in order, so skip comes before limit: this returns results 6-15.
// (limit(10) followed by skip(5) would return at most 5 results.)
const results = await users.aggregate()
  .groupBy('city')
  .count('total')
  .sort('total', false) // false = descending
  .skip(5)              // Skip the first 5 results
  .limit(10)            // Then return at most 10
  .execute();
// Stages run in order, so Skip comes before Limit: this returns results 6-15.
// (Limit(10) followed by Skip(5) would return at most 5 results.)
results, err := users.Aggregate().
	GroupBy("city").
	Count("total").
	Sort("total", false). // false = descending
	Skip(5).              // Skip the first 5 results
	Limit(10).            // Then return at most 10
	Execute()
Project Fields
Select or exclude specific fields from results:
// The boolean selects the mode: false keeps only the listed fields, true removes them.
let results = users.aggregate()
.group_by("city")
.count("total")
.project(&["city", "total"], false) // Include only these fields
.execute()?;
// Or exclude fields (this second `let` shadows the first binding)
let results = users.aggregate()
.group_by("city")
.count("total")
.project(&["_internal"], true) // Exclude these fields
.execute()?;
# Include only these fields
# ("project" keeps only the listed fields; "exclude" removes the listed fields.)
results = coll.aggregate([
{"group_by": "city"},
{"count": "total"},
{"project": ["city", "total"]}
])
# Or exclude fields
results = coll.aggregate([
{"group_by": "city"},
{"count": "total"},
{"exclude": ["_internal"]}
])
// Include only these fields
// (second argument: false keeps only the listed fields, true removes them)
let results = await users.aggregate()
  .groupBy('city')
  .count('total')
  .project(['city', 'total'], false)
  .execute();

// Or exclude fields. `results` is reassigned rather than redeclared:
// a second `const results` in the same scope is a SyntaxError.
results = await users.aggregate()
  .groupBy('city')
  .count('total')
  .project(['_internal'], true)
  .execute();
// Include only these fields
// (second argument: false keeps only the listed fields, true removes them)
results, err := users.Aggregate().
GroupBy("city").
Count("total").
Project([]string{"city", "total"}, false).
Execute()
// Or exclude fields
// results and err were declared above, so this uses = rather than :=.
results, err = users.Aggregate().
GroupBy("city").
Count("total").
Project([]string{"_internal"}, true).
Execute()
Complete Examples
User Statistics by City
// Per-city user count and average age, cities with the most users first.
let city_stats = users.aggregate()
.group_by("city")
.count("users")
.avg("age", "avg_age")
.sort("users", false)
.execute()?;
for stat in city_stats {
println!("{}: {} users, avg age {:.1}",
stat["city"], stat["users"], stat["avg_age"]);
}
# Per-city user count and average age, cities with the most users first.
city_stats = coll.aggregate([
    {"group_by": "city"},
    {"count": "users"},
    {"avg": {"field": "age", "output": "avg_age"}},
    {"sort": {"field": "users", "asc": False}},  # descending by user count
])
for stat in city_stats:
    print(f"{stat['city']}: {stat['users']} users, avg age {stat['avg_age']:.1f}")
// Per-city user count and average age, cities with the most users first.
const cityStats = await users.aggregate()
.groupBy('city')
.count('users')
.avg('age', 'avg_age')
.sort('users', false)
.execute();
cityStats.forEach(stat => {
console.log(`${stat.city}: ${stat.users} users, avg age ${stat.avg_age.toFixed(1)}`);
});
// Per-city user count and average age, cities with the most users first.
cityStats, err := users.Aggregate().
	GroupBy("city").
	Count("users").
	Avg("age", "avg_age").
	Sort("users", false).
	Execute()
// Check the error: an unused err is also a compile error in Go.
if err != nil {
	panic(err)
}
for _, stat := range cityStats {
	fmt.Printf("%s: %v users, avg age %.1f\n",
		stat["city"], stat["users"], stat["avg_age"])
}
Salary Report by Department
// Salary total, average and head count per department for active employees,
// departments with the highest total salary first.
let results = employees.aggregate()
.match_("active is true")
.group_by("department")
.sum("salary", "total_salary")
.avg("salary", "avg_salary")
.count("employee_count")
.sort("total_salary", false)
.execute()?;
for dept in results {
println!("Department: {}", dept["department"]);
println!(" Employees: {}", dept["employee_count"]);
// The `$` before `{:.2}` is literal text in the format string.
println!(" Total: ${:.2}", dept["total_salary"]);
println!(" Average: ${:.2}", dept["avg_salary"]);
}
# Salary total, average and head count per department for active employees,
# departments with the highest total salary first.
# NOTE(review): the other language examples write this filter as
# `active is true`; confirm `=` is accepted by the query language.
results = coll.aggregate([
    {"match": "active = true"},
    {"group_by": "department"},
    {"sum": {"field": "salary", "output": "total_salary"}},
    {"avg": {"field": "salary", "output": "avg_salary"}},
    {"count": "employee_count"},
    {"sort": {"field": "total_salary", "asc": False}},  # descending by total
])
for dept in results:
    print(f"Department: {dept['department']}")
    print(f" Employees: {dept['employee_count']}")
    print(f" Total: ${dept['total_salary']:.2f}")
    print(f" Average: ${dept['avg_salary']:.2f}")
// Salary total, average and head count per department for active employees,
// departments with the highest total salary first.
const results = await employees.aggregate()
.match('active is true')
.groupBy('department')
.sum('salary', 'total_salary')
.avg('salary', 'avg_salary')
.count('employee_count')
.sort('total_salary', false)
.execute();
results.forEach(dept => {
console.log(`Department: ${dept.department}`);
console.log(` Employees: ${dept.employee_count}`);
// `$${...}`: the first `$` is a literal dollar sign, `${...}` is interpolation.
console.log(` Total: $${dept.total_salary.toFixed(2)}`);
console.log(` Average: $${dept.avg_salary.toFixed(2)}`);
});
// Salary total, average and head count per department for active employees,
// departments with the highest total salary first.
results, err := employees.Aggregate().
	Match("active is true").
	GroupBy("department").
	Sum("salary", "total_salary").
	Avg("salary", "avg_salary").
	Count("employee_count").
	Sort("total_salary", false).
	Execute()
// Check the error: an unused err is also a compile error in Go.
if err != nil {
	panic(err)
}
for _, dept := range results {
	fmt.Printf("Department: %s\n", dept["department"])
	fmt.Printf(" Employees: %v\n", dept["employee_count"])
	fmt.Printf(" Total: $%.2f\n", dept["total_salary"])
	fmt.Printf(" Average: $%.2f\n", dept["avg_salary"])
}