Docker
먼저 Docker를 설정하여 컨테이너를 생성한다.
Dockerfile
FROM rabbitmq:latest
docker-compose
version: '1'
services:
rabbitmq:
container_name: rabbitmq-test
build:
dockerfile: Dockerfile
image: rabbitmq
ports:
- "15672:15672"
- "5672:5672"
Dependency
spring boot 2 기준으로 dependency를 다음과 같이 추가해준다.
implementation 'org.springframework.boot:spring-boot-starter-amqp'
implementation 'org.springframework.boot:spring-boot-starter-web'
...
testImplementation 'org.springframework.amqp:spring-rabbit-test'
RabbitMQ
RabbitMQ
를 사용하기 전에 몇 가지 개념들을 살펴보자.
- producer: message를 전송하는 user application
- queue: message가 저장되는 buffer
- consumer: message를 받는 user application
- exchange
- 한 쪽에서는 producer로부터 message를 받고 다른 쪽에서는 queue로 push
- type:
direct
,topic
,headers
,fanout
- direct exchange
- routing key를 기반으로 메시지를 전달하는 방식
- 메시지가 excahnge로 들어오면 메시지의 routing key를 확인하고 일치하는 queue로 라우팅
- fanout exchange
- 모든 queue로 메시지를 보내는 방식
- 모든 queue에 메시지를 보내기때문에 routing key는 무시됨
- topic exchange
- routing key를 사용하지만
direct
타입과 달리 특정 값이 아닌 패턴으로 binding
- routing key를 사용하지만
- headers exchange
- 메시지의 헤더에 여러 속성 값을 사용하여 라우팅하는 방식
- routing key는 무시되며 헤더의 값과 binding하여 일치하는 queue로 라우팅
- direct exchange
- binding: exchange와 queue 사이의 relationship을 binding이라 부른다.
위처럼 exchange는 총 4가지가 있어 하나씩 적용해볼 것이다.
Receiver
메시지를 받을 경우 수행될 메소드를 정의한다.
@Slf4j
@Component
public class Receiver {
private CountDownLatch latch = new CountDownLatch(1);
public void receiveMessage(String message) {
log.info("Received <" + message + ">");
latch.countDown();
}
public void receiveMessage(byte[] message) {
log.info("Received <" + new String(message, StandardCharsets.UTF_8) + ">");
latch.countDown();
}
public CountDownLatch getLatch() {
return latch;
}
}
Configuration
@Configuration
public class RabbitMQConfig {
static final String topicExchangeName = "spring-boot-topic-exchange";
static final String directExchangeName = "spring-boot-direct-exchange";
static final String fanoutExchangeName = "spring-boot-fanout-exchange";
static final String headersExchangeName = "spring-boot-headers-exchange";
static final String queueName = "spring-boot";
/**
* 큐 생성
*/
@Bean
Queue queue() {
//durable이 true일 경우 디스크에 저장되며, false면 메모리에 저장
//exclusive가 true일 경우 특정 연결에 대한 접근을 제한하고 해당 연결이 종료되면 자동으로 삭제
//autoDelete가 true일 경우 모든 consumer와 disconnect된 queue를 자동 삭제
return new Queue(queueName, false);
}
/**
* TopicExchange 생성
* - 패턴으로 routing key를 binding
*/ @Bean
TopicExchange topicExchange() {
return new TopicExchange(topicExchangeName);
}
/**
* DirectExchange 생성
* - routing key를 사용한 binding
*/ @Bean
DirectExchange directExchange() {
return new DirectExchange(directExchangeName);
}
/**
* fanoutExchange 생성
* - 모든 queue로 메시지 전송
*/
@Bean
FanoutExchange fanoutExchange() {
return new FanoutExchange(fanoutExchangeName);
}
/**
* headersExchange 생성
* - 헤더의 값을 사용한 binding
*/
@Bean
HeadersExchange headersExchange() {
return new HeadersExchange(headersExchangeName);
}
/**
* 큐와 TopicExchange 바인딩
*/
@Bean
Binding topicBinding(Queue queue, TopicExchange topicExchange) {
//foo.bar로 시작하는 routing key와 같이 message를 보내는 것을 의미
//foo.bar.* 패턴과 일치하는 queue로 라우팅
return BindingBuilder.bind(queue).to(topicExchange).with("foo.bar.#");
}
/**
* 큐와 DirectExchange 바인딩
*/
@Bean
Binding directBinding(Queue queue, DirectExchange directExchange) {
//routing key와 같이 message를 보내는 것을 의미
//foo.bar.direct와 일치하는 queue로 라우팅
return BindingBuilder.bind(queue).to(directExchange).with("direct");
}
/**
* 큐와 FanoutExchange 바인딩
*/
@Bean
Binding fanoutBinding(Queue queue, FanoutExchange fanoutExchange) {
return BindingBuilder.bind(queue).to(fanoutExchange);
}
/**
* 큐와 HeadersExchange 바인딩
*/
@Bean
Binding headersBinding(Queue queue, HeadersExchange headersExchange) {
return BindingBuilder.bind(queue).to(headersExchange).where("headers").exists();
}
@Bean
SimpleMessageListenerContainer container(ConnectionFactory connectionFactory,
MessageListenerAdapter listenerAdapter) {
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueueNames(queueName);
container.setMessageListener(listenerAdapter); //message listener 등록
return container;
}
/**
* Receiver는 POJO이기 때문에 MessageListenerAdapter로 wrapping해준다.
*/
@Bean
MessageListenerAdapter listenerAdapter(Receiver receiver) {
return new MessageListenerAdapter(receiver, "receiveMessage");
}
}
Runner
이제 실제로 메시지를 전송하는 클래스를 작성해보자.
@Slf4j
@Component
public class Runner implements CommandLineRunner {
private final RabbitTemplate rabbitTemplate;
private final Receiver receiver;
public Runner(Receiver receiver, RabbitTemplate rabbitTemplate) {
this.receiver = receiver;
this.rabbitTemplate = rabbitTemplate;
}
@Override
public void run(String... args) throws Exception {
log.info("Sending message...");
String message = "Hello from RabbitMQ!";
//TopicExchange
rabbitTemplate.convertAndSend(RabbitMQConfig.topicExchangeName, "foo.bar.baz", message);
//DirectExchange
rabbitTemplate.convertAndSend(RabbitMQConfig.directExchangeName, "direct", message);
//FanoutExchange
rabbitTemplate.convertAndSend(RabbitMQConfig.fanoutExchangeName, "", message);
//HeadersExchange
Message messageWithHeader = MessageBuilder.withBody(message.getBytes())
.setHeader("headers", "h")
.build();
rabbitTemplate.convertAndSend(RabbitMQConfig.headersExchangeName, "", messageWithHeader);
receiver.getLatch().await(10000, TimeUnit.MILLISECONDS);
}
}
전체적인 흐름을 정리해보면 다음과 같다.
- 메시지 전송(
rabbitTemplate.convertAndSend()
) - exchange 타입에 따라 exchange name 또는 routing key를 가지고 binding
- binding 후 일치하는 queue를 탐색
- 일치하는 queue에 메시지 전송
MessageListenerContainer
에 등록된MessageListner
(위 예시에서 Receiver)가 전송된 메시지를 받아 처리한다.
전체 코드는 https://github.com/JadeKim042386/SpringWebSocket/tree/main/MessageWithRabbitMQ 에서 확인할 수 있다.
Appendix
About queue size
- 이용 가능한 모든 resource를 사용하기 때문에 Queue 개수는 내부적으로 제한되어있지 않다고 한다.
- 모든 worker들이 바쁘게 돌아간다면 Queue가 가득 차버릴 수 있다. 따라서 계속 지켜보며 더 많은 worker를 추가하거나 다른 전략을 가져야할 것이다.
Fair dispatch vs Round-robin dispatching
- 기본적으로 RabbitMQ는 consumer에게 message를 순차적으로 전송한다. 따라서 평균적으로 모든 consumer들은 같은 개수의 message를 얻게된다. 이런 분산 message 방식을
round-robin
이라 부른다.- 예를 들어 두 개의 worker들이 있고 모든 홀수 message들은 무거우며 짝수 message들은 가볍다고하자. 그러면 한 worker는 항상 바쁘게 돌아가고 다른 하나는 거의 동작하지 않을 것이다.
- Spring AMQP에서 default로
Fair dispatch
가 설정되어있다. 그리고DEFAULT_PREFETCH_COUNT
는 250이다. (DEFAULT_PREFETCH_COUNT는 worker에게 한 번에 건네는 message 수) - 만약
DEFAULT_PREFETCH_COUNT
를 1로 설정한다면 round robin 방식으로 작동한다.
[reference]