인생을 코딩하다.

Kafka에서 파티션 증가 없이 동시 처리량을 늘리는 방법 - Parallel Consumer 본문

Kafka

Kafka에서 파티션 증가 없이 동시 처리량을 늘리는 방법 - Parallel Consumer

Hyung1 2025. 1. 31. 00:14
728x90
반응형

보통 Kafka 사용 시 처리량을 늘리기 위해 컨슈머와 파티션 수를 늘린 후 병렬처리를 통해 처리량을 증가시킵니다. 하지만 저는 처리량을 늘릴 때마다 파티션을 추가하는 것이 최선인지에 대한 의문을 가지고 있었습니다. 파티션을 늘리는 대신, 한 파티션에서 단일 메시지가 아니라 여러 개의 메시지를 가져와 병렬 처리하면, 파티션 개수를 증가시키지 않고도 성능을 높일 수 있지 않을까 하는 고민을 해왔습니다. 이러한 고민을 하게 된 이유는 아래와 같습니다.
 

  1. 인프라 비용 증가
    • 파티션이 많아질수록 추가적인 브로커가 필요할 가능성이 높아지고 브로커 노드 수 증가에 따른 서버 비용, 네트워크 비용 등이 상승할 수 있습니다.
  2. 브로커 파일 시스템 리소스 사용량 증가
    • Kafka 브로커는 각 파티션별로 데이터를 저장합니다. 데이터 정보(.log 파일)뿐만 아니라 메타 정보(.index, .timeindex, .snapshot 파일)도 함께 저장되며, 파티션 수가 증가할수록 파일 오픈 비용, 디스크 사용량이 증가합니다.
  3. 장애에 더 취약한 구조
    • 단일 브로커에 많은 파티션 리더가 배치되면, 브로커 장애 또는 재시작 시 영향받는 범위가 넓어질 수 있습니다.
  4. 복제 비용 증가
    • 파티션 단위로 설정된 replicas 수만큼 복제가 이루어지므로, 복제 과정에서 디스크 사용량 증가 및 레이턴시 증가가 발생합니다
 

처리량을 높이기 위해 무조건 파티션을 늘려 인프라 비용을 증가시키기보다는, 기존 파티션 수를 유지한 채 하나의 파티션에서 여러 개의 메시지를 조회하는 방식으로 불필요한 비용 증가를 방지할 수 있다고 판단하였습니다. 또한 파티션 수가 적은 환경에서는 이러한 문제가 크지 않을 수 있지만, 과도하게 증가한 환경에서는 심각한 성능 저하로 이어질 수 있다고 생각하였습니다.

 
이 문제를 해결할 방법을 찾던 중, Confluent Inc.에서 개발한 오픈소스 라이브러리인 Parallel Consumer를 발견하였고, 이를 활용한 PoC(Proof of Concept)를 진행해보았습니다.

개요

Parallel Consumer는 Confluent Inc.에서 개발한 오픈소스 라이브러리로, 단일 파티션 내에서 여러 컨슈머 스레드를 활용하여 동시 처리량을 증가시키는 기능을 제공합니다. 일반적으로 Kafka에서는 하나의 파티션이 하나의 컨슈머 스레드에 의해 처리되지만, Parallel Consumer를 사용하면 파티션 개수를 증가시키지 않고도 병렬 처리 성능을 향상시킬 수 있습니다. 해당 글에서는 Parallel Consumer의 내부 구조를 분석하고, 연동 및 개발 과정에서 수행한 PoC(Proof of Concept) 결과를 정리하였습니다.

아래는 기존 kafka Consumer(위)과 Parallel Kafka Consumer(아래)를 비교한 그림입니다.

Parallel Consumer의 주요 특징

  1. 단일 파티션 내 동시성 증가
    • 기존 Kafka의 소비 모델은 하나의 파티션이 단일 컨슈머 스레드에 의해 처리됩니다.
    • Parallel Consumer는 하나의 파티션 내에서 여러 개의 워커 스레드가 메시지를 병렬로 처리할 수 있습니다.
  2. 오프셋 관리 개선
    • Kafka의 기본 컨슈머 모델에서는 특정 레코드가 처리되지 않으면 다음 레코드로 진행할 수 없습니다.
    • Parallel Consumer는 개별 레코드 단위로 오프셋을 관리하여 특정 메시지가 지연되더라도 다른 메시지를 먼저 처리할 수 있습니다.
  3. 순서 보장 옵션 제공
    • 메시지 처리 순서를 엄격하게 유지해야 하는 경우, 특정 키 또는 파티션 단위로 순서를 보장하는 옵션을 제공합니다.
      • KEY: 동일 키의 메시지 순서를 보장.
      • PARTITION: 파티션 내 순서만 보장.
      • UNORDERED: 순서 보장 없이 병렬 처리.
    • 비순차 처리가 가능한 경우에는 높은 처리량을 달성할 수 있도록 최적화 되어 있습니다.

