MySQL Debezium CDC 구성 (1) - 데비지움 - PostgreSQL 복제 - Kafka Connect - 카프카

Share

Last Updated on 7월 1, 2023 by Jade(정현호)

안녕하세요     
이번 포스팅은 Kafka 및 Debezium connector 와 JDBC connector 이용하여 MySQL 과 PostgreSQL 간의 CDC 연결에 대한 내용에 대해서 확인 해 보도록 하겠습니다.      

Kafka 개요

아파치 카프카는 실시간으로 레코드의 스트림을 게시, 구독, 저장, 처리할 수 있는 분산 데이터 스트리밍 플랫폼입니다.

다양한 소스에서 데이터 스트림을 처리하고 여러 컨슈머에게 전달하기 위해 설계되었습니다.

간단히 말해서, 카프카는 단순히 A지점에서 B지점으로 데이터를 이동시키는 것뿐 아니라 A지점에서 Z지점과 필요한 모든 곳으로 동시에 대규모 데이터를 이동시킬수 있습니다.

아파치 카프카는 전통적인 엔터프라이즈 메시징 시스템의 대안이며, 이제는 다양한 엔터프라이즈 요구 사항에 적용 가능한 오픈 소스 데이터 스트리밍 솔루션으로 사용 되고 있습니다.

현대적인 애플리케이션은 스트리밍 데이터를 사용하여 데이터를 처리, 저장, 분석합니다.

스트리밍 데이터는 데이터 세트에 발생한 변경 사항이나 이벤트의 실행 로그(대개 매우 빠른 속도로 변경됨)라고 볼 수도 있습니다.

스트리밍 데이터의 소스일 수 있는 경우는 금융 거래, 사물 인터넷(IoT) 센서 데이터, 물류 운영, 소매 주문 또는 병원 환자 모니터링 등 다양하며, 데이터 스트리밍은 이벤트에 대한 실시간 응답이 필요한 상황에 적합하다고 할 수 있습니다.

마이크로서비스(Microservices)는 개발 환경을 바꾸어 놓았습니다. 공유 데이터베이스 계층과 같은 종속성을 줄여 개발자들이 더욱 민첩하게 작업을 수행하도록 해줍니다.

그러나 개발자가 구축 중인 분산형 애플리케이션이 데이터를 공유 하려면 특정한 유형의 통합 환경이 필요합니다.

널리 사용되는 통합 옵션으로 동기식 방법이 있는데, 이는 서로 다른 사용자 간 데이터를 공유하는 데 애플리케이션 프로그래밍 인터페이스(API)를 활용합니다.

또 다른 통합 옵션으로는 중간 데이터 스토어에 데이터를 복제하는 비동기식 방법이 있습니다.

Apache Kafka는 바로 이런 맥락에 등장하는 솔루션으로, 다른 개발팀의 데이터를 스트리밍하여 데이터 스토어를 채우면 해당 데이터를 여러 팀과 이들의 애플리케이션 간에 공유할 수 있게 됩니다.

아파치 카프카의 발전 역사와 개요 등에 대한 내용은 아래 포스팅을 참조하시면 됩니다.



Kafka API
관리 및 관리 작업을 위한 명령줄 도구 외에도 Kafka에는 5가지 핵심 API가 있습니다.

  • Admin API
  • Producer API
  • Consumer API
  • Kafka Streams API
  • Kafka Connect API


이번 글에서 다루고 있는 주된 내용은 Kafka 와 Kafka Connect API 사용과 관련된 내용입니다.

Kafka Connect API는 재사용 가능한 Data import Connector, Data export Connector를 생성하고 실행하는 데 사용됩니다.

이를 통해 외부 시스템 및 애플리케이션에서 이벤트 스트림을 읽거나 생성하여 Kafka와 통합할 수 있습니다.
                 

Kafka Connect

Kafka Connect(API)는 Apache Kafka®와 다른 데이터 시스템 간에 데이터를 확장 가능하고 안정적으로 스트리밍하기 위한 도구입니다.

Kafka Connect를 사용하면 대량의 데이터를 Kafka로 이동시키는 Connector를 빠르고 쉽게 생성 및 사용할 수 있습니다.

