danuri 2023. 9. 8. 23:32

도메인 주도 개발 시작하기 책 정리

 
도메인 주도 개발 시작하기

실제 업무에 도메인 주도 설계(DDD)를 적용할 수 있도록 기본적인 DDD의 핵심 개념을 익히고 구현을 통해 학습할 수 있도록 구성한 DDD 입문서

www.hanbit.co.kr

 

시스템 간 강결합 문제

✅ 도메인 로직에서 외부 서비스 의존

ex) 쇼핑몰에서 구매를 취소하면 환불을 처리해야 한다.

-> 이 때, 환불 기능을 실행하는 주체가 주문 도메인 엔티티일 수 있다.

public class Order  {

    // 외부 서비스를 실행하기 위해 도메인 서비스를 파라미터로 전달받음
    public void cancel(RefundService refundService) {
        verifyNotYetShipped();
        this .state = OrderState.CANCELED;

        this .refundStatus = State.REFUND_STARTED;
        try {
            RefundService.refund(getPaymentId());
            this.refundStatus = State.REFUND_COMPLETED;
        } catch (Exception ex) {
            
        }
    }
}

-> 도메인 기능에서 환불 도메인 서비스를 파라미터로 전달받는다.

 

혹은 응용 서비스에서 환불 기능을 실행할 수도 있다.

public class CancelOrderService {

    private RefundService refundService;

    @Transactional
    public void cancel(OrderNo orderNo) {
        Order order = findOrder(orderNo);
        order.cancel();

        order.refundStarted();
        try {
            // 외부 서비스 성능에 직접 영향을 받는다.
            refundService.refund(order.getPaymentId());
            order.refundCompleted();
        } catch (Exception ex) {

        }
    }

}

 

보통 결제 시스템은 외부에 존재한다.

-> 즉, 앞선 두 예시에서 RefundService는 모두 외부 시스템을 호출해야 한다.

 

✅ 바운디드 컨텍스트의 강결합

이런 경우 두 가지 문제가 발생한다.

  1. 외부 서비스가 정상이 아닐 경우 트랜잭션 처리를 어떻게 해야 할까?
    • 취소 트랜잭션 전체를 롤백해야 할까? -> 취소를 먼저 하고, 환불만 나중에 다시 시도할 수도 있다.
  2. 외부 시스템의 응답시간이 길어지면 그만큼 취소 기능의 대기 시간도 길어진다.
    • 취소 기능이 외부 서비스 성능에 직접적인 영향을 받는다.

 

특히, 주문 도메인 로직 안에서 결제 도메인 로직을 포함하는 경우,

-> 두 도메인의 로직이 섞인다는 단점과,

-> 만약 주문 취소 기능에 취소 통지와 같은 기능이 추가된다면, NotifyService 같은 도메인 서비스를 파라미터로 또 추가해야 할 수 있다.

 

지금까지 언급한 문제가 발생하는 이유는,

주문 바운디드 컨텍스트와 결제 바운디드 컨텍스트 간의 강결합때문이다.

-> 이런 강결합을 없앨 수 있는 방법이 바로 이벤트다. (특히, 비동기 이벤트)

 


 

이벤트 개요

✅ 이벤트

이벤트는 '과거에 벌어진 어떤 것'을 의미한다. (상태가 변경됨)

ex) 주문이 취소된 것을 '주문이 취소됨 이벤트'가 벌어졌다고 할 수 있다.

 

이벤트 관련 구성요소

✅ 이벤트 구성요소

도메인 모델에 이벤트를 도입하려면 아래 네 개의 구성요소를 구현한다. (이벤트 생성 주체, 이벤트, 이벤트 디스패처, 이벤트 핸들러)

 

https://velog.io/@csh0034/도메인-주도-개발-시작하기-10.-이벤트

 

 

이벤트 생성 주체는 엔티티, 밸류, 도메인 서비스와 같은 도메인 객체이다.

-> 도메인 객체는 도메인 로직을 실행해서 상태가 바뀌면 관련 이벤트를 발생시킨다.

 

이벤트 핸들러는 이벤트 생성 주체가 발생한 이벤트에 반응한다.

