ABOUT ME

-

Today
-
Yesterday
-
Total
-
  • Spring, Web Socket, STOMP, MongoDB 환경에서 채팅 구현 (1)
    Code 2024. 9. 26. 17:19

    웹소켓 관련 의존성 추가(build.gradle)

        implementation 'org.webjars:webjars-locator-core:0.59'
        implementation 'org.webjars:sockjs-client:1.5.1'
        implementation 'org.webjars:stomp-websocket:2.3.4'

     

    Websocket + STOMP config 구성

    @Configuration
    @EnableWebSocketMessageBroker
    @RequiredArgsConstructor
    public class WebSocketConfig implements WebSocketMessageBrokerConfigurer {
    
        // WebSocket Stomp로 연결하는 흐름에 대한 제어를 위한 interceptor. JWT 인증에 사용
        private final StompHandler stompHandler;
        // WebSocket 연결 시 발생하는 exception 핸들링 목적의 클래스
        private final StompExceptionHandler stompExceptionHandler;
    
        @Override
        public void configureMessageBroker(MessageBrokerRegistry registry) {
            registry.enableSimpleBroker("/sub");
            registry.setApplicationDestinationPrefixes("/pub");
        }
    
        @Override
        public void registerStompEndpoints(StompEndpointRegistry registry) {
            // 엔드포인트 추가 등록
            registry
                    .setErrorHandler(stompExceptionHandler)
                    .addEndpoint("/ws")
                    .addInterceptors()
                    .setAllowedOriginPatterns("*")
                    .withSockJS();
        }
    
        @Override
        public void configureClientInboundChannel(ChannelRegistration registration) {
            // Client로부터 들어오는 메세지를 처리하는 MessageChannel 구성 메소드
            registration.interceptors(stompHandler);
        }
    }

     

    • 이게 나는 WebSocketConfig 라고 클래스명을 지정해 놨으면서 채팅 기능에 한정되는 코드라고 생각했다.. 그래서 Endpoint를 /ws/chat 으로 받았었는데, 프론트 axios 테스트 하는데 여기서 템플릿 어디서 웹소켓 쓰는 곳이 있었는지, 자꾸 오류를 뱉었다(!!)
      지금은 웹소켓을 나만 쓰고 있는데 관련 오류가 터져서 뭔가 했는데 엔드포인트를 /ws로 연결을 시도하고 있었던 거였다..
    • .enableSimpleBroker("/topic", "/queue") 1:n, 1:1로 공식문서에서는 받아주던데 일단 헷갈려서 그냥 /pub, /sub으로

     

    config에 JWT 인증

    StompHandler 설정

    @Component
    @RequiredArgsConstructor
    @Order(Ordered.HIGHEST_PRECEDENCE)
    public class StompHandler implements ChannelInterceptor {
    
        // WebSocket 연결 시 헤더에서 JWT token 유효성 검증
        private final JwtTokenProvider jwtTokenProvider;
        private static final Logger logger = LoggerFactory.getLogger(StompHandler.class);
    
        // presend: STOMP 메세지가 전송되기 전에 호출되어 웹소켓 연결 시 토큰 검증
        @Override
        public Message<?> preSend(Message<?> message, MessageChannel channel) {
            /**
             * 1. StompHeaderAccessor: Stomp 메세지의 헤더에 접근하는 클래스
             * 2. 전송된 Stomp 메세지의 Command가 CONNECT인지 검사
             * 3. StompHeaderAccessor로부터 Authorization 헤더의 JWT 토큰 추출(BEARER <- 이거 제거)
             * 4. jwtAuthenticationFilter로부터 유효한 토큰인지 확인
             */
            final StompHeaderAccessor stompHeaderAccessor = StompHeaderAccessor.wrap(message);
    
            if (StompCommand.CONNECT.equals(stompHeaderAccessor.getCommand())) {
                final String authorization = extractJwt(stompHeaderAccessor);
    
                String token = authorization.substring(7);
                boolean isValid = jwtTokenProvider.validateToken(token);
                if (!isValid) {
                    logger.error("Invalid JWT token: {}", token);
    //                throw new IllegalStateException("Invalid JWT token");
                    return message;
                }
            }
            return message;
        }
    
        // STOMP 헤더에서 Authorization 값 추출
        public String extractJwt(final StompHeaderAccessor accessor) {
            return accessor.getFirstNativeHeader("Authorization");
        }
    }

     

    • 사용자 인증을 어디서 받아야 할 지 고민했다. 웹소켓 handshake 과정에서 아예 토큰값을 보내서 확인할 지, stomp 명령이 시작될 때 보낼지 결론은 stomp 명령어가 connect인 경우에 토큰값을 헤더에 함께 전송하고, 이를  preSend 메소드에서 명령어 실행 전에 확인해줬다.
    • 제대로 된 프론트가 없는 상태에서 간이 테스트로 프론트를 만들어서 로그인 관련된 테스트를 할 수 없었다. 그래서 우선 JWT token 인증 관련한 로직을 주석처리하고(일단 에러는 받아줬다.) 기능 작동만 확인했다.
    • 토큰을 받을 때 Bearer {{token}} 형식으로 들어와서 .substring으로 키값만 받아오도록 했다

    docker-compose.yaml, application.yaml 설정

    • mongoDB, kafka, zookeeper, s3 설정

    MessageController 생성

    @Slf4j
    @Controller
    @RequiredArgsConstructor
    public class ChatMessageController {
    
        private final ChatMessageService chatMessageService;
    
        @MessageMapping("/chat/{chatRoomId}")
        @SendTo("/sub/chat/{chatRoomId}")
        public ChatMessageResponseDto broadcasting(final ChatMessageRequestDto chatMessageRequestDto,
                                                   @DestinationVariable(value = "chatRoomId") final Long chatRoomId) {
            return chatMessageService.saveChatMessage(chatRoomId, chatMessageRequestDto);
        }
    }
    • 채팅방과 참여자에 대한 정보는 mysql에서 관리하고, 메세지는 mongoDB에서 관리했다.

    MongoDBConfig 구성

    @Configuration
    @EnableMongoRepositories("ok.backend.chat.domain.repository")
    @EnableMongoAuditing
    public class MongoDbConfig {
        // _class 칼럼 자동 생성 방지
        @Bean
        public MappingMongoConverter mappingMongoConverter(
                MongoDatabaseFactory mongoDatabaseFactory,
                MongoMappingContext mongoMappingContext
        ) {
            DbRefResolver dbRefResolver = new DefaultDbRefResolver(mongoDatabaseFactory);
            MappingMongoConverter mappingConverter = new MappingMongoConverter(dbRefResolver, mongoMappingContext);
            mappingConverter.setTypeMapper(new DefaultMongoTypeMapper(null));
            return mappingConverter;
        }
    }
    • 이 부분 뭔지 모르겠고 복붙코드임; 없으면 _class 칼럼이 자동으로 생성된다나..
    • 근데 레포지토리 위치를 인식을 못해서 자꾸 오류가 났다. MongoRepository가 존재하는 위치를 명시적으로 넣어줬다.

    Websocket + STOMP test

    • 여기서 너무 오래 해맸다.. 24년 9월 기준으로 STOMP 명령어를 테스트 할 수 있는 툴이 암만 찾아도 없었다;
    1. apic: 공식 docs는 접속이 가능한데 서비스는 닫혔다.
    2. https://jxy.me/websocket-debug-tool/ : 404 에러 뜬다.
    3. Postman (ref: https://velog.io/@handmk/SpringBoot-STOMP-chatting-서버-구축): 안 됨(경우에 따라 될 수도..)
      - Postman은 웹소켓까지는 지원하는데 stomp 명령어를 인식하지를 못함
      - 메세지를 전부 작성해서 send하면 된다고 해서 시도해봤다.
      응답이 오지 않았다.. null 옥텟을 제대로 못 받아줘서 그렇다고 해서, notepad++를 통해 null값을 그대로 복사해서 환경변수로 지정해서 넣어줬다.
      전역변수나 환경변수로 복사해서 넣어주던지, ^@ 코드를 반환하면 된다고 했다. -> 난 안 됨
      - Base64로 메세지 변환해서 전송해서 성공한 사람들도 있길래 이렇게 해봤는데도 안됐다. 포기했다.
    4. https://github.com/cdiptangshu/stomp-client :
      - stomp 테스트를 할 수 있는 툴 안 해봄
    5. https://velog.io/@handmk/SpringBoot-STOMP-chatting-서버-구축 :
      - 결국 이 블로그에서 올려주신 프론트 코드를 긁어와서(감사합니다...) 내 환경에 맞게 수정했다.

    Kafka 설정

    역할

    producer: 메세지를 메세지 브로커의 토픽(Topic)으로 전송, DB에 메세지를 저장

    consumer: 구독하고 있는 토픽의 메세지를 소비, 즉 현재 서버에 메세지를 받을 사용자에게 웹소켓으로 전송

    설치

    1. kafka 설치
    2. application.yml 설정
    3. KafkaConstants 설정
    public class KafkaConstants {
        public static String name = UUID.randomUUID().toString();
        public static final String GROUP_ID = name;
        public static final String BROKER = "localhost:9092";
    }
    • 9092 포트를 사용하고, consumer name은 uuid로 설정

      4. KafkaConsumerConfig, KafkaProducerConfig 설정해서 메세지 직렬화.. 동작 확인 필요

    • KafkaConsumerConfig
    @EnableKafka
    @Configuration
    public class KafkaConsumerConfig {
        // ConsumerFactory를 지정하여 Kafka 컨슈머의 설정을 제공
        @Bean
        public ConcurrentKafkaListenerContainerFactory<String, ChatMessage> kafkaListenerContainerFactory() {
            ConcurrentKafkaListenerContainerFactory<String, ChatMessage> factory = new ConcurrentKafkaListenerContainerFactory<>();
            factory.setConsumerFactory(consumerFactory());
            return factory;
        }
    
        @Bean
        public ConsumerFactory<String, ChatMessage> consumerFactory() {
            JsonDeserializer<ChatMessage> deserializer = new JsonDeserializer<>(ChatMessage.class);
            deserializer.setRemoveTypeHeaders(false);
            deserializer.addTrustedPackages("*"); // 패키지 신뢰 목록에 모든 패키지를 추가
            deserializer.setUseTypeMapperForKey(true); // 메시지 키에 타입 정보 추가
    
            ImmutableMap<String, Object> config = ImmutableMap.<String, Object>builder()
                    .put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaConstants.BROKER)
                    .put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class)
                    .put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, deserializer)
                    .put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest") // 가장 최신 메세지부터 읽음
                    .put(ConsumerConfig.GROUP_ID_CONFIG, KafkaConstants.GROUP_ID)
                    .build();
    
            return new DefaultKafkaConsumerFactory<>(config, new StringDeserializer(), deserializer);
    
        }
    
    }
    • KafkaProducerConfig
    @EnableKafka
    @Configuration
    public class KafkaProducerConfig {
        // Kafka Producer 생성에 사용(초기화)
        @Bean
        public ProducerFactory<String, ChatMessage> producerFactory() {
            return new DefaultKafkaProducerFactory<>(kafkaProducerConfiguration());
        }
    
        // Producer 설정 정의
        @Bean
        public Map<String, Object> kafkaProducerConfiguration() {
            return ImmutableMap.<String, Object>builder()
                    .put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KafkaConstants.BROKER)
                    .put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class)
                    .put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class)
                    .build();
        }
    
        // ProducerFactory를 사용하여 KafkaTemplate 초기화 -> 메세지 전송하는 api
        @Bean
        public KafkaTemplate<String, ChatMessage> kafkaTemplate() {
            return new KafkaTemplate<>(producerFactory());
        }
    
    }

     

    (여기서 문제)

     

    MongoDB에 객체가 매핑으로 저장되어 JSON으로 저장됨 -> @DBref 설정 문제, MongoDB에 저장되는 데이터는 굳이 객체값까지 받아오지 않아도 된다고 판단해서 ResponseDto를 삭제하고 Id값만 저장함

     

    서비스가 터지지 않는 걸로 봐서 Repository.save 이후 KafkaTemplate.send까지 잘 동작하는 것 같은데 이후 KafkaListener에서 제대로 받아오지 못하는 것 같음

     

    • 카프카 토픽이 제대로 생성되었는지 확인
    # 카프카 배쉬에 접근해서 확인..
    docker exec -i -t kafka bash
    kafka-topics.sh --list --bootstrap-server localhost:9092 # 1, 3 제대로 생성되어 있다
    __consumer_offsets # chat: 용도 - 토픽별 파티션 당 오프셋을 관리하기 위해서 만들어지는 토픽
    
    # Kafka 브로커 상태 확인
    kafka-broker-api-versions.sh --bootstrap-server localhost:9092
    
    # Kafka 토픽의 메세지 직접 확인
    kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic 1 --from-beginning
    
    # 토픽의 오프셋 상태 확인
    kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic 1
    
    # zookeeper 실행
    ./zkServer.sh start
    
    # 토픽 삭제 확인
    kafka-topics.sh --delete --topic 1 --bootstrap-server localhost:9092

     

    [Producer clientId=producer-1] Successfully sent message to topic chat partition 0 at offset 50

    • 컨슈머가 토픽을 제대로 구독하고 있는지 확인
    • sub가 제대로 동작하고 웹소켓에 메세지가 전달되는지 확인
      (토픽의 Leader가 제대로 설정되어 있어야 메세지를 읽어오는듯)

    CRUD api 작업(Service, Controller)

    1. 채팅 서비스(http 통신)
      • 개인 채팅방 생성
      • 단체 채팅방 생성
      • 채팅방 이름 수정
      • 채팅방 삭제
      • 채팅방 참여
      • 채팅방 나가기
      • 채팅 참여자 조회
      • 채팅 목록 조회
    2. 채팅 메세지(ws, mongoDB)
      • 메세지 전송
      • 채팅 파일 업로드
      • 채팅방 메세지 내역 조회
      • 채팅방별 마지막 메세지 시간 조회
    3. 웹소켓
      • ping-pong 로직
      • 마지막으로 웹소켓 연결한 타임스탬프 조회