Open source

Kafka Producer - kafka(2)

jogaknabi_1023 2024. 2. 22. 22:23

Kafka Producer

Producer role

  • Topic에 해당하는 메시지를 생성
  • 특정 Topic으로 데이터를 publish
  • 처리 실패/재시도

Kafka Producer 구조

라이브러리 추가 필수

기본적으로 자바 라이브러리 사용. 라이브러리 관리 도구인 gradle 이나 maven 사용

 

Producer 코드 예제

  • 자바 프로퍼티 객체를 통해 프로듀서의 설정 정의
  • 부트스트랩 서버 설정을 로컬 호스트의 카프카를 바라보도록 설정
  • 카프카 브로커의 주소목록은 되도록 2개 이상의 ip와 port를 설정하도록 권장
  • key와 value에 대해 스트링시리얼라이저로 직렬화. (시리얼라이저는 key 혹은 value를 직렬화하기 위해 사용된다. Byte array, String, Integer 시리얼라이즈를 사용할 수 있다.)
    여기서 key는 메시지를 보내면 토픽의 파티션이 지정될 때 사용된다.
KafkaProducer<String, String > producer = new KafkaProducer < String, String > (configs) ;
ProducerRecord record = new ProducerRecoed < String, String > ("click_log", "login");
producer.send(record);
producer.close();
  • 설정한 프로퍼티로 카프카 프로듀서 인스턴스를 만든다.
  • 전송할 객체는 어떻게 만들까?위의 코드에서는 click_log 토픽에 login 이라는 value를 보낸다. 만약 key를 포함하고 싶다면 ("click_log", “1”, "login") 프로듀서 레코드를 선언하면 된다.
  • 카프카 클라이언트에서는 ProducerRecord 클래스를 제공한다. ProducerRecord 인스턴스를 생성할 때 어느 토픽에 넣을 것인지 어떤 key 와 value를 담을 것인지 선언할 수 있다.
  • send() 메서드의 파라미터로 ProducerRecord를 넣으면 전송이 이루어지게 됨. click_log 토픽에 login value가 들어가게 된다.
  • 그리고 전송이 완료되면 close() 메서드를 통해 프로듀서를 종료한다.

 

어떻게 producer가 토픽의 파티션에 들어갈까?

1) key=null 인 경우

  • key 가 null인 데이터를 파티션이 1개인 토픽에 보내면 하나의 파티션에 차례대로 쌓이게 된다.
  • 만약 파티션이 한 개 더 늘어나면? (key=null)
  • 데이터가 라운드 로빈으로 2개의 파티션의 차례대로 쌓이게 됨.

2) key가 null이 아닌 경우

카프카는 key를 특정한 hash값으로 변경시켜 파티션과 1:1 매칭을 시킨다.

→ 결과 : 각 파티션에 동일 key의 value만 쌓이게 된다. 여기서 새로운 파티션을 추가하는 순간 key와 파티션의 매칭이 깨지기 때문에 key-파티션의 연결은 보장되지 않는다.

 

Producer Options

💡 필수옵션

bootstrap.servers : 카프카 클러스터에 연결하기 위한 브로커 목록

key.serializer : 메시지 키 직렬화에 사용되는 클래스

value.serializer : 메시지 값을 직렬화 하는데 사용되는 클래스

 

💡 선택옵션

acks : 레코드 전송 신뢰도 조절

comression.type : snappy, gzip, Iz4 중 하나로 압축하여 전송

retries : 클러스터 장애에 대응하여 메시지 전송을 재시도하는 횟수

buffer.memory : 브로커에 전송될 메시지의 버퍼로 사용될 메모리 양

batch.size : 여러 데이터를 함께 보내기 위한 레코드 크기

linger.ms : 현재의 배치를 전송하기 전까지 기다리는 시간

client.id : 어떤 클라이언트인지 구별하는 식별자