튼튼발자 개발 성장기🏋️

Kafka CDC: MySQL to MongoDB 동기화 #2 본문

기타/apache kafka

Kafka CDC: MySQL to MongoDB 동기화 #2

시뻘건 튼튼발자 2025. 7. 8. 19:41
반응형

이전에는 CDC 기반 Kafka를 통한 데이터 동기화의 동작 원리를 알아보았다.

이번에는 Kafka와 Kafka connect를 Dockerizing해보겠다.

 

들어가기에 앞서 source connector에 사용될 사용자는 아래와 같은 권한이 필요하다.

grant select, reload, replication client, replication slave on *.* to '{username}'@'%';

 

아래 docker-compose.yaml 파일 내용을 살펴보자

services:
  kafdrop:
    image: obsidiandynamics/kafdrop:latest
    container_name: kafdrop
    restart: "always"
    ports:
      - "9000:9000"
    environment:
      KAFKA_BROKERCONNECT: "kafka-1:9092,kafka-2:9092,kafka-3:9092"
    depends_on:
      - "kafka-1"
      - "kafka-2"
      - "kafka-3"

  kafka-1:
    container_name: kafka-1
    image: confluentinc/cp-kafka:7.7.1
    platform: linux/arm64
    ports:
      - "${KAFKA_1_PORT}:9092"
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka-1:29092,EXTERNAL://${DOCKER_HOST_IP}:${KAFKA_1_PORT}
      KAFKA_LISTENERS: INTERNAL://0.0.0.0:29092,CONTROLLER://0.0.0.0:29093,EXTERNAL://0.0.0.0:${KAFKA_1_PORT}
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka-1:29093,2@kafka-2:29093,3@kafka-3:29093
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      CLUSTER_ID: ${KAFKA_CLUSTER_ID}
      KAFKA_LOG_DIRS: /var/lib/kafka/data
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: true

  kafka-2:
    container_name: kafka-2
    image: confluentinc/cp-kafka:7.7.1
    platform: linux/arm64
    ports:
      - "${KAFKA_2_PORT}:9093"
    environment:
      KAFKA_NODE_ID: 2
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka-2:29092,EXTERNAL://${DOCKER_HOST_IP}:${KAFKA_2_PORT}
      KAFKA_LISTENERS: INTERNAL://0.0.0.0:29092,CONTROLLER://0.0.0.0:29093,EXTERNAL://0.0.0.0:${KAFKA_2_PORT}
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka-1:29093,2@kafka-2:29093,3@kafka-3:29093
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      CLUSTER_ID: ${KAFKA_CLUSTER_ID}
      KAFKA_LOG_DIRS: /var/lib/kafka/data
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: true

  kafka-3:
    container_name: kafka-3
    image: confluentinc/cp-kafka:7.7.1
    platform: linux/arm64
    ports:
      - "${KAFKA_3_PORT}:9094"
    environment:
      KAFKA_NODE_ID: 3
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka-3:29092,EXTERNAL://${DOCKER_HOST_IP}:${KAFKA_3_PORT}
      KAFKA_LISTENERS: INTERNAL://0.0.0.0:29092,CONTROLLER://0.0.0.0:29093,EXTERNAL://0.0.0.0:${KAFKA_3_PORT}
      KAFKA_PROCESS_ROLES: broker,controller
      KAFKA_CONTROLLER_QUORUM_VOTERS: 1@kafka-1:29093,2@kafka-2:29093,3@kafka-3:29093
      KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      CLUSTER_ID: ${KAFKA_CLUSTER_ID}
      KAFKA_LOG_DIRS: /var/lib/kafka/data
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: true

  kafka-connect:
    container_name: kafka-connect
    image: debezium/connect:2.7.3.Final
    platform: linux/arm64
    ports:
      - "${DEBEZIUM_PORT}:8083"
    environment:
      REST_PORT: 8083
      REST_LISTENERS: http://0.0.0.0:${DEBEZIUM_PORT}
      GROUP_ID: ${KAFKA_CLUSTER_ID}
      BOOTSTRAP_SERVERS: kafka-1:29092,kafka-2:29092,kafka-3:29092
      REST_ADVERTISED_LISTENERS: http://${DOCKER_HOST_IP}:${DEBEZIUM_PORT}
      LISTENERS: http://0.0.0.0:${DEBEZIUM_PORT}
      KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
      VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      INTERNAL_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
      INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      OFFSET_FLUSH_INTERVAL_MS: 10000
      CONFIG_STORAGE_TOPIC: connect-configs
      OFFSET_STORAGE_TOPIC: connect-offsets
      STATUS_STORAGE_TOPIC: connect-status
      CONFIG_STORAGE_REPLICATION_FACTOR: 3
      OFFSET_STORAGE_REPLICATION_FACTOR: 3
      STATUS_STORAGE_REPLICATION_FACTOR: 3
      LOG4J_LOGGERS: org.apache.kafka.connect.runtime.WorkerSinkTask=DEBUG
      #LOG4J_ROOT_LOGLEVEL: INFO
    volumes:
      - ${PLUGINS_DIR}:/kafka/connect/plugins
    depends_on:
      - kafka-1
      - kafka-2
      - kafka-3

 

  1. Kafdrop
    • Kafka 브로커를 모니터링할 수 있는 웹 기반 UI.
    • kafka-1, kafka-2, kafka-3 이후에 실행
    • "kafka-1:9092,kafka-2:9092,kafka-3:9092" (Kafka 3개 브로커와 연결)
  2. kafka-1, kafka-2, kafka-3
    1. 3노드 Kafka 브로커 클러스터(최신 KRaft 모드 기반, ZooKeeper 미사용)
    2. environment
      1. KAFKA_NODE_ID
        • 브로커/컨트롤러 id (1, 2, 3)
      2. KAFKA_LISTENER_SECURITY_PROTOCOL_MAP, ADVERTISED_LISTENERS, LISTENERS
        • INTERNAL, CONTROLLER, EXTERNAL 세 가지 리스너 설정
        • INTERNAL: 브로커 간 통신
        • CONTROLLER: 컨트롤러 전용 통신
        • EXTERNAL: 외부 접근용(호스트 IP, 환경변수로 지정)
      3. KAFKA_PROCESS_ROLES
        • KRaft 모드에서 브로커와 컨트롤러
      4. KAFKA_CONTROLLER_QUORUM_VOTERS
        • 컨트롤러 선택 > 3개의 노드
  3. Kafka Connect
    1. Kafka로 다양한 소스(DB 등)와 싱크 연동/동기화용, Debezium 확장
    2. environment
      1. REST_LISTENERS/PORT/ADVERTISED_LISTENERS
        1. Kafka Connect REST API 설정
      2. BOOTSTRAP_SERVERS
        1. kafka-1,2,3 브로커와 연결 (포트 29092로 내부 통신)

