본문으로 바로가기

채팅 서버 구축 (2) - STOMP

앞에서 스프링에 내장된 인메모리 메시지 브로커가 아닌 외부 메시지 브로커가 필요한 이유에 대해 알아보았다. 또한, Websocket과 STOMP만 이용해서 채팅 서버를 구현하면 여러 문제점이 존재한다. 따라서 외부 브로커의 종류에 대해 알아보고, 그 중에서 먼저 Redis를 이용하여 채팅 서버를 구현해보려고 한다.

 

목차

  • 채팅 서비스 고도화
    • Websocket + STOMP만을 이용한 채팅 서버 문제점
    • 외부 브로커 종류
  • Spring Redis Pub/Sub
    • Redis 기본 설정
    • Redis 발행/구독 모델 구현
    • Redis를 이용한 채팅 서버 구현
  • M1(ARM)에서 Embedded Redis 실행 이슈

 

채팅 서비스 고도화

Websocket + STOMP만을 이용한 채팅 서버 문제점

  • 서버를 재시작 할 때마다 채팅방 정보들이 리셋된다.
    • 채팅방의 저장소가 없기 때문에 서버의 메모리에 적재된 채팅방은 서버를 재시작할 때마다 초기화되는 문제가 있다.
    • 따라서 DB 혹은 다른 저장소를 이용하여 채팅방이 계속 유지되도록 처리가 필요하다.
    • 해결: Redis 저장소를 이용
  • 채팅 서버가 여러 대일 경우, 서버간 채팅방 공유가 불가능해진다.
    • Websocket과 STOMP의 pub/sub을 이용하여 구현하면 pub/sub이 발생한 서버 내에서만 메시지를 주고받는 것이 가능하다.
      • 채팅방(Topic)이 생성된 서버 안에서만 유효하기 때문에 다른 서버로 접속한 클라이언트는 채팅방이 보이지도 않고, 접근 과 구독도 불가능해진다.
    • 따라서 구독 대상(Topic)이 여러 서버에서 접근할 수 있도록 개선이 필요하다.
    • 해결: 공통으로 사용할 수 있는 pub/sub 시스템을 구축하고 모든 서버들이 해당 시스템을 통하여 pub/sub 메시지를 주고받도록 해야 한다. (redis, kafka, rabbitMQ 등 존재)

 

외부 브로커 종류

 

RabbitMQ

  • 다양한 비즈니스에 의한 복잡한 라우팅 설계에 적합한 메시지 브로커
  • AMQP를 사용하여 지점 간 방법과 게시-구독 방법을 통해 메시지를 전달한다. 
    • AMQP(Advanced Message Queuing Protocols): 애플리케이션간에 데이터를 주고받을 때, 메시지 미들웨어 브로커를 통해 데이터를 주고받을 수 있게 해주는 메시징 프로토콜
  • 신뢰성 있는 메시지를 전송할 수 있기 때문에 정확한 요청-응답이 필요하거나, 트래픽은 작지만 장시간 실행되고 안정적인 작업이 필요한 경우에 사용할 수 있다.

 

Redis

  • 고성능 Key-Value 저장소 또는 메시지 브로커로 사용할 수 있는 인메모리 데이터 저장소이다.
  • Pub/Sub 기능
    • 다른 메시지 브로커와 다르게 메시지 지속성이 없다.
      • 즉, 메시지를 전송한 후 해당 메시지는 삭제되고 Redis 어디에도 저장되지 않는다.
      • 따라서 channel에 구독하고 있지 않으면 메시지가 유실 될 수 있다. 또한, 채팅방의 채팅이 많아질 경우 채팅 순서가 보장되지 않을 수 있다.
      • 메시지 전송 신뢰성을 보장하지 않기 때문에 단점을 보완할 별도의 추가 구현(Redis Streams)이 필요할 수 있다.
  • 실시간으로 빠르게 데이터 처리를 해야하는 서비스와 인메모리 데이터베이스로 인해 지속성이 중요하지 않고, 일정 수준의 손실을 처리할 수 있는 짧은 보존 메시지에 적합하다.

 

Kafka

  • 대량의 데이터를 저장하면서 높은 처리량이 필요한 곳에 적합한 메시지 브로커
  • Pub/Sub 모델의 메시지 큐로 분산환경에 특화되어 있는 특징을 가지는 서버이다.
    • 다수의 서버가 하나의 서비스를 이루는 경우, kafka를 이용하여 분산처리를 하는데 각 서버가 통신하는 방식은 Pub/Sub을 이용한다.
    • 이 Pub/Sub 메시지들은 메시지 큐의 형태로 순차적으로 처리되며, 모든 메시지들은 로그로 처리된다.
    • 이 특성을 이용하면 서비스의 안정성을 확보할 수 있다.

 

