이전 kafka 이론 정리한 것을 바탕으로 SK planet 토그온 세미나에서 진행했던 실습을 진행해보았다. 역시나 데브원영님의 유튜브를 참고하였다. 실습 시 자신의 PC(mac)에는 IntelliJ 설치, jdk 다운로드가 필요하고, AWS EC2 인스턴스를 하나 생성하기 때문에 미리 AWS 계정 회원가입이 되어있어야한다.
- EC2 = Kafka
- Local(mac) = Consumer, Producer
강의 링크: https://www.youtube.com/watch?v=4BbKCsKSq_I&list=PL3Re5Ri5rZmkY46j6WcJXQYRlDRZSUQ1j&index=17
EC2 카프카 설치 및 실행
인스턴스 생성 : Amazon Linux2, t2.micro
보안그룹 9092 포트 추가
# 다운로드 받은 pem키로 로컬에서 EC2 접근
chmod 400 test-kafka.pem
ssh -i kafka-test.pem ec2-user@{aws ec2 public ip}
# JDK 설치
sudo yum install -y java-1.8.0-openjdk-devel.x86_64
# 카프카 바이너리 파일 다운로드
wget https://downloads.apache.org/kafka/3.6.1/kafka_2.12-3.6.1.tgz
tar xvf kafka_2.12-3.6.1.tgz
cd kafka_2.12-3.6.1/
# HEAP 메모리 설정 변경 (t2.micro 디폴트 메모리 사이즈 때문에 카프카 안켜지기 때문)
export KAFKA_HEAP_OPTS="-Xmx400m -Xms400m"
# server.properties 파일 수정 (주석제거 & ip 설정)
vi config/server.properties
###
listeners=PLAINTEXT://:9092
advertised.listeners=PLAINTEXT://{aws ec2 public ip}:9092
###
# 주키퍼 서버 시작
bin/zookeeper-server-start.sh -daemon config/zookeeper.properties
# 카프카 시작
bin/kafka-server-start.sh -daemon config/server.properties
# 실행 중인 자바 프로세스 확인
[ec2-user@ip-172-31-10-76 kafka_2.12-3.6.1]$ jps
9156 QuorumPeerMain
11227 Jps
10460 Kafka
로컬에서 Producer, Consumer 확인
여기서 3.36.117.173 은 EC2 인스턴스 퍼블릭 IP 이다.
curl https://archive.apache.org/dist/kafka/3.6.1/kafka_2.12-3.6.1.tgz --output kafka.tgz
tar -xvf kafka.tgz
cd kafka_2.12-3.6.1/bin
# 브로커 1대 파티션 3 topic은 test로 kafka topic 생성
./kafka-topics.sh --create --bootstrap-server 3.36.117.173:9092 --replication-factor 1 --partitions 3 --topic test
>hello
>jeongeun
>It is kafka test
>1
>2
>3
# test란 topic 데이터 확인
# from-beginning 옵션으로 이전에 commit 했던 이전의 데이터들도 같이 나옴.
./kafka-console-consumer.sh --bootstrap-server 3.36.117.173:9092 --topic test --from-beginning
hello
jeongeun
It is kafka test
1
2
3
^CProcessed a total of 6 messages
# Consumer Group 지정하여 데이터 확인
#넣었던 데이터 순서대로 결과가 나오지 않음. -> 3개의 파티션으로 나뉘어 데이터가 들어갔으며 데이터를 가져오는데는 순서가 없기 때문.
#파티션 1로만 설정했을 때는 순서대로 나옴.
./kafka-console-consumer.sh --bootstrap-server 3.36.117.173:9092 --topic test -group testgroup --from-beginning
# Consumer Group의 목록을 확인
./kafka-consumer-groups.sh --bootstrap-server 3.36.117.173:9092 --list
# testgroup이란 Consumer Group에 대한 정보를 확인
#어떤 토픽을 소비하고 있는지, 각 파티션에서 어느 오프셋까지 메시지를 소비했는지 등의 정보를 표시
./kafka-consumer-groups.sh --bootstrap-server 3.36.117.173:9092 --group testgroup --describe
Consumer group 'testgroup' has no active members.
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
testgroup test 0 9 9 0 - - -
testgroup test 1 6 6 0 - - -
testgroup test 2 0 0 0 - - -
# 컨슈머 그룹(testgroup)이 특정 토픽(test)의 오프셋을 0으로 리셋
./kafka-consumer-groups.sh --bootstrap-server 3.36.117.173:9092 --group testgroup --topic test --reset-offsets --to-earliest --execute
GROUP TOPIC PARTITION NEW-OFFSET
testgroup test 0 0
testgroup test 1 0
testgroup test 2 0
# 컨슈머 그룹(testgroup)이 특정 토픽(test)의 특정 파티션(파티션 1)의 오프셋을 1로 리셋하는 작업
./kafka-consumer-groups.sh --bootstrap-server 3.36.117.173:9092 --group testgroup --topic test:1 --reset-offsets --to-offset 1 --execute
GROUP TOPIC PARTITION NEW-OFFSET
testgroup test 1 1
# 결과
./kafka-consumer-groups.sh --bootstrap-server 3.36.117.173:9092 --group testgroup --describe
Consumer group 'testgroup' has no active members.
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
testgroup test 0 0 9 9 - - -
testgroup test 1 1 6 5 - - -
testgroup test 2 0 0 0 - - -
위에서는 기본적인 카프카 동작을 알아보기 위한 가장 간단한 실습이었으며 다음으로는 key-value를 지정하여 kafka Producer 실습을 진행하였다.
Key - Value 지정한 Kafka Producer 실습
key 말고도 partition을 지정할 수 있기도 함.
package com.tacademy;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
public class ProducerWithKeyValue {
private static String TOPIC_NAME = "test";
private static String BOOTSTRAP_SERVERS = "3.36.117.173:9092";
public static void main(String[] args) {
Properties configs = new Properties();
configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
KafkaProducer<String, String> producer = new KafkaProducer<>(configs);
for (int index = 0; index < 10; index++) {
String data = "This is record " + index;
ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_NAME, Integer.toString(index), data);
try {
producer.send(record);
System.out.println("Send to " + TOPIC_NAME + " | data : " + data);
Thread.sleep(1000);
} catch (Exception e) {
System.out.println(e);
}
}
}
}
실행결과
→ key, value 와 같이 결과가 찍히는 것을 확인할 수 있음.
Record Key 란?
- 역할 : 메시지를 구분하는 구분자 역할
- 특징
- 레코드 값(value)을 정의하는 구분자
key에 해쉬값을 넣음으로써 중복처리 방지 가능 - 동일 키 ⇒ 동일 파티션 적재
- 순서를 보장하므로 상태머신(state machine)으로 사용 가능하다.
- 역할에 따라 컨슈머 할당 적용이 가능하다.
- 레코드 값(value)을 정의하는 구분자
Record Value 란?
- 역할 : 실질적으로 전달하고 싶은 데이터
- 보낼 수 있는 데이터 type : String, ByteArray, Int 등
- 데이터 포맷 상 특징
- CSV, TSV, JSON Object 등 서비스의 특징에 맞게 사용 권장
- CSV 사용시 (,) 콤마 기준으로 데이터 구분. 용량 이득.
- JSON 사용시 key/value 형태로 확장성이 뛰어남.
'Open source' 카테고리의 다른 글
[ Virtualization ] 가상화 개념과 종류 (VM, Hypervisor) (0) | 2024.03.06 |
---|---|
Kafka Consumer 실습 - kafka (6) (0) | 2024.03.04 |
Kafka Consumer & lag 모니터링 - kafka(4) (0) | 2024.02.28 |
Kafka Broker, Replication, ISR - kafka(3) (0) | 2024.02.22 |
Kafka Producer - kafka(2) (0) | 2024.02.22 |