Apache Kafka 와 Kafka Connect 그리고 Connector 등에 대해서 조금 더 구분하여 설명 하도록 하겠습니다.

Apache Kafka라고 통상 표현하는 것에는 Kafka Core 와 위에서 설명한 5개 Kafka API 를 포함한 핵심 기능을 의미 합니다.

카프카를 설치후 서비스를 시작한 후에 kafka-console-producer.shkafka-console-consumer.sh 을 통해서 토픽을 생성하고 데이터를 프로듀싱 및 토픽을 컨슈밍 하는 등의 일들은 Producer API와 Consumer API 등을 사용하여 수행할 수 있습니다.

Kakfa Connect(API)는 다른 데이터 시스템간의 연결 그리고 데이터 이동을 쉽고 빠르게 하기 위한 지원 기능(도구) 입니다.

Kakfa Connect를 사용하기 위해서는 연결하고 데이터를 추출 및 전달 하기 위한 데이터 시스템에 따라서 Connector Plugin이 필요 합니다.  Connector(커넥터)는 데이터를 추출하고 전달(복사)하는 것에 대한 정의이며, 커넥터 인스턴스는 Kafka를 통해서 데이터 시스템간 데이터 복사를 관리하는 논리적 작업을 의미 합니다.

보통 커넥터 인스턴스와 커넥터 플러그인은 모두 "Connector(커넥터)"로 지칭하기도 하지만, 명확히 구분 하면 예를 들어 "커넥터를 설치한다"는 플러그인을 의미하며, 커넥터의 상태를 설정/확인 한다"는 커넥터 인스턴스를 의미합니다.


이러한 Kafka Connect는 전체 데이터베이스를 수집하거나 모든 애플리케이션 서버에서 Kafka 토픽으로 메트릭을 수집하여 대기 시간이 짧은 스트림 처리에 데이터를 사용할 수 있도록 합니다.

Connector는 Kafka topic에서 Elasticsearch와 같은 보조 인덱스 또는 Hadoop과 같은 배치 시스템으로 데이터를 전달할 수 있습니다. 이를 통해 오프라인 분석을 위해 데이터를 수집하거나, 다른 분산 시스템에서 Kafka로 데이터를 복제할 수 있습니다.


Kafka Connect는 다음과 같은 이점을 제공 합니다.

  • 데이터 중심 파이프라인 : Connect는 의미 있는 데이터 추상화를 사용하여 데이터를 Kafka로 가져오거나 푸시 합니다.
  • 유연성 및 확장성 : Connect는 단일 노드(독립형) 또는 조직 전체 서비스로 확장된(분산형) 스트리밍 및 배치 지향 시스템으로 실행됩니다.
  • 재사용성 및 확장성 : Connect는 기존 커넥터를 활용하거나 요구 사항에 맞게 확장하고 생산 시간을 단축합니다.


Kafka Connect는 Kafka와 데이터를 스트리밍하는 데 중점을 둡니다. 이를 통해 고품질, 신뢰성 높은 커넥터 플러그인을 더 간단하게  작성 할 수 있습니다.

또한 Kafka Connect를 사용하면 다른 프레임워크로는 구현하기 어려운 보장성을 확보할 수 있습니다. Kafka와 스트림 처리 프레임워크와 함께 사용할 때 Kafka Connect는 ETL 파이프라인의 중요한 구성요소입니다.

         

Kafka Connect의 작동 방식

Kafka Connect는 단일 머신에서 작업을 실행하는 독립형 프로세스로 배포할 수도 있고(사용예: 로그 수집), 전체 조직을 지원하며 분산, 확장이 가능한 fault-tolerant(내결함성)서비스로 배포할 수도 있습니다.

각각 Standalone 모드Distributed 모드 동작을 의미 하며 해당 내용은 아래에서 설명 되고 있습니다.

Kafka Connect낮은 진입 장벽과 낮은 운영 오버헤드를 제공합니다. 개발 및 테스트를 위한 독립 실행형 환경으로 소규모로 시작한 다음 전체 프로덕션 환경으로 확장하여 대규모 조직의 데이터 파이프라인을 지원할 수 있습니다.

이런 Kafka Connect를 이용한 커넥터에는 두 가지 유형이 있습니다.