-> 이벤트에 담긴 데이터를 이용해서 원하는 기능을 실행한다. (ex. '주문 취소됨 이벤트'를 받아 고객에게 SMS로 주문 취소를 통지한다.)

 

이벤트 디스패처는 이벤트 생성 주체와 이벤트 핸들러를 연결해준다.

-> 디스패처의 구현 방식에 따라 이벤트 생성과 처리를 동기나 비동기로 실행하게 된다.

 

이벤트의 구성

✅ 이벤트는 발생한 이벤트에 대한 정보를 담는다

이벤트는 다음을 포함한다.

  • 이벤트 종류: 클래스 이름으로 이벤트 종류를 표현
  • 이벤트 발생 시간
  • 추가 데이터: 주문번호, 신규 배송지 정보 등 이벤트와 관련된 정보

 

ex) 배송지를 변경할 때 발생하는 이벤트

public class ShippingInfoChangedEvent {

    private String orderNumber;
    private long timestamp;
    private ShippingInfo newShippingInfo;
}

 

+) 이벤트는 과거에 벌어진 것을 표현하기 때문에 이벤트 클래스 이름에는 과거 시제를 사용한다.

 

✅ 이벤트 생성 주체

이벤트 생성 주체는 Order 애그리거트다.

public class Order {

    public void cancel() {
        verifyNotYetShipped();
        this.state = OrderState.CANCELED;
        Events.raise(new OrderCanceledEvent(number.getNumber()));
    }
}

 

-> Events.raise()는 디스패처를 통해 이벤트를 전파하는 기능을 제공한다. (관련 구현은 뒤에서 살펴본다)

 

✅ 이벤트 핸들러

ex) 변경된 배송지 정보를 물류 서비스에 전송하는 핸들러

public class ShippingInfoChangedHandler {

    @EventListener(ShippingInfoChangedEvent.class)
    public void handle(ShippingInfoChangedEvent evt) {
        shippingInfoSynchronizer.sync(
                evt.getOrderNumber(),
                evt.getNewShippingInfo());
    }
}

 

✅ 이벤트 내 데이터

이벤트는 이벤트 핸들러가 작업을 수행하는 데 필요한 데이터를 담아야 한다.

-> 이 데이터가 부족하면 핸들러는 필요한 데이터를 읽기 위해 관련 API를 호출하거나 DB에서 직접 데이터를 읽어와햐 한다.

-> 물론, 이벤트 자체와 관련 없는 데이터를 포함할 필요도 없다.

 

이벤트 용도

✅ 트리거

도메인에 상태가 바뀔 때 후처리를 실행하기 위함.

ex) 주문 취소 이벤트를 트리거로 환불을 처리를 할 수 있다.

 

✅ 서로 다른 시스템 간의 데이터 동기화

ex) 배송지를 변경하면 외부 배송 서비스에 바뀐 배송지 정보를 전송한다.

 

이벤트 장점

✅ 도메인 로직이 섞이는 것 방지

public class Order  {

    // 외부 서비스를 실행하기 위해 도메인 서비스를 파라미터로 전달받음
    public void cancel(RefundService refundService) {
        // 주문 로직
        verifyNotYetShipped();
        this .state = OrderState.CANCELED;

        // 결제 로직
        this.refundStatus = State.REFUND_STARTED;
        try {
            RefundService.refund(getPaymentId());
            this .refundStatus = State.REFUND_COMPLETED;
        } catch (Exception ex) {

        }
    }
}

 

-> 주문 도메인에 주문 로직과, 결제 로직이 섞여있다.

 

public class Order  {

    public void cancel()  {
        verifyNotYetShipped();
        this .state = OrderState.CANCELED;
        Events.raise( new OrderCanceledEvent(number.getNumber()));
    }
}

 

-> 이벤트를 적용함으로써 결제 로직이 없어진 것을 알 수 있다.

-> 환불 서비스를 실행하기 위한 파라미터도 없어졌다.

-> 주문 도메인에서 결제 도메인으로의 의존을 제거했다.

 

✅ 기능 확장의 용이함

ex) 구매 취소 시 환불과 함께 이메일로 취소 내용을 보내고 싶다면?

