Kafka Consumer
다른 메시징 플랫폼과 달리 컨슈머가 데이터를 가져가더라도 데이터가 사라지지 않는다. 이 특징은 데이터 파이프라인으로써 좋은 장점을 가지고 있다.
Consumer role
- Topic의 partition으로 부터 데이터를 가져온다. (= polling)
- Partition offset 위치 기록 (commit) offset: 파티션에 있는 데이터의 번호
- Consumer Group을 통해 병렬처리 파티션 개수에 따라 Consumerm을 여러개 만들면 병렬처리가 가능해져 더욱 빠른 속도로 데이터를 처리할 수 있다.
라이브러리 추가 필수 (Producer 에서의 고려사항과 같음)
-> 이전 Producer 블로그 참고
Comsumer 생성 코드 예제
- 자바 프로퍼티 설정을 통해 기본적인 컨슈머 옵션을 지정할 수 있다.
- bootstrap.servers 옵션을 통해 카프카 브로커를 설정한다.
- 카프카 브로커 중 한개에서 이슈가 생기면 다른 브로커가 붙을 수 있도록 여러개의 브로커를 지정하는 것을 추천한다.
- group.id 를 지정해야한다. (= 컨슈머 그룹)
- key와 value 에 대한 직렬화 설정을 추가한다.
- 설정 마친 이후, KafkaConsumer 클래스를 통해 이전에 선언한 설정들을 매개변수로 해서 consumer 인스턴스를 생성한다.
- 컨슈머 그룹을 정하고 어느 카프카 브로커에서 데이터를 가지고 올지 선언했기 때문에 어느 토픽을 대상으로 데이터를 가져올 것인가에 대해서만 선언하면 된다.
→ consumer의 subscribe() 메서드 통해서 설정 가능. 만약 특정 토픽의 전체 파티션이 아니라 일부 파티션에 대한 데이터만 가져오고 싶다면 assign() 메서드를 사용하면 된다. key 가 존재하는 데이터라면 이 방식을 통해 데이터의 순서를 보장하는 데이터 처리를 할 수 있다.
- polling loop : poll() 메서드가 포함된 무한루프
컨슈머 API의 핵심은 브로커로부터 연속적으로, 컨슈머가 허락하는 한 많은 데이터를 읽는 것이다. poll() 메서드를 통해 데이터를 가져오며, 설정한 시간동안 데이터를 기다린다. 여기서는 0.5초 동안 데이터가 도착하기를 기다리고 코드를 실행한다. 만약 데이터가 들어오지 않았을 때는 빈값의 records 변수를 반환하고, 데이터가 있다면 데이터가 존재하는 records 값을 반환한다.
records 변수는 데이터 배치로서 레코드의 묶음 list 이다. 그러므로 실제로 카프카에서 데이터를 처리할 때는 가장 작은 단위인 records로 나누어 처리하도록 한다. Consumer Rrcord<String, String> record : records records 변수를 for 루프에서 반복처리하면서 실질적으로 처리하는 데이터를 가져오게 하는 것이다. for 구문 내부에서 record 변수의 value() 메서드로 반환된 값이 이전에 producer가 전송한 데이터라고 보면 된다. 현재 위의 코드에서는 println()으로 가져온 데이터를 찍어보기만 했지만 실제 기업에서는 Hadoop 이나 ElasticSearch아 같은 저장소에 저장하는 로직을 넣기도 한다.
어떻게 consumer가 데이터를 가져갈까?
파티션에 들어간 데이터는 파티션 내에서 고유한 번호를 가지게 된다. 이 번호를 offset 이라고 부른다. 토픽 별 그리고 파티션 별로 별개로 지정된다.
- offset 의 역할: 컨슈머가 데이터를 어느 지점까지 읽었는지 확인하는 용도로 사용됨. 컨슈머가 데이터를 읽기 시작하면 offset을 commit 하게 되는데 이렇게 가져간 내용에 대한 정보는 카프카의 __consumer_offset 토픽에 저장된다.
- 만약 컨슈머가 의도치 않게 실행이 중지 되었을 때, 파티션의 몇번 offset까지 읽었는지에 대한 정보가 __consumer_offset 에 저장되어 있기 때문에 컨슈머를 재실행해도 중지 되었던 시작 위치부터 다시 복구하여 데이터를 처리할 수 있다.
- 즉, 컨슈머에 이슈가 발생하더라도 데이터의 처리 시점을 복구할 수 이쓴 고가용성의 특징을 가지게 된다.
Consumer 의 특징
1) Multiple Consumer
컨슈머가 특정 파티션의 데이터만을 가져갈 수 있다는 점.
각 컨슈머가 각각의 파티션 할당하여 데이터를 가져가서 처리한다. 더이상 할당될 파티션이 없을 경우 컨슈머를 생성하면 안된다. 여러 파티션을 가진 토픽에 대해서 컨슈머를 병렬처리하고 싶다면 반드시 컨슈머 개수 ≤ 파티션 개수 로 실행되어야한다.
2) Different Groups
각기 다른 컨슈머 그룹에 속한 컨슈머들은 다른 컨슈머 그룹에 영향을 끼치지 않는다.
⇒ 서로 다른 컨슈머 그룹이 동일한 토픽의 데이터를 가져갈 수 있다는 점.
가정) 데이터 실시간 시각화 및 분석을 위해 ElasticSearch 에 데이터를 저장하는 컨슈머 그룹이 있다고 가정한다. 여기에 추가로 데이터 백업을 위해 hadoop에 데이터를 저장하는 컨슈머 그룹을 새로 생성했다.
질문) 만약 ElasticSearch 저장하는 컨슈머 그룹이 각 파티션에 특정 offset을 읽고 있는 것은 hadoop에 저장하는 역할을 하는 컨슈머 그룹이 데이터를 읽는데에 영향을 미칠까?
답) __consumer_offset 토픽에는 컨슈머 그룹별로, 토픽별로 offset을 나누어 저장하기 때문이다. 그렇기에 하나의 토픽으로 들어온 데이터는 다양한 역할을 하는 여러 컨슈머들이 각자 원하는 데이터로 처리가 될 수 있다.
Consumer options
💡 필수옵션
bootstrap.servers : 카프카 클러스터에 연결하기 위한 브로커 목록
group.id : 컨슈머 그룹
id key.deserializer : 메시지 키 역직렬화에 사용되는 클래스
value.serializer : 메시지 값을 역직렬화 하는데 사용되는 클래스
💡 선택옵션
enable.auto.commit : 자동 오프셋 커밋 여부
auto.commit.interval.ms : 자동 오프셋 커밋일 때 interval 시간
auto.offset.reset : 신규 컨슈머 그룹일 때 읽을 파티션의 오프셋 위치
client.id : 어떤 클라이언트 값 식별자
max.poll.records : poll() 메서드 호출로 반환되는 레코드의 최대 개수
session.timeout.ms : 컨슈머가 브로커와 연결이 끊기는 시간
Consumer lag 모니터링
Consumer lag : 프로듀서가 넣은 데이터의 최신 offset과 컨슈머가 가져가는 offset이 차이
사용 이유 : 컨슈머의 성능이 저하되거나 비정상동작하면 lag이 필연적으로 발생하기 때문에 주의 깊게 살펴볼 필요가 있다.
Consumer lag 수집의 문제점
(1) 컨슈머 인스턴스 장애가 발생하면 지표 수집 불가능
(2) 구현하는 컨슈머마다 지표를 수집하는 로직 개발 필요
(3) consumer lag 최대값(records-lag-max)만 알 수 있음.
하지만 우리가 알고 싶은 정보는 각각의 파티션들마다의 consumer lag 정보
(+) 카프카 파티션에 대한 자동적으로 scale-in/out이 자동적으로 되지 않는다.
Burrow를 활용한 모니터링 방법
Burrow: linkedin에서 공개한 opensource lag monitoring application이다. rest api를 통해 lag 정보를 전달받을 수 있다.
Kafka-client 라이브러리를 사용해서 java, scala와 같은 언어를 통해 카프카 컨슈머를 구현하며, 여기서 생성한 consumer 객체를 통해 현재 lag 정보를 가져올 수 있다. 만약 실시간 lag 모니터링을 하고 싶다면 데이터를 ElasticSearch 나 InfluxDB와 같은 저장소에 넣은 뒤 grafana 대시보드를 통해 확인할 수도 있다. 하지만 Consumer 단위에서 lag 모니터링은 아주 위험하고 운영요소가 많이 들어간다는 문제점이 있다.
컨슈머 로직단에서 lag을 수집하는 것은 컨슈머 상태에 dependency가 걸리기 때문이다. 컨슈머가 비정상적으로 종료되게 된다면 더이상 컨슈머는 lag 정보를 보낼 수 없기 때문에 더 이상 lag 측정이 어렵다. 또한 새로운 컨슈머를 생성할 때마다 해당 컨슈머에 lag 정보를 특정 저장소에 저장할 수 있도록 로직을 개발해야한다. 만약 컨슈머 lag을 수집할 수 없는 컨슈머라면 lag을 모니터링 할 수 없어 운영이 매우 까다로워진다. 그래서 링크드인에서는 아파치 카프카와 함께 카프카의 consumer lag을 효과적으로 모니터링 할 수 있도록 Burrow를 내놓았다.
Burrow의 특징
- 멀티 카프카 클러스터 지원
카프카 클러스터를 사용하고 있는 기업이라면 대부분 두 개 이상의 카프카 클러스터를 구축하고 있다. 카프카 클러스터가 여러개인 경우에도 Burrow application 1개만 실행해서 연동한다면 카프카 클러스터들에 붙은 컨슈머의 lag을 모두 모니터링할 수 있다. - Sliding window를 통한 Consumer의 status 확인
consumer의 status를 ERROR, WARNING, OK로 표현할 수 있다. 만약 데이터 양이 일시적으로 증가하여 consumer offset이 증가하면 WARNING으로 정의된다. 만약 데이터는 많아지지만 consumer가 데이터를 가져가지 않으면 ERROR로 정의하여 실제 컨슈머의 문제 여부를 알 수 있다. - HTTP api 제공
위와 같은 정보들은 Burrow가 정의한 HTTP api를 통해 조회 가능하다. 가장 범용적인 프로토콜인 HTTP 사용됨.
'Open source' 카테고리의 다른 글
Kafka Consumer 실습 - kafka (6) (0) | 2024.03.04 |
---|---|
Kafka Producer 실습 - kafka (5) (0) | 2024.03.02 |
Kafka Broker, Replication, ISR - kafka(3) (0) | 2024.02.22 |
Kafka Producer - kafka(2) (0) | 2024.02.22 |
Kafka 란? - kafka(1) (0) | 2024.02.22 |