Skip to content

Commit

Permalink
Feat/integ sse (#124)
Browse files Browse the repository at this point in the history
* Bug: Produce status bug

* Fix: SSE 통합
  • Loading branch information
klkim1913 authored Aug 17, 2023
1 parent 747b992 commit ff65806
Show file tree
Hide file tree
Showing 11 changed files with 114 additions and 42 deletions.
13 changes: 5 additions & 8 deletions src/main/java/com/anywayclear/controller/AlarmController.java
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,6 @@
import org.springframework.web.bind.annotation.*;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

import javax.servlet.http.HttpServletResponse;
import java.time.LocalDateTime;

@RestController
@RequiredArgsConstructor
@RequestMapping("/api/alarms")
Expand All @@ -27,11 +24,11 @@ public ResponseEntity<SseEmitter> createEmitter(@AuthenticationPrincipal OAuth2U
return new ResponseEntity<>(alarmService.createEmitter(oAuth2User, lastEventId), HttpStatus.OK);
}

@PostMapping("type/{type}/topic/{topicName}")
@ResponseStatus(HttpStatus.CREATED)
public void pushAlarm(@PathVariable("type") String type, @PathVariable("topicName") String topicName) {
alarmService.pushAlarm(type, topicName, LocalDateTime.now());
}
// @PostMapping("type/{type}/topic/{topicName}")
// @ResponseStatus(HttpStatus.CREATED)
// public void pushAlarm(@PathVariable("type") String type, @PathVariable("topicName") String topicName) {
// alarmService.pushAlarm(type, topicName, LocalDateTime.now());
// }

@GetMapping("/{memberId}")
public ResponseEntity<AlarmResponseList> getAlarmList(@PathVariable String memberId) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package com.anywayclear.controller;

import com.anywayclear.dto.response.AlarmResponseList;
import com.anywayclear.service.NotificationService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.http.MediaType;
import org.springframework.http.ResponseEntity;
import org.springframework.security.core.annotation.AuthenticationPrincipal;
import org.springframework.security.oauth2.core.user.OAuth2User;
import org.springframework.web.bind.annotation.*;
Expand All @@ -26,6 +28,12 @@ public SseEmitter subscribe(@AuthenticationPrincipal OAuth2User oAuth2User,
return notificationService.subscribe(userId, lastEventId);
}

@GetMapping("/list")
public ResponseEntity<AlarmResponseList> getAlarmList(@AuthenticationPrincipal OAuth2User oAuth2User) {
String userId = (String) oAuth2User.getAttributes().get("userId");
return ResponseEntity.ok(notificationService.getAlarmList(userId));
}

// @PostMapping("/send-data")
// public void sendData(@AuthenticationPrincipal OAuth2User oAuth2User) {
// String userId = (String) oAuth2User.getAttributes().get("userId");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,20 @@

@Getter
public class AuctionCompleteNotification {
private final int type;
private final String nickname;
private final String produceName;

@Builder
public AuctionCompleteNotification(String nickname, String produceName) {
public AuctionCompleteNotification(int type, String nickname, String produceName) {
this.type = type;
this.nickname = nickname;
this.produceName = produceName;
}

public static AuctionCompleteNotification toResponse(Auction auction) {
return AuctionCompleteNotification.builder()
.type(2)
.nickname(auction.getNickname())
.produceName(auction.getProduce().getName())
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,12 @@
import com.anywayclear.entity.Auction;
import lombok.Builder;
import lombok.Getter;
import lombok.ToString;

import java.time.LocalDateTime;

@Getter
@ToString
public class AuctionResponse {
private final Long id;
private final int price;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,14 @@
import com.anywayclear.entity.Produce;
import lombok.Builder;
import lombok.Getter;
import lombok.ToString;

import java.time.LocalDateTime;
import java.util.List;

@Getter
//@Setter
@ToString
public class ProduceResponse {
private final long id;
private final String name;
Expand Down
8 changes: 7 additions & 1 deletion src/main/java/com/anywayclear/service/AuctionService.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,18 @@

import com.anywayclear.dto.request.BiddingRequest;
import com.anywayclear.dto.request.DealCreateRequest;
import com.anywayclear.dto.response.AuctionResponse;
import com.anywayclear.dto.response.AuctionResponseList;
import com.anywayclear.dto.response.BiddingResponse;
import com.anywayclear.dto.response.ProduceResponse;
import com.anywayclear.entity.Auction;
import com.anywayclear.entity.Member;
import com.anywayclear.entity.Produce;
import com.anywayclear.exception.CustomException;
import com.anywayclear.repository.AuctionRepository;
import com.anywayclear.repository.MemberRepository;
import com.anywayclear.repository.ProduceRepository;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

Expand All @@ -20,6 +23,7 @@
import static com.anywayclear.exception.ExceptionCode.*;

@Service
@Slf4j
public class AuctionService {
private final AuctionRepository auctionRepository;
private final MemberRepository memberRepository;
Expand All @@ -43,7 +47,7 @@ public BiddingResponse Bidding(long auctionId, String consumerId, BiddingRequest
}
/* 테스트동안 제한 안함 */
// 테스트용으로 1분
if (LocalDateTime.now().isAfter(auction.getUpdatedAt().plusMinutes(1))) {
if (LocalDateTime.now().isAfter(auction.getLastBidding().plusMinutes(1))) {
throw new CustomException(EXPIRED_AUCTION_TIME);
}
Member consumer = memberRepository.findByUserId(consumerId).orElseThrow(() -> new CustomException(INVALID_MEMBER));
Expand All @@ -61,6 +65,8 @@ public void checkAuctionFinished(long auctionId) {
Auction auction = auctionRepository.findById(auctionId).orElseThrow(() -> new CustomException(INVALID_AUCTION_ID));
Produce produce = auction.getProduce();
// 테스트용으로 1분
log.debug(AuctionResponse.toResponse(auction).toString());
log.debug(ProduceResponse.toResponse(produce).toString());
if (produce.getStatus() == 1 && !auction.isClosed() && LocalDateTime.now().isAfter(auction.getLastBidding().plusMinutes(1))) {
auction.setClosed(true);
produce.setEndDate(auction.getLastBidding());
Expand Down
7 changes: 1 addition & 6 deletions src/main/java/com/anywayclear/service/DibService.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,7 @@
import org.springframework.security.oauth2.core.user.OAuth2User;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

import java.time.LocalDateTime;
import java.util.List;

import static com.anywayclear.exception.ExceptionCode.*;
Expand All @@ -29,13 +27,10 @@ public class DibService {
private final MemberRepository memberRepository;
private final ProduceRepository produceRepository;

private final AlarmService alarmService;

public DibService(DibRepository dibRepository, MemberRepository memberRepository, ProduceRepository produceRepository, AlarmService alarmService) {
public DibService(DibRepository dibRepository, MemberRepository memberRepository, ProduceRepository produceRepository) {
this.dibRepository = dibRepository;
this.memberRepository = memberRepository;
this.produceRepository = produceRepository;
this.alarmService = alarmService;
}

@Transactional
Expand Down
83 changes: 75 additions & 8 deletions src/main/java/com/anywayclear/service/NotificationService.java
Original file line number Diff line number Diff line change
@@ -1,35 +1,59 @@
package com.anywayclear.service;

import com.anywayclear.dto.response.AlarmResponse;
import com.anywayclear.dto.response.AlarmResponseList;
import com.anywayclear.dto.response.AuctionCompleteNotification;
import com.anywayclear.entity.Alarm;
import com.anywayclear.entity.Auction;
import com.anywayclear.repository.EmitterRepository;
import com.anywayclear.entity.Member;
import com.anywayclear.entity.Produce;
import com.anywayclear.exception.CustomException;
import com.anywayclear.exception.ExceptionCode;
import com.anywayclear.repository.*;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

import java.io.IOException;
import java.time.LocalDateTime;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

@Service
@Slf4j
public class NotificationService {
// 기본 타임아웃 설정
private static final Long DEFAULT_TIMEOUT = 60L * 1000 * 60;
private final EmitterRepository emitterRepository;
private final ProduceRepository produceRepository;
private final DibRepository dibRepository;
private final MemberRepository memberRepository;
private final SubscribeRepository subscribeRepository;
private final RedisTemplate<String,Alarm> redisAlarmTemplate;

public NotificationService(EmitterRepository emitterRepository) {
public NotificationService(EmitterRepository emitterRepository, ProduceRepository produceRepository, DibRepository dibRepository, MemberRepository memberRepository, SubscribeRepository subscribeRepository, RedisTemplate<String,Alarm> redisAlarmTemplate) {
this.emitterRepository = emitterRepository;
this.produceRepository = produceRepository;
this.dibRepository = dibRepository;
this.memberRepository = memberRepository;
this.subscribeRepository = subscribeRepository;
this.redisAlarmTemplate = redisAlarmTemplate;
}

public SseEmitter subscribe(String userId, String lastEventId) {
String id = userId + "_" + System.currentTimeMillis();
SseEmitter emitter = createEmitter(id);
sendToClient(emitter, id, "EventStream Created. [userId=" + userId + "]");
sendToClient(emitter, id,"connection", "EventStream Created. [userId=" + userId + "]");
if (!lastEventId.isEmpty()) {
Map<String, Object> events = emitterRepository.findAllEventCacheStartWithId(userId);
events.entrySet().stream()
.filter(entry -> lastEventId.compareTo(entry.getKey()) < 0)
.forEach(entry -> sendToClient(emitter, entry.getKey(), entry.getValue()));
.forEach(entry -> sendToClient(emitter, entry.getKey(),"sse", entry.getValue()));
}
return emitter;
}
Expand All @@ -43,17 +67,59 @@ public void notify(String userId, Auction auction) {
sseEmitters.forEach(
(key, emitter) -> {
emitterRepository.saveEventCache(key, auction);
sendToClient(emitter, key, AuctionCompleteNotification.toResponse(auction));
sendToClient(emitter, key,"sse", AuctionCompleteNotification.toResponse(auction));
}
);
}

@Transactional(readOnly = true)
public void pushAlarm(String type, String topicName, LocalDateTime now) { // 알람 전송

private void sendToClient(SseEmitter emitter, String id, Object data) {
// 알림 수신 목록 불러오기
Alarm alarm;
List<String> receiverKeyList;
if (type.equals("dib")) {
Produce produce = produceRepository.findById(Long.parseLong(topicName)).orElseThrow(() -> new CustomException(ExceptionCode.INVALID_PRODUCE_ID));
alarm = Alarm.builder().type(0).senderId(produce.getId().toString()).senderName(produce.getName()).context("경매가 시작되었습니다!").build(); // 알람 객체 생성
receiverKeyList = dibRepository.findAllByProduce(produce).stream().map(p -> p.getConsumer().getUserId()).collect(Collectors.toList());
} else {
Member seller = memberRepository.findByUserId(topicName).orElseThrow(() -> new CustomException(ExceptionCode.INVALID_MEMBER));
alarm = Alarm.builder().type(1).senderId(seller.getUserId()).senderName(seller.getNickname()).context("새로운 경매 글이 올라왔습니다!").build(); // 알람 객체 생성
receiverKeyList = subscribeRepository.findAllBySeller(seller).stream().map(m -> m.getConsumer().getUserId()).collect(Collectors.toList());
}

// 받은 알람 SSE 전송
// 해당 토픽의 SseEmitter 모두 가져옴
receiverKeyList.forEach(
receiverKey -> {
SseEmitter emitter = emitterRepository.get(receiverKey);
String keyWithTime = receiverKey + now;
emitterRepository.saveEventCache(keyWithTime, AlarmResponse.toResponse(alarm));
sendToClient(emitter, receiverKey, "Alarm", AlarmResponse.toResponse(alarm));
String redisKey = "member:" + receiverKey + ":alarm:" + alarm.getId();
redisAlarmTemplate.opsForValue().set(redisKey, alarm); // 레디스에 저장
redisAlarmTemplate.expire(redisKey, 1, TimeUnit.MINUTES); // TTL 설정 ***** 테스트용
}

);

}

@Transactional(readOnly = true)
public AlarmResponseList getAlarmList(String memberId) { // 해당 유저의 알림 리스트 불러오기
// 패턴 매칭 사용 -> member:memberId:alarm:*

// [1] 반복문으로 해당 소비자 userId가 키로 포함된 알림 내역 불러오기
Set<String> keys = redisAlarmTemplate.keys("member:" + memberId + ":alarm:*");
// [2] 반복문으로 Value(알람 객체) 불러와 저장
List<Alarm> alarmList = keys.stream().map(k -> redisAlarmTemplate.opsForValue().get(k)).collect(Collectors.toList()); // 해당 키의 알람 리스트 저장
return new AlarmResponseList(alarmList);
}
private void sendToClient(SseEmitter emitter, String id, String name, Object data) {
try {
emitter.send(SseEmitter.event()
.id(id)
.name("sse")
.name(name)
.data(data));
} catch (IOException e) {
emitterRepository.deleteById(id);
Expand All @@ -66,7 +132,8 @@ private SseEmitter createEmitter(String id) {
SseEmitter emitter = new SseEmitter(DEFAULT_TIMEOUT);
emitterRepository.save(id, emitter);
// Emitter가 완료될 때(모든 데이터가 성공적으로 전송된 상태) Emitter를 삭제한다.
emitter.onCompletion(() -> emitterRepository.deleteById(id));
emitter.onCompletion(() -> emitterRepository.deleteById(id)
);
// Emitter가 타임아웃 되었을 때(지정된 시간동안 어떠한 이벤트도 전송되지 않았을 때) Emitter를 삭제한다.
emitter.onTimeout(() -> emitterRepository.deleteById(id));

Expand Down
10 changes: 4 additions & 6 deletions src/main/java/com/anywayclear/service/ProduceService.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,10 @@
import com.anywayclear.entity.Auction;
import com.anywayclear.entity.Member;
import com.anywayclear.entity.Produce;
import com.anywayclear.entity.Subscribe;
import com.anywayclear.exception.CustomException;
import com.anywayclear.repository.AuctionRepository;
import com.anywayclear.repository.MemberRepository;
import com.anywayclear.repository.ProduceRepository;
import com.anywayclear.repository.SubscribeRepository;
import lombok.extern.slf4j.Slf4j;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.Pageable;
Expand All @@ -32,15 +30,15 @@ public class ProduceService {
private final MemberRepository memberRepository;
private final AuctionService auctionService;

private final AlarmService alarmService;
private final NotificationService notificationService;


public ProduceService(ProduceRepository produceRepository, AuctionRepository auctionRepository, MemberRepository memberRepository, AuctionService auctionService, AlarmService alarmService) {
public ProduceService(ProduceRepository produceRepository, AuctionRepository auctionRepository, MemberRepository memberRepository, AuctionService auctionService, NotificationService notificationService) {
this.produceRepository = produceRepository;
this.auctionRepository = auctionRepository;
this.memberRepository = memberRepository;
this.auctionService = auctionService;
this.alarmService = alarmService;
this.notificationService = notificationService;
}

@Transactional
Expand All @@ -52,7 +50,7 @@ public Long createProduce(ProduceCreateRequest request, String sellerId) {
}

// 구독자들에게 알림 발송
alarmService.pushAlarm("sub", sellerId, LocalDateTime.now());
notificationService.pushAlarm("sub", sellerId, LocalDateTime.now());

return produce.getId();
}
Expand Down
6 changes: 1 addition & 5 deletions src/main/java/com/anywayclear/service/SubscribeService.java
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,7 @@
import org.springframework.security.oauth2.core.user.OAuth2User;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.web.servlet.mvc.method.annotation.SseEmitter;

import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.List;

Expand All @@ -25,12 +23,10 @@ public class SubscribeService {
private final SubscribeRepository subscribeRepository;
private final MemberRepository memberRepository;

private AlarmService alarmService;

public SubscribeService(SubscribeRepository subscribeRepository, MemberRepository memberRepository, AlarmService alarmService) {
public SubscribeService(SubscribeRepository subscribeRepository, MemberRepository memberRepository) {
this.subscribeRepository = subscribeRepository;
this.memberRepository = memberRepository;
this.alarmService = alarmService;
}

@Transactional
Expand Down
Loading

0 comments on commit ff65806

Please sign in to comment.