-> 이메일 발송을 처리하는 핸들러를 구현하면 된다.

-> 취소 로직은 수정할 필요가 없다.

 

https://velog.io/@csh0034/도메인-주도-개발-시작하기-10.-이벤트

 

 


 

이벤트, 핸들러, 디스패처 구현

이벤트 클래스

✅ 이벤트 클래스는 이벤트를 처리하는 데 필요한 최소한의 데이터를 포함

ex) 주문 취소됨 이벤트

public class OrderCanceledEvent extends Event {
    private String orderNumber;

    public OrderCanceledEvent(String number) {
        super();
        this.orderNumber = number;
    }

    public String getOrderNumber() {
        return orderNumber;
    }
}

 

모든 이벤트가 공통으로 갖는 프로퍼티가 존재한다면 관련 상위 클래스를 만들 수도 있다.

ex) 이벤트 발생 시간

public abstract class Event {
    private long timestamp;

    public Event() {
        this.timestamp = System.currentTimeMillis();
    }

    public long getTimestamp() {
        return timestamp;
    }

}

-> 해당 상위 클래스를 각 이벤트 클래스가 상속받도록 한다.

 

Events 클래스와 ApplicationEventPublisher

✅ 스프링이 제공하는 ApplicationEventPublisher

public class Events {
    private static ApplicationEventPublisher publisher;

    static void setPublisher(ApplicationEventPublisher publisher) {
        Events.publisher = publisher;
    }

    public static void raise(Object event) {
        if (publisher != null) {
            publisher.publishEvent(event);
        }
    }
}

-> Events 클래스의 raise() 메서드는 ApplicationEventPublisher를 이용해 이벤트를 발생시킨다.

 

setPublisher() 메서드에 이벤트 퍼블리셔를 전달하기 위해 스프링 설정 클래스를 작성한다.

@Configuration 
public class EventsConfiguration  {
     @Autowired 
    private ApplicationContext applicationContext;

    @Bean 
    public InitializingBean eventsInitializer ()  {
         return () -> Events.setPublisher(applicationContext);
    }
}

 

InitializaingBean은 스프링 빈 객체를 초기화할 때 사용하는 인터페이스다.

-> 이 기능을 사용해 Events 클래스를 초기화했다.

ApplicationContext(스프링 컨테이너)는 ApplicationEventPublisher를 상속하고 있다.

 

이벤트 발생과 이벤트 핸들러

✅ 이벤트 발생

이벤트를 발생시킬 코드는 Events.raise() 메서드를 사용한다.

public class Order {

    public void cancel() {
        verifyNotYetShipped();
        this.state = OrderState.CANCELED;
        Events.raise(new OrderCanceledEvent(number.getNumber()));
    }
}

 

✅ 핸들러는 스프링이 제공하는 @EventListener 사용

@Service
public class OrderCanceledEventHandler {
    private RefundService refundService;

    public OrderCanceledEventHandler(RefundService refundService) {
        this.refundService = refundService;
    }

    @EventListener(OrderCanceledEvent.class)
    public void handle(OrderCanceledEvent event) {
        refundService.refund(event.getOrderNumber());
    }
}

 

ApplicationEventPublisher가 publishEvent() 메서드를 실행할 때,

OrderCanceledEvent를 전달하면,

해당 타입을 value로 갖는 @EventListener 애노테이션을 붙인 메서드를 찾아 실행한다.

 

흐름 정리

✅ 이벤트 처리 흐름

 

https://velog.io/@csh0034/도메인-주도-개발-시작하기-10.-이벤트

 

코드 흐름을 보면 응용 서비스와 동일한 트랜잭션 범위에서 이벤트 핸들러를 실행하고 있다.

-> 즉, 도메인 상태 변경과 이벤트 핸들러는 같은 트랜잭션 범위에서 실행된다.

 


 

동기 이벤트 처리 문제

✅ 외부 서비스에 영향

이벤트를 사용해서 강결합 문제는 해소했지만,

아직 외부 서비스에 영향을 받는 문제는 남아 있다.

