카프카(Kafka) (4) - 내부 동작 원리와 구현 2

Share

Last Updated on 2월 19, 2023 by Jade(정현호)

안녕하세요. 
이번 포스팅은 카프카의 내부 동작 원리와 구현에 대한 글로 실전 카프카 개발부터 운영까지 책을 정리한 글 입니다. 

카프카 연재글로 아래 포스팅에서 이어지는 글 입니다. 

컨트롤러

컨트롤러는 카프카에서 리더 선출 하는 역할을 맡고 있습니다.

카프카 클러스터 중 하나의 브로커가 컨트롤러 역할을 하게 되며, 파티션의 ISR 리스트 중에서 리더를 선출 합니다.

ISR 에 대한 정보는 이전 포스팅에서 확인 하시면 됩니다.


리더를 선출하기 위해 ISR 리스트 정보는 안전한 저장소에 보관되어 있어야 하는데, 가용성 보장을 위해서 주키퍼에 저장되어 있습니다.

컨트롤러는 브로커가 실패하는 것을 예의 주시하고 있으며, 만약 브로커의 실패가 감지가 되면 즉시 ISR 리스트 중 하나를 새로운 파티션 리더로 선출 하며, 그리고 새로운 리더 정보를 주키퍼에 기록하고, 변경된 정보를 모든 브로커에게 전달 합니다.
         

예기치 않은 장애로 인한 리더 선출

리더 브로커가 비정상 종료 시 새로운 파티션의 리더가 선출되는 과정에 대해서 확인 해 보도록 하겠습니다.

[그림 1]


위의 그림 1 은 예기치 않은 장애로 인한 리더 선출 과정을 이미지화 한 것 으로 각 단계별 내용은 아래와 같습니다.

1) 파티션 0 의 리더인 1번 브로커가 예기치 않게 다운이 됩니다.
2) 주키퍼는 1번 브로커와의 연결이 끊어진 후, 0번 파티션의 ISR에서 변화가 생겼음을 감지합니다.
3) 컨트롤러는 주키퍼 워치를 통해 0번 파티션에 변화가 생긴것을 감지하고 해당 파티션의 ISR 중에서 새로운 리더를 선출 합니다.
4) 컨트롤러는 0번 파티션의 새로운 리더가 어떤 번호의 브로커가 되었다는 정보를 주키퍼에 기록 합니다.
5) 이렇게 갱신된 정보는 현재 활성화 상태인 모든 브로커에게 전파 됩니다.


리더 선출 과정은 컨트롤러에 의해 이루어집니다. 파티션이 하나인 경우 컨트롤러가 새로운 리더를 선출하고 리더 정보를 주키퍼에 기록며 다른 브로커에게 업데이트 정보를 전파하는 데는 그리고 오랜 시간이 걸리지 않습니다.

예를 들어 하나의 파티션에서 리더 선출 과정에서 0.2초만에 완료가 될 경우 토픽에 1개 파티션만 있다면 0.2초만에 리더선출 과정이 완료 될 것 입니다.
하지만 1만개의 파티션에 대해서 리더 선출이 이루어져야 한다면 전체 작업 소요시간은 약 2,000초가 걸리게 되며 분으로 환산하면 30춘이 조금 넘는 시간 입니다.

1대의 브로커의 장애에 의해 리더 선출 과정에서 30여 분간 통신이 끊어지는 상황이 발생된다면, 카프카 사용에 매우 어려운 상황을 직면 하게 될 것 입니다.

이런 상황을 개선하고자 2018년 11월 릴리즈된 카프카 버전 1.1.0 버전 부터는 리더 선출 작업 속도가 빨라지게 개선 되었습니다.


카프카 1.0.0 버전에서 6분 30초가 소요 되었던 리더 선출 과정에서 불필요한 로깅을 없애고 주키퍼 비동기 API가 반영된 카프카 1.1.0 버전에서는 약 3초만에 완료 되었습니다.
          

제어된 종료에 의한 리더 선출

이전 단계 내용은 예기치 않은 장애로 인한 리더 선출 과정이었으며, 이번에는 제어된 종료 과정에서 리더 선출 작업에 대해서 확인 해보도록 하겠습니다.

제어된 브로커 종료는 관리자에 의해 이루어진 자연스러운(graceful) 종료 또는 안전한 종료를 의미 합니다.

