1. Kafka 개요
- 링크드인에서 개발
- 아파치 재단에서 운영하고 있는 플랫폼
Message Oriented Middleware
미들웨어란?
- 시스템이나 컴포넌트 사이에서 중간자 역할을 해주는 것
- 두 시스템이 너무 강하게 연결되어 있으면, 서로의 의존성이 높기 때문에 미들웨어가 존재한다.
- ex) 서버 A - 서버 B가 강한 결합을 가지고 있다고 할 때, 서버 A 장애 시 서버 B에 전파됨
- 따라서, 느슨한 결합이 필요하다.
- 이런 완충재 역할을 하는 것이 '미들웨어'이고, '메시지에 기반했다'라고 해서 Message Oriented Middleware(MOM)라고 한다.
- 메시지 큐를 사용하는 플랫폼: Kafka, RabbitMQ
- 모놀리식 아키텍처에서 MSA로 변화되면서 이런 Kafka, RabbitMQ 같은 플랫폼의 수요가 늘어나고 있다.
- 모놀리식 아키텍처에도 적용할 수 있지만, MSA 환경에서 장점이 부각된다.
Kafka
- 대용량 처리에 강점
- 데이터가 날아가지 않는 비휘발성
RabbitMQ
- 메시지별로 다르게 처리를 해야 할 때 유용
- 메시지에 라우팅 키를 작성할 수 있고, 라우팅 키 별로 어떤 메시지 큐와 서버에 보내야 할지 라우팅 해줌
- ex) '정상 상황일 때와 에러 상황일 때 다른 메시지큐로 보낸다'와 같은 처리 가능
카프카가 생겨난 이유
- 여러 가지 모듈이 직접 통신...
- 규모가 작을 때는 모듈 별로 소스 코드 관리나 버전 관리가 가능했으나
- 서비스가 커지고 이용자가 많아지면서 트래픽이 늘어남 -> 소스 코드가 관리와 버전 관리가 힘들어지고, 장애 발생 시 장애가 전파됨
- 즉, 모듈끼리 너무 강하게 결합되어 있다.
- 중앙 집결되어 있으면서 느슨한 결합을 할 수 있는 미들웨어가 필요하다!!
- 그렇게 변경된 구조
- publisher
- 메시지를 생성
- 누가 가져가든 관심이 없음
- 이 토픽으로 메시지 생성해야지~~해서 메시지를 생성하면 역할 끝
- Kafka
- 토픽별로 메시지를 가지고 있음
- subscriber
- 메시지를 사용
- subscriber 별로 offset을 관리 (어디어디까지 읽었다)
- 토픽에 새로운 메시지가 갱신되면 offset을 보고 메시지를 가져가게 됨
- 누가 해당 메시지를 발행했는지는 관심 없음
- 모듈이 많아지더라도, 카프카에 토픽으로만 관리하면 됨
- 느슨한 결합 + 중앙 집결화된 관리
서비스 기업에서도 많이 사용하는 메시지 플랫폼!!!
2. Message Queue
메시지 큐를 사용하는 이유?
- Asynchronous
- 비동기 처리
- 응답을 기다릴 필요가 없으므로, 성능이 좋아짐
- Decoupling
- 직접 통신하지 않으므로, 두 컴포넌트의 결합이 느슨해지고 장애 전파가 되지 않음
- Scalability
- 스케일 인, 스케일 아웃 자유로움
- 메시지 큐를 늘렸다 줄였다 할 수 있음
메시지 큐의 두 가지 모델
Point to Point
- Sender가 메시지를 보냄
- 여러 명의 Receiver가 있을 수는 있지만, 한 Receiver 메시지를 consume한다면 다른 Receiver들은 해당 메시지를 볼 수 없다.
- 과거에 많이 사용되던 메시지 큐 방식
- 두 가지 모델
- fire and forget
- Sender가 메시지를 발생하고 Receiver가 받든 말든 자신의 프로세스를 진행
- 처리 속도가 빠르지만, 메시지 유실 가능성이 있음
- request reply
- Request 큐와 Response 큐가 존재
- Sender가 Request 큐에 Receiver를 향해서 메시지를 보내고, Recevier가 메시지를 잘 받았다면 Response 큐에 메시지를 잘 받았음을 응답을 남김으로써 Sender에게 알림
- Sender가 기다렸다가 다음 프로세스를 진행하기 때문에 메시지 유실 가능성은 없지만, 응답 처리 속도가 느림
- fire and forget
Pub/Sub
Kafka 방식, 토픽으로 관리 (참고로 RabbitMQ는 옵션으로 Point to Point와 Pub/Sub 중에서 정할 수 있다!)
- Publisher
- Subscriber를 별도로 지정하지 않는다.
- 토픽에 메시지를 발행
- Subscriber
- 토픽의 offset을 관리하며 메시지를 consume함
- ACK 값을 설정하여 프로듀서가 카프카에 메시지를 어떤 방식으로 보낼 것인지 설정할 수 있음
RabbitMQ vs. Kafka
RabbitMQ
- Exchage가 존재
- Publisher가 메시지를 발행할 때 메시지에 라우팅 키를 넣어줌
- 라우팅 키를 기반으로 큐를 지정하거나, 메시지의 상태값을 보고 큐를 지정하는 라우팅 처리 진행
- 라우팅 키랑 정확히 일치하는 큐로 보내준다.
- 패턴이 비슷한 애들은 이쪽에 몰아서 처리한다.
- 헤더 값을 보고 라우팅을 나눠준다.
- 전체적으로 뿌린다 등 Exchange 설정에 따라서 다양한 방식으로 큐에 메시지를 뿌려줄 수 있다.
- 메시지 큐에서 consume되면 메시지가 사라짐 -> 휘발성 데이터
Kafka
- 토픽 기반으로 관리
- 하나의 토픽에 여러 개의 파티션이 존재
- 파티션에 Producer가 메시지 이벤트를 발행시키고 파티션에서 Subscriber가 메시지를 읽어가는 형태
- 토픽과 파티션은 1:N 관계 (토픽당 하나의 파티션이 있을 수도 있고, 여러 개의 파티션이 있을 수도 있다)이다.
- 파티션 내에서는 순서가 보장되나, 파티션 간에는 순서보장이 되지 않는다.
- 토픽 기반으로 발행 및 구독하는 구조이므로 라우팅 기능은 존재하지 않는다.
3. Kafka에 대한 상세한 내용
- 브로커
- 카프카가 구성된 서버
- 여러 대의 브로커가 존재할 수 있음
- 카프카 서버도 장애를 일으킬 수 있기 때문에, 가용성을 위함
- 최대 3대의 브로커를 사용해 카프카 클러스터를 구성하라고 권장
- 즉, 브로커들이 모여 카프카 클러스터를 구성
- 브로커 내부에는 여러 개의 토픽들이 존재하며, 토픽 내에는 여러 개의 파티션이 존재
- 토픽 내 파티션 수는 정하기 나름이나 보통 컨슈머 수와 맞추게 된다.
- 파티션이 여러 개 이기 때문에 한 토픽에 대해서 병렬적으로 메시지 처리가 가능하다.
- 병렬 처리를 통해 시스템의 효율을 높인 것이다.
- 한 파티션 내에는 시간 순서가 보장되지만, 파티션 간에는 보장되지 않으므로 시간 보장이 필요하다면 하나의 파티션에 몰아줘야 한다.
- 다시 복습하면, 카프카 클러스터는 여러 개의 브로커로 구성되어 있고 브로커란 카프카 서버
- 토픽은 각 브로커마다 있을 수도 있고, 하나의 브로커에만 있을 수도 있다.
- 카프카 내 replication factor를 통해 토픽을 몇 개의 브로커에 나눠서 저장할 것인지를 정할 수 있다.
- 브로커가 죽었을 때 다른 브로커가 업무를 받아서 이행할 수 있게 구성해야 하기 때문
- replication factor를 1로 설정하여 한 개의 브로커에만 생성했다면
- 이 브로커에 장애가 발생했을 때 여기에 있는 토픽은 더 이상 사용할 수 없으므로 문제 발생!
- 브로커가 죽었을 때 다른 브로커가 업무를 받아서 이행할 수 있게 구성해야 하기 때문
- 토픽 내 파티션 여러 개
- Leader 파티션
- Pub/Sub과 직접 통신
- Follow 파티션
- Leader 파티션의 메시지들을 replication해서 동기화를 맞춰줌
- 파티션 1의 Leader는 브로커 1에 있고, 파티션 2의 Leader는 브로커 2에 있는 식
- In-Sync-Replica(ISR): 잘 동기화되었음을 의미
- Leader 파티션
- 브로커가 여러 개 있는 것은 고가용성을 위함
- Controller
- 브로커의 장애 감지
- 브로커 하나가 Controller 역할을 한다.
- 브로커 중 장애가 발생한 것이 있다면, 장애가 발생한 브로커의 파티션 중 Leader 역할을 하고 있는 것이 있다면 다른 브로커의 파티션으로 Leader 지위를 넘겨줌
- 코디네이터
- 컨슈머 그룹 내에서 장애 감지
- 해당 컨슈머가 구독하고 있던 것들을 다른 컨슈머에게 보내주는 것
- Controller
- replication factor
- 브로커보다는 작거나 같아야 한다.
- 컨슈머에 파티션 개수를 맞추는 이유?
- 하나의 컨슈머가 여러 개의 파티션을 구독할 수 있지만, 하나의 파티션은 여러 개의 컨슈머에게 구독당할 수 없다.
- 파티션 N : 컨슈머 1
- 컨슈머보다 파티션이 적으면 노는 컨슈머가 생긴다.
- == 파티션보다 컨슈머가 많으면 노는 컨슈머가 생긴다.
- 토픽 이름
- 카프카는 회사 내에서 공용으로 사용하는 경우가 많다.
- 따라서, 어떤 팀에서 어떤 용도로 사용하는 토픽인지 명확하게 밝혀주는 네이밍이 중요하다.
- 컨슈머별로 파티션의 어디까지를 읽었는지 offset을 관리
- 각각의 컨슈머가 어디까지 읽었는지 알 수 있음
ACK 방식
프로듀서가 카프카에 메시지를 어떠한 방식으로 보낼 것이냐?!
ACK 옵션 | 설명 | 비고 |
0 | 프로듀서가 Leader 파티션에 전송 후, 응답값 받지 않음 | 성능은 좋지만, 유실 가능성 있음 |
1 | 프로듀서가 Leader 파티션에 전송 후, Leader 파티션의 응답을 받음 (Leader 파티션까지) | 0보다 성능은 떨어지지만, 유실 가능성은 적어짐 |
all | 프로듀서가 Leader 파티션에 전송 후, Replication 정상 응답까지 기다림 (Leader 파티션에 Follow 파티션까지) | 가장 느리지만, 유실 가능성 없음 |
카프카 특징
- Disk I/O
- 데이터를 디스크에 저장
- I/O 리소스 소모가 많지 않을까...?
- Page Cache를 통하여 성능 개선
- 단, 메모리를 많이 잡아먹기 때문에 카프카의 메모리 사용이 서비스에 영향을 줄 수 있다.
- 따라서 카프카와 서비스를 분리하는 것이 좋다.
- 데이터 전송을 배치 처리
- 메시지 발생, 소비를 하나씩 처리하는 것이 아닌 일정 단위로 묶어서 처리
- 처리할 때마다 통신 리소스 소모가 발생하기 때문에, 묶어 처리함으로써 통신 리소스 소모를 최소화한다.
Kafka Streams
- 카프카에서 지원하는 외부 API 라이브러리
- 카프카 프레임워크에 종속되어있지 않다.
- ex)
- 증권 시세를 받을 때, 시세가 쭉쭉 이벤트 성으로 들어오는데
- 카프카 메시지를 Kafka Streams를 통해 시세의 변화를 감지하고 있다가 다른 데이터와 엮어서 또 다른 이벤트를 발생시킨다.
- 토스의 증권 알림을 떠올려보자!
- 삼성전자가 52주 최고가를 찍었다는 것을 알게 되면 '52주 최고가'라는 이벤트를 발생시켜서 이것과 관련된 비즈니스 로직을 수행한다.
- 이렇듯 다양한 것들을 시도해 볼 수 있다.
4. Kafka, ZooKeeper Docker 띄워보기
ZooKeeper
- 카프카의 오케스트레이션 툴
- 카프카의 메타 데이터를 관리
docker-compose.yml
version: '2'
services:
zookeeper:
image: wurstmeister/zookeeper
container_name: zookeeper
ports:
- "2181:2181"
kafka:
image: wurstmeister/kafka:2.12-2.5.0
container_name: kafka
ports:
- "9092:9092"
environment:
KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
volumes:
- /var/run/docker.sock:/var/run/docker.sock
docker-compose up -d
docker exec -it kafka bash
kafka-topics.sh --list --bootstrap-server localhost:9092
토픽을 만들어보자!
kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
- api 서버에서 심사 요청을 카프카에 보내기
- consumer에서 해당 토픽을 받아와서 메시지를 consume해서 css 모듈 쪽으로 심사 보내기
loan-request라는 토픽을 만들자.
kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic loan-request
로컬 환경에서 도커 띄우기 -> 카프카 컨테이너에 접속해서 토픽 생성
흐름
- 스프링 부트 애플리케이션에서 카프카로 접속할 수 있도록 카프카 configuration을 생성
- 카프카로 명령을 보내는 producer를 개발
- api 모듈에서 해당 producer 쪽으로 메시지 produce
- 메시지를 consuming하는 consumer 모듈 만들기
- 해당 토픽이 생성될 때마다 consumer가 해당 메시지를 읽어서 css 쪽으로 순차적으로 심사를 보내기
- 심사를 받은 후에 심사 결과 데이터를 넣기
5. Kafka Configuration, Producer 구현
개념 복습
Apache Kafka 간략하게 살펴보기
본 문서는 “카프카 핵심 가이드 (제이펍)” 를 참고하였습니다.
medium.com
모듈 생성
build.gradle.kts
plugins {}
version = "0.0.1"
dependencies {
// Kafka
implementation("org.springframework.kafka:spring-kafka:2.8.0")
implementation("com.fasterxml.jackson.module:jackson-module-kotlin:2.14.+")
}
버전이 맞지 않아 jackson을 사용했지만, 이게 더 성능이 좋으니 참고!
https://kotlinlang.org/docs/serialization.html
패키지 구조
프론트엔트랑 api가 사용하는 dto와 카프카와 다른 모듈이 사용하는 dto는 보통 다르기 때문에 별도의 dto 패키지 생성
KafkaConfig
package happyprogfrog.kafka.config
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.kafka.common.serialization.StringSerializer
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.kafka.annotation.EnableKafka
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory
import org.springframework.kafka.core.ConsumerFactory
import org.springframework.kafka.core.DefaultKafkaConsumerFactory
import org.springframework.kafka.core.DefaultKafkaProducerFactory
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.kafka.core.ProducerFactory
@Configuration
@EnableKafka
class KafkaConfig {
companion object {
const val BOOTSTRAP_SERVER = "localhost:9092"
}
@Bean
fun producerFactory(): ProducerFactory<String, String> {
val configurationProperties = HashMap<String, Any>()
configurationProperties[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = BOOTSTRAP_SERVER
configurationProperties[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
configurationProperties[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
return DefaultKafkaProducerFactory(configurationProperties)
}
@Bean
fun consumerFactory(): ConsumerFactory<String, String> {
val configurationProperties = HashMap<String, Any>()
configurationProperties[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = BOOTSTRAP_SERVER
configurationProperties[ConsumerConfig.GROUP_ID_CONFIG] = "fintech"
configurationProperties[ConsumerConfig.AUTO_OFFSET_RESET_CONFIG] = "earliest"
configurationProperties[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
configurationProperties[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
return DefaultKafkaConsumerFactory(configurationProperties);
}
@Bean
fun kafkaTemplate(): KafkaTemplate<String, String> {
return KafkaTemplate(producerFactory())
}
@Bean
fun kafkaListenerContainerFactory(): ConcurrentKafkaListenerContainerFactory<String, String> {
val factory = ConcurrentKafkaListenerContainerFactory<String, String>()
factory.consumerFactory = consumerFactory()
return factory
}
}
- @EnableKafka
- 스프링 카프카를 활성화
- 카프카 관련 설정을 자동으로 구성
- companion object
- 코틀린에서 자바의 static 멤버와 유사하게 동작
- @Bean
- 스프링 컨텍스트에 빈으로 등록
- ProducerFactory<String, String>
- 카프카 프로듀서를 생성하는 팩토리
- 카프카 서버 주소를 설정하고, 메시지 키와 값을 직렬화할 클래스를 설정
- ConsumerFactory<String, String>
- 카프카 컨슈머를 생성하는 팩토리
- 카프카 서버 주소를 설정하고, 메시지 키와 값을 역직렬화할 클래스를 설정
- 컨슈머 그룹 ID와 오프셋 초기 설정(earliest는 가장 처음부터 읽기 시작)
- KafkaTemplate<String, String>
- 메시지를 카프카 토픽에 전송하기 위한 템플릿
- 앞서 정의한 프로듀서 팩토리를 사용
- ConcurrentKafkaListenerContainerFactory<String, String>
- 카프카 메시지를 비동기적으로 수신하기 위한 컨테이너 팩토리
- 앞서 정의한 컨슈머 팩토리를 사용
KafkaTopic
토픽들은 enum으로 관리
package happyprogfrog.kafka.enum
enum class KafkaTopic(val topicName: String) {
LOAN_REQUEST("loan_request");
}
LoanRequestDto
- 카프카를 통해서 어떠한 데이터를 주고받을지 정의
- CB사에 어떤 것들을 보낼 것인가?
- 기존에 api 모듈 쪽에서 사용하는 dto랑 필드가 비슷하지만, 분리해 주는 것이 좋다.
package happyprogfrog.kafka.dto
data class LoanRequestDto(
val userKey: String,
val userName: String,
val userIncomeAmount: Long,
var userRegistrationNumber: String
)
LoanRequestSender
- 카프카 쪽으로 메시지를 produce
- 다른 곳에서 주입해서 사용할 수 있도록 @Componet 애노테이션 사용
package happyprogfrog.kafka.producer
import com.fasterxml.jackson.databind.ObjectMapper
import happyprogfrog.kafka.dto.LoanRequestDto
import happyprogfrog.kafka.enum.KafkaTopic
import org.springframework.kafka.core.KafkaTemplate
import org.springframework.stereotype.Component
@Component
class LoanRequestSender(
private val kafkaTemplate: KafkaTemplate<String, String>,
private val objectMapper: ObjectMapper
) {
fun sendMessage(topic: KafkaTopic, loanRequestDto: LoanRequestDto) {
kafkaTemplate.send(topic.topicName, objectMapper.writeValueAsString(loanRequestDto))
}
}
api 모듈에서 사용
build.gradle.kts
implementation(project(":domain"))
implementation(project(":kafka")) // 추가!
@ComponetScan 작성
다른 모듈에 있는 @Compoent 애노테이션을 인식하지 못하므로, 아래와 같이 명시
package happyprogfrog.api
import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.boot.autoconfigure.domain.EntityScan
import org.springframework.boot.runApplication
import org.springframework.context.annotation.ComponentScan
@SpringBootApplication
@EntityScan(basePackages = ["happyprogfrog.domain"])
@ComponentScan(basePackages = ["happyprogfrog"])
class ApiApplication
fun main(args: Array<String>) {
runApplication<ApiApplication>(*args)
}
LoanRequestService 수정
package happyprogfrog.api.loan.request
import happyprogfrog.domain.domain.UserInfo
interface LoanRequestService {
fun loanRequestMain(
loanRequestInputDto: LoanRequestDto.LoanRequestInputDto
): LoanRequestDto.LoanRequestResponseDto
fun saveUserInfo(userInfoDto: UserInfoDto): UserInfo
fun loanRequestReview(userInfoDto: UserInfoDto)
}
UserInfoDto에 메서드 추가
package happyprogfrog.api.loan.request
import happyprogfrog.domain.domain.UserInfo
import happyprogfrog.kafka.dto.LoanRequestDto
data class UserInfoDto (
val userKey: String,
val userName: String,
val userRegistrationNumber: String,
val userIncomeAmount: Long
){
fun toEntity(): UserInfo =
UserInfo(
userKey, userRegistrationNumber, userName, userIncomeAmount
)
fun toLoanRequestKafkaDto() = LoanRequestDto(userKey, userName, userIncomeAmount, userRegistrationNumber)
}
LoanRequestServiceImpl 코드 추가
package happyprogfrog.api.loan.request
import happyprogfrog.api.loan.GenerateKey
import happyprogfrog.api.loan.encrypt.EncryptComponent
import happyprogfrog.domain.repository.UserInfoRepository
import happyprogfrog.kafka.enum.KafkaTopic
import happyprogfrog.kafka.producer.LoanRequestSender
import org.springframework.stereotype.Service
@Service
class LoanRequestServiceImpl(
private val generateKey: GenerateKey,
private val userInfoRepository: UserInfoRepository,
private val encryptComponent: EncryptComponent,
private val loanRequestSender: LoanRequestSender
): LoanRequestService {
/**
* 대출 심사 요청
*/
override fun loanRequestMain(
loanRequestInputDto: LoanRequestDto.LoanRequestInputDto
): LoanRequestDto.LoanRequestResponseDto {
// 유저 키 생성
val userKey = generateKey.generateUserKey()
// 주민 번호 암호화
loanRequestInputDto.userRegistrationNumber =
encryptComponent.encryptString(loanRequestInputDto.userRegistrationNumber)
val userInfoDto = loanRequestInputDto.toUserInfoDto(userKey);
// 유저 정보 저장
saveUserInfo(userInfoDto)
// 카프카를 통해서 유저 심사 요청
loanRequestReview(userInfoDto)
return LoanRequestDto.LoanRequestResponseDto(userKey)
}
/**
* 유저 정보 저장
*/
override fun saveUserInfo(userInfoDto: UserInfoDto) = userInfoRepository.save(userInfoDto.toEntity())
/**
* 카프카를 통해서 유저 심사 요청
*/
override fun loanRequestReview(userInfoDto: UserInfoDto) {
loanRequestSender.sendMessage(
KafkaTopic.LOAN_REQUEST,
userInfoDto.toLoanRequestKafkaDto()
)
}
}
동작 확인
api 호출
카프카에서 메시지 발행 확인
docker exec -it kafka bash
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic loan_request --from-beginning
6. Consumer 모듈 구현
독립적으로 따로 배포가 돼서, 메시지 큐에 메시지가 쌓일 때마다 읽어서 CB사로 보내줘야 하기 때문에 별도의 구동 가능한 서버가 있어야 한다.
ConsumerApplication
package happyprogfrog.consumer
import org.springframework.boot.autoconfigure.SpringBootApplication
import org.springframework.boot.autoconfigure.domain.EntityScan
import org.springframework.boot.runApplication
@SpringBootApplication
@EntityScan(basePackages = ["happyprogfrog.domain"]) // 중요!
class ConsumerApplication
fun main(args: Array<String>) {
runApplication<ConsumerApplication>(*args)
}
build.gradle.kts
Consumer 모듈의 역할은 카프카 메시지를 읽어서(consume) CB사에 다녀와서 CB사의 응답으로부터 금리와 한도액을 받아 DB에 넣어주는 것! 따라서, 필요한 의존성은 다음과 같다.
plugins {}
version = "0.0.1"
dependencies {
// Kafka
implementation(project(":kafka"))
implementation("org.springframework.kafka:spring-kafka:2.8.0")
implementation("com.fasterxml.jackson.module:jackson-module-kotlin:2.14.+")
// Domain
implementation(project(":domain"))
implementation("org.springframework.boot:spring-boot-starter-data-jpa:2.7.6")
// Web Server
implementation("org.springframework.boot:spring-boot-starter-web")
}
JpaAuditingConfiguration
package happyprogfrog.consumer.config
import org.springframework.context.annotation.Configuration
import org.springframework.data.jpa.repository.config.EnableJpaAuditing
import org.springframework.data.jpa.repository.config.EnableJpaRepositories
@Configuration
@EnableJpaAuditing
@EnableJpaRepositories(basePackages = ["happyprogfrog.domain"])
class JpaAuditingConfiguration
포트 설정
api와 consumer를 각각 띄운다고 하면, 포트가 충돌 나기 때문에 포트를 각각 지정해 주기.
api - application.yml 수정
spring:
profiles:
include:
- domain
mvc:
pathmatch:
matching-strategy: ant_path_matcher
server:
port: 80
consumer - application.yml 추가
server:
port: 8080
추가로, DB와 관련된 부분은 domain을 참고하도록 하기 위해서 최종적으로는 다음과 같이 작성
server:
port: 8080
spring:
profiles:
include:
- domain
LoanRequestConsumer
package happyprogfrog.consumer.kafka
import com.fasterxml.jackson.databind.ObjectMapper
import happyprogfrog.kafka.dto.LoanRequestDto
import org.springframework.kafka.annotation.KafkaListener
import org.springframework.stereotype.Service
@Service
class LoanRequestConsumer(
private val objectMapper: ObjectMapper
) {
@KafkaListener(topics = ["loan_request"], groupId = "fintech")
fun loanRequestTopicConsumer(message: String) {
val loanRequestKafkaDto = objectMapper.readValue(message, LoanRequestDto::class.java)
println(loanRequestKafkaDto)
}
}
- @Service
- 카프카를 Listen해서 발행될 때, 바로바로 처리
- 받을 때 String으로 받으니까 ObjectMapper 사용
- @KafkaListener
- 토픽과 그룹 ID를 지정해 주면, 해당 토픽에 그룹 ID로 묶여있는 것 끼리 offset을 따라서 카프카 토픽을 읽어온다.
- 메시지를 받아 LoanRequestDto로 바꾸기
동작 확인
ApiApplication과 ConsumerApplication을 차례대로 띄운 후에 확인
카프카를 통해서 데이터를 주고받은 것이 확인된 것이다!
LoanRequestService
CB 쪽의 API 스펙을 알지 못하기 때문에, 대략적인 틀만 만들어두기.
package happyprogfrog.consumer.service
import happyprogfrog.domain.domain.LoanReview
import happyprogfrog.domain.repository.LoanReviewRepository
import org.springframework.stereotype.Service
@Service
class LoanRequestService(
private val loanReviewRepository: LoanReviewRepository
) {
fun loanRequest() {
// TODO: CB Component 로 요청 보내기 -> 응답값을 DB에 저장하기
}
fun loanRequestToCb() {
// TODO
}
fun saveLoanReviewData(loanReview: LoanReview) = loanReviewRepository.save(loanReview)
}