[SSE, Redis]단방향 통신으로 채팅 기능 구현하기

최근에 단방향 통신 채팅을 구현하는 프로젝트를 맡았다.

Java, 스프링으로 구현하는 채팅 기능을 검색해보면 보통 Stomp, 웹소켓으로 구현하는 양방향 통신에 대한 정보가 많이 나오는데 SSE로 만드는 건 댓글이나 채팅에 대한 알림 기능 말고 잘 안나온다.

그러나 요즘 실무에서는 다음과 같은 이유로 단방향 통신을 선호한다고 한다.

  • 단순성: 설정이 간단하고 HTTP 기반이기 때문에 기존 웹 기술과 호환성이 좋아 추가적인 프로토콜 설정이 필요 없다. 일반적인 HTTP 요청을 사용하므로 대부분의 방화벽에서 문제없이 동작한다.
  • 브라우저 지원: 대부분의 최신 브라우저에서 기본적으로 지원한다.
  • 재연결 자동화: 연결이 끊어지면 클라이언트가 자동으로 재연결을 시도하기 때문에 안정적인 통신이 가능하다.

많은 채팅 서비스에서 실제로 양방향 실시간 통신이 필요한 순간은 생각보다 적다. 대부분의 경우, 클라이언트는 HTTP 요청으로 메시지를 전송하고, 서버로부터 실시간으로 메시지를 수신하기만 하면 되기 때문에 SSE(Server-Sent Events)와 Redis는 채팅 서비스 구현을 위한 좋은 대안이다.


1. 기술 소개

(1) SSE(Server-Sent Events)란?

SSE는 서버에서 클라이언트로 단방향 통신을 제공하는 웹 표준 기술로, 기존 HTTP 연결을 통해 서버가 클라이언트에게 실시간으로 데이터를 push할 수 있게 해준다. 서버는 클라이언트에게 지속적으로 데이터 전송을 할 수 있고 클라이언트는 이를 수신한다.

SSE는 아래와 같은 장점들을 제공한다:

  1. 간단한 구현
    • 일반적인 HTTP를 기반으로 동작하여 추가적인 프로토콜 처리가 필요 없다.
    • 클라이언트 측에서는 EventSource API를 통해 쉽게 구현이 가능하다.
  2. 자동 재연결
    • 네트워크 문제로 연결이 끊어졌을 때 자동으로 재연결을 시도한다.
    • 마지막으로 수신한 이벤트 ID를 기억해 끊어진 지점부터 데이터를 다시 수신할 수 있다.
  3. 효율적인 리소스 사용
    • 단방향 통신이므로 불필요한 연결 유지 비용이 적다.
    • 특히 모바일 환경에서 배터리 소모를 줄일 수 있다.

(2) Redis를 선택한 이유

Redis는 로그인 정보를 세션으로 저장하는 데에서부터 사용했는데 Redis의 Pub/Sub 기능을 실시간 메시징 시스템 구현에 도입했다:

  1. 고성능 메시지 브로커
    • In-memory 데이터 저장소로서 초고속 메시지 전달이 가능하다.
    • Pub/Sub 모델을 통해 효율적인 메시지 브로드캐스팅을 지원한다.
  2. 확장성
    • 수평적 확장이 용이하여 대규모 트래픽 처리가 가능하다.
    • 클러스터 모드를 통해 고가용성을 확보할 수 있다.
  3. 단순한 구조
    • 복잡한 메시지 브로커 시스템에 비해 설정과 관리가 간단하다.
    • 가볍고 빠른 특성으로 인해 실시간 채팅에 적합하다

2. WebSocket과의 차이점

(1) 웹소켓

  • 양방향 통신: 클라이언트와 서버 간의 양방향 통신 지원
  • 프로토콜: 자체적인 프로토콜 사용
  • 연결 유지: 지속적 연결을 통한 낮은 레이턴시(Latency) 제공
클라이언트  <-------->  서버
  (메시지 전송)        (메시지 수신)

(2) SSE

  • 단방향 통신: 서버 → 클라이언트만 지원
  • 프로토콜: HTTP 프로토콜을 기반으로 작동
  • 연결 유지: HTPP 연결을 유지하는 방식으로 데이터 전송
클라이언트  <--------  서버
                (이벤트 전송)

3. 채팅 기능 만들기

(1) SwaggerHub API 정리

(1) Chat (채팅 관련 API)

  • GET /api/chat/rooms - 채팅방 목록 조회
  • POST /api/chat/rooms - 채팅방 생성
  • GET /api/chat/rooms/{roomId} - 채팅방 상세 정보 조회
  • DELETE /api/chat/rooms/{roomId} - 채팅방 삭제
  • GET /api/chat/rooms/{roomId}/messages - 채팅 메시지 조회 (이전 메시지 불러오기 가능)
  • POST /api/chat/rooms/{roomId}/messages - 메시지 전송
  • GET /api/chat/rooms/{roomId}/subscribe - SSE 구독 (실시간 메시지 수신 가능)
  • POST /api/chat/users/{userId}/rooms - 1:1 채팅방 생성

(2) User (사용자 관련 API)

  • GET /api/chat/users - 사용자 목록 조회(채팅방을 생성할 사용자를 선택)

(2) 주요 설정 코드

AsyncConfig: SSE 비동기 설정

비동기 처리를 위한 Thread Pool 설정을 담당한다. SSE 연결은 장시간 유지되어야 하므로 적절한 스레드 관리가 중요하다.

