개요
현재 마이크로서비스 간 데이터 통신 시 TCP를 활용하고 있다.
그러나, gRPC로 방향을 바꿔보고자 한다.
이유로는 크게 3가지가 있다.
- 성능 개선 기대
- 예외 처리의 직관성 개선 기대
- 네트워크 사용량 감소 기대
세가지 경우 모두 확실하지 않지만, 채팅 서버 - 상태관리 서버 사이에 적용하여 다양한 테스트를 해볼 예정이다.
테스트는 아래 3가지를 할 생각이다.
- TCP VS gRPC 송수신 성능 평가
- 예외처리 코드 작성 및 직관성 개선 확인(지극히 개인적, 정성적.. 으로 평가할 예정)
- Prometheus & Grafana를 통한 usage 체크
특히, 현재 가장 문제점은 예외처리 코드에 대한 직관성이 많이 떨어진다.
그 이유로는 TCP통신에 대한 이해도가 부족하여, 예외처리 구현의 미흡함이 있다.
또한, A 서버 -> B 서버로 통신함에 있어 B서버 내부적으로 발생하는 예외들을 관리하는 것이 어렵다고 생각한다.
따라서, 이를 gRPC를 통해 함수형태로 호출하면, 호출한 내용에 대한 응답의 예외를 확인하기 쉬우며, gRPC에서도 다양한 기능을 제공하기 때문에 테스트 후 전체 시스템에 적용해보고자 한다.
개발
설정
grpc에 필요한 라이브러리를 추가한 코드는 아래와 같다.
일반적인 gradle과 차별점은 sourceSets와 protobuf 부분이 있다.
sourceSets : 소스 코드 디렉토리를 정의하여, 유형별 코드를 분리하여 관리하기위해 사용한다. 아래 코드에서는 proto 파일과 proto파일을 통해 생성되는 java 소스 파일을 구분하였으며, java 소스파일은 proto 파일을 통해 생성된다.
protobuf : 해당 블록은 gRPC에서 사용되는 Protocol Buffer 관련 설정을 정의한다. protoc에는 Protocol Buffer의 컴파일러 모듈(+버전?)을 정의하였고, plugins는 Protocol Buffer를 사용하기 위한 gRPC Java 플러그인을 정의하였다. generateProtoTasks 부분은 Protocol Buffers 파일을 컴파일하여 Java 코드로 변환하는 작업을 정의한다.
plugins {
id 'java'
id 'org.springframework.boot' version '2.7.10'
id 'io.spring.dependency-management' version '1.0.15.RELEASE'
id 'com.google.protobuf' version '0.8.18'
}
group = 'pnu.cse.studyhub'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = '11'
repositories {
mavenCentral()
}
ext {
set('springCloudVersion', "2021.0.6")
}
def grpcVersion = "1.53.0"
def protocVersion = "3.18.1"
dependencies {
implementation 'org.springframework.boot:spring-boot-starter-integration'
developmentOnly 'org.springframework.boot:spring-boot-devtools'
implementation 'org.springframework.boot:spring-boot-starter-web'
implementation 'org.springframework.integration:spring-integration-ip'
implementation 'org.projectlombok:lombok'
implementation 'org.springframework.boot:spring-boot-starter-data-redis'
//grpc
implementation "io.grpc:grpc-protobuf:${grpcVersion}"
implementation "io.grpc:grpc-stub:${grpcVersion}"
implementation "io.grpc:protoc-gen-grpc-java:${grpcVersion}"
implementation "io.grpc:grpc-netty-shaded:${grpcVersion}"
annotationProcessor 'org.projectlombok:lombok'
testImplementation 'org.springframework.boot:spring-boot-starter-test'
testImplementation 'org.springframework.integration:spring-integration-test'
}
sourceSets {
main {
proto {
srcDirs 'src/main/resources/proto'
}
java {
srcDirs 'build/generated/sources/proto/main/grpc'
srcDirs 'build/generated/sources/proto/main/java'
}
}
}
protobuf {
protoc {
artifact = "com.google.protobuf:protoc:${protocVersion}"
}
plugins {
grpc {
artifact = "io.grpc:protoc-gen-grpc-java:${grpcVersion}"
}
}
generateProtoTasks {
all()*.plugins {
grpc {}
}
}
}
dependencyManagement {
imports {
mavenBom "org.springframework.cloud:spring-cloud-dependencies:${springCloudVersion}"
}
}
tasks.named('test') {
useJUnitPlatform()
}
이에 따라 gradle의 generateProto를 통해 생성된 build 파일들은 아래와 같다.
gRPC를 위한 proto 파일 구현
아래 코드는 gRPC 통신을 위해 구현된 proto파일이다. WebRTC프로젝트에서 활용하던 채팅 서버 <-> 상태관리 서버 간 TCP통신을 대체하고자 구현한 내용이다.
코드에 대한 설명은 주석을 통해 확인할 수 있다.
syntax = "proto3"; //proto3를 사용
option java_multiple_files = true; //java 파일을 여러 개 생성 가능
option java_package = "pnu.cse.studyhub.state.service"; //java 파일의 패키지
import "google/protobuf/empty.proto";
service GrpcMessageService { // 서비스 이름 및 해당 서비스에서 제공하는 메서드 정의
rpc SubscribeChat (ChatSubscribeRequest) returns (ChatActionResponse); // rpc : 메서드 정의
rpc UnsubscribeChat (ChatUnsubscribeRequest) returns (ChatActionResponse);
// rpc UnsubscribeChat (ChatUnsubscribeRequest) returns (google.protobuf.Empty);
}
message ChatSubscribeRequest {
string userId = 1;
int64 roomId = 2;
string session = 3;
}
message ChatActionResponse {
string userId = 1;
}
message ChatUnsubscribeRequest {
string session = 1;
}
gRPC 서비스 로직 구현
기존 코드
아래는 기존에 TCP통신을 위해 구현된 기존 MessageService 코드이다.
이 후 새로 gRPC통신을 사용한 코드를 통해 확인할 수 있겠지만, 가독성 측면에서 많이 개선되었다.
우선, 간략하게 설명하자면 TCP통신은 실시간 상태관리를 빠르게 반영하기위해 HTTP통신을 사용하기보다 TCP 통신을 사용하는 것이 합리적이라고 판단하여 사용했다. 그러나, 채팅서버, 유저서버, 시그널링 서버 등 각기 다른 서버(TCP Client)에서 상태관리서버(TCP Server)로 전달되는 데이터들을 상속 클래스를 통해 처리하다보니 MessageService 자체의 크기가 커지고 복잡해져, 관리하기 어려워졌다.
TCPServerConfig에서 MessageChannel Bean을 분리하여 구현함으로써 TCP에서도 분리할 수 있을 것으로 생각된다. 하지만, TCP 코드를 수정하려던 찰나, gRPC통신으로 구현하고 성능평가를 하여 직접적인 비교를 해보고, gRPC통신으로 변경하는 것을 고려하고 있다.
@AllArgsConstructor
@NoArgsConstructor
@Data
@JsonTypeInfo(
use = JsonTypeInfo.Id.NAME,
include = JsonTypeInfo.As.PROPERTY,
property = "server",
visible = true)
@JsonSubTypes({
@JsonSubTypes.Type(value = TCPChatReceiveRequest.class, name = "chat"),
@JsonSubTypes.Type(value = TCPSignalingReceiveRequest.class, name = "signaling"),
@JsonSubTypes.Type(value = TCPSignalingReceiveSchedulingRequest.class, name = "signaling_scheduling"),
@JsonSubTypes.Type(value = TCPAuthReceiveRequest.class, name = "auth"),
@JsonSubTypes.Type(value = TCPRoomReceiveRequest.class, name = "room")
})
@SuperBuilder
public abstract class TCPMessageReceiveRequest {
private String server;
private String type;
}
- 상속 클래스 구현 상태 -
추상클래스로 메시지를 수신하고, server property 값을 통해 클래스를 결정하였다.
추상 클래스, TCPMessageReceiveRequest로 수신하여, server 프로퍼티를 확인하여 클래스를 결정하였다.
@Service
@Slf4j
@RequiredArgsConstructor
public class MessageService {
private final RedisService redisService;
private final TCPRoomClientGateway tcpRoomClientGateway;
private final TCPSignalingClientGateway tcpSignalingClientGateway;
private final TCPAuthClientGateway tcpAuthClientGateway;
private final JsonConverter jsonConverter;
public String processMessage(String message) {
log.info("Received message: {}", message);
ObjectMapper mapper = new ObjectMapper();
try{
TCPMessageReceiveRequest response = jsonConverter.convertFromJson(message, TCPMessageReceiveRequest.class);
String responseMessage = "";
switch (response.getServer()) {
case "chat":
TCPChatReceiveRequest chatRequest = (TCPChatReceiveRequest) response;
if (chatRequest.getType().matches("SUBSCRIBE")) {
RealTimeData realTimeData = redisService.findRealTimeData(chatRequest.getUserId());
if (realTimeData != null) { // 오늘 접속 이력이 있는 경우
realTimeData.setRoomId(chatRequest.getRoomId());
realTimeData.setSessionId(chatRequest.getSession());
RealTimeData chatSubscribeRealTimeData = redisService.saveRealTimeDataAndSession(realTimeData);
responseMessage = jsonConverter.convertToJson(chatSubscribeRealTimeData);
} else { // 오늘 접속 이력이 없는 경우
realTimeData = makeRealTimeData(chatRequest);
RealTimeData chatSubscribeRealTimeData = redisService.saveRealTimeDataAndSession(realTimeData);
responseMessage = jsonConverter.convertToJson(chatSubscribeRealTimeData);
}
} else if (chatRequest.getType().matches("DISCONNECT|UNSUBSCRIBE")) {
String userId = redisService.findUserIdBySessionId(chatRequest.getSession());
if (userId != null) {
redisService.deleteRealTimeDataAndSession(userId, chatRequest.getSession());
responseMessage = mapper.writeValueAsString(redisService.findRealTimeData(userId));
} else {
// 존재하지 않는 접속 이력에 대한 삭제 동작 , 예외처리
}
} else {
// 구독, 구독해제, 연결해제를 제외한 나머지 소켓 동작
};
break;
-MessageService 구현 상태 -
server property를 통해 요청 타입을 구분하며, case, if 등으로 인해 관리하기 복잡해졌다.
신규 코드
아래 코드에서 확인할 수 있듯이, 채팅 서버에 대한 gRPC 통신 코드이다. GrpcMessageServiceGrpc는 generateProto에 의해 생성된 파일이며, 해당 파일의 인터페이스인 GrpcMessageServiceImplBase를 extends하여 구현하였다.
@Service
@RequiredArgsConstructor
@Slf4j
public class GrpcMessageServiceImpl extends GrpcMessageServiceGrpc.GrpcMessageServiceImplBase {
private final RedisService redisService;
@Override
public void subscribeChat(ChatSubscribeRequest request, StreamObserver<ChatActionResponse> responseObserver) {
log.debug("SubscribeChat request: {}", request);
RealTimeData realTimeData = redisService.findRealTimeData(request.getUserId());
RealTimeData savedRealTimeData = null;
if (realTimeData != null) { // 오늘 접속 이력이 있는 경우
realTimeData.setRoomId(request.getRoomId());
realTimeData.setSessionId(request.getSession());
savedRealTimeData = redisService.saveRealTimeDataAndSession(realTimeData);
} else { // null, 오늘 접속 이력이 없는 경우
realTimeData = makeRealTimeData(request);
savedRealTimeData = redisService.saveRealTimeDataAndSession(realTimeData);
}
ChatActionResponse response = ChatActionResponse.newBuilder()
.setUserId(savedRealTimeData.getUserId())
.build();
responseObserver.onNext(response);
responseObserver.onCompleted();
}
@Override
public void unsubscribeChat(ChatUnsubscribeRequest request, StreamObserver<ChatActionResponse> responseObserver) {
log.debug("UnsubscribeChat request: {}", request);
String userId = redisService.findUserIdBySessionId(request.getSession());
if (userId != null) {
// 최신 버전으로 수정 요망
redisService.deleteRealTimeDataAndSession(userId, request.getSession());
} else {
// 미접속자의 접속 해제 -> 예외 처리
}
ChatActionResponse response = ChatActionResponse.newBuilder()
.setUserId(userId)
.build();
responseObserver.onNext(response);
responseObserver.onCompleted();
}
// 오늘 접속 이력이 없는 경우 RealTimeData 생성
private RealTimeData makeRealTimeData(ChatSubscribeRequest chatRequest) {
RealTimeData realTimeData = new RealTimeData();
realTimeData.setUserId(chatRequest.getUserId());
realTimeData.setRoomId(chatRequest.getRoomId());
realTimeData.setSessionId(chatRequest.getSession());
return realTimeData;
}
}
proto에서 정의한 message 형태인 ChatSubscribeRequest, ChatUnsubscribeRequest, ChatActionResponse 객체를 사용하여 요청/응답을 처리한다. 이 때, 클라이언트로 응답을 보내는 responseObserver 인터페이스를 사용하며 onNext 메소드를 통해 응답 데이터를 처리하고, onCompleted메소드를 통해 클라이언트로부터의 요청을 완료했음을 알린다.
gRPC 서버 설정
gRPC 서버를 실행하기 위한 클래스이다.
@Configuration
@RequiredArgsConstructor
public class GrpcServerConfig implements ApplicationRunner {
private final RedisService redisService;
private static final int PORT = 8095;
@Override
public void run(ApplicationArguments args) throws Exception {
Server server = ServerBuilder.forPort(PORT)
.addService(new GrpcMessageServiceImpl(redisService))
.build();
server.start();
}
}
PORT 설정 및 Service에 필요한 의존성 주입이 포함되어 있다. 그리고, ApplicationRunner를 통해 Spring Boot 애플리케이션이 실행될 때 메서드가 수행된다.
채팅 서버와 통신하기 위한 GrpcMessageServiceImpl은 addService를 통해 설정되었으며, 다른 서비스 또한 동일한 방법으로 추가 가능하다.
기타 내용
gRPC 서버를 구현하며, gRPC를 위한 Spring Boot Starter 라이브러리가 있는데 그것을 통해 어노테이션을 활용한 구현이 더욱 수월할 것으로 생각된다. 하지만 이 글의 방식으로 설정함으로써도 Spring Boot와 gRPC에 대해 이해하는데 많은 도움이 된 것 같다.
다음 글에선 gRPC 클라이언트를 구현하고, gRPC 클라이언트 및 서버 간 통신을 테스트 할 예정이다.
'프로젝트 > WebRTC - studyhub' 카테고리의 다른 글
[WebRTC] 마이크로서비스 간 gRPC 통신 - 개발편(2) (0) | 2023.11.09 |
---|---|
[WebRTC] 단위 테스트/통합 테스트 (0) | 2023.11.07 |
[WebRTC] 채팅 서버 - 채팅 메시지 순서 보장(카프카 순서 보장) (2) | 2023.11.03 |
[WebRTC] 팀 프로젝트 개요 (0) | 2023.04.09 |