Spring Redis Pub/Sub

먼저 local에서는 Redis 설치없이 간단하게 Embedded Resis를 사용하여 환경을 구축한다.

 

Redis 기본 설정

 

의존성 추가

implementation 'org.springframework.boot:spring-boot-starter-data-redis'
implementation ('it.ozimov:embedded-redis:0.7.3') {
    exclude group: 'org.slf4j', module: 'slf4j-simple'
}
  • Embeded Redis를 사용하면 local에서도 Redis 설치 없이 간단하게 사용할 수 있다고 한다. 
    • [참고] 하지만 M1의 경우, Embeded Redis를 지원하지 않는다. 밑에서 해당 문제를 해결할 것이다.
  • Embeded Redis를 사용하면 Slf4j logger의 중복으로 인해 컴파일 에러가 생길 수 있다. 그래서 해당 모듈을 제거해준다.

 

Embeded Redis 서버 사용을 위한 설정

/**
 * 로컬 환경일 경우, 내장 레디스 실행
 */
@Profile("local")
@Configuration
public class EmbeddedRedisConfig {

    @Value("${spring.redis.port}")
    private int redisPort;

    private RedisServer redisServer;

    @PostConstruct
    public void redisServer() {
    	int port = isRedisRunning() ? findAvailablePort() : redisPort;
        redisServer = new RedisServer(port);
        redisServer.start();
    }

    @PreDestroy
    public void stopRedis() {
        if (redisServer != null) {
            redisServer.stop();
        }
    }
    
    /**
     * Embedded Redis가 현재 실행중인지 확인
     */
    private boolean isRedisRunning() throws IOException {
        return isRunning(executeGrepProcessCommand(redisPort));
    }
    
    /**
     * 해당 Process가 현재 실행중인지 확인
     */
    private boolean isRunning(Process process) {
        String line;
        StringBuilder pidInfo = new StringBuilder();
        try (BufferedReader input = new BufferedReader(new InputStreamReader(process.getInputStream()))) {
            while ((line = input.readLine()) != null) {
                pidInfo.append(line);
            }
        }
        catch (Exception e) {
        }
        return StringUtils.hasText(pidInfo.toString());
    }

    /**
     * 현재 PC/서버에서 사용가능한 포트 조회
     */
    public int findAvailablePort() throws IOException {
        for (int port = 10000; port <= 65535; port++) {
            Process process = executeGrepProcessCommand(port);
            if (!isRunning(process)) {
                return port;
            }
        }
        throw new IllegalArgumentException("Not Found Available port: 10000 ~ 65535");
    }

    /**
     * 해당 port를 사용중인 프로세스 확인하는 sh 실행
     */
    private Process executeGrepProcessCommand(int port) throws IOException {
        String command = String.format("netstat -nat | grep LISTEN|grep %d", port);
        String[] shell = {"/bin/sh", "-c", command};
        return Runtime.getRuntime().exec(shell);
    }
}
  • 채팅 서버가 실행될 때 Embeded Redis 서버도 동시에 실행될 수 있도록 설정한다.
  • Redis는 기본 6379 포트를 사용한다. 해당 포트를 사용하고 있는 프로세스가 있다면, 현재 사용가능한 포트를 찾아서 해당 포트로 서버를 실행한다.
    • isRedisRunning(): Embedded Redis가 현재 실행 중인지 확인
    • isRunning(): 해당 포트의 Process가 현재 실행 중인지 확인
    • findAvailablePort(): 현재 PC/서버에서 사용가능한 포트 조회
    • executeGrepProcessCommand(): 해당 port를 사용중인 프로세스 확인하는 sh 실행

 

Redis 설정

@Configuration
public class RedisConfig {

    @Value("${spring.redis.host}")
    private String host;
    @Value("${spring.redis.port}")
    private int port;
    
    /**
     * redis의 연결 정보 설정
     */
    @Bean
    public RedisConnectionFactory redisConnectionFactory() {
        RedisStandaloneConfiguration redisConfiguration = new RedisStandaloneConfiguration();
        redisConfiguration.setHostName(host);
        redisConfiguration.setPort(port);
        return new LettuceConnectionFactory(redisConfiguration);
    }

