이전 글에 이어 gRPC client부분을 구현하고, gRPC 통신을 테스트하였다.
https://jhl8109.tistory.com/62
gRPC 동작
gRPC 동작은 MessageChannelInterceptor에 의해 수행된다.
코드 중 주석 부분은, 존재하던 TCP 코드이다.
@Slf4j
@RequiredArgsConstructor
@Component
public class MessageChannelInterceptor implements ChannelInterceptor {
private final TCPMessageService tcpMessageService;
private final JwtTokenProvider jwtTokenProvider;
private final GrpcClientService grpcClientService;
// 메세지가 전송되기 전에 Intercept
@Override
public Message<?> preSend(Message<?> message, MessageChannel channel) {
return ChannelInterceptor.super.preSend(message, channel);
}
// 메세지가 전송된 후
// CONNECT : 클라이언트가 서버에 연결되었을 때
// DISCONNECT : 클라이언트가 서버와 연결을 끊었을 때
// SUBSCRIBE : 채팅방에 들어갈 때
// UNSUBSCRIBE : 채팅방에 나갈 때 혹은 소켓 연결 끊어질 때
// command 상태를 통해서 접속 상태를 관리함.
@Override
public void postSend(Message<?> message, MessageChannel channel, boolean sent) {
StompHeaderAccessor accessor = StompHeaderAccessor.wrap(message);
String authorizationHeader = String.valueOf(accessor.getFirstNativeHeader("Authorization"));
String sessionId = accessor.getSessionId();
String userId = "";
// jwt 토큰 검증, gateway에서 검증 되었기 때문에 userId를 쓰기만 하면됨.
// 접속 상태 관리
switch (accessor.getCommand()) {
case SUBSCRIBE: // room ID에 들어갈 때(소켓 연결이 아니라 채팅방에 들어갈 때 )
userId = getUserId(authorizationHeader);
Long roomId = Long.valueOf(accessor.getDestination().substring(7));
// tcp 소켓 통신 구현부
// log.info(roomId.toString());
// TCPSocketSessionRequest subscribeRequest = TCPSocketSessionRequest.builder()
// .type("SUBSCRIBE")
// // "Timer ON TIMER OFF 프론트에서 보내는ㅅ거, USER_OUT, USER_IN 프론트에서 받는거
// .userId(userId)
// .server("chat")
// .roomId(roomId) // 슬래쉬 ( '/topic/' ) 삭제
// .session(sessionId)
// .build();
// log.debug(accessor.getCommand() + " : " + subscribeRequest.toString());
// tcpMessageService.sendMessage(subscribeRequest.toString());
//grpc 통신 구현부
grpcClientService.subscribeRoom(userId,roomId,sessionId);
break;
case DISCONNECT: // 채팅방 나갈 때
// TCPSocketSessionRequest disconnectRequest = TCPSocketSessionRequest.builder()
// .type("DISCONNECT")
// .userId(userId)
// .server("chat")
// .roomId(null) // 슬래쉬 ( '/topic/' ) 삭제
// .session(sessionId)
// .build();
// log.debug(accessor.getCommand() + " : " + disconnectRequest.toString());
// tcpMessageService.sendMessage(disconnectRequest.toString());
grpcClientService.unsubscribeRoom(sessionId);
break;
case UNSUBSCRIBE:
// userId = getUserId(authorizationHeader);
// TCPSocketSessionRequest unsubscribeRequest = TCPSocketSessionRequest.builder()
// .type("UNSUBSCRIBE")
// .userId(userId)
// .server("chat")
// .roomId(null) // 슬래쉬 ( '/topic/' ) 삭제
// .session(sessionId)
// .build();
// log.debug(accessor.getCommand() + " : " + unsubscribeRequest.toString());
// tcpMessageService.sendMessage(unsubscribeRequest.toString());
grpcClientService.unsubscribeRoom(sessionId);
break;
}
ChannelInterceptor.super.postSend(message, channel, sent);
}
TCP 소켓 통신 방식에 비해 직관적으로 확인할 수 있는 가장 큰 차이점은 type 필드와 server 필드가 사라졌다는 점이다.
이는 서버에 따라 proto 파일을 만들고, proto파일의 rpc 함수로 동작하고자 하는 type을 구분하였기 때문이다. 따라서 약간 직관성이 개선되었다는 느낌을 받았다.
또한, 추가적으로 gRPC의 직렬화를 통해 성능적 이점도 기대할 수 있다.
gRPC 서비스 로직 구현
이전 편에서 작성한 프로토타입과 동일하다.
syntax = "proto3"; //proto3를 사용
option java_multiple_files = true; //java 파일을 여러 개 생성 가능
option java_package = "pnu.cse.studyhub.state.service"; //java 파일의 패키지
import "google/protobuf/empty.proto";
service GrpcMessageService { // 서비스 이름 및 해당 서비스에서 제공하는 메서드 정의
rpc SubscribeChat (ChatSubscribeRequest) returns (ChatActionResponse); // rpc : 메서드 정의
rpc UnsubscribeChat (ChatUnsubscribeRequest) returns (ChatActionResponse);
// rpc UnsubscribeChat (ChatUnsubscribeRequest) returns (google.protobuf.Empty);
}
message ChatSubscribeRequest {
string userId = 1;
int64 roomId = 2;
string session = 3;
}
message ChatActionResponse {
string userId = 1;
}
message ChatUnsubscribeRequest {
string session = 1;
}
GrpcClientService는 proto 파일을 통해 생성된 Request, Response 객체를 기반으로 수행된다.
이전 인터셉터에서 수행했던 gRPC 동작의 구현 내용이 포함되어 있다.
@Service
@RequiredArgsConstructor
@Slf4j
public class GrpcClientService {
private final GrpcMessageServiceGrpc.GrpcMessageServiceBlockingStub grpcStub;
public void subscribeRoom(String userId, long roomId, String session) {
ChatSubscribeRequest request = ChatSubscribeRequest.newBuilder()
.setUserId(userId)
.setRoomId(roomId)
.setSession(session)
.build();
ChatActionResponse response = grpcStub.subscribeChat(
request
);
log.debug("grpc-subscribe request : {}", request);
log.debug("grpc-subscribe response : {}", response);
};
public void unsubscribeRoom(String session) {
ChatUnsubscribeRequest request = ChatUnsubscribeRequest.newBuilder()
.setSession(session)
.build();
ChatActionResponse response = grpcStub.unsubscribeChat(
ChatUnsubscribeRequest.newBuilder()
.setSession(session)
.build()
);
log.debug("grpc-unsubscribe request: {}", request);
log.debug("grpc-unsubscribe response : {}", response);
};
gRPCClientService에서는 proto 파일에 정의된, 그리고 생성된 ChatSubscribeRequest/Response, ChatActionRequest/Response를 사용한다. 그리고 상태관리서버에 동작중인 subscribeChat, unsubscribeChat gRPC 함수를 호출하여 동작한다.
통신은 grpcStub을 통해 grpc 호출 및 응답한다.
테스트
다음과 같이 grpc request/reponse가 정상적으로 수행되었다.
기존 코드
사실 gRPC로 변경해야겠다고 생각한 이유는 TCP Server 부분 때문이다.
TCP Server는 chat/signaling/auth/room 서버 등 여러 서버로 부터 요청을 받기 때문에 각 서버별 동작을 처리하기 위한 방법이 필요했다.
실제 TCP 소켓 통신을 사용할 당시 아래 TCP Server의 문제 코드는 아래와 같다.
아래 코드는 server의 타입으로 요청을 처리하도록 구현한 부분이다.
@NoArgsConstructor
@Data
@JsonTypeInfo(
use = JsonTypeInfo.Id.NAME,
include = JsonTypeInfo.As.PROPERTY,
property = "server",
visible = true)
@JsonSubTypes({
@JsonSubTypes.Type(value = TCPChatReceiveRequest.class, name = "chat"),
@JsonSubTypes.Type(value = TCPSignalingReceiveRequest.class, name = "signaling"),
@JsonSubTypes.Type(value = TCPSignalingReceiveSchedulingRequest.class, name = "signaling_scheduling"),
@JsonSubTypes.Type(value = TCPAuthReceiveRequest.class, name = "auth"),
@JsonSubTypes.Type(value = TCPRoomReceiveRequest.class, name = "room")
})
@SuperBuilder
public abstract class TCPMessageReceiveRequest {
private String server;
private String type;
}
아래 코드는 server, type 필드에 의해 동작을 처리하는 부분이다.
@Service
@Slf4j
@RequiredArgsConstructor
public class MessageService {
private final RedisService redisService;
private final TCPRoomClientGateway tcpRoomClientGateway;
private final TCPSignalingClientGateway tcpSignalingClientGateway;
private final TCPAuthClientGateway tcpAuthClientGateway;
private final JsonConverter jsonConverter;
public String processMessage(String message) {
log.info("Received message: {}", message);
ObjectMapper mapper = new ObjectMapper();
try{
TCPMessageReceiveRequest response = jsonConverter.convertFromJson(message, TCPMessageReceiveRequest.class);
String responseMessage = "";
switch (response.getServer()) {
case "chat":
TCPChatReceiveRequest chatRequest = (TCPChatReceiveRequest) response;
if (chatRequest.getType().matches("SUBSCRIBE")) {
RealTimeData realTimeData = redisService.findRealTimeData(chatRequest.getUserId());
if (realTimeData != null) { // 오늘 접속 이력이 있는 경우
realTimeData.setRoomId(chatRequest.getRoomId());
realTimeData.setSessionId(chatRequest.getSession());
RealTimeData chatSubscribeRealTimeData = redisService.saveRealTimeDataAndSession(realTimeData);
responseMessage = jsonConverter.convertToJson(chatSubscribeRealTimeData);
} else { // 오늘 접속 이력이 없는 경우
realTimeData = makeRealTimeData(chatRequest);
RealTimeData chatSubscribeRealTimeData = redisService.saveRealTimeDataAndSession(realTimeData);
responseMessage = jsonConverter.convertToJson(chatSubscribeRealTimeData);
}
} else if (chatRequest.getType().matches("DISCONNECT|UNSUBSCRIBE")) {
String userId = redisService.findUserIdBySessionId(chatRequest.getSession());
if (userId != null) {
redisService.deleteRealTimeDataAndSession(userId, chatRequest.getSession());
responseMessage = mapper.writeValueAsString(redisService.findRealTimeData(userId));
} else {
// 존재하지 않는 접속 이력에 대한 삭제 동작 , 예외처리
}
} else {
// 구독, 구독해제, 연결해제를 제외한 나머지 소켓 동작
};
break;
과거에 사용했던 위의 두 코드는 단점이 존재한다고 생각하여 gRPC로 수정하고자 했다.
- 직관성이 떨어짐
- 필드 값에 의존적
- gRPC에 대한 성능 기대
따라서, gRPC 통신을 사용하였고 장점이 있음을 구현을 통해 확인할 수 있었다.
- 직관성 증대
- server 별로 proto 파일을 구성하여 server 필드에 대한 의존성 제거
- 동작에 따라 rpc 함수를 구성하여 type 필드에 대한 의존성 제거
다만 추가적으로 확인해야할 사항이 존재한다.
- 예외 처리에 대한 직관성?
- 성능이 실제로 높은가?
에 대한 확인이 필요하다.
'프로젝트 > WebRTC - studyhub' 카테고리의 다른 글
[WebRTC] 단위 테스트/통합 테스트 (0) | 2023.11.07 |
---|---|
[WebRTC] 채팅 서버 - 채팅 메시지 순서 보장(카프카 순서 보장) (2) | 2023.11.03 |
[WebRTC] 마이크로서비스 간 gRPC 통신 - 개발편(1) (0) | 2023.08.16 |
[WebRTC] 팀 프로젝트 개요 (0) | 2023.04.09 |