티스토리 뷰
Spring Boot에서 Apache Kafka Binder 를 사용할 때, Producer, Consumer 설정을 어떻게 하고 코드는 어떻게 작성하면 되는지 간략하게 요약식으로 적어두려고 한다.
dependency 추가
build.gradle
dependencies {
implementation 'org.springframework.cloud:spring-cloud-stream-binder-kafka'
implementation 'org.springframework.cloud:spring-cloud-starter-stream-kafka'
}
Spring Cloud Stream 3.0.0 부터 kafka Stream Binder 를 사용해서 Java 8 부터 제공하는 Functional programming 스타일을 사용할 수 있다. 이와 관련된 type은 java.util.function.Function 이나 java.util.function.Consumer 이다.
application.yaml 설정
spring:
cloud:
function:
definition: kafkaConsumer;
stream:
kafka:
bindings:
producer-out-0:
producer:
configuration:
key:
serializer: org.apache.kafka.common.serialization.StringSerializer
kafkaConsumer-in-0:
consumer:
configuration:
key:
deserializer: org.apache.kafka.common.serialization.StringDeserializer
binder:
brokers: 127.0.0.1:9092
bindings:
producer-out-0:
destination: test # topic
contentType: application/json
kafkaConsumer-in-0:
destination: test # topic
위 application.yaml을 하나씩 살펴보면,
spring.cloud.stream.kafka.binder.brokers 에는 boroker server정보를 작성한다.
Consumer <channel-name> = kafkaConsumer-in-0
spring.cloud.function.definition 의 값에 consumer에 해당하는 Bean이름을 작성한다.
spring.cloud.stream.bindings.<channel-name>.destination 에는 consume할 topic 명을 작성한다.
spring.cloud.stream.kafka.bindings.<channel-name>.consumer.configuration 에는 consumer 관련 설정을 작성하는데, 여기서는 key에 대한 Deserializer설정을 해주었다.
Producer <channel-name> = producer-out-0
spring.cloud.stream.bindings.<channel-name>.destination 에는 producer할 topic 명을 작성한다. (위 예제에서는 consume할 topic과 producer할 topic이 같다.)
spring.cloud.stream.kafka.bindings.<channel-name>.producer.configuration 에는 producer 관련 설정을 작성하는데, 여기서는 key에 대한 Serializer설정을 해주었다.
Consumer Code
package com.wanbaep.runner;
import com.wanbaep.model.ClusterResponse;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Service;
import java.util.function.Consumer;
@Slf4j
@Service
public class KafkaConsumer implements Consumer<Message<ClusterResponse>> {
@SneakyThrows
@Override
public void accept(Message<ClusterResponse> clusterMessage) {
log.info("key: {}", clusterMessage.getHeaders().get(KafkaHeaders.RECEIVED_KEY));
ClusterResponse clusterResponse = clusterMessage.getPayload();
log.info("payload: {}", clusterResponse);
}
}
KafkaConsumer 클래스는 Java의 Consumer Type을 상속 받도록 했으며, Message<ClusterResponse> 타입을 accept하도록 설정했다.
따라서 accept method만 Override해서 어떠한 동작을 할지 작성하면 된다.
Reference 문서에서는 @Bean 으로 설정하는 예제들만 있는데, 이렇게 별도 클래스로 구분해서 하고자 하는 경우 implements를 이용한 인터페이스 상속을 사용해서 구현하면 된다.
Producer Code
package com.wanbaep.runner;
import com.wanbaep.model.ClusterResponse;
import lombok.AllArgsConstructor;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
@AllArgsConstructor
@Service
public class KafkaProducer {
private static final String bindingName = "producer-out-0";
private final StreamBridge streamBridge;
@Async
public void send(String key, ClusterResponse payload) {
streamBridge.send(bindingName, MessageBuilder
.withPayload(payload)
.setHeader(KafkaHeaders.KEY, key)
.build());
}
}
Producer 코드인데, StreamBridge를 사용해서 message를 produce하면 된다. application.yaml 에서 작성한 <channel-name> 이 bindingName이 되고, org.springframework.messaging.Message 를 사용해서 메시지를 전송하고 있다. 해당 클래스를 사용한 이유는 Payload만 전송하는 것이 아닌, KafkaHeader를 함께 전송해야 하는 경우가 있기 때문에 사용했다. 위에서는 Kafkaheaders.KEY=key 값이 들어가도록 설정했다.
topic 조회 with kafka-console-consumer
key도 함께 보내고 있기 때문에 같이 조회되도록 --property print.key=true 설정을 넣어줬다.
root@9716ab3aeb82:~# kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test --from-beginning -property print.key=true
# Consumer가 consume한 message
clusterId 0 {"cluster":{"id":"clusterId 0","name":"name 0"},"node":[{"id":"node1"}]}
clusterId 1 {"cluster":{"id":"clusterId 1","name":"name 1"},"node":[{"id":"node1"}]}
clusterId 2 {"cluster":{"id":"clusterId 2","name":"name 2"},"node":[{"id":"node1"}]}
clusterId 3 {"cluster":{"id":"clusterId 3","name":"name 3"},"node":[{"id":"node1"}]}
clusterId 4 {"cluster":{"id":"clusterId 4","name":"name 4"},"node":[{"id":"node1"}]}
clusterId 5 {"cluster":{"id":"clusterId 5","name":"name 5"},"node":[{"id":"node1"}]}
clusterId 6 {"cluster":{"id":"clusterId 6","name":"name 6"},"node":[{"id":"node1"}]}
clusterId 7 {"cluster":{"id":"clusterId 7","name":"name 7"},"node":[{"id":"node1"}]}
clusterId 8 {"cluster":{"id":"clusterId 8","name":"name 8"},"node":[{"id":"node1"}]}
clusterId 9 {"cluster":{"id":"clusterId 9","name":"name 9"},"node":[{"id":"node1"}]}
# Producer가 produce한 message
clusterId 0 {"cluster":{"id":"clusterId 0","name":"name 0"},"node":[{"id":"node1"}]}
clusterId 1 {"cluster":{"id":"clusterId 1","name":"name 1"},"node":[{"id":"node1"}]}
clusterId 2 {"cluster":{"id":"clusterId 2","name":"name 2"},"node":[{"id":"node1"}]}
clusterId 3 {"cluster":{"id":"clusterId 3","name":"name 3"},"node":[{"id":"node1"}]}
clusterId 4 {"cluster":{"id":"clusterId 4","name":"name 4"},"node":[{"id":"node1"}]}
clusterId 5 {"cluster":{"id":"clusterId 5","name":"name 5"},"node":[{"id":"node1"}]}
clusterId 6 {"cluster":{"id":"clusterId 6","name":"name 6"},"node":[{"id":"node1"}]}
clusterId 7 {"cluster":{"id":"clusterId 7","name":"name 7"},"node":[{"id":"node1"}]}
clusterId 8 {"cluster":{"id":"clusterId 8","name":"name 8"},"node":[{"id":"node1"}]}
clusterId 9 {"cluster":{"id":"clusterId 9","name":"name 9"},"node":[{"id":"node1"}]}
간단하게 key와 payload가 kafka를 통해서 전달되는 것을 확인해 볼 수 있다.
'Kafka' 카테고리의 다른 글
| [Kafka] Docker Compose로 Kafka 설치 후 간단한 테스트 (0) | 2024.02.25 |
|---|
- Total
- Today
- Yesterday
- 데스크톱 애플리케이션
- consumer
- node add
- minikube
- OneToOne
- cpus
- Spring Cloud Stream
- docker-compose
- 애노테이션 프로세서
- 웹 애플리케이션
- 특정 ip
- producer
- WEB-INF
- Java 특징
- ServiceMonitor
- Kafka
- 애플리케이션 변화 과정
- DD파일
- Java 장단점
- Prometheus Operator
- minikube node add
- MySQL 외부 IP
- Java 란
- StreamBridge
- ExpectedException
- kubernetes
- Servlet Container
- 서버 클라이언트
- Servlet
- springboot3.x
| 일 | 월 | 화 | 수 | 목 | 금 | 토 |
|---|---|---|---|---|---|---|
| 1 | 2 | 3 | 4 | 5 | 6 | 7 |
| 8 | 9 | 10 | 11 | 12 | 13 | 14 |
| 15 | 16 | 17 | 18 | 19 | 20 | 21 |
| 22 | 23 | 24 | 25 | 26 | 27 | 28 |