    /**
     * redis의 pub/sub 메시지를 처리하는 listener 설정
     */
    @Bean
    public RedisMessageListenerContainer redisMessageListener(RedisConnectionFactory connectionFactory) {
        RedisMessageListenerContainer container = new RedisMessageListenerContainer();
        container.setConnectionFactory(connectionFactory);
        return container;
    }

    /**
     * 어플리케이션에서 사용할 redisTemplate 설정
     */
    @Bean
    public RedisTemplate<String, Object> redisTemplate() {
        RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
        redisTemplate.setConnectionFactory(redisConnectionFactory());
        redisTemplate.setKeySerializer(new StringRedisSerializer());
        redisTemplate.setValueSerializer(new Jackson2JsonRedisSerializer<>(String.class));
        return redisTemplate;
    }
}
  • redisConnectionFactory(): Redis 서버와 통신을 위한 추상화 RedisConnectionFactory를 생성하고 반환한다.
  • redisMessageListener():  redis의 pub/sub 메시지를 처리하기 위해 listener를 설정한다.
  • redisTemplate(): 어플리케이션에서 사용할 redisTemplate 생성하고 반환한다.
    • RedisTemplate은 Redis 데이터를 저장하고 조회하는 기능을 하는 클래스이다.
    • setKeySerializer() , setValueSerializer(): Redis 데이터를 직렬화하는 방식을 설정할 수 있다.
      • Redis-CLI를 사용해 Redis 데이터를 직접 조회하는 경우, Redis 데이터를 문자열로 반환해야하기 때문에 설정한다.

 

Redis 발행/구독 모델 구현

 

Redis 발행 서비스 구현

@RequiredArgsConstructor
@Service
public class RedisPublisher {

    private final RedisTemplate<String, Object> redisTemplate;

    public void publish(ChannelTopic topic, ChatMessage message) {
        redisTemplate.convertAndSend(topic.getTopic(), message);
    }
}
  • publish(): 채팅방에 입장하여 메시지를 작성하면 메시지를 Redis Topic에 발행한다.
  • 해당 서비스를 통해 메시지를 발행하면 대기하고 있던 redis 구독 서비스가 메시지를 처리한다.

 

Redis 구독 서비스 구현

@Slf4j
@RequiredArgsConstructor
@Service
public class RedisSubscriber implements MessageListener {

    private final ObjectMapper objectMapper;
    private final RedisTemplate redisTemplate;
    private final SimpMessageSendingOperations messagingTemplate;

    @Override
    public void onMessage(Message message, byte[] pattern) {
        try {
            // redis에서 발행된 데이터를 받아 deserialize
            String publishMessage = (String) redisTemplate.getStringSerializer().deserialize(message.getBody());
            ChatMessage roomMessage = objectMapper.readValue(publishMessage, ChatMessage.class);
            // Websocket 구독자에게 채팅 메시지 Send
            messagingTemplate.convertAndSend("/sub/chat/room/" + roomMessage.getRoomId(), roomMessage);
        } catch (Exception e) {
            log.error(e.getMessage());
        }
    }
}
  • onMessage(): Redis에서 메시지가 발행(publish) 될 때까지 대기하다가 메시지가 발생되면 메시지를 처리하는 리스너다.
  • MessageListener를 상속 받아서 onMessage()를 재작성한다.
    • Redis에 메시지가 발행되면 해당 메시지를 ChatMessage로 변환하고, messaging Template을 이용하여 채팅방의 모든 웹 소켓 클라이언트에게 메시지를 전달한다.

 

Redis를 이용한 채팅 서버 구현

 

ChatController

@Controller
@RequiredArgsConstructor
public class ChatController {

    private final RedisPublisher redisPublisher;
    private final ChatService chatService;

    @MessageMapping("/chat/message")
    public void message(ChatMessage message) {
        if(message.getType().equals(MessageType.ENTER)) {
            message.setEnterMessage();
            chatService.enterChatRoom(message.getRoomId());
        }
        // 메시지를 redis의 Topic으로 발행함
        redisPublisher.publish(chatService.getTopic(message.getRoomId()), message);
    }

}
  • 웹소켓의 /pub/sub/message로 들어오는 메시징을 처리한다.
  • 클라이언트가 채팅방 입장 시, 채팅방에서 대화가 가능하도록 리스너를 연동한다(enterChatRoom()).
  • 이후 메시지를 서로 다른 서버에 공유하기 위해 Redis의 Topic으로 발행한다.

 

