聚合操作处理数据记录并返回计算结果。聚合操作组将来自多个文档的值组合在一起,并且可以对分组数据执行各种操作以返回单个结果。MongoDB提供了执行聚合的三种方法:Aggregation Pipeline
、Map-Reduce
、Single Purpose Aggregation Operations
。
1.Aggregation Pipeline
MongoDB的聚合管道将MongoDB文档在一个管道处理完毕后将结果传给下一个管道处理。聚合管道是一个强大的工具,类似shell中的管道,并且它能够支持分片集群。
聚合管道的语法如下,aggregate()的参数为一个管道操作符的数组。
db.collection.aggregate( [ { <stage> }, ... ] )
1.1 Aggregation Pipeline Stages
文档按照顺序通过一组聚合管道,聚合管道支持以下操作符。
1.1.1 $project
修改输入文档的结构(不影响原数据):引入、去除指定字段
> db.user.find()
{ "_id" : ObjectId("5ae2d26a219cdd2458b3cac3"), "name" : "jack", "age" : 14 }
{ "_id" : ObjectId("5ae2d26c219cdd2458b3cac4"), "name" : "abc", "age" : 11 }
{ "_id" : ObjectId("5ae2d26d219cdd2458b3cac5"), "name" : "tom", "age" : 12, "tag" : "11岁以上", "r" : [ 5 ] }
> db.user.aggregate([{$project:{_id:0,name:1,age:1}}])
{ "name" : "jack", "age" : 14 }
{ "name" : "abc", "age" : 11 }
{ "name" : "tom", "age" : 12 }
1.1.2 $match
过滤数据,输出符合条件的文档。
> db.user.find({"name":"tom"})
{ "_id" : ObjectId("5ae2d26d219cdd2458b3cac5"), "name" : "tom", "age" : 12, "tag" : "11岁以上", "r" : [ 5 ] }
> db.user.aggregate([{$project:{_id:0,name:1,age:1}},{$match:{"name":"tom"}}])
{ "name" : "tom", "age" : 12 }
1.1.3 $limit
限制返回的文档数。
> db.user.aggregate([{$project:{_id:0,name:1,age:1}},{$limit:1}])
{ "name" : "jack", "age" : 14 }
1.1.4 $skip
跳过指定数量的文档。
> db.user.aggregate([{$project:{_id:0,name:1,age:1}},{$skip:2},{$limit:1}])
{ "name" : "tom", "age" : 12 }
1.1.5 $unwind
将数组类型字段值拆分成多条,每条包含数组的一个元素。
> db.user.find({"name":"rose"}).limit(1)
{ "_id" : ObjectId("5ae981f79ac174c20fb4c1f0"), "name" : "rose", "role" : [ 1, 2, 3, 4 ] }
> db.user.aggregate([{$match:{"name":"rose"}},{$limit:1},{$unwind:"$role"}])
{ "_id" : ObjectId("5ae981f79ac174c20fb4c1f0"), "name" : "rose", "role" : 1 }
{ "_id" : ObjectId("5ae981f79ac174c20fb4c1f0"), "name" : "rose", "role" : 2 }
{ "_id" : ObjectId("5ae981f79ac174c20fb4c1f0"), "name" : "rose", "role" : 3 }
{ "_id" : ObjectId("5ae981f79ac174c20fb4c1f0"), "name" : "rose", "role" : 4 }
1.1.6 $group
分组操作,类似sql中的group by,一般与管道表达式min等组合使用。
语法如下
{ $group: { _id: <expression>, <field1>: { <accumulator1> : <expression1> }, ... } }
示例
> db.user.find().limit(3)
{ "_id" : ObjectId("5ae2d26a219cdd2458b3cac3"), "name" : "jack", "age" : 14 }
{ "_id" : ObjectId("5ae2d26c219cdd2458b3cac4"), "name" : "rose", "age" : 12 }
{ "_id" : ObjectId("5ae2d26d219cdd2458b3cac5"), "name" : "tom", "age" : 12, "tag" : "11岁以上", "r" : [ 5 ] }
> db.user.aggregate([{$limit:3},{$group:{_id:"$age"}}])
{ "_id" : 12 }
{ "_id" : 14 }
> db.user.aggregate([{$limit:3},{$group:{_id:"$age","n":{$max:"$name"}}}])
{ "_id" : 12, "n" : "tom" }
{ "_id" : 14, "n" : "jack" }
1.1.7 $sort
对文档进行排序,1为升序,-1为降序。
> db.user.aggregate([{$limit:3},{$sort:{age:1}},{$project:{_id:0,name:1,age:1}}])
{ "name" : "rose", "age" : 12 }
{ "name" : "tom", "age" : 12 }
{ "name" : "jack", "age" : 14 }
> db.user.aggregate([{$limit:3},{$sort:{age:1}},{$sort:{name:-1}},{$project:{_id:0,name:1,age:1}}])
{ "name" : "tom", "age" : 12 }
{ "name" : "rose", "age" : 12 }
{ "name" : "jack", "age" : 14 }
1.1.8 $lookup
用于多文档连接查询返回关联数据。
> db.product.find()
{ "_id" : 1, "name" : "产品1", "price" : 10, "type" : "品类1" }
{ "_id" : 2, "name" : "产品2", "price" : 11, "type" : "品类1" }
{ "_id" : 3, "name" : "产品3", "price" : 13, "type" : "品类2" }
{ "_id" : 4, "name" : "产品4", "price" : 15, "type" : "品类2" }
{ "_id" : 5, "name" : "产品5", "price" : 17, "type" : "品类3" }
{ "_id" : 6, "name" : "产品6", "price" : 19, "type" : "品类3" }
> db.order.find()
{ "_id" : 1, "product_id" : 1, "name" : "订单1" }
{ "_id" : 2, "product_id" : 2, "name" : "订单2" }
{ "_id" : 3, "product_id" : 2, "name" : "订单3" }
{ "_id" : 4, "product_id" : 1, "name" : "订单4" }
{ "_id" : 5, "name" : "订单5" }
> db.order.aggregate({$lookup:{from:"product",localField:"product_id",foreignField:"_id",as:"order_product"}})
{ "_id" : 1, "product_id" : 1, "name" : "订单1", "order_product" : [ { "_id" : 1, "name" : "产品1", "price" : 10, "type" : "品类1" } ] }
{ "_id" : 2, "product_id" : 2, "name" : "订单2", "order_product" : [ { "_id" : 2, "name" : "产品2", "price" : 11, "type" : "品类1" } ] }
{ "_id" : 3, "product_id" : 2, "name" : "订单3", "order_product" : [ { "_id" : 2, "name" : "产品2", "price" : 11, "type" : "品类1" } ] }
{ "_id" : 4, "product_id" : 1, "name" : "订单4", "order_product" : [ { "_id" : 1, "name" : "产品1", "price" : 10, "type" : "品类1" } ] }
{ "_id" : 5, "name" : "订单5", "order_product" : [ ] }
MongoDB 3.6加入了一些新的特性,可以到官网手册查阅
https://docs.mongodb.com/manual/reference/operator/aggregation/lookup/。
1.1.9 $geoNear
输出接近指定地点的文档,LBS相关的应用中常用。
https://docs.mongodb.com/manual/reference/operator/aggregation/geoNear/
1.2 Aggregation Pipeline Operators
聚合管道操作符可以构造用于聚合管道的表达式,聚合表达式的形式如下
{ <operator>: [ <argument1>, <argument2> ... ] }
如果操作符对应单个参数,其表达式形式如下
{ <operator>: <argument> }
MongoDB中支持的管道操作符太多,这里只给出几种常用的。
1.2.1 $sum 求和
> db.product.find()
{ "_id" : 1, "name" : "产品1", "price" : 10, "type" : "品类1" }
{ "_id" : 2, "name" : "产品2", "price" : 11, "type" : "品类1" }
{ "_id" : 3, "name" : "产品3", "price" : 13, "type" : "品类2" }
{ "_id" : 4, "name" : "产品4", "price" : 15, "type" : "品类2" }
{ "_id" : 5, "name" : "产品5", "price" : 17, "type" : "品类3" }
{ "_id" : 6, "name" : "产品6", "price" : 19, "type" : "品类3" }
> db.product.aggregate([{$group:{_id:"$type",price:{$sum:"$price"}}},{$sort:{_id:1}}])
{ "_id" : "品类1", "price" : 21 }
{ "_id" : "品类2", "price" : 28 }
{ "_id" : "品类3", "price" : 36 }
1.2.2 $avg 求平均值
> db.product.aggregate([{$group:{"_id":"$type","price":{$avg:"$price"}}},{$sort:{_id:1}}])
{ "_id" : "品类1", "price" : 10.5 }
{ "_id" : "品类2", "price" : 14 }
{ "_id" : "品类3", "price" : 18 }
1.2.3 $min 取最小值
> db.product.aggregate([{$group:{"_id":"$type","price":{$min:"$price"}}},{$sort:{_id:1}}])
{ "_id" : "品类1", "price" : 10 }
{ "_id" : "品类2", "price" : 13 }
{ "_id" : "品类3", "price" : 17 }
1.2.4 $max 取最大值
> db.product.aggregate([{$group:{"_id":"$type","price":{$max:"$price"}}},{$sort:{_id:1}}])
{ "_id" : "品类1", "price" : 11 }
{ "_id" : "品类2", "price" : 15 }
{ "_id" : "品类3", "price" : 19 }
1.2.5 $push 添加数组元素
> db.product.aggregate([{$group:{"_id":"$type","products":{$push:"$name"}}},{$sort:{_id:1}}])
{ "_id" : "品类1", "products" : [ "产品1", "产品2" ] }
{ "_id" : "品类2", "products" : [ "产品3", "产品4" ] }
{ "_id" : "品类3", "products" : [ "产品5", "产品6" ] }
1.2.6 $addToSet 添加数组元素(无重复)
$addToSet与$push几乎一致,区别就是不会添加已有值。
1.2.7 $first 取第一个元素
> db.product.aggregate([{$group:{"_id":"$type","product":{$first:"$name"}}},{$sort:{_id:1}}])
{ "_id" : "品类1", "product" : "产品1" }
{ "_id" : "品类2", "product" : "产品3" }
{ "_id" : "品类3", "product" : "产品5" }
1.2.8 $last 取最后一个元素
> db.product.aggregate([{$group:{"_id":"$type","product":{$last:"$name"}}},{$sort:{_id:1}}])
{ "_id" : "品类1", "product" : "产品2" }
{ "_id" : "品类2", "product" : "产品4" }
{ "_id" : "品类3", "product" : "产品6" }
更多管道操作相关内容查看官网文档 https://docs.mongodb.com/manual/reference/operator/aggregation/。
2.Map-Reduce Function
Map-Reduce是一个数据处理模型,它将大量数据分解(Map),再合并成最终结果(Reduce)。
以下图片是官方手册中Map-Reduce的处理过程的示例:
Map-Reduce的语法如下([ 参数 ]为可选项):
db.runCommand(
{
mapReduce: <collection>, # 要操作的集合
map: <function>, # 映射函数(生成键值对序列,作为reduce函数参数)
reduce: <function>, # 统计函数
[ finalize: <function>, ] # 对reduce返回结果进行最终整理后存入结果集合
[ out: <output>, ]
[ query: <document>, ] # 查询条件
[ sort: <document>, ] # 排序
[ limit: <number>, ] # 限制返回记录数量
[ scope: <document>, ] # 向map、reduce、finalize函数传入外部变量
[ jsMode: <boolean>, ]
[ verbose: <boolean>, ] # 显示详细的时间统计信息
[ bypassDocumentValidation: <boolean>, ]
[ collation: <document> ]
}
)
参数说明
- map函数:
function(){ emit(key,value) }
,提交两个参数key和value,数据会根据key的值进行分组,同组的value值存入values中。key和value作为reduce函数的参数。 - reduce函数:
function(key,values){ }
,参数key是分组字段,values是同组的值,reduce函数中对分组数据进行处理。 - out:指定结果集保存在哪里,可以是一个集合名称,也可以使用如下文档配置
# 结果集存放在内存(结果集需要<16M)
{ inline:1 }
# 结果集存放在集合中,如果该集合中已经有数据,可以选择以下三种模式处理
{ replace:"collection" } # 集合中的旧数据被替换,不保留
{ merge:"collection"} # 合并含有相同键的结果文档
{ reduce:"collection"} # 调用reduce函数,根据新值来处理旧集合的值
示例
准备数据
> db.order.find()
{ "_id" : ObjectId("5aec0f5733e1b532f26e4c74"), "cust_id" : "A123", "amount" : 500, "status" : "A" }
{ "_id" : ObjectId("5aec0f6333e1b532f26e4c75"), "cust_id" : "A123", "amount" : 250, "status" : "A" }
{ "_id" : ObjectId("5aec0f7333e1b532f26e4c76"), "cust_id" : "B212", "amount" : 200, "status" : "A" }
{ "_id" : ObjectId("5aec0f8333e1b532f26e4c77"), "cust_id" : "A123", "amount" : 300, "status" : "D" }
使用db.collection.mapReduce()的形式,结果集保存到内存
> db.order.mapReduce(function() {
... emit(this.cust_id, this.amount);
... },
... function(key, values) {
... return Array.sum(values)
... },
... {
... query: {
... status: "A"
... },
... out: {
... inline: 1
... },
... finalize: function(key, reduced) {
... return "总数为" + reduced;
... }
... })
{
"results" : [
{
"_id" : "A123",
"value" : "总数为750"
},
{
"_id" : "B212",
"value" : "总数为200"
}
],
"timeMillis" : 14,
"counts" : {
"input" : 3,
"emit" : 3,
"reduce" : 1,
"output" : 2
},
"ok" : 1
}
使用db.runCommand()的形式,结果集保存到指定的集合
> db.runCommand({
... mapReduce: "order",
... map: function() {
... emit(this.cust_id, this.amount);
... },
... reduce: function(key, values) {
... return Array.sum(values)
... },
... query: {
... status: "A"
... },
... out: {
... replace: "order_total"
... },
... finalize: function(key, reduced) {
... return "总数为" + reduced
... }
... })
{
"result" : "order_total",
"timeMillis" : 55,
"counts" : {
"input" : 3,
"emit" : 3,
"reduce" : 1,
"output" : 2
},
"ok" : 1
}
> db.order_total.find()
{ "_id" : "A123", "value" : "总数为750" }
{ "_id" : "B212", "value" : "总数为200" }
示例中仅仅演示了求和,除此以外,reduce函数中可以根据需要进行一系列的复杂处理。
3.Single Purpose Aggregation Methods
MongoDB同样提供了一些单一用途的聚合函数
db.collection.count():返回匹配的文档数
db.collection.distinct():以数组形式返回去重的字段值
> db.user.find()
{ "_id" : ObjectId("5ae2d26a219cdd2458b3cac3"), "name" : "jack", "age" : 14 }
{ "_id" : ObjectId("5ae2d26c219cdd2458b3cac4"), "name" : "rose", "age" : 12 }
{ "_id" : ObjectId("5ae2d26d219cdd2458b3cac5"), "name" : "tom", "age" : 12, "tag" : "11岁以上", "r" : [ 5 ] }
> db.user.count({"age":12})
2
> db.user.distinct("age")
[ 14, 12 ]