이제 컨테이너를 실행해보자.

docker-compose -p kafka up -d
docker ps

 

[그림 1] docker containers

[그림 1]과 같이 컨테이너가 실행되었다면 성공이다. 이젠 source connector와 sync connector를 custom해서 등록해주기만하면 된다. io.debezium.connector.mysql.MySqlConnectorcom.mongodb.kafka.connect.MongoSinkConnector 를사용해도 되지만, 현업에서는 커스텀으로부터 로깅과 대응을 쉽게 하기 때문에 커스텀 방법을 알아야한다. 어떻게 동작하는지 살펴볼겸 커스텀 해보자.

 

Source Connector

class CDCSourceConnector: MySqlConnector() {

    override fun start(props: Map<String, String>) {
        super.start(props)
        logger.info { "starting CDC source connector!!!" }
    }

    override fun taskClass(): Class<out Task> {
        return CDCSourceTask::class.java
    }

    override fun stop() {
        logger.info { "stopped CDC source connector!!!" }
    }

    companion object {
        private val logger = KotlinLogging.logger {}
    }
}

 

크게 start, stop 그리고 taskClass 함수 3가지만 커스텀하면 된다. 물론 커스텀을 원치 않으면 version 함수처럼 override하지 않으면 된다.

 

  1. start(): 소스 커넥터가  최초 구동시 실행된다.
  2. stop(): 소스 커넥터가  종료시 실행된다.
  3. taskClass(): 소스 커넥터 내에 실행할 task를 리턴한다. 아래에서 살펴보겠지만 이 역시 커스텀한다.

참고로 카프카 커넥트에 대해서 설명한 적이 있는데 한 번 살펴보면 좋다.

class CDCSourceTask : MySqlConnectorTask() {

