Back-End/Spring Boot

[Spring WebFlux] 프로젝트에서 SSE 구현 예시

ch4njun 2021. 10. 25. 01:13
반응형

개인적으로 스터디를 진행하는게 있는데 거기서 스터디 홈페이지를 개발하는 토이 프로젝트를 진행하고 있다. 그 과정에서 내가 고민한 부분과 그 것을 어떻게 해결했는지에 대한 경험을 이야기해보려 한다.

문제


내가 경험한 문제는 캘린더를 개발하는 과정에서 생긴 데이터 동기화 문제였다.

 

클라이언트 A가 캘린더 화면에 들어오면서 최초 로딩된 데이터를 가지고 온다.

그 상태에서 클라이언트 B가 데이터를 수정할 경우 클라이언트 A가 새로고침을 하거나 Polling 하는 등 새로 데이터를 받아오지 않는한 데이터를 동기화되지 않는다.

물론 사용자가 많지 않고 실시간 데이터 동기화가 중요하지 않은 곳이긴 했지만, "이게 실시간 서비스였다면 어땠을까?", "사용자가 많은 서비스라면 어땠을까?" 라는 생각으로 이 부분을 개선해보자고 생각했다.

접근방법


내가 접근한 방식은 Reactive Programming 을 이용한 방식이었다.

백엔드에서 데이터가 누군가에 의해서 등록/수정/삭제되면 해당 데이터에 대한 이벤트를 발행하고, 프론트엔드에서는 전달되는 이벤트에 반응해 가지고 있는 데이터를 수정하도록 구현한다.

 

  1. 페이지 로드시 프론트엔드(Vue.js) 에서 백엔드(Spring Boot) 로 일년치 데이터를 API 호출을 통해서 가져온다.
  2. 프론트엔드는 받은 데이터를 메모리(Vuex)에 저장하고, 추가로 현재 보관중인 데이터가 몇년도인지 current_year 도 함께 저장한다.
  3. 프론트엔드는 백엔드로 SSE 연결을 위한 API 를 호출한다.
  4. 백엔드는 이후 회의일정에 변화(등록/추가/삭제)가 발생하면 연결된 SSE 를 통해 이벤트를 발행한다.
  5. 프론트엔드는 이벤트에 반응해 해당 이벤트가 자신의 current_year 에 해당한다면 데이터 동기화 작업을 수행한다.

백엔드 코드


ScheduleChannel.java

@Component
public class ScheduleChannel {
    private final Many<ScheduleEventDto> scheduleEvents;

    public ScheduleChannel() {
        this.scheduleEvents = Sinks.many().multicast().directAllOrNothing();
    }
    public Many<ScheduleEventDto> getSink() {
        return this.scheduleEvents;
    }
    public Flux<ScheduleEventDto> asFlux() {
        return this.scheduleEvents.asFlux();
    }
}

 

CalenarController.java

@RestController
@RequestMapping("/calendar")
@RequiredArgsConstructor
public class CalendarController {
    private final CalendarService calendarService;
    private final ScheduleChannel scheduleChannel;

    @GetMapping("/{year}/schedules")
    public Flux<ScheduleDto> getSchedulesByYear(@PathVariable int year) {
        return calendarService.retrieveSchedules(year);
    }

    @PostMapping("/schedule")
    public Mono<ScheduleDto> setSchedule(
            @RequestBody ScheduleDto scheduleDto
    ) {
        return calendarService.saveSchedule(scheduleDto);
    }

    @PutMapping("/schedules/{idx}")
    public Mono<ScheduleDto> updateSchedule(
            @PathVariable long idx,
            @RequestBody ScheduleDto scheduleDto
    ) {
        return calendarService.updateSchedule(idx, scheduleDto);
    }

    @DeleteMapping("/schedules/{idx}")
    public Mono<Void> deleteSchedule(
            @PathVariable long idx
    ) {
        return calendarService.deleteSchedule(idx);
    }

