-
Spring, Web Socket, STOMP, MongoDB 환경에서 채팅 구현 (1)Code 2024. 9. 26. 17:19
웹소켓 관련 의존성 추가(build.gradle)
implementation 'org.webjars:webjars-locator-core:0.59' implementation 'org.webjars:sockjs-client:1.5.1' implementation 'org.webjars:stomp-websocket:2.3.4'
Websocket + STOMP config 구성
@Configuration @EnableWebSocketMessageBroker @RequiredArgsConstructor public class WebSocketConfig implements WebSocketMessageBrokerConfigurer { // WebSocket Stomp로 연결하는 흐름에 대한 제어를 위한 interceptor. JWT 인증에 사용 private final StompHandler stompHandler; // WebSocket 연결 시 발생하는 exception 핸들링 목적의 클래스 private final StompExceptionHandler stompExceptionHandler; @Override public void configureMessageBroker(MessageBrokerRegistry registry) { registry.enableSimpleBroker("/sub"); registry.setApplicationDestinationPrefixes("/pub"); } @Override public void registerStompEndpoints(StompEndpointRegistry registry) { // 엔드포인트 추가 등록 registry .setErrorHandler(stompExceptionHandler) .addEndpoint("/ws") .addInterceptors() .setAllowedOriginPatterns("*") .withSockJS(); } @Override public void configureClientInboundChannel(ChannelRegistration registration) { // Client로부터 들어오는 메세지를 처리하는 MessageChannel 구성 메소드 registration.interceptors(stompHandler); } }
- 이게 나는 WebSocketConfig 라고 클래스명을 지정해 놨으면서 채팅 기능에 한정되는 코드라고 생각했다.. 그래서 Endpoint를 /ws/chat 으로 받았었는데, 프론트 axios 테스트 하는데 여기서 템플릿 어디서 웹소켓 쓰는 곳이 있었는지, 자꾸 오류를 뱉었다(!!)
지금은 웹소켓을 나만 쓰고 있는데 관련 오류가 터져서 뭔가 했는데 엔드포인트를 /ws로 연결을 시도하고 있었던 거였다.. - .enableSimpleBroker("/topic", "/queue") 1:n, 1:1로 공식문서에서는 받아주던데 일단 헷갈려서 그냥 /pub, /sub으로
config에 JWT 인증
StompHandler 설정
@Component @RequiredArgsConstructor @Order(Ordered.HIGHEST_PRECEDENCE) public class StompHandler implements ChannelInterceptor { // WebSocket 연결 시 헤더에서 JWT token 유효성 검증 private final JwtTokenProvider jwtTokenProvider; private static final Logger logger = LoggerFactory.getLogger(StompHandler.class); // presend: STOMP 메세지가 전송되기 전에 호출되어 웹소켓 연결 시 토큰 검증 @Override public Message<?> preSend(Message<?> message, MessageChannel channel) { /** * 1. StompHeaderAccessor: Stomp 메세지의 헤더에 접근하는 클래스 * 2. 전송된 Stomp 메세지의 Command가 CONNECT인지 검사 * 3. StompHeaderAccessor로부터 Authorization 헤더의 JWT 토큰 추출(BEARER <- 이거 제거) * 4. jwtAuthenticationFilter로부터 유효한 토큰인지 확인 */ final StompHeaderAccessor stompHeaderAccessor = StompHeaderAccessor.wrap(message); if (StompCommand.CONNECT.equals(stompHeaderAccessor.getCommand())) { final String authorization = extractJwt(stompHeaderAccessor); String token = authorization.substring(7); boolean isValid = jwtTokenProvider.validateToken(token); if (!isValid) { logger.error("Invalid JWT token: {}", token); // throw new IllegalStateException("Invalid JWT token"); return message; } } return message; } // STOMP 헤더에서 Authorization 값 추출 public String extractJwt(final StompHeaderAccessor accessor) { return accessor.getFirstNativeHeader("Authorization"); } }
- 사용자 인증을 어디서 받아야 할 지 고민했다. 웹소켓 handshake 과정에서 아예 토큰값을 보내서 확인할 지, stomp 명령이 시작될 때 보낼지 결론은 stomp 명령어가 connect인 경우에 토큰값을 헤더에 함께 전송하고, 이를 preSend 메소드에서 명령어 실행 전에 확인해줬다.
- 제대로 된 프론트가 없는 상태에서 간이 테스트로 프론트를 만들어서 로그인 관련된 테스트를 할 수 없었다. 그래서 우선 JWT token 인증 관련한 로직을 주석처리하고(일단 에러는 받아줬다.) 기능 작동만 확인했다.
- 토큰을 받을 때 Bearer {{token}} 형식으로 들어와서 .substring으로 키값만 받아오도록 했다
docker-compose.yaml, application.yaml 설정
- mongoDB, kafka, zookeeper, s3 설정
MessageController 생성
@Slf4j @Controller @RequiredArgsConstructor public class ChatMessageController { private final ChatMessageService chatMessageService; @MessageMapping("/chat/{chatRoomId}") @SendTo("/sub/chat/{chatRoomId}") public ChatMessageResponseDto broadcasting(final ChatMessageRequestDto chatMessageRequestDto, @DestinationVariable(value = "chatRoomId") final Long chatRoomId) { return chatMessageService.saveChatMessage(chatRoomId, chatMessageRequestDto); } }
- 채팅방과 참여자에 대한 정보는 mysql에서 관리하고, 메세지는 mongoDB에서 관리했다.
MongoDBConfig 구성
@Configuration @EnableMongoRepositories("ok.backend.chat.domain.repository") @EnableMongoAuditing public class MongoDbConfig { // _class 칼럼 자동 생성 방지 @Bean public MappingMongoConverter mappingMongoConverter( MongoDatabaseFactory mongoDatabaseFactory, MongoMappingContext mongoMappingContext ) { DbRefResolver dbRefResolver = new DefaultDbRefResolver(mongoDatabaseFactory); MappingMongoConverter mappingConverter = new MappingMongoConverter(dbRefResolver, mongoMappingContext); mappingConverter.setTypeMapper(new DefaultMongoTypeMapper(null)); return mappingConverter; } }
- 이 부분 뭔지 모르겠고 복붙코드임; 없으면 _class 칼럼이 자동으로 생성된다나..
- 근데 레포지토리 위치를 인식을 못해서 자꾸 오류가 났다. MongoRepository가 존재하는 위치를 명시적으로 넣어줬다.
Websocket + STOMP test
- 여기서 너무 오래 해맸다.. 24년 9월 기준으로 STOMP 명령어를 테스트 할 수 있는 툴이 암만 찾아도 없었다;
- apic: 공식 docs는 접속이 가능한데 서비스는 닫혔다.
- https://jxy.me/websocket-debug-tool/ : 404 에러 뜬다.
- Postman (ref: https://velog.io/@handmk/SpringBoot-STOMP-chatting-서버-구축): 안 됨(경우에 따라 될 수도..)
- Postman은 웹소켓까지는 지원하는데 stomp 명령어를 인식하지를 못함
- 메세지를 전부 작성해서 send하면 된다고 해서 시도해봤다.
응답이 오지 않았다.. null 옥텟을 제대로 못 받아줘서 그렇다고 해서, notepad++를 통해 null값을 그대로 복사해서 환경변수로 지정해서 넣어줬다.
전역변수나 환경변수로 복사해서 넣어주던지, ^@ 코드를 반환하면 된다고 했다. -> 난 안 됨
- Base64로 메세지 변환해서 전송해서 성공한 사람들도 있길래 이렇게 해봤는데도 안됐다. 포기했다. - https://github.com/cdiptangshu/stomp-client :
- stomp 테스트를 할 수 있는 툴 안 해봄 - https://velog.io/@handmk/SpringBoot-STOMP-chatting-서버-구축 :
- 결국 이 블로그에서 올려주신 프론트 코드를 긁어와서(감사합니다...) 내 환경에 맞게 수정했다.
Kafka 설정
역할
producer: 메세지를 메세지 브로커의 토픽(Topic)으로 전송, DB에 메세지를 저장
consumer: 구독하고 있는 토픽의 메세지를 소비, 즉 현재 서버에 메세지를 받을 사용자에게 웹소켓으로 전송
설치
- kafka 설치
- application.yml 설정
- KafkaConstants 설정
public class KafkaConstants { public static String name = UUID.randomUUID().toString(); public static final String GROUP_ID = name; public static final String BROKER = "localhost:9092"; }
- 9092 포트를 사용하고, consumer name은 uuid로 설정
4. KafkaConsumerConfig, KafkaProducerConfig 설정해서 메세지 직렬화.. 동작 확인 필요
- KafkaConsumerConfig
@EnableKafka @Configuration public class KafkaConsumerConfig { // ConsumerFactory를 지정하여 Kafka 컨슈머의 설정을 제공 @Bean public ConcurrentKafkaListenerContainerFactory<String, ChatMessage> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, ChatMessage> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); return factory; } @Bean public ConsumerFactory<String, ChatMessage> consumerFactory() { JsonDeserializer<ChatMessage> deserializer = new JsonDeserializer<>(ChatMessage.class); deserializer.setRemoveTypeHeaders(false); deserializer.addTrustedPackages("*"); // 패키지 신뢰 목록에 모든 패키지를 추가 deserializer.setUseTypeMapperForKey(true); // 메시지 키에 타입 정보 추가 ImmutableMap<String, Object> config = ImmutableMap.<String, Object>builder() .put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaConstants.BROKER) .put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class) .put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deserializer) .put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest") // 가장 최신 메세지부터 읽음 .put(ConsumerConfig.GROUP_ID_CONFIG, KafkaConstants.GROUP_ID) .build(); return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(), deserializer); } }
- KafkaProducerConfig
@EnableKafka @Configuration public class KafkaProducerConfig { // Kafka Producer 생성에 사용(초기화) @Bean public ProducerFactory<String, ChatMessage> producerFactory() { return new DefaultKafkaProducerFactory<>(kafkaProducerConfiguration()); } // Producer 설정 정의 @Bean public Map<String, Object> kafkaProducerConfiguration() { return ImmutableMap.<String, Object>builder() .put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaConstants.BROKER) .put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class) .put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class) .build(); } // ProducerFactory를 사용하여 KafkaTemplate 초기화 -> 메세지 전송하는 api @Bean public KafkaTemplate<String, ChatMessage> kafkaTemplate() { return new KafkaTemplate<>(producerFactory()); } }
(여기서 문제)
MongoDB에 객체가 매핑으로 저장되어 JSON으로 저장됨 -> @DBref 설정 문제, MongoDB에 저장되는 데이터는 굳이 객체값까지 받아오지 않아도 된다고 판단해서 ResponseDto를 삭제하고 Id값만 저장함
서비스가 터지지 않는 걸로 봐서 Repository.save 이후 KafkaTemplate.send까지 잘 동작하는 것 같은데 이후 KafkaListener에서 제대로 받아오지 못하는 것 같음
- 카프카 토픽이 제대로 생성되었는지 확인
# 카프카 배쉬에 접근해서 확인.. docker exec -i -t kafka bash kafka-topics.sh --list --bootstrap-server localhost:9092 # 1, 3 제대로 생성되어 있다 __consumer_offsets # chat: 용도 - 토픽별 파티션 당 오프셋을 관리하기 위해서 만들어지는 토픽 # Kafka 브로커 상태 확인 kafka-broker-api-versions.sh --bootstrap-server localhost:9092 # Kafka 토픽의 메세지 직접 확인 kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic 1 --from-beginning # 토픽의 오프셋 상태 확인 kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic 1 # zookeeper 실행 ./zkServer.sh start # 토픽 삭제 확인 kafka-topics.sh --delete --topic 1 --bootstrap-server localhost:9092
[Producer clientId=producer-1] Successfully sent message to topic chat partition 0 at offset 50
- 컨슈머가 토픽을 제대로 구독하고 있는지 확인
- sub가 제대로 동작하고 웹소켓에 메세지가 전달되는지 확인
(토픽의 Leader가 제대로 설정되어 있어야 메세지를 읽어오는듯)
CRUD api 작업(Service, Controller)
- 채팅 서비스(http 통신)
- 개인 채팅방 생성
- 단체 채팅방 생성
- 채팅방 이름 수정
- 채팅방 삭제
- 채팅방 참여
- 채팅방 나가기
- 채팅 참여자 조회
- 채팅 목록 조회
- 채팅 메세지(ws, mongoDB)
- 메세지 전송
- 채팅 파일 업로드
- 채팅방 메세지 내역 조회
- 채팅방별 마지막 메세지 시간 조회
- 웹소켓
- ping-pong 로직
- 마지막으로 웹소켓 연결한 타임스탬프 조회
'Code' 카테고리의 다른 글
채팅 구현 중 웹소켓 중복 구독, 연결 끊김 문제 해결 (2) (0) 2024.11.03 Kafka 메모 (0) 2024.10.02 Kafka, Redis (0) 2024.09.17 [Java] 공공데이터포털 Open API 파싱, JPA (0) 2024.08.08 [Java] 자주 사용되는 Lombok 어노테이션 (0) 2024.06.22 - 이게 나는 WebSocketConfig 라고 클래스명을 지정해 놨으면서 채팅 기능에 한정되는 코드라고 생각했다.. 그래서 Endpoint를 /ws/chat 으로 받았었는데, 프론트 axios 테스트 하는데 여기서 템플릿 어디서 웹소켓 쓰는 곳이 있었는지, 자꾸 오류를 뱉었다(!!)