// 1. 응용 서비스 코드
@Transactional // 외부 연동 과정에서 익셉션이 발생하면 트랜잭션 처리는?
public void cancel(OrderNo orderNo) {
    Order order = findOrder(orderNo);
    order.cancel(); // order.cancle()에서 OrderCanceledEvent 발생
}

// 2. 이벤트를 처리하는 코드
@Service
public class OrderCanceledEventHandler {

    @EventListener(OrderCanceledEvent.class)
    public void handle(OrderCanceledEvent event) {
        // refundService.refund()가 느려지거나 익셉션이 발생하면?
        refundService.refund(event.getOrderNumber());
    }
}

 

refundService.refund()가 외부 서비스와 연동한다고 했을 때,

외부 서비스가 느려지면 cancel() 메서드도 함께 느려진다.

-> 외부 서비스의 성능 저하가 내 시스템의 성능 저하로 연결된다.

 

✅ 트랜잭션도 같이 영향

성능 저하뿐만 아니라 트랜잭션도 문제가 된다.

외부 서비스에서 익셉션이 발생하면, cancel() 메서드의 트랜잭션을 롤백해야 할까?

-> 이는 구매 취소 기능을 롤백하는 것이므로 구매 취소가 실패하는 것과 같다.

 

생각해볼 점: 외부 환불 서비스 실행에 실패했다고 해서 반드시 구매 취소를 실패처리해야 할까?

-> 일단 구매 취소 자체는 처리하고, 환불만 재처리하거나 수동으로 처리할 수도 있다.

 

외부 시스템과의 연동을 동기로 처리할 때 발생하는 성능과 트랜잭션 범위 문제를 해소하는 방법

-> 이벤트를 비동기로 처리하거나 이벤트와 트랜잭션을 연계하는 것이다. (먼저 비동기 이벤트 처리부터 알아보자)

 


 

비동기 이벤트 처리

✅ A 하면 최대 언제까지 B 하라

'A 하면 B 하라'는 내용을 담고 있는 요구사항은 실제로 'A 하면 최대 언제까지 B 하라'인 경우가 많다.

-> 즉, 일정 시간 안에만 후속 조치를 처리하면 되는 경우가 적지 않다.

-> B를 하는 데 실패해도 일정 간격으로 재시도를 하거나 수동 처리를 해도 상관없는 경우가 있다.

-> 이런 경우 이벤트를 비동기로 처리하는 방식으로 구현할 수 있다.

 

✅ 이벤트를 비동기로

이벤트를 비동기로 구현할 수 있는 방법은 다양한데, 여기서는 다음 네 가지 방식을 살펴보겠다.

  • 로컬 핸들러를 비동기로 실행하기
  • 메시지 큐를 사용하기
  • 이벤트 저장소와 이벤트 포워더 사용하기
  • 이벤트 저장소와 이벤트 제공 API 사용하기

 

로컬 핸들러 비동기 실행

✅ 이벤트 핸들러를 별도 스레드로

비동기로 이벤트 핸들러를 실행하기 위해 다음 두 가지를 해야 한다.

1. @EnableAsync 애너테이션을 사용해서 비동기 기능을 활성화

@SpringBootApplication
@EnableAsync
public class ShopApplication {

    public static void main(String[] args) {
        SpringApplication.run(ShopApplication.class, args);
    }

}

-> 스프링 설정 클래스에 @EnableAsync 애너테이션을 붙인다.

 

2. 이벤트 핸들러 메서드에 @Async 애너테이션을 붙인다.

@Service
public class OrderCanceledEventHandler {

    @Async
    @EventListener(OrderCanceledEvent.class)
    public void handle(OrderCanceledEvent event) {
        refundService.refund(event.getOrderNumber());
    }
}

 

-> 이러면 OrderCanceledEvent가 발생했을 때, handle() 메서드를 별도 스레드를 이용해서 비동기로 실행한다.

 

메시징 시스템을 이용한 비동기 구현

✅ 메시징 시스템

비동기 이벤트 처리를 위해 카프카(Kafka)나 래빗MQ(RabbitMQ)와 같은 메시징 시스템을 사용할 수 있다.

 

