Cluster coordination은 Zookeeper가 제공하는 기능 중의 하나로서 분산 환경을 구성하는데 핵심이 되는 기능이다. coordination은 다양한 노드가 상호작용을 통해 작업을 진행할 경우 각 노드들의 상태를 관리하고, 클러스터 전체에 걸쳐 동기화시킴으로서 작업들이 적절하게 진행될 수 있도록 만들어준다.

Kafka에서는 대표적으로 다음의 작업 시에 Zookeeper의 Coordination을 활용하게 된다.

  • Topic 생성

  • Broker 추가

  • Broker의 실패

그 중에서도 해당 글에서는 Topic의 생성에 초점을 두고 Zookeeper의 Coordination이 이뤄지는 맥락에 대해서 이야기해보고자 한다.

Topic Creation

새로운 Kafka Topic이 생성되는 경우 다음의 작업이 필요하다.

  • 새로운 Topic이 생성되고, 해당 Topic의 구성이 어떻게 이뤄지는 지를 모든 Broker가 볼 수 있어야 한다.

  • 새로운 Topic을 담당하는 Partition들을 클러스터 내부의 Broker에 할당시켜야 한다. (이를 Partition assignment라고 한다)

  • 각 파티션을 담당하는 Leader broker가 선출되어야 한다.

Zookeeper는 이러한 작업들을 도와준다.

새로운 Topic이 생성되는 경우, 해당 Topic의 구성과 Partition assignment는 Zookeeper의 znode에 기록된다.

이처럼 Broker의 세션에 종속되지 않는 데이터는 persistent znode에 기록되어, Broker의 연결이 종료되더라도 zookeeper가 남아있다면 데이터가 유지되게 된다.

좀 더 깊게 들어가보자

Zookeeper의 znode는 트리 구조의 파일 계층처럼 데이터가 저장되는 데이터 모델이다.

각 kafka의 컨텍스트에서 znode는 /brokers/topics 라는 부모 znode 아래에서 각 Topic을 위한 데이터가 저장된다.

이때 저장되는 데이터는 Topic의 구성 정보와 partition assignment에 대한 정보이다.

샘플 Topic을 만들어보고, 해당 Topic을 통해 zookeeper의 역할을 알아보자.

먼저 여러 Broker로 구성된 클러스터라고 할 때 특정 Broker에서 Topic를 생성한다고 가정하자. 해당 Topic은 다른 모든 Broker들에게 전파되어야 한다.

이제 실제로 그런지 실습해보자.

먼저 1번 노드(kafka1)에서 Topic을 생성한다.

(현재 Docker Compose로 총 3개의 노드가 하나의 도커 네트워크에 묶여있다. 때문에 Hostname이 지정되어 있어 IP 주소 대신 Hostname이 사용되는 환경이다.)

sh-4.4$ kafka-topics.sh --create --bootstrap-server kafka1:9092 --replication-factor 1 --partitions 1 --topic sample-topic


bootstrap-server는 Kafka의 클라이언트(producer 또는 consumer)와 연결되는 최초의 Broker를 의미한다. 해당 세션 연결을 통해 클라이언트는 Kafka 클러스터에 대한 메타데이터를 획득한다

  • boostrap-server에 대한 더 자세한 설명해당 과정을 거치기 위해 클라이언트는 적어도 하나의 Broker와 연결되어야 한다. 최초의 연결이 이뤄진 후에야 비로소 클라이언트는 해당 Kafka 클러스터에 대한 메타데이터를 받아올 수 있다.

  • 클라이언트가 메타데이터를 받았다면, 해당 토픽을 처리할 준비가 완료되었다. 이후 Topic을 구성하는 Partition들을 처리하는 Leader Broker와 세션을 구성한다. 이때 bootstrap-server는 메타데이터 반환이라는 역할을 다했기 때문에 연결이 꼭 유지되어야 하는 것은 아니다. 만약 해당 Broker가 요구되는 Partition을 처리하는데 필요하지 않다면 (Leader가 아니라면), 세션은 종료될 것이다.

  • Kafka 클라이언트가 Topic을 경유하여 메세지를 보내거나 받는 경우, 먼저 해당 Kafka 클러스터 내부에 어떤 Broker들이 있는지 알아야 함과 동시에 해당 Topic의 Partition들을 처리하는 Leader Broker에 대해서 알고 있어야 한다. 이러한 정보는 각 Kafka Broker를 통해 획득되어지는데, 이는 결국 Zookeeper의 Persistant znode에 저장되어 전파되는 정보들이다.

해당 명령이 성공적이었다면 다음과 같은 결과를 얻을 수 있다.

Created topic sample-topic.


