-
Kafka 채팅 파티션 분산 (3)Code 2024. 11. 11. 17:04
↓ 여기서 이어집니다
어느정도 채팅 기능이 동작하고 틀이 잡힐 무렵...
이제 진짜 흐린눈하고 있던 문제를 해결할 때가 됐다고 생각했다...
문제
- 현재 방식에서는 특정 채팅방에 메세지를 전송할 경우, 채팅방 id로 kafka 토픽이 자동생성 된다.
- 즉, client가 첫 메세지를 보내기 전까지는 토픽이 존재하지 않아서 이때 토픽이 생기기 전까지 메세지가 정상적으로 전송되지 않는다.
- 그리고 서비스가 제공되면 채팅방은 무수히 많이 생겨날텐데, 그 때마다 토픽을 하나씩 생성하는 건 너무 비효율적이다.
해결 방법
- Kafka의 특징 중 분산 처리(파티셔닝)를 활용하여 해결한다.
- 채팅방 id에 의해 여러개로 쪼개진 토픽을 chat 하나로 정리하고,
- 파티셔닝을 활용하여 3개의 partition에 채팅 메세지를 뿌려준다.
- 중요한 건, 순서 보장을 위해 partition key를 사용하여 메세지가 정해진 파티션으로만 전송되도록 해야한다.
- mod 연산에 따른 파티션 분리 방법은 다음 블로그의 글들을 참고했다.
구현
1. docker-compose.yaml
원래 이런 식으로 도커 컴포즈파일 올리면서 한번에 토픽 생성&파티션 생성까지 완료할 생각이었는데, 잘 안 돌아갔다..
그래서 그냥 로컬도 마찬가지고 서버 띄웠을 때 kafka 올리고 바로 명령어 실행해서 토픽 생성했다.하하
더보기kafka: image: 'bitnami/kafka:latest' container_name: 'kafka' ports: - '9092:9092' environment: KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181' KAFKA_LISTENERS: 'PLAINTEXT://0.0.0.0:9092' KAFKA_ADVERTISED_LISTENERS: 'PLAINTEXT://localhost:9092' command: - 'sh' - '-c' - " /opt/bitnami/kafka/bin/kafka-server-start.sh & \ sleep 20; \ until /opt/bitnami/kafka/bin/kafka-topics.sh --create --topic chat --partitions 3 --replication-factor 1 --if-not-exists --bootstrap-server localhost:9092; do \ echo 'Waiting for Kafka to be ready...'; \ sleep 5; \ done; \ wait" depends_on: - 'zookeeper'
2. KafkaConstants 설정
public class KafkaConstants { // public static String name = UUID.randomUUID().toString(); public static final String GROUP_ID = "chat-group"; }
하나의 토픽에서 채팅 메세지를 관리할 예정이므로 uuid로 생성 중인 컨슈머 그룹을 하나로 묶어 관리될 수 있도록 해준다.
3. ChatMessageService 수정
producer
- 위에서 참고한 글들과 마찬가지로, 3개의 파티션으로 나눈 후 모듈 연산에 따라 채팅방을 배정한다.
- 채팅방 3, 6, 9 ... → partition 0
- 채팅방 1, 4, 7 ... → partition 1
- 채팅방 2, 5, 8 ... → partition 2
public void sendMessage(Long chatRoomId, ChatMessageRequestDto chatMessageRequestDto) { ChatMessage chatMessage = chatMessageRepository.save(ChatMessage.createMessage( chatRoomId, chatMessageRequestDto.getMemberId(), chatMessageRequestDto.getNickname(), chatMessageRequestDto.getMessage(), null, null)); try { kafkaTemplate.send("chat", chatRoomId.intValue() % 3, null, chatMessage); } catch (Exception e) { log.error("Failed to send message", e); } } public void sendFileMessage(Long chatRoomId, Long memberId, String nickname, List<StorageResponseDto> storageFiles) { for (StorageResponseDto storageResponse : storageFiles) { StorageFile storageFile = storageFileService.findByStorageIdAndId( storageResponse.getStorageId(), storageResponse.getStorageFileId()); Long fileId = storageFile.getId(); String fileName = storageFile.getOriginalName(); ChatMessage chatMessage = chatMessageRepository.save(ChatMessage.createMessage( chatRoomId, memberId, nickname, null, fileId, fileName)); try { kafkaTemplate.send("chat", chatRoomId.intValue() % 3, null, chatMessage); // log.info("Sent message to topic {} on partition {}", chatRoomId, chatRoomId.intValue() % 3); } catch (Exception e) { log.error("Failed to send message", e); } } }
- "chat" 토픽의 모듈러 연산에 따른 나머지 값에 따른 파티션으로 메세지를 전송한다.
- 까먹지 않고 파일 메세지 전송 부분도 챙겨준다..
consumer
@KafkaListener(groupId = "chat-group", topicPartitions = @TopicPartition(topic = "chat", partitions = {"0"})) public void consumeMessagePartition0(ChatMessage chatMessage) { consumeMessage(chatMessage, 0); } @KafkaListener(groupId = "chat-group", topicPartitions = @TopicPartition(topic = "chat", partitions = {"1"})) public void consumeMessagePartition1(ChatMessage chatMessage) { consumeMessage(chatMessage, 1); } @KafkaListener(groupId = "chat-group", topicPartitions = @TopicPartition(topic = "chat", partitions = {"2"})) public void consumeMessagePartition2(ChatMessage chatMessage) { consumeMessage(chatMessage, 2); } private void consumeMessage(ChatMessage chatMessage, int partition) { try { simpMessagingTemplate.convertAndSend("/sub/chat/"+chatMessage.getChatRoomId().toString(), chatMessage); log.info("Received message from partition {}: {}", partition, chatMessage); } catch (Exception e) { log.error("Failed to process message from partition {}: {}", partition, chatMessage, e); } }
테스트를 위해 처리한 파티션과 메세지를 남기는 로그를 찍었습니다...
- consumeMessage 메서드를 수정해서 @KafkaListener 어노테이션을 분리한다. 이 메서드는 단순히 chatMessage와 partition number을 수신한 후, 지정한 웹소켓 경로로 수신한 메세지를 전송하는 동작을 관리하는 로직으로 변경했다.
- @KafkaListener 어노테이션을 통해 "chat-group" 그룹의 컨슈머가 메세지를 받을 수 있도록 지정해 준다.
- 이 때, @TopicPartition 어노테이션으로 특정 토픽("chat")의 특정 파티션(partitions = {""}) 에서만 수신할 수 있도록 해서 정해진 파티션으로 전달된 메세지만 처리한다.
결과
토픽을 제대로 생성하고, 파티셔닝 된 것도 확인했다. 내용도 어렵고 뭔가 많이 변경해야 할 것 같아서 솔직히 뒤로 미뤄뒀던 건데 생각보다 금방 해결됐다! 이로써 채팅방 생성할 때마다 테스트하려고 백엔드 청기백기 안 해도 된다.. 이제서야 제대로 된 채팅방 기능을 구현한 것 같다. 사실 정리된 글이 많아서 도움을 많이 받아가지고 구현 시간은 엄청 길지 않았는데, 블로그 정리하는데 한 세월 걸렸다.. 메세지 큐의 기능 말고도 서비스 분리에 있어서 kafka 기능도 경험해보고 싶다.
'Code' 카테고리의 다른 글
채팅 구현 중 웹소켓 중복 구독, 연결 끊김 문제 해결 (2) (0) 2024.11.03 Kafka 메모 (0) 2024.10.02 Spring, Web Socket, STOMP, MongoDB 환경에서 채팅 구현 (1) (2) 2024.09.26 Kafka, Redis (0) 2024.09.17 [Java] 공공데이터포털 Open API 파싱, JPA (0) 2024.08.08