Archieve/Kafka

max.block.ms 주절주절

mydailylogs 2025. 4. 21. 23:51

브로커에 메세지를 전송하기 전 Producer의 큰 흐름을 3 단계로 나눠보면 Producer -> Accumulator -> Sender로 구분할 수 있습니다. (네이버 D2에서 훌륭하게 소개해주고 계십니다 https://d2.naver.com/helloworld/6560422)

 

대표적으로 메시지를 전송할 때 사용하는 send() 메서드는, 먼저 Kafka 브로커로부터 토픽과 파티션에 대한 메타데이터를 확보한 후, 이 정보를 바탕으로 파티션을 결정하고, 그에 따라 내부 버퍼(RecordAccumulator의 deque)에 데이터를 적재합니다. 이 버퍼에 쌓인 데이터는 배치 조건이 충족되면 Sender 스레드에 의해 브로커로 전송됩니다. 이때 내부 버퍼가 꽉 차서 더이상 데이터를 담을 수 없는 경우, send( )는 데이터를 버퍼 안으로 밀어넣기 전까지 block 상태에 들어가게 됩니다.

 

이렇게 block 상태에 들어간 send() 메서드 호출은, RecordAccumulator의 메모리가 확보될 때까지 대기하게 되며, 이때 기다릴 수 있는 최대 시간이 바로 max.block.ms로 설정된 값입니다.

 

max.block.ms의 기본값은 60000 (60초)로 설정되어 있으며, 만약 이 시간 내 버퍼 공간이 확보되지 않으면, Kafka는 더 이상 대기하지 않고 TimeoutException 에러를 발생시킵니다 (엄밀히 말하면 여기서 말하는 시간은 네트워크 지연 + 버퍼 공간 확보 시간).

 

max.block.ms를 보다 깊이 있게 이해하기 위해서는 먼저 send() 메서드의 동작 방식을 살펴볼 필요가 있습니다. 이 메서드는 Kafka의 KafkaProducer 클래스 내부에 정의된 doSend() 메서드를 통해 구현되어 있으며, doSend()는 메시지를 Kafka 브로커로 전송하기까지의 실질적인 처리 흐름 전체를 담당하는 핵심 메서드입니다.

 

doSend() 메서드는 메시지를 Kafka 브로커로 전송하기 위해 먼저, 전송 대상 토픽과 파티션에 대한 메타데이터를 확보한 뒤(waitOnMetadata), 메시지의 key와 value를 각각 바이너리 형식으로 직렬화합니다(serialize).

 

이후 직렬화된 데이터를 기반으로 메시지를 어느 파티션에 보낼지를 결정하고(partition), 그 결과를 바탕으로 메시지를 RecordAccumulator의 내부 버퍼에 적재합니다(append). 이때 만약 버퍼가 가득 찬 상태라면, 설정된 시간 동안 공간이 생기기를 대기하게 됩니다.

 

마지막으로, 트랜잭션을 사용하는 경우 해당 파티션을 트랜잭션에 등록하고, Sender 스레드를 깨워 메시지를 브로커로 실제 전송하도록 합니다.

 

아래는 카프카 소스 내 doSend()의 코드 일부입니다:
(https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java)

 

    private Future<RecordMetadata> doSend(ProducerRecord<K, V> record, Callback callback) {

        long nowMs = time.milliseconds();
        clusterAndWaitTime = waitOnMetadata(..., maxBlockTimeMs);
        nowMs += clusterAndWaitTime.waitedOnMetadataMs;
        long remainingWaitMs = Math.max(0, maxBlockTimeMs - clusterAndWaitTime.waitedOnMetadataMs);

 

먼저 waitOnMetadata() 단계에서, 해당 토픽에 대한 메타데이터가 준비될 때까지 max.block.ms 범위 내에서 대기합니다. 이때 기다린 시간을 측정하여, 남은 시간을 remainingWaitMs로 계산합니다.

 

RecordAccumulator.RecordAppendResult result = accumulator.append(..., remainingWaitMs, ...);

 

계산된 remainingWaitMs는 이후 RecordAccumulator.append() 단계에서 버퍼가 가득 찼을 때 사용할 수 있는 남은 대기 시간으로 전달됩니다.

 

최종적으로 accumulator.append()BufferPool 클래스의 allocate() 메서드를 통해 버퍼 공간 할당을 시도하게 되는데, 이때 사용 가능한 메모리가 충분하지 않다면 Kafka는 설정된 max.block.ms를 기반으로 정해진 시간 동안 버퍼 공간이 확보되기를 기다립니다.

 

아래는 BufferPool.allocate()의 코드 일부입니다.
(https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/internals/BufferPool.java)

 

long remainingTimeToBlockNs = TimeUnit.MILLISECONDS.toNanos(maxTimeToBlockMs);
this.waiters.addLast(moreMemory);

while (accumulated < size) {
    long startWaitNs = time.nanoseconds();
    boolean waitingTimeElapsed;

    try {
        waitingTimeElapsed = !moreMemory.await(remainingTimeToBlockNs, TimeUnit.NANOSECONDS);
    } finally {
        long endWaitNs = time.nanoseconds();
        long timeNs = Math.max(0L, endWaitNs - startWaitNs);
        recordWaitTime(timeNs);
    }

    if (waitingTimeElapsed) {
        throw new BufferExhaustedException("Failed to allocate " + size + " bytes within the configured max blocking time " + maxTimeToBlockMs + " ms...");
    }

    remainingTimeToBlockNs -= timeNs;
    ...
}

 

여기서 Kafka는 내부 waiters 큐에 현재 요청을 등록한 뒤, moreMemory.await(...)를 통해 지정된 시간 동안 메모리 여유가 생기기를 반복적으로 대기합니다.

 

반복 루프에서의 각 await() 호출은 실제로 기다린 시간을 측정하고, 남은 대기 시간에서 이를 차감하여 remainingTimeToBlockNs 값을 갱신합니다. 이 과정을 통해 Kafka는 전체 대기 시간이 max.block.ms를 초과하지 않도록 엄격하게 제어합니다.

 

만약 이 루프가 버퍼를 확보하지 못한 채 지정된 시간(max.block.ms)을 모두 소진하면, waitingTimeElapsed가 true가 되어 BufferExhaustedException 예외를 발생시키게 됩니다. 이 예외는 다시 상위로 전파되어 KafkaProducer.send() 호출 시 사용자에게 TimeoutException 형태로 전달됩니다.

 

Kafka 공식 문서에서는 max.block.ms에 대해 다음과 같은 설명을 덧붙이고 있습니다:

 

this timeout bounds the total time waiting for both metadata fetch and buffer allocation (blocking in the user-supplied serializers or partitioner is not counted against this timeout)

 

즉, 이 설정(max.block.ms)은 메타데이터 조회버퍼 메모리 할당을 기다리는 시간을 합산해 제한하는 타임아웃이며, 사용자가 직접 구현한 serializer나 partitioner 내부에서 발생하는 지연에는 적용되지 않습니다.

 

Kafka 입장에서는 사용자 정의 코드 내부에서 어떤 일이 벌어지는지 알 수 없기 때문에, serializer나 partitioner 내부에서 발생하는 블로킹은 타임아웃 대상에서 제외됩니다. 예를 들어, serializer 내부에서 외부 API를 호출하거나, 파일 I/O, 락 대기 등의 동작이 포함되어 있더라도 Kafka는 해당 코드의 실행 시간이나 안정성을 전혀 감지할 수 없습니다. 이 때문에 사용자 정의 serialize, partitioner 로직을 잘못 작성하면, 정확한 원인을 진단하기 어려운 상태 속에 send()가 멈춘 것처럼 보일 수 있다는 점에서 주의가 필요해보입니다.