    override fun doPoll(): List<SourceRecord> {
        val records = super.doPoll()
        for (record in records) {
            try {
                // 이벤트 캡처
                val value = record.value()
                if (value is Struct) {
                    val opField = value.schema().fields().find { it.name() == "op" }
                    if (opField == null) {
                        continue
                    }
                    val op = value.getString("op")
                    val before = value.getStruct("before")
                    val after = value.getStruct("after")

                    val beforeMap = before?.schema()?.fields()?.associate { it.name() to before.get(it) } ?: emptyMap()
                    val afterMap = after?.schema()?.fields()?.associate { it.name() to after.get(it) } ?: emptyMap()

                    val diff = when {
                        op == "u" && before != null && after != null -> {
                            beforeMap.keys.union(afterMap.keys).mapNotNull { k ->
                                val oldVal = beforeMap[k]
                                val newVal = afterMap[k]
                                if (oldVal != newVal) "$k: '$oldVal' -> '$newVal'" else null
                            }
                        }
                        op == "c" && after != null -> afterMap.map { (k, v) -> "$k: [NEW] '$v'" }
                        op == "d" && before != null -> beforeMap.map { (k, v) -> "$k: [DELETED] '$v'" }
                        else -> emptyList()
                    }

                    // 데이터베이스/테이블 추출
                    val topic = record.topic()
                    val topicParts = topic.split(".")
                    val dbName = topicParts.getOrNull(1) ?: "unknown_db"
                    val tableName = topicParts.getOrNull(2) ?: "unknown_table"

                    // PK 추출
                    val keyStruct = record.key()
                    val pkValues: Map<String, Any?> = if (keyStruct is Struct) {
                        keyStruct.schema().fields().map { field ->
                            field.name() to keyStruct.get(field)
                        }.toMap()
                    } else {
                        mapOf("key" to keyStruct?.toString())
                    }
                    val pk = pkValues.entries.joinToString(", ") { "${it.key}=${it.value}" }

                    val eventDesc = when (op) {
                        "c" -> "INSERT"
                        "u" -> "UPDATE"
                        "d" -> "DELETE"
                        else -> "OTHER"
                    }

                    logger.info {
                        "DB: [$dbName], TABLE: [$tableName], PK: [${pk}], EVENT: [$eventDesc], CHANGES: ${diff.joinToString(", ")}"
                    }
                }
            } catch (e: Exception) {
                logger.warn(e) { "Failed to process SourceRecord for CDC logging" }
            }
        }
        return records
    }

    companion object {
        private val logger = KotlinLogging.logger {}
    }
}

 

 

  1. doPoll(): Kafka Connect Source Task의 이벤트 폴링 메서드다.

코드를 보면 어떤 역할을 하고 있는지 알 수 있지만 간략하게나마 덧붙이겠다.

  • SourceRecord의 값이 Struct 타입이면, Debezium의 standard CDC 이벤트로 판단.
    op 필드 (operation)로부터 이벤트 타입 체크
    - u : update
    - c : create (insert)
    - d : delete
  • before/after 필드로 변경 전후 데이터를 추출.
  • 데이터베이스/테이블명 및 PK 추출
    • Kafka 토픽명(record.topic())은 기본적으로 server.db.table 형태
    • 토픽명으로부터 db명과 table명을 추출
    • SourceRecord의 key가 Struct이면 각 key 필드값 추출 (PK)

이렇게하면 이벤트가 있을 때마다 어떤 데이터가 어떤 이벤트로 어떻게 변경되었는지 로그를 통해 알 수 있다.

DB: [db], TABLE: [table], PK: [key1=키값1, ...], EVENT: [이벤트], CHANGES: [변경리스트]

 

 

Sink Connector

class CDCSinkConnector: MongoSinkConnector() {

    override fun start(props: Map<String, String>) {
        super.start(props)
        logger.info { "started CDC sink connector!!!" }
    }

    override fun taskClass(): Class<out Task> = CDCSinkTask::class.java

    override fun stop() {
        logger.info { "stopped CDC sink connector!!!" }
    }

    companion object {
        private val logger = KotlinLogging.logger {}
    }
}

 

싱크 커넥터는 소스 커넥터와 내용일 동일하니 스킵하겠다.

 

class CDCSinkTask: MongoSinkTask() {

    override fun start(props: Map<String?, String?>?) {
        super.start(props)
        logger.info { "started CDC sink task!!!" }
    }

    override fun put(records: Collection<SinkRecord>) {
        for (record in records) {
            val topic = record.topic()
            val topicParts = topic.split(".")

            val db = topicParts.getOrNull(1) ?: "unknown_db"
            val collection = topicParts.getOrNull(2) ?: "unknown_collection"

            logger.info { """
                [CDC SYNC]
                Database   = $db
                Collection = $collection
                Key        = ${record.key()}
                Value      = ${record.value()}
            """.trimIndent() }
        }

        super.put(records)
    }

