MongoDB - 트랜잭션(Transaction) - Isolation Level(격리 수준)

Share

Last Updated on 4월 20, 2024 by Jade(정현호)

안녕하세요

이번 글은 MongoDB에서 트랜잭션과 Isolation Level 및 MVCC에 관련된 내용을 확인해보도록 하겠습니다.

MongoDB Isolation level

데이터베이스 시스템에서 isolation(격리)는 트랜잭션 무결성이 다른 사용자 및 시스템에 표시되는 방식을 결정합니다.

격리 수준(isolation level)이 낮을수록 많은 사용자가 동시에 동일한 데이터에 액세스할 수 있는 능력이 높아지지만 사용자가 겪을 수 있는 동시성 효과(예: 더티 읽기 또는 lost updates)도 늘어납니다.

반대로 격리 수준이 높을수록 사용자에게 발생할 수 있는 동시성 효과 유형은 줄어들지만 더 많은 시스템 리소스가 필요하고 한 트랜잭션이 다른 트랜잭션을 차단할 가능성이 높아집니다.
- wikipedia

보통의 데이터베이스의 경우 4가지 격리 수준(isolation level)을 모두 지원하는 경우가 많으며, 기본 사용 격리수준은 Read committed 또는 Repeatable read을 주로 사용합니다.

Isolation Level

  • Serializable
  • Repeatable read
  • Read committed
  • Read uncommitted


Read uncommitted는 커밋 되지 않은 데이터를 다른 세션에서 읽을 수 있기 때문에 서비스 제공 데이터베이스나 쓰기와 읽기가 동시에 일어나는 Primary(Master) 인스턴스에서는 사용되지 않으나, 성능 향상을 위해서 와 같이 특정 목적으로 사용하는 경우도 있긴 합니다.

일반적인 서비스를 처리하는데 있어서 Read committed 나 Repeatable reads를 주로 사용되고 있으며, 데이터베이스마다 기본값으로 설정된 수준도 다르고, 격리 수준의 변경 유무 등도 다릅니다.

WiredTiger는 read-uncommitted, read-committed, 그리고 snapshot 세 가지 isolation level을 지원합니다.


다만 MongoDB에서 WiredTiger 스토리지 엔진 사용시에는 snapshot 격리 수준만 지원합니다.

여러 데이터베이스에서 ACID 원칙을 지키면서 동시성 이슈를 개선하기 위해서 MVCC를 사용하고 MVCC의 구현은 데이터베이스마다 각각 여러가지 기술을 토대로 구현하고 있습니다.

MongoDB에서 사용하는 WiredTiger 스토리지 엔진에서는 snapshot 격리 수준(isolation level)을 통해서 MVCC를 사용하고 있습니다.

스냅샷 격리 수준에서는 트랜잭션이 시작되기 전에 커밋된 레코드 버전을 읽습니다. Dirty reads(더티 읽기) 및 non-repeatable reads 를 방지 합니다. 다만 팬텀(phantoms) 읽기는 가능합니다.

 

WiredTiger에서 모든 갱신은(변경)은 snapshot 격리 수준에서 수행되어야 합니다.

스냅샷 격리 수준에서 트랜잭션 시작시 생성되는 스냅샷은 트랜잭션이 시작될 때 생성되며 트랜잭션의 전체 수명 주기 동안 유지됩니다.

예전버전에서는 사용자가 직접 트랜잭션을 제어할 수 없었으나 4.0 버전 부터는 뒤에서 소개될 다중 문서 트랜잭션(multi-document transactions) 지원 및 사용자가 직접 트랜잭션 제어가 가능 해졌습니다.

이전에 사용자가 직접 트랜잭션 제어가 불가능한 버전에서는 직접 제어가 되지 않아 불편한 부분이 있었던 부분도 있지만, 트랜잭션이 도큐먼트 단위로 자동으로 커밋이 되기 때문에 장시간 커밋되지 않고 유지되는 트랜잭션이 존재할 수 없었습니다.

 

MySQL과 같은 RDBMS에서도 그렇고 MongoDB서버에서도 장기간의 트랜잭션이 유지될 경우 이로 인해서 성능 저하가 발생할 수도 있기 때문에 장기간 트랜랜잭션이 유지되지 않는다는 부분은 장점이라고 할 수 있습니다.

MongoDB에서 WiredTiger 스토리지 엔진을 사용하여 데이터를 읽게 되면 위에서 설명한 내용과 같이 트랜잭션을 사용하여 특정 시점의 데이터를 읽게 되고 이를 스냅샷(snapshot) 이라고 합니다.

스냅샷은 트랜잭션내 특정 시점의 데이터 상태를 그대로 유지할 수 있는 개념으로 원리는 특정 트랜잭션을 시작하고 그 트랜잭션에서 데이터를 읽는 것을 의미합니다.


데이터를 읽을 경우 다음과 같은 조건이 만족할 때까지 스냅샷을 유지하게 됩니다.

  • 쿼리가 지정된 건수의 도큐먼트를 읽을 경우
  • 쿼리가 지정된 시간 동안 수행된 경우
  • 트랜잭션이 종료(commit 또는 rollback 또는 abort)되기 전까지

 

MongoDB에서는 트랜잭션에 대해서 Yield 기능이 있으며 1, 2번이 이에 해당하며 이와 관련된 파라미터는 internalQueryExecYieldIterationsinternalQueryExecYieldPeriodMS 입니다.

Yield는 MongoDB가 트랜잭션을 미지원 하는 DB로 부터 시작하였고 높은 트랜잭션 구현이 주요 목표가 아니었기 때문에 트랜잭션에 따른 정합성보다는 높은 동시 처리성이 더 주요한 목표였습니다.

이러한 목표와 목적에 의해서 MongoDB에서는 설정된 조건보다 오랜 시간 실행되거나 많은 자원을 소모하는 경우에는 잠깐 쉬었다가 다시 처리를 재개하는 형태로 구현되었습니다.

이와 같은 기능을 MongoDB에서는 Yield 기능이라고 합니다. 데이터 처리를 위해서 획득하였던 잠금까지 모두 해제하고 CPU만 놓고 다시 쓰레드 스케줄링을 기다리는 형태로 처리됩니다.

 

조건에 만족할 때까지 스냅샷이 유지된다는 의미는 만족하지 못하면 스냅샷이 유지가 되지 않음을 의미합니다.

예를 들어 1,2,3,4,5 5개의 도큐먼트가 있는 컬렉션에서 세션1에서 1,2,3 까지 읽고 Yield 가 되었을 경우 다시 이어서 읽기 전에 다른 세션2에서 4 도큐먼트를 삭제하였을 경우 세션1에서 다시 재개해서 읽게 될 때는 3 이후부터 다시 읽어서 처리를 하게 되는데 4가 이미 삭제가 되었기 때문에 3 다음에 5가 출력이 되게 됩니다.

처음 시작했을 때 기준으로 스냅샷이 유지되었다면 1,2,3,4,5 5개 도큐먼트가 출력이 되었을 것이지만 Yield가 되었을 경우 그 다음 부터는 새로 읽기 때문에 처음 시작했을 때와 데이터의 모습이 다를 수 있습니다.