[그림 2]


위의 그림 2 은 제어된 종료 과정을 이미지화 한 것 으로 각 단계별 내용은 아래와 같습니다.

1) 관리자가 브로커 종료 명령어를 실행하고, SIG_TERM 신호가 브로커에게 전달 됩니다.
2) SIG_TERM 신호를 받은 브로커는 컨트롤러에게 알립니다.
3) 컨트롤러는 리더 선출 작업을 진행하고, 해당 정보를 주피커에 기록 합니다.
4) 컨트롤러는 새로운 리더 정보를 다른 브로커들에게 전송하게 됩니다.
5) 컨트롤러는 종료 요청을 보낸 브로커에게 정상 종료한다는 응답을 보내게 됩니다.
6) 응답을 받은 브로커는 캐시에 있는 내용을 디스크에 저장하고 종료 하게 됩니다.
     

종료 방식의 차이

제어된 종료 와 예기치 않은 종료(급작스러운 종료)와의 큰 차이는 다운타임 입니다.

제어된 종료를 사용하면 카프카 내부적으로 파티션들이 다운타임을 최소화 할 수 있습니다. 이유는 브로커가 종료 되기 전, 컨트롤러는 해당 브로커가 리더도 할당된 전체 파티션에 대해서 리더 재 선출 작업을 진행하기 때문 입니다.

물론 제어된 종료라도 리더 선출 작업 시간 동안 일시적인 다운타임은 발생할 수 있습니다. 다만 리더 선출 작업 대상 파티션들의 리더들이 활성화된 상태에서 컨트롤러는 순차적으로 하나의 파티션 마다 리더를 선출하게 되므로 결과적으로 각 파티션별 다운타임을 최소화 할 수 있습니다.


예기치 않은 종료(장애에 의한)에 의해 리더 선출 과정은 이미 대상 파티션들의 리더가 종료 된 상태가 되고, 파티션들의 다운타임은 새로운 리더 선출 작업이 완료 될 때 까지 지속되게 됩니다. 컨트롤러는 순차적으로 하나의 파티션마디 리더를 선출하게 되며, 첫번째 대상 파티션의 다운타임은 길지 않을수 있지만 마지막 리더 선출 대상의 파티션은 다운타임이 오랜시간이 걸려서 수행될 것 입니다.

또한 제어된 종료의 경우 (리더)브로커는 자신의 모든 로그를 디스크에 동기화한 후 종료됨에 따라 이후 다시 브로커가 재시작할 때 로그 복구 시간이 짧아지게 됩니다.
다양한 장점이 있는 제어된 종료를 사용하려면 control.shutdown.enable = true 설정이 브로커 설정 파일인 server.properties 파일에 설정이 되어 있어야 합니다.


브로커의 설정파일에 옵션이 명시되어 있지 않다고 해서 옵션이 반영되지 않은 것은 아닙니다. 기본값은 따로 명시하지 않아도 적용되므로 현재 브로커의 설정을 확인해볼 필요는 있습니다.

카프카에서 제공하는 kafka-configs.sh 명령어를 이용해서 --broker 옵션과 확인하고자 하는 브로커 아이디를 통해서 현재 브로커의 설정 상태를 확인할 수 있습니다.

$ kafka-configs.sh --bootstrap-server kafka1:9092 \
--broker 1 --describe --all


All configs for broker 1 are:
  log.cleaner.min.compaction.lag.ms=0 sensitive=false synonyms={DEFAULT_CONFIG:log.cleaner.min.compaction.lag.ms=0}
  offsets.topic.num.partitions=50 sensitive=false synonyms={DEFAULT_CONFIG:offsets.topic.num.partitions=50}
  log.flush.interval.messages=9223372036854775807 sensitive=false synonyms={DEFAULT_CONFIG:log.flush.interval.messages=9223372036854775807}
  controller.socket.timeout.ms=30000 sensitive=false synonyms={DEFAULT_CONFIG:controller.socket.timeout.ms=30000}
  principal.builder.class=null sensitive=false synonyms={}
  log.flush.interval.ms=null sensitive=false synonyms={}

< 내용 중략 >


현재 사용하는 브로커의 제어된 종료 설정 상태를 확인 하여 설정이 되어 있지 않다면 설정하는 것을 권장됩니다.
           

