Last Updated on 3월 26, 2023 by Jade(정현호)
안녕하세요
이번 포스팅에서는 MongoDB의 Aggregation 에 대해서 확인 해보려고 합니다.
Contents
Aggregation
MongoDB에서는 데이터를 분석하기 위한 몇 가지 기능을 제공 하고 있습니다.
그중에서 대표적인 기능은 맵리듀스 와 Aggregation 기능일 것 같으며, 그외 확장 검색 기능으로 전문 검색 과 공간 검색 기능도 제공 하고 있습니다.
맵리듀스는 다른 대용량 분산 NO SQL 데이터베이스와 같은 기능을 의미하며, 대용량 분산 병렬 처리 형태로 사용 됩니다.
Aggregation 은 맵리듀스 보다 MongoDB에서 늦게 추가 된 기능으로 맵리듀스에 비해서 빠른 성능과 손 쉬운 사용 편의성을 제공해주고 있습니다.
맵리듀스도 적절한 사용 용도가 있으며 Aggregation에서 만족할 수 없는 요건에 대해서 맵리듀스를 통해서 업무를 수행할 수도 있습니다. 그래서 원하는 용도와 목적에 맞게 사용하면 되며, 보통의 경우 Aggregation 을 통해서 많은 영역의 조회 및 데이터 분석 등의 조회 영역을 상대적으로 손쉽고 빠르게 수행할 수 있습니다.
그럼 MongoDB 에서 Aggregation이 주로 많이 사용 되는 이유에 대해서 살펴 보자면
• RDBMS에서의 SQL 로 Grouping 하여 분석을 위한 가공하는 것과 달리 MongoDB의 find 메서드로는 데이터 가공이 어려움이 있습니다.
• find 메서드를 통한 데이터의 그룹핑 및 특정 조건에 일치하는 도큐먼트의 개수를 확인 한다하는 등의 복잡한 쿼리 수행이 불가 합니다.
• 맵리듀스 기능을 통해서 분석 기능을 사용할 수는 있으나 간단한 분석에서도 맵리듀스의 경우 자바스크립트를 통해서 작성을 해야 합니다. 또한 자바 스크립트 기반의 맵리듀스는 많은 제약을 가지고 있기도 합니다.
• 맵리듀스를 사용하기 위해서 자바스크립트를 사용해야 한다는 것을 자바스크립트를 학습, 즉 런닝커브가 필요함을 의미 합니다.
• 맵리듀스가 Aggregation 에 비해서 상대적으로 처리 속도가 느립니다. 이유로는 Aggregation은 C++ 로 작성(개발)된 MongoDB 엔진의 자체 내장 기능 입니다. 별도의 언어 엔진이나 변수 매핑의 과정도 불필요 합니다. 통상적으로 C++ 은 어떠한 언어보다 빠른 처리 속도를 보장하거나 보여주고 있는 언어이기도 합니다.
MongoDB 2.2 버전부터 Aggregation 기능을 추가 되었으며, 버전 갱신 되면서 해당 기능도 개선 및 기능 추가가 되었습니다.
위에서 설명한 것처럼 모든 업무 요건을 Aggregation 으로 처리할 수는 없지만, 보통의 대부분의 경우 Aggregation 으로 처리 할 수 있습니다.
Single purpose aggregation methods
MongoDB 서버의 Aggregation 종류는 크게 2가지로 나눠볼 수 있습니다.
- Single purpose aggregation methods - 단일 목적 집계 방법
- Aggregation pipelines - 범용적 사용의 집계 방법
먼저 이번 장에서 살펴볼 내용은 단일 목적 Aggregation 방법 - Single purpose aggregation methods 입니다.
사용성이 단순하고, 사용 빈도는 높으며 쉽게 Aggregation 기능을 사용 할 수 있도록 별도의 명령을 지원하는 것 입니다.
이렇게 별도의 전용 명령어가 있는 Aggregation 을 Single purpose aggregation methods(단일 목적 Aggreegation) 이라고 합니다.
MongoDB 5.0, 6.0 기준으로 3개의 전용 명령어를 사용할 수 있습니다.
- db.collection.estimatedDocumentCount
- db.collection.count()
- db.collection.distinct()
MongoDB 버전 별로 Deprecated 되었거나 새로 추가 된 명령어가 있을 수 있으므로 사용하는 버전에서 사용 가능한 명령어는 확인이 필요 합니다.
db.collection.count()
db.collection.count() 는 collection 또는 view 에 대한 find() 쿼리와 일치하는 문서 수를 반환합니다.
기능을 확인해보기 위해서 아래와 같이 샘플 데이터를 입력 하겠습니다.
db.test1.insert({empno:7698 , ename : "jade", job : "dba"}) db.test1.insert({empno:7698 , ename : "jade", job : "dba"}) db.test1.insert({empno:7698 , ename : "jade", job : "dba"}) db.test1.insert({empno:7698 , ename : "jade", job : "dba"}) db.test1.insert({empno:7698 , ename : "jade", job : "dba"}) db.test1.insert({empno:7698 , ename : "jade", job : "dba"}) db.test1.insert({empno:7698 , ename : "jade", job : "dba"}) db.test1.insert({empno:7698 , ename : "jade", job : "dba"}) db.test1.insert({empno:7698 , ename : "jade", job : "dba"}) db.test1.insert({empno:7698 , ename : "jade", job : "dba"}) db.test1.insert({empno:7698 , ename : "jade", job : "dba"}) db.test1.insert({empno:7698 , ename : "jade", job : "dba"}) db.test1.insert({empno:7654 , ename : "martin", job : "salesman"}) db.test1.insert({empno:7654 , ename : "martin", job : "salesman"}) db.test1.insert({empno:7654 , ename : "martin", job : "salesman"}) db.test1.insert({empno:7654 , ename : "martin", job : "salesman"}) db.test1.insert({empno:7654 , ename : "martin", job : "salesman"}) db.test1.insert({empno:7654 , ename : "martin", job : "salesman"})
입력한 Collection에 대해서 count() 메서드는 아래와 같이 사용할 수 있습니다.
> db.test1.count() 18 > db.test1.count({ ename:"jade" }) 12 > db.test1.count({ ename:"martin" }) 6
count() 메서드는 인자로는 query 와 options 을 사용할 수 있으며, 두번째 인자인 options 에는 아래와 같은 옵션을 사용할 수 있습니다.
- limit
- skip
- hint
- maxTimeMS
- readConcern
- collation
옵션을 사용할 경우 아래와 같이 사용 합니다.
> db.test1.count({ ename:"jade" }, {limit: 5} ) 5
MongoDB의 매뉴얼에서는 db.collection.count(query) 와 db.collection.find(query).count() 구조가 같다고 언급되어 있습니다.
count() is equivalent to the db.collection.find(query).count() construct.
> db.test1.count({ ename:"jade" }) 12 > db.test1.find( { ename: "jade"}).count() 12
다만 skip 과 limit 옵션을 사용 할 경우 차이가 발생하는 부분은 있습니다.
> db.test1.count({ ename:"jade" }, {limit: 5} ) 5 > db.test1.find( { ename: "jade"}).limit(5).count() 12
find() 메서드에서는 limit 과 skip 옵션을 무시하기 때문에 위와 같은 결과가 나오게 됩니다.
find() 메서드를 사용하면서 의도한 대로 limit 과 skip이 적용하려면 applySkipLimit 옵션을 사용 합니다.
> db.test1.find( { ename: "jade"}).limit(5).count({ applySkipLimit:true }) 5
db.collection.count() 메서드 사용시 query 인자를 사용하지 않고, 즉 조건 없이 조회를 할 경우 Collection의 모든 도큐먼트 개수를 반환하게 됩니다. 즉 전체 도큐먼트를 카운트 하여 결과를 반환합니다.
다만 이와 같이 수행할 경우 실제로 Collection 전체의 도큐먼트를 일일이 확인하여 카운트 결과를 반환하는 것이 아닌 Collection의 메타데이터를 기반으로 결과를 반환 합니다. 즉 정확한 값이 아닐 수 있습니다.
When you call count without a query predicate, you may receive inaccurate document counts. Without a query predicate, count commands return results based on the collection's metadata, which may result in an approximate count
그래서 카운트가 부정확할 수도 있으나, MongoDB 서버에는 부하를 주지 않게 됩니다.
MongoDB 4.0 기능과 호환되는 몽고DB 드라이버는 countDocuments() 와 estimatedDocumentCount()에 대한 새로운 API를 선호하기 위해 각각의 커서와 컬렉션 count() API를 더 이상 사용하지 않습니다.
그래서 애플리케이션에서 MongoDB 드라이버를 통해서 사용할 경우 countDocuments() 또는 estimatedDocumentCount() 으로 대체하여 사용 해야 합니다.
## 테스트 python 코드 from pymongo import MongoClient import pprint client = MongoClient(host='localhost', port=27017,username='xxxxx',password='xxxxx') mydb = client['test'] mycol = mydb['test1'] pprint.pprint(mycol.count()) ## 실행 $ python3.8 mongo_count_test.py ........................................... raise TypeError( TypeError: 'Collection' object is not callable. If you meant to call the 'count' method on a 'Collection' object it is failing because no such method exists.
• count_documents 사용할 경우
## count_documents 을 사용하는 예제 코드 from pymongo import MongoClient import pprint client = MongoClient(host='localhost', port=27017,username='xxxxx',password='xxxxx') mydb = client['test'] mycol = mydb['test1'] pprint.pprint(mycol.count_documents({ "ename":"jade" })) ## 실행 $ python3.8 mongo_count_test.py 12
db.collection.distinct
distinct() 메서드는 유니크한 값을 출력하는 단일 목적의 Aggregation 입니다.
db.collection.distinct(field, query, options)
distinct() 메서드에서 사용 되는 인자는 field, query, options 입니다.
field 에는 유니크한 값을 확인하려는 필드명 입니다.
> db.test1.distinct("ename") [ "jade", "martin" ]
query를 지정(조건을 지정)하여 사용할 수도 있습니다.
> db.test1.distinct("ename", {empno:7654} ) [ "martin" ]
인덱스 사용이 가능할 때, db.collection.distinct() 작업은 인덱스를 사용할 수 있으며, 커버링 인덱스도 처리가 가능 합니다.
db.collection.estimatedDocumentCount()
db.collection.estimatedDocumentCount({}) 는 Collection 에서 필터 없이 전체 Document 갯수를 확인할 때 사용 합니다.
메서드명에서 예상할 수 있듯이 approximate count, 즉 대략적인 값을 리턴 하게 됩니다.
• 사용예시
> db.test1.estimatedDocumentCount({}) 18
사용할 수 있는 options 에는 maxTimeMS 이 있습니다. estimatedDocumentCount() 메서드의 최대 실행 허용 시간에 대한 옵션 입니다.
db.collection.estimatedDocumentCount()메서드는 쿼리 필터를 사용하지 않으며, 메타데이터를 사용하여 전체 컬렉션의 문서 수를 반환합니다.
Aggregation pipelines
위에서 언급한 3개의 단일 목적 Aggregation 이외에 여러가지 조회 조건이나 서비스 요건을 충족하기 위해서 사용자가 직접 구성하여 사용할 수 있는 Aggregation 을 지원하며, Aggregation pipelines 으로 분류 됩니다.
범용(적인) Aggregation 으로 표현 할 수 있으며 Aggregation pipelines 는 사용자가 추출하고자 하는 데이터 가공에 대한 내용을 직접 기술하여 사용하게 됩니다.
이때 데이터를 가공하는 작업은 스테이지(stage) 라는 단위로 작업이 구성 되며, 데이터는 이렇게 작성된 스테이지들을 하나의 pipe 처럼 흘러가는 형태로 원하는 데이터로 변환과 전달이 되며, 처리가 완료 후 집계된 데이터를 반출 하게 됩니다.
그래서 범용 Aggregation 이 정식 명칭으로 Aggregation pipelines 이라고 매뉴얼에 기재되어 있습니다.
Aggregation pipelines 은 pipeline 과 option 2개의 인자(파라미터)를 사용할 수 있습니다.
기능에 대해서 내용을 확인 해보기 위해서 먼저 아래와 같이 Collection 를 생성 하도록 하겠습니다.
db.orders.insertMany( [ { _id: 0, name: "Pepperoni", size: "small", price: 19, quantity: 10, date: ISODate( "2021-03-13T08:14:30Z" ) }, { _id: 1, name: "Pepperoni", size: "medium", price: 20, quantity: 20, date : ISODate( "2021-03-13T09:13:24Z" ) }, { _id: 2, name: "Pepperoni", size: "large", price: 21, quantity: 30, date : ISODate( "2021-03-17T09:22:12Z" ) }, { _id: 3, name: "Cheese", size: "small", price: 12, quantity: 15, date : ISODate( "2021-03-13T11:21:39.736Z" ) }, { _id: 4, name: "Cheese", size: "medium", price: 13, quantity:50, date : ISODate( "2022-01-12T21:23:13.331Z" ) }, { _id: 5, name: "Cheese", size: "large", price: 14, quantity: 10, date : ISODate( "2022-01-12T05:08:13Z" ) }, { _id: 6, name: "Vegan", size: "small", price: 17, quantity: 10, date : ISODate( "2021-01-13T05:08:13Z" ) }, { _id: 7, name: "Vegan", size: "medium", price: 18, quantity: 10, date : ISODate( "2021-01-13T05:10:13Z" ) } ] )
위에서 입력한 Collection 을 조회 하면 아래와 같습니다.
> db.orders.find() { "_id" : 0, "name" : "Pepperoni", "size" : "small", "price" : 19, "quantity" : 10, "date" : ISODate("2021-03-13T08:14:30Z") } { "_id" : 1, "name" : "Pepperoni", "size" : "medium", "price" : 20, "quantity" : 20, "date" : ISODate("2021-03-13T09:13:24Z") } { "_id" : 2, "name" : "Pepperoni", "size" : "large", "price" : 21, "quantity" : 30, "date" : ISODate("2021-03-17T09:22:12Z") } { "_id" : 3, "name" : "Cheese", "size" : "small", "price" : 12, "quantity" : 15, "date" : ISODate("2021-03-13T11:21:39.736Z") } { "_id" : 4, "name" : "Cheese", "size" : "medium", "price" : 13, "quantity" : 50, "date" : ISODate("2022-01-12T21:23:13.331Z") } { "_id" : 5, "name" : "Cheese", "size" : "large", "price" : 14, "quantity" : 10, "date" : ISODate("2022-01-12T05:08:13Z") } { "_id" : 6, "name" : "Vegan", "size" : "small", "price" : 17, "quantity" : 10, "date" : ISODate("2021-01-13T05:08:13Z") } { "_id" : 7, "name" : "Vegan", "size" : "medium", "price" : 18, "quantity" : 10, "date" : ISODate("2021-01-13T05:10:13Z") }
총 주문 수량 계산
입력한 데이터를 사용하여 다음 예제는 간단히 총 주문 수량을 계산하는 집계 쿼리를 수행해보도록 하겠습니다.
db.orders.aggregate( [ { $match: { size: "medium" } }, { $group: { _id: "$name", totalQuantity: { $sum: "$quantity" } } } ] )
위의 쿼리에 대해서 각각의 항목에 대한 내용은 다음과 같습니다.
$match stage
주문 Collection 에서 size가 medium 을 필터(조건) 합니다.
그리고 나머지 문서를 $group stage 로 전달 합니다.
$group stage
$match stage에서 전달 받은(남은) document 를 pizza 이름으로 grouping 합니다.
$sum 연산자를 사용하여 각 피자 이름의 총 주문 수량을 계산합니다.
합계는 aggregation 파이프라인에서 반환된 총 수량 필드에 저장됩니다.
위의 aggregation 수행결과는 다음과 같습니다.
{ "_id" : "Vegan", "totalQuantity" : 10 } { "_id" : "Cheese", "totalQuantity" : 50 } { "_id" : "Pepperoni", "totalQuantity" : 20 }
사용할 수 있는 집계 연산자로는 위의 예제에서 나온 $sum 을 비롯하여 $avg, $max, $min, $first, $last 등 다양하게 제공 되고 있습니다.
항목이 많은 관계로 아래 매뉴얼에서 지원되는 연산자를 참고하시면 됩니다.
총 주문 값 및 평균 주문 수량 계산
다음 예제에서는 두 날짜 사이의 총 피자 주문 값과 평균 주문 수량을 계산합니다
db.orders.aggregate( [ { $match: { "date": { $gte: new ISODate( "2020-01-30" ), $lt: new ISODate( "2022-01-30" ) } } }, { $group: { _id: { $dateToString: { format: "%Y-%m-%d", date: "$date" } }, totalOrderValue: { $sum: { $multiply: [ "$price", "$quantity" ] } }, averageOrderQuantity: { $avg: "$quantity" } } }, { $sort: { totalOrderValue: -1 } } ] )
$match stage
orders 컬렉션의 도큐먼트를 $gte 및 $lt를 사용하여 지정된 날짜 범위의 문서로 필터링합니다.
그리고 나머지 문서를 $group stage 로 전달 합니다.
$group stage
$match stage에서 전달 받은(남은) document 를 $dateToString 을 통해서 grouping 합니다.
각 그룹에 대해서 다음과 같이 계산 합니다.
- totalOrderValue 는 $sum과 $multiply를 사용한 계산 입니다.
- averageOrderQuantity 는 $avg를 사용한 평균 주문 수량 입니다.
그룹화된 도큐먼트를 $sort stage 로 전달합니다.
$sort stage
각 그룹의 totalOrderValue을 기준으로 내림차순(-1)으로 문서를 정렬하며, 정렬된 문서를 반환합니다.
위의 aggregation 수행결과는 다음과 같습니다.
{ "_id" : "2022-01-12", "totalOrderValue" : 790, "averageOrderQuantity" : 30 } { "_id" : "2021-03-13", "totalOrderValue" : 770, "averageOrderQuantity" : 15 } { "_id" : "2021-03-17", "totalOrderValue" : 630, "averageOrderQuantity" : 30 } { "_id" : "2021-01-13", "totalOrderValue" : 350, "averageOrderQuantity" : 10 }
두번째 인자값에는 options 에는 다양한 옵션을 사용 할 수 있습니다.
db.orders.aggregate( [ { $match: { "date": { $gte: new ISODate( "2020-01-30" ), $lt: new ISODate( "2022-01-30" ) } } }, { $group: { _id: { $dateToString: { format: "%Y-%m-%d", date: "$date" } }, totalOrderValue: { $sum: { $multiply: [ "$price", "$quantity" ] } }, averageOrderQuantity: { $avg: "$quantity" } } }, { $sort: { totalOrderValue: -1 } } ],{ allowDiskUse:true } )
위의 예제 마지막 줄에 { allowDiskUse:true } 이 기재된 것 과 같이 사용할 수 있습니다.
• explain : aggregate() 명령의 실행 계획을 확인할 수 있는 옵션 입니다.
• allowDiskUse : MongoDB의 aggregate() 명령어는 정렬을 위해서 기본적으로 메모리를 100MB까지 사용할 수 있습니다.
그러므로 100MB 이상의 정렬을 해야하는 경우에는 aggregate() 명령어는 실패하게 도비니다. 이럴 경우 allowDiskUse:true 로 설정 후 쿼리를 수행한다면 디스크를 이용해서 정렬을 처리 할 수 있게 됩니다.
디스크는 MongoDB의 설정파일(mongod.conf) 에서 설정된 데이터 디렉토리(dbPath) 하위의 _tmp 디렉토리를 사용 합니다.
• cursor : aggregate() 수행 결과로 반환되는 커서의 배치 사이즈를 설정 합니다.
aggregate 명령은 커서를 반환하거나 결과를 컬렉션에 저장할 수 있습니다. 기본적으로는 결과 집합의 각 document는 16MB BSON 문서 크기 제한을 따릅니다.
단일 문서가 BSON 문서 크기 제한을 초과할 경우, 집합 쿼리는 에러가 발생 됩니다.
이 제한은 return document에만 적용됩니다. 다만 파이프라인 처리 중에 document가 이 크기를 초과할 수 있으며, 이때 cursor 옵션을 통해서 16MB 이상의 결과를 반환할 수 있습니다.
• maxTimeMS: aggregate() 명령이 실행되는 최대 시간을 설정 합니다.
• readConcern : aggregate() 명령이 도큐먼트의 개수를 확인시, 사용할 readConcern 옵션을 설정 합니다.
• collation : aggregate() 실행시 사용할 collation 을 설정 합니다.
Reference
Reference URL
• mongodb.com/v5.0/aggregation
• mongodb.com/v5.0/std-label-single-purpose-agg-methods
• mongodb.com/aggregation-pipeline
• mongodb.com/aggregation-pipeline-limits
• mongodb.com/operator/aggregation
Reference Book
• Real MongoDB
연관된 다른 글




Principal DBA(MySQL, AWS Aurora, Oracle)
핀테크 서비스인 핀다에서 데이터베이스를 운영하고 있어요(at finda.co.kr)
Previous - 당근마켓, 위메프, Oracle Korea ACS / Fedora Kor UserGroup 운영중
Database 외에도 NoSQL , Linux , Python, Cloud, Http/PHP CGI 등에도 관심이 있습니다
purityboy83@gmail.com / admin@hoing.io