좀 더 자세한 내용은 해당 글을 참고해주세요.

조건

  • 1 ~ 100 사이의 key를 가진 message 100개를 생성하고 topic에 send 합니다.
@RestController
class UserController(
    private val userMessageProducer: UserMessageProducer,
) {

    @PostMapping("/send")
    fun send() {
        generateUserMessages().forEach { userMessageProducer.sendMessage(it) }
    }

    @PostMapping("/parallel-send")
    fun parallelSend() {
        generateUserMessages().forEach { userMessageProducer.sendParallelMessage(it) }
    }

    @PostMapping("/batch-parallel-send")
    fun batchParallelSend() {
        generateUserMessages().forEach { userMessageProducer.sendBatchParallelMessage(it) }
    }

    private fun generateUserMessages() = (1..100).map { index ->
        UserMessage(
            id = index.toLong(),
            age = index.toLong(),
            name = "name$index",
        )
    }
}
  • kafkaListener, ParallelKafkaListener(단 건, 배치)를 통해 topic의 message를 consume 합니다.
    • 해당 글에서는 배치 consume은 테스트 하지 않습니다.
  • 명확한 처리 시간 비교를 위해 message consume후 Thread.sleep(1000)을 걸어줍니다.
  • ParallelKafkaListener의 ordering는 UNORDERED로 지정합니다.
@Component
class UserMessageConsumer(
    private val objectMapper: ObjectMapper,
) {

    @KafkaListener(
        topics = [USER_TOPIC],
        groupId = "user-consumer-group",
        concurrency = "2"
    )
    fun listen(
        record: ConsumerRecord<String, String>,
        acknowledgment: Acknowledgment,
    ) {
        try {
            val message = objectMapper.readValue(record.value(), UserMessage::class.java)
            log.info("[Main Consumer(${Thread.currentThread().id})] Message arrived! - $message")
            Thread.sleep(1000)
            acknowledgment.acknowledge()
        } catch (e: InterruptedException) {
            e.printStackTrace()
            log.info(e.message)
        }
    }

    @KafkaParallelListener(
        topics = [PARALLEL_USER_TOPIC],
        groupId = "parallel-user-consumer-group",
        concurrency = 2,
        ordering = ParallelConsumerOptions.ProcessingOrder.UNORDERED
    )
    fun listen(
        record: ConsumerRecord<String, String>,
    ) {
        try {
            val message = objectMapper.readValue(record.value(), UserMessage::class.java)
            log.info("[Thread ${Thread.currentThread().id}] Partition ${record.partition()} - Message arrived! - $message")
            Thread.sleep(1000)
        } catch (e: InterruptedException) {
            e.printStackTrace()
            log.info(e.message)
        }
    }

    @KafkaParallelListener(
        topics = [BATCH_PARALLEL_USER_TOPIC],
        groupId = "batch-parallel-user-consumer-group",
        concurrency = 1,
        batchSize = 10,
        ordering = ParallelConsumerOptions.ProcessingOrder.UNORDERED
    )
    fun batchListen(
        records: List<ConsumerRecord<String, String>>,
    ) {
        try {
            val messages = records.map { it.value() }
            log.info("[Main Consumer(${Thread.currentThread().id})] Messages arrived! - $messages")
            Thread.sleep(1000)
        } catch (e: InterruptedException) {
            e.printStackTrace()
            log.info(e.message)
        }
    }

    companion object {
        private val log = LoggerFactory.getLogger(this::class.java)
    }
}

Kafka Parallel Consumer 라이브러리는 @Listener 같은 어노테이션을 지원해주지 않아서 직접 개발하였습니다.

