이 글은 인프런 Spring Cloud로 개발하는 마이크로서비스 애플리케이션(MSA) 강의를 듣고 쓴 글입니다.
Spring Cloud로 개발하는 마이크로서비스 애플리케이션(MSA) - 인프런 | 강의
Spring framework의 Spring Cloud 제품군을 이용하여 마이크로서비스 애플리케이션을 개발해 보는 과정입니다. Cloud Native Application으로써의 Spring Cloud를 어떻게 사용하는지, 구성을 어떻게 하는지에 대해
www.inflearn.com
Apache Kafka의 활용
이번 섹션에서는 저번에 했던 내용들을 프로젝트 안에서 실제로 적용해볼 것이다. Order Service와 Catalog Service를 Kafka에 적용시킬 것이며, 주문 시 Order Service의 데이터베이스에서 남은 수량의 감소가 일어날 건데, 수량의 감소를 Catalog Service의 데이터베이스에서도 적용시킬 것이다.
이 때, 수량의 감소를 알려주는 Order Service가 Producer, 이 정보를 받는 Catalog Service가 Consumer이다. 각각이 어떻게 코딩해야 하는지를 살펴보자. Consumer 부터.
Catalog Microservice 수정
우선, 다음과 같은 의존성을 추가한다.
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
이제, Kafka에 활용할 패키지 messagequeue 를 만들어 다음과 같은 클래스를 만든다.
@EnableKafka
@Configuration
public class KafkaConsumerConfig {
//topic 에 접속하기 위한 정보를 담고 있는 bean
@Bean
public ConsumerFactory<String, String> consumerFactory() {
Map<String, Object> properties = new HashMap<>();
properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "consumerGroupId");
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
return new DefaultKafkaConsumerFactory<>(properties);
}
//topic 에 변경이 발생했는지를 탐지하는 리스너
@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory
= new ConcurrentKafkaListenerContainerFactory<>();
kafkaListenerContainerFactory.setConsumerFactory(consumerFactory());
return kafkaListenerContainerFactory;
}
}
- Kafka의 Consumer를 위한 설정 클래스이다. 따라서 @Configuration 와 @EnableKafka 를 붙여준다.
- 다음 2개의 빈을 생성한다.
- Topic에 접속하기 위한 정보를 담고 있는 Bean이 필요하다. 내가 구독할 Topic을 정하는 것이라고 생각하면 된다. 총 4개의 정보를 설정했다. 먼저 카프카 서버에 접속할 수 있는 주소 정보가 필요하다. 그리고 만약, Consumer의 그룹이 있다면 써주면 되는데, 우린 없으니까 해당 코드는 있으나 마나이다... 또한 Kafka에 보낸 데이터들은 직렬화 되어 있을 것이므로 그걸 다시 자바 어플리케이션 내부에서 쓰려면 역직렬화 해야 한다. 우린 거기에 StringDeserializer를 사용한다.
- Topic에 변경이 발생했는지를 계속 리스닝하고 있는 Bean 도 필요하다. 방금 생성한 Topic 설정 정보를 이용해서 리스너를 Bean으로 등록한다.
이제 실제 Consumer가 동작하는 로직을 수행하는 서비스 클래스인 KafkaConsumer 클래스를 다음과 같이 만들자.
@Service
@RequiredArgsConstructor
@Slf4j
public class KafkaConsumer {
final CatalogRepository repository;
@KafkaListener(topics = "example-catalog-topic")
public void updateQty(String kafkaMessage) {
log.info("Kafka Message: ->" + kafkaMessage);
Map<Object, Object> map = new HashMap<>();
ObjectMapper mapper = new ObjectMapper();
try {
map = mapper.readValue(kafkaMessage, new TypeReference<Map<Object, Object>>() {});
} catch (JsonProcessingException ex) {
ex.printStackTrace();
}
CatalogEntity entity = repository.findByProductId((String) map.get("productId"));
if (entity != null) {
entity.setStock(entity.getStock() - (Integer) map.get("qty"));
repository.save(entity);
}
}
}
- 어떤 Topic을 구독하는 Listner를 만드는 것이다. JSON 형식의 문자열인 kafkaMessage를 받아온다.
이것을 objectMapper로 Map에 우리가 쓰기 편하도록 넣어준다. 이 과정이 역직렬화이다. - 이제 여기서 얻은 productId의 Entity를 찾아 요청에 들어있는 수량만큼 감소시키고 다시 flush 해서 데이터베이스에 저장한다.
Order Microservice 수정
아까와 같이 spring kafka 의존성을 추가한다. (코드 생략)
또한, messagequeue 패키지를 만들고 그 안에 다음과 같은 클래스를 만든다.
@EnableKafka
@Configuration
public class KafkaProducerConfig {
@Bean
public ProducerFactory<String, String> producerFactory() {
Map<String, Object> properties = new HashMap<>();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
return new DefaultKafkaProducerFactory<>(properties);
}
//데이터 전달을 위한 bean
@Bean
public KafkaTemplate<String, String> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory());
}
}
- 이전과 거의 비슷하지만 CosumerFactory가 아니라 ProducerFactory이다. 그리고 객체를 직렬화해서 통신해야 하므로 Deserializer 가 아니라 Serializer 이다. 그룹 관련 정보는 필요없다.
- 이전처럼 리스너는 필요가 당연히 없고, 방금 설정한 정보를 토대로 데이터를 전달할 수 있게 해주는 Bean을 추가로 생성한다.
이제 실제 Producer가 동작하는 로직을 수행하는 서비스 클래스인 KafkaProducer 클래스를 다음과 같이 만들자.
@Service
@RequiredArgsConstructor
@Slf4j
public class KafkaProducer {
private final KafkaTemplate<String, String> kafkaTemplate;
public OrderDto send(String topic, OrderDto orderDto) {
ObjectMapper mapper = new ObjectMapper();
String jsonInString = "";
try {
jsonInString = mapper.writeValueAsString(orderDto);
} catch (JsonProcessingException ex) {
ex.printStackTrace();
}
kafkaTemplate.send(topic, jsonInString);
log.info("Kafka Producer sent data from the Order microservice: " + orderDto);
return orderDto;
}
}
Kafka에 orderDto에 관한 내용을 보내는 메소드이다. orderDto를 Json 형식으로 직렬화해주고, 이걸 KafkaTemplate 의 send 메소드를 이용해서 설정된 Topic에 보내게 된다.
그러면, 실제 주문이 이루어졌을 때, 이 send 메소드를 호출을 해야 하는 것이다. 따라서 Controller 클래스에서 주문 생성 메소드를 다음과 같이 바꾸자.
@PostMapping("/{userId}/orders")
public ResponseEntity<ResponseOrder> createOrder(@PathVariable(name = "userId") String userId,
@RequestBody RequestOrder orderDetails) {
ModelMapper mapper = new ModelMapper();
mapper.getConfiguration().setMatchingStrategy(MatchingStrategies.STRICT);
/* jpa 관련 작업 */
OrderDto orderDto = mapper.map(orderDetails, OrderDto.class);
orderDto.setUserId(userId);
OrderDto createOrder = orderService.createOrder(orderDto);
ResponseOrder responseOrder = mapper.map(createOrder, ResponseOrder.class);
/* send this order to the kafka */
kafkaProducer.send("example-catalog-topic", orderDto);
return ResponseEntity.status(HttpStatus.CREATED).body(responseOrder);
}
한 줄만 추가된 것이다.
이제 모든 설정이 끝났다. 주문 시 해당 수량만큼 catalog service의 데이터베이스에도 감소가 있어야 한다. 테스트해보자.
테스트!
우선, 당연히 선행되어야 할 조건은 다음과 같다.
Kafka 기동, Zookeeper 기동, Eureka 기동, Config Service 기동, API Gateway 기동, Order Service 기동, Catalog Service 기동
주문 전, 첫 상태에서 Catalog Service의 데이터베이스를 보자.
data.sql 을 통해 넣어놨던 초기 데이터들이 보인다.
이제, 주문을 해보자! (user-id 는 실제 기동하지 않고 더미데이터를 써도 테스트에는 무방하다.)
CATALOG-003 이라는 물건에 대해 2000원짜리 13개 물건을 구매했다.
Order Service 로 돌아가보자.
의도한대로 Kafka로 데이터를 잘 보냈다는 로그를 띄웠다.
이제 Catalog Service를 보자.
Kafka에서 온 메시지가 JSON 형식으로 나와있고, 이를 이용해서 수정한 Update 쿼리가 잘 적용된 것을 볼 수 있다.
최종적으로 Catalog Service의 데이터베이스를 보자.
세번째 항목이 120개에서 107개로 낮아진 것을 볼 수 있다.
분명 Order Service에만 주문을 했는데, Kafka를 통해 Catalog Service에 실시간으로 메시지가 전달되는 과정을 테스트 성공했다.
Multi Orders Microservice 사용에 대한 동기화 문제
만약, Order Service 자체가 하나가 아니라면 어떻게 될까? 섹션 11에서 했던 것처럼 주문마다 저장되는 데이터베이스가 다를 것이다. 이걸 해결하기 위해 데이터베이스를 하나로 두고, Order Service의 인스턴스들은 Kafka에 메시지를 전달한다.
Kafka는 들어온 메시지 순서대로 처리하고 트래킹하면서 데이터베이스에 최종 결과만 전달한다.
이제 Order Service에서 사용하던 MySQL 데이터베이스를 터미널에서 사용하면서 임베디드가 아닌, 직접 테이블을 만들어서 실행해보자.
우선, 기존에 쓰던 mydb 데이터베이스에 다음과 같은 쿼리로 테이블을 만든다.
create table orders (
id int auto_increment primary key,
user_id varchar(50) not null,
product_id varchar(20) not null,
order_id varchar(50) not null,
qty int default 0,
unit_price int default 0,
total_price int default 0,
created_at datetime default now()
)
이제 Order Service의 application.yml 파일의 일부분을 다음과 같이 수정한다.
# driver-class-name: org.h2.Driver
# url: jdbc:h2:mem:testdb
driver-class-name: com.mysql.cj.jdbc.Driver
url: jdbc:mysql://localhost:3306/mydb
이제 실행시키고 주문을 하면 주문 내역이 터미널에서 MySQL에 잘 확인되는 것을 볼 수 있다.
Order Microservice 수정
보낼 메시지 등록
이 형식대로 커넥터에게 메시지를 보내면, 데이터베이스에 잘 넣어줄 것이다. 이대로 Order Service 안에서 메시지를 만들어 Kafka에게 보내주자.
@Data
@AllArgsConstructor
public class Field {
private String type;
private boolean optional;
private String field;
}
@Data
@Builder
public class Schema {
private String type;
private List<Field> fields;
private boolean optional;
private String name;
}
@Data
@Builder
public class Payload {
private String order_id;
private String user_id;
private String product_id;
private int qty;
private int unit_price;
private int total_price;
}
@Data
@AllArgsConstructor
public class KafkaOrderDto implements Serializable {
private Schema schema;
private Payload payload;
}
이렇게 4개의 클래스를 만들면 위의 사진 형식대로 데이터를 전달할 수 있다.
이대로 메시지를 Topic에 보낼 OrderProducer 클래스도 만들자.
@Service
@RequiredArgsConstructor
@Slf4j
public class OrderProducer {
private final KafkaTemplate<String, String> kafkaTemplate;
List<Field> fields = Arrays.asList(
new Field("string", true, "order_id"),
new Field("string", true, "user_id"),
new Field("string", true, "product_id"),
new Field("int32", true, "qty"),
new Field("int32", true, "unit_price"),
new Field("int32", true, "total_price")
);
Schema schema = Schema.builder()
.type("struct")
.fields(fields)
.optional(false)
.name("orders")
.build();
public OrderDto send(String topic, OrderDto orderDto) {
Payload payload = Payload.builder()
.order_id(orderDto.getOrderId())
.user_id(orderDto.getUserId())
.product_id(orderDto.getProductId())
.qty(orderDto.getQty())
.unit_price(orderDto.getUnitPrice())
.total_price(orderDto.getTotalPrice())
.build();
KafkaOrderDto kafkaOrderDto = new KafkaOrderDto(schema, payload);
ObjectMapper mapper = new ObjectMapper();
String jsonInString = "";
try {
jsonInString = mapper.writeValueAsString(kafkaOrderDto);
} catch (JsonProcessingException ex) {
ex.printStackTrace();
}
kafkaTemplate.send(topic, jsonInString);
log.info("Order Producer sent data from the Order microservice: " + kafkaOrderDto);
return orderDto;
}
}
위의 사진 형식으로 메시지를 만들고 (KafkaOrderDto) 이걸 JSON 형식으로 바꿔서 kafkaTemplate을 이용해서 보낸다.
마지막으로 Controller를 조금 변경해보자.
@PostMapping("/{userId}/orders")
public ResponseEntity<ResponseOrder> createOrder(@PathVariable(name = "userId") String userId,
@RequestBody RequestOrder orderDetails) {
ModelMapper mapper = new ModelMapper();
mapper.getConfiguration().setMatchingStrategy(MatchingStrategies.STRICT);
OrderDto orderDto = mapper.map(orderDetails, OrderDto.class);
orderDto.setUserId(userId);
/* jpa */
// OrderDto createOrder = orderService.createOrder(orderDto);
// ResponseOrder responseOrder = mapper.map(createOrder, ResponseOrder.class);
/* kafka */
orderDto.setOrderId(UUID.randomUUID().toString());
orderDto.setTotalPrice(orderDetails.getQty() * orderDetails.getUnitPrice());
/* send this order to the kafka */
kafkaProducer.send("example-catalog-topic", orderDto);
orderProducer.send("orders", orderDto);
ResponseOrder responseOrder = mapper.map(orderDto, ResponseOrder.class);
return ResponseEntity.status(HttpStatus.CREATED).body(responseOrder);
}
이전에 JPA 를 활용해서 직접 Entity를 만들고 데이터베이스에 저장하는 코드를 주석처리해주자. 그리고 해당 서비스에서 해주던 OrderId와 TotalPrice의 설정을 여기서 해준다.
orderProducer.send() 를 통해 Topic에 메시지를 보내서 데이터베이스에 기록한다.
실제로 이 메시지를 받아서 데이터베이스에 기록해줄 것을 추가하자.
여기서 무엇을 기동해야 할까?!?! 이걸 모르면 이해를 하나도 하지 못하고 있는 것. 실제로 이 동작을 하기 위해 어떤 것을 기동해야 하고, 어떤 것을 추가해야 하는지 생각해보자.
실제로 이 메시지를 받아서 데이터베이스에 기록해줄 Sink Connect를 추가하자!
데이터베이스에 일어나는 변경을 감지해 베시지를 받아오는 것이 Source Connect인데, 우린 이건 필요없다.
직접 Kafka의 Sink Connect에 메시지를 전달해 데이터베이스에 삽입해주면 되니까 Sink Connect만 필요하다.
이전에 등록했던 Connect에서 name과 topics 만 바꿔서 다음과 같이 등록하자.
실제로 잘 등록되었는지 상태 조회도 해보면 다음과 같다.
테스트
테스트를 해보자. 현재 우리는 단일 데이터베이스를 두고, 서버에서 DB와 직접 통신하지 않고 Kafka라는 미들웨어를 통해 DB에 정보를 전달할 것이다.
여러 개 (최소 2개)의 Order Service를 기동하고 주문 생성을 아래 사진과 같이 여러 번 해보자. 그 때마다 실행되는 서비스의 콘솔창을 보면 요청마다 다른 서비스에 주문 생성이 전달될 것이다.
하지만 주문 생성이 다른 인스턴스에 되더라도 같은 데이터베이스에 저장되어야 한다. 데이터베이스를 살펴보자.
전달한 주문 내역이 모두 잘 들어와있다. Kafka를 통해서 단일 데이터베이스에 잘 전달되었다.
'[MSA] Spring Cloud로 개발하는 마이크로서비스 애플리케이션' 카테고리의 다른 글
섹션 13 : Spring Cloud로 개발하는 마이크로서비스 애플리케이션(MSA) (0) | 2023.09.05 |
---|---|
섹션 11 : Spring Cloud로 개발하는 마이크로서비스 애플리케이션(MSA) (0) | 2023.08.24 |
섹션 10 : Spring Cloud로 개발하는 마이크로서비스 애플리케이션(MSA) (0) | 2023.08.18 |
섹션 9 : Spring Cloud로 개발하는 마이크로서비스 애플리케이션(MSA) (0) | 2023.08.11 |
섹션 8 : Spring Cloud로 개발하는 마이크로서비스 애플리케이션(MSA) (0) | 2023.08.11 |