포스팅에서 사용하는 MongoDB 7.0 버전에서는 internalQueryExecYieldIterations의 기본값은 1000 이고, internalQueryExecYieldPeriodMS 기본값은 10 입니다.

기본값은 사용하는 버전에 따라서 다를 수 있습니다.

Yield 에 대해서 다음의 테스트를 통해서 기능 동작은 확인해볼 수 있습니다.
         

internalQueryExecYieldIterations

파라미터의 기본값은 1000이며 다음과 같이 파라미터 값을 조회할 수 있습니다.

mongo> db.adminCommand( { getParameter: 1,'internalQueryExecYieldIterations':1})


파라미터 값이 1000으로 설정 되어있기 때문에 1000 도큐먼트를 읽게 되면 Yield를 수행되게 됩니다.

테스트를 위해 다음과 같이 5만건 도큐먼트를 가진 컬렉션을 생성하도록 하겠습니다.

mongo> use test
mongo> for(var a=1; a<=50000; a++) db.ts_tran_yield.insertOne({ item: a})
mongo> db.ts_tran_yield.countDocuments()
50000


먼저 Yield가 되지 않은 snapshot 이 계속 유지되는 일반적인 상황 부터 확인해보도록 하겠습니다.

internalQueryExecYieldIterations 파라미터값 1000 개수보다 적게 읽었을 경우 부터 확인해보도록 하겠습니다.

Session 1 에서는 find로 조회를 하며, 다른 세션 Session 2 에서는 delete 로 도큐먼트를 삭제하도록 하겠습니다.

• Session 1

mongo> db.ts_tran_yield.find()
[
  { _id: ObjectId("66044b77097cebd7e0c29985"), item: 1 },
  { _id: ObjectId("66044b77097cebd7e0c29986"), item: 2 },
  { _id: ObjectId("66044b77097cebd7e0c29987"), item: 3 },
  { _id: ObjectId("66044b77097cebd7e0c29988"), item: 4 },
  { _id: ObjectId("66044b77097cebd7e0c29989"), item: 5 },
  { _id: ObjectId("66044b77097cebd7e0c2998a"), item: 6 },
  { _id: ObjectId("66044b77097cebd7e0c2998b"), item: 7 },
  { _id: ObjectId("66044b77097cebd7e0c2998c"), item: 8 },
  { _id: ObjectId("66044b77097cebd7e0c2998d"), item: 9 },
  { _id: ObjectId("66044b77097cebd7e0c2998e"), item: 10 },
  { _id: ObjectId("66044b77097cebd7e0c2998f"), item: 11 },
  { _id: ObjectId("66044b77097cebd7e0c29990"), item: 12 },
  { _id: ObjectId("66044b77097cebd7e0c29991"), item: 13 },
  { _id: ObjectId("66044b77097cebd7e0c29992"), item: 14 },
  { _id: ObjectId("66044b77097cebd7e0c29993"), item: 15 },
  { _id: ObjectId("66044b77097cebd7e0c29994"), item: 16 },
  { _id: ObjectId("66044b77097cebd7e0c29995"), item: 17 },
  { _id: ObjectId("66044b77097cebd7e0c29996"), item: 18 },
  { _id: ObjectId("66044b77097cebd7e0c29997"), item: 19 },
  { _id: ObjectId("66044b77097cebd7e0c29998"), item: 20 }
]
Type "it" for more
<-- 20건 출력 후 대기 상태임


• Session 2

// 21~40 사이에 도큐먼트 1개를 삭제
mongo> db.ts_tran_yield.deleteOne({'item':38})
{ acknowledged: true, deletedCount: 1 }

// 확인
mongo> db.ts_tran_yield.find({'item':38})
              <-- 데이터 없음
mongo>


• Session 1

// 데이터 계속 확인
Type "it" for more
mongo> it
[
  { _id: ObjectId("66044b77097cebd7e0c29999"), item: 21 },
  { _id: ObjectId("66044b77097cebd7e0c2999a"), item: 22 },
  { _id: ObjectId("66044b77097cebd7e0c2999b"), item: 23 },
  { _id: ObjectId("66044b77097cebd7e0c2999c"), item: 24 },
  { _id: ObjectId("66044b77097cebd7e0c2999d"), item: 25 },
  { _id: ObjectId("66044b77097cebd7e0c2999e"), item: 26 },
  { _id: ObjectId("66044b77097cebd7e0c2999f"), item: 27 },
  { _id: ObjectId("66044b77097cebd7e0c299a0"), item: 28 },
  { _id: ObjectId("66044b77097cebd7e0c299a1"), item: 29 },
  { _id: ObjectId("66044b77097cebd7e0c299a2"), item: 30 },
  { _id: ObjectId("66044b77097cebd7e0c299a3"), item: 31 },
  { _id: ObjectId("66044b77097cebd7e0c299a4"), item: 32 },
  { _id: ObjectId("66044b77097cebd7e0c299a5"), item: 33 },
  { _id: ObjectId("66044b77097cebd7e0c299a6"), item: 34 },
  { _id: ObjectId("66044b77097cebd7e0c299a7"), item: 35 },
  { _id: ObjectId("66044b77097cebd7e0c299a8"), item: 36 },
  { _id: ObjectId("66044b77097cebd7e0c299a9"), item: 37 },
  { _id: ObjectId("66044b77097cebd7e0c299aa"), item: 38 }, <-- 삭제되었으나 확인됨
  { _id: ObjectId("66044b77097cebd7e0c299ab"), item: 39 },
  { _id: ObjectId("66044b77097cebd7e0c299ac"), item: 40 }
]
Type "it" for more


위와 같이 간단한 테스트를 통해서 MongoDB WiredTiger는 기본적으로 MySQL에서의 Repeatable Read와 유사하게 동작하는 snapshot isolation 으로 동작하는 것을 확인할 수 있습니다.

이번에는 internalQueryExecYieldIterations 파라미터의 의해 Yield 가 동작하는 내용을 살펴보도록 하겠습니다.

internalQueryExecYieldIterations 파라미터 값이 1000 이기 때문에 편의상 mongosh 에서 displayBatchSize 를 1000 으로 변경 후 find 를 수행하도록 하겠습니다.

mongo> config.set("displayBatchSize", 1000)
Setting "displayBatchSize" has been changed


• Session 1

mongo> db.ts_tran_yield.find()
[
  { _id: ObjectId("66044b77097cebd7e0c29985"), item: 1 },
  { _id: ObjectId("66044b77097cebd7e0c29986"), item: 2 },
  { _id: ObjectId("66044b77097cebd7e0c29987"), item: 3 },
                  <...중략...>
  { _id: ObjectId("66044b7b097cebd7e0c29d6b"), item: 999 },
  { _id: ObjectId("66044b7b097cebd7e0c29d6c"), item: 1000 },
  { _id: ObjectId("66044b7b097cebd7e0c29d6d"), item: 1001 }
]
Type "it" for more


• Session 2