계속해서 1번 노드에서 zookeeper의 znode에 해당 Topic 정보가 저장되었는지 확인하려고 한다. 다음의 명령을 통해 zookeeper-shell에 접속한다.

sh-4.4$ zookeeper-shell zookeeper:2181


*파라미터 값으로 Zookeeper의 주소를 넣어준다.

연결이 성공적이었다면 다음의 결과를 얻는다.

Connecting to zookeeper:2181
Welcome to ZooKeeper!
JLine support is disabled

WATCHER::

WatchedEvent state:SyncConnected type:None path:null


Zookeeper shell 내부에서 ls 명령을 통해 topic이 잘 생성되었는지 확인한다.

ls /brokers/topics
[sample-topic, sample-topice]


*하나는 원래 있던 Topic으로 sample-topic의 오타이다. 삭제를 고민하다 여러 Topic이 있는 경우 전달하는 바가 좀 더 명확할 것 같아 남겨두었다.

Topic이 잘 생성되었다면, 다른 Broker로 이동하여 해당 Broker에서도 Topic이 잘 기록되어 있는지 확인해보자.

2번 Broker로 이동하여 마찬가지로 Zookeeper shell 내부에서 ls 명령어를 수행하였다.

sh-4.4$ zookeeper-shell zookeeper:2181
Connecting to zookeeper:2181
Welcome to ZooKeeper!
JLine support is disabled

WATCHER::

WatchedEvent state:SyncConnected type:None path:null
ls /brokers/topics
[sample-topic, sample-topice]


마찬가지로 동일한 Topic 목록을 확인할 수 있다. 사실 동일한 Zookeeper를 참조하므로 당연한 결과이다. 이를 통해 zookeeper의 역할을 잘 이해할 수 있으리라 생각한다.

추가적으로 zookeeper-shell 에는 해당 znode의 정보를 확인할 수 있는 get 명령어가 존재하는데 이를 통해 topic의 메타데이터에 해당하는 znode를 확인할 수 있다.

sh-4.4$ echo "get /brokers/topics/sample-topic" | zookeeper-shell zookeeper:2181


*get /brokers/topics/sample-topic을 zookeeper-shell 내부적으로 실행

성공했다면 다음과 같은 정보를 얻을 수 있다.

{"partitions":{"0":[3]},"topic_id":"UWF0HcJqT8ugeis0q8yDRQ","adding_replicas":{},"removing_replicas":{},"version":3}

 


각 필드는 다음을 의미한다.

  • partitions: Topic이 1개의 partition(”0”)을 가지고 있고, 해당 Partition에 연결된 Broker는 3번 Broker이다.

  • topic_id: 해당 Topic을 구분하는 고유한 구별자이다. Kafka는 Topic을 내부적으로 관리할 때 이를 사용한다.

  • adding_replicas/removing_replicas: 해당 필드는 Kafka의 replication factor가 증가하는 상태이거나 감소하는 상태임을 의미한다. 만약 partition에 대한 재할당이 이뤄지는 경우, 해당 필드는 재할당이 이뤄지는 partition에 대한 정보와 이와 관련된 broker의 id를 담게 될 것이다.

  • version: topic 구성 형식의 버전을 의미한다. 새로운 형태의 kafka에서는 더 높은 숫자의 version이 적힐 것이다.

  • 유사하게 znode 중에는 각 Broker의 세션 연결 상태를 관리하는 /brokers/ids 가 존재한다. host와 port 정보 그리고 해당 세션이 연결되어 있는지 여부 등이 저장된다.

replica와 partition이 추가된다면 znode에 저장된 메타데이터는?

앞선 예시는 별도의 replica와 partition을 생성하지 않아 topic의 메타데이터가 제공하는 정보가 다소 부실했다. 과연 replica와 partition이 추가된다면 메타데이터는 어떻게 변하게 될까?

이번에는 replication-factor를 2, partition을 2로 설정하여 데이터가 들어오면 2개의 partition에 나뉘어 저장되게 설정함과 동시에, 2개의 replica가 생성되도록 구성해보자. (leader가 각 partition을 담당하게 될 것이며, 각 partition은 2개의 broker에 replica를 생성할 것이다)

앞서 설명한대로 kafka-topic을 생성한다. (--replication-factor 2 --partitions 2)

kafka-topics --create --bootstrap-server kafka1:9092 --replication-factor 2 --partitions 2 --topic sample-topic-replica


해당 topic의 메타데이터를 살펴보면 다음의 결과를 얻는다.

  • partitions: 현재 0번 partition에 대해서, 2번과 1번 Broker에 replica가 생성되었다.

  • topic_id: (위의 설명 참고)

  • adding_replicas/removing_replicas: (위의 설명 참고)

  • version: (위의 설명 참고)