@Target(AnnotationTarget.CLASS, AnnotationTarget.FUNCTION, AnnotationTarget.ANNOTATION_CLASS)
@Retention(AnnotationRetention.RUNTIME)
@MessageMapping
@MustBeDocumented
annotation class KafkaParallelListener(
    val topics: Array<String>,
    val groupId: String = "",
    val concurrency: Int = ParallelConsumerOptions.DEFAULT_MAX_CONCURRENCY,
    val batchSize: Int = 1,
    val ordering: ParallelConsumerOptions.ProcessingOrder = ParallelConsumerOptions.ProcessingOrder.KEY,
    val clientIdPrefix: String = "",
    val clientIdSuffix: String = "",
)
class KafkaParallelStreamProcessorFactory<K, V> {

    fun createParallelStreamProcessor(
        kafkaConsumerFactory: ConsumerFactory<K, V>, // Kafka Consumer를 생성하는 팩토리
        topics: Array<String>, // 구독할 Kafka 토픽 목록
        ordering: ParallelConsumerOptions.ProcessingOrder, // 메시지 처리 순서 설정 (UNORDERED, PARTITION, KEY)
        concurrency: Int, // 병렬로 처리할 최대 동시 실행 개수
        batchSize: Int, // 한 번에 가져올 메시지 배치 크기 (1 이상일 경우 배치 처리)
        groupId: String, // Kafka Consumer Group ID (같은 그룹이면 메시지를 분배하여 처리)
        clientIdPrefix: String, // Kafka Consumer의 Client ID 접두사 (개별 Consumer 식별용)
        clientIdSuffix: String, // Kafka Consumer의 Client ID 접미사 (여러 개의 Consumer 구별용)
    ): ParallelStreamProcessor<K, V> {
        val options: ParallelConsumerOptions<K, V> = ParallelConsumerOptions.builder<K, V>()
            .ordering(ordering)
            .maxConcurrency(concurrency)
            .batchSize(batchSize)
            .consumer(
                kafkaConsumerFactory.createConsumer(
                    groupId.takeIf { it.isNotEmpty() },
                    clientIdPrefix,
                    clientIdSuffix,
                ),
            )
            .build()
        val eosStreamProcessor = ParallelStreamProcessor.createEosStreamProcessor(options)
        eosStreamProcessor.subscribe(topics.toList())
        return eosStreamProcessor
    }
}
@Component
class KafkaParallelStreamProcessor(
    private val kafkaConsumerFactory: ConsumerFactory<String, Any>,
    private val kafkaParallelListenerValidator: KafkaParallelListenerValidator,
) : BeanPostProcessor, DisposableBean {

    private val kafkaParallelConsumerFactory = KafkaParallelStreamProcessorFactory<String, Any>()
    private val consumers = mutableListOf<ParallelStreamProcessor<String, Any>>()

    override fun postProcessAfterInitialization(
        bean: Any,
        beanName: String
    ): Any? {
        bean.javaClass.methods
            .mapNotNull { method -> method.getAnnotation(KafkaParallelListener::class.java)?.let { method to it } }
            .forEach { (method, annotation) -> processKafkaParallelListenerMethod(bean, method, annotation) }
        return bean
    }

    private fun processKafkaParallelListenerMethod(
        bean: Any,
        method: Method,
        kafkaParallelListener: KafkaParallelListener,
    ) {
        kafkaParallelListenerValidator.validate(kafkaParallelListener)

        val processor = with(kafkaParallelListener) {
            kafkaParallelConsumerFactory.createParallelStreamProcessor(
                kafkaConsumerFactory = kafkaConsumerFactory,
                topics = topics,
                ordering = ordering,
                concurrency = concurrency,
                batchSize = batchSize,
                groupId = groupId,
                clientIdPrefix = clientIdPrefix,
                clientIdSuffix = "",
            )
        }
        processor.poll { context: PollContext<String, Any> ->
            try {
                val records = context.consumerRecordsFlattened
                val parameterType = method.parameterTypes.firstOrNull()
                if (parameterType != null && parameterType.isAssignableFrom(List::class.java)) {
                    method.invoke(bean, records)
                    return@poll
                }
                records.forEach { method.invoke(bean, it) }
            } catch (e: Exception) {
                log.error("Error while processing message", e)
            }
        }
        consumers.add(processor)
    }

    override fun destroy() {
        consumers.forEach(ParallelStreamProcessor<String, Any>::close)
    }

    companion object {
        private val log = LoggerFactory.getLogger(this::class.java)
    }
}

전체적인 코드는 https://github.com/Hyung1Jung/kafka-parallel-consumer를 참고해주세요.

Kafka Consumer vs Kafka Parallel Consumer 처리 시간 비교

