-
Kafka 설정
역할
producer: 메세지를 메세지 브로커의 토픽(Topic)으로 전송, DB에 메세지를 저장
consumer: 구독하고 있는 토픽의 메세지를 소비, 즉 현재 서버에 메세지를 받을 사용자에게 웹소켓으로 전송
설치
1. kafka 설치
2. application.yml 설정
spring:
kafka:
bootstrap-servers: localhost:9092
zookeeper: port 2181
kafka: port 9092
3. ProducerConfiguration: ReactiveKafkaProducerTemplate 빈으로 등록
ReactiveKafkaProducerTemplate: 메세지 발행하는 과정 추상화 -> Reactor Kafka(KafkaSender)이 메세지 발행
ConsumerConfiguration: ReactiveKafkaConsumerTemplate
directAllOrNothing(): 구독자가 없는 경우 tryEmitNext() 실패 -> 구독자가 없을 때 컨슈머가 받은 메세지를 고려하지 않음
문제
MongoDB에 객체가 매핑으로 저장되어 JSON으로 저장됨 -> @DBref 설정 문제, MongoDB에 저장되는 데이터는 굳이 객체값까지 받아오지 않아도 된다고 판단해서 ResponseDto를 삭제하고 Id값만 저장함
서비스가 터지지 않는 걸로 봐서 Repository.save 이후 KafkaTemplate.send까지 잘 동작하는 것 같은데 이후 KafkaListener에서 제대로 받아오지 못하는 것 같음
1. 카프카 토픽이 제대로 생성되었는지 확인
카프카 배쉬에 접근해서 확인..
docker exec -i -t kafka bash
kafka-topics.sh --list --bootstrap-server localhost:9092 -> 1, 3 제대로 생성되어 있다
__consumer_offsets -> chat: 용도 - 토픽별 파티션 당 오프셋을 관리하기 위해서 만들어지는 토픽
Kafka 브로커 상태 확인
kafka-broker-api-versions.sh --bootstrap-server localhost:9092
Kafka 토픽의 메세지 직접 확인
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic 1 --from-beginning
토픽의 오프셋 상태 확인
kafka-topics.sh --describe --bootstrap-server localhost:9092 --topic 1
zookeeper 실행
./zkServer.sh start
kafka-topics.sh --delete --topic 1 --bootstrap-server localhost:9092
[Producer clientId=producer-1] Successfully sent message to topic chat partition 0 at offset 50
2. 컨슈머가 토픽을 제대로 구독하고 있는지 확인
3. sub가 제대로 동작하고 웹소켓에 메세지가 전달되는지 확인
토픽의 Leader가 제대로 설정되어 있어야 메세지를 읽어오는듯
docker exec -it kafka \
kafka-topics.sh --describe --topic chat --bootstrap-server localhost:9092
docker exec -it kafka \
kafka-topics.sh --create --topic chat --partitions 3 --replication-factor 1 --bootstrap-server localhost:9092
'Code' 카테고리의 다른 글
Kafka 채팅 파티션 분산 (3) (2) 2024.11.11 채팅 구현 중 웹소켓 중복 구독, 연결 끊김 문제 해결 (2) (0) 2024.11.03 Spring, Web Socket, STOMP, MongoDB 환경에서 채팅 구현 (1) (2) 2024.09.26 Kafka, Redis (0) 2024.09.17 [Java] 공공데이터포털 Open API 파싱, JPA (0) 2024.08.08