// 1개의 도큐먼트를 삭제
mongo> db.ts_tran_yield.find({'item':1995})
[ { _id: ObjectId("66044b7e097cebd7e0c2a14f"), item: 1995 } ]

mongo> db.ts_tran_yield.deleteOne({'item':1995})
{ acknowledged: true, deletedCount: 1 }

-- 확인
mongo> db.ts_tran_yield.find({'item':1995})
              <-- 데이터 없음
mongo>


• Session 1

// 계속해서 데이터를 확인
Type "it" for more
rs0 [primary] test> it
[
  { _id: ObjectId("66044b7b097cebd7e0c29d6e"), item: 1002 },
  { _id: ObjectId("66044b7b097cebd7e0c29d6f"), item: 1003 },
  { _id: ObjectId("66044b7b097cebd7e0c29d70"), item: 1004 },
  { _id: ObjectId("66044b7b097cebd7e0c29d71"), item: 1005 },
                  < ... 중략 ... >  
  { _id: ObjectId("66044b7e097cebd7e0c2a14d"), item: 1993 },
  { _id: ObjectId("66044b7e097cebd7e0c2a14e"), item: 1994 }, <!!---
  { _id: ObjectId("66044b7e097cebd7e0c2a150"), item: 1996 }, <!!---
  { _id: ObjectId("66044b7e097cebd7e0c2a151"), item: 1997 },
  { _id: ObjectId("66044b7e097cebd7e0c2a152"), item: 1998 },
  { _id: ObjectId("66044b7e097cebd7e0c2a153"), item: 1999 },
  { _id: ObjectId("66044b7e097cebd7e0c2a154"), item: 2000 },
  { _id: ObjectId("66044b7e097cebd7e0c2a155"), item: 2001 },
  { _id: ObjectId("66044b7e097cebd7e0c2a156"), item: 2002 }
]
Type "it" for more


위의 테스트 결과를 보면 이전과 다르게 Session 2 에서 삭제한 1995 번째 도큐먼트가 Session 1에서도 제외되고 출력되는 것을 확인할 수 있습니다.

이런 부분은 snapshot isolation 으로 데이터를 읽었지만 읽었을 당시의 데이터가 중간에 달라지는 데이터 일관성 측면에서 문제를 야기할 수도 있습니다.
             

internalQueryExecYieldPeriodMS

이번에는 internalQueryExecYieldPeriodMS 파라미터에 대해서 확인해보도록 하겠습니다.

파라미터가 더 쉽게 적용될 수 있도록 값을 1 로 낮추고 이전에 확인했던 internalQueryExecYieldIterations 파라미터에 의해서 Yield가 발생되지 않도록 값을 증가시키도록 하겠습니다.

mongo> db.adminCommand( { setParameter: 1, 'internalQueryExecYieldPeriodMS': 1 } )
mongo> db.adminCommand( { setParameter: 1, 'internalQueryExecYieldIterations': 99999999 } )


mongosh 에서 displayBatchSize 를 조금 더 늘려서 출력을 해보도록 하겠습니다.

mongo> config.set("displayBatchSize", 5000)
Setting "displayBatchSize" has been changed


• Session 1

mongo> db.ts_tran_yield.find()
[
  { _id: ObjectId("66044b77097cebd7e0c29985"), item: 1 },
  { _id: ObjectId("66044b77097cebd7e0c29986"), item: 2 },
  { _id: ObjectId("66044b77097cebd7e0c29987"), item: 3 },
                  < ... 중략 ... >
  { _id: ObjectId("66044b88097cebd7e0c2ad0d"), item: 5001 },
  { _id: ObjectId("66044b88097cebd7e0c2ad0e"), item: 5002 }
]
Type "it" for more


• Session 2

// 1개의 도큐먼트를 삭제
mongo> db.ts_tran_yield.find({'item':9998})
[ { _id: ObjectId("66044b96097cebd7e0c2c092"), item: 9998 } ]

mongo> db.ts_tran_yield.deleteOne({'item':9998})
{ acknowledged: true, deletedCount: 1 }

-- 확인
mongo> db.ts_tran_yield.find({'item':9998})
              <-- 데이터 없음
mongo>


• Session 1

// 계속해서 데이터를 확인
Type "it" for more
mongo> it
[
  { _id: ObjectId("66044b88097cebd7e0c2ad0f"), item: 5003 },
  { _id: ObjectId("66044b88097cebd7e0c2ad10"), item: 5004 },
  { _id: ObjectId("66044b88097cebd7e0c2ad11"), item: 5005 },
                < ... 중략 ... >
  { _id: ObjectId("66044b96097cebd7e0c2c090"), item: 9996 },
  { _id: ObjectId("66044b96097cebd7e0c2c091"), item: 9997 }, <!!---
  { _id: ObjectId("66044b96097cebd7e0c2c093"), item: 9999 }, <!!---
  { _id: ObjectId("66044b96097cebd7e0c2c094"), item: 10000 },
  { _id: ObjectId("66044b96097cebd7e0c2c095"), item: 10001 },
  { _id: ObjectId("66044b96097cebd7e0c2c096"), item: 10002 },
  { _id: ObjectId("66044b96097cebd7e0c2c097"), item: 10003 }
]
Type "it" for more

 

이번에도 Yield에 의해서 잠시 멈춤 후 읽게 되었을 때 다시 읽게 되면서 다른 세션에서 삭제한 도큐먼트에 대한 내용이 동일하게 반영되어 데이터가 삭제된 결과로 출력되는 것을 확인할 수 있습니다.

이와 같이 특정 조건이나 규칙에 따라서 잠금 Yield가 수행됨에 따라 쿼리의 수행 결과 일관성에 문제가 발생될 수도 있습니다.
이런 부분이 문제가 된다면 파라미터 값을 늘려서 Yield 가 덜 적극적으로 수행되도록 할 수 있습니다.
                 

transactionLifetimeLimitSeconds

스냅샷이 유지되기 위한 내용으로 "트랜잭션이 종료(commit 또는 rollback 또는 abort)되기 전까지" 가 있습니다.
당연히 데이터를 읽을 때 트랜잭션이 시작 및 스냅샷이 생성이 되고 데이터 출력이 모두 완료되면 트랜잭션 종료됩니다.

트랜잭션이 종료되는 되는 경우는 일반적인 종료인 commit 또는 rollback도 있지만 abort에 의한 종료도 있을 수 있습니다.

이와 관련된 파라미터로는 transactionLifetimeLimitSeconds 이 있습니다.

transactionLifetimeLimitSeconds 는 트랜잭션 최대 실행 시간을 결정하는 파라미터로 기본값이 60초(1분) 입니다.

인스턴스 레벨에서 값 수정이 가능하며, 샤드 클러스터의 경우 모든 샤드 및 리플리카 셋 멤버별로 설정해야 합니다.

transactionLifetimeLimitSeconds 파라미터 설정 값을 초과하는 트랜잭션은 만료된 것으로 간주되며 주기적인 정리 프로세스(cleanup process)에 의해 중단(정리) 됩니다.

정리 프로세스(cleanup process) 동작 수행은 60초와 transactionLifetimeLimitSeconds/2 중에서 낮은 주기로 실행됩니다.

