Open source

Kafka Producer 실습 - kafka (5)

jogaknabi_1023 2024. 3. 2. 14:32

이전 kafka 이론 정리한 것을 바탕으로 SK planet 토그온 세미나에서 진행했던 실습을 진행해보았다. 역시나 데브원영님의 유튜브를 참고하였다. 실습 시 자신의 PC(mac)에는 IntelliJ 설치, jdk 다운로드가 필요하고, AWS EC2 인스턴스를 하나 생성하기 때문에 미리 AWS 계정 회원가입이 되어있어야한다.

  • EC2 = Kafka
  • Local(mac) = Consumer, Producer

강의 링크: https://www.youtube.com/watch?v=4BbKCsKSq_I&list=PL3Re5Ri5rZmkY46j6WcJXQYRlDRZSUQ1j&index=17

 

 

EC2 카프카 설치 및 실행

인스턴스 생성 : Amazon Linux2, t2.micro

보안그룹 9092 포트 추가

# 다운로드 받은 pem키로 로컬에서 EC2 접근
chmod 400 test-kafka.pem
ssh -i kafka-test.pem ec2-user@{aws ec2 public ip}

# JDK 설치
sudo yum install -y java-1.8.0-openjdk-devel.x86_64

# 카프카 바이너리 파일 다운로드
wget https://downloads.apache.org/kafka/3.6.1/kafka_2.12-3.6.1.tgz
tar xvf kafka_2.12-3.6.1.tgz
cd kafka_2.12-3.6.1/

# HEAP 메모리 설정 변경 (t2.micro 디폴트 메모리 사이즈 때문에 카프카 안켜지기 때문)
export KAFKA_HEAP_OPTS="-Xmx400m -Xms400m"

# server.properties 파일 수정 (주석제거 & ip 설정)
vi config/server.properties
###
listeners=PLAINTEXT://:9092
advertised.listeners=PLAINTEXT://{aws ec2 public ip}:9092
###

# 주키퍼 서버 시작
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties

# 카프카 시작
bin/kafka-server-start.sh -daemon config/server.properties

# 실행 중인 자바 프로세스 확인
[ec2-user@ip-172-31-10-76 kafka_2.12-3.6.1]$ jps
9156 QuorumPeerMain
11227 Jps
10460 Kafka

 

 

로컬에서 Producer, Consumer 확인

여기서 3.36.117.173 은 EC2 인스턴스 퍼블릭 IP 이다.

curl https://archive.apache.org/dist/kafka/3.6.1/kafka_2.12-3.6.1.tgz --output kafka.tgz
tar -xvf kafka.tgz
cd kafka_2.12-3.6.1/bin

# 브로커 1대 파티션 3 topic은 test로 kafka topic 생성
./kafka-topics.sh --create --bootstrap-server 3.36.117.173:9092 --replication-factor 1 --partitions 3 --topic test
>hello
>jeongeun
>It is kafka test
>1 
>2
>3

# test란 topic 데이터 확인
# from-beginning 옵션으로 이전에 commit 했던 이전의 데이터들도 같이 나옴.
./kafka-console-consumer.sh --bootstrap-server 3.36.117.173:9092 --topic test --from-beginning
hello
jeongeun
It is kafka test
1
2
3
^CProcessed a total of 6 messages

# Consumer Group 지정하여 데이터 확인
#넣었던 데이터 순서대로 결과가 나오지 않음. -> 3개의 파티션으로 나뉘어 데이터가 들어갔으며 데이터를 가져오는데는 순서가 없기 때문.
#파티션 1로만 설정했을 때는 순서대로 나옴.
./kafka-console-consumer.sh --bootstrap-server 3.36.117.173:9092 --topic test -group testgroup --from-beginning

# Consumer Group의 목록을 확인
./kafka-consumer-groups.sh --bootstrap-server 3.36.117.173:9092 --list

# testgroup이란 Consumer Group에 대한 정보를 확인
#어떤 토픽을 소비하고 있는지, 각 파티션에서 어느 오프셋까지 메시지를 소비했는지 등의 정보를 표시
./kafka-consumer-groups.sh --bootstrap-server 3.36.117.173:9092 --group testgroup --describe

Consumer group 'testgroup' has no active members.

GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
testgroup       test            0          9               9               0               -               -               -
testgroup       test            1          6               6               0               -               -               -
testgroup       test            2          0               0               0               -               -               -

# 컨슈머 그룹(testgroup)이 특정 토픽(test)의 오프셋을 0으로 리셋
./kafka-consumer-groups.sh --bootstrap-server 3.36.117.173:9092 --group testgroup --topic test --reset-offsets --to-earliest --execute

GROUP                          TOPIC                          PARTITION  NEW-OFFSET     
testgroup                      test                           0          0              
testgroup                      test                           1          0              
testgroup                      test                           2          0

# 컨슈머 그룹(testgroup)이 특정 토픽(test)의 특정 파티션(파티션 1)의 오프셋을 1로 리셋하는 작업
./kafka-consumer-groups.sh --bootstrap-server 3.36.117.173:9092 --group testgroup --topic test:1 --reset-offsets --to-offset 1 --execute

GROUP                          TOPIC                          PARTITION  NEW-OFFSET     
testgroup                      test                           1          1
# 결과
./kafka-consumer-groups.sh --bootstrap-server 3.36.117.173:9092 --group testgroup --describe                                            

Consumer group 'testgroup' has no active members.

GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
testgroup       test            0          0               9               9               -               -               -
testgroup       test            1          1               6               5               -               -               -
testgroup       test            2          0               0               0               -               -               -

 

위에서는 기본적인 카프카 동작을 알아보기 위한 가장 간단한 실습이었으며 다음으로는 key-value를 지정하여 kafka Producer 실습을 진행하였다.

 

Key - Value 지정한 Kafka Producer 실습

key 말고도 partition을 지정할 수 있기도 함.

package com.tacademy;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class ProducerWithKeyValue {
    private static String TOPIC_NAME = "test";
    private static String BOOTSTRAP_SERVERS = "3.36.117.173:9092";

    public static void main(String[] args) {
        Properties configs = new Properties();
        configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        KafkaProducer<String, String> producer = new KafkaProducer<>(configs);

        for (int index = 0; index < 10; index++) {
            String data = "This is record " + index;
            ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, Integer.toString(index), data);
            try {
                producer.send(record);
                System.out.println("Send to " + TOPIC_NAME + " | data : " + data);
                Thread.sleep(1000);
            } catch (Exception e) {
                System.out.println(e);
            }
        }
    }
}

 

 

실행결과
→ key, value 와 같이 결과가 찍히는 것을 확인할 수 있음.

 

 

Record Key 란?

  • 역할 : 메시지를 구분하는 구분자 역할
  • 특징
    • 레코드 값(value)을 정의하는 구분자
      key에 해쉬값을 넣음으로써 중복처리 방지 가능
    • 동일 키 ⇒ 동일 파티션 적재
      • 순서를 보장하므로 상태머신(state machine)으로 사용 가능하다.
      • 역할에 따라 컨슈머 할당 적용이 가능하다.

지정된 key에 따라 파티션이 구분됨.

 

Record Value 란?

  • 역할 : 실질적으로 전달하고 싶은 데이터
  • 보낼 수 있는 데이터 type : String, ByteArray, Int 등
  • 데이터 포맷 상 특징
    • CSV, TSV, JSON Object 등 서비스의 특징에 맞게 사용 권장
    • CSV 사용시 (,) 콤마 기준으로 데이터 구분. 용량 이득.
    • JSON 사용시 key/value 형태로 확장성이 뛰어남.