ChatService

@Service
@RequiredArgsConstructor
public class ChatService {
    private final RedisRepository redisRepository;
    private final RedisSubscriber redisSubscriber;
    private final RedisMessageListenerContainer redisMessageListener;
    private Map<String, ChannelTopic> topics;

    @PostConstruct
    private void init() {
        topics = new HashMap<>();
    }

    /**
     * 채팅방 생성
     */
    public ChatRoom createChatRoom(String roomName) {
        String roomId = UUID.randomUUID().toString();
        ChatRoom chatRoom = ChatRoom.builder()
                .roomId(roomId)
                .name(roomName)
                .build();
        redisRepository.saveRoom(chatRoom);
        return chatRoom;
    }

    /**
     * 채팅방 입장
     */
    public void enterChatRoom(String roomId) {
        ChannelTopic topic = topics.get(roomId);
        if (topic == null) {
            topic = new ChannelTopic(roomId);
        }
        redisMessageListener.addMessageListener(redisSubscriber, topic);
        topics.put(roomId, topic);
    }

    public List<ChatRoom> findAllRoom() {
        return redisRepository.findAllRoom();
    }

    public ChatRoom findRoomById(String roomId) {
        return redisRepository.findRoomById(roomId);
    }

    public ChannelTopic getTopic(String roomId) {
        return topics.get(roomId);
    }

}
  • RedisMessageListenerContainer: 채팅방(topic)에 발행될 메시지를 처리할 리스너
  • topics: 채팅방의 대화 메시지를 발행하기 위한 redis의 topic 정보
    • 서버별로 채팅방에 매치되는 topic 정보를 Map에 넣어 roomId로 찾을 수 있게 한다.
  • createChatRoom(): 채팅방 개설
    • 서버 간의 채팅방 공유를 위해 redis hash에 저장한다.
  • enterChatRoom(): 채팅방 입장
    • 채팅방 id로 redis topic을 조회하여 pub/sub 통신을 하기 위한 리스너를 설정한다.
  • findAllRoom(), findRoomById(): 채팅방 정보를 조회할 때 Redis Hash에 저장된 데이터를 불러온다.

 

RedisRepository

@Repository
@RequiredArgsConstructor
public class RedisRepository {

    private static final String CHAT_ROOMS = "CHAT_ROOM";
    private final RedisTemplate<String, Object> redisTemplate;
    private HashOperations<String, String, ChatRoom> opsHashChatRoom;

    @PostConstruct
    private void init() {
        opsHashChatRoom = redisTemplate.opsForHash();
    }

    public void saveRoom(ChatRoom chatRoom) {
        opsHashChatRoom.put(CHAT_ROOMS, chatRoom.getRoomId(), chatRoom);
    }

    public List<ChatRoom> findAllRoom() {
        return opsHashChatRoom.values(CHAT_ROOMS);
    }

    public ChatRoom findRoomById(String roomId) {
        return opsHashChatRoom.get(CHAT_ROOMS, roomId);
    }

}
  • RedisTemplate은 Redis 서버와 상호작용하도록 높은 수준으로 추상화를 제공하는 클래스이다. 또한, 여러 자료구조를 쉽게 Serialize, Deserialize 해주는 opsForX 메소드를 제공하여 redis 서버에 데이터 CRUD를 할 수 있다.
    • opsForhash(): Redis의 Hash 타입의 자료구조에 접근할 수 있다. HashOperations이 반환된다.
    • 외에도 opstForValue(), opstForList(), opstForSet() 등이 있다.
  • 채팅방을 Redis Hash에 저장하고, Hash에서 채팅방을 조회한다.

 

ChatRoom

@Getter
public class ChatRoom implements Serializable {

    private static final long serialVersionUID = 6494678977089006639L;

    private String roomId;
    private String name;

    @Builder
    public ChatRoom(String roomId, String name) {
        this.roomId = roomId;
        this.name = name;
    }
}
  • 자바 직렬화란?
    • 자바 객체를 바이트스트림으로 만들어서 파일, DB, 메모리, 네트워크 송신이 가능하도록 하는 것이다.
    • 역직렬화는 파일, DB, 메모리, 네트워크로부터 수신된 바이트스트림을 자바 객체로 변환하는 과정이다.
  • Redis에 저장되는 객체들은 Serialize 가능해야 하기 때문에 Serializable를 참조하도록 선언하고 serialVersionUID를 셋팅해준다.
    • serialVersionUID: 시리얼 통신을 하는 클래스의 버전을 표시하는 것(직렬화 버전의 고유값)
    • 직렬화 과정에서 만들어지는 바이트스트림에는 버전정보인 serialVersionUID가 포함되는데, 나중에 역직렬화를 할 때 해당 클래스의 특정 버전에 맞는지, 아닌지를 체크한다.

 