    override fun stop() {
        logger.info { "stopped CDC sink task!!!" }
    }

    companion object {
        private val logger = KotlinLogging.logger {}
    }
}

 

  1. put(): 카프카 토픽에 메시지가 쌓였을 때 Consume해서 recode를 처리한다.

데이터 싱크를 맞추는 과정에서 sink connector가 받은 record에 대해서 어떤 변화가 있는지 로깅했다. mongoDB의 어떤 데이터베이스의 어떤 컬렉션의 어떤 데이터가 영향이 갔는지를 로그를 통해 알 수 있다.

 

connector 등록

커스텀한 두 커넥터를 이제 kafka-connect에 등록해주기만하면 준비 끝이다. 커넥터를 등록할 때는 http client를 통해 등록하는 것을 권장하므로 curl을 가지고 등록한다.

우선 아래와 같이 kafka-connect에 등록할 두 커넥터에 대해서 request body로 보낼 json을 생성해두자.

// cdc-mysql-source-connector.json
{
  "name": "cdc-mysql-source-connector",
  "config": {
    "connector.class": "org.smallgoliath.example.sourceconnector.CDCSourceConnector",
    "tasks.max": "1",
    "connect.timeout.ms": "5000",
    "database.hostname": "x.x.x.x",
    "database.port": "3306",
    "database.user": "xxxxx",
    "database.password": "xxxxx",
    "database.server.id": "1",
    "database.include.list": "kafkaCDC",
    "table.include.list": "kafkaCDC.sns_raise_user,kafkaCDC.user_action_verification",
    "database.history.kafka.bootstrap.servers": "kafka-1:29092,kafka-2:29092,kafka-3:29092",
    "database.history.kafka.topic": "schema-changes.kafkaCDC",
    "topic.prefix": "cdc",
    "schema.history.internal.kafka.topic": "schema-changes.internal.kafkaCDC",
    "schema.history.internal.kafka.bootstrap.servers": "kafka-1:29092,kafka-2:29092,kafka-3:29092",
    "snapshot.mode": "initial"
  }
}

// cdc-mongodb-sink-connector.json
{
  "name": "cdc-mongo-sink-connector",
  "config": {
    "connector.class": "org.smallgoliath.example.sinkconnector.CDCSinkConnector",
    "topics.regex": "cdc\\.\\w+\\.\\w+$",
    "connection.uri": "mongodb://root:111111@10.2.114.35:27017",
    "database": "kafkacdc",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": false
  }
}

 

이제 아래와 같이 curl을 실행하면 커넥터가 등록된다.

curl -v -X POST http://localhost:8083/connectors \
-H "Content-Type: application/json" \
-d @cdc-mysql-source-connector.json

curl -v -X POST http://localhost:8083/connectors \
-H "Content-Type: application/json" \
-d @cdc-mongodb-sink-connector.json

 

테스트

이제 여러가지 테스트를 해볼 수 있다.

  1. 기본적인 데이터 동기화
    1. 스냅샷: 커넥터 등록 이후 기존 데이터 싱크 여부(O)
    2. 이벤트: 이벤트로부터 변경되는 데이터 싱크 여부(O)
  2. 커넥터가 shutdown 되었을 때 동작 확인
    1. restart 이후 shutdown되었을 때 일어난 이벤트에 대한 데이터 싱크 여부(O)
    2. restart 이후 이벤트에 대한 데이터 싱크 여부(O)
  3. kafka가 shutdown 되었을 때 동작 확인
    1. 1개의 node가 shutdown 되었을 때
      1. restart 이후 shutdown되었을 때 일어난 이벤트에 대한 데이터 싱크 여부(O)
      2. restart 이후 이벤트에 대한 데이터 싱크 여부(O)
    2. 모든 node가 shutdown 되었을 때
      1. restart 이후 shutdown되었을 때 일어난 이벤트에 대한 데이터 싱크 여부(X)
      2. restart 이후 이벤트에 대한 데이터 싱크 여부(O)

 

위 테스트 케이스에 대해서 결과가 왜 그런지는 이전 글을 자세히 살펴보면 알 수 있다.

 

예제

Github 참고

반응형

'기타 > apache kafka' 카테고리의 다른 글

Kafka CDC: MySQL to MongoDB 동기화 #1  (0) 2025.07.05
카프카 상세 개념  (1) 2024.08.25
Kafka MirrorMaker 2  (0) 2024.08.17
카프카 커넥트  (1) 2024.08.17
카프카 스트림즈  (0) 2024.08.11