[Spring WebFlux] 프로젝트에서 SSE 구현 예시
개인적으로 스터디를 진행하는게 있는데 거기서 스터디 홈페이지를 개발하는 토이 프로젝트를 진행하고 있다. 그 과정에서 내가 고민한 부분과 그 것을 어떻게 해결했는지에 대한 경험을 이야기해보려 한다.
문제
내가 경험한 문제는 캘린더를 개발하는 과정에서 생긴 데이터 동기화 문제였다.
클라이언트 A가 캘린더 화면에 들어오면서 최초 로딩된 데이터를 가지고 온다.
그 상태에서 클라이언트 B가 데이터를 수정할 경우 클라이언트 A가 새로고침을 하거나 Polling 하는 등 새로 데이터를 받아오지 않는한 데이터를 동기화되지 않는다.
물론 사용자가 많지 않고 실시간 데이터 동기화가 중요하지 않은 곳이긴 했지만, "이게 실시간 서비스였다면 어땠을까?", "사용자가 많은 서비스라면 어땠을까?" 라는 생각으로 이 부분을 개선해보자고 생각했다.
접근방법
내가 접근한 방식은 Reactive Programming 을 이용한 방식이었다.
백엔드에서 데이터가 누군가에 의해서 등록/수정/삭제되면 해당 데이터에 대한 이벤트를 발행하고, 프론트엔드에서는 전달되는 이벤트에 반응해 가지고 있는 데이터를 수정하도록 구현한다.
- 페이지 로드시 프론트엔드(Vue.js) 에서 백엔드(Spring Boot) 로 일년치 데이터를 API 호출을 통해서 가져온다.
- 프론트엔드는 받은 데이터를 메모리(Vuex)에 저장하고, 추가로 현재 보관중인 데이터가 몇년도인지 current_year 도 함께 저장한다.
- 프론트엔드는 백엔드로 SSE 연결을 위한 API 를 호출한다.
- 백엔드는 이후 회의일정에 변화(등록/추가/삭제)가 발생하면 연결된 SSE 를 통해 이벤트를 발행한다.
- 프론트엔드는 이벤트에 반응해 해당 이벤트가 자신의 current_year 에 해당한다면 데이터 동기화 작업을 수행한다.
백엔드 코드
ScheduleChannel.java
@Component
public class ScheduleChannel {
private final Many<ScheduleEventDto> scheduleEvents;
public ScheduleChannel() {
this.scheduleEvents = Sinks.many().multicast().directAllOrNothing();
}
public Many<ScheduleEventDto> getSink() {
return this.scheduleEvents;
}
public Flux<ScheduleEventDto> asFlux() {
return this.scheduleEvents.asFlux();
}
}
CalenarController.java
@RestController
@RequestMapping("/calendar")
@RequiredArgsConstructor
public class CalendarController {
private final CalendarService calendarService;
private final ScheduleChannel scheduleChannel;
@GetMapping("/{year}/schedules")
public Flux<ScheduleDto> getSchedulesByYear(@PathVariable int year) {
return calendarService.retrieveSchedules(year);
}
@PostMapping("/schedule")
public Mono<ScheduleDto> setSchedule(
@RequestBody ScheduleDto scheduleDto
) {
return calendarService.saveSchedule(scheduleDto);
}
@PutMapping("/schedules/{idx}")
public Mono<ScheduleDto> updateSchedule(
@PathVariable long idx,
@RequestBody ScheduleDto scheduleDto
) {
return calendarService.updateSchedule(idx, scheduleDto);
}
@DeleteMapping("/schedules/{idx}")
public Mono<Void> deleteSchedule(
@PathVariable long idx
) {
return calendarService.deleteSchedule(idx);
}
@CrossOrigin("*")
@GetMapping(value = "/schedules/sse", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public Flux<ScheduleEventDto> scheduleSse() {
return scheduleChannel.asFlux();
}
}
CalendarService.java
@Service
@RequiredArgsConstructor
public class CalendarService {
private final ScheduleRepository scheduleRepository;
private final ScheduleChannel scheduleChannel;
public Flux<ScheduleDto> retrieveSchedules(int year) {
return scheduleRepository.findByYearOrderByStartTime(year)
.map(ScheduleDto::of);
}
@Transactional
public Mono<ScheduleDto> saveSchedule(ScheduleDto scheduleDto) {
return scheduleRepository.save(Schedule.of(scheduleDto))
.map(schedule -> {
scheduleChannel.getSink()
.tryEmitNext(ScheduleEventDto.builder()
.type(ScheduleEventType.CREATE)
.schedule(ScheduleDto.of(schedule))
.build());
return ScheduleDto.of(schedule);
});
}
@Transactional
public Mono<ScheduleDto> updateSchedule(long idx, ScheduleDto scheduleDto) {
return scheduleRepository.findById(idx)
.map(schedule -> {
schedule.setTitle(scheduleDto.getTitle());
schedule.setStartTime(scheduleDto.getStartTime());
schedule.setEndTime(scheduleDto.getEndTime());
schedule.setComment(scheduleDto.getComment());
schedule.setAlarm(scheduleDto.getAlarm());
scheduleChannel.getSink()
.tryEmitNext(ScheduleEventDto.builder()
.type(ScheduleEventType.MODIFY)
.schedule(ScheduleDto.of(schedule))
.build());
return schedule;
})
.flatMap(schedule -> scheduleRepository.save(schedule).map(ScheduleDto::of));
}
@Transactional
public Mono<Void> deleteSchedule(long idx) {
scheduleRepository.findById(idx).subscribe(schedule -> {
scheduleChannel.getSink()
.tryEmitNext(ScheduleEventDto.builder()
.type(ScheduleEventType.DELETE)
.schedule(ScheduleDto.of(schedule))
.build());
});
return scheduleRepository.deleteById(idx);
}
}
scheduleChannel 의 Sinks.Many 객체를 가져와 EmitNext 하면 해당 이벤트가 SSE 에 발행되는 것이다. 이 때 기억해야 할 것이 Sinks.Many 객체가 생성될 때 어떤 옵션을 부여받았는지 여부이다.
unicast() : 하나의 Subscriber 만 허용한다. 즉, 하나의 Client 만 연결할 수 있다.
multicast() : 여러 Subscriber 를 허용한다.
replay() : 여러 Subscriber 를 허용하되, 이전에 발행된 이벤트들을 기억해 추가로 연결되는 Subscriber 에게 전달한다.
multicast().onBackpressureBuffer()
: Subscriber 가 없을 때 발행된 이벤트들에 대해서 그 다음 구독하는 Subscriber 에게 전달한다.
multicast().directAllOrNothing()
: Subscriber 는 자신이 구독한 시점에서부터의 이벤트만 받는다.
나는 multicast().directAllorNothing() 을 사용했다. 이유는 곰곰히 생각해보면 알 수 있다.
프론트엔드 코드
mounted() {
this.$store.dispatch("load_calendar", this.year);
var source = new EventSource("http://localhost:10831/calendar/schedules/sse");
source.onmessage = (event) => {
var event_data = JSON.parse(event.data);
if (event_data.schedule.year === this.year) {
this.$store.dispatch("call_calendar_event", event_data);
}
}
}
마지막으로 패드에 정리했던 내용 첨부하면서 마치도록 한다.