[instaclustr.com/kafka-connect-architecture-overview]

  • Source connector : 원본 커넥터는 전체 데이터베이스를 수집하고 Kafka 항목에 대한 스트림 테이블 업데이트를 수집합니다. 소스 커넥터는 또한 모든 애플리케이션 서버에서 메트릭을 수집하고 Kafka 주제에 데이터를 저장할 수 있으므로 대기 시간이 짧은 스트림 처리에 데이터를 사용할 수 있습니다.
  • Sink connector : 싱크 커넥터는 오프라인 분석을 위해 Kafka 주제의 데이터를 Elasticsearch와 같은 보조 인덱스 또는 Hadoop과 같은 배치 시스템으로 전달합니다.

              

Confluent Platform

Confluent Platform은 연속적이고 실시간 스트림 데이터를 쉽게 액세스, 저장 및 관리할 수 있도록 하는 대규모 데이터 스트리밍 플랫폼 입니다. 즉, 스트리밍 애플리케이션을 쉽게 구축할 수 있는 Kafka 기반 플랫폼입니다.

카프카의 창시자인 제이 크렙스는 공동 창시자인 준 라오, 네하 나크헤데와 함께 링크드인에서 독립하여 2014년에 컨플루언트(www.confluent.io) 라는 회사를 설립 하였으며, 컨플루언트에서 Confluent Platform을 기업용 기능으로 Apache Kafka의 이점을 확장하면서 Kafka의 사용 및 관리, 모니터링 기능을 제공하고 있습니다.

Confluent Platform은 데이터가 어떻게 전송되는지, 서로 다른 시스템 간에 통합되는 방법 등과 같은 기본 매커니즘에 대해서 문제나 걱정할 필요 없이 데이터로부터 비지니스 가치를 도출 하는데 집중 할 수 있도록 해줍니다.

특히 Confluent Platform은 데이터 원본을 Kafka에 연결하고 스트리밍 애플리케이션을 빌드하는 것을 단순화하며, Kafka 인프라의 보안, 모니터링 및 관리에 대해서 간편함을 제공합니다.

[Confluent Platform Components]

위의 Confluent Platform Components 이미지가 Apache Kafka 와 Kafka Connect, Connector Plugin 과 Confluent Platform 생태계를 전체적으로 전반적으로 이해하기 쉽도록 표현하고 있으며, Connector Plugin 등의 내용을 확인 해보겠습니다.

Apache Kafka 는 오프소스로 핵심 엔진인 Core 와 API 으로 구성되어 있으며, Apache Kafka Layer 다음 Layer로는 Development & Connectivity Layer를 확인 할 수 있습니다.

해당 Layer에 Connector 를 비롯하여 REST Proxy, Schema Registry 등을 확인 할 수 있습니다. Apache Kafka를 더 간편하고 여러 소스와 타켓 데이터 스토어의 연결을 지원하거나 카프카 클러스터를 사용한 보다 손쉬운 개발 및 사용환경을 제공 합니다.

그외 Layer에는 관리 및 모니터링, 성능과 확장 등의 기능이 있는 것을 확인 할 수 있습니다. 이처럼 Confluent Platform는 Apache Kafka에 대해서 실시간 스트림 데이터를 쉽게 액세스, 저장 및 관리, 모니터링, 확장, 보안 등의 다양한 기능을 지원하는 플렛폼 입니다.

Confluent Platform에서 다양하게 제공되는 기능에는 무료(오픈소스)인 Community도 있지만, 상용도 있습니다. 라이선스 모델 및 정보는 아래 페이지에서 확인해보시기 바랍니다.



글에서는 대상(Target) 데이터베이스로 PostgreSQL을 사용할 것이며, PostgreSQL에 데이터를 입력할 때는 JDBC Source/Sink Connector 를 사용하며 되며, 해당 커넥터 플러그인은 Community 오픈소스 입니다.

Confluent Hub 페이지에서 다양한 커넥터를 찾거나 다운로드 받을 수 있으며, Community 도 있지만 상용(유로)도 있습니다.


Kafka Connect 개발 가이드에 따라서 별도로 Connector를 작성해서 사용 할 수도 있긴 하지만, 이미 검증되어 공개적으로 배포된 커넥터도 다수가 있기 때문에 보통의 경우는 계속 개발 되고 있는 기존의 커넥터를 사용하시면 됩니다.

