현재 채팅서버는 카프카를 통해 채팅을 구현하였다.
카프카를 통해 채팅 서버가 scale out 하더라도 여러 채팅 서버간 분산되어 있는 메시지를 관리하기 위해서 사용하게 되었다.
하지만 고려 못한 부분이 존재했다. 카프카의 큰 이점 중 하나는 여러 파티션으로 나누어 병렬처리할 수 있다는 점이다. 즉, 많은 메시지가 쌓였을 때, 시스템 리소스(CPU, RAM 등)또는 성능에 따라 Consumer 수를 조정하여 파티션별로 데이터를 읽어오는 병렬처리가 가능하다.
하지만, Kafka는 파티션 내에서만 순서 보장이 될 뿐, 여러 파티션에 대해 병렬 처리하는 경우 순서가 보장될 수 없다.
따라서 이에 대한 해결방법을 고민하였고, 결과적으로 파티션별로 특정 채팅방을 할당하는 방식을 사용하게 되었다.
접근 방법
파티션은 임의로 3개로 설정하였고, 아래와 같이 매핑하여 채팅방에 대한 메시지를 특정 파티션에서 소비하도록 하여 순서를 유지한다.
- 파티션 1의 경우 3,6,9로 끝나는 채팅방
- 파티션 2의 경우 1,4,7로 끝나는 채팅방
- 파티션 3의 경우 2,5,8로 끝나는 채팅방
이 때, 파티션별 채팅방을 소비하는 방법으로, 토픽을 활용하였다.
- 파티션 1의 경우 3,6,9로 끝나는 채팅방 => topic-1
- 파티션 2의 경우 1,4,7로 끝나는 채팅방 => topic-2
- 파티션 3의 경우 2,5,8로 끝나는 채팅방 => topic-3
이를 통해 방의 모듈 연산에 따라 파티션을 분리함으로써, 채팅방의 메시지 순서를 보장할 수 있다.
해결 과정
1. 도커 컴포즈 설정
아래는 docker-compose 설정이다. Kafka 구동 시, 환경변수를 통해 파티션 관련 설정할 수 있다.
이는 Kafka 컨테이너 내부에서 명령어를 통해 설정이 수행된다.
version: '2'
services:
zookeeper:
image: wurstmeister/zookeeper
ports:
- "2181:2181"
kafka:
image: wurstmeister/kafka
ports:
- "9092:9092"
environment:
KAFKA_ADVERTISED_HOST_NAME: 127.0.0.1
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
KAFKA_NUM_PARTITIONS: 3 #기본 토픽의 파티션 개수
KAFKA_CREATE_TOPICS: "studyhub:3:1" # 토픽의 파티션 수:replication 수
volumes:
- /var/run/docker.sock:/var/run/docker.sock
2. 파티션을 담당할 컨슈머 그룹 생성
파티션을 위한 Bean을 생성하였다.
특별한 부분은 setAckMode인데 Kafka의 Auto Commit을 사용하지 않고 수동 커밋을 사용하기 위함이다.
Auto Commit을 사용하는 경우 Offset 동기화가 가끔 제대로 되지 않을 수 있다는 내용이 있어 수동으로 커밋하기 위해 설정하였다.
(확실하진 않다. 하지만, Auto Commit 시간이나 이벤트 브로커 내부에 데이터를 길게 저장하는 경우 오프셋을 중복해서 consume하는 경우가 있는 것 같다.)
//파티션 1을 담당할 컨슈머 그룹 생성
@Bean
ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactoryGroup1() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactoryGroup1());
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
return factory;
}
@Bean
public ConsumerFactory<String, String> consumerFactoryGroup1(){
JsonDeserializer<Chat> deserializer = new JsonDeserializer<>(Chat.class);
deserializer.setRemoveTypeHeaders(false);
deserializer.addTrustedPackages("*");
deserializer.setUseTypeMapperForKey(true);
return new DefaultKafkaConsumerFactory<>(
consumerConfigurationsGroup1(), new StringDeserializer(), new StringDeserializer()
);
}
@Bean
public Map<String, Object> consumerConfigurationsGroup1() {
Map<String, Object> configurations = new HashMap<>();
configurations.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
configurations.put(ConsumerConfig.GROUP_ID_CONFIG, groupId+"-1");
configurations.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDeserializer);
configurations.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,valueDeserializer);
// "earliser" - 큐의 가장 앞부터 소비하기 시작, "latest" - 가장 최근, 가장 최근에 추가된 것부터 소비하기 시작
configurations.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
configurations.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
return configurations;
}
//파티션 2을 담당할 컨슈머 그룹 생성
@Bean
ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactoryGroup2() {
ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactoryGroup2());
factory.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
return factory;
}
3. KafkaProducer설정
이 글에서는 파티션을 3개 활용하기 때문에 roomId % 3을 통해 파티션을 나누었다. 파티션을 토픽별로 매핑한다.
따라서, kafkaTemplate.send(topic, partition, key, value)를 통해 토픽, 파티션, K-V를 설정하여 브로커에 produce한다.
일반적으로 토픽:파티션을 1:1로 매핑하는 것이 좋은 방법은 아니라고 생각한다. 파티션별 동작하는 것을 확인하기 위해 1:1로 매핑하여 수행한 것뿐이며, 토픽 하나에 여러 파티션을 두는 것이 더욱 좋은 방법이라 생각한다.
토픽에 파티션을 1:N으로 동작하게 하려면, docker-compose에 파티션의 수를 적합하게 늘리고, 아래 코드에서 partition의 규칙에 따라 partition 인자로 넣어주면 된다.
@Service
@RequiredArgsConstructor
@Slf4j
public class KafkaProducer {
private final KafkaTemplate<String, String> kafkaTemplate;
public void send(String topic, Chat chat) {
String type = chat.getMessageType();
ObjectMapper objectMapper = new ObjectMapper().registerModule(new JavaTimeModule());
try {
String content = objectMapper.writeValueAsString(chat);
// kafkaTemplate.send(topic, content);
kafkaTemplate.send(topic+"-"+(chat.getRoomId()%3+1), Integer.valueOf((int)(chat.getRoomId()%3)),chat.get_id(), content);
} catch (JsonProcessingException e) { //json 파싱 실패
e.printStackTrace();
}
}
}
4. KafkaConsumer 설정
KafkaListener에서 읽을 때 topic을 기준으로 읽는다.
Kafka의 Consumer가 메시지를 읽으며 ConsumerRecord<String,String>을 통해 데이터를 읽어온다. Record에서 어떤 파티션에서 읽었는 지, key, value, offset, timestamp등 정보를 얻을 수 있다.
그리고 2번 과정에서 설명했 듯 Auto Commit을 사용하지 않기 때문에 offset을 갱신? 동기화? 하기 위해 acknowledgment.acknowledge()를 수행한다.
@KafkaListener(topics = TOPICS+"-1", containerGroup = GROUPID+"-1")
public void consumeGroup1(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) throws IOException {
ObjectMapper objectMapper = new ObjectMapper().registerModule(new JavaTimeModule());
Chat chat = objectMapper.readValue(record.value(), Chat.class);
log.info("Consumed Message : " + record.value() + " from Partition : " + record.partition() + " with offset : " + record.offset()+ " in group 1");
template.convertAndSend("/topic/"+chat.getRoomId(), record.value());
acknowledgment.acknowledge();
}
실행 결과
테스트는 아래와 같이 수행하였다. 0~9의 id를 갖는 채팅을 생성하였고, 각각 모듈 연산에 따라 0,1,2의 roomId를 갖는다.
그리고 produce를 수행한 후 consume하는 것을 대기하기 위해 sleep을 수행한다.
public class KafkaOrderTest {
@Autowired
private KafkaProducer kafkaProducer;
@Autowired
private KafkaConsumer kafkaConsumer;
@Test
public void testMessageOrder() throws InterruptedException {
String topic = "studyhub";
for(int i=0; i<10; i++) {
kafkaProducer.send(topic,new Chat(
String.valueOf(i),
(long) i%3,
"test1",
"TEXT",
"test message",
LocalDateTime.now()));
}
Thread.sleep(5000); // consume하기 위한 대기시간
}
}
언뜻 보기에는 순서가 꼬인 것 처럼 보이지만, 파티션 별로 동작이 적절히 수행되었다. 수행 결과를 표로 나타내면 아래와 같다.
_id | roomId | partition |
1 | 1 | 1 |
0 | 0 | 0 |
2 | 2 | 2 |
4 | 1 | 1 |
3 | 0 | 0 |
5 | 2 | 2 |
8 | 2 | 2 |
6 | 0 | 0 |
7 | 1 | 1 |
9 | 0 | 0 |
roomId별로 메시지가 나눠지며, 파티션별로 순서가 보장되는 것을 보이기 위해 roomId 별로 정렬해서 나타내었다.
파티션을 나누기 위한 모듈로 연산(%3) 연산을 수행하여 roomId, partition이 나누어 졌으며 파티션 내부에서 순서가 보장되는 것을 확인할 수 있다
partition 0 : 0->3->6->9
partition 1 : 1->4->7
partition 2 : 2->5->8
_id | roomId | partition |
0 | 0 | 0 |
3 | 0 | 0 |
6 | 0 | 0 |
9 | 0 | 0 |
1 | 1 | 1 |
4 | 1 | 1 |
7 | 1 | 1 |
2 | 2 | 2 |
5 | 2 | 2 |
8 | 2 | 2 |
결론
따라서, 순서를 보장하기 위해 Kafka는 별개의 설정을 해야한다.
이전 나의 방법과 같이 Kafka를 단일 파티션을 사용했을 때는 파티션 내에서는 순서를 보장하기 때문에 메시지의 꼬임이 생기지 않는 것을 보장하나, 여러 파티션을 사용하는 경우 병렬 처리로 인한 순서 꼬임이 발생할 수 있다.
이 글에서는 채팅의 순서 꼬이는 것을 해결하기 위해 roomId를 partition별로 할당함으로써 같은 방은 같은 파티션 내에 produce하게 하였다. 즉 같은 방은 같은 파티션에 할당하여 파티션 내에서는 순서가 보장되는 것을 이용하였다.
다른 방법으로는
1) 파티션 키를 사용하는 것(파티션 키 : 파티션을 분배하는 데 사용),
2) 메시지 키 활용(메시지 키: 특정 토픽 내에서 메시지를 식별하는 데 사용, 메시키 키가 동일하면 같은 파티션으로 분류되는 점 이용),
3) Kafka Streams 활용(메시지 브로커에 들어온 순서를 보장)
가 있는 것으로 생각된다.
'프로젝트 > WebRTC - studyhub' 카테고리의 다른 글
[WebRTC] 마이크로서비스 간 gRPC 통신 - 개발편(2) (0) | 2023.11.09 |
---|---|
[WebRTC] 단위 테스트/통합 테스트 (0) | 2023.11.07 |
[WebRTC] 마이크로서비스 간 gRPC 통신 - 개발편(1) (0) | 2023.08.16 |
[WebRTC] 팀 프로젝트 개요 (0) | 2023.04.09 |