앞에서 스프링에 내장된 인메모리 메시지 브로커가 아닌 외부 메시지 브로커가 필요한 이유에 대해 알아보았다. 또한, Websocket과 STOMP만 이용해서 채팅 서버를 구현하면 여러 문제점이 존재한다. 따라서 외부 브로커의 종류에 대해 알아보고, 그 중에서 먼저 Redis를 이용하여 채팅 서버를 구현해보려고 한다.
목차
- 채팅 서비스 고도화
- Websocket + STOMP만을 이용한 채팅 서버 문제점
- 외부 브로커 종류
- Spring Redis Pub/Sub
- Redis 기본 설정
- Redis 발행/구독 모델 구현
- Redis를 이용한 채팅 서버 구현
- M1(ARM)에서 Embedded Redis 실행 이슈
채팅 서비스 고도화
Websocket + STOMP만을 이용한 채팅 서버 문제점
- 서버를 재시작 할 때마다 채팅방 정보들이 리셋된다.
- 채팅방의 저장소가 없기 때문에 서버의 메모리에 적재된 채팅방은 서버를 재시작할 때마다 초기화되는 문제가 있다.
- 따라서 DB 혹은 다른 저장소를 이용하여 채팅방이 계속 유지되도록 처리가 필요하다.
- 해결:
Redis저장소를 이용
- 채팅 서버가 여러 대일 경우, 서버간 채팅방 공유가 불가능해진다.
- Websocket과 STOMP의 pub/sub을 이용하여 구현하면 pub/sub이 발생한 서버 내에서만 메시지를 주고받는 것이 가능하다.
- 채팅방(Topic)이 생성된 서버 안에서만 유효하기 때문에 다른 서버로 접속한 클라이언트는 채팅방이 보이지도 않고, 접근 과 구독도 불가능해진다.
- 따라서 구독 대상(Topic)이 여러 서버에서 접근할 수 있도록 개선이 필요하다.
- 해결: 공통으로 사용할 수 있는 pub/sub 시스템을 구축하고 모든 서버들이 해당 시스템을 통하여 pub/sub 메시지를 주고받도록 해야 한다. (
redis,kafka,rabbitMQ등 존재)
- Websocket과 STOMP의 pub/sub을 이용하여 구현하면 pub/sub이 발생한 서버 내에서만 메시지를 주고받는 것이 가능하다.
외부 브로커 종류
RabbitMQ
- 다양한 비즈니스에 의한 복잡한 라우팅 설계에 적합한 메시지 브로커
- AMQP를 사용하여 지점 간 방법과 게시-구독 방법을 통해 메시지를 전달한다.
- AMQP(Advanced Message Queuing Protocols): 애플리케이션간에 데이터를 주고받을 때, 메시지 미들웨어 브로커를 통해 데이터를 주고받을 수 있게 해주는 메시징 프로토콜
- 신뢰성 있는 메시지를 전송할 수 있기 때문에 정확한 요청-응답이 필요하거나, 트래픽은 작지만 장시간 실행되고 안정적인 작업이 필요한 경우에 사용할 수 있다.
Redis
- 고성능 Key-Value 저장소 또는 메시지 브로커로 사용할 수 있는 인메모리 데이터 저장소이다.
- Pub/Sub 기능
- 다른 메시지 브로커와 다르게 메시지 지속성이 없다.
- 즉, 메시지를 전송한 후 해당 메시지는 삭제되고 Redis 어디에도 저장되지 않는다.
- 따라서 channel에 구독하고 있지 않으면 메시지가 유실 될 수 있다. 또한, 채팅방의 채팅이 많아질 경우 채팅 순서가 보장되지 않을 수 있다.
- 메시지 전송 신뢰성을 보장하지 않기 때문에 단점을 보완할 별도의 추가 구현(Redis Streams)이 필요할 수 있다.
- 다른 메시지 브로커와 다르게 메시지 지속성이 없다.
- 실시간으로 빠르게 데이터 처리를 해야하는 서비스와 인메모리 데이터베이스로 인해 지속성이 중요하지 않고, 일정 수준의 손실을 처리할 수 있는 짧은 보존 메시지에 적합하다.
Kafka
- 대량의 데이터를 저장하면서 높은 처리량이 필요한 곳에 적합한 메시지 브로커
- Pub/Sub 모델의 메시지 큐로 분산환경에 특화되어 있는 특징을 가지는 서버이다.
- 다수의 서버가 하나의 서비스를 이루는 경우, kafka를 이용하여 분산처리를 하는데 각 서버가 통신하는 방식은 Pub/Sub을 이용한다.
- 이 Pub/Sub 메시지들은 메시지 큐의 형태로 순차적으로 처리되며, 모든 메시지들은 로그로 처리된다.
- 이 특성을 이용하면 서비스의 안정성을 확보할 수 있다.
Spring Redis Pub/Sub
먼저 local에서는 Redis 설치없이 간단하게 Embedded Resis를 사용하여 환경을 구축한다.
Redis 기본 설정
의존성 추가
implementation 'org.springframework.boot:spring-boot-starter-data-redis'
implementation ('it.ozimov:embedded-redis:0.7.3') {
exclude group: 'org.slf4j', module: 'slf4j-simple'
}
- Embeded Redis를 사용하면 local에서도 Redis 설치 없이 간단하게 사용할 수 있다고 한다.
- [참고] 하지만 M1의 경우, Embeded Redis를 지원하지 않는다. 밑에서 해당 문제를 해결할 것이다.
- Embeded Redis를 사용하면 Slf4j logger의 중복으로 인해 컴파일 에러가 생길 수 있다. 그래서 해당 모듈을 제거해준다.
Embeded Redis 서버 사용을 위한 설정
/**
* 로컬 환경일 경우, 내장 레디스 실행
*/
@Profile("local")
@Configuration
public class EmbeddedRedisConfig {
@Value("${spring.redis.port}")
private int redisPort;
private RedisServer redisServer;
@PostConstruct
public void redisServer() {
int port = isRedisRunning() ? findAvailablePort() : redisPort;
redisServer = new RedisServer(port);
redisServer.start();
}
@PreDestroy
public void stopRedis() {
if (redisServer != null) {
redisServer.stop();
}
}
/**
* Embedded Redis가 현재 실행중인지 확인
*/
private boolean isRedisRunning() throws IOException {
return isRunning(executeGrepProcessCommand(redisPort));
}
/**
* 해당 Process가 현재 실행중인지 확인
*/
private boolean isRunning(Process process) {
String line;
StringBuilder pidInfo = new StringBuilder();
try (BufferedReader input = new BufferedReader(new InputStreamReader(process.getInputStream()))) {
while ((line = input.readLine()) != null) {
pidInfo.append(line);
}
}
catch (Exception e) {
}
return StringUtils.hasText(pidInfo.toString());
}
/**
* 현재 PC/서버에서 사용가능한 포트 조회
*/
public int findAvailablePort() throws IOException {
for (int port = 10000; port <= 65535; port++) {
Process process = executeGrepProcessCommand(port);
if (!isRunning(process)) {
return port;
}
}
throw new IllegalArgumentException("Not Found Available port: 10000 ~ 65535");
}
/**
* 해당 port를 사용중인 프로세스 확인하는 sh 실행
*/
private Process executeGrepProcessCommand(int port) throws IOException {
String command = String.format("netstat -nat | grep LISTEN|grep %d", port);
String[] shell = {"/bin/sh", "-c", command};
return Runtime.getRuntime().exec(shell);
}
}
- 채팅 서버가 실행될 때 Embeded Redis 서버도 동시에 실행될 수 있도록 설정한다.
- Redis는 기본 6379 포트를 사용한다. 해당 포트를 사용하고 있는 프로세스가 있다면, 현재 사용가능한 포트를 찾아서 해당 포트로 서버를 실행한다.
isRedisRunning(): Embedded Redis가 현재 실행 중인지 확인isRunning(): 해당 포트의 Process가 현재 실행 중인지 확인findAvailablePort(): 현재 PC/서버에서 사용가능한 포트 조회executeGrepProcessCommand(): 해당 port를 사용중인 프로세스 확인하는 sh 실행
Redis 설정
@Configuration
public class RedisConfig {
@Value("${spring.redis.host}")
private String host;
@Value("${spring.redis.port}")
private int port;
/**
* redis의 연결 정보 설정
*/
@Bean
public RedisConnectionFactory redisConnectionFactory() {
RedisStandaloneConfiguration redisConfiguration = new RedisStandaloneConfiguration();
redisConfiguration.setHostName(host);
redisConfiguration.setPort(port);
return new LettuceConnectionFactory(redisConfiguration);
}
/**
* redis의 pub/sub 메시지를 처리하는 listener 설정
*/
@Bean
public RedisMessageListenerContainer redisMessageListener(RedisConnectionFactory connectionFactory) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
return container;
}
/**
* 어플리케이션에서 사용할 redisTemplate 설정
*/
@Bean
public RedisTemplate<String, Object> redisTemplate() {
RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
redisTemplate.setConnectionFactory(redisConnectionFactory());
redisTemplate.setKeySerializer(new StringRedisSerializer());
redisTemplate.setValueSerializer(new Jackson2JsonRedisSerializer<>(String.class));
return redisTemplate;
}
}
redisConnectionFactory(): Redis 서버와 통신을 위한 추상화 RedisConnectionFactory를 생성하고 반환한다.redisMessageListener(): redis의 pub/sub 메시지를 처리하기 위해 listener를 설정한다.redisTemplate(): 어플리케이션에서 사용할 redisTemplate 생성하고 반환한다.RedisTemplate은 Redis 데이터를 저장하고 조회하는 기능을 하는 클래스이다.setKeySerializer(),setValueSerializer(): Redis 데이터를 직렬화하는 방식을 설정할 수 있다.- Redis-CLI를 사용해 Redis 데이터를 직접 조회하는 경우, Redis 데이터를 문자열로 반환해야하기 때문에 설정한다.
Redis 발행/구독 모델 구현
Redis 발행 서비스 구현
@RequiredArgsConstructor
@Service
public class RedisPublisher {
private final RedisTemplate<String, Object> redisTemplate;
public void publish(ChannelTopic topic, ChatMessage message) {
redisTemplate.convertAndSend(topic.getTopic(), message);
}
}
publish(): 채팅방에 입장하여 메시지를 작성하면 메시지를 Redis Topic에 발행한다.- 해당 서비스를 통해 메시지를 발행하면 대기하고 있던 redis 구독 서비스가 메시지를 처리한다.
Redis 구독 서비스 구현
@Slf4j
@RequiredArgsConstructor
@Service
public class RedisSubscriber implements MessageListener {
private final ObjectMapper objectMapper;
private final RedisTemplate redisTemplate;
private final SimpMessageSendingOperations messagingTemplate;
@Override
public void onMessage(Message message, byte[] pattern) {
try {
// redis에서 발행된 데이터를 받아 deserialize
String publishMessage = (String) redisTemplate.getStringSerializer().deserialize(message.getBody());
ChatMessage roomMessage = objectMapper.readValue(publishMessage, ChatMessage.class);
// Websocket 구독자에게 채팅 메시지 Send
messagingTemplate.convertAndSend("/sub/chat/room/" + roomMessage.getRoomId(), roomMessage);
} catch (Exception e) {
log.error(e.getMessage());
}
}
}
onMessage(): Redis에서 메시지가 발행(publish) 될 때까지 대기하다가 메시지가 발생되면 메시지를 처리하는 리스너다.MessageListener를 상속 받아서onMessage()를 재작성한다.- Redis에 메시지가 발행되면 해당 메시지를 ChatMessage로 변환하고, messaging Template을 이용하여 채팅방의 모든 웹 소켓 클라이언트에게 메시지를 전달한다.
Redis를 이용한 채팅 서버 구현
ChatController
@Controller
@RequiredArgsConstructor
public class ChatController {
private final RedisPublisher redisPublisher;
private final ChatService chatService;
@MessageMapping("/chat/message")
public void message(ChatMessage message) {
if(message.getType().equals(MessageType.ENTER)) {
message.setEnterMessage();
chatService.enterChatRoom(message.getRoomId());
}
// 메시지를 redis의 Topic으로 발행함
redisPublisher.publish(chatService.getTopic(message.getRoomId()), message);
}
}
- 웹소켓의
/pub/sub/message로 들어오는 메시징을 처리한다. - 클라이언트가 채팅방 입장 시, 채팅방에서 대화가 가능하도록 리스너를 연동한다(
enterChatRoom()). - 이후 메시지를 서로 다른 서버에 공유하기 위해 Redis의 Topic으로 발행한다.
ChatService
@Service
@RequiredArgsConstructor
public class ChatService {
private final RedisRepository redisRepository;
private final RedisSubscriber redisSubscriber;
private final RedisMessageListenerContainer redisMessageListener;
private Map<String, ChannelTopic> topics;
@PostConstruct
private void init() {
topics = new HashMap<>();
}
/**
* 채팅방 생성
*/
public ChatRoom createChatRoom(String roomName) {
String roomId = UUID.randomUUID().toString();
ChatRoom chatRoom = ChatRoom.builder()
.roomId(roomId)
.name(roomName)
.build();
redisRepository.saveRoom(chatRoom);
return chatRoom;
}
/**
* 채팅방 입장
*/
public void enterChatRoom(String roomId) {
ChannelTopic topic = topics.get(roomId);
if (topic == null) {
topic = new ChannelTopic(roomId);
}
redisMessageListener.addMessageListener(redisSubscriber, topic);
topics.put(roomId, topic);
}
public List<ChatRoom> findAllRoom() {
return redisRepository.findAllRoom();
}
public ChatRoom findRoomById(String roomId) {
return redisRepository.findRoomById(roomId);
}
public ChannelTopic getTopic(String roomId) {
return topics.get(roomId);
}
}
RedisMessageListenerContainer: 채팅방(topic)에 발행될 메시지를 처리할 리스너topics: 채팅방의 대화 메시지를 발행하기 위한 redis의 topic 정보- 서버별로 채팅방에 매치되는 topic 정보를 Map에 넣어 roomId로 찾을 수 있게 한다.
createChatRoom(): 채팅방 개설- 서버 간의 채팅방 공유를 위해 redis hash에 저장한다.
enterChatRoom(): 채팅방 입장- 채팅방 id로 redis topic을 조회하여 pub/sub 통신을 하기 위한 리스너를 설정한다.
findAllRoom(),findRoomById(): 채팅방 정보를 조회할 때 Redis Hash에 저장된 데이터를 불러온다.
RedisRepository
@Repository
@RequiredArgsConstructor
public class RedisRepository {
private static final String CHAT_ROOMS = "CHAT_ROOM";
private final RedisTemplate<String, Object> redisTemplate;
private HashOperations<String, String, ChatRoom> opsHashChatRoom;
@PostConstruct
private void init() {
opsHashChatRoom = redisTemplate.opsForHash();
}
public void saveRoom(ChatRoom chatRoom) {
opsHashChatRoom.put(CHAT_ROOMS, chatRoom.getRoomId(), chatRoom);
}
public List<ChatRoom> findAllRoom() {
return opsHashChatRoom.values(CHAT_ROOMS);
}
public ChatRoom findRoomById(String roomId) {
return opsHashChatRoom.get(CHAT_ROOMS, roomId);
}
}
- RedisTemplate은 Redis 서버와 상호작용하도록 높은 수준으로 추상화를 제공하는 클래스이다. 또한, 여러 자료구조를 쉽게 Serialize, Deserialize 해주는 opsForX 메소드를 제공하여 redis 서버에 데이터 CRUD를 할 수 있다.
opsForhash(): Redis의 Hash 타입의 자료구조에 접근할 수 있다.HashOperations이 반환된다.- 외에도 opstForValue(), opstForList(), opstForSet() 등이 있다.
- 채팅방을 Redis Hash에 저장하고, Hash에서 채팅방을 조회한다.
ChatRoom
@Getter
public class ChatRoom implements Serializable {
private static final long serialVersionUID = 6494678977089006639L;
private String roomId;
private String name;
@Builder
public ChatRoom(String roomId, String name) {
this.roomId = roomId;
this.name = name;
}
}
- 자바 직렬화란?
- 자바 객체를 바이트스트림으로 만들어서 파일, DB, 메모리, 네트워크 송신이 가능하도록 하는 것이다.
- 역직렬화는 파일, DB, 메모리, 네트워크로부터 수신된 바이트스트림을 자바 객체로 변환하는 과정이다.
- Redis에 저장되는 객체들은 Serialize 가능해야 하기 때문에 Serializable를 참조하도록 선언하고 serialVersionUID를 셋팅해준다.
- serialVersionUID: 시리얼 통신을 하는 클래스의 버전을 표시하는 것(직렬화 버전의 고유값)
- 직렬화 과정에서 만들어지는 바이트스트림에는 버전정보인 serialVersionUID가 포함되는데, 나중에 역직렬화를 할 때 해당 클래스의 특정 버전에 맞는지, 아닌지를 체크한다.
M1(ARM)에서 Embedded Redis 실행 이슈
먼저 Local 환경에서는 Embedded Redis를 동작하도록 했는데, 계속 실행 오류가 있었다.
발생한 이유는 Embedded Redis 라이브러리에서는 mac_arm64용 바이너리가 준비되어 있지 않기 때문이었다. Redis에서 M1의 ARM 프로세스 아키텍처에서 실행되는 것을 지원하지 않기 때문이라고 한다.
지원하는 버전이 있을까 엄청 찾아봤는데 결국 없었다고 한다.....
- 해결 방법
- 직접 바이너리를 지정해서 해결할 수 있다.
- RediseServer(File executable, int port) 생성자를 이용한다.
1. Redis 설치
1. Redis 소스 코드 컴파일하기
$ wget https://download.redis.io/releases/redis-6.2.5.tar.gz # Redis 다운로드
$ tar xzf redis-6.2.5.tar.gz # 파일 압축 해제
$ cd redis-6.2.5 # redis 디렉토리도 이동
$ make # redis 컴파일
- make는 소스 코드에서 실행 파일을 만드는 명령어이다.
- 위의 과정을 거치면 src 경로에 바이너리 파일들이 빌드되고, redis-server 바이너리 파일을 실행시켜 redis가 잘 실행되는 것을 볼 수 있다.
- [문제 발생] 하지만!! 또 문제가 발생했다 ㅠㅋㅠ
- make 과정에서 오류가 발생해 컴파일에 실패했다. 이것도 열심히 찾아봤지만 결국 해결을 하지 못했다.
2. Homebrew로 설치하기
$ brew install redis # redis 설치
$ brew uninstall redis # redis 설치 제거
$ brew services info redis # redis 실행 상태 확인
$ brew services start redis # Background로 redis 실행
$ brew services restart redis # Background로 redis 재실행
$ brew services stop redis # Background로 redis 중지
$ redis-server --version # redis 버전 확인
$ redis-server # Foreground로 redis 실행
- mac에는 정말 좋은 Homebrew가 있다는 것을 잊고 있었다.
3. Redis 공식 사이트에서 다운로드
- https://redis.io/download/
- 여기서 Redis.6.2.14 버전을 다운로드 하였다.
2. EmbeddedRedisConfig 수정
Redis 바이너리 파일 옮기기
- Redis 서버가 잘 실행된다면 실행 파일(
redis-server)의 이름을 변경하여 프로젝트의 resouce 디렉토리 하위에 복사한다.src/main/resources/binary/redis/{redis-mac-arm-바이너리파일}경로에 추가한다.
EmbeddedRedisConfig
public class EmbeddedRedisConfig {
@Value("${spring.redis.port}")
private int redisPort;
private RedisServer redisServer;
@PostConstruct
public void startRedis() throws IOException {
int port = isRedisRunning() ? findAvailablePort() : redisPort;
if (isArmArchitecture()) {
log.info("ARM Architecture");
redisServer = new RedisServer(Objects.requireNonNull(getRedisServerExecutable()), port);
}
else {
redisServer = new RedisServer(redisPort);
}
redisServer.start();
}
...
private File getRedisServerExecutable() {
try {
return new File("src/main/resources/binary/redis/redis-server-6.2.14-mac-arm64");
} catch (Exception e) {
e.printStackTrace();
}
return null;
}
private boolean isArmArchitecture() {
return System.getProperty("os.arch").contains("aarch64");
}
}
RedisServer(excutable, port)로 RedisServer를 생성할 수 있다.new RedisServer(Objects.requireNonNull(getRedisServerExecutable()), port)- excutable은 Redis를 실행하는데 필요한 실행 파일을 지정하는 매개변수이다.
getRedisServerExecutable(): ARM 아키텍처에서 Redis Server를 실행할 때 사용할 Redis Server 실행 파일을 가져온다. 가져올 파일이 없는 경우 예외를 던진다.isArmArchitecture(): 현재 시스템이 ARM 아키텍처인지 확인한다.
참고
'Web > Spring, JPA' 카테고리의 다른 글
[Spring Websocket] 채팅 서버 구축 (2) - STOMP (0) | 2023.11.23 |
---|---|
[Spring Websocket] 채팅 서버 구축 (1) - Websocket (0) | 2023.11.22 |
[SpringSecurity] 스프링 시큐리티 이해(구조, 필터) (0) | 2023.09.17 |
[JPA] Entity 생성 방법 (0) | 2023.08.29 |
[아키텍쳐] 스프링 패키지 구조(계층형과 도메인형) (0) | 2023.08.22 |