글에서는 타켓 데이터베이스인 PostgreSQL에 데이터를 복제하는데 JDBC Source/Sink Connector 를 사용하였습니다.
                 

Debezium Architecture

CDCChange Data Capture 의 약자로 실시간 또는 준 실시간(near real-time) 으로 입력 및 변경 된 내용을 로그를 참고하여 구성된 목표 시스템에 데이터를 복제하기 위한 용도로 사용 됩니다.

ETL은 소스DB에 SQL 쿼리문을 수행하여 직접 데이터를 추출하기 때문에 소스DB에 부하를 주며 Batch 형식으로 수행하기 때문에 최신의 데이터를 포함한 데이터 통합이나 분석을 위해서는 ETL Batch 작업 수행이 필요한 부분이 있는 반면에 CDC 는 거의 실시간성을 확보 할 수 있으며 입력 및 변경된 트랜잭션 로그를 통해서 복제하기 때문에 소스DB에 직접적인 부하를 주지 않습니다.

준 실시간으로 복제를 하기 때문에 데이터 통합을 위한 파이프라인 이외에 DR , 이기종 DB 간의 데이터 이관 및 데이터 복제 유지 등에서 사용 됩니다.

CDC 에는 상용 제품을 포함 여러 제품이 있으며 글에서는 오픈소스 Debezium 에 대해서 기술하고 있습니다.

Debezium은 가장 일반적으로 Apache Kafka Connect를 통해 배포합니다. Kafka Connect는 커넥터를 구현하고 운영하기 위한 프레임워크이자 런타임입니다.

위에서 설명한 내용과 같이 kafka connect 는 2가지 유형의 connector 가 있습니다.

  • Source connector : Debezium과 같이 Source에서 데이터를 추출해서 Kafka 클러스터로 레코드를 보내는 것, 즉 Producer 역할을 하는 Kafka Connector를 Source connectors 라고 합니다.
  • Sink connector : Kafka topic을 다른 시스템(target)으로 레코드를 전달하는 것, 즉 Consumer 역할을 하는 Kafka Connector를 Sink Connector 라고 합니다.


2개의 connector 에서 Debezium 은 여러 Source connector 중 하나의 connector plugin이며, Debezium 을 사용할 수 있는 3가지 방법 중 대표적인 방법 입니다.

다음 이미지는 Debezium을 기반으로 한 변경 데이터 캡처 파이프라인의 아키텍처를 보여줍니다.

[debezium.io/architecture]


이미지에 표시된 것처럼 MySQL 및 Postgres용 Debezium 커넥터는 이러한 두 가지 유형의 데이터베이스에 대한 변경사항을 캡처하기 위해 배포됩니다.

각 Debezium 소스 커넥터는 소스 데이터베이스에 대한 연결을 설정 하고 아래의 방법으로 변경사항을 캡처 합니다.

  • MySQL 커넥터는 binlog에 액세스하기 위해 클라이언트 라이브러리를 사용합니다.
  • PostgreSQL 커넥터는 logical replication stream 에서 읽습니다.


카프카 커넥트는 카프카 브로커 외에 별도의 서비스로 운영 됩니다.

기본적으로 한 데이터베이스 테이블의 변경사항은 테이블 이름에 해당하는 이름의 Kafka 토픽에 기록됩니다. 필요한 경우 Debezium의 topic routing transformation을 구성하여 대상 topic 이름을 조정 할 수 있습니다.

  • 테이블에 해당하는 동일한 이름의 토픽을 사용
  • 테이블 이름과 이름이 다른 항목으로 레코드 라우팅
  • 여러 테이블에 대한 변경 이벤트 레코드를 하나의 topic으로 스트리밍


변경 이벤트 레코드가 아파치 카프카에 저장된 후, 카프카 커넥트 eco-system의 커넥터들은 레코드를 Elasticsearch, 데이터 웨어하우스, 분석 시스템, 인피니스판과 같은 캐시와 같은 다른 시스템과 데이터베이스로 스트리밍할 수 있습니다.

선택한 싱크 커넥터에 따라 Debezium의 New Record State Extraction transformation을 구성해야 할 수 있습니다