https://velog.io/@csh0034/도메인-주도-개발-시작하기-10.-이벤트

 

  1. 이벤트가 발생하면 이벤트 디스패처는 이벤트를 메시지 큐에 보낸다.
  2. 메시지 큐는 이벤트를 메시지 리스너에 전달하고,
  3. 메시지 리스너는 알맞은 이벤트 핸들러를 이용해서 이벤트를 처리한다.

 

✅ 글로벌 트랜잭션

필요하다면 이벤트를 발생시키는 도메인 기능과, 메시지 큐에 이벤트를 저장하는 절차를 한 트랜잭션으로 묶어야 한다.

도메인 기능을 실행한 결과를 DB에 반영하고,

이 과정에서 발생한 이벤트를 메시지 큐에 저장하는 것을,

같은 트랜잭션 범위에서 실행하려면 글로벌 트랜잭션이 필요하다.

-> 장점: 안전하게 이벤트를 메시지 큐에 전달할 수 있다.

-> 단점: 전체 성능이 떨어진다.

 

✅ 메시지 큐는 다른 환경에서

메시지 큐를 사용하면 보통 이벤트를 발생시키는 주체와 이벤트 핸들러가 별도 프로세스에서 동작한다.

-> 이벤트 발생 JVM과 이벤트 처리 JVM이 다르다.

물론 한 JVM에서 이벤트 발생 주체와 이벤트 핸들러를 위치시킬 수 있지만,

-> 이는 보통 시스템을 복잡하게 만들 뿐이다.

 

✅ 메시지 큐 비교

1. 래빗MQ

글로벌 트랜잭션 지원 + 클러스터 고가용성 지원 -> 안정적으로 메시지를 전달할 수 있다.

다양한 개발 언어와 통신 프로토콜 지원

 

2. 카프카

글로벌 트랜잭션 지원X, 다른 메시징 시스템에 비해 높은 성능

 

이벤트 저장소를 이용한 비동기 처리

✅ 포워더를 이용한 비동기 처리

이벤트를 일단 DB에 저장한다.

포워더는 주기적으로 이벤트 저장소에 이벤트를 가져와 이벤트 핸들러를 실행한다. (별도 스레드)

 

https://velog.io/@csh0034/도메인-주도-개발-시작하기-10.-이벤트

 

장점 1:

도메인의 상태와 이벤트 저장소로 동일한 DB를 사용하기 때문에,

도메인의 상태변화와 이벤트 저장이 로컬 트랜잭션으로 처리된다.

 

장점 2:

이벤트를 물리적 저장소에 보관하기 때문에,

핸들러가 이벤트 처리에 실패할 경우 포워더는 다시 이벤트 저장소에서 이벤트를 읽어와 핸들러를 실행한다.

 

✅ 외부 API를 이용한 비동기 처리

이벤트를 DB에 저장하는 것까지는 같고,

이번에는 이벤트 핸들러가 API 서버를 통해 이벤트 목록을 가져간다.

 

https://velog.io/@csh0034/도메인-주도-개발-시작하기-10.-이벤트

 

API 방식과 포워더 방식의 차이점은 이벤트를 전달하는 방식에 있다.

-> 포워더는 이벤트를 어디까지 처리했는지 추적하는 역할이 포워더에 있다면,

-> API 방식은 이벤트 목록을 요구하는 외부 핸들러가 자신이 어디까지 이벤트를 처리했는지 기억해야 한다.

 

✅ 이벤트 저장소 구현

 

https://velog.io/@csh0034/도메인-주도-개발-시작하기-10.-이벤트

 

  • EventEntry: 이벤트 저장소에 보관할 데이터.
  • EventStore: 이벤트를 저장하고 조회하는 인터페이스를 제공한다.
  • JdbcEventStore: JDBC를 이용한 EventStore 구현 클래스다.
  • EventApi: REST API를 이용해서 이벤트 목록을 제공하는 컨트롤러다.

 

<EventEntry>

public class EventEntry {
    private Long id;
    private String type;
    private String contentType;
    private String payload;
    private long timestamp;

    public EventEntry(String type, String contentType, String payload) {
        this.type = type;
        this.contentType = contentType;
        this.payload = payload;
        this.timestamp = System.currentTimeMillis();
    }

