Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[DEPLOYMENT] 구독자 새 경매글 알림 기능 구현 #230

Merged
merged 11 commits into from
Jun 30, 2024
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package com.leeforgiveness.memberservice.common.kafka;

import lombok.Getter;
import lombok.RequiredArgsConstructor;

@Getter
@RequiredArgsConstructor
public enum EventType {
AUCTION_POST_DETAIL("Auction"),
MOVE_CHATROOM("Chat");

private final String type;
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package com.leeforgiveness.memberservice.common.kafka;
import com.leeforgiveness.memberservice.auth.application.MemberService;
import com.leeforgiveness.memberservice.auth.vo.SearchForChatRoomVo;
import com.leeforgiveness.memberservice.common.kafka.Topics.Constant;
import com.leeforgiveness.memberservice.common.kafka.dto.SubscriberFilterVo;
import com.leeforgiveness.memberservice.subscribe.application.InfluencerSubscriptionService;
import java.util.LinkedHashMap;
import java.util.List;
import lombok.RequiredArgsConstructor;
Expand All @@ -17,23 +20,48 @@
public class KafkaConsumerCluster {

private final MemberService memberService;
private final InfluencerSubscriptionService influencerSubscriptionService;

@KafkaListener(topics = "send-to-member-for-create-chatroom-topic"
@KafkaListener(topics = Constant.SEND_TO_MEMBER_FOR_CREATE_CHATROOM_TOPIC
)
public void consumeBatch(@Payload LinkedHashMap<String, Object> message,
@Headers MessageHeaders messageHeaders) {
log.info("consumer: success >>> message: {}, headers: {}", message.toString(),
messageHeaders);
//message를 searchForChatRoom로 변환
SearchForChatRoomVo searchForChatRoomVo = SearchForChatRoomVo.builder()
.auctionUuid(message.get("auctionUuid").toString())
.memberUuids((List<String>) message.get("memberUuids"))
.adminUuid(message.get("adminUuid").toString())
.title(message.get("title").toString())
.thumbnail(message.get("thumbnail").toString())
.build();
log.info("auctionUuid : {}", searchForChatRoomVo.getAuctionUuid());
log.info("memberUuids : {}", searchForChatRoomVo.getMemberUuids());
memberService.searchProfileImage(searchForChatRoomVo);
log.info(">>>>> consume send-to-member-for-create-chatroom-topic success");

Object auctionUuidObj = message.get("auctionUuid");
Object memberUuidsObj = message.get("memberUuids");
Object adminUuidObj = message.get("adminUuid");
Object titleObj = message.get("title");
Object thumbnailObj = message.get("thumbnail");

if (auctionUuidObj != null && memberUuidsObj != null && adminUuidObj != null && titleObj != null && thumbnailObj != null) {
SearchForChatRoomVo searchForChatRoomVo = SearchForChatRoomVo.builder()
.auctionUuid(auctionUuidObj.toString())
.memberUuids((List<String>) memberUuidsObj)
.adminUuid(adminUuidObj.toString())
.title(titleObj.toString())
.thumbnail(thumbnailObj.toString())
.build();

memberService.searchProfileImage(searchForChatRoomVo);
}

}

@KafkaListener(topics = Constant.INITIAL_AUCTION)
public void consumeNewAuction(@Payload LinkedHashMap<String, Object> message) {
Object auctionUuidObj = message.get("auctionUuid");
Object influencerUuidObj = message.get("influencerUuid");
Object influencerNameObj = message.get("influencerName");

if (auctionUuidObj != null && influencerUuidObj != null && influencerNameObj != null) {
influencerSubscriptionService.sendNewAuctionAlarmToSubscriber(
SubscriberFilterVo.builder()
.auctionUuid(auctionUuidObj.toString())
.influencerUuid(influencerUuidObj.toString())
.influencerName(influencerNameObj.toString())
.build()
);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,21 @@
@RequiredArgsConstructor
public enum Topics {
MEMBER_SERVICE(Constant.MEMBER_SERVICE),
SEND_TO_CHAT(Constant.SEND_TO_CHAT);
SEND_TO_CHAT(Constant.SEND_TO_CHAT),
SEND_TO_MEMBER_FOR_CREATE_CHATROOM_TOPIC(Constant.SEND_TO_MEMBER_FOR_CREATE_CHATROOM_TOPIC),
INITIAL_AUCTION(Constant.INITIAL_AUCTION),
ALARM(Constant.ALARM)
;

public static class Constant {

public static final String SEND_TO_CHAT = "send-to-chat-topic";
public static final String MEMBER_SERVICE = "alarm-topic";
public static final String CHANGE_PROFILE_IMAGE = "change-profile-image-topic";
public static final String SEND_TO_MEMBER_FOR_CREATE_CHATROOM_TOPIC
= "send-to-member-for-create-chatroom-topic";
public static final String INITIAL_AUCTION = "initial-auction-topic";
public static final String ALARM ="alarm-topic";
}

private final String topic;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package com.leeforgiveness.memberservice.common.kafka.dto;

import java.util.List;
import lombok.Builder;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import lombok.ToString;

@Getter
@Setter
@NoArgsConstructor
@ToString
public class AlarmDto {

private List<String> receiverUuids;
private String message;
private String eventType;
private String uuid;

@Builder
public AlarmDto(List<String> receiverUuids, String message, String eventType, String uuid) {
this.receiverUuids = receiverUuids;
this.message = message;
this.eventType = eventType;
this.uuid = uuid;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package com.leeforgiveness.memberservice.common.kafka.dto;

import lombok.Builder;
import lombok.Getter;

@Getter
@Builder
public class SubscriberFilterVo {
private String auctionUuid;
private String influencerUuid;
private String influencerName;
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package com.leeforgiveness.memberservice.subscribe.application;

import com.leeforgiveness.memberservice.common.kafka.dto.AlarmDto;
import com.leeforgiveness.memberservice.common.kafka.dto.SubscriberFilterVo;
import com.leeforgiveness.memberservice.subscribe.dto.InfluencerSubscribeRequestDto;
import com.leeforgiveness.memberservice.subscribe.vo.IsSubscribedRequestVo;
import com.leeforgiveness.memberservice.subscribe.vo.SubscribedInfluencerRequestVo;
Expand All @@ -15,4 +17,6 @@ SubscribedInfluencerResponseDto getSubscriptionInfos(
SubscribedInfluencerRequestVo subscribedInfluencerRequestVo);

Boolean isSubscribed(IsSubscribedRequestVo isSubscribedRequestVo);

void sendNewAuctionAlarmToSubscriber(SubscriberFilterVo subscriberFilterVo);
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,11 @@

import com.leeforgiveness.memberservice.common.exception.CustomException;
import com.leeforgiveness.memberservice.common.exception.ResponseStatus;
import com.leeforgiveness.memberservice.common.kafka.EventType;
import com.leeforgiveness.memberservice.common.kafka.KafkaProducerCluster;
import com.leeforgiveness.memberservice.common.kafka.Topics.Constant;
import com.leeforgiveness.memberservice.common.kafka.dto.AlarmDto;
import com.leeforgiveness.memberservice.common.kafka.dto.SubscriberFilterVo;
import com.leeforgiveness.memberservice.subscribe.domain.InfluencerSubscription;
import com.leeforgiveness.memberservice.subscribe.dto.InfluencerSubscribeRequestDto;
import com.leeforgiveness.memberservice.subscribe.dto.InfluencerSummaryDto;
Expand All @@ -25,6 +30,7 @@ public class InfluencerSubscriptionServiceImpl implements InfluencerSubscription

private final InfluencerSubscriptionRepository influencerSubscriptionRepository;
private final ExternalService externalService;
private final KafkaProducerCluster kafkaProducer;

//구독
@Override
Expand Down Expand Up @@ -154,4 +160,25 @@ private Optional<InfluencerSubscription> getSubscription(
throw new CustomException(ResponseStatus.DATABASE_READ_FAIL);
}
}

@Override
public void sendNewAuctionAlarmToSubscriber(SubscriberFilterVo subscriberFilterVo) {
List<InfluencerSubscription> influencerSubscriptions = influencerSubscriptionRepository.findByInfluencerUuidAndState(
subscriberFilterVo.getInfluencerUuid(), SubscribeState.SUBSCRIBE);

if (influencerSubscriptions.isEmpty()) {
log.info(">>>> sendNewAuctionAlarmToSubscriber: no subscriber");
return;
}

List<String> receiverUuids = influencerSubscriptions.stream()
.map(InfluencerSubscription::getSubscriberUuid).toList();

kafkaProducer.sendMessage(Constant.ALARM, AlarmDto.builder()
.uuid(subscriberFilterVo.getAuctionUuid())
.receiverUuids(receiverUuids)
.eventType(EventType.AUCTION_POST_DETAIL.getType())
.message(String.format("%s님의 새로운 경매가 올라왔어요!", subscriberFilterVo.getInfluencerName()))
.build());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,6 @@ Optional<InfluencerSubscription> findBySubscriberUuidAndInfluencerUuid(

List<InfluencerSubscription> findBySubscriberUuidAndState(String subscriberUuid,
SubscribeState state);

List<InfluencerSubscription> findByInfluencerUuidAndState(String influencerUuid, SubscribeState state);
}
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
package com.leeforgiveness.memberservice.common;

import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
import java.util.Random;
import java.util.UUID;

public class GenerateRandom {

private static final String CHARACTERS = "0123456789";
private static final Random RANDOM = new Random();

Expand All @@ -18,6 +17,10 @@ private static String string(int length) {
return sb.toString();
}

public static String auctionUuid() {
return "auction-" + string(9);
}

public static String subscriberUuid() {
return UUID.randomUUID().toString();
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,23 @@
package com.leeforgiveness.memberservice.subscribe;

import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;

import com.leeforgiveness.memberservice.common.GenerateRandom;
import com.leeforgiveness.memberservice.common.exception.CustomException;
import com.leeforgiveness.memberservice.common.kafka.EventType;
import com.leeforgiveness.memberservice.common.kafka.KafkaProducerCluster;
import com.leeforgiveness.memberservice.common.kafka.Topics.Constant;
import com.leeforgiveness.memberservice.common.kafka.dto.AlarmDto;
import com.leeforgiveness.memberservice.common.kafka.dto.SubscriberFilterVo;
import com.leeforgiveness.memberservice.subscribe.application.ExternalService;
import com.leeforgiveness.memberservice.subscribe.application.InfluencerSubscriptionServiceImpl;
import com.leeforgiveness.memberservice.subscribe.domain.InfluencerSubscription;
Expand All @@ -27,6 +34,7 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;
import org.mockito.Mockito;

public class InfluencerSubscribeTest {
Expand All @@ -35,14 +43,15 @@ public class InfluencerSubscribeTest {
InfluencerSubscriptionRepository.class);
private ExternalService externalService = Mockito.mock(ExternalService.class);
private InfluencerSubscriptionServiceImpl influencerSubscriptionService;
private KafkaProducerCluster kafkaProducer = Mockito.mock(KafkaProducerCluster.class);

private String subscriberUuid;
private String influencerUuid;

@BeforeEach
public void setUp() {
influencerSubscriptionService = new InfluencerSubscriptionServiceImpl(
influencerSubscriptionRepository, externalService);
influencerSubscriptionRepository, externalService, kafkaProducer);

subscriberUuid = GenerateRandom.subscriberUuid();
influencerUuid = GenerateRandom.influencerUuid();
Expand Down Expand Up @@ -342,4 +351,39 @@ void isSubscribedNullTest() {

assertFalse(isSubscribed);
}

@Test
@DisplayName("정상적인 initial-auction-topic 메시지를 받았을때 구독자가 있으면 알림 메시지를 보낸다")
void SendNewAuctionAlarmToSubscriberAtNormalMessageTest() {
SubscriberFilterVo subscriberFilterVo = SubscriberFilterVo.builder()
.auctionUuid(GenerateRandom.auctionUuid())
.influencerUuid(influencerUuid)
.influencerName("아이유")
.build();

Mockito.when(influencerSubscriptionRepository.findByInfluencerUuidAndState(influencerUuid, SubscribeState.SUBSCRIBE))
.thenReturn(List.of(
InfluencerSubscription.builder()
.subscriberUuid(subscriberUuid)
.build()
));

AlarmDto alarmDto = AlarmDto.builder()
.uuid(GenerateRandom.auctionUuid())
.receiverUuids(List.of(subscriberUuid))
.eventType(EventType.AUCTION_POST_DETAIL.getType())
.message("새글 알림")
.build();

kafkaProducer.sendMessage(Constant.ALARM, alarmDto);


ArgumentCaptor<String> topicCaptor = ArgumentCaptor.forClass(String.class);
ArgumentCaptor<Object> messageCaptor = ArgumentCaptor.forClass(Object.class);

verify(kafkaProducer, times(1)).sendMessage(topicCaptor.capture(), messageCaptor.capture());

assertEquals(Constant.ALARM, topicCaptor.getValue());
assertEquals(alarmDto, messageCaptor.getValue());
}
}
Loading