Skip to content

Commit

Permalink
Merge pull request #204 from SKY-HORSE-MAN-POWER/develop
Browse files Browse the repository at this point in the history
[DEPLOYMENT] 경매 페이지 SSE 에러 로그 추가 및 경매 마감 스케줄러 정상 마감 시간에 돌도록 설정
  • Loading branch information
chanchanwoong authored Jun 23, 2024
2 parents 639c9ce + 9bc1df8 commit 9a0c7f1
Show file tree
Hide file tree
Showing 13 changed files with 264 additions and 81 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@
import com.skyhorsemanpower.auction.domain.RoundInfo;
import com.skyhorsemanpower.auction.kafka.KafkaProducerCluster;
import com.skyhorsemanpower.auction.kafka.Topics;
import com.skyhorsemanpower.auction.kafka.dto.AuctionCloseDto;
import com.skyhorsemanpower.auction.kafka.data.MessageEnum;
import com.skyhorsemanpower.auction.kafka.data.dto.AlarmDto;
import com.skyhorsemanpower.auction.kafka.data.dto.AuctionCloseDto;
import com.skyhorsemanpower.auction.repository.*;
import com.skyhorsemanpower.auction.common.exception.ResponseStatus;
import com.skyhorsemanpower.auction.data.dto.*;
Expand All @@ -20,7 +22,6 @@