    @CrossOrigin("*")
    @GetMapping(value = "/schedules/sse", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public Flux<ScheduleEventDto> scheduleSse() {
        return scheduleChannel.asFlux();
    }
}

 

CalendarService.java

@Service
@RequiredArgsConstructor
public class CalendarService {
    private final ScheduleRepository scheduleRepository;
    private final ScheduleChannel scheduleChannel;

    public Flux<ScheduleDto> retrieveSchedules(int year) {
        return scheduleRepository.findByYearOrderByStartTime(year)
                .map(ScheduleDto::of);
    }

    @Transactional
    public Mono<ScheduleDto> saveSchedule(ScheduleDto scheduleDto) {
        return scheduleRepository.save(Schedule.of(scheduleDto))
                .map(schedule -> {
                    scheduleChannel.getSink()
                            .tryEmitNext(ScheduleEventDto.builder()
                                    .type(ScheduleEventType.CREATE)
                                    .schedule(ScheduleDto.of(schedule))
                                    .build());
                    return ScheduleDto.of(schedule);
                });
    }

    @Transactional
    public Mono<ScheduleDto> updateSchedule(long idx, ScheduleDto scheduleDto) {
        return scheduleRepository.findById(idx)
                .map(schedule -> {
                    schedule.setTitle(scheduleDto.getTitle());
                    schedule.setStartTime(scheduleDto.getStartTime());
                    schedule.setEndTime(scheduleDto.getEndTime());
                    schedule.setComment(scheduleDto.getComment());
                    schedule.setAlarm(scheduleDto.getAlarm());

                    scheduleChannel.getSink()
                            .tryEmitNext(ScheduleEventDto.builder()
                                    .type(ScheduleEventType.MODIFY)
                                    .schedule(ScheduleDto.of(schedule))
                                    .build());
                    return schedule;
                })
                .flatMap(schedule -> scheduleRepository.save(schedule).map(ScheduleDto::of));
    }

    @Transactional
    public Mono<Void> deleteSchedule(long idx) {
        scheduleRepository.findById(idx).subscribe(schedule -> {
            scheduleChannel.getSink()
                    .tryEmitNext(ScheduleEventDto.builder()
                            .type(ScheduleEventType.DELETE)
                            .schedule(ScheduleDto.of(schedule))
                            .build());

        });
        return scheduleRepository.deleteById(idx);
    }
}

 

scheduleChannel 의 Sinks.Many 객체를 가져와 EmitNext 하면 해당 이벤트가 SSE 에 발행되는 것이다. 이 때 기억해야 할 것이 Sinks.Many 객체가 생성될 때 어떤 옵션을 부여받았는지 여부이다.

 

unicast() : 하나의 Subscriber 만 허용한다. 즉, 하나의 Client 만 연결할 수 있다.

multicast() : 여러 Subscriber 를 허용한다.

replay() : 여러 Subscriber 를 허용하되, 이전에 발행된 이벤트들을 기억해 추가로 연결되는 Subscriber 에게 전달한다.

 

multicast().onBackpressureBuffer()

   : Subscriber 가 없을 때 발행된 이벤트들에 대해서 그 다음 구독하는 Subscriber 에게 전달한다.

multicast().directAllOrNothing()

   : Subscriber 는 자신이 구독한 시점에서부터의 이벤트만 받는다.

 

나는 multicast().directAllorNothing() 을 사용했다. 이유는 곰곰히 생각해보면 알 수 있다.

프론트엔드 코드


mounted() {
    this.$store.dispatch("load_calendar", this.year);
    var source = new EventSource("http://localhost:10831/calendar/schedules/sse");
    source.onmessage = (event) => { 
        var event_data = JSON.parse(event.data);
        if (event_data.schedule.year === this.year) {
            this.$store.dispatch("call_calendar_event", event_data);
        }
    }
}

 


마지막으로 패드에 정리했던 내용 첨부하면서 마치도록 한다.

반응형