Kafka Connect SMT는 Debezium의 변경 이벤트에서 싱크 커넥터로 after structure를 전파합니다. 이것은 기본적으로 전파되는 상세 변경 이벤트 레코드를 대신합니다.


Debezium 사용하는 방법으로 위에서 설명한 Kafka Connect를 통한 사용 이외에 2가지 방법이 더 존재 합니다.

  • Embedded Engine -> Debezium Engine
  • Debezium Server



Debezium Engine
Debezium을 사용하는 유형으로 카프카 커넥트 방식 외 다른 방법으로 Debezium Engine 이 있습니다.
원래는 Embedded Engine 이었으나 Deprecated 되었고 Debezium Engine 으로 대체 되었습니다.

일부 애플리케이션은 Debezium 커넥터를 애플리케이션 공간 내에 직접 내장해서 사용하는 것을 선호할 수 있습니다.

이런 애플리케이션은 여전히 동일한 데이터 변경 이벤트를 Kafka 로 보내길 원하며, 애플리케이션에서 직접 전송하는 것을 원할 수 있습니다.

이러한 경우 Debezium Engine api 모듈은 애플리케이션이 Debezium Engine을 사용하여 API 를 통해서 CDC 기능을 사용할 수 있습니다.


Debezium Server
Debezium을 배포하는 또 다른 방법은 Debezium 서버를 사용하는 것입니다 . Debezium 서버는 소스 데이터베이스에서 다양한 메시징 인프라로 변경 이벤트를 스트리밍하는 구성 가능하고 바로 사용할 수 있는 애플리케이션입니다.

다음 이미지는 Debezium 서버를 사용하는 변경 데이터 캡처 파이프라인의 아키텍처를 보여줍니다.

[debezium.io/architecture]


Debezium Server는 Debezium 소스 커넥터 중 하나를 사용하여 소스 데이터베이스에서 변경 사항을 캡처하도록 구성됩니다.
변경 이벤트는 JSON 또는 Apache Avro와 같은 다양한 형식으로 직렬화할 수 있으며 Amazon Kinesis, Google Cloud Pub/Sub 또는 Apache Pulsar와 같은 다양한 메시징 인프라 중 하나로 전송됩니다.

변경 이벤트는 JSON 또는 Apache Avro와 같은 다양한 형식을 사용한다는 것은 kakfa 를 거치지 않고 다른 곳으로 데이터를 보내는 아키텍처를 사용함을 의미 합니다. 
                         

Kafka 클러스터 구성

카프카 클러스터 구성에 관해서는 아래 설치 포스팅을 참조하시면 되며, 동일 서버(1개 서버) 에서 주키퍼 3개 및 카프카 3개를 설치한 구성으로 진행 하였습니다.



환경 정보
포스팅에서 사용 및 구성한 환경 정보는 아래와 같습니다.

• 카프카 버전 : 2.8.2
• 주키퍼 버전 : 3.8.1
• 자바 버전 : 11 
• MySQL : 8.0.28 - 2개 인스턴스
• PostgreSQL : 14.6
• Debezium Connector : 2.1.2
• jdbc source/sinc connector : 10.6.0

동일 서버의 주키퍼 및 카프카 설치에 대한 부분은 간략하게 아래와 이미지와 같은 형태가 됩니다.


주키퍼 대신 KRaft를 사용한 Kafka 설치는 아래 포스팅을 참조하시면 됩니다.



CDC 복제 구성 정보

포스팅에서는 동일 서버 모두 구성하였으며, Source Database 로는 MySQL 2개 인스턴스를 이용하였으며, MySQL에서 Kafka/Debezium 을 이용해서 1개의 PostgreSQL 로 복제 동기화를 구성하였습니다.
그림으로 간략하게 표현 하면 아래와 같습니다.


포스팅과 같이 동일 서버(한 개 서버)에서 구성 테스트를 진행할 경우 주키퍼 3, 카프카3, MySQL, PostgreSQL 등 다수의 애플리케이션 및 데몬이 실행됨에 따라서 메모리가 많이 필요 할 수 있습니다.