import java.math.BigDecimal;
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
Expand Down Expand Up @@ -140,6 +141,15 @@ public void auctionClose(String auctionUuid) {
// 경매글 마감 처리 메시지와 결제 서비스 메시지 동일 토픽으로 진행
producer.sendMessage(Topics.Constant.AUCTION_CLOSE, auctionCloseDto);

// 알람 서비스로 메시지 전달
AlarmDto alarmDto = AlarmDto.builder().receiverUuids(memberUuids.stream().toList())
.message(MessageEnum.Constant.AUCTION_CLOSE_MESSAGE)
.eventType("경매")
.build();
log.info("Kafka Message To Alarm Service >>> {}", alarmDto.toString());

producer.sendMessage(Topics.Constant.ALARM, alarmDto);

// 경매 마감 여부 저장
auctionCloseStateRepository.save(AuctionCloseState.builder()
.auctionUuid(auctionUuid)
Expand Down
66 changes: 29 additions & 37 deletions src/main/java/com/skyhorsemanpower/auction/config/QuartzConfig.java
Original file line number Diff line number Diff line change
@@ -1,51 +1,43 @@
package com.skyhorsemanpower.auction.config;

import com.skyhorsemanpower.auction.kafka.dto.InitialAuctionDto;
import com.skyhorsemanpower.auction.quartz.AuctionClose;
import lombok.RequiredArgsConstructor;
import org.quartz.*;
import org.quartz.spi.JobFactory;
import org.springframework.boot.autoconfigure.quartz.QuartzProperties;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.scheduling.quartz.SchedulerFactoryBean;
import org.springframework.scheduling.quartz.SpringBeanJobFactory;

import java.time.Instant;
import java.util.Date;
import javax.sql.DataSource;
import java.util.Properties;

@Configuration
@RequiredArgsConstructor
public class QuartzConfig {
private final Scheduler scheduler;

// 경매 시작과 경매 마감의 상태 변경 스케줄링
public void schedulerUpdateAuctionStateJob(InitialAuctionDto initialAuctionDto) throws SchedulerException {
// JobDataMap 생성 및 auctionUuid 설정
JobDataMap jobDataMap = new JobDataMap();
jobDataMap.put("auctionUuid", initialAuctionDto.getAuctionUuid());
private final DataSource dataSource;
private final QuartzProperties quartzProperties;
private final ApplicationContext applicationContext;

// Job 생성
JobDetail auctionCloseJob = JobBuilder
.newJob(AuctionClose.class)
.withIdentity("AuctionCloseJob_" + initialAuctionDto.getAuctionUuid(),
"AuctionCloseGroup")
.usingJobData(jobDataMap)
.withDescription("경매 마감 Job")
.build();

Date auctionEndDate = Date.from(Instant.ofEpochMilli(initialAuctionDto.getAuctionStartTime()));

// Trigger 생성
Trigger auctionCloseTrigger = TriggerBuilder
.newTrigger()
.withIdentity("AuctionCloseTrigger_" + initialAuctionDto.getAuctionUuid(),
"AuctionCloseGroup")
.withDescription("경매 마감 Trigger")

// test용 60초 후 시작하는 스케줄러
.startAt(DateBuilder.futureDate(60, DateBuilder.IntervalUnit.SECOND))

//Todo 실제 배포에서는 auctionEndDate을 사용해야 한다.
// .startAt(auctionEndDate)
.build();
@Bean
public JobFactory springBeanJobFactory() {
SpringBeanJobFactory jobFactory = new SpringBeanJobFactory();
jobFactory.setApplicationContext(applicationContext);
return jobFactory;
}

// 스케줄러 생성 및 Job, Trigger 등록
scheduler.scheduleJob(auctionCloseJob, auctionCloseTrigger);
@Bean
public SchedulerFactoryBean schedulerFactoryBean(JobFactory jobFactory) {
Properties properties = new Properties();
properties.putAll(quartzProperties.getProperties());

SchedulerFactoryBean factory = new SchedulerFactoryBean();
factory.setJobFactory(jobFactory);
factory.setDataSource(dataSource);
factory.setQuartzProperties(properties);
factory.setOverwriteExistingJobs(true);
factory.setWaitForJobsToCompleteOnShutdown(true);
return factory;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package com.skyhorsemanpower.auction.config;

import com.skyhorsemanpower.auction.kafka.data.dto.InitialAuctionDto;
import com.skyhorsemanpower.auction.quartz.AuctionClose;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.quartz.*;
import org.quartz.impl.StdScheduler;
import org.quartz.impl.StdSchedulerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.time.Instant;
import java.util.Date;

@Configuration
@RequiredArgsConstructor
@Slf4j
public class QuartzJobConfig {
private final Scheduler scheduler;

@Bean
public Scheduler scheduler() throws SchedulerException {
Scheduler scheduler = new StdSchedulerFactory().getScheduler();
scheduler.start();
return scheduler;
}

// 경매 시작과 경매 마감의 상태 변경 스케줄링
public void schedulerUpdateAuctionStateJob(InitialAuctionDto initialAuctionDto) throws SchedulerException {
// JobDataMap 생성 및 auctionUuid 설정
JobDataMap jobDataMap = new JobDataMap();
jobDataMap.put("auctionUuid", initialAuctionDto.getAuctionUuid());

// Job 생성
JobDetail auctionCloseJob = JobBuilder
.newJob(AuctionClose.class)
.withIdentity("AuctionCloseJob_" + initialAuctionDto.getAuctionUuid(),
"AuctionCloseGroup")
.usingJobData(jobDataMap)
.withDescription("경매 마감 Job")
.build();


//todo
// 테스트를 위한경매 마감 시간을 경매 시작 시간으로부터 1분 뒤로 설정
// Date auctionEndDate = Date.from(Instant.ofEpochMilli(initialAuctionDto.getAuctionStartTime()).plusSeconds(60));

// 배포에선 아래 코드 사용해야 함
Date auctionEndDate = Date.from(Instant.ofEpochMilli(initialAuctionDto.getAuctionEndTime()));
log.info("Auction Close Job Will Start At >>> {}", auctionEndDate);

// Trigger 생성
Trigger auctionCloseTrigger = TriggerBuilder
.newTrigger()
.withIdentity("AuctionCloseTrigger_" + initialAuctionDto.getAuctionUuid(),
"AuctionCloseGroup")
.withDescription("경매 마감 Trigger")
.startAt(auctionEndDate)
.build();

// 스케줄러 생성 및 Job, Trigger 등록
scheduler.scheduleJob(auctionCloseJob, auctionCloseTrigger);
}
}
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
package com.skyhorsemanpower.auction.kafka;

import com.skyhorsemanpower.auction.common.DateTimeConverter;
import com.skyhorsemanpower.auction.config.QuartzConfig;
import com.skyhorsemanpower.auction.config.QuartzJobConfig;
import com.skyhorsemanpower.auction.domain.RoundInfo;
import com.skyhorsemanpower.auction.kafka.dto.InitialAuctionDto;
import com.skyhorsemanpower.auction.kafka.data.dto.InitialAuctionDto;
import com.skyhorsemanpower.auction.repository.RoundInfoRepository;
import com.skyhorsemanpower.auction.status.RoundTimeEnum;
import lombok.RequiredArgsConstructor;
Expand All @@ -23,7 +23,7 @@
@Component
public class KafkaConsumerCluster {
private final RoundInfoRepository roundInfoRepository;
private final QuartzConfig quartzConfig;
private final QuartzJobConfig quartzJobConfig;

@KafkaListener(topics = Topics.Constant.INITIAL_AUCTION, groupId = "${spring.kafka.consumer.group-id}")
public void initialAuction(@Payload LinkedHashMap<String, Object> message,
Expand All @@ -46,7 +46,7 @@ public void initialAuction(@Payload LinkedHashMap<String, Object> message,

// 경매 마감 스케줄러 등록
try {
quartzConfig.schedulerUpdateAuctionStateJob(initialAuctionDto);
quartzJobConfig.schedulerUpdateAuctionStateJob(initialAuctionDto);
} catch (Exception e1) {
log.warn(e1.getMessage());
}
Expand Down
4 changes: 3 additions & 1 deletion src/main/java/com/skyhorsemanpower/auction/kafka/Topics.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ public enum Topics {
PAYMENT_SERVICE(Constant.PAYMENT_SERVICE),
SUCCESSFUL_BID_ALARM(Constant.SUCCESSFUL_BID_ALARM),
INITIAL_AUCTION(Constant.INITIAL_AUCTION),
AUCTION_CLOSE(Constant.AUCTION_CLOSE)
AUCTION_CLOSE(Constant.AUCTION_CLOSE),
ALARM(Constant.ALARM)
;

public static class Constant {
Expand All @@ -23,6 +24,7 @@ public static class Constant {
public static final String SUCCESSFUL_BID_ALARM = "successful-bid-alarm-topic";
public static final String INITIAL_AUCTION = "initial-auction-topic";
public static final String AUCTION_CLOSE = "auction-close-topic";
public static final String ALARM ="alarm-topic";

}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package com.skyhorsemanpower.auction.kafka.data;

import lombok.Getter;
import lombok.RequiredArgsConstructor;

@RequiredArgsConstructor
@Getter
public enum MessageEnum {
AUCTION_CLOSE_MESSAGE(Constant.AUCTION_CLOSE_MESSAGE);

public static class Constant {
public static final String AUCTION_CLOSE_MESSAGE = "경매 낙찰되었습니다.";
}

private final String message;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package com.skyhorsemanpower.auction.kafka.data.dto;

import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.ToString;

import java.util.List;

@Getter
@NoArgsConstructor
@ToString
public class AlarmDto {
private List<String> receiverUuids;
private String message;
private String eventType;

@Builder
public AlarmDto(List<String> receiverUuids, String message, String eventType) {
this.receiverUuids = receiverUuids;
this.message = message;
this.eventType = eventType;
}
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package com.skyhorsemanpower.auction.kafka.dto;
package com.skyhorsemanpower.auction.kafka.data.dto;

import com.skyhorsemanpower.auction.status.AuctionStateEnum;
import lombok.Builder;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package com.skyhorsemanpower.auction.kafka.dto;
package com.skyhorsemanpower.auction.kafka.data.dto;

import com.skyhorsemanpower.auction.status.AuctionStateEnum;
import lombok.Builder;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package com.skyhorsemanpower.auction.kafka.dto;
package com.skyhorsemanpower.auction.kafka.data.dto;

import lombok.Builder;
import lombok.Getter;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,13 @@
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.tags.Tag;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.MediaType;
import org.springframework.web.bind.annotation.*;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Schedulers;

@Slf4j
@RestController
@RequiredArgsConstructor
@Tag(name = "경매 서비스", description = "경매 서비스 API")
Expand All @@ -46,7 +48,14 @@ public SuccessResponse<Object> offerBiddingPrice(
@Operation(summary = "경매 페이지 API", description = "경매 페이지에 보여줄 데이터 실시간 조회")
public Flux<RoundInfoResponseVo> auctionPage(
@PathVariable("auctionUuid") String auctionUuid) {
return roundInfoReactiveRepository.searchRoundInfo(auctionUuid).subscribeOn(Schedulers.boundedElastic());
return roundInfoReactiveRepository.searchRoundInfo(auctionUuid).subscribeOn(Schedulers.boundedElastic())
.doOnError(error -> {
log.info("SSE error occured!! >>> {}", error.toString());
})
.onErrorResume(error -> {
// 에러 발생 시, 빈 Flux 객체를 반환
return Flux.empty();
});
}

// 경매 페이지 최초 진입 시 현재 데이터 조회 API
Expand Down
Loading

0 comments on commit 9a0c7f1

Please sign in to comment.