만약 zookeeper의 znode로 Topic을 제거한다면?

마지막으로 궁금증이 생긴다. 만약 Kafka 관리 명령어가 아니라 zookeeper의 znode를 지우면 topic이 사라지는 걸까?

일단 여러 많은 참고자료에서 kafka의 topic을 제거하기 위해서는 Kafka 관리 도구를 써야하며, zookeeper의 znode를 바로 지우는 것은 의도하지 않은 동작이라는 서술을 볼 수 있었다.

해당 Topic을 zookeeper 내부에서 지우기 위해 zookeeper-shell의 delete 명령어를 활용했다. delete 명령어는 안전을 위해 child znode가 존재한다면 삭제가 되지 않는다. 하지만 해당 예시는 의도하지 않은 작업을 시도하는 것이기 때문에, child node부터 쭉 타고 올라가서 하나의 topic 정보를 모두 제거해줄 것이다.

delete /brokers/topics/sample-topic-replica/partitions/0/state
delete /brokers/topics/sample-topic-replica/partitions/0
delete /brokers/topics/sample-topic-replica/partitions/1/state
delete /brokers/topics/sample-topic-replica/partitions/1
delete /brokers/topics/sample-topic-replica/partitions        
delete /brokers/topics/sample-topic-replica


znode 상에는 Topic 정보가 남아있지 않다. 과연 kafka에서 topic 리스트를 뽑으면 어떻게 될까?

다음의 명령을 통해 kafka의 topic 리스트를 뽑아보자. (현재 여전히 2번 노드에서 명령이 이뤄지고 있다)

sh-4.4$ kafka-topics --bootstrap-server kafka2:9093 --list


다음의 결과를 얻을 수 있다. 여전히 sample-topic-replica가 남아있다.

sample-topic
sample-topic-replica
sample-topice


topic이 저장된 위치를 찾아서

해당 명령을 수행한 이후 kafka 내부에 topic들의 위치가 저장되는 경로가 있지 않을까라는 막연한 추측을 할 수 있었다.

경로를 뒤져가며 /etc/kafka라는 경로를 통해 server.properties라는 kafka의 설정 파일을 찾을 수 있었고 해당 설정 파일의 중간 쯔음에는 log 정보가 저장되는 경로가 설정되어 있다.

  ############################# Log Basics #############################

  # A comma separated list of directories under which to store log files
  log.dirs=/var/lib/kafka


해당 경로에서 ls를 수행하였더니 다음과 같은 결과를 얻을 수 있었다.

sh-4.4$ ls /var/lib/kafka
cleaner-offset-checkpoint    meta.properties                   replication-offset-checkpoint  sample-topic-replica-1
log-start-offset-checkpoint  recovery-point-offset-checkpoint  sample-topic-replica-0


각 폴더에 대한 설명

  • cleaner-offset-checkpoint: log cleaner에 의해 삭제된 메세지들의 offset을 저장한다.

  • log-start-offset-checkpoint: 각 topic partition의 첫 번째 메세지의 offset을 저장한다.

  • recovery-point-offset-checkpoint: 각 topic patition의 마지막으로 완벽하게 써진 메세지를 저장한다. 이는 이후 broker가 실패하는 경우, 해당 지점에서부터 쓰기 작업을 실행하는데에 활용된다.

  • meta.properties: 해당 broker의 ID, host, port와 같은 메타데이터가 저장된다.

  • replication-offset-checkpoint: 각 topic partition의 마지막으로 복제된 메세지의 offset을 저장한다. 이는 이후 follower broker가 실패하는 경우 복제를 해당 지점부터 다시 수행할 때 활용된다.

  • sample-topic-replica-0 / sample-topic-replica-1: sample-topic-replica에 대한 partition들이다. 각 partition은 고유한 폴더를 갖게 되며, 클라이언트로부터 실제로 읽고 쓰여지는 실제 메세지에 해당하는 데이터 파일들을 하위 경로에 보유하게 된다.

추가) sample-topic-replica-0의 하위 파일들 

sh-4.4$ ls sample-topic-replica-1
00000000000000000000.index  00000000000000000000.timeindex  partition.metadata
00000000000000000000.log    leader-epoch-checkpoint

 

'Data Engineering' 카테고리의 다른 글

Zero Copy는 어떻게 Kafka의 속도를 높일까  (0) 2023.08.08
Kafka와 Zookeeper의 상호작용  (0) 2023.05.18
Zookeeper의 znode  (0) 2023.05.18
Coursera 데이터 엔지니어링 강의 목록  (0) 2022.12.08
Spark  (0) 2022.11.20