MongoDB 4.0 버전 부터는 사용자가 명시적으로 트랜잭션을 제어할 수 있으며 트랜잭션을 제어하는데 있어서 2가지 API를 제공합니다. 이 부분은 트랜잭션에서 더 자세하게 다루도록 하겠습니다.

테스트시에는 startTransaction 메서드를 사용하여 명시적으로 트랜잭션 제어하도록 하겠으며 테스트를 위해 컬렉션을 생성 후 진행하도록 하겠습니다.

// 테스트를 위한 5건 컬렉션을 생성
mongo> for(var a=1; a<=5; a++) db.ts_tran_snapshot.insertOne({ item: a})

mongo> db.ts_tran_snapshot.find()
[
  { _id: ObjectId("6604587af486eb74f538b801"), item: 1 },
  { _id: ObjectId("6604587af486eb74f538b802"), item: 2 },
  { _id: ObjectId("6604587af486eb74f538b803"), item: 3 },
  { _id: ObjectId("6604587af486eb74f538b804"), item: 4 },
  { _id: ObjectId("6604587af486eb74f538b805"), item: 5 }
]


Session 1에서 먼저 트랜잭션 시작 후 조회를 한 후에, Session 2에서 도큐먼트를 1건 삭제하고 다시 Session 1 에서 조회를 하는 순서로 진행하겠습니다.

• Session 1

// startTransaction을 통해서 트랜잭션을 시작하고 컬렉션을 조회

mongo> session1 = db.getMongo().startSession()
mongo> session1.startTransaction()
mongo> session1.getDatabase(db.getName()).ts_tran_snapshot.find()
[
  { _id: ObjectId("6604587af486eb74f538b801"), item: 1 },
  { _id: ObjectId("6604587af486eb74f538b802"), item: 2 },
  { _id: ObjectId("6604587af486eb74f538b803"), item: 3 },
  { _id: ObjectId("6604587af486eb74f538b804"), item: 4 },
  { _id: ObjectId("6604587af486eb74f538b805"), item: 5 }
]

// 한번더 조회
mongo> session1.getDatabase(db.getName()).ts_tran_snapshot.find()
[
  { _id: ObjectId("6604587af486eb74f538b801"), item: 1 },
  { _id: ObjectId("6604587af486eb74f538b802"), item: 2 },
  { _id: ObjectId("6604587af486eb74f538b803"), item: 3 },
  { _id: ObjectId("6604587af486eb74f538b804"), item: 4 },
  { _id: ObjectId("6604587af486eb74f538b805"), item: 5 }
]


• Session 2

// 세션2에서 도큐먼트 1건 삭제
mongo> db.ts_tran_snapshot.find({'item':3})
[ { _id: ObjectId("6604587af486eb74f538b803"), item: 3 } ]

mongo> db.ts_tran_snapshot.deleteOne({'item':3})
{ acknowledged: true, deletedCount: 1 }

-- 확인
mongo> db.ts_tran_snapshot.find({'item':3})
              <-- 데이터 없음
mongo>


• Session 1

// Session 1 에서 다시 조회
mongo> session1.getDatabase(db.getName()).ts_tran_snapshot.find()
[
  { _id: ObjectId("6604587af486eb74f538b801"), item: 1 },
  { _id: ObjectId("6604587af486eb74f538b802"), item: 2 },
  { _id: ObjectId("6604587af486eb74f538b803"), item: 3 },
  { _id: ObjectId("6604587af486eb74f538b804"), item: 4 },
  { _id: ObjectId("6604587af486eb74f538b805"), item: 5 }
]

------- 잠시 대기 후 다시 조회 실행 -------

mongo>  session1.getDatabase(db.getName()).ts_tran_snapshot.find()
MongoServerError: Transaction with { txnNumber: 1 } has been aborted.
  <!!--


위의 테스트와 같이 명시적으로 트랜잭션을 활성화하였기 때문에 5건 모두 출력이 되었지만 트랜잭션내 이기 때문에 다른 세션에서 도큐먼트를 삭제했음에도 처음에 읽었던 동일한 값을 출력하고 있습니다.

다만 transactionLifetimeLimitSeconds 파라미터가 기본값 60초(1분) 으로 설정되어 있기 때문에 테스트 내용과 같이 시간초과에 의해서 "트랜잭션 abort" 이 발생됨을 확인할 수 있습니다.

다시 조회를 하면 새로운 스냅샷을 생성하여 읽게 됨으로 다른 세션에서 삭제한 도큐먼트를 제외하고 출력이 되게 됩니다.


• Session 1

mongo> session1.getDatabase(db.getName()).ts_tran_snapshot.find()
MongoServerError: Transaction with { txnNumber: 1 } has been aborted.

mongo> db.ts_tran_snapshot.find()
[
  { _id: ObjectId("6604587af486eb74f538b801"), item: 1 },
  { _id: ObjectId("6604587af486eb74f538b802"), item: 2 },
  { _id: ObjectId("6604587af486eb74f538b804"), item: 4 }, <!!--
  { _id: ObjectId("6604587af486eb74f538b805"), item: 5 }  <!!--
]


앞에서 쿼리 수행이 일정 시간이나 읽은 도큐먼트 건수가 되면 잠금 Yield가 되면서 읽었던 스냅샷이 초기화가 되고 다시 읽게 되는 내용을 확인해보았습니다.


정렬을 포함할 경우

조회시(find)에 정렬을 포함하게 되면 조금은 다른 결과를 보여주게 되고 다음과 같이 확인해보도록 하겠습니다.

테스트를 위해 다음과 같이 1만건 도큐먼트를 가진 컬렉션을 생성하도록 하겠습니다.

internalQueryExecYieldIterations 와 internalQueryExecYieldPeriodMS 둘다 기본값(1000 와 10) 인 상태입니다.

mongo> use test
mongo> for(var a=1; a<=10000; a++) db.ts_snapshot_order.insertOne({ item: a})
mongo> db.ts_snapshot_order.countDocuments()
10000

mongo> db.adminCommand( { getParameter: 1,'internalQueryExecYieldIterations':1})
{
  internalQueryExecYieldIterations: 1000,
  ok: 1,
  < ... 중략 ... >
}

mongo> db.adminCommand( { getParameter: 1,'internalQueryExecYieldPeriodMS':1})
{
  internalQueryExecYieldPeriodMS: 10,
  ok: 1,
  < ... 중략 ... >
}


mongosh 에서 displayBatchSize 를 1000 으로 설정하여 조회하도록 하겠습니다.

mongo> config.set("displayBatchSize", 1000)
Setting "displayBatchSize" has been changed


조회시에 다음과 같이 정렬(sort)를 포함해서 수행하도록 하겠습니다.

• Session 1