partition, concurrency 수kafka consumerkafka parallel consumer
partition=1, concurrency=2100s50s
partition=2, concurrency=250s50s

 
위의 조건 기반으로 테스트 해본 결과 kafka parallel consumer 경우 파티션을 늘리지 않고도 동시 처리량을 늘릴 수 있는 것을 확인할 수 있습니다.
 
추가로, parallel consumerd의 경우  partition=1, concurrency=2, partition=2, concurrency=2  의 처리 시간이 동일한 것을 볼 수 있는데요. Parallel Consumer에서 concurrency는 전체적인 병렬 실행 수를 제한하며, 각 파티션별 concurrency를 설정하는 것이 아님을 테스트를 통해 확인할 수 있었습니다.

Case 1) partition=1, concurrency=2

  • 파티션 1개에서 최대 2개 동시 실행
[Thread 1] - Message 1
[Thread 2] - Message 2
(1초 후)
[Thread 1] - Message 3
[Thread 2] - Message 4
...

Case 2) partition=2, concurrency=2

  • 각 파티션이 아닌 전체적 파티션 기준으로 랜덤하게 최대 2개 실행
[Thread 1] - Message 1 (Partition 0)
[Thread 2] - Message 51 (Partition 1)
(1초 후)
[Thread 1] - Message 2 (Partition 1)
[Thread 2] - Message 52 (Partition 1)
(1초 후)
[Thread 1] - Message 3 (Partition 0)
[Thread 2] - Message 53 (Partition 0)
...

Kafka Parallel Consumer는 전체적으로 처리량을 조절하지만, 매번 각 파티션에서 정확히 한 개씩 가져오지는 
않고 파티션마다 번갈아 가며 메시지를 가져옴.

- 파티션 0,1에서 각각 한 개씩 → 기본적인 Fetch 동작 (각 파티션에서 균등하게 메시지를 가져옴)
- 파티션 1에서 두 개 → 특정 Poll 타이밍에 따라 파티션 1에서 먼저 Fetch되는 경우 발생
- 파티션 0에서 두 개 → Kafka가 Offset Commit 상태를 기반으로 특정 파티션을 우선적으로 Poll할 수 있음

즉, Parallel Consumer는 전체 동시 실행 개수를 조절하지만, 특정 시점에서 어떤 파티션이 먼저 가져가는지는 
Kafka의 Poll 타이밍과 Offset Commit 상태에 따라 달라짐

순서 보장이 필요 없는 경우 Kafka Parallel Consumer는 기존 Kafka와 다르게 Partiton을 늘리지 않고 concurrency만 늘림으로써 처리량을 높일 수 있는 것을 확인할 수 있었습니다. 처리량을 높일때 순서 보장이 필요 없다면 굳이 기존 Kafka를 사용하여 파티션을 늘리는 것보단 Parallel Consumer을 사용하여 파티션을 늘리지 않고 인프라 비용 절감, 성능 최적화 등의 이점을 얻는게 좋을 것 같습니다.

Kafka Parallel Consumer Key, Unordered, Partition 동작 방식 및 처리 시간 비교

이번엔 Kafka Parallel Consumer의 순서 보장 방식인 Unordered, Key, Partition의 동작 방식 및 처리 시간을 비교해보았습니다. .

    @PostMapping("/parallel-send")
    fun parallelSend() {
        generateUserMessages().forEach { userMessageProducer.sendParallelMessage(it) }
    }
    
    private fun generateUserMessages() = (1..100).map { index ->
        UserMessage(
            id = index.toLong() // or index.toLong() % 2,
            age = index.toLong(),
            name = "name$index",
        )
    }    
    
    @KafkaParallelListener(
        topics = [PARALLEL_USER_TOPIC],
        groupId = "parallel-user-consumer-group",
        concurrency = 2,
        ordering = ParallelConsumerOptions.ProcessingOrder.UNORDERED // or KEY or PARTITION
    )
    fun listen(
        record: ConsumerRecord<String, String>,
    ) {
        try {
            val message = objectMapper.readValue(record.value(), UserMessage::class.java)
            log.info("[Thread ${Thread.currentThread().id}] Partition ${record.partition()} - Message arrived! - $message")
            Thread.sleep(1000)
        } catch (e: InterruptedException) {
            e.printStackTrace()
            log.info(e.message)
        }
    }

1. Unordered

partition=1, concurrency=2

 처리 개수처리 순서처리 시간
