@KafkaListener Annotation
많은 consumer의 방법 중 하나이다.
@KafkaListener(id = "foo", topics = MSG_TOPIC, clientIdPrefix = "myClientId")
public void listen(String data) {
System.out.println(data);
}
사용 방법은 다음과 같다.
id는 GROUP_ID_CONFIG 와 같다. 그래서 kafkaListener에서 지정하면 consumerConfig에서 group id 설정을 빼야한다.
[Consumer clientId=myClientId-0, groupId=foo]
[Consumer clientId=myClientId-1, groupId=foo]
[Consumer clientId=myClientId-2, groupId=foo]
clientIdPrefix는 kafka 연결할 때 위와 같은 로그가 보이는데
factory.setConcurrency(3); 으로 했을 때 멀티 쓰레드로 3개의 consumer 인스턴스에 대해 동작할 때
로그처럼 clientId를 붙여준다.
@EnableKafka 설정
@KafkaListener 사용에 필요한 것은 @Configuration 중 하나에 @EnableKafka 어노테이션이 필요하고 거기에는 기본 ConcurrentMessageListenerContainer를 구성하는데 사용되는 컨테이너 팩토리가 필요하다고 한다.
default로 기대되는 bean name은 kafkaListenerContainerFactory 이다.
@Configuration
@EnableKafka
public class MsgListenerContainerFactory {
public final String GROUP1 = "ConsumerGroup1";
@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>>
kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory());
factory.setConcurrency(3);
factory.getContainerProperties().setPollTimeout(3000);
return factory;
}
@Bean
public ConsumerFactory<Integer, String> consumerFactory() {
return new DefaultKafkaConsumerFactory<>(consumerConfigs());
}
@Bean
public Map<String, Object> consumerConfigs() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
//props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP1); //Listener에서 id지정하면 여기는 주석
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
return props;
}
}
문서에서 말한 default name에 맞춰 kafkaListenerContainerFactory() 빈을 등록했다.
consumerConfigs에 AUTO_OFFSET_RESET_CONFIG는 처음 접근이거나 커밋한 offset이 없는 경우의 동작이다.
기본값은 latest로 가장 마지막 offset을 사용하고 earlist는 맨 처음 offset을 사용한다.
참고 자료
https://docs.spring.io/spring-kafka/reference/html/#kafka-listener-annotation
4.1.4 Reciving Messages @KafkaListener Annotation
Spring for Apache Kafka
When using Spring for Apache Kafka in a Spring Boot application, the Apache Kafka dependency versions are determined by Spring Boot’s dependency management. If you wish to use a different version of kafka-clients or kafka-streams, and use the embedded ka
docs.spring.io
https://www.youtube.com/watch?v=xqrIDHbGjOY
'back-end > kafka' 카테고리의 다른 글
| kafka-spring Producer key에 따른 partition 배정 (0) | 2023.09.03 |
|---|---|
| docker bitnami/kafka 주키퍼없이(KRaft) (0) | 2023.09.02 |
| Kafka의 아키텍처를 알아보자 (0) | 2023.08.11 |