일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | ||||||
2 | 3 | 4 | 5 | 6 | 7 | 8 |
9 | 10 | 11 | 12 | 13 | 14 | 15 |
16 | 17 | 18 | 19 | 20 | 21 | 22 |
23 | 24 | 25 | 26 | 27 | 28 |
- Spring
- 데이터베이스
- framework
- Elasticsearch
- 애자일기법
- 엘라스틱서치
- 읽기쉬운코드
- 애자일프로그래밍
- 백준
- 그리디알고리즘
- 코딩테스트
- 애자일
- 개발자
- 그리디
- cleancode
- 클린코드
- 스프링
- JPA
- 알고리즘
- Baekjoon
- Java
- 프레임워크
- API
- 개발
- database
- 코드
- 코딩
- 자바
- ES
- spring boot
- Today
- Total
튼튼발자 개발 성장기🏋️
카프카 상세 개념 본문
1. 토픽과 파티션
1.1 적정 파티션 개수
토픽은 카프카에서 데이터를 관리하는 기본 단위이며, 각 토픽은 여러 개의 파티션으로 구성된다. 파티션은 데이터의 분산을 가능하게 하며, 병렬 처리를 통해 성능을 최적화할 수 있다. 적정 파티션 개수를 결정하는 것은 매우 중요하다. 어떻게 설정하느냐에 따라서 성능을 좌우하기 때문이다. 적절한 파티션 개수는 다음 요소에 따라 결정된다.
- 프로듀서/컨슈머 수: 높은 병렬 처리를 위해 파티션 수를 컨슈머 스레드(혹은 프로세스) 수와 비슷하게 맞추는 것이 이상적이다.
- 데이터 처리량: 파티션 수가 많을수록 병목 현상이 줄어들며, 데이터 처리량이 증가한다.
- 브로커 수: 파티션은 브로커에 분산되므로 브로커의 수에 따라 파티션 수를 조정해야 한다.
- ISR(In-Sync Replicas)와 가용성: 파티션 수가 증가하면 ISR의 크기가 줄어들 수 있으며, 이에 따라 데이터 가용성에 영향을 미칠 수 있다.
프로듀서 전송 데이터량 < 컨슈머 데이터 처리량 x 파티션 개수
예를 들어,
프로듀서가 보내는 데이터가 초당 1,000 레코드이고, 컨슈머가 처리할 수 있는 데이터가 초당 100 레코드이면, 파티션 개수는 10개 적정 개수가 된다.
1.2 토픽 정리 정책(cleanup.policy)
카프카는 데이터 정리 정책으로 cleanup.policy를 제공한다. 이는 로그 세그먼트가 만료되었을 때 데이터를 어떻게 처리할지 결정하는 정책이다. 주요 옵션은 다음과 같다.
- delete: 기본 설정으로, 설정된 유지 기간(retention period)이 지나면 로그 세그먼트가 삭제된다. 이는 오래된 데이터를 자동으로 정리하여 디스크 사용량을 최적화한다.
- compact: 로그 압축(compaction)을 통해 같은 키를 가진 오래된 레코드를 삭제하고 최신 상태만 유지한다. 이는 주로 상태 저장(Stateful) 애플리케이션에서 유용하다.
토픽 삭제 정책(delete policy)
cleanup.policy 옵션을 delete로 설정함으로써 토픽 삭제를 활성화할 수 있다. 이 설정이 활성화된 경우, 관리자는 필요에 따라 토픽을 삭제하여 디스크 공간을 회수할 수 있다. 토픽의 데이터를 삭제할 때는 세그먼트 단위로 삭제된다. 세그먼트는 파티션마다 별개로 생성되며 세그먼트의 파일 이름은 오프셋 중 가장 작은 값이 된다. segment.bytes 옵션으로 1개의 세그먼트 크기를 정의할 수도 있다. segment.bytes 값보다 커질 경우에 기존에 적재하던 세그먼트 파일을 닫고 새로운 세그먼트를 열어서 데이터를 저장한다. 이 세그먼트를 액티브 세그먼트라고 한다.
삭제 정책이 실행되는 시점은 시간 또는 용량으로 나누어 설정할 수 있다. 시간 단위로 실행하고 싶다면 retention.ms 설정을 통해 밀리초 단위로 데이터를 유지하는 기간을 설정할 수 있다. 카프카는 일정 주기마다 세그먼트 파일의 마지막 수정 시간과 retention.ms 값을 비교하는데, 세그먼트 파일의 마지막 수정 시간이 retention.ms를 넘어가면 그 때 세그먼트가 삭제된다. 만약 용량 단위로 실행하고 싶다면 retention.bytes 설정을 통해 토픽의 데이터 크가가 retention.bytes 값을 넘어가면 세그먼트 파일을 삭제할 수 있도록 할 수 있다.
한 번 삭제된 데이터는 복구 불가능하다.
![](http://t1.daumcdn.net/tistory_admin/static/images/no-image-v1.png)
토픽 압축 정책(compact policy)
cleanup.policy 옵션을 compact로 설정함으로써 토픽 압축을 활성화 할 수 있다. 압축은 특정 조건에 따라 로그를 압축하여 같은 키를 가진 오래된 데이터를 삭제하고 최신 상태만 유지하게 한다. 이 방식은 주로 키-값 저장소와 같이 상태를 유지해야 하는 애플리케이션에 유용하며, 디스크 공간 절약과 빠른 데이터 조회가 가능하다.
압축의 시작 지점은 min.cleanable.dirty.ratio 옵션으로 설정할 수 있다. 이 옵션 값은 액티브 세그먼트를 제외한 세그먼트에 남아있는 데이터의 테일 영역과 헤드영역의 레코드 개수의 비율을 뜻한다.
- 테일 영역(Tail Section): 로그 압축이 적용된 부분으로, 주로 과거의 데이터를 포함하고 있다. 이 영역의 데이터는 압축이 완료되었기 때문에 케시지 키의 중복이 없다.
- 헤드 영역(Head Section): 아직 압축이 적용되지 않은 데이터가 포함되어 있기 때문에 메지지 키의 중복이 있을 수 있다. 헤드 영역은 데이터를 실시간으로 수신하는 부분이기 때문에, 데이터가 활발히 추가되고 수정된다. 이 영역은 나중에 테일 영역으로 이동하여 압축의 대상이 된다.
![](http://t1.daumcdn.net/tistory_admin/static/images/no-image-v1.png)
1.3 ISR(In-Sync-Replicas)
![](https://blog.kakaocdn.net/dn/bvX4LL/btsJfFEifv3/nCEbk5g4kuJOdu4FEodeu1/img.png)
ISR(In-Sync Replicas)은 현재 리더 파티션과 동기화된 모든 복제본을 나타낸다. 리더 파티션은 ISR 중 하나이며, ISR 목록은 복제된 레코드가 모두 성공적으로 기록된 브로커를 포함한다. 만약 리더가 실패하면, ISR 내의 다른 복제본이 리더로 승격된다. ISR 크기는 가용성과 성능 간의 균형을 맞추는 중요한 요소다.
unclean.leader.election.enable
이 설정은 ISR 내의 모든 복제본이 동기화되지 않은 상태에서 리더를 선출할지를 결정한다. unclean.leader.election.enable값이 true로 설정되면, ISR에 없는 복제본도 리더로 승격될 수 있다. 이는 가용성을 높이지만 데이터 손실의 위험이 있다. 반대로, false로 설정하면 데이터 손실을 피할 수 있지만, 가용성은 낮아질 수 있다.
2. 카프카 프로듀서
2.1 acks 옵션
카프카 첫 시간에 등장했던 옵션 값이다. 프로듀서가 레코드를 브로커에 전송할 때, acks 옵션은 메시지 전송의 확인 여부를 설정한다.
- acks=0: 프로듀서는 브로커의 응답을 기다리지 않다. 성능이 좋지만 데이터 손실 위험이 있다.
- acks=1: 리더 파티션이 데이터를 수신하면 응답한다. 기본 설정으로 적절한 성능과 내구성 간의 균형을 제공한다.
- acks=all(또는 -1): ISR의 모든 복제본이 데이터를 수신해야만 응답한다. 가장 높은 내구성을 제공하지만, 성능이 떨어질 수 있다.
- min.insync.replicas: 프로듀서가 데이터를 전송할 때, ISR 내에서 최소 몇 개의 복제본이 동기화되어 있어야 하는지를 설정한다. 예를 들어, acks=all과 함께 min.insync.replicas를 2로 설정하면, 적어도 두 개의 복제본이 데이터를 수신해야만 성공적으로 전송된 것으로 간주된다.
2.2 멱등성 프로듀서(idempotence producer)
카프카의 멱등성 프로듀서는 중복된 메시지를 방지하여 데이터를 한 번만 정확히 전달하는 것을 보장한다. enable.idempotence를 true로 설정하면 정확히 한번 적재하는 로직이 성립되기 위해 프로듀서의 일부 옵션들이 강제로 설정되는데, 프로듀서의 데이터 재전송 횟수를 정하는 retries는 기본값으로 Integer.MAX_VALUE로 설정되고 acks옵션은 all로 설정된다.
이를 위해 카프카는 각 메시지에 고유의 프로듀서 번호(PID)와 시퀀스 번호(SID)를 부여하여 중복 메시지를 감지한다. 이 기능은 중복 메시지 전송의 위험이 있는 상황에서 데이터의 일관성을 유지하는 데 매우 유용하다. [그림 4]와 [그림 5]를 비교해보면 쉽게 이해할 수 있다.
SID는 1씩 증가하게 되는데, 만약 일정하지 않은 경우에는 OutOfOrderSequenceException이 발생하된다. 따라서 SID가 일정하지 않았을 경우를 예외처리하여 고려해야한다.
![](https://blog.kakaocdn.net/dn/spqhu/btsJfO8UI8G/GapLioVK6XdjTor86GG70k/img.png)
![](https://blog.kakaocdn.net/dn/1uxi0/btsJfR5zY62/QNiAlSFabiNzuZshtBdYrk/img.png)
2.3 트랜잭션 프로듀서(transaction producer)
트랜잭션 프로듀서는 카프카에서 원자성(atomicity)을 보장하는 메시지 전송을 가능하게 한다. 여러 메시지를 하나의 트랜잭션으로 묶어, 모든 메시지가 성공적으로 전송되거나, 실패 시 모두 롤백되도록 한다. 이는 특히 금융 거래와 같이 데이터의 일관성과 무결성이 중요한 시스템에서 필수적인 기능이다.
트랜잭션 프로듀서는 트랜잭션 시작과 끝을 표현하기 위해 COMMIT이라는 트랜잭션 레코드를 한 개 더 보낸다. 이 또한 레코드 특성을 가지고 있기 때문에 파티션에 저장되어 오프셋을 한 개 차지한다.
![](https://blog.kakaocdn.net/dn/bcWkmx/btsJdQ1W1NX/OFEBi0siMkhKNpVrGgaa5K/img.png)
3. 카프카 컨슈머
3.1 멀티 스레드 컨슈머
![](https://blog.kakaocdn.net/dn/FRflH/btsJfD7ztkN/AbtjUdYsQK0UOcPGiRwgW0/img.png)
카프카는 기본적으로 각 컨슈머가 하나의 파티션만을 처리하도록 설계되어 있다. 그러나 멀티 스레드 환경에서 이를 효율적으로 활용하기 위해 여러 스레드로 파티션을 병렬로 처리할 수 있다. 이를 통해 데이터 처리량을 높이고, 소비자의 처리 능력을 최대화할 수 있다.
여러 프로세스를 사용할 것인지, 하나의 프로세스에서 여러 스레드를 사용할 것인지에 따라 고려해야할 부분이 많다. 멀티 스레드로 개발할 경우 하나의 컨슈머 스레드에서 예외가 발생할 경우 프로세스 자체가 종료될 수 있으며 다른 스레드에 영향이 갈 수 있다. 그렇게 되면 데이터 중복 처리라던가 데이터 유실까지도 이어질 수 있다.
3.2 카프카 컨슈머 멀티 워커 스레드 전략
멀티 워커 스레드 전략은 하나의 컨슈머 스레드가 여러 파티션으로부터 데이터를 가져오고, 별도의 워커 스레드가 이를 처리하는 구조다. 이 방식은 데이터 처리의 병목을 줄이고, 더 높은 처리량을 가능하게 한다. 각 워커 스레드는 독립적으로 작동하므로, 특정 워커에서 발생한 오류가 다른 워커의 처리에 영향을 미치지 않는다.
public class MultiWorkerConsumer {
private static final String TOPIC_NAME = "my-topic";
private static final String BOOTSTRAP_SERVERS = "localhost:9092";
private static final String GROUP_ID = "my-consumer-group";
private static final int NUM_WORKERS = 4;
public static void main(String[] args) {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList(TOPIC_NAME));
ExecutorService executor = Executors.newFixedThreadPool(NUM_WORKERS);
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
executor.submit(() -> {
processRecord(record);
});
}
}
} finally {
consumer.close();
executor.shutdown();
}
}
private static void processRecord(ConsumerRecord<String, String> record) {
System.out.printf("Processing record with key: %s, value: %s, partition: %d, offset: %d%n",
record.key(), record.value(), record.partition(), record.offset());
}
}
3.3 카프카 컨슈머 멀티 스레드 전략
멀티 스레드 전략은 각 컨슈머 스레드가 독립적으로 파티션을 처리하는 방식이다. 이 전략은 병렬 처리를 극대화할 수 있지만, 각 스레드 간의 데이터 일관성을 유지하기 어려운 점이 있다. 이를 해결하기 위해 각 스레드가 동일한 파티션을 처리하지 않도록 파티션 할당을 주의 깊게 설계해야 한다.
KafkaConsumer 클래스는 Thread safe 하지 않기 때문에 스레드 별로 인스턴스를 만들어어 운영해야한다.
public class MultiThreadedConsumer implements Runnable {
private final KafkaConsumer<String, String> consumer;
public MultiThreadedConsumer(String bootstrapServers, String groupId, String topic) {
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
this.consumer = new KafkaConsumer<>(props);
this.consumer.subscribe(Collections.singletonList(topic));
}
@Override
public void run() {
try {
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
processRecord(record);
}
}
} finally {
consumer.close();
}
}
private void processRecord(ConsumerRecord<String, String> record) {
System.out.printf("Thread %s: Processing record with key: %s, value: %s, partition: %d, offset: %d%n",
Thread.currentThread().getName(), record.key(), record.value(), record.partition(), record.offset());
}
public static void main(String[] args) {
int numConsumers = 3;
String bootstrapServers = "localhost:9092";
String groupId = "my-consumer-group";
String topic = "my-topic";
for (int i = 0; i < numConsumers; i++) {
Thread thread = new Thread(new MultiThreadedConsumer(bootstrapServers, groupId, topic));
thread.start();
}
}
}
3.4 컨슈머 랙
![](https://blog.kakaocdn.net/dn/cVIkLM/btsJeP8HETM/uZkcJB8eYK8B61yjMVZZv0/img.png)
컨슈머 랙은 컨슈머가 현재 처리하고 있는 오프셋과, 카프카 로그의 끝 오프셋(즉, 가장 최신 메시지) 간의 차이다. 이는 컨슈머가 얼마나 실시간 데이터를 따라잡고 있는지를 보여주는 중요한 지표다. 랙이 크면 컨슈머가 데이터를 따라잡지 못하고 있음을 의미하며, 이에 대한 조치가 필요할 것이다.
컨슈머 랙을 조회하는 방법은 3가지가 있다.
- 카프카 명령어를 사용하여 컨슈머 랙 조회: kafka-consumer-groups.sh 명령어를 사용하여 특정 컨슈머 그룹의 오프셋 정보를 확인하고, 각 파티션에서의 랙을 계산할 수 있다. 예를 들어, 다음과 같이 사용할 수 있다.이 방법은 일회성이기 때문에 모니터링하기에는 부족한 점이 많다.
kafka-consumer-groups.sh --bootstrap-server <broker> --describe --group <consumer-group>
- 컨슈머 metrics() 메서드를 사용하여 컨슈머 랙 조회: 컨슈머 API를 통해 metrics() 메서드를 사용하여 컨슈머 랙을 조회할 수 있다. 이를 통해 프로그램 내에서 실시간으로 랙을 모니터링할 수 있으며, 이 정보를 기반으로 동적인 스케일링이나 경고 시스템을 구축할 수 있다. 이 방법은 컨슈머가 정상적으로 동작하고 있을 경우에만 확인이 가능하기 때문에 컨슈머가 다운되거나 정상적으로 동작하지 않을 경우에는 모니터링이 불가능하다.
- 외부 모니터링 툴을 사용하여 컨슈머 랙 조회: 외부 모니터링 툴은 컨슈머 랙을 시각화하고, 알림을 설정할 수 있는 강력한 기능을 제공한다. (카프카 버로우, 그라파나, 데이터독 등)
3.5 컨슈머 배포 프로세스
컨슈머의 배포는 안정성과 가용성을 보장하기 위해 신중하게 계획되어야 한다. 일반적으로 롤링 전략 및 블루/그린 배포 전략이나 카나리 배포 전략을 사용하여 새로운 버전의 컨슈머를 단계적으로 배포한다. 이 과정에서 트래픽을 모니터링하고, 문제가 발생하면 빠르게 롤백할 수 있는 메커니즘을 마련하는 것이 중요하다.
중단 배포
중단 배포는 기존 컨슈머를 완전히 중지한 후 새로운 버전의 컨슈머를 배포하는 방식이다. 이 방식은 간단하지만, 배포 과정에서 잠시 동안 서비스가 중단될 수 있는 위험이 있다. 따라서 이를 최소화하기 위해 배포 시간을 신중히 선택하고, 중단 시간을 최소화하는 것이 중요하다.
이 배포 방식은 유연한 인스턴스 발급이 어려운 물리 장비에서 운영하는 경우 등 한정된 서버 자원을 운영할 때 적합하다. 기존 컨슈머 애플리케이션이 완전이 종료되면 더는 토픽의 데이터를 가져갈 수 없기 때문에 컨슈머 랙이 늘어날 것이다. 그 말은 즉슨 지연이 발생한다는 이야기가 된다. 이 상황에서 신규 컨슈머 애플리케이션 배포가 늦어지면 컨슈머 랙이 증가하고 서비스 지연으로까지 이어질 수 있다.
무중단 배포
무중단 배포는 서비스 중단 없이 새로운 버전의 컨슈머를 배포하는 전략이다. 이를 위해 롤링 업데이트나 블루/그린 배포 전략 등을 사용할 수 있다. 무중단 배포는 높은 가용성이 요구되는 시스템에서 필수적이며, 사용자가 서비스 중단을 느끼지 않도록 설계되어야 한다.
![](https://blog.kakaocdn.net/dn/vBqIP/btsJfQeBjGj/1euXblQCF50dKki4anC7lk/img.png)
![](https://blog.kakaocdn.net/dn/kE6us/btsJd2OyFUQ/5T1cn16OOlcKqKsuA1225K/img.png)
![](https://blog.kakaocdn.net/dn/dcfj5Q/btsJd7WGAWE/Jlswpb7ZGIhacsKiiMUYXk/img.png)
'기타 > apache kafka' 카테고리의 다른 글
Kafka MirrorMaker 2 (0) | 2024.08.17 |
---|---|
카프카 커넥트 (0) | 2024.08.17 |
카프카 스트림즈 (0) | 2024.08.11 |
카프카 클라이언트 (0) | 2024.08.10 |
아파치 카프카란? (0) | 2024.08.05 |