Key가 고유한(1~100) message 100개2개씩전체 순서 X50s
Key가 고유하지 않은((1~100)%2) message 100개2개씩전체 순서 X, 같은 키 끼리 순서 X50s

partition=2, concurrency=2

 처리 개수처리 순서처리 시간
Key가 고유한(1~100) message 100개2개씩전체 순서 X, 파티션내 순서 X, 50s
Key가 고유하지 않은((1~100)%2) message 100개2개씩전체 순서 X, 파티션내 순서 X, 같은 키 끼리 순서 X50s

이유

  • Unordered는 순서를 보장하지 않는 방식입니다.
  • Key에 따라 특정 파티션으로 가더라도, 같은 키라도 동시성(concurrency) 때문에 순서가 보장되지 않습니다.
  • Concurrency가 2이므로 같은 파티션 내에서도 여러 개의 메시지가 동시에 처리될 가능성이 있습니다. → 파티션 내 순서 X.

2. Key

partition=1, concurrency=2

 처리 개수처리 순서처리 시간
Key가 고유한(1~100) message 100개2개씩전체 순서 X50s
Key가 고유하지 않은((1~100)%2) message 100개2개씩전체 순서 X, 같은 키 끼리 순서 O50s

partition=2, concurrency=2

 처리 개수처리 순서처리 시간
Key가 고유한(1~100) message 100개2개씩전체 순서 X, 파티션내 순서 X50s
Key가 고유하지 않은((1~100)%2) message 100개2개씩전체 순서 X, 파티션내 순서 O, 같은 키 끼리 순서 O, 같은 파티션 내 다른 키끼리 순서 X50s

이유

  • 같은 Key는 같은 파티션으로 가기 때문에 Key별 순서는 보장됩니다.
  • 하지만 파티션 내에서는 concurrency(병렬 처리)로 인해 순서가 보장되지 않을 수도 있습니다.
  • Key가 고유하면 여러 파티션으로 분산될 가능성이 높아지고, 병렬처리되면서 순서가 꼬일 가능성이 커집니다.

3. Partition

partition=1, concurrency=2

 처리 개수처리 순서처리 시간
Key가 고유한(1~100) message 100개1개씩전체 순서 O100s
Key가 고유하지 않은((1~100)%2) message 100개1개씩전체 순서 X, 같은 키 끼리 순서 O100s

partition=2, concurrency=2

 처리 개수처리 순서처리 시간
Key가 고유한(1~100) message 100개2개씩전체 순서 X, 파티션내 순서 O50s
Key가 고유하지 않은((1~100)%2) message 100개2개씩전체 순서 X, 파티션내 순서 O, 같은 키 끼리 순서 O, 같은 파티션 내 다른 키 끼리 순서 X50s

이유

  • Partition 모드는 각 파티션 내에서만 순서를 보장합니다.
  • 파티션이 하나면(Partition=1) 전체 순서가 유지되지만, 여러 개이면 전체 순서는 틀어집니다.
  • 같은 키는 같은 파티션으로 가므로 같은 키 순서는 유지되지만, 파티션 내에서 다른 키가 섞이면 같은 파티션 내에서 순서가 섞일 수 있습니다.

결과 정리 및 주요 차이점

 전체 순서파티션 내 순서같은 키 순서같은 파티션 내 다른 키 순서
UnorderedXXXX
KeyXO (같은 키)OX
PartitionXOOX
  • Unordered: 완전한 무질서입니다.
  • Key: 같은 키는 순서 보장되지만, 파티션 내에서 다른 키끼리는 섞일 수 있습니다.
  • Partition: 파티션 내 순서가 유지되고, 같은 키 순서 보장됩니다.

마치며

파티션을 늘리는 것이 항상 나쁜 것은 아니라고 생각합니다. 트래픽이 적고 기존 파티션이 많지 않다면 1~2개 추가하는 것으로 충분히 해결 가능할 것 같습니다. 하지만, 인프라 비용 최적화가 필요하거나 단일 메시지 처리 속도 개선이 어려운 상황이거나 기존 파티션이 이미 너무 많이 생성된 상황이라면 Parallel Consumer을 고려해볼만 할 것 같습니다.
 
특히 순서 보장이 필요 없는 경우에는 Parallel Consumer을 사용하여 사전에 인프라 비용을 최적화 하는 것이 좋을 것 같습니다.

Reference

728x90
반응형
Comments