db.ts_snapshot_order.find().sort({item:1})
[
  { _id: ObjectId("66057f32eb0c583f3816379e"), item: 1 },
  { _id: ObjectId("66057f32eb0c583f3816379f"), item: 2 },
  { _id: ObjectId("66057f32eb0c583f381637a0"), item: 3 },
  { _id: ObjectId("66057f32eb0c583f381637a1"), item: 4 },
                < ... 중략 ... >
  { _id: ObjectId("66057f36eb0c583f38163b81"), item: 996 },
  { _id: ObjectId("66057f36eb0c583f38163b82"), item: 997 },
  { _id: ObjectId("66057f36eb0c583f38163b83"), item: 998 },
  { _id: ObjectId("66057f36eb0c583f38163b84"), item: 999 },
  { _id: ObjectId("66057f36eb0c583f38163b85"), item: 1000 }
]
Type "it" for more


이제 다른 세션에서 도큐먼트를 삭제해보도록 하겠습니다.

• Session 2

// 세션2에서 도큐먼트 1건 삭제
mongo> db.ts_snapshot_order.find({'item':1995})
[ { _id: ObjectId("66057f39eb0c583f38163f68"), item: 1995 } ]

mongo> db.ts_snapshot_order.deleteOne({'item':1995})
{ acknowledged: true, deletedCount: 1 }

-- 확인
mongo> db.ts_snapshot_order.find({'item':1995})
              <-- 데이터 없음
mongo>


• Session 1

// 이어서 조회를 진행합니다.
Type "it" for more
rs0 [primary] test> it
[
  { _id: ObjectId("66057f36eb0c583f38163b86"), item: 1001 },
  { _id: ObjectId("66057f36eb0c583f38163b87"), item: 1002 },
  { _id: ObjectId("66057f36eb0c583f38163b88"), item: 1003 },
  { _id: ObjectId("66057f36eb0c583f38163b89"), item: 1004 },
                  < ... 중략 ... >
  { _id: ObjectId("66057f39eb0c583f38163f62"), item: 1989 },
  { _id: ObjectId("66057f39eb0c583f38163f63"), item: 1990 },
  { _id: ObjectId("66057f39eb0c583f38163f64"), item: 1991 },
  { _id: ObjectId("66057f39eb0c583f38163f65"), item: 1992 },
  { _id: ObjectId("66057f39eb0c583f38163f66"), item: 1993 },
  { _id: ObjectId("66057f39eb0c583f38163f67"), item: 1994 },
  { _id: ObjectId("66057f39eb0c583f38163f68"), item: 1995 }, <!!---
  { _id: ObjectId("66057f39eb0c583f38163f69"), item: 1996 },
  { _id: ObjectId("66057f39eb0c583f38163f6a"), item: 1997 },
  { _id: ObjectId("66057f39eb0c583f38163f6b"), item: 1998 },
  { _id: ObjectId("66057f39eb0c583f38163f6c"), item: 1999 },
  { _id: ObjectId("66057f3aeb0c583f38163f6d"), item: 2000 }
]
Type "it" for more

 

위의 테스트에서는 이전과 다르게 다른 세션에서 삭제한 도큐먼트가 여전히 출력되는 것을 확인할 수 있습니다.
같은 격리수준을 사용하고 동일한 조건의 Yield가 설정 되어있지만 조회하는 방식(처리방식)에 따라서 쿼리 결과가 달라짐을 확인할 수 있습니다.

이와 같이 정렬(sort)를 통해서 조회를 하게 될 경우 보통의 RDBMS도 MongoDB도 메모리(정렬 버퍼, Sort Buffer)에서 적재 및 정렬을 하고 그 결과를 클라이언트에게 출력을 하기 때문에 다른 세션에서 도큐먼트를 삭제나 변경을 하였더라도 해당 내용은 정렬 버퍼에 임시로 로드 되어있는 결과에는 반영되지 않음으로 일관된 결과를 보여주게 되는 것입니다.

MongoDB에서 WiredTiger 스토리지 엔진을 사용할 경우 격리수준(isolation level)에 대해서 몇 가지 테스트를 통해서 repeatable read와 유사하게 동작 및 처리되는 것을 확인할 수 있었습니다.
                 

MongoDB Transaction(트랜잭션)

ACID의 정의

트랜잭션이 '진정한' 트랜잭션이 되려면 ACID라는 속성을 충족해야 합니다.

ACID는 원자성(atomicity), 일관성(Consistency), 고립성(Isolation), 영속성(Durability)의 약자입니다.

ACID은 트랜잭션이 정전 등 오류가 발생할 때도 데이터와 데이터베이스 상태의 유효성을 보장합니다.

  • 원자성: 트랜잭션 내 모든 작업이 적용되거나 아무 작업도 적용되지 않도록(All or Nothing) 합니다. 즉, 트랜잭션 부분적 적용이 불가 합니다.
  • 일관성: 트랜잭션이 성공하면 데이터베이스가 하나의 일관성이 있는 상태에서 다음 일관성 있는 상태로 이동하도록 합니다.
  • 고립성: 여러 트랜잭션이 데이터베이스에서 동시에 실행되도록 허용하는 속성입니다. 트랜잭션이 다른 트랜잭션의 부분 결과를 보지 않도록 보장합니다.
  • 영속성: 트랜잭션이 커밋 될 때 시스템 오류가 발생되더라도 모든 데이터가 유지되도록 합니다.

데이터베이스는 이러한 속성이 모두 충족하고 성공적일 경우에만 트랜잭션이 ACID를 준수한다고 할 수 있습니다.
                     

MongoDB 트랜잭션과 문장의 트랜잭션 처리

MongoDB 서버는 처음 시작에는 단일 도큐먼트(Single Document)의 트랜잭션만 지원하였으며, 이 말은 단일 도큐먼트의 변경에 대해서는 원자 단위의 처리가 보장된다는 것을 의미합니다.

이는 DB 서버에서 가장 기본적인 요건이며 도큐먼트 단위의 원자성(atomicity)을 보장할 수 없다면 데이터 입력, 수정(갱신), 삭제 등에 대해서 큰 문제를 야기할 수 있기 때문에 기본적이면서도 매우 중요한 요건입니다.

MongoDB에서는 데이터를 변경할 때 작업들은 여러 단위로 잘게 쪼개져서 수행됩니다. 예를 들어서 도큐먼트 1건을 입력(insertOne) 할 경우 다음과 같이 여러 개로 작업이 수행됩니다.

  1. 컬렉션에 도튜먼트 insert
  2. 컬렉션의 인덱스에 insert
  3. Oplog 컬렉션에 insert

 

컬렉션에 한 번에 여러 도큐먼트를 처리하는 경우도 있으며, MongoDB 서버 내부적으로 모든 처리가 도큐먼트 단위의 트랜잭션으로만 처리되는 것이 아니기 때문에 데이터와 읽기와 변경이 서로 다른 격리 수준 효과를 보여주게 됩니다.

WiredTiger 스토리지 엔진이 지원하는 격리수준과 무관하게 MongoDB 서버에서는 모든 데이터의 변경을 단일 도큐먼트 수준으로 트랜잭션을 제어합니다.

예를 들어, 다음과 같이 하나의 업데이트 문장으로 여러 도큐먼트를 변경하는 경우를 살펴보겠습니다.

