Skip to content

Commit

Permalink
Merge pull request #41 from SKY-HORSE-MAN-POWER/develop
Browse files Browse the repository at this point in the history
[DEPLOYMENT] 알림 서비스 구현
  • Loading branch information
chanchanwoong authored Jun 26, 2024
2 parents 2aec432 + b01ba45 commit 520ccbe
Show file tree
Hide file tree
Showing 20 changed files with 309 additions and 131 deletions.
3 changes: 3 additions & 0 deletions build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,9 @@ dependencies {
implementation 'com.github.danielwegener:logback-kafka-appender:0.1.0'
// Logstash
implementation 'net.logstash.logback:logstash-logback-encoder:6.2'

implementation 'org.springframework.boot:spring-boot-starter-data-mongodb'

}

tasks.named('bootBuildImage') {
Expand Down
15 changes: 3 additions & 12 deletions src/main/java/com/sparos4th2/alarm/application/AlarmService.java
Original file line number Diff line number Diff line change
@@ -1,22 +1,13 @@
package com.sparos4th2.alarm.application;

import com.sparos4th2.alarm.domain.Alarm;
import com.sparos4th2.alarm.dto.AlarmDto;
import org.springframework.http.codec.ServerSentEvent;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import com.sparos4th2.alarm.data.vo.NotificationResponseVo;
import com.sparos4th2.alarm.data.dto.AlarmDto;

public interface AlarmService {

void saveAlarm();

Flux<Alarm> getAlarm(String receiverUuid);

Flux<ServerSentEvent<Object>> connect(String receiverUuid);

// Mono<Boolean> successMessageSend(String receiverUuid);

void finish(String receiverUuid);
NotificationResponseVo getAlarm(String receiverUuid, Integer page, Integer size);

void consume(AlarmDto alarmDto);
}
152 changes: 71 additions & 81 deletions src/main/java/com/sparos4th2/alarm/application/AlarmServiceImpl.java
Original file line number Diff line number Diff line change
@@ -1,31 +1,31 @@
package com.sparos4th2.alarm.application;

import com.sparos4th2.alarm.common.exception.CustomException;
import com.sparos4th2.alarm.common.exception.ResponseStatus;
import com.sparos4th2.alarm.data.dto.NotificationDto;
import com.sparos4th2.alarm.data.vo.NotificationResponseVo;
import com.sparos4th2.alarm.domain.Alarm;
import com.sparos4th2.alarm.dto.AlarmDto;
import com.sparos4th2.alarm.domain.AlarmCount;
import com.sparos4th2.alarm.data.dto.AlarmDto;
import com.sparos4th2.alarm.infrastructure.AlarmCountReactiveRepository;
import com.sparos4th2.alarm.infrastructure.AlarmCountRepository;
import com.sparos4th2.alarm.infrastructure.AlarmReactiveRepository;
import com.sparos4th2.alarm.infrastructure.AlarmRepository;
import com.sparos4th2.alarm.vo.AlarmVo;
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.*;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.PageRequest;
import org.springframework.http.codec.ServerSentEvent;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.Sinks;
import reactor.core.scheduler.Schedulers;

@Service
@RequiredArgsConstructor
@Slf4j
public class AlarmServiceImpl implements AlarmService {

private final AlarmCountRepository alarmCountRepository;
private final AlarmRepository alarmRepository;
private final Map<String, Sinks.Many<ServerSentEvent<Object>>> sinks = new HashMap<>();

Expand All @@ -35,71 +35,56 @@ public void saveAlarm() {
.receiverUuid("test")
.message("test")
.eventType("test")
.alarmUrl("testUrl")
.alarmTime(LocalDateTime.now())
.build();
log.info("alarm: {}", alarm.toString());
alarmRepository.save(alarm).subscribe();
if (sinks.containsKey(alarm.getReceiverUuid())) {
sinks.get(alarm.getReceiverUuid())
.tryEmitNext(ServerSentEvent.builder().event("alarm").data(alarm)
.comment("new alarm")
.build());
}
}
log.info("alarm >>> {}", alarm.toString());
alarmRepository.save(alarm);

@Override
public Flux<Alarm> getAlarm(String receiverUuid) {
return alarmRepository.findAlarmByReceiverUuid(receiverUuid)
.switchIfEmpty(Mono.error(new CustomException(ResponseStatus.NO_EXIST_ALARM)))
.take(10)
.subscribeOn(Schedulers.boundedElastic());
AlarmCount alarmCount = AlarmCount.builder()
.alarmCount(1)
.receiverUuid("test")
.build();
alarmCountRepository.save(alarmCount);
}

@Override
public Flux<ServerSentEvent<Object>> connect(String receiverUuid) {
if (sinks.containsKey(receiverUuid)) { //이미 SSE 연결이 되어있는 경우
return sinks.get(receiverUuid).asFlux();
}
public NotificationResponseVo getAlarm(String receiverUuid, Integer page, Integer size) {
log.info("receiverUuid >>> {}, page >>> {}, size >>> {}", receiverUuid, page, size);

//SSE 연결이 되어있지 않은 경우
Sinks.Many<ServerSentEvent<Object>> sink = Sinks.many().multicast().onBackpressureBuffer();
sinks.put(receiverUuid, sink);
sinks.get(receiverUuid).tryEmitNext(ServerSentEvent.builder()
.event("config")
.data("Connected Successfully")
.comment("Connected Successfully")
.build());

//30분 후에 연결이 끊어지도록 설정
Mono.delay(Duration.ofMinutes(30)).doOnNext(i -> finish(receiverUuid))
.subscribe();
return sink.asFlux().doOnCancel(() -> {
log.info("### SSE Notification Cancelled by client: " + receiverUuid);
finish(receiverUuid);
});
}
// AlarmCount 수를 0으로 갱신
AlarmCount alarmCount = AlarmCount.builder()
.receiverUuid(receiverUuid)
.alarmCount(0)
.build();

// @Override
// public Mono<Boolean> successMessageSend(String receiverUuid) {
// return Mono.just(receiverUuid)
// .flatMap(id -> {
// if (sinks.containsKey(receiverUuid)) { //알림을 받을 사용자가 현재 SSE로 연결한 경우 알림 발송
// sinks.get(receiverUuid).tryEmitNext(ServerSentEvent.builder()
// .event("config")
// .data("Connected Successfully")
// .comment("Connected Successfully")
// .build());
// return Mono.just(true);
// }
// //오류처리CustomException으로 해야됨
// return Mono.error(new RuntimeException("Not connected"));
// });
// }
alarmCountRepository.save(alarmCount);

@Override
public void finish(String receiverUuid) {
sinks.get(receiverUuid).tryEmitComplete();
sinks.remove(receiverUuid);
// 알람 리스트 최신순으로 반환
Page<Alarm> alarmPage = alarmRepository.findAllAlarm(
receiverUuid, PageRequest.of(page, size)
);

List<Alarm> alarms = alarmPage.getContent();

List<NotificationDto> notificationDtos = new ArrayList<>();

for (Alarm alarm : alarms) {
notificationDtos.add(NotificationDto.builder()
.message(alarm.getMessage())
.eventType(alarm.getEventType())
.alarmUrl(alarm.getAlarmUrl())
.alarmTime(alarm.getAlarmTime())
.build());
}

boolean hasNext = alarmPage.hasNext();

return NotificationResponseVo.builder()
.notificationDtoList(notificationDtos)
.currentPage(page)
.hasNext(hasNext)
.build();
}

public void consume(AlarmDto alarmDto) {
Expand All @@ -116,21 +101,26 @@ public void consume(AlarmDto alarmDto) {
.alarmTime(LocalDateTime.now())
.build();

AlarmVo alarmVo = AlarmVo.builder()
.receiverUuid(receiverUuid)
.message(alarmDto.getMessage())
.eventType(alarmDto.getEventType())
.alarmTime(LocalDateTime.now())
.build();
// alarm 도큐먼트 저장
alarmRepository.save(alarm);

log.info("alarm: {}", alarm.toString());
alarmRepository.save(alarm).subscribe();
// 기존의 alarmCountRepository에 있는 지 확인한다.
// 있으면 count + 1, 없으면 count = 1 처리 후 alarm_count 도큐먼트 저장
Optional<AlarmCount> alarmCount = alarmCountRepository.findByReceiverUuid(receiverUuid);

if (sinks.containsKey(alarm.getReceiverUuid())) {
sinks.get(alarm.getReceiverUuid())
.tryEmitNext(ServerSentEvent.builder().event("alarm").data(alarmVo)
.comment("new alarm")
.build());
if (alarmCount.isPresent()) {
AlarmCount newAlarmCount = AlarmCount.builder()
.receiverUuid(receiverUuid)
.alarmCount(alarmCount.get().getAlarmCount() + 1)
.build();
alarmCountRepository.save(newAlarmCount);
}
else {
AlarmCount newAlarmCount = AlarmCount.builder()
.receiverUuid(receiverUuid)
.alarmCount(1)
.build();
alarmCountRepository.save(newAlarmCount);
}
});
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package com.sparos4th2.alarm.dto;
package com.sparos4th2.alarm.data.dto;

import java.util.List;
import lombok.AllArgsConstructor;
Expand Down
26 changes: 26 additions & 0 deletions src/main/java/com/sparos4th2/alarm/data/dto/NotificationDto.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package com.sparos4th2.alarm.data.dto;

import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.ToString;

import java.time.LocalDateTime;

@NoArgsConstructor
@Getter
@ToString
public class NotificationDto {
private String message;
private String eventType;
private String alarmUrl;
private LocalDateTime alarmTime;

@Builder
public NotificationDto(String message, String eventType, String alarmUrl, LocalDateTime alarmTime) {
this.message = message;
this.eventType = eventType;
this.alarmUrl = alarmUrl;
this.alarmTime = alarmTime;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package com.sparos4th2.alarm.data.vo;

import com.sparos4th2.alarm.data.dto.NotificationDto;
import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.ToString;

import java.time.LocalDateTime;
import java.util.List;

@NoArgsConstructor
@Getter
@ToString
public class NotificationResponseVo {
private List<NotificationDto> notificationDtoList;
private int currentPage;
private boolean hasNext;


@Builder
public NotificationResponseVo(List<NotificationDto> notificationDtoList, int currentPage, boolean hasNext) {
this.notificationDtoList = notificationDtoList;
this.currentPage = currentPage;
this.hasNext = hasNext;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package com.sparos4th2.alarm.data.vo;

import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.ToString;

@Getter
@ToString
@NoArgsConstructor
public class StreamNotificationResponseVo {
private int alarmCount;
}
2 changes: 2 additions & 0 deletions src/main/java/com/sparos4th2/alarm/domain/Alarm.java
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@
import java.time.LocalDateTime;
import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;
import org.springframework.data.annotation.Id;
import org.springframework.data.mongodb.core.mapping.Document;

@Getter
@NoArgsConstructor
@Document(collection = "alarm")
public class Alarm {

Expand Down
24 changes: 24 additions & 0 deletions src/main/java/com/sparos4th2/alarm/domain/AlarmCount.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package com.sparos4th2.alarm.domain;

import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;
import org.springframework.data.annotation.Id;
import org.springframework.data.mongodb.core.mapping.Document;

@Getter
@Document(collection = "alarm_count")
@NoArgsConstructor
public class AlarmCount {

@Id
private String id;
private String receiverUuid;
private int alarmCount;

@Builder
public AlarmCount(String receiverUuid, int alarmCount) {
this.receiverUuid = receiverUuid;
this.alarmCount = alarmCount;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package com.sparos4th2.alarm.infrastructure;

import com.sparos4th2.alarm.data.vo.StreamNotificationResponseVo;
import com.sparos4th2.alarm.domain.Alarm;
import com.sparos4th2.alarm.domain.AlarmCount;
import java.util.Optional;
import org.springframework.data.mongodb.repository.Query;
import org.springframework.data.mongodb.repository.ReactiveMongoRepository;
import org.springframework.data.mongodb.repository.Tailable;
import reactor.core.publisher.Flux;

public interface AlarmCountReactiveRepository extends ReactiveMongoRepository<AlarmCount, String> {

@Tailable
@Query("{ 'receiverUuid' : ?0 }")
Flux<StreamNotificationResponseVo> findByReceiverUuid(String receiverUuid);

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package com.sparos4th2.alarm.infrastructure;

import com.sparos4th2.alarm.domain.Alarm;
import com.sparos4th2.alarm.domain.AlarmCount;
import org.springframework.data.mongodb.repository.MongoRepository;

import java.util.Optional;

public interface AlarmCountRepository extends MongoRepository<AlarmCount, String> {
Optional<AlarmCount> findByReceiverUuid(String receiverUuid);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package com.sparos4th2.alarm.infrastructure;

import com.sparos4th2.alarm.domain.Alarm;
import org.springframework.data.mongodb.repository.Query;
import org.springframework.data.mongodb.repository.ReactiveMongoRepository;
import reactor.core.publisher.Flux;

public interface AlarmReactiveRepository extends ReactiveMongoRepository<Alarm, String> {

@Query("{ 'receiverUuid' : ?0 }")
Flux<Alarm> findAlarmByReceiverUuid(String receiverUuid);
}
Loading

0 comments on commit 520ccbe

Please sign in to comment.