    public EventEntry(Long id, String type, String contentType, String payload,
                      long timestamp) {
        this.id = id;
        this.type = type;
        this.contentType = contentType;
        this.payload = payload;
        this.timestamp = timestamp;
    }

    public Long getId() {
        return id;
    }

    public String getType() {
        return type;
    }

    public String getContentType() {
        return contentType;
    }

    public String getPayload() {
        return payload;
    }

    public long getTimestamp() {
        return timestamp;
    }
}

-> 이벤트 객체를 직렬화해서 payload에 저장한다. 이 때, JSON으로 직렬화했다면 contentType은 'application/json'을 갖는다.

 

 <EventStore>

public interface EventStore {
    void save(Object event);
    List<EventEntry> get(long offset, long limit);
}

-> 이벤트는 과거에 벌어진 사건이므로 데이터가 변경되지 않는다. (이벤트 수정 기능은 제공하지 않는다)

 

<JdbcEventStore>

@Component
public class JdbcEventStore implements EventStore {
    private ObjectMapper objectMapper;
    private JdbcTemplate jdbcTemplate;

    public JdbcEventStore(ObjectMapper objectMapper, JdbcTemplate jdbcTemplate) {
        this.objectMapper = objectMapper;
        this.jdbcTemplate = jdbcTemplate;
    }

    @Override
    public void save(Object event) {
        EventEntry entry = new EventEntry(event.getClass().getName(),
                "application/json", toJson(event));
        jdbcTemplate.update(
                "insert into evententry " +
                        "(type, content_type, payload, timestamp) " +
                        "values (?, ?, ?, ?)",
                ps -> {
                    ps.setString(1, entry.getType());
                    ps.setString(2, entry.getContentType());
                    ps.setString(3, entry.getPayload());
                    ps.setTimestamp(4, new Timestamp(entry.getTimestamp()));
                });
    }

    private String toJson(Object event) {
        try {
            return objectMapper.writeValueAsString(event);
        } catch (JsonProcessingException e) {
            throw new PayloadConvertException(e);
        }
    }

    @Override
    public List<EventEntry> get(long offset, long limit) {
        return jdbcTemplate.query(
                "select * from evententry order by id asc limit ?, ?",
                ps -> {
                    ps.setLong(1, offset);
                    ps.setLong(2, limit);
                },
                (rs, rowNum) -> {
                    return new EventEntry(
                            rs.getLong("id"),
                            rs.getString("type"),
                            rs.getString("content_type"),
                            rs.getString("payload"),
                            rs.getTimestamp("timestamp").getTime());
                });
    }
}

save(): MySQL 예제이기 때문에 evententry 테이블의 주요키에 자동 증가 컬럼을 사용했고, insert 쿼리에 주요키를 설정하지 않았다.

get(): offset과 limit을 이용해서 원하는 개수만큼 데이터를 조회한다.

 

✅ 이벤트 저장을 위한 이벤트 핸들러 구현

이벤트를 이벤트 저장소에 추가하는 이벤트 핸들러를 구현한다.

@Component
public class EventStoreHandler {
    private EventStore eventStore;

    public EventStoreHandler(EventStore eventStore) {
        this.eventStore = eventStore;
    }

    @EventListener(Event.class)
    public void handle(Event event) {
        eventStore.save(event);
    }
}

 

-> Event 타입을 상속받은 이벤트 타입만 이벤트 저장소에 보관한다.

ex) 도메인 로직에서 Events.raise(event)로 이벤트 디스패처가 이벤트를 publish하고, EventStoreHandler가 이를 DB에 저장한다.

 

✅ REST API 구현

@RestController
public class EventApi {
    private EventStore eventStore;

    public EventApi(EventStore eventStore) {
        this.eventStore = eventStore;
    }

    @RequestMapping(value = "/api/events", method = RequestMethod.GET)
    public List<EventEntry> list(
            @RequestParam("offset") Long offset,
            @RequestParam("limit") Long limit) {
        return eventStore.get(offset, limit);
    }
}

 

스프링 MVC를 통해 EventStore#get을 실행하고 그 결과를 리턴한다.

 