mongo> db.test_tt.insertMany( [
  {_id:1, col:"A"},
  {_id:2, col:"B"},
  {_id:3, col:"C"},
])


하나의 문장으로 처리를 요청이지만 처리는 위의 문장들은 각각 개별의 작은 트랜젹션으로 다시 쪼개져서 실행이 됩니다.

[트랜잭션 시작]
입력: {_id:1, col:"A"},
[COMMIT]

[트랜잭션 시작]
입력: {_id:2, col:"B"},
[COMMIT]

[트랜잭션 시작]
입력: {_id:3, col:"C"},
[COMMIT]


UPDATE 뿐만 아니라 MongoDB의 모든 데이터 변경은 하나의 도큐먼트를 변경할 때마다 내부적으로 트랜잭션이 커밋이 되며 그래서 배치로 실행되는 업데이트나 삭제 쿼리(Batch Write)라 하더라도 MongoDB 내부적으로는 개별 트랜잭션으로 각 도큐먼트가 처리되고, 혹시 처리하는 도중에 에러가 발생하여 처리를 중단하는 경우에도 에러 직전까지 처리되었던 변경이 롤백 되지 못하는 것입니다.

먼저 다음과 같이 collection 및 유니크 인덱스 그리고 먼저 1건 도큐먼트를 입력하겠습니다.

mongo> db.createCollection('test_tt')
mongo> db.test_tt.createIndex({"col": 1}, {name: "idx_col", unique: true})
mongo> db.test_tt.insertOne({_id:3, col:"C"})


그 다음에는 다음과 같이 3건을 입력해보도록 하겠습니다.

mongo> db.test_tt.insertMany( [
  {_id:1, col:"A"},
  {_id:2, col:"B"},
  {_id:3, col:"C"},
])

[에러 발생]
Result: BulkWriteResult {
  insertedCount: 2,
  matchedCount: 0,
  modifiedCount: 0,
  deletedCount: 0,
  upsertedCount: 0,
  upsertedIds: {},
  insertedIds: { '0': 1, '1': 2, '2': 3 }
}
Write Errors: [
  WriteError {
    err: {
      index: 2,
      code: 11000,
      errmsg: 'E11000 duplicate key error collection: test.test_tt index: _id_ dup key: { _id: 3 }',
      errInfo: undefined,
      op: { _id: 3, col: 'C' }
    }
  }
]


위의 결과에서 볼 수 있듯이 duplicate key error 가 발생하였습니다. 데이터가 입력이 되었는지 다시 조회해보도록 하겠습니다.

mongo> db.test_tt.find()
[ { _id: 3, col: 'C' }, 
{ _id: 1, col: 'A' }, 
{ _id: 2, col: 'B' } ]


에러가 발생되었지만 key 중복이 되지 않은 _id:1 , _id:2 는 입력이 된 것을 확인할 수 있습니다.

이와 같이 1개의 문장으로 여러 도큐먼트를 처리할 때 한 단위(한개의 트랜잭션 단이)로 처리될 것으로 생각할 수 있지만, MongoDB에서는 도큐먼트 단위로 처리를 하게 됩니다.
        

다중 도큐먼트 트랜잭션 (Multi Document Transaction)

MongoDB는 4.0 버전부터 다중 도큐먼트 트랜잭션 (Multi Document Transaction)을 지원하기 시작하였습니다.

4.0 이하에는 지원하지 않았으며 4.0버전과 4.2 버전에서 다음과 같은 내용으로 기능이 추가되었습니다.

  • 버전 4.0: replica sets(Master/Slave 그룹)에서 multi-document transactions을 지원합니다.
  • 버전 4.2: MongoDB가 distributed(분산) transactions을 도입하여 sharded clusters에서 multi-document transactions에 대한 지원을 추가하고 sharded clusters에서 multi-document transactions에 대한 기존 지원을 통합하여 지원합니다.


MongoDB 4.2 이전에서는 컬렉션이나 인덱스를 만들거나 삭제하는 등 데이터베이스 카탈로그에 영향을 미치는 작업이 트랜잭션에서 허용되지 않습니다.

MongoDB 4.4부터는 암묵적 또는 명시적으로 트랜잭션에 컬렉션을 만들 수 있습니다.


• 현재 collection 현황

mongo> show collections
ts_snapshot_order
mongo> 


• Session 1

mongo> session1 = db.getMongo().startSession()
{ id: UUID("2999c200-0698-4cb9-bb5a-4f012095698f") }
mongo> session1.startTransaction()
mongo> session1.getDatabase(db.getName()).ts_cre_collection.insertOne({col:'A'})
mongo> session1.commitTransaction()


세션1에서 트랜잭션내에서 빈 collection에서 도큐먼트를 입력하였고 그 다음 commit 하였습니다. 컬렉션 생성 및 데이터 입력되었는지 확인해보도록 하겠습니다.

• Session 2

mongo> db.ts_cre_collection.find()
[ { _id: ObjectId("6606a854c746531b13d40014"), col: 'A' } ]

                

MongoDB에서 트랜잭션 사용법

MongoDB는 트랜잭션을 사용하기 위한 두가지 API를 제공하고 있습니다.

첫번째는 코어 API(Core API) 라는 관계형 데이터베이스에서 사용하는 유사한 구문 형태의 "start_transaction", "commit_transaction" 과 같은 메서드를 사용하는 형태입니다.

두번째는 트랜잭션 사용에 권장되는 접근 방식인 콜백 API(Callback API) 입니다.

코어 API는 대부분의 오류에 재시도 로직(retry logic)을 제공하지 않으며 개발자(작업자)가 작업에 대한 로직, 트랜잭션 커밋 메서드 호출, 필요한 재시도 및 오류 로직을 모두 작성해야 합니다.

콜백 API는 지정된 논리 세션과 관련된 트랜잭션 시작, 콜백 함수로 제공되는 함수 실행, 트랜잭션 커밋(또는 오류 시 중단)을 포함해 코어 API에 비해 많은 기능이 래핑(wrapping) 되어있는 단일 메서드를 제공합니다.

이 메서드는 커밋 오류를 다시 처리하는 재시도 로직도 포함 되어있습니다. 콜백 API는 MongoDB 4.2 버전부터 추가되어서 트랜잭션을 사용한 애플리케이션 개발을 단순화시킵니다.


두 API 모두 개발자(작업자)는 트랜잭션이 사용할 논리 세션을 시작(생성)해야 하며, 트랜잭션의 작업이 특정 논리 세션과 연결되어 사용되어야 합니다. 즉, 그 논리 세션에 각 작업 내역을 전달합니다.

애플리케이션에서 업무 로직이 복잡하고 MongoDB의 트랜잭션 처리가 필요할 경우 가급적 코어 API 보다 콜백 API 사용이 권장됩니다.



코어 API와 콜백 API 비교

코어 API

  • 트랜잭션을 시작하고 커밋하려면 명시적으로 호출이 필요
  • TransientTransactionError 및 UnknownTransactionCommitResult에 대한 오류 처리 로직을 통합하지 않고, 대신에 사용자 지정 오류 처리를 통합하는 유연성을 제공
  • 특정 트랜잭션을 위해 API로 전달되는 명시적 논리적 세션이 필요