로그(로그 세그먼트)

카프카의 토픽으로 들어오는 메세지(레코드)는 세그먼트(segement, 로그 세그먼트 라고도 함) 라는 파일에 저장되게 됩니다.

메세지는 정해진 형식에 맞추어 순차적으로 로그 세그먼트 파일에 기록 됩니다. 로그 세그먼트에는 메세지의 내용만 저장되는 것이 아니라 메세지의 키, 밸류, 오프셋, 메세지 크기 같은 정보도 함께 저장 되며, 로그 세그먼트 파일들은 브로커의 로컬 디스크에 보관되어 있습니다. 

하나의 로그 세그먼트 크기가 너무 커져버리면 파일을 관리하기 어렵기 때문에 로그 세그먼트의 최대 크기는 1GB 가 기본값 입니다.
로그 세그먼트가 1GB보다 커지는 경우에는 기본적으로 롤링(rolling) 전략을 적용합니다.

로그 세그먼트의 크기가 1GB에 도달하면 해당 세그먼트 파일을 클로즈(close) 하고, 새로운 로그 세그먼트를 생성하는 방식으로 진행 합니다.

1GB 크기에 설정에 의해서 파일이 롤링이 되지만, 파일이 무한정 늘어날 경우를 대비하여 관리자는 로그 세그먼트에 대한 관리 계획을 수립해야 합니다.

관리 계획으로는 크게 로그 세그먼트의 삭제 또는 컴팩션(compaction) 으로 구분할 수 있습니다.
                

로그 세그먼트 삭제

로그 세그먼트 삭제 옵션은 브로커의 설정 파일인 server.properties 에서 log.cleanup.policy 가 delete 로 명시되어야 합니다.

해당 값은 기본값으로 적용됨으로 별도로 설정하지 않아도 로그 세그먼트는 삭제 정책이 적용되게 됩니다.


세그먼트 삭제 테스트를 진행해보도록 하겠습니다. 계속 사용하였던 jade-test01 토픽을 이용하도록 하겠습니다.
토픽 생성에 관한 내용은 이전 포스팅 아래 링크에서 확인 하시면 됩니다.


먼저 콘솔 컨슈머를 통해서 jade-test01 토픽의 메세지를 확인 해보도록 하겠습니다.
--from-beginning 옵션을 통해서 처음 부터 메세지를 가져 올 수 있습니다.

$ kafka-console-consumer.sh --bootstrap-server kafka1:9092 \
--topic jade-test01 --from-beginning

출력)
test message1
test message2


토픽을 생성하고 입력한 2개의 메세지를 잘 가져온 것 같습니다.

이제 jade-test01 토픽의 retention.ms 옵션을 조정해서 메세지를 삭제하도록 하겠으며 옵션 조정은 kafka-configs.sh 를 이용합니다. 

$ kafka-configs.sh --bootstrap-server kafka1:9092 \
--topic jade-test01 --add-config retention.ms=0 --alter

리턴 메세지)
Completed updating config for topic jade-test01.

위의 명령어는 retention.ms=0 이라는 설정을 추가하는 명령어 입니다. 여기서 retention.ms=0 이란 로그 세그먼트 보관 시간이 해당 숫자 보다 크다면 세그먼트를 삭제한다는 의미 입니다. 보관 시간이 0 보다 클 수 밖에 없으므로, 현재 저장된 모든 메세지는 삭제가 될 것 입니다.

정상적으로 옵션이 반영 되었는지를 확인 해보기 위해서 먼저 describe 로 확인 해보겠습니다.

$ kafka-topics.sh --bootstrap-server kafka1:9092 \
--topic jade-test01 --describe

결과)
Topic: jade-test01      PartitionCount: 1       
ReplicationFactor: 3    Configs: segment.bytes=1073741824,retention.ms=0
        Topic: jade-test01      Partition: 0    
        Leader: 2       Replicas: 1,2,3 Isr: 2,3,1

* 가로 길이에 따른 개행이 되어 있습니다.

출력 내용을 확인 해보면 Configs 라인에 retention.ms=0 이 추가 되어 있는 것을 확인할 수 있습니다.

로그 세그먼트 삭제는 기본적으로 5분 단위로 진행 됩니다. 따라서 명령어를 수행하고 바로 삭제 되는 것이 아니라 최대 5분 뒤에 수행이 되게 됩니다.

