Open source

Kafka Consumer 실습 - kafka (6)

jogaknabi_1023 2024. 3. 4. 19:45

이전 블로그에서는 Producer 생성하는 실습을 진행하였는데 이제는 데이터를 받는 입장인 Consumer 관련 내용이다. 환경은 이전과 동일하다. Consumer 생성의 기본적인 코드(Auto Commit = false)는 이미지로 대처하고 (Auto commit = ture)로 설정하는 코드부터 진행하였다. 개인적으로 Consumer에서 공부할 부분이 조금 더 많았던 것 같다.

 

 

1. 가장 기본적인 Consumer

기본 Consumer 코드 내용

 

 

2. Auto Commit test
configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);

package com.tacademy;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class ConsumerWithAutoCommit {
    private static String TOPIC_NAME = "test";
    private static String GROUP_ID = "testgroup";
    private static String BOOTSTRAP_SERVERS = "{aws ec2 public ip}:9092";

    public static void main(String[] args) {
        Properties configs = new Properties();
        configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        configs.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
        configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        configs.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 60000);

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(configs);
        consumer.subscribe(Arrays.asList(TOPIC_NAME));

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
            for (ConsumerRecord<String, String> record : records) {
                System.out.println(record.value());
            }
        }
    }
}
  • configs.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 60000);
    60초 마다 컨슈머가 어디까지 읽었는지 Broker에게 알려주는 것.
    한 번 출력을 했어도 60초 이내 다시 컨슈머를 재실행시키면 한번 더 데이터가 출력된다. → 데이터 중복 처리 발생

 

Consumer commit

enable.auto.commit=true

  • 일정 간격, poll() 메서드 호출 시 자동 commit
  • 속도가 가장 빠르다.
  • 중복 또는 유실이 발생할 수 있다.
    컨슈머가 강제로 종료되거나 셧다운될 때 데이터가 중복처리될 수 있다.

 

그렇다면 데이터 중복을 막을 수 있는 방법은?

  • auto commit을 사용하되 컨슈머가 죽지 않도록 한다 → 불가능. 서버/애플리케이션은 언제든지 죽을 수 있다.
  • auto.commit을 사용하지 않는다.
    Kafka consumer의 commitSync(), commitAsync() 사용

 

enable.auto.commit=false

1) commitSync() : 동기커밋

  • consumerRecord 처리 순서를 보장함.
  • 속도가 가장 느림.
  • poll() 메서드로 반환된 ConsumerRecord이 마지막 offset을 커밋
  • Map<TopicPartition, OffsetAndMetadata>을 통해 오프셋 지정 커밋 가능

 

 

2) commitAsync() : 비동기 커밋

  • 동기 커밋보다 빠름.
  • 중복이 발생할 수 있음.
    일시적인 통신 문제로 이전 offset보다 이후 offset이 먼저 커밋될 때
    처리 순서가 중요한 서비스 (주문, 재고관리 등)에서는 사용 제한

 

Consumer rebalance

컨슈머 그룹의 파티션 소유권이 변경될 때 일어나는 현상

When: consumer.close() 호출 시, consumer의 세션 끊겼을 때, 컨슈머-파티션 할당이 변경될 때

  • 리밸런스를 하는 동안 일시적으로 메시지를 가져올 수 있다.
  • 리밸런스 발생 시 데이터 유실/중복 발생 가능성 있다.
  • commitSync() 또는 추가적인 방법(unique key)으로 데이터 유실/중복 방지

 

Consumer wakeup

 

SIGKILL 로 인한 중복 처리 발생에서 wakeup 동작 예시

(1) poll() 호출 : records 100개 반환. offset 101 ~200

(2) records loop 구문 수행

(3) record.value() system 150 오프셋 print 중 SIGKILL 호출

      101~150번 오프셋 처리 완료. 151~200 오프셋 미처리

(4) offset 200 커밋 불가

      브로커에는 100번 오프셋이 마지막 커밋

      컨슈머 재시작하면 다시 오프셋 101부터 처리 시작. → 101~150번 오프셋 중복 처리

 

 

코드 예시

  • wakeup() 을 통한 graceful shutdown이 필수
    SIGKILL(9)는 프로세스 강제 종료로 커밋 불가 → 중복/유실 발생
    SIGTERM을 통한 sutdown signal로 kill하여 처리한 데이터 커밋 필요

 

3. Consumer Multiple Thread

한개의 프로세스에 여러개의 스레드 생성

안전하게 종료하기 위해 shutdown, WakeupException 중요함. 실제 코드 개발할 때 유의하기!

 

ConsumerWithMultiThread 코드

package com.tacademy;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ConsumerWithMultiThread {
    private static String TOPIC_NAME = "test";
    private static String GROUP_ID = "testgroup";
    private static String BOOTSTRAP_SERVERS = "{브로커 ip}:9092";
    private static int CONSUMER_COUNT = 3;
    private static List<ConsumerWorker> workerThreads = new ArrayList<>();

    public static void main(String[] args) {
        Runtime.getRuntime().addShutdownHook(new ShutdownThread());
        Properties configs = new Properties();
        configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);
        configs.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
        configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);

        ExecutorService executorService = Executors.newCachedThreadPool();
        for (int i = 0; i < CONSUMER_COUNT; i++) {
            ConsumerWorker worker = new ConsumerWorker(configs, TOPIC_NAME, i);
            workerThreads.add(worker);
            executorService.execute(worker);
        }
    }

    static class ShutdownThread extends Thread {
        public void run() {
            workerThreads.forEach(ConsumerWorker::shutdown);
            System.out.println("Bye");
        }
    }
}

newCachedTreadPool : 스레드 완료되면 스스로 죽음

 

ConsumerWorker 코드

package com.tacademy;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.WakeupException;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class ConsumerWorker implements Runnable {
    private Properties prop;
    private String topic;
    private String threadName;
    private KafkaConsumer<String, String> consumer;

    ConsumerWorker(Properties prop, String topic, int number) {
        this.prop = prop;
        this.topic = topic;
        this.threadName = "consumer-thread-" + number;
    }

    @Override
    public void run() {
        consumer = new KafkaConsumer<>(prop);
        consumer.subscribe(Arrays.asList(topic));
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(threadName + " >> " + record.value());
                }
                consumer.commitSync();
            }
        } catch (WakeupException e) {
            System.out.println(threadName + " trigger WakeupException");
        } finally {
            consumer.commitSync();
            consumer.close();
        }
    }

    public void shutdown() {
        consumer.wakeup();
    }
}

 

실행결과

3개의 스레드가 번갈아가면서 데이터 처리함. 터미널에서 jps 명령어로 ConsumerWithMultiThread 실행중인 것을 확인할 수 있음.

 

 

Trouble Shooting

1. 카프카가 제대로 동작하고 있지 않았을 시에 마주할 수 있는 에러

이럴 때는 인스턴스로 접속해서 jps 확인 후에 브로커, 카프카 서버 순서대로 다시 실행시키면 된다.

 

2. comsumer, producer 에서 코드 실행 시 broker와의 connection error가 났으면 먼저 broker가 잘 작동하고 있는지 확인. 인스턴스를 오랫동안 켜두면 왜인지 모르게 broker가 실행 중지 상태로 변경되었었음.