티스토리 뷰

반응형

 

 

Spring Boot에서 Apache Kafka Binder 를 사용할 때, Producer, Consumer 설정을 어떻게 하고 코드는 어떻게 작성하면 되는지 간략하게 요약식으로 적어두려고 한다.

 

dependency 추가

build.gradle 

dependencies {
    implementation 'org.springframework.cloud:spring-cloud-stream-binder-kafka'
    implementation 'org.springframework.cloud:spring-cloud-starter-stream-kafka'
}

 

Spring Cloud Stream 3.0.0 부터 kafka Stream Binder 를 사용해서 Java 8 부터 제공하는 Functional programming 스타일을 사용할 수 있다. 이와 관련된 type은 java.util.function.Function 이나 java.util.function.Consumer 이다.

 

 

application.yaml 설정

spring:
  cloud:
    function:
      definition: kafkaConsumer;
    stream:
      kafka:
        bindings:
          producer-out-0:
            producer:
              configuration:
                key:
                  serializer: org.apache.kafka.common.serialization.StringSerializer
          kafkaConsumer-in-0:
            consumer:
              configuration:
                key:
                  deserializer: org.apache.kafka.common.serialization.StringDeserializer
        binder:
          brokers: 127.0.0.1:9092
      bindings:
        producer-out-0:
          destination: test # topic
          contentType: application/json
        kafkaConsumer-in-0:
          destination: test  # topic

application.yaml을 하나씩 살펴보면,

spring.cloud.stream.kafka.binder.brokers 에는 boroker server정보를 작성한다.

 

Consumer <channel-name> = kafkaConsumer-in-0

spring.cloud.function.definition 의 값에 consumer에 해당하는 Bean이름을 작성한다.

spring.cloud.stream.bindings.<channel-name>.destination 에는 consume할 topic 명을 작성한다.

spring.cloud.stream.kafka.bindings.<channel-name>.consumer.configuration 에는 consumer 관련 설정을 작성하는데, 여기서는 key에 대한 Deserializer설정을 해주었다.

 

Producer <channel-name> = producer-out-0

spring.cloud.stream.bindings.<channel-name>.destination 에는 producer할 topic 명을 작성한다. (위 예제에서는 consume할 topic과 producer할 topic이 같다.)

spring.cloud.stream.kafka.bindings.<channel-name>.producer.configuration 에는 producer 관련 설정을 작성하는데, 여기서는 key에 대한 Serializer설정을 해주었다.

 

Consumer Code

package com.wanbaep.runner;

import com.wanbaep.model.ClusterResponse;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;
import org.springframework.stereotype.Service;

import java.util.function.Consumer;

@Slf4j
@Service
public class KafkaConsumer implements Consumer<Message<ClusterResponse>> {

    @SneakyThrows
    @Override
    public void accept(Message<ClusterResponse> clusterMessage) {
        log.info("key: {}", clusterMessage.getHeaders().get(KafkaHeaders.RECEIVED_KEY));
        ClusterResponse clusterResponse = clusterMessage.getPayload();
        log.info("payload: {}", clusterResponse);
    }
}

 

KafkaConsumer 클래스는 Java의 Consumer Type을 상속 받도록 했으며, Message<ClusterResponse> 타입을 accept하도록 설정했다.

따라서 accept method만 Override해서 어떠한 동작을 할지 작성하면 된다.

Reference 문서에서는 @Bean 으로 설정하는 예제들만 있는데, 이렇게 별도 클래스로 구분해서 하고자 하는 경우 implements를 이용한 인터페이스 상속을 사용해서 구현하면 된다.

 

Producer Code

package com.wanbaep.runner;

import com.wanbaep.model.ClusterResponse;
import lombok.AllArgsConstructor;
import org.springframework.cloud.stream.function.StreamBridge;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;

@AllArgsConstructor
@Service
public class KafkaProducer {
    private static final String bindingName = "producer-out-0";

    private final StreamBridge streamBridge;