디렉토리의 로그 파일을 확인 해본 결과 아래 와 같이 용량이 0 인 새로운 파일명 2.log 파일이 새로 생성이 된 것을 확인 할 수 있습니다.

$ ls -arlt
total 40
-rw-r--r--.  1 root root    10 May 22 03:27 00000000000000000002.snapshot
-rw-r--r--.  1 root root     8 May 22 18:13 leader-epoch-checkpoint
-rw-r--r--.  1 root root     0 May 22 18:15 00000000000000000002.log


당연히 콘솔 컨슈머로 조회를 해봐도 아무런 결과가 나오지는 않습니다.

$ kafka-console-consumer.sh --bootstrap-server kafka1:9092 \
--topic jade-test01 --from-beginning

< 결과 없음, 즉 메세지가 없음 >



설정 하였던 옵션을 다시 제거해보도록 하겠습니다.

$ kafka-configs.sh --bootstrap-server kafka1:9092 \
--topic jade-test01 --delete-config retention.ms --alter

리턴 메세지)
Completed updating config for topic jade-test01.



다시 describe 를 통해서 옵션이 변경 되었는지 살펴보도록 하겠습니다.

$ kafka-topics.sh --bootstrap-server kafka1:9092 \
--topic jade-test01 --describe

결과)
Topic: jade-test01      PartitionCount: 1       
ReplicationFactor: 3    Configs: segment.bytes=1073741824

        Topic: jade-test01      Partition: 0    
        Leader: 2       Replicas: 1,2,3 Isr: 2,3,1

* 가로 길이에 따른 개행이 되어 있습니다.

출력 내용에서 설정하였던 retention.ms 옵션이 삭제 된 것을 확인 할 수 있습니다.

카프카에서는 토픽마다 보관주기를 조정하여(설정하여) 얼마만큼의 기간동안 카프카에 로그를 저장할지를 결정하고 관리할 수 있습니다.

관리자가 토픽에 별도의 retention.ms 옵션을 설정하지 않으면 카프카의 server.properties에 적용된 옵션값이 적용 됩니다.
카프카의 기본값은 7일이며, 토픽 단위로 별도로 설정하지 않았다면 모든 세그먼트는 7일이 지난 후 삭제가 되게 됩니다.

세그먼트 관리에 대한 설정중에는 기간이 아닌 용량으로도 설정할 수 있으며 retention.bytes 라는 옵션을 지정하여 사용 합니다.
              

로그 세그먼트 컴팩션

컴팩션(compaction) 은 카프카에서 제공하는 로그 세그먼트 관리 정책 중 하나로, 로그를 삭제하지 않고 컴팩션하여 보관할 수 있는 방법 입니다. 

로그 컴팩션은 기본적으로 로컬 디스크에 저장되어 있는 세그먼트를 대상으로 실행되는데, 현재 활성화된 세그먼트는 제외하고 나머지 세그먼트들을 대상으로 컴팩션이 실행 됩니다.

컴팩션하더라도 카프카의 로컬 디스크에 로그를 무기한 보관한다면, 로그의 용량은 감당할 수 없이 커질 것 입니다.
따라서 카프카에서는 단순하게 메세지를 컴팩션만 해서 보관하기 보다는 좀더 효율적인 방법으로 컴팩션을 합니다.

카프카에서 로그 세그먼트를 컴팩션하면 메세지(레코드)의 키값을 기준으로 마지막의 데이터만 보관하게 됩니다.

메세지의 키값을 기준으로 컴팩션하는 방법이 다소 생소 할 수 있으며, 로그 컴팩션 기능을 이용하는 대표적인 예제는 바로 카프카의 __consumer_offset 토픽 입니다.

__consumer_offset 토픽은 카프카의 내부 토픽으로, 컨슈머 그룹의 정보를 저장하는 토픽 입니다. 각 컨슈머 그룹의 중요한 정보는 해당 컨슈머 그룹이 어디까지 읽었는지를 나타내는 오프셋 커밋 정보이며, __consumer_offset 에 키(컨슈머 그룹명, 토픽명)와 밸류(오프셋 커밋 정보) 형태로 메세지가 저장 됩니다.