콜백 API

  • 트랜잭션을 시작하고 지정된 작업을 실행한 후 커밋(또는 오류 시 중단)
  • TransientTransactionError 및 UnknownTransactionCommitResult에 대한 오류 처리 로직을 자동으로 통합
  • 특정 트랜잭션을 위해 API로 전달되는 명시적 논리적 세션이 필요


두 트랜잭션 API의 Mongo에서의 사용법 및 차이점을 살펴보도록 하겠습니다.
            

코어 API

테스트를 위해 위에서 생성한 test_tt 컬렉션을 사용하도록 하겠습니다.

// 컬렉션 데이터 확인
mongo> db.test_tt.find()
[ { _id: 3, col: 'C' }, 
{ _id: 1, col: 'A' }, 
{ _id: 2, col: 'B' } ]


코어 API는 startTransaction(), commitTransaction(), commitTransaction() 메서드를 사용하며, 명시적으로 트랜잭션을 rollback 시키기 위해서 abortTransaction() 사용할 수도 있습니다.

Session 1에서는 test_tt 컬렉션의 _id:3에 대해서 명시적으로 트랜잭션을 시작한 후에 값을 업데이트 하도록 하겠습니다.

• Session 1

mongo> session1 = db.getMongo().startSession()
mongo> session1.startTransaction()

mongo> session1.getDatabase(db.getName()).test_tt.find()
[ { _id: 3, col: 'C' }, { _id: 1, col: 'A' }, { _id: 2, col: 'B' } ]

// 도큐먼트 업데이트
mongo> session1.getDatabase(db.getName()).test_tt.updateOne( { _id:3}, {$set: {col: 'D'}} )

// 컬렉션 다시 확인
mongo> session1.getDatabase(db.getName()).test_tt.find()
[ { _id: 3, col: 'D' }, { _id: 1, col: 'A' }, { _id: 2, col: 'B' } ]

아직 commit이 되지 않은 상태입니다.

이제 다른 Session 2에서 동일한 _id: 3 에 대해서 업데이트를 수행하도록 하겠습니다. 먼저 데이터를 조회해보도록 하겠습니다.

• Session 2

mongo> session2 = db.getMongo().startSession()
mongo> session2.startTransaction()
mongo> session2.getDatabase(db.getName()).test_tt.find()
[ { _id: 3, col: 'C' }, { _id: 1, col: 'A' }, { _id: 2, col: 'B' } ]

아직 Session 1에서 commit 이 되지 않았기 때문에 _id: 3의 값은 C 입니다.

이제 세션 2에서 update 를 수행하도록 하겠습니다.


• Session 2

mongo> session2.getDatabase(db.getName()).test_tt.updateOne( { _id:3}, {$set: {col: 'E'}} )

MongoServerError: 
Caused by :: Write conflict during plan execution and yielding is disabled. 
          :: Please retry your operation or multi-document transaction.

* 가로 길이에 따라서 개행되어있습니다.

Write conflict 이 발생되면서 중단이 되었습니다.

참고로 위의 테스트를 진행시에 60초안에 진행을 해야 하며 60초이상이 지나면 transactionLifetimeLimitSeconds 파라미터의 기본값 60초에 의해서 트랜잭션이 abort 되게 됩니다.
           

콜백 API

이번에는 콜백 API를 통해서 동일하게 진행하도록 하겠습니다. Session1은 트랜잭션을 명시적으로 계속 활성화해두기 위해서 코어 API를 사용하고, Session 2에서는 콜백 API를 통해서 진행하도록 하겠습니다.


• Session 1

session1 = db.getMongo().startSession()
session1.startTransaction()
session1.getDatabase(db.getName()).test_tt.find()
[ { _id: 3, col: 'C' }, { _id: 1, col: 'A' }, { _id: 2, col: 'B' } ]

session1.getDatabase(db.getName()).test_tt.updateOne( { _id:3}, {$set: {col: 'D'}} )

session1.getDatabase(db.getName()).test_tt.find()
[ { _id: 3, col: 'D' }, { _id: 1, col: 'A' }, { _id: 2, col: 'B' } ]

아직 commit 하지 않은 상태입니다.

이번에는 콜백 API를 사용해서 update를 수행하도록 하겠습니다.

• Session 2

// 카운트 변수 선언 및 초기화
var count = 0;

session2 = db.getMongo().startSession( )

session2.withTransaction( async() => {
const sessionCollection = session2.getDatabase(db.getName()).getCollection('test_tt')

// Document 업데이트 전 데이터 조회
print(sessionCollection.find());
count += 1;
print("writeConflicts가 발생하여 시도횟수 : " + count)

// Document Update
sessionCollection.updateOne({ _id:3}, {$set: {col: 'E'}} )
// 다시 조회
print(sessionCollection.find());
} )

[출력결과]
[ { _id: 3, col: 'C' }, { _id: 1, col: 'A' }, { _id: 2, col: 'B' } ]
writeConflicts가 발생하여 시도횟수 : 1
[ { _id: 3, col: 'C' }, { _id: 1, col: 'A' }, { _id: 2, col: 'B' } ]
writeConflicts가 발생하여 시도횟수 : 2
[ { _id: 3, col: 'C' }, { _id: 1, col: 'A' }, { _id: 2, col: 'B' } ]
writeConflicts가 발생하여 시도횟수 : 3
[ { _id: 3, col: 'C' }, { _id: 1, col: 'A' }, { _id: 2, col: 'B' } ]
writeConflicts가 발생하여 시도횟수 : 4

        ....... 반복 수행됨 .........

출력결과 내용 처럼 계속 Retry Logic 이 수행됨을 확인할 수 있습니다.

이제 Session 1에서 commit을 하도록 하겠습니다.

• Session 1

session1.commitTransaction()


• Session 2

출력결과]
[ { _id: 3, col: 'C' }, { _id: 1, col: 'A' }, { _id: 2, col: 'B' } ]
writeConflicts가 발생하여 시도횟수 : 1
[ { _id: 3, col: 'C' }, { _id: 1, col: 'A' }, { _id: 2, col: 'B' } ]
writeConflicts가 발생하여 시도횟수 : 2
[ { _id: 3, col: 'C' }, { _id: 1, col: 'A' }, { _id: 2, col: 'B' } ]
writeConflicts가 발생하여 시도횟수 : 3
[ { _id: 3, col: 'C' }, { _id: 1, col: 'A' }, { _id: 2, col: 'B' } ]
writeConflicts가 발생하여 시도횟수 : 4
        < ... 중략 ... >
writeConflicts가 발생하여 시도횟수 : 1100
[ { _id: 3, col: 'C' }, { _id: 1, col: 'A' }, { _id: 2, col: 'B' } ]
writeConflicts가 발생하여 시도횟수 : 1101
[ { _id: 3, col: 'C' }, { _id: 1, col: 'A' }, { _id: 2, col: 'B' } ]
writeConflicts가 발생하여 시도횟수 : 1102
[ { _id: 3, col: 'D' }, { _id: 1, col: 'A' }, { _id: 2, col: 'B' } ]
writeConflicts가 발생하여 시도횟수 : 1103
[ { _id: 3, col: 'E' }, { _id: 1, col: 'A' }, { _id: 2, col: 'B' } ] <!!--