    @Async
    public void send(String key, ClusterResponse payload) {
        streamBridge.send(bindingName, MessageBuilder
                .withPayload(payload)
                .setHeader(KafkaHeaders.KEY, key)
                .build());
    }

}

 

Producer 코드인데, StreamBridge를 사용해서 message를 produce하면 된다. application.yaml 에서 작성한 <channel-name> 이 bindingName이 되고, org.springframework.messaging.Message 를 사용해서 메시지를 전송하고 있다. 해당 클래스를 사용한 이유는 Payload만 전송하는 것이 아닌, KafkaHeader를 함께 전송해야 하는 경우가 있기 때문에 사용했다. 위에서는 Kafkaheaders.KEY=key 값이 들어가도록 설정했다.

 

topic 조회 with kafka-console-consumer

key도 함께 보내고 있기 때문에 같이 조회되도록 --property print.key=true 설정을 넣어줬다.

root@9716ab3aeb82:~# kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test --from-beginning -property print.key=true

# Consumer가 consume한 message
clusterId 0	{"cluster":{"id":"clusterId 0","name":"name 0"},"node":[{"id":"node1"}]}
clusterId 1	{"cluster":{"id":"clusterId 1","name":"name 1"},"node":[{"id":"node1"}]}
clusterId 2	{"cluster":{"id":"clusterId 2","name":"name 2"},"node":[{"id":"node1"}]}
clusterId 3	{"cluster":{"id":"clusterId 3","name":"name 3"},"node":[{"id":"node1"}]}
clusterId 4	{"cluster":{"id":"clusterId 4","name":"name 4"},"node":[{"id":"node1"}]}
clusterId 5	{"cluster":{"id":"clusterId 5","name":"name 5"},"node":[{"id":"node1"}]}
clusterId 6	{"cluster":{"id":"clusterId 6","name":"name 6"},"node":[{"id":"node1"}]}
clusterId 7	{"cluster":{"id":"clusterId 7","name":"name 7"},"node":[{"id":"node1"}]}
clusterId 8	{"cluster":{"id":"clusterId 8","name":"name 8"},"node":[{"id":"node1"}]}
clusterId 9	{"cluster":{"id":"clusterId 9","name":"name 9"},"node":[{"id":"node1"}]}

# Producer가 produce한 message
clusterId 0	{"cluster":{"id":"clusterId 0","name":"name 0"},"node":[{"id":"node1"}]}
clusterId 1	{"cluster":{"id":"clusterId 1","name":"name 1"},"node":[{"id":"node1"}]}
clusterId 2	{"cluster":{"id":"clusterId 2","name":"name 2"},"node":[{"id":"node1"}]}
clusterId 3	{"cluster":{"id":"clusterId 3","name":"name 3"},"node":[{"id":"node1"}]}
clusterId 4	{"cluster":{"id":"clusterId 4","name":"name 4"},"node":[{"id":"node1"}]}
clusterId 5	{"cluster":{"id":"clusterId 5","name":"name 5"},"node":[{"id":"node1"}]}
clusterId 6	{"cluster":{"id":"clusterId 6","name":"name 6"},"node":[{"id":"node1"}]}
clusterId 7	{"cluster":{"id":"clusterId 7","name":"name 7"},"node":[{"id":"node1"}]}
clusterId 8	{"cluster":{"id":"clusterId 8","name":"name 8"},"node":[{"id":"node1"}]}
clusterId 9	{"cluster":{"id":"clusterId 9","name":"name 9"},"node":[{"id":"node1"}]}

 

간단하게 key와 payload가 kafka를 통해서 전달되는 것을 확인해 볼 수 있다.

반응형

'Kafka' 카테고리의 다른 글

[Kafka] Docker Compose로 Kafka 설치 후 간단한 테스트  (0) 2024.02.25
댓글
반응형
공지사항
최근에 올라온 글
최근에 달린 댓글
Total
Today
Yesterday
링크
«   2026/02   »
1 2 3 4 5 6 7
8 9 10 11 12 13 14
15 16 17 18 19 20 21
22 23 24 25 26 27 28
글 보관함