API를 사용하는 클라이언트는 일정 간격으로 다음 과정을 실행한다.

  • 가장 마지막에 처리한 데이터의 offset인 lastOffset을 구한다. 저장한 lastOffset이 없으면 0을 사용한다.
  • 마지막에 처리한 lastOffset을 offset으로 사용해서 API를 실행한다.
  • API 결과로 받은 데이터를 처리한다.
  • offset + 데이터 개수를 lastOffset으로 저장한다.

 

클라이언트가 이벤트 처리에 실패하면 다시 실패한 이벤트부터 읽어와 이벤트를 재처리할 수 있다.

API 서버에 장애가 발생한 경우에도 주기적으로 재시도를 해서 API 서버가 살아나면 이벤트를 처리할 수 있다.

 

✅ 포워더 구현

앞서 봤던 API 방식과 유사하게,

포워더는 일정 주기로 EventStore에서 이벤트를 읽어와 이벤트 핸들러에 전달한다.

마지막으로 전달한 이벤트의 offset을 기억해 두었다가 다음 조회 시점에 참고한다.

@Component
public class EventForwarder {
    private static final int DEFAULT_LIMIT_SIZE = 100;

    private EventStore eventStore;
    private OffsetStore offsetStore;
    private EventSender eventSender;
    private int limitSize = DEFAULT_LIMIT_SIZE;

    public EventForwarder(EventStore eventStore,
                          OffsetStore offsetStore,
                          EventSender eventSender) {
        this.eventStore = eventStore;
        this.offsetStore = offsetStore;
        this.eventSender = eventSender;
    }

    @Scheduled(initialDelay = 1000L, fixedDelay = 1000L)
    public void getAndSend() {
        long nextOffset = getNextOffset();
        List<EventEntry> events = eventStore.get(nextOffset, limitSize);
        if (!events.isEmpty()) {
            int processedCount = sendEvent(events);
            if (processedCount > 0) {
                saveNextOffset(nextOffset + processedCount);
            }
        }
    }

    private long getNextOffset() {
        return offsetStore.get();
    }

    private int sendEvent(List<EventEntry> events) {
        int processedCount = 0;
        try {
            for (EventEntry entry : events) {
                eventSender.send(entry);
                processedCount++;
            }
        } catch(Exception ex) {
            // 로깅 처리
        }
        return processedCount;
    }

    private void saveNextOffset(long nextOffset) {
        offsetStore.update(nextOffset);
    }

}

 

getAndSend(): 일정 주기로 이벤트를 DB에서 조회해서 전송한다.

  • getNextOffset(): 읽어올 이벤트의 다음 offset을 구한다.
  • eventStore.get(): 이벤트 저장소에서 offset부터 limitSize 만큼 이벤트를 구한다.
  • sendEvent(): 이벤트를 전송한다. 그리고 처리한 이벤트 개수를 리턴한다. 익셉션이 발생하면 이벤트 전송을 멈추고 성공한 이벤트 개수를 리턴하기 때문에, 다음 스케줄 때, 마지막 성공한 이벤트의 다음 이벤트부터 전송을 시도할 수 있다.
  • saveNextOffset(): 처리한 이벤트 개수가 0보다 크면 다음에 읽어올 offset을 저장한다.

 

OffsetStore는 offset 값을 DB 혹은 로컬 파일에 보관해서 마지막 offset 값을 물리적 저장소에 보관한다.

public interface OffsetStore {
    long get();
    void update(long nextOffset);
}

 

EventSender는 다음과 같이 단순한 인터페이스다.

-> 외부 메시징 시스템에 이벤트를 전송하거나 핸들러에 이벤트를 전달하면 된다.

public interface EventSender {
    void send(EventEntry event);
}

 


 

이벤트 적용 시 추가 고려 사항

✅ 이벤트 소스를 EventEntry에 추가할지

앞서 구현한 EventEntry는 이벤트 발생 주체에 대한 정보를 갖지 않는다.

-> 만약 'Order가 발생시킨 이벤트만 조회하기'와 같은 기능을 갖고 싶다면, EventEntry에 이벤트 발생 주체를 추가해야 한다.

 

