MongoDB 聚合管道
聚合管道是一系列数据处理阶段组成的流水线,文档依次通过每个阶段被加工处理。
核心阶段
| 阶段 | 作用 |
|---|---|
$match | 过滤文档(类似 SQL WHERE) |
$group | 分组聚合(类似 SQL GROUP BY) |
$project | 字段重塑 / 计算新字段 |
$lookup | 左外连接(类似 SQL LEFT JOIN) |
$unwind | 展开数组为多个文档 |
$sort | 排序 |
$limit / $skip | 限制 / 跳过 |
$addFields / $set | 添加字段 |
$facet | 并行多管道 |
阶段详解
$match — 过滤
js
{ $match: { status: "completed", total: { $gte: 100 } } }尽可能将
$match放在管道最前面,可用索引过滤,减少后续阶段处理量。
$group — 分组聚合
js
db.orders.aggregate([
{
$group: {
_id: "$customerId",
totalAmount: { $sum: "$total" },
orderCount: { $sum: 1 },
avgAmount: { $avg: "$total" },
firstOrder: { $min: "$date" },
lastOrder: { $max: "$date" }
}
}
])常用累加器:
| 操作符 | 含义 |
|---|---|
$sum | 求和 / 计数 |
$avg | 平均值 |
$min / $max | 最小 / 最大值 |
$push | 将值加入数组(保留重复) |
$addToSet | 将值加入数组(去重) |
$first / $last | 取首 / 尾值(需先排序) |
$project — 重塑文档
js
{
$project: {
_id: 0,
name: 1,
accountAgeDays: {
$divide: [{ $subtract: ["$$NOW", "$createdAt"] }, 86400000]
}
}
}仅增删字段时,用
$set/$unset更轻量。
$lookup — 关联查询
js
db.orders.aggregate([
{
$lookup: {
from: "products",
localField: "productId",
foreignField: "_id",
as: "product"
}
},
// 连接后 product 是数组,用 $unwind 拆开
{ $unwind: "$product" }
])带子管道的 $lookup(用于预过滤):
js
{
$lookup: {
from: "products",
let: { pid: "$productId" },
pipeline: [
{ $match: {
$expr: { $eq: ["$_id", "$$pid"] },
status: "active"
}},
{ $project: { name: 1, category: 1 } }
],
as: "product"
}
}$unwind — 展开数组
js
// 输入:{ name: "Alice", tags: ["mongodb", "nosql", "database"] }
// 输出:3 个文档,tags 成为单个值
{ $unwind: "$tags" }⚠️ 展开大数组会爆炸文档数量,先用
$match和$project缩小范围。
$facet — 并行管道
js
db.orders.aggregate([
{
$facet: {
byStatus: [
{ $group: { _id: "$status", count: { $sum: 1 } } }
],
byCategory: [
{ $unwind: "$items" },
{ $group: { _id: "$items.category", total: { $sum: "$items.price" } } }
]
}
}
])
// 输出:{ byStatus: [...], byCategory: [...] }完整示例
js
db.orders.aggregate([
// 1. 过滤最近一个月的已完成订单
{ $match: {
status: "completed",
date: { $gte: oneMonthAgo }
}},
// 2. 展开订单项
{ $unwind: "$items" },
// 3. 关联产品信息
{ $lookup: {
from: "products",
localField: "items.productId",
foreignField: "_id",
as: "productDetail"
}},
{ $unwind: "$productDetail" },
// 4. 计算每条订单项的销售额
{ $addFields: {
itemRevenue: { $multiply: ["$items.quantity", "$items.price"] }
}},
// 5. 按产品分类汇总
{ $group: {
_id: "$productDetail.category",
totalRevenue: { $sum: "$itemRevenue" },
unitsSold: { $sum: "$items.quantity" },
orderCount: { $addToSet: "$_id" }
}},
// 6. 整理输出
{ $addFields: { orderCount: { $size: "$orderCount" } }},
// 7. 按销售额降序
{ $sort: { totalRevenue: -1 } },
// 8. 只取前 5
{ $limit: 5 }
]).allowDiskUse(true) // 允许写入磁盘,突破 100MB 内存限制性能建议
| 建议 | 原因 |
|---|---|
$match 放最前 | 减少文档量,利用索引 |
$match 后接 $project | 裁剪字段,减少内存占用 |
$match、$sort 字段建索引 | 大幅提升速度 |
allowDiskUse(true) | 突破 100MB 内存限制 |
避免大数组 $unwind | 文档爆炸 |
用 $facet 合并并行查询 | 减少往返次数 |