테스트 내용과 같이 콜백API를 통해서 트랜잭션 사용하게 되면 writeConflicts 상황에서 재시도 로직(retry logic)으로 처리되게 됩니다.

mongosh 에서 db.컬렉션명.updateOne 과 같이 데이터 변경을 할때도 writeConflicts가 발생되며 재시도 로직(retry logic)으로 진행됩니다.(바로 실패하지 않음)


개발 언어에서도 동일한 API를 사용해서 트랜잭션을 사용할 수 있으며, 파이썬에서 다음과 같이 사용합니다.

• 파이썬에서 코어 API 예시

#!/usr/bin/env python3
import pymongo

var_host=...
var_port=...
var_username=...
var_password=...

client = pymongo.MongoClient(
        host=var_host,
        port=int(var_port),
        tls=True,
        tlsAllowInvalidHostnames=True,
        tlsAllowInvalidCertificates=True,
        directConnection=True,
        authSource="admin",
        username=var_username,
        password=var_password,
)

my_wc_majority = WriteConcern('majority', wtimeout=1000)

# Prerequisite / Step 0: 데이터 입력

client.get_database( "webshop",
                     write_concern=my_wc_majority).orders.insert_one({"sku":
                     "abc123", "qty":0})
client.get_database( "webshop",
                     write_concern=my_wc_majority).inventory.insert_one(
                     {"sku": "abc123", "qty": 1000})

# Step 1: 트랜잭션 내의 작업 및 시퀀스를 정의합니다
def update_orders_and_inventory(my_session):
    orders = session.client.webshop.orders
    inventory = session.client.webshop.inventory


    with session.start_transaction(
            read_concern=ReadConcern("snapshot"),
            write_concern=WriteConcern(w="majority"),
            read_preference=ReadPreference.PRIMARY):

        orders.insert_one({"sku": "abc123", "qty": 100}, session=my_session)
        inventory.update_one({"sku": "abc123", "qty": {"$gte": 100}},
                             {"$inc": {"qty": -100}}, session=my_session)
        commit_with_retry(my_session)

# Step 2: 재시도 로직으로 트랜잭션 실행 및 커밋 시도
def commit_with_retry(session):
    while True:
        try:
            # 커밋은 트랜잭션 시작 시 설정된 쓰기 우려 사항을 사용합니다.
            session.commit_transaction()
            print("Transaction committed.")
            break
        except (ConnectionFailure, OperationFailure) as exc:
            # Can retry commit
            if exc.has_error_label("UnknownTransactionCommitResult"):
                print("UnknownTransactionCommitResult, retrying "
                      "commit operation ...")
                continue
            else:
                print("Error during commit ...")
                raise

# Step 3: 트랜잭션 함수 txn_func를 실행하기 위해 재시도 로직으로 시도합니다
def run_transaction_with_retry(txn_func, session):
    while True:
        try:
            txn_func(session)  # 트랜잭션을 수행합니다.
            break
        except (ConnectionFailure, OperationFailure) as exc:
            # 일시적인 오류일 경우 전체 트랜잭션 재시도
            if exc.has_error_label("TransientTransactionError"):
                print("TransientTransactionError, retrying transaction ...")
                continue
            else:
                raise

# Step 4: Session 시작
with client.start_session() as my_session:

# Step 5: 함수 'run_transaction_with_retry'를 호출하여 'update_orders_and_inventory'를 호출하고 세션 'my_session'을 호출하여 이 트랜잭션과 연결합니다.

    try:
        run_transaction_with_retry(update_orders_and_inventory, my_session)
    except Exception as exc:
        # 오류가 발생하였습니다. The error handling code is not
        # Core API에서는 오류 처리 코드가 구현되지 않습니다.
        raise

위의 콜백 API를 통해서 사용한 코드에서는 개발 로직에서 재시도 로직이나 기타 에러에 따른 예외처리를 직접 구현한 것을 확인할 수 있습니다.


• 파이썬에서 콜백 API 예시

#!/usr/bin/env python3
import pymongo

var_host=...
var_port=...
var_username=...
var_password=...

client = pymongo.MongoClient(
        host=var_host,
        port=int(var_port),
        tls=True,
        tlsAllowInvalidHostnames=True,
        tlsAllowInvalidCertificates=True,
        directConnection=True,
        authSource="admin",
        username=var_username,
        password=var_password,
)

my_wc_majority = WriteConcern('majority', wtimeout=1000)

# Prerequisite / Step 0: 데이터 입력

client.get_database( "webshop",
                     write_concern=my_wc_majority).orders.insert_one({"sku":
                     "abc123", "qty":0})
client.get_database( "webshop",
                     write_concern=my_wc_majority).inventory.insert_one(
                     {"sku": "abc123", "qty": 1000})


# Step 1: 트랜잭션 내에서 수행할 작업 시퀀스를 지정하는 콜백을 정의합니다.

def callback(my_session):
    orders = my_session.client.webshop.orders
    inventory = my_session.client.webshop.inventory

    # Important:: 세션 변수 'my_session'을 작업에 전달해야 합니다.

    orders.insert_one({"sku": "abc123", "qty": 100}, session=my_session)
    inventory.update_one({"sku": "abc123", "qty": {"$gte": 100}},
                         {"$inc": {"qty": -100}}, session=my_session)


#. Step 2: 클라이언트 세션 시작.

with client.start_session() as session:


# Step 3: with_transaction을 사용하여 트랜잭션을 시작하고 콜백을 실행하며 커밋(또는 오류 시 중단)

    session.with_transaction(callback,
                             read_concern=ReadConcern('local'),
                             write_concern=my_write_concern_majority,
                             read_preference=ReadPreference.PRIMARY)

이전 코어API를 사용했을때에 비해 콜백 API는 재시도 로직이 수행되기 때문에 코드가 확실히 코드가 짧고 간결해진 부분을 확인할 수 있습니다. 


Oplog 크기 제한             

MongoDB는 트랜잭션의 쓰기 작업에 필요한 만큼 oplog 항목을 생성하게 됩니다. 그러나 각 oplog는 BSON 도큐먼트 크기 제한인 16MB이어야 합니다.

 

이번 글에서는 MongoDB에서의 Isolation 과 MVCC 그리고 트랜잭션에 대해서 살펴보았습니다. 여기서 글을 마무리하도록 하겠습니다.

연관된 이어지는 글

                             

Reference

Reference URL
wiredtiger.com/explain_isolation
WiredTiger: WiredTiger Architecture Guide
mongodb.com/core/wiredtiger
mongodb.com/Session.startTransaction
mongodb.com/Session.commitTransaction
• mongodb.com/Session.withTransaction

Reference Book
몽고DB 완벽 가이드 3판(번역서)
Real MongoDB

연관된 다음 글

 

 

 

                

0
글에 대한 당신의 생각을 기다립니다. 댓글 의견 주세요!x