✅ 포워더에서 전송 실패를 얼마나 허용할지

포워더에서 이벤트 전송이 실패하면 실패한 이벤트부터 다시 읽어와 전송을 시도한다.

-> 만약 특정 이벤트에서 계속 전송에 실패한다면 그 이벤트 때문에 나머지 이벤트를 전송할 수 없게 된다.

-> 실패한 이벤트의 재전송 횟수 제한을 두어 해당 이벤트는 생략하고 다음 이벤트로 넘어간다 등의 정책이 필요하다.

+) 실패한 이벤트를 따로 어딘가 저장하기도 한다. 이후 실패 이유 분석이나 후처리에 도움이 된다.

 

✅ 이벤트 손실

이벤트 저장소를 사용하면 이벤트 발생과 이벤트 저장이 한 트랜잭션이기 때문에 이벤트가 저장소에 보관된다는 것을 보장할 수 있다.

-> 반면 로컬 핸들러를 이용하는 방법은 핸들러가 이벤트 처리에 실패하게 되면 이벤트를 유실하게 된다.

 

✅ 이벤트 순서

이벤트 발생 순서대로 외부 시스템에 전달해야 하는 경우 이벤트 저장소를 사용하는 것이 좋다.

-> 반면 메시징 시스템은 기술에 따라 이벤트 발생 순서와 메시지 전달 순서가 다를 수 있다.

 

✅ 이벤트 재처리

이미 처리된 동일한 이벤트를 다시 처리해야 할 때,

-> 처리한 이벤트의 순번을 기억해 두었다가 이미 처리한 이벤트가 도착하면 해당 이벤트를 처리하지 않고 무시하는 방식이 있다.

+) 멱등성: 연산을 여러 번 적용해도 결과가 달라지지 않는 성질

이벤트 핸들러가 멱등성을 가지면, 시스템 장애로 인해 이벤트가 중복해서 발생해도 결과적으로 동일 상태가 된다.

 

이벤트 처리와 DB 트랜잭션 고려

✅ 동기 방식

이벤트 발생과 처리를 모두 동기로 처리하면,

refundService와 같은 외부 시스템과의 통신은 성공해서 결제는 취소됐는데,

만약 트랜잭션이 실패하게 되면 롤백되어 주문은 취소되지 않은 상태로 남게 된다.

 

✅ 비동기 방식

이벤트 발생과 처리를 비동기로 처리하면,

트랜잭션을 커밋해서 주문은 취소됏는데,

만약 비동기로 핸들러가 실패하면 결제는 취소되지 않은 상태로 남게 된다.

 

✅ 이벤트 처리 실패 + 트랜잭션 실패

이벤트 처리를 동기로 하든 비동기로 하든 이벤트 처리 실패와 트랜잭션 실패를 함께 고려해야 한다.

-> 다만 이를 모두 고 려하면 복잡해지므로 경우의 수를 줄이면 도움이 된다.

-> 트랜잭션이 성공할 때만 이벤트 핸들러를 실행하는 것이다.

 

스프링은 @TransactionalEventListener를 통해 트랜잭션 상태에 따라 이벤트 핸들러를 실행할 수 있게 한다.

@Async
@TransactionalEventListener(
        classes = OrderCanceledEvent.class,
        phase = TransactionPhase.AFTER_COMMIT
)
public void handle(OrderCanceledEvent event) {
    refundService.refund(event.getOrderNumber());
}

-> TransactionPhase.AFTER_COMMIT을 통해 트랜잭션 커밋에 성공한 뒤에 핸들러 메서드를 실행한다.

-> 이러면 이벤트 핸들러를 실행했는데 트랜잭션이 롤백되는 상황은 발생하지 않는다.

 

이벤트 저장소로 DB를 사용해도 이벤트 발생과 이벤트 처리를 한 트랜잭션으로 처리할 수 있기 때문에 동일한 효과를 볼 수 있다.

-> 트랜잭션이 성공할 때문 이벤트 핸들러를 실행하면 트랜잭션 실패에 대한 경우의 수가 줄어 이벤트 처리 실패만 고민하면 된다.