@KafkaListener Annotation

많은 consumer의 방법 중 하나이다.

    @KafkaListener(id = "foo", topics = MSG_TOPIC, clientIdPrefix = "myClientId")
    public void listen(String data) {
        System.out.println(data);
    }

사용 방법은 다음과 같다.

id는 GROUP_ID_CONFIG 와 같다. 그래서 kafkaListener에서 지정하면 consumerConfig에서 group id 설정을 빼야한다.

[Consumer clientId=myClientId-0, groupId=foo]
[Consumer clientId=myClientId-1, groupId=foo]
[Consumer clientId=myClientId-2, groupId=foo]

clientIdPrefix는 kafka 연결할 때 위와 같은 로그가 보이는데

factory.setConcurrency(3); 으로 했을 때 멀티 쓰레드로 3개의 consumer 인스턴스에 대해 동작할 때

로그처럼 clientId를 붙여준다.

@EnableKafka 설정

@KafkaListener 사용에 필요한 것은 @Configuration 중 하나에 @EnableKafka 어노테이션이 필요하고 거기에는 기본 ConcurrentMessageListenerContainer를 구성하는데 사용되는 컨테이너 팩토리가 필요하다고 한다.

default로 기대되는 bean name은 kafkaListenerContainerFactory 이다.

@Configuration
@EnableKafka
public class MsgListenerContainerFactory {
    public final String GROUP1 = "ConsumerGroup1";

    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
    kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        factory.setConcurrency(3);
        factory.getContainerProperties().setPollTimeout(3000);
        return factory;
    }

    @Bean
    public ConsumerFactory<Integer, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        //props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP1); //Listener에서 id지정하면 여기는 주석
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return props;
    }
}

문서에서 말한 default name에 맞춰 kafkaListenerContainerFactory() 빈을 등록했다.

 

consumerConfigs에 AUTO_OFFSET_RESET_CONFIG는 처음 접근이거나 커밋한 offset이 없는 경우의 동작이다.

기본값은 latest로 가장 마지막 offset을 사용하고 earlist는 맨 처음 offset을 사용한다.

 

참고 자료

https://docs.spring.io/spring-kafka/reference/html/#kafka-listener-annotation

4.1.4 Reciving Messages @KafkaListener Annotation

 

Spring for Apache Kafka

When using Spring for Apache Kafka in a Spring Boot application, the Apache Kafka dependency versions are determined by Spring Boot’s dependency management. If you wish to use a different version of kafka-clients or kafka-streams, and use the embedded ka

docs.spring.io

https://www.youtube.com/watch?v=xqrIDHbGjOY