Last Updated on 9월 5, 2023 by Jade(정현호)
안녕하세요
이번 글은 Kafka 및 Debezium connector 와 JDBC connector 이용하여 MySQL 과 PostgreSQL 간의 CDC 연결에 대한 내용 내용으로 아래 이전 글로 부터 이어지는 두번째 글 입니다.
기존 글에서 내용이 추가 됨에 따라서 글을 분리 하였으며, 이번 글은 카프카 커넥터 플러그인 설치 및 구성 등의 내용 부터 시작 합니다.
카프카 커넥터
카프카 커넥터 설치 및 구성, 카프카 커넥터 시작 등을 진행하도록 하겠습니다.
앞에 과정을 통해서 카프카 클러스터 설치(+주키퍼) 및 Source , Target Database 의 구성을 완료 하였습니다.
카프카 커넥터에는 source connector 와 sink connector 가 있으며, source connector 에는 debezium connector , sink connect 에는 JDBC Source/Sink Connector 를 사용하도록 하겠습니다.
포스팅 환경에서의 kafka 위치는 /usr/local/kafka 이며, 하위 경로에 plugins 디렉토리를 생성 하여 connector 디렉토리로 사용하도록 하겠습니다.
# sudo mkdir -p /usr/local/kafka/plugins
커넥터 설치
커넥터 다운로드 및 압축 해제
• Debezium connector
wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/2.1.2.Final/debezium-connector-mysql-2.1.2.Final-plugin.tar.gz sudo tar zxvf debezium-connector-mysql-2.1.2.Final-plugin.tar.gz -C /usr/local/kafka/plugins/
• JDBC source/sink connector
wget https://d1i4a15mxbxib1.cloudfront.net/api/plugins/confluentinc/kafka-connect-jdbc/versions/10.6.0/confluentinc-kafka-connect-jdbc-10.6.0.zip sudo unzip confluentinc-kafka-connect-jdbc-10.6.0.zip -d /usr/local/kafka/plugins
디렉토리 확인
# ls /usr/local/kafka/plugins confluentinc-kafka-connect-jdbc-10.6.0 debezium-connector-mysql
kafka connect 실행
카프카 커넥트(connect)를 실행하는 방법은 단일 모드 커넥트 와 분산모드 커넥트 2가지가 있습니다.
• 카프카 커넥터 Standalone 와 Distributed
- Standalone Mode - 하나의 work process에서 모든 커넥터와 태스크가 실행됩니다. 커넥터와 태스크가 특정 컴퓨터에서만 실행되는 경우에 사용 하면 좋습니다. Rest API대신 Command Line으로 실행 합니다.
- Distributed Mode - Connect 클러스터를 형성하는 여러 시스템(노드)에서 Connect worker를 실행합니다.
Kafka Connect는 실행 중인 커넥터를 클러스터 전체에 배포합니다. 필요에 따라 노드를 추가하거나 제거할 수 있습니다.
이 Distributed Mode는 내결함성이 Standalone Mode 비해 더 뛰어납니다.
예를 들어 노드가 예기치 않게 클러스터를 Eviction 되었을 경우 Kafka Connect는 해당 노드의 작업을 클러스터의 다른 노드에 배포합니다. 그리고 Kafka Connect는 안전하게 복제되는 Kafka 클러스터 내부에 커넥터 구성, 상태 및 오프셋 정보를 저장하기 때문에 Connect 작업자가 실행되는 노드를 잃어도 데이터가 손실되지 않습니다.
분산 모드는 확장성 및 고가용성, 관리 이점 때문에 프로덕션 환경에 권장됩니다.
포스팅에서는 Distributed Mode 으로 구성하였습니다.
properties 설정
분산 모드(Distributed Mode) 커넥트를 사용하기 위해서는 분산 모드 설정 파일인 connect-distributed.properties 을 수정 해야 합니다.
포스팅에서는 커텍트를 실행하기 위해서 필수적으로 설정 해야 하는 것에 대해서만 설정을 진행 하도록 하겠습니다.
connect-distributed.properties 파일에서 bootstrap.servers 와 plugin.path 항목을 수정하면 됩니다.
bootstrap.servers 는 kafka cluster 정보를 입력 하면 되며, plugin.path 에는 설치한(압축을 해제한) connector plugin 디렉토리 위치 경로를 입력 합니다.
디렉토리는 콤마로 구분하여 여러개의 경로를 지정할 수 있습니다.
# 디렉토리 이동 및 파일 편집 cd /usr/local/kafka/ sudo vi ./config/connect-distributed.properties # 2개 항목에 대해서 아래 예시와 같이 수정 합니다. bootstrap.servers=localhost:9092,localhost:9093,localhost:9094 plugin.path=/usr/local/kafka/plugins
위의 내용은 포스팅 환경에서의 포트 및 경로 예시 입니다.
포스팅에서는 카프카 클러스터 구성하여 3개의 kafka 브로커의 ip 를 입력 하였습니다.
Distributed Mode Connect 실행
커넥트 실행은 다음과 같이 사용할 Mode의 properties 를 지정하여 실행(구동) 하며, 아래와 같이 daemon 형태로 실행 할 수 있습니다.
# 실행 cd /usr/local/kafka/ sudo ./bin/connect-distributed.sh -daemon ./config/connect-distributed.properties # 프로세스 확인 ps -ef| grep connect-distributed.properties # 포트 오픈 확인 sudo netstat -antp | grep 8083 tcp6 0 0 :::8083 :::* LISTEN 10109/java
프로세스 기동 및 포트 오픈을 확인 하였다면 커넥트 정보를 확인합니다.
$ curl -s localhost:8083 | jq (결과) { "version": "3.4.0", "commit": "2e1947d240607d53", "kafka_cluster_id": "fba56711edb564A43vv1" }
[참고] jq 패키지가 설치가 안되어 있을 경우 다음과 같이 설치 합니다.
• RPM 계열
# Linux version 7 sudo yum install jq # Linux Version 8 sudo dnf install jq
• Ubuntu
sudo apt install jq
커넥트에 존재하는 커넥터 플러그인 목록 및 버전 정보를 확인 합니다.
curl localhost:8083/connector-plugins | jq [ { "class": "io.confluent.connect.jdbc.JdbcSinkConnector", "type": "sink", "version": "10.6.0" }, { "class": "io.confluent.connect.jdbc.JdbcSourceConnector", "type": "source", "version": "10.6.0" }, { "class": "io.debezium.connector.mysql.MySqlConnector", "type": "source", "version": "2.1.2.Final" }, { "class": "org.apache.kafka.connect.mirror.MirrorCheckpointConnector", "type": "source", "version": "3.4.0" }, { "class": "org.apache.kafka.connect.mirror.MirrorHeartbeatConnector", "type": "source", "version": "3.4.0" }, { "class": "org.apache.kafka.connect.mirror.MirrorSourceConnector", "type": "source", "version": "3.4.0" } ]
커넥터 플러그인 확인시 설치한 debezium connector 와 JdbcSinkConnector/JdbcSourceConnector 가 리스트 업 되는 것 확인할 수 있으며 버전 또한 설치한 버전과 일치하는 것을 확인 할 수 있습니다.
Rest API 로 connector 생성
Rest api를 호출하여 connector를 생성 하도록 하겠습니다.
포스팅에서는 MySQL Source 인스턴스 2개를 사용함에 따라서 Source Debezium Connector 2개, Sink Connector 2개를 생성 하도록 하겠습니다.
먼저 Source connector 를 생성하도록 하겠습니다.
• 첫번재 MySQL 인스턴스를 대상으로 하는 커넥터
curl --location --request POST 'http://localhost:8083/connectors' --header 'Content-Type: application/json' --data-raw '{ "name": "source-mysql-cdctestsrv1-db1", "config": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "tasks.max": "1", "database.hostname": "localhost", "database.port": "3306", "database.user": "cdc", "database.password": "password123", "database.server.id": "47858934", "database.include.list": "db1", "include.schema.changes": "true", "include.schema.comments" : "true", "tombstones.on.delete": "true", "database.allowPublicKeyRetrieval": "true", "topic.prefix" : "cdctestsrv1", "heartbeat.interval.ms": "2000", "snapshot.mode": "when_needed", "snapshot.locking.mode": "none", "skipped.operations" : "none", "schema.history.internal.kafka.bootstrap.servers": "localhost:9092", "schema.history.internal.kafka.topic": "cdc-history-cdctestsrv1-db1", "key.converter": "org.apache.kafka.connect.json.JsonConverter", "key.converter.schemas.enable": "true", "value.converter": "org.apache.kafka.connect.json.JsonConverter", "value.converter.schemas.enable": "true", "time.precision.mode": "connect" } }' | jq
커넥터 이름 : source-mysql-cdctestsrv1-db1
복제할 스키마 : database.include.list 를 사용하여 db1 으로 지정
• 두번재 MySQL 인스턴스를 대상으로 하는 커넥터
curl --location --request POST 'http://localhost:8083/connectors' --header 'Content-Type: application/json' --data-raw '{ "name": "source-mysql-cdctestsrv2-db2", "config": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "tasks.max": "1", "database.hostname": "localhost", "database.port": "3307", "database.user": "cdc", "database.password": "password123", "database.server.id": "4783456", "database.include.list": "db2", "include.schema.changes": "true", "include.schema.comments" : "true", "tombstones.on.delete": "true", "database.allowPublicKeyRetrieval": "true", "topic.prefix" : "cdctestsrv2", "heartbeat.interval.ms": "2000", "snapshot.mode": "when_needed", "snapshot.locking.mode": "none", "skipped.operations" : "none", "schema.history.internal.kafka.bootstrap.servers": "localhost:9092", "schema.history.internal.kafka.topic": "cdc-history-cdctestsrv2-db2", "key.converter": "org.apache.kafka.connect.json.JsonConverter", "key.converter.schemas.enable": "true", "value.converter": "org.apache.kafka.connect.json.JsonConverter", "value.converter.schemas.enable": "true", "time.precision.mode": "connect" } }' | jq
커넥터 이름 : source-mysql-cdctestsrv2-db2
복제할 스키마 : database.include.list 를 사용하여 db2 으로 지정
Source Connector 생성시 다양한 properties 가 사용되고 있습니다. properties는 Debezium 플러그인을 이용하면서 추가로 사용할 수 있는 또는 Debezium의 필수 properties도 있고, Kafka Connect 공통 properties도 있습니다.
아래 2개의 문서를 참고하시면 됩니다.
• debezium.io/documentation/mysql
• kafka.apache.org/#connectconfigs
사용된 Properties
• database.server.id : 이 숫자는 데이터베이스 클라이언트 ID를 의미 하며, 접속한 MySQL 클러스터(데이터베이스)에서 고유 해야 합니다.(Unique ID of the connector)
MySQL 기준으로 Primary(Master) 서버와 Replica(Slave) 서버로 Replication을 구성할 때 각 서버별로 server_id를 다르게 설정하는 것과 동일하게 생성하게 되는 Connector도 사용중인 서버의 server_id와 중복되지 않은 고유의 ID 값을 지정해야 합니다. 초기 스냅샷 이후에 debezium의 DB 세션은 아래와 같으며, 기존의 MySQL의 Replica(Slave) 인스턴스에서 접속한 것과 동일한 상태임을 확인할 수 있습니다.
| 74364 | cdc | x.x.x.x:1234 | NULL | Binlog Dump | 27 | Master has sent all binlog to slave; waiting for more updates | NULL |
• heartbeat.interval.ms : 이 속성은 커넥터가 Kafka topic으로 heartbeat 메시지를 얼마나 자주 보내는지를 조절합니다. 기본 동작은 커넥터가 heartbeat 메시지를 보내지 않는 것입니다.
heartbeat 메시지는 커넥터가 데이터베이스에서 변경 사항 이벤트를 수신하는지 모니터링하는 데 유용합니다. heartbeat 메시지는 커넥터가 재시작될 때 재전송해야 할 변경 이벤트 수를 줄이는 데 도움이 됩니다.
해당 파라미터를 설정하지 않으면 Connector 를 재시작시에 변경량이 적은 테이블이 있는 토픽이 있는 경우 오래된 이전의 binlog 에서 부터 읽을 수 있어서 재시작시 Source 데이터베이스에 부하가 될 수 도 있습니다.
[관련 기술 블로그]
• include.schema.changes : 데이터베이스 스키마의 변경사항을 데이터베이스 서버 ID와 동일한 이름의 Kafka 토픽에 발행할지 여부를 지정하는 속성 입니다.
각 스키마 변경은 DDL 문이 포함된 값으로 데이터베이스 이름을 포함하는 키를 사용하여 기록됩니다. 이는 커넥터가 내부적으로 database schema history을 기록하는 방식과는 별도로 독립적으로 기록 됩니다.
• schema.history.internal.kafka.topic : include.schema.changes 속성 설정에 따른 스키마 변경사항(DDL)을 기록하는 것과 별도로 Debezium connector에서 별도로 스키마 변경 히스토리를 기록하는 토픽을 지정하는 속성 입니다.
• skipped.operations : CDC에서 생략할 Operation 종류를 선택하는 속성으로 insert/create , updates, delete , truncate 4개의 오퍼레이션에 대해서 설정할 수 있으며, 기본값으로 truncate 입니다. 그러므로 truncate 작업에 대해서 스키마 변경을 기록하는 토픽에 스트림 하기 위해서는 none 으로 설정해서 skip 하는 오퍼레이션을 모두 제외하면 됩니다.
REST API를 사용하면 현재 실행중인 connector의 태스크 상태, 커넥터 상태, 플러그인, 토픽 리스트 등을 확인 할 수 있습니다.
설정된 8083 포트로 호출 하여 사용할 수 있으며 HTTP 메소드 기반 API 를 지원 합니다.
| 요청 메소드 | 호출 경로 | 설명 | |----------|-----------------------------------------------------|------------------------------| | GET | / | 실행 중인 커넥트 정보확인 | | GET | /connectors | 실행 중인 커넥터 이름 확인 | | POST | /connectors | 새로운 커넥터 생성 요청 | | GET | /connectors/{커넥터 이름} | 실행중인 커넥터 정보 확인 | | GET | /connectors/{커넥터 이름}/config | 실행중인 커넥터의 설정값 확인 | | PUT | /connectors/{커넥터 이름}/config | 실행중인 커넥터 설정값 변경 요청 | | GET | /connectors/{커넥터 이름}/status | 실행중인 커넥터 상태 확인 | | POST | /connectors/{커넥터 이름}/restart | 실행중인 커넥터 재시작 요청 | | PUT | /connectors/{커넥터 이름}/pause | 커넥터 일시 중지 요청 | | PUT | /connectors/{커넥터 이름}/resume | 일시 중지된 커넥터 실행 요청 | | DELETE | /connectors/{커넥터 이름}/ | 실행 중인 커넥터 종료 | | GET | /connectors/{커넥터 이름}/tasks/{태스크 아이디}/status | 실행 중인 커넥터의 태스크 상태 확인 | | POST | /connectors/{커넥터 이름}/tasks/{태스크 아이디}/restart | 실행 중인 커넥터의 태스크 재시작 요청 | | GET | /connectors/{커넥터 이름}/topics | 커넥트에 존재하는 커넥터 플러그인 확인 | | PUT | /connector-plugins/ | 커넥트에 존재하는 커넥터 플러그인 확인 | | PUT | /connector-plugins/{커넥터 플러그인 이름}/config/validate | 커넥터 생성 시 설정값 유효 여부 확인 |
위에서 생성한 커넥터에 대한 상태 정보는 아래와 같이 확인 할 수 있습니다.
########### source-mysql-cdctestsrv1-db1 curl --location --request GET 'http://localhost:8083/connectors/source-mysql-cdctestsrv1-db1/status' > --header 'Content-Type: application/json' | jq % Total % Received % Xferd Average Speed Time Time Time Current Dload Upload Total Spent Left Speed 100 182 100 182 0 0 30333 0 --:--:-- --:--:-- --:--:-- 30333 { "name": "source-mysql-cdctestsrv1-db1", "connector": { "state": "RUNNING", "worker_id": "10.0.2.15:8083" }, "tasks": [ { "id": 0, "state": "RUNNING", "worker_id": "10.0.2.15:8083" } ], "type": "source" } ########### source-mysql-cdctestsrv2-db2 curl --location --request GET 'http://localhost:8083/connectors/source-mysql-cdctestsrv2-db2/status' > --header 'Content-Type: application/json' | jq % Total % Received % Xferd Average Speed Time Time Time Current Dload Upload Total Spent Left Speed 100 182 100 182 0 0 12133 0 --:--:-- --:--:-- --:--:-- 13000 { "name": "source-mysql-cdctestsrv2-db2", "connector": { "state": "RUNNING", "worker_id": "10.0.2.15:8083" }, "tasks": [ { "id": 0, "state": "RUNNING", "worker_id": "10.0.2.15:8083" } ], "type": "source" }
위에서 생성한 2개의 커넥터를 통해서 전달되는 레코드를 컨슘하기 위한 각각의 Sink 커넥터를 생성 하도록 하겠습니다.
• sink-postgres-sink-db1
curl --location --request POST 'http://localhost:8083/connectors' --header 'Content-Type: application/json' --data-raw '{ "name": "sink-postgres-sink-db1", "config": { "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector", "tasks.max": "1", "connection.url": "jdbc:postgresql://localhost:5432/sinkdb?currentSchema=db1", "connection.user": "debezium_postgres", "connection.password": "debezium_postgres123", "auto.create": "true", "auto.evolve": "true", "delete.enabled": "true", "tombstones.on.delete": "true", "insert.mode": "upsert", "pk.mode": "record_key", "table.name.format":"${topic}", "topics.regex": "cdctestsrv1.db1.(.*)", "offset.flush.interval.ms": "1000", "key.converter": "org.apache.kafka.connect.json.JsonConverter", "key.converter.schemas.enable": "true", "value.converter": "org.apache.kafka.connect.json.JsonConverter", "value.converter.schemas.enable": "true", "transforms": "unwrap, route, TimestampConverter", "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState", "transforms.unwrap.drop.tombstones": "false", "transforms.unwrap.delete.handling.mode": "none", "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter", "transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)", "transforms.route.replacement": "$3", "transforms.TimestampConverter.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value", "transforms.TimestampConverter.format": "yyyy-MM-dd HH:mm:ss", "transforms.TimestampConverter.target.type": "Timestamp", "transforms.TimestampConverter.field": "update_date" } }' | jq
• sink-postgres-sink-db2
curl --location --request POST 'http://localhost:8083/connectors' --header 'Content-Type: application/json' --data-raw '{ "name": "sink-postgres-sink-db2", "config": { "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector", "tasks.max": "1", "connection.url": "jdbc:postgresql://localhost:5432/sinkdb?currentSchema=db2", "connection.user": "debezium_postgres", "connection.password": "debezium_postgres123", "auto.create": "true", "auto.evolve": "true", "delete.enabled": "true", "tombstones.on.delete": "true", "insert.mode": "upsert", "pk.mode": "record_key", "table.name.format":"${topic}", "topics.regex": "cdctestsrv2.db2.(.*)", "offset.flush.interval.ms": "1000", "key.converter": "org.apache.kafka.connect.json.JsonConverter", "key.converter.schemas.enable": "true", "value.converter": "org.apache.kafka.connect.json.JsonConverter", "value.converter.schemas.enable": "true", "transforms": "unwrap, TimestampConverter", "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState", "transforms.unwrap.drop.tombstones": "false", "transforms.unwrap.delete.handling.mode": "none", "transforms": "unwrap, route, TimestampConverter", "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter", "transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)", "transforms.route.replacement": "$3", "transforms.TimestampConverter.type": "org.apache.kafka.connect.transforms.TimestampConverter$Value", "transforms.TimestampConverter.format": "yyyy-MM-dd HH:mm:ss", "transforms.TimestampConverter.target.type": "Timestamp", "transforms.TimestampConverter.field": "update_date" } }' | jq
사용된 Properties
• auto.create : 테이블 객체가 존재 하지 않을 경우 자동 생성 옵션으로, 데이터 입력 시점에 테이블이 존재 하지 않았을 경우 자동 생성 여부에 관한 속성 입니다.
• auto.evolve : 데이터 입력 시점에서 테이블에 해당 컬럼이 없었을 경우 자동 추가에 관한 속성으로, 모든 테이블 변경(alter)는 아니며 ADD Column 에 대해서 자동 추가 지원 됩니다.
• delete.enabled : 레코드 값이 null 일 경우 삭제 처리 여부로 설명되어 있습니다. 이 속성을 true로 설정해서 사용하려면 pk.mode 속성이 record_key 이어야 합니다.
• pk.mode : 테이블의 PK설정에 관련된 속성으로 글에서는 record_key으로 설정 하였으며, record_key는 카프카 메세지의 key 값을 데이터베이스의 Primary Key로 사용함을 의미 합니다.
JDBC Sink Connector Configuration Properties 와 Debezium New Record State Extraction 문서 에서 추가적인 자세한 내용을 확인 할 수 있습니다.
생성한 커넥터의 삭제는 아래와 같은 명령어로 수행 합니다.
구문) curl --location --request DELETE 'http://localhost:8083/connectors/커넥터명' 예시) curl --location --request DELETE 'http://localhost:8083/connectors/sink-postgres-sink-db2'
생성한 Sink Connector 에 대해서 상태를 확인 합니다.
curl --location --request GET 'http://localhost:8083/connectors/sink-postgres-sink-db1/status' > --header 'Content-Type: application/json' | jq % Total % Received % Xferd Average Speed Time Time Time Current Dload Upload Total Spent Left Speed 100 174 100 174 0 0 15818 0 --:--:-- --:--:-- --:--:-- 15818 { "name": "sink-postgres-sink-db1", "connector": { "state": "RUNNING", "worker_id": "10.0.2.15:8083" }, "tasks": [ { "id": 0, "state": "RUNNING", "worker_id": "10.0.2.15:8083" } ], "type": "sink" } curl --location --request GET 'http://localhost:8083/connectors/sink-postgres-sink-db2/status' > --header 'Content-Type: application/json' | jq % Total % Received % Xferd Average Speed Time Time Time Current Dload Upload Total Spent Left Speed 100 174 100 174 0 0 21750 0 --:--:-- --:--:-- --:--:-- 21750 { "name": "sink-postgres-sink-db2", "connector": { "state": "RUNNING", "worker_id": "10.0.2.15:8083" }, "tasks": [ { "id": 0, "state": "RUNNING", "worker_id": "10.0.2.15:8083" } ], "type": "sink" }
데이터 입력 및 토픽 확인
커넥터의 연결이 완료 되었다면 아래와 같은 예제 테이블 과 데이터를 입력해보도록 하겠습니다.
• 첫번째 MySQL 인스턴스에서 데이터 입력 - DB스키마명 : db1
mysql> use db1; mysql> create table employees_db1_1 ( emp_no int not null, birth_date date not null, first_name varchar(14) not null, last_name varchar(16) not null, gender enum ('M','F') not null, hire_date date not null, test_col_1 text, test_col_2 bigint, test_col_3 mediumint unsigned, primary key (emp_no) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci; mysql> INSERT INTO `employees_db1_1` VALUES (10001,'1953-09-02','Georgi','Facello','M','1986-06-26','A',1,1), (10002,'1964-06-02','Bezalel','Simmel','F','1985-11-21','B',2,2), (10003,'1959-12-03','Parto','Bamford','M','1986-08-28','C',3,3), (10004,'1954-05-01','Chirstian','Koblick','M','1986-12-01','D',4,4); mysql> INSERT INTO `employees_db1_1` VALUES (10005,'1955-01-21','Kyoichi','Maliniak','M','1989-09-12','E',5,5);
• 두번째 MySQL 인스턴스에서 데이터 입력 - DB스키마명 : db2
-- 스키마 db2 mysql> use db2; mysql> create table employees_db2_1 ( emp_no int not null, birth_date date not null, first_name varchar(14) not null, last_name varchar(16) not null, gender enum ('M','F') not null, hire_date date not null, test_col_1 text, test_col_2 bigint, test_col_3 mediumint unsigned, primary key (emp_no) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci; mysql> INSERT INTO `employees_db2_1` VALUES (10001,'1953-09-02','Georgi','Facello','M','1986-06-26','A',1,1), (10002,'1964-06-02','Bezalel','Simmel','F','1985-11-21','B',2,2), (10003,'1959-12-03','Parto','Bamford','M','1986-08-28','C',3,3), (10004,'1954-05-01','Chirstian','Koblick','M','1986-12-01','D',4,4);
입력 후 잠시 후에 PostgreSQL 에서 아래와 같이 테이블 목록을 확인 합니다.
sinkdb=> dt+ List of relations Schema | Name | Type | Owner | Persistence | Access method | Size | Description --------+-----------------+-------+-------------------+-------------+---------------+-------+------------- db1 | employees_db1_1 | table | debezium_postgres | permanent | heap | 16 kB | db2 | employees_db2_1 | table | debezium_postgres | permanent | heap | 16 kB | (2 rows)
2개의 테이블이 복제되어 생성된 것을 확인 할 수 있습니다.
데이터 조회시에도 조회가 되는 것을 확인 할 수 있습니다.
sinkdb=> select * from db1.employees_db1_1; emp_no | birth_date | first_name | last_name | gender | hire_date | test_col_1 | test_col_2 | test_col_3 --------+------------+------------+-----------+--------+------------+------------+------------+------------ 10001 | 1953-09-02 | Georgi | Facello | M | 1986-06-26 | A | 1 | 1 10002 | 1964-06-02 | Bezalel | Simmel | F | 1985-11-21 | B | 2 | 2 10003 | 1959-12-03 | Parto | Bamford | M | 1986-08-28 | C | 3 | 3 10004 | 1954-05-01 | Chirstian | Koblick | M | 1986-12-01 | D | 4 | 4
한번 더 테이블 생성을 해보도록 하겠습니다.
-- 첫번째 인스턴스의 db1 스키마에서 실행 mysql> create table employees_db1_2 ( emp_no int not null, birth_date date not null, first_name varchar(14) not null, last_name varchar(16) not null, gender enum ('M','F') not null, hire_date date not null, test_col_1 text, test_col_2 bigint, test_col_3 mediumint unsigned, primary key (emp_no) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci; mysql> INSERT INTO `employees_db1_2` VALUES (10001,'1953-09-02','Georgi','Facello','M','1986-06-26','A',1,1), (10002,'1964-06-02','Bezalel','Simmel','F','1985-11-21','B',2,2), (10003,'1959-12-03','Parto','Bamford','M','1986-08-28','C',3,3), (10004,'1954-05-01','Chirstian','Koblick','M','1986-12-01','D',4,4);
PostgreSQL 에서 테이블 목록을 확인 합니다.
sinkdb=> dt List of relations Schema | Name | Type | Owner --------+-----------------+-------+------------------- db1 | employees_db1_1 | table | debezium_postgres db1 | employees_db1_2 | table | debezium_postgres db2 | employees_db2_1 | table | debezium_postgres (3 rows)
데이터 입력 및 업데이트 후 확인 해보도록 하겠습니다.
-- 첫번째 인스턴스의 db1 스키마에서 실행 mysql> use db1; mysql> INSERT INTO `employees_db1_1` VALUES (10005,'1955-01-21','Kyoichi','Maliniak','M','1989-09-12','E',5,5); mysql> update employees_db1_1 set first_name='aaaa111' where emp_no=10005;
PostgreSQL 에서 데이터를 확인해보겠습니다.
sinkdb=# select * from db1.employees_db1_1; emp_no | birth_date | first_name | last_name | gender | hire_date | test_col_1 | test_col_2 | test_col_3 --------+------------+------------+-----------+--------+------------+------------+------------+------------ 10001 | 1953-09-02 | Georgi | Facello | M | 1986-06-26 | A | 1 | 1 10002 | 1964-06-02 | Bezalel | Simmel | F | 1985-11-21 | B | 2 | 2 10003 | 1959-12-03 | Parto | Bamford | M | 1986-08-28 | C | 3 | 3 10004 | 1954-05-01 | Chirstian | Koblick | M | 1986-12-01 | D | 4 | 4 10005 | 1955-01-21 | aaaa111 | Maliniak | M | 1989-09-12 | E | 5 | 5
CDC 기능이 정상적으로 동작하는 것을 확인 할 수 있습니다.
토픽 목록 확인
토픽 목록 및 토픽 삭제는 kafka-topics.sh 을 사용 합니다.
• 토픽 목록 확인
cd /usr/local/kafka ./bin/kafka-topics.sh --list --bootstrap-server localhost:9092 __consumer_offsets cdc-history-cdctestsrv1-db1 cdc-history-cdctestsrv2-db2 cdctestsrv1 cdctestsrv1.db1.employees_db1_1 cdctestsrv1.db1.employees_db1_2 cdctestsrv2 cdctestsrv2.db2.employees_db2_1 connect-configs connect-offsets connect-status
위의 토픽 목록에서
• cdctestsrv1 와 cdctestsrv2 는 테이블의 DDL 수행 기록이 있는 토픽으로 Source connector 생성의 Rest API 구문에서 topic.prefix 에 지정된 명칭으로 토픽이 생성 됩니다.
• cdc-history-cdctestsrv1-db1 와 cdc-history-cdctestsrv2-db2 토픽은 Debezium Source Connector 를 사용할 경우 사용할 수 있는 속성 schema.history.internal.kafka.topic 에서 지정된 이름의 토픽으로 해당 토픽에서도 스키마 변경에 관한 기록, 즉 DDL 실행에 대한 정보를 담고 있는 토픽 입니다.
• cdctestsrv1.db1.employees_db1_1 와 같은 토픽이 변경 데이터 레코드에 관한 정보가 있는 토픽 입니다.
테이블 별로 토픽이 존재하는 것을 확인할 수 있습니다.
• 컨슈머 토픽 내용 확인
컨슈머 토픽을 확인 하기 위해서는 kafka-console-consumer.sh 를 사용 합니다.
cd /usr/local/kafka ./bin/kafka-console-consumer.sh > --topic cdctestsrv1.db1.employees_db1_1 > --bootstrap-server localhost:9092 > --from-beginning | jq { "schema": { "type": "struct", "fields": [ { "type": "struct", "fields": [ { "type": "int32", "optional": false, "field": "emp_no" }, { "type": "int32", "optional": false, "name": "org.apache.kafka.connect.data.Date", "version": 1, "field": "birth_dat < ... 중략 ... >
스키마 변경 사항 반영 관련
구성된 Kafka 파이프라인을 통해서 Source 에서 변경된 내용에 대해서 Target 에는 정상적으로 반영이 됩니다.
다만, PostgreSQL 로 복제를 반영하기 위해서 사용하고 있는 JDBC Sink Connector 에서는 DDL 이 제한적으로 지원 됩니다.
auto.create 속성의 사용으로 레코드가 insert 시 테이블이 존재하지 않는 경우 테이블을 생성할 수 있습니다.
auto.evolve 속성을 활성화 하게 되면 커넥터는 레코드를 만났을 때 컬럼이 누락된 것으로 확인되면 대상 테이블에 ALTER를 실행하여 제한적으로 DDL 을 수행할 수 있습니다. 하지만 데이터 타입 변경 및 컬럼 제거 등은 위험할 수 있기 때문에 커넥터는 테이블에서 이러한 변경사항(DDL) 은 적용 되지 않습니다.
그외 다른 DDL인 truncate table , drop table 등과 같은 DDL구문도 처리 되지 않습니다.
정리하면 CREATE TABLE 과 ALTER TABLE ADD COLUMN 이외에는 DDL이 지원이 되지 않습니다.
auto.evolve을 true 하였을 경우(글 예제에서는 true로 되어 있음) Source 에서 column을 rename 후에 바로 insert 구문이 시행이 되면 Target 에서는 column rename 이 된 것이 반영되지 않았기 때문에 새로운 컬럼(필드)에 value 정보를 받게 되고, Target 에서는 없는 컬럼으로 판단되어 새로운 컬럼을 추가하여 데이터를 입력하게 됩니다.
그래서 auto.evolve 를 사용할 경우 column rename에 대해서 이와 같이 유의할 필요가 있습니다.
그래서 DDL 의 경우 별도로 기록되고 있는 토픽을 컨슈밍하여 Target에 반영하는 프로그램(또는 배치Job)을 생성을 해서 사용하거나, Source 에서 DDL 을 수행하기 전에 Target 에서 먼저 그에 맞는 DDL 처리를 하는 절차적인 수립이 필요합니다.
토픽을 컨슈밍하여 처리할 경우 위의 "토픽 목록 확인" 내용에서 설명한 것과 같이 DDL 정보를 담고 있는 별도의 토픽을 활용하시면 됩니다.
Reference
Reference URL
• confluent.io/#kafka-connect
• confluent.io/current/platform
• debezium.io/architecture.html
• debezium.io/connectors/mysql.html
• debezium.io/connectors/mysql.html#setting-up-mysql
• debezium.io/connectors/mysql.html#mysql-connector-properties
• confluent.io/kafka-connectors/self-managed/userguide.html
• debezium.io/connectors/#_required_debezium_mysql_connector_configuration_properties
• confluent.io/connect/concepts.html
• confluent.io/connect/transforms/overview.html
• debezium.io/transformations/event-flattening.html
• confluent.io/kafka-connectors/sink-connector/overview.htm
• redhat.com/what-is-apache-kafka
연재 이전 글
다양한 소스에서 데이터 스트림을 처리하고 여러 컨슈머에게 전달하기 위해 설계되었습니다.
관련된 다른 글
Principal DBA(MySQL, AWS Aurora, Oracle)
핀테크 서비스인 핀다에서 데이터베이스를 운영하고 있어요(at finda.co.kr)
Previous - 당근마켓, 위메프, Oracle Korea ACS / Fedora Kor UserGroup 운영중
Database 외에도 NoSQL , Linux , Python, Cloud, Http/PHP CGI 등에도 관심이 있습니다
purityboy83@gmail.com / admin@hoing.io
저도 카프카에 발을 담가봅니다. ㅎㅎㅎ
굿~! 좋네요
화이팅 입니다.!
카테고리 맞나요?
(1)글과 카테고리가 달라요;;;
글 잘 보고 있습니다...
안녕하세요
카테고리 수정하였습니다.
감사합니다.
커넥터 생성을 할려고 하는데 카프카는 클러스터로 구성되어있습니다.
3노드중 한군데에다가 생성하면 되는건가요?
rest api로 커넥터를 만드는 것이라면
분산 모드(Distributed Mode)로 커넥트를 실행한 곳의 ip와 포트를 통해서
rest api를 수행해서 생성하시면 됩니다
postgresql sink jdbc connector에서 수정 부탁드립니다.
"transforms.route.regex": "([^.]+)\.([^.]+)\.([^.]+)",
에서
"transforms.route.regex": "([^.]+)\\.([^.]+)\\.([^.]+)",
로 수정해주세요.
안녕하세요
\\ 가 어떤 이유로 \ 으로 치환이 된 것 같네요
\\ 로 수정해두었습니다.
피드백 감사합니다.
source Connector의 worker_id(10.0.2.15:8083)와 Sink Connector의 worker_id(10.0.2.15:8083)의 ip와 port가 동일한 것으로 봐서 혹시 데이터 입력 및 토픽 확인의 절차는 동일서버에 mysql, postgresql, kafka 등이 모두 설치된 환경인가요?
Mysql, postgreSQL, Kafka를 각각 별도의 서버로 구성하고, 실습을 했더니 데이터 복제가 잘 안되네요. ㅠ
안녕하세요
여러 서버에 따로 구성하였을 경우 원활하게 진행되지 않은 원인으로는 다양한 이유가 있을 것 같습니다.
먼저 connector 의 로그에서 확인해볼수있는 특이한 에러 메세지가 있는지 살펴봐주시고
있다면 공유한번 해주세요
감사합니다.