개발 중에 고민했던 내용 중 데이터 변환 과정에서 다형성을 적용한 경험을 정리하였다.
상황
현재 개발 중인 WebRTC 프로젝트는 MSA 형태로 구성되어있으며 실시간 데이터를 관리해야한다.
이 때 실시간 데이터를 관리하기 위한 방법으로 Redis(실시간 데이터 저장소)와 연동되는 것은 State(상태관리) 서버 뿐이며, TCP를 통해 Signaling 서버, Chat 서버로 부터 실시간 데이터를 받는다.
문제가 발생한 것은 Signaling 서버 및 Chat 서버로 부터 데이터를 받는 부분이었다.
우리 주제는 WebRTC 기반 스터디 이기 때문에 사용자가 방에 입장해서 버튼을 누르면 공부시간을 기록할 수 있다.
그렇기 때문에 Signaling 서버에서 유저ID , 버튼 누른 시각 을 보낸다.
Chat 서버에서는 웹소켓 세션을 통해 접속 상태를 관리할 수 있다.
따라서 유저ID, 방ID, 세션값 을 보낸다.
결과적으로 하나의 상태관리(TCP) 서버에서 각각 다른 서버의 다른 데이터 구조를 받아들일 수 있어야 한다.
HTTP 요청처럼 url을 통해 하는 방식이 아닌 TCP 소켓을 연결하여 실시간 효율성을 보장하는 방안으로 구현하고자 하였기 때문에 서버별로 다른 데이터를 유동적으로 받아들일 수 있는 기술이 필요했다.
해결 방안
해결하기 위한 아이디어로 상속 개념을 떠올릴 수 있었다. A(Parent), B(Child),C(Child)로 사용한다면 A에 server를 포함하고 B,C에 각 서버별 데이터를 반영할 수 있는 데이터들을 넣을 수 있다면 이를 적용할 수 있을 것이라고 생각했다.
이를 위한 기술로 Jackson 라이브러리를 활용하였다.
그 이유는 Jackson 라이브러리를 통해 각 서버로부터 데이터를 String 형태로 해서 주고받고 있는 상태였으며
이를 응용하여 직렬화 과정에서 JsonSubTypes를 통해서 다형성을 가진 객체의 자식클래스를 변환할 수 있다는 사실을 검색을 통해 알게 되었기 때문이다.
- 추상 클래스 : TCPMessageRequest
- 자식 클래스 : TCPChatRequest
- 자식 클래스 : TCPSignalingRequest
의 형태로 활용하였고 아래 코드와 같다.
- JsonTypeInfo은 타입 정보를 어떻게 처리할 지 지정한다.
- use는 NAME(이름 기반), CLASS(클래스 이름 기반) ... 등 어떤 것을 기반으로 처리할 지 결정한다.
- include는 어느 시점에서 타입 정보를 포함할 지 결정한다.
- property는 JSON 데이터에서 타입 정보를 읽을 속성의 이름을 지정한다.
- visible은 속성 타입 정보가 JSON 데이터에 포함되어야 하는지 여부를 설정한다.
- JsonSubTypes는 객체의 서브 타입을 지정할 수 있다. 즉, Json데이터를 역직렬화할 때 서브 타입으로의 매핑을 지정한다.
- JsonTypeName은 타입 정보를 나타낸다.
@AllArgsConstructor
@NoArgsConstructor
@Data
@JsonTypeInfo(
use = JsonTypeInfo.Id.NAME,
include = JsonTypeInfo.As.PROPERTY,
property = "server",
visible = true)
@JsonSubTypes({
@JsonSubTypes.Type(value = TCPChatRequest.class, name = "chat"),
@JsonSubTypes.Type(value = TCPSignalingRequest.class, name = "signaling")
})
public abstract class TCPMessageRequest{
private String server;
}
TCPMessageRequest는 부모 클래스로 property인 "server"의 값에 따라 SubType이 결정된다.
server의 값이 chat일 때는 TCPChatRequest로, server의 값이 signaling일 때는 TCPSignalingRequest로 역직렬화한다.
@AllArgsConstructor
@NoArgsConstructor
@Data
@JsonTypeName("signaling")
public class TCPSignalingRequest extends TCPMessageRequest{
@JsonProperty("user_id")
private String userId;
@JsonProperty("action")
private String action;
@JsonProperty("study_start_time")
private LocalDateTime studyStartTime;
@Override
public String toString(){
ObjectMapper mapper = new ObjectMapper();
try {
return mapper.writeValueAsString(this);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
}
}
<TCPSignalingRequest>
@AllArgsConstructor
@NoArgsConstructor
@Data
@JsonTypeName("chat")
public class TCPChatRequest extends TCPMessageRequest{
@JsonProperty("user_id")
private String userId;
@JsonProperty("room_id")
private Long roomId;
@JsonProperty("session")
private String session;
@JsonProperty("type")
private String type;
@Override
public String toString(){
ObjectMapper mapper = new ObjectMapper();
try {
return mapper.writeValueAsString(this);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
}
}
<TCPChatRequest>
이 후 해당 데이터를 받은 데이터는 (TCPChatRequest)형태로 캐스팅하여 사용하는 형태로 구현하였다.
에러 처리 및 코드가 정리되지 않았지만 예시로 참고할 수 있을 것 같다.
@Service
@Slf4j
@RequiredArgsConstructor
public class MessageService {
private final RedisService redisService;
public String processMessage(String message) {
log.info("Received message: {}", message);
ObjectMapper mapper = new ObjectMapper();
try{
TCPMessageRequest response = mapper.readValue(message, TCPMessageRequest.class);
String responseMessage = String.format("Message \"%s\" is processed", response.toString());
switch (response.getServer()) {
case "chat":
TCPChatRequest chatRequest = (TCPChatRequest) response;
log.warn(chatRequest.toString());
if (chatRequest.getType().matches("SUBSCRIBE")) {
RealTimeData realTimeData = redisService.getData(chatRequest.getUserId());
if (realTimeData == null) {
realTimeData = new RealTimeData();
realTimeData.setUserId(chatRequest.getUserId());
realTimeData.setRoomId(chatRequest.getRoomId());
realTimeData.setSessionId(chatRequest.getSession());
RealTimeData realTimeData1 = redisService.setData(realTimeData);
} else {
realTimeData.setRoomId(chatRequest.getRoomId());
realTimeData.setSessionId(chatRequest.getSession());
RealTimeData realTimeData2 = redisService.setData(realTimeData);
}
} else if (chatRequest.getType().matches("DISCONNECT")) {
RealTimeData realTimeData = redisService.getData(chatRequest.getUserId());
if (realTimeData != null) {
realTimeData.setRoomId(chatRequest.getRoomId());
realTimeData.setSessionId(null);
redisService.setData(realTimeData);
}
} else {
// 에러처리
}
break;
case "signaling":
TCPSignalingRequest signalingRequest = (TCPSignalingRequest) response;
log.warn(signalingRequest.toString());
if (signalingRequest.getAction().matches("TIMER_ON")) {
try {
RealTimeData realTimeData = redisService.getData(signalingRequest.getUserId());
LocalDateTime currentTime = LocalDateTime.now();
realTimeData.setStudyStartTime(currentTime);
RealTimeData savedData = redisService.setData(realTimeData);
}catch (Exception e) {
throw new RuntimeException(e);
}
} else if (signalingRequest.getAction().matches("TIMER_OFF")) {
try {
RealTimeData realTimeData = redisService.getData(signalingRequest.getUserId());
LocalDateTime currentTime = LocalDateTime.now();
LocalDateTime studyStartTime = realTimeData.getStudyStartTime();
Duration totalStudyTime = Duration.between(studyStartTime, currentTime);
realTimeData.setStudyStartTime(null);
realTimeData.setRecordTime(totalStudyTime);
RealTimeData savedData = redisService.setData(realTimeData);
}catch (Exception e) {
throw new RuntimeException(e);
}
} else {
// 에러처리
}
break;
}
return responseMessage;
} catch (JsonMappingException e) {
throw new RuntimeException(e);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
}
}
<Message Service>
'백엔드' 카테고리의 다른 글
[Spring Boot] 단위 테스트, 통합 테스트 (0) | 2023.09.25 |
---|---|
gRPC통신에 관하여 (0) | 2023.08.07 |
Kafka-(de)Serialize 에러, ErrorHandlingDeserializer (0) | 2023.03.29 |
Jar VS War (0) | 2022.11.15 |
REST API (0) | 2021.11.28 |