Spring에 Kafka연동 테스트를 하기 위해 두개의 서버, server-a와 server-b를 생성하겠다.
Dependency
build.gradle.kts
implementation ("org.springframework.kafka:spring-kafka:2.8.0")
application.yml
server:
servlet:
context-path: /server-a
port: 8080
spring:
kafka:
bootstrap-servers: ${KAFKA_BROKER:localhost}:${KAFKA_BROKER_PORT:9092}
listener.ack-mode: MANUAL_IMMEDIATE
producer:
acks: 1
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
consumer:
group-id: ${CONSUMER_GROUP:test-group}
auto-offset-reset: latest
enable-auto-commit: false
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
두개의 프로젝트 모두 위와 같이 설정해준다
server-a
controller
@RestController
class TestController(
private val service: Service
) {
@GetMapping
fun heartBeat(): String {
service.kafkaMessage("테스트 메세지")
return "OK"
}
}
service
@Service
class Service(
private val kafkaTemplate: KafkaTemplate<String, String>)
{
fun kafkaMessage(message: String) {
println("kafka send")
try {
kafkaTemplate.send("a-to-b", message)
} catch (e: Exception) {
println("ERROR")
throw RuntimeException(e)
}
}
}
server-a는 카프카 메세지를 던지는 서버이다. Service에서 kafkaTemplate를 이용해서 'a-to-b'라는 토픽으로 메세지를 던졌다.
server-b
@Service
class Service {
@KafkaListener(topics = ["a-to-b"])
@Transactional
fun receiveCreateMessage(message: String?, ack: Acknowledgment) {
try {
println(message)
ack.acknowledge()
} catch (e: Exception) {
println("ERROR")
throw RuntimeException(e)
}
}
}
server-b는 'a-to-b'토픽에서 메세지를 읽어오는 컨슈머 서버이다. 메세지를 읽기 위해서 @KafkaListner 애노테이션을 붙여주고 메세지를 읽는다.
테스트
코드 작성을 다 하고 실행시켜 테스트해보자
server-a
server-b
server-a에서 등록한 메세지를 server-b에서 확인한 것을 볼 수 있다
'spring' 카테고리의 다른 글
Spring에서 Http Request 보내기 - Feign (0) | 2021.09.19 |
---|---|
Spring Data Event Handler (0) | 2021.06.19 |
Custom HandlerMethodArgumentResolver (0) | 2021.06.08 |
JWT를 이용한 인증 처리 (0) | 2021.05.29 |
테스트 코드 커버리지 확인하기 (jacoco) tutorial (0) | 2021.05.01 |
댓글