M1(ARM)에서 Embedded Redis 실행 이슈

먼저 Local 환경에서는 Embedded Redis를 동작하도록 했는데, 계속 실행 오류가 있었다. 
발생한 이유는 Embedded Redis 라이브러리에서는 mac_arm64용 바이너리가 준비되어 있지 않기 때문이었다. Redis에서 M1의 ARM 프로세스 아키텍처에서 실행되는 것을 지원하지 않기 때문이라고 한다.
지원하는 버전이 있을까 엄청 찾아봤는데 결국 없었다고 한다.....
  • 해결 방법
    • 직접 바이너리를 지정해서 해결할 수 있다.
    • RediseServer(File executable, int port) 생성자를 이용한다.

 

1. Redis 설치

 

1. Redis 소스 코드 컴파일하기

$ wget https://download.redis.io/releases/redis-6.2.5.tar.gz   # Redis 다운로드
$ tar xzf redis-6.2.5.tar.gz   # 파일 압축 해제
$ cd redis-6.2.5   # redis 디렉토리도 이동
$ make # redis 컴파일
  • make는 소스 코드에서 실행 파일을 만드는 명령어이다.
  • 위의 과정을 거치면 src 경로에 바이너리 파일들이 빌드되고, redis-server 바이너리 파일을 실행시켜 redis가 잘 실행되는 것을 볼 수 있다.
  • [문제 발생] 하지만!! 또 문제가 발생했다 ㅠㅋㅠ

  • make 과정에서 오류가 발생해 컴파일에 실패했다. 이것도 열심히 찾아봤지만 결국 해결을 하지 못했다.

 

2. Homebrew로 설치하기

$ brew install redis  # redis 설치
$ brew uninstall redis  # redis 설치 제거
$ brew services info redis  # redis 실행 상태 확인

$ brew services start redis  # Background로 redis 실행
$ brew services restart redis # Background로 redis 재실행
$ brew services stop redis # Background로 redis 중지

$ redis-server --version  # redis 버전 확인
$ redis-server # Foreground로 redis 실행
  • mac에는 정말 좋은 Homebrew가 있다는 것을 잊고 있었다.

 

3. Redis 공식 사이트에서 다운로드

 

2. EmbeddedRedisConfig 수정

 

Redis 바이너리 파일 옮기기

  • Redis 서버가 잘 실행된다면 실행 파일(redis-server)의 이름을 변경하여 프로젝트의 resouce 디렉토리 하위에 복사한다.
    • src/main/resources/binary/redis/{redis-mac-arm-바이너리파일} 경로에 추가한다.

 

EmbeddedRedisConfig

public class EmbeddedRedisConfig {

    @Value("${spring.redis.port}")
    private int redisPort;

    private RedisServer redisServer;

    @PostConstruct
    public void startRedis() throws IOException {
        int port = isRedisRunning() ? findAvailablePort() : redisPort;
        if (isArmArchitecture()) {
            log.info("ARM Architecture");
            redisServer = new RedisServer(Objects.requireNonNull(getRedisServerExecutable()), port);
        }
        else {
            redisServer = new RedisServer(redisPort);
        }
        redisServer.start();
    }
 
    ...

    private File getRedisServerExecutable() {
        try {
            return new File("src/main/resources/binary/redis/redis-server-6.2.14-mac-arm64");
        } catch (Exception e) {
            e.printStackTrace();
        }
        return null;
    }

    private boolean isArmArchitecture() {
        return System.getProperty("os.arch").contains("aarch64");
    }
}
  • RedisServer(excutable, port)로 RedisServer를 생성할 수 있다.
    • new RedisServer(Objects.requireNonNull(getRedisServerExecutable()), port)
    • excutable은 Redis를 실행하는데 필요한 실행 파일을 지정하는 매개변수이다.
  • getRedisServerExecutable(): ARM 아키텍처에서 Redis Server를 실행할 때 사용할 Redis Server 실행 파일을 가져온다. 가져올 파일이 없는 경우 예외를 던진다.
  • isArmArchitecture(): 현재 시스템이 ARM 아키텍처인지 확인한다.

 

 

참고