java
Copy
@Configuration
@EnableAsync
public class AsyncConfig implements AsyncConfigurer {
    @Override
    public Executor getAsyncExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(5);// 기본 실행 대기하는 Thread 수
        executor.setMaxPoolSize(10);// 동시 동작하는 최대 Thread 수
        executor.setQueueCapacity(25);// Queue 크기
        executor.setThreadNamePrefix("chatAsync-");
        executor.setWaitForTasksToCompleteOnShutdown(true);
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        executor.initialize();
        return executor;
    }
}

RedisConfig: Redis 설정

Redis의 Pub/Sub 기능을 사용하기 위한 설정으로 메시지 직렬화/역직렬화 설정과 리스너 설정을 포함한다.

java
Copy
@Configuration
public class RedisConfig {
    @Bean
    public RedisTemplate<String, Object> redisTemplate() {
        RedisTemplate<String, Object> template = new RedisTemplate<>();
        template.setConnectionFactory(redisConnectionFactory());
        template.setKeySerializer(new StringRedisSerializer());
        template.setValueSerializer(new Jackson2JsonRedisSerializer<>(Object.class));
        return template;
    }

    @Bean
    public RedisMessageListenerContainer redisMessageListenerContainer(
            RedisConnectionFactory connectionFactory,
            MessageListenerAdapter listener,
            ChannelTopic topic) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        container.addMessageListener(listener, ChannelTopic.of("chatroom"));
        return container;
    }
}

ChatSseController: SSE 엔드포인트

클라이언트의 SSE 연결 요청을 처리하는 메인 컨트롤러로 채팅방 생성, 참가자 추가, SSE 연결 설정을 담당한다.

java
Copy
@RestController
@RequiredArgsConstructor
@RequestMapping("/api/chat/rooms")
public class ChatSseController {
    @GetMapping(value = "/{roomId}/subscribe", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
    public ResponseEntity<SseEmitter> subscribe(
            @PathVariable Integer roomId,
            @RequestParam Integer userId) {
// 채팅방 존재 여부 확인 및 생성
        if (!chatService.isRoomExists(roomId)) {
            chatService.createRoom(roomId, "Room " + roomId);
        }

// 참가자 추가
        chatService.addParticipant(roomId, userId);

// SSE 연결 생성
        SseEmitter emitter = sseEmitterService.createEmitter(roomId, userId);
        return ResponseEntity.ok(emitter);
    }
}

(3) 메시지 처리 흐름

채팅 시스템의 핵심인 메시지 처리는 ChatController를 통해 이루어진다.

채팅방 관리

java
Copy
@RestController
@RequestMapping("/chat")
@RequiredArgsConstructor
public class ChatController {
    private final ChatService chatService;

// 채팅방 목록 조회
    @GetMapping("/rooms")
    public ResponseEntity<List<ChatRoom>> getRooms(@RequestParam(required = false) Integer userId) {
        if (userId == null) {
            log.error("userId 파라미터가 없습니다.");
            return ResponseEntity.badRequest().body(Collections.emptyList());
        }
        return ResponseEntity.ok(chatService.getRooms(userId));
    }

// 1:1 채팅방 생성
    @PostMapping("/rooms")
    public ResponseEntity<ChatRoom> createRoom(@RequestParam Integer userId) {
        ChatRoom room = chatService.createRoom(userId);
        return ResponseEntity.status(HttpStatus.CREATED).body(room);
    }
}

메시지 처리

java
Copy
@RestController
@RequestMapping("/api/chat")
public class ChatController {
// 채팅방 메시지 조회 (이전 메시지 로드)
    @GetMapping("/rooms/{roomId}/messages")
    public ResponseEntity<List<ChatMessage>> getMessages(
            @PathVariable Integer roomId,
            @RequestParam(required = false) Long lastMessageId) {
        List<ChatMessage> messages = chatService.getMessages(roomId, lastMessageId);
        return ResponseEntity.ok(messages);
    }

// 메시지 전송
    @PostMapping("/rooms/{roomId}/messages")
    public ResponseEntity<ChatMessage> sendMessage(
            @PathVariable Integer roomId,
            @RequestBody ChatMessage message) {
        message.setRoomId(roomId);
        if (message.getMessageType() == null || message.getMessageType().isEmpty()) {
            message.setMessageType("MESSAGE");
        }

        ChatMessage savedMessage = chatService.saveAndPublishMessage(roomId, message);
        return ResponseEntity.ok(savedMessage);
    }
}


4. 메시지 처리 흐름도

메시지 전송 시나리오:

  • 클라이언트가 /api/chat/rooms/{roomId}/messages 엔드포인트로 메시지 전송
  • ChatService가 메시지를 저장하고 Redis pub/sub 채널에 발행
  • Redis Subscriber가 메시지를 수신하여 해당 채팅방의 모든 SSE 연결에 브로드캐스트

메시지 수신 시나리오:

  • 클라이언트가 /api/chat/rooms/{roomId}/subscribe 엔드포인트로 SSE 연결 수립
  • Redis pub/sub을 통해 새 메시지 수신 시 해당 채팅방의 SSE 연결로 전송
  • 클라이언트의 EventSource가 메시지를 수신하여 화면에 표시

이러한 설정들을 통해 Redis의 Pub/Sub 기능과 SSE를 결합하여 효율적인 실시간 채팅 시스템을 구현할 수 있었다.