KAFKA_HEAP_OPTS 메모리를 다소 낮게 조정이 필요할 수 있으며 OS 메모리는 4GB 이상이 필요 할 수 있습니다.
                     

Source 와 Target DB 사전 구성

Source DB 와 Target DB에 대해서 사전에 설정이 필요한 부분과 생성이 필요한 내역에 대해서 진행 하도록 하겠습니다.

생성시에 사용한 DB명, 계정명 등은 포스팅에서 사용한 예시 이며, 필수적인 이름이나 명칭은 아닙니다.
           

MySQL 설정

포스팅에서는 MySQL 설치에 대해서는 다루지 않고 있으며, 설치 이후 설정 등에 대해서 다룹니다.

[참고] Centos8, RockyLinux8 버전에서 MySQL 8 버전 설치 포스팅 글



데이터베이스(스키마 생성)
포스팅에서는 2개 MySQL 인스턴스를 사용하였으며, 인스턴스 별로 각각 스키마를 생성하여 사용 하였습니다.

-- 인스턴스1
CREATE DATABASE db1 /*!40100 DEFAULT CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci */;


-- 인스턴스2 
CREATE DATABASE db2 /*!40100 DEFAULT CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci */;



debezium 접속용 DB 계정 생성

CREATE USER 'cdc'@'%' IDENTIFIED WITH 'mysql_native_password' by 'password123';
GRANT SELECT, RELOAD, SHOW DATABASES, LOCK TABLES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'cdc'@'%';

전역 읽기 잠금을 허용하지 않는 Amazon RDS 또는 Amazon Aurora와 같은 호스팅 옵션을 사용하는 경우 일관된 스냅샷 을 생성하는 데 테이블 수준 잠금이 사용됩니다.

이 경우 LOCK TABLES 권한을 생성한 사용자에게 부여해야 합니다.
포스팅에서는 RDS 에서도 CDC 를 하는 가정에서 LOCK TABLES 권한 까지 부여하였습니다.

자세한 내용은 Debezium 스냅샷 문서를 참조하십시오.


DB 복제관련 설정
Debezium 이외 다른 CDC Application 에서도 각각의 requirement 가 있습니다.
원만하고 특이사항 없이 복제 동기화를 위해서는 요구 되는 내용은 필수적으로 설정이 필요 합니다.

  • server-id = 47858934
  • log_bin = mysql-bin
  • binlog_format = ROW
  • binlog_row_image = FULL
  • 5.7 버전) expire_logs_days = 5
    8.0버전) binlog_expire_logs_seconds = 432000
    -> 432000초 = 5일
  • log_slave_updates = ON 또는 log_replica_updates = ON
  • binlog_rows_query_log_events = ON
  • interactive_timeout
  • wait_timeout
  • unset binlog_row_value_options


server-id 는 복제 구성 인스턴스간에 반드시 달라야 합니다.
8.0.26 버전부터 log_slave_update 에서 log_replica_updates 으로 변경 되었습니다.

대형 데이터베이스에 대해 초기 일관된 스냅샷을 만들기 위해 테이블을 읽는 동안 설정된 연결이 시간 초과될 수 있습니다.
session timeout 관련된 2개의 파라미터 interactive_timeoutwait_timeout 을 적절히 크게 설정함으로써 이러한 에러를 방지 할 수 있습니다.
interactive_timeout 와 wait_timeout 파라미터 기본 값은 28800초(8시간) 입니다.

binlog_row_value_options 시스템 변수를 확인하여 이 값이 PARTIAL_JSON 으로 설정되어 있는지 확인 합니다. PARTIAL_JSON 으로 되어있을 경우 connector가 UPDATE 이벤트를 수행 하지 못할 수 있습니다.  

mysql> show global variables like 'binlog_row_value_options';
+--------------------------+-------+
| Variable_name            | Value |
+--------------------------+-------+
| binlog_row_value_options |       |
+--------------------------+-------+


위의 조회 결과가 "PARTIAL_JSON" 인 경우 다음과 같이 이 변수를 설정 해제합니다

mysql> set @@global.binlog_row_value_options="" ;

                

PostgreSQL 설정

PostgreSQL 에 대한 설치는 아래 설치 포스팅을 참조하시면 됩니다.


