Skip to content

Commit

Permalink
Merge pull request #594 from woowacourse-teams/BE/dev
Browse files Browse the repository at this point in the history
[BE] sse 타이머 동기화 보완 test 서버 배포
  • Loading branch information
JiHyeonL authored Sep 26, 2024
2 parents 1d47322 + 33a5258 commit ef5ef6e
Show file tree
Hide file tree
Showing 23 changed files with 280 additions and 68 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package site.coduo.sync.controller;

import org.springframework.http.ResponseEntity;
import org.springframework.web.bind.annotation.DeleteMapping;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;
Expand All @@ -10,6 +11,7 @@
import site.coduo.sync.controller.docs.SseDocs;
import site.coduo.sync.service.SseService;


@RequiredArgsConstructor
@RestController
public class SseController implements SseDocs {
Expand All @@ -22,4 +24,12 @@ public ResponseEntity<SseEmitter> createConnection(@PathVariable("key") final St

return ResponseEntity.ok(sseEmitter);
}

@DeleteMapping("/{key}/connect")
public ResponseEntity<Void> deleteConnection(@PathVariable("key") final String key) {
sseService.disconnectAll(key);

return ResponseEntity.noContent()
.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,8 @@ public interface SseDocs {
@ApiResponse(responseCode = "200", description = "SSE 연결 성공 - event:connect\ndata:OK 메시지를 응답", content = @Content(mediaType = MediaType.TEXT_EVENT_STREAM_VALUE))
@ApiResponse(responseCode = "4xx", description = "SSE 연결 실패", content = @Content(mediaType = MediaType.APPLICATION_JSON_VALUE, schema = @Schema(implementation = ApiErrorResponse.class)))
ResponseEntity<SseEmitter> createConnection(String key);

@Operation(summary = "특정 key에 속하는 SSE 연결을 모두 삭제한다.")
@ApiResponse(responseCode = "204", description = "SSE 삭제 성공 - event:close\ndata:OK 메시지를 응답", content = @Content(mediaType = MediaType.TEXT_EVENT_STREAM_VALUE))
ResponseEntity<Void> deleteConnection(String key);
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,4 +7,6 @@ public interface EventStream {
SseEmitter connect();

void flush(String name, String message);

void close();
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,10 @@ public void broadcast(final String name, final String message) {
streams.forEach(eventStream -> eventStream.flush(name, message));
}

public void closeAll() {
streams.forEach(EventStream::close);
}

public boolean isEmpty() {
return streams.isEmpty();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,25 +17,34 @@ public EventStreamsRegistry() {
this.registry = new ConcurrentHashMap<>();
}

public SseEmitter register(final String name) {
final EventStreams eventStreams = registry.getOrDefault(name, new EventStreams());
public SseEmitter register(final String key) {
final EventStreams eventStreams = registry.getOrDefault(key, new EventStreams());
final EventStream eventStream = new SseEventStream();
eventStreams.add(eventStream);
registry.put(name, eventStreams);
registry.put(key, eventStreams);
return eventStreams.publish(eventStream);
}

public EventStreams findEventStreams(final String key) {
public void release(final String key) {
if (!registry.containsKey(key)) {
throw new NotFoundSseConnectionException("존재하지 않는 SSE 커넥션입니다.");
return;
}
return registry.get(key);
final EventStreams eventStreams = registry.get(key);
eventStreams.closeAll();
registry.remove(key);
}

public boolean hasEmptyConnection(final String key) {
if (!registry.containsKey(key)) {
throw new NotFoundSseConnectionException("SSE 커넥션을 찾을 수 없습니다.");
public EventStreams findEventStreams(final String key) {
if (registry.containsKey(key)) {
return registry.get(key);
}
throw new NotFoundSseConnectionException("존재하지 않는 SSE 커넥션입니다.");
}

public boolean hasNoStreams(final String key) {
if (registry.containsKey(key)) {
return registry.get(key).isEmpty();
}
return registry.get(key).isEmpty();
throw new NotFoundSseConnectionException("SSE 커넥션을 찾을 수 없습니다.");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,20 @@ public void release(final String key) {
if (!registry.containsKey(key)) {
throw new NotFoundScheduledFutureException("키에 해당하는 스케줄러 결과가 존재하지 않습니다.");
}
registry.get(key).cancel(false);
registry.get(key)
.cancel(false);
registry.remove(key);
}

public boolean has(final String key) {
return registry.containsKey(key);
}

public boolean isActive(final String key) {
if (registry.containsKey(key)) {
final ScheduledFuture<?> scheduledFuture = registry.get(key);
return !scheduledFuture.isDone() && !scheduledFuture.isCancelled();
}
return false;
}
}
45 changes: 28 additions & 17 deletions backend/src/main/java/site/coduo/sync/service/SchedulerService.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import lombok.RequiredArgsConstructor;
import site.coduo.timer.domain.Timer;
import site.coduo.timer.repository.TimerRepository;
import site.coduo.timer.service.TimestampRegistry;

@RequiredArgsConstructor
@Component
Expand All @@ -26,40 +27,50 @@ public class SchedulerService {
private final SseService sseService;

public void start(final String key) {
sseService.broadcast(key, "timer", "start");
if (isInitial(key)) {
final Timer timer = timerRepository.fetchTimerByAccessCode(key).toDomain();
final Timer timer = timerRepository.fetchTimerByAccessCode(key)
.toDomain();
scheduling(key, timer);
timestampRegistry.register(key, timer);
return;
}
if (isResume(key)) {
final Timer timer = timestampRegistry.get(key);
scheduling(key, timer);
}
final Timer timer = timestampRegistry.get(key);
scheduling(key, timer);
}

private boolean isInitial(final String key) {
return !schedulerRegistry.has(key) && !timestampRegistry.has(key);
}

private boolean isResume(final String key) {
return !schedulerRegistry.has(key) && timestampRegistry.has(key);
}

private void scheduling(final String key, final Timer timer) {
final Trigger trigger = new PeriodicTrigger(DELAY_SECOND);
final ScheduledFuture<?> schedule = taskScheduler.schedule(() -> {
timer.decreaseRemainingTime(DELAY_SECOND.toMillis());
if (timer.getRemainingTime() == 0 || sseService.hasEmptyConnection(key)) {
schedulerRegistry.release(key);
timestampRegistry.release(key);
}
sseService.broadcast(key, "remaining-time", String.valueOf(timer.getRemainingTime()));
}, trigger);
final ScheduledFuture<?> schedule = taskScheduler.schedule(() -> runTimer(key, timer), trigger);
schedulerRegistry.register(key, schedule);
}

private void runTimer(final String key, final Timer timer) {
if (timer.isTimeUp()) {
stop(key);
final Timer initalTimer = new Timer(timer.getAccessCode(), timer.getDuration(), timer.getDuration());
timestampRegistry.register(key, initalTimer);
return;
}
if (sseService.hasNoConnections(key)) {
stop(key);
return;
}
timer.decreaseRemainingTime(DELAY_SECOND.toMillis());
sseService.broadcast(key, "remaining-time", String.valueOf(timer.getRemainingTime()));
}

public void pause(final String key) {
sseService.broadcast(key, "timer", "pause");
schedulerRegistry.release(key);
}

public void stop(final String key) {
sseService.broadcast(key, "timer", "stop");
schedulerRegistry.release(key);
}
}
19 changes: 14 additions & 5 deletions backend/src/main/java/site/coduo/sync/service/SseEventStream.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,16 @@
@Getter
public class SseEventStream implements EventStream {

private static final long INFINITE_TIME_OUT = -1;
private static final Duration TIME_OUT = Duration.ofMinutes(20);
private static final String CLOSE_NAME = "close";
private static final String CONNECT_NAME = "connect";
private static final String SUCCESS_MESSAGE = "OK";

private final AtomicLong id = new AtomicLong(0);
private final SseEmitter sseEmitter;

public SseEventStream() {
this.sseEmitter = new SseEmitter(INFINITE_TIME_OUT);
this.sseEmitter = new SseEmitter(TIME_OUT.toMillis());
}

public SseEventStream(final Duration timeout) {
Expand All @@ -38,8 +41,8 @@ public SseEmitter connect() {
try {
sseEmitter.send(SseEmitter.event()
.id(eventId)
.name("connect")
.data("OK"));
.name(CONNECT_NAME)
.data(SUCCESS_MESSAGE));
} catch (final IOException e) {
throw new SseConnectionFailureException("SSE 연결이 실패했습니다.");
}
Expand All @@ -56,10 +59,16 @@ public void flush(final String name, final String message) {
.data(message)
);
} catch (IOException e) {
throw new SseConnectionFailureException("SSE 통신에 실패했습니다.");
log.warn("SSE 통신 중 에러가 발생했습니다.");
}
}

@Override
public void close() {
flush(CLOSE_NAME, SUCCESS_MESSAGE);
sseEmitter.complete();
}

@Override
public boolean equals(final Object o) {
if (this == o) {
Expand Down
20 changes: 17 additions & 3 deletions backend/src/main/java/site/coduo/sync/service/SseService.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,23 +4,37 @@
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

import lombok.RequiredArgsConstructor;
import site.coduo.timer.service.TimerService;

@RequiredArgsConstructor
@Service
public class SseService {

private final EventStreamsRegistry eventStreamsRegistry;
private final TimerService timerService;
private final SchedulerRegistry schedulerRegistry;

public SseEmitter connect(final String key) {
return eventStreamsRegistry.register(key);
final SseEmitter emitter = eventStreamsRegistry.register(key);
final long remainingTime = timerService.readTimerRemainingTime(key);
// todo: SchedulerService 분리된 상수화 어떻게 할지 생각
broadcast(key, "remaining-time", String.valueOf(remainingTime));
if (schedulerRegistry.isActive(key)) {
broadcast(key, "timer", "running");
}
return emitter;
}

public void broadcast(final String key, final String event, final String data) {
final EventStreams emitters = eventStreamsRegistry.findEventStreams(key);
emitters.broadcast(event, data);
}

public boolean hasEmptyConnection(final String key) {
return eventStreamsRegistry.hasEmptyConnection(key);
public boolean hasNoConnections(final String key) {
return eventStreamsRegistry.hasNoStreams(key);
}

public void disconnectAll(final String key) {
eventStreamsRegistry.release(key);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ public ResponseEntity<Void> createTimerStart(@PathVariable("accessCode") final S

@PatchMapping("/{accessCode}/timer/stop")
public ResponseEntity<Void> createTimerStop(@PathVariable("accessCode") final String accessCode) {
schedulerService.stop(accessCode);
schedulerService.pause(accessCode);

return ResponseEntity.noContent()
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@
@RequiredArgsConstructor
public enum TimerApiError {

INVALID_REQUEST(HttpStatus.BAD_REQUEST, "유효하지 않은 페어룸 히스토리 요청입니다."),
INVALID_REQUEST(HttpStatus.BAD_REQUEST, "유효하지 않은 요청입니다."),
INVALID_TIMER_REQUEST(HttpStatus.BAD_REQUEST, "유효하지 않은 타이머 시간이 존재합니다."),
TIMER_NOT_FOUND(HttpStatus.NOT_FOUND, "페어룸 히스토리가 존재하지 않습니다.");
TIMER_NOT_FOUND(HttpStatus.NOT_FOUND, "타이머를 찾을 수 없습니다.");

private final HttpStatus httpStatus;
private final String message;
Expand Down
4 changes: 4 additions & 0 deletions backend/src/main/java/site/coduo/timer/domain/Timer.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,10 @@ public long getRemainingTime() {
return remainingTime.get();
}

public boolean isTimeUp() {
return remainingTime.get() == 0;
}

public void decreaseRemainingTime(final long decrease) {
if (remainingTime.get() == 0L) {
return;
Expand Down
22 changes: 17 additions & 5 deletions backend/src/main/java/site/coduo/timer/service/TimerService.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
public class TimerService {

private final TimerRepository timerRepository;
private final TimestampRegistry timestampRegistry;
private final PairRoomRepository pairRoomRepository;

public TimerReadResponse readTimer(final String accessCode) {
Expand All @@ -27,15 +28,26 @@ public TimerReadResponse readTimer(final String accessCode) {
return TimerReadResponse.of(timerEntity.getId(), timerEntity.toDomain());
}

public long readTimerRemainingTime(final String accessCode) {
if (timestampRegistry.has(accessCode)) {
return timestampRegistry.get(accessCode)
.getRemainingTime();
}
final Timer timer = timerRepository.fetchTimerByAccessCode(accessCode)
.toDomain();
return timer.getDuration();
}

@Transactional
public void updateTimer(final String accessCode, final TimerUpdateRequest newTimer) {
public void updateTimer(final String accessCode, final TimerUpdateRequest updateRequest) {
final PairRoomEntity pairRoomEntity = pairRoomRepository.fetchByAccessCode(accessCode);
final TimerEntity timerEntity = timerRepository.fetchTimerByPairRoomId(pairRoomEntity.getId());
final Timer timer = new Timer(
final Timer newTimer = new Timer(
new AccessCode(pairRoomEntity.getAccessCode()),
newTimer.duration(),
newTimer.remainingTime()
updateRequest.duration(),
updateRequest.remainingTime()
);
timerEntity.updateTimer(timer);
timerEntity.updateTimer(newTimer);
timestampRegistry.register(accessCode, newTimer);
}
}
Original file line number Diff line number Diff line change
@@ -1,23 +1,21 @@
package site.coduo.sync.service;
package site.coduo.timer.service;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.springframework.stereotype.Component;

import site.coduo.sync.exception.DuplicateTimestampException;
import lombok.NoArgsConstructor;
import site.coduo.sync.exception.NotFoundTimeStampException;
import site.coduo.timer.domain.Timer;

@Component
@NoArgsConstructor
public class TimestampRegistry {

private final Map<String, Timer> registry = new ConcurrentHashMap<>();

public void register(final String key, final Timer timer) {
if (registry.containsKey(key)) {
throw new DuplicateTimestampException("이미 존재하는 타임 스탬프입니다.");
}
registry.put(key, timer);
}

Expand Down
Loading

0 comments on commit ef5ef6e

Please sign in to comment.