예를 들어 CG01 컨슈머 그룹이 T01 토픽을 컨슘하고, 첫 번째 메세지를 읽고 커밋했다고 가정한다면 이 정보는 키 와 밸류 형태의 메세지로 __consumer_offset 토픽에 저장 됩니다.

따라서 키는 CG01(컨슈머 그룹), T01(토픽명) 이 되고 밸류는 1(오프셋) 인 메세지가 __consumer_offset 토픽에 저장되게 됩니다.

그리고 1시간 뒤에 컨슘하면서 두번째 메세지를 읽고 커밋하면 밸류는 2인 메세지가 __consumer_offset 에 저장됩니다.

그리고 다시 또 1시간 뒤에 세번째 메세지를 읽고 커밋을 하면 밸류는 3인 메세지가 _consumer_offset 에 저장됩니다.

그래서 현재 __consume_offset 토픽에 저장된 메세지는 총 3개로 밸류가 1,2,3 인 메세지 입니다.
이후 로그 컴팩션이 동작하면 CG01,T01 키값의 밸류 중에서 마지막 메세지인 3만 남게 됩니다. 컨슈머 그룹은 항상 마지막으로 커밋된 오프셋 정보가 중요하므로, 과거에 커밋된 정보들은 삭제 되어도 무방 합니다.

이렇게 로그 컴팩션은 메세지의 키값을 기준으로 과거 정보는 중요하지 않고 가장 마지막 값이 필요한 경우에만 사용 합니다.


또 다른 사용 예시로는 현재 구매 현황 상태를 보여주는 시스템에서도 로그 컴팩션을 이용할 수 있습니다.
이때는 고유한 사용자 아이디가 메세지의 키값이고 현재의 구매 상태 정보가 메세지의 밸류값이 되게 됩니다.

구매 상태 정보는 주문 완료 -> 배송 준비 -> 배송 중 -> 배송 완료 의 총 4단계로 나타냅니다.

구매한 사용자 아이디(메세지의 키) 를 기준으로 최종 상태(메세지의 밸류)만 사용자에게 노출하면 되므로, 카프카의 로그 컴팩션 기능을 활용할 수 있습니다.

프로듀서가 카프카로 메세지를 전송할 때, 메세지에는 메세지(레코드)의 키 와 밸류를 같이 전송하게 됩니다. 키는 필수이며, 밸류는 필수는 아닙니다. 따라서 로그 컴팩션 기능을 사용하고자 한다면 키프카로 메세지를 전송할때도 키를 필수로 전송해야 합니다.

[Introduction to Topic Log Compaction in Apache Kafka]


로그 컴팩션의 장점은 빠른 장애 복구 입니다. 장애 복구 시 전체 로그를 복구하지 않고, 메세지의 키를 기준으로 최신의 상태만 복구를 합니다. 따라서 전체 로그를 복구할 때 보다 복구 시간을 줄일 수 있다는 장점이 있습니다.


컨슈머가 처리한 메세지들에서 오류가 발견되어 재처리가 필요한 상황이라고 가정한다면 로그 컴팩션 기능이 사용하지 않는다면 재처리를 위해서 모든 로그를 다시 적용을 해야 겠지만, 로그 컴팩션을 이용하여 마지막 메세지만 빠르게 처리할 수 있습니다.

하지만 빠른 재처리라는 장점이 있다고 하여 모든 토픽에서 로그 컴팩트 적용은 고민이 필요 한 부분 입니다.
키 값 기준으로 최종값만 필요한 워크로드에 적용하는 것이 바람직합니다.

또한 카프카에서 로그 컴팩트 작업이 실행되는 동안 브로커의 과도한 입출력(I/O) 부하가 발생할 수 있으므로 이점도 주의 해야 합니다.

해당 포스팅은 실전 카프카 개발부터 운영까지  책의 많은 내용 중에서 일부분의 내용만 함축적으로 정리한 것으로 모든 내용 확인 및 이해를 위해서 직접 책을 통해 모든 내용을 확인하시는 것을 권해 드립니다.
           

Reference

Reference Book
• 실전 카프카 개발부터 운영까지 


Reference URL
Introduction to Topic Log Compaction in Apache Kafka


연관된 다른 글

 

                                     

0
글에 대한 당신의 생각을 기다립니다. 댓글 의견 주세요!x