PostgreSQL 설치 및 데이터베이스 초기화가 모두 완료 된 상태 이후 내용 부터 진행 하도록 하겠습니다.
postgres 계정 이외 계정 및 데이터베이스 이름 등은 포스팅에서의 예시 입니다.
(다른 이름, 다른 데이터베이스, 스키마명을 사용하여도 됩니다.)



계정 설정 및 생성
관리자(postgres) 계정에 대한 패스워드 설정 및 Kafka sink connector 가 연결할 복제 동기화 계정에 대해서 생성 하도록 하겠습니다.

# 1. postgres 계정으로 접속 후 비밀번호 설정
psql (14.6)
Type "help" for help.

postgres=# alter user postgres with password 'postgres123';


# 2. 계정 생성
create user debezium_postgres WITH LOGIN password 'debezium_postgres123';



데이터베이스 및 스키마 생성

# 1. 데이터베이스 생성 - sinkdb 라는 이름으로 생성함
postgres=# CREATE DATABASE sinkdb;


# 2. 권한 부여
postgres=# GRANT ALL PRIVILEGES ON DATABASE sinkdb TO debezium_postgres;


# 3. sinkdb 데이터베이스 접속
postgres=# \c sinkdb
You are now connected to database "sinkdb" as user "postgres".


# 4. 스키마 생성 : db1, db2
CREATE SCHEMA db1 AUTHORIZATION debezium_postgres;
CREATE SCHEMA db2 AUTHORIZATION debezium_postgres;


# 5. 스키마 리스트 조회
sinkdb=# \dn
      List of schemas
  Name  |       Owner
--------+-------------------
 db1    | debezium_postgres
 db2    | debezium_postgres
 public | postgres
(3 rows)
 



search_path
테이블 생성 정보 조회 등을 위해서 생성한 스키마에 대해서 search_path 를 설정 하도록 하겠습니다.
해당 세션 또는 터미널은 계속 조회를 해야함으로 유지하도록 합니다.

# search_path 설정
sinkdb=# SET search_path to db1,db2,public;
SET


# 테이블 리스트 조회
sinkdb=# \dt+
Did not find any relations.
<-- 현재는 테이블이 없음



pg_hba.conf 파일 수정
PostgreSQL 의 데이터 디렉토리 경로에 있는 pg_hba.conf 파일을 수정 합니다.

$ sudo vi pg_hba.conf


# 아래 내용을 추가
local   all             all                                     md5

만약 파일내에서 local 이 peer 로 설정 되어있을 경우 peer 을 md5 로 변경을 합니다.


포스팅 환경과 다르게 kafka 서버와 PostgreSQL가 다른 서버에 위치해 있다면 2가지 정도를 수정 및 추가 해야 합니다.
먼저 pg_hba.conf 파일에 아래 항목을 추가 합니다.

host    all             all             0.0.0.0/0                 md5


그리고 postgresql.conf 에서 아래 내역을 추가 합니다.

$ sudo vi postgresql.conf


listen_addresses = '*'


postgresql.conf 까지 수정하였을 경우 PostgreSQL 재시작이 필요 합니다.
                   

카프카 커넥터

카프카 커넥터에는 source connector 와 sink connector 가 있으며, source connector 에는 Debezium connector , sink connect 에는 JDBC Source/Sink Connector 를 사용하도록 하겠습니다.

카프카 커넥터 설치 및 구성, 카프카 커넥터 구동은 아래 다음 연재 글에서 이어 집니다.

            

Reference

Reference URL
confluent.io/#kafka-connect
confluent.io/current/platform

debezium.io/architecture.html
debezium.io/connectors/mysql.html
debezium.io/connectors/mysql.html#setting-up-mysql
debezium.io/connectors/mysql.html#mysql-connector-properties
confluent.io/kafka-connectors/self-managed/userguide.html
debezium.io/connectors/#_required_debezium_mysql_connector_configuration_properties
confluent.io/connect/concepts.html
confluent.io/connect/transforms/overview.html
debezium.io/transformations/event-flattening.html
confluent.io/kafka-connectors/sink-connector/overview.htm
redhat.com/what-is-apache-kafka



관련된 다른 글

 

              

0
글에 대한 당신의 생각을 기다립니다. 댓글 의견 주세요!x