Java & Spring

Spring RabbitMQ - publish/subscribe

ju_young 2024. 2. 28. 20:33
728x90

Docker

먼저 Docker를 설정하여 컨테이너를 생성한다.

Dockerfile

FROM rabbitmq:latest

docker-compose

version: '1'  
services:  
  rabbitmq:  
    container_name: rabbitmq-test  
    build:  
      dockerfile: Dockerfile  
    image: rabbitmq  
    ports:  
      - "15672:15672"  
      - "5672:5672"

Dependency

spring boot 2 기준으로 dependency를 다음과 같이 추가해준다.

implementation 'org.springframework.boot:spring-boot-starter-amqp'  
implementation 'org.springframework.boot:spring-boot-starter-web'  
...
testImplementation 'org.springframework.amqp:spring-rabbit-test'

RabbitMQ

RabbitMQ를 사용하기 전에 몇 가지 개념들을 살펴보자.

  • producer: message를 전송하는 user application
  • queue: message가 저장되는 buffer
  • consumer: message를 받는 user application
  • exchange
    • 한 쪽에서는 producer로부터 message를 받고 다른 쪽에서는 queue로 push
    • type: direct, topic, headers, fanout
    • direct exchange
    • routing key를 기반으로 메시지를 전달하는 방식
    • 메시지가 excahnge로 들어오면 메시지의 routing key를 확인하고 일치하는 queue로 라우팅
    • fanout exchange
    • 모든 queue로 메시지를 보내는 방식
    • 모든 queue에 메시지를 보내기때문에 routing key는 무시됨
    • topic exchange
    • routing key를 사용하지만 direct 타입과 달리 특정 값이 아닌 패턴으로 binding
    • headers exchange
    • 메시지의 헤더에 여러 속성 값을 사용하여 라우팅하는 방식
    • routing key는 무시되며 헤더의 값과 binding하여 일치하는 queue로 라우팅
  • binding: exchange와 queue 사이의 relationship을 binding이라 부른다.

위처럼 exchange는 총 4가지가 있어 하나씩 적용해볼 것이다.

Receiver

메시지를 받을 경우 수행될 메소드를 정의한다.

@Slf4j  
@Component  
public class Receiver {  
    private CountDownLatch latch = new CountDownLatch(1);  

    public void receiveMessage(String message) {  
        log.info("Received <" + message + ">");  
        latch.countDown();  
    }  

    public void receiveMessage(byte[] message) {  
        log.info("Received <" + new String(message, StandardCharsets.UTF_8) + ">");  
        latch.countDown();  
    }  

    public CountDownLatch getLatch() {  
        return latch;  
    }  
}

Configuration

@Configuration  
public class RabbitMQConfig {  
    static final String topicExchangeName = "spring-boot-topic-exchange";  
    static final String directExchangeName = "spring-boot-direct-exchange";  
    static final String fanoutExchangeName = "spring-boot-fanout-exchange";  
    static final String headersExchangeName = "spring-boot-headers-exchange";  

    static final String queueName = "spring-boot";  

    /**  
     * 큐 생성  
     */  
    @Bean  
    Queue queue() {  
        //durable이 true일 경우 디스크에 저장되며, false면 메모리에 저장  
        //exclusive가 true일 경우 특정 연결에 대한 접근을 제한하고 해당 연결이 종료되면 자동으로 삭제  
        //autoDelete가 true일 경우 모든 consumer와 disconnect된 queue를 자동 삭제  
        return new Queue(queueName, false);  
    }  

    /**  
     * TopicExchange 생성  
     * - 패턴으로 routing key를 binding  
     */    @Bean  
    TopicExchange topicExchange() {  
        return new TopicExchange(topicExchangeName);  
    }  

    /**  
     * DirectExchange 생성  
     * - routing key를 사용한 binding  
     */    @Bean  
    DirectExchange directExchange() {  
        return new DirectExchange(directExchangeName);  
    }  

    /**  
     * fanoutExchange 생성  
     * - 모든 queue로 메시지 전송  
     */  
    @Bean  
    FanoutExchange fanoutExchange() {  
        return new FanoutExchange(fanoutExchangeName);  
    }  

    /**  
     * headersExchange 생성  
     * - 헤더의 값을 사용한 binding  
     */    
    @Bean  
    HeadersExchange headersExchange() {  
        return new HeadersExchange(headersExchangeName);  
    }  

    /**  
     * 큐와 TopicExchange 바인딩  
     */  
    @Bean  
    Binding topicBinding(Queue queue, TopicExchange topicExchange) {  
        //foo.bar로 시작하는 routing key와 같이 message를 보내는 것을 의미  
        //foo.bar.* 패턴과 일치하는 queue로 라우팅  
        return BindingBuilder.bind(queue).to(topicExchange).with("foo.bar.#");  
    }  

    /**  
     * 큐와 DirectExchange 바인딩  
     */  
    @Bean  
    Binding directBinding(Queue queue, DirectExchange directExchange) {  
        //routing key와 같이 message를 보내는 것을 의미  
        //foo.bar.direct와 일치하는 queue로 라우팅  
        return BindingBuilder.bind(queue).to(directExchange).with("direct");  
    }  
    /**  
     * 큐와 FanoutExchange 바인딩  
     */  
    @Bean  
    Binding fanoutBinding(Queue queue, FanoutExchange fanoutExchange) {  
        return BindingBuilder.bind(queue).to(fanoutExchange);  
    }  

    /**  
     * 큐와 HeadersExchange 바인딩  
     */  
    @Bean  
    Binding headersBinding(Queue queue, HeadersExchange headersExchange) {  
        return BindingBuilder.bind(queue).to(headersExchange).where("headers").exists();  
    }  

    @Bean  
    SimpleMessageListenerContainer container(ConnectionFactory connectionFactory,  
                                             MessageListenerAdapter listenerAdapter) {  
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();  
        container.setConnectionFactory(connectionFactory);  
        container.setQueueNames(queueName);  
        container.setMessageListener(listenerAdapter); //message listener 등록  
        return container;  
    }  

    /**  
     * Receiver는 POJO이기 때문에 MessageListenerAdapter로 wrapping해준다.  
     */    
    @Bean  
    MessageListenerAdapter listenerAdapter(Receiver receiver) {  
        return new MessageListenerAdapter(receiver, "receiveMessage");  
    }  
}

Runner

이제 실제로 메시지를 전송하는 클래스를 작성해보자.

@Slf4j  
@Component  
public class Runner implements CommandLineRunner {  

    private final RabbitTemplate rabbitTemplate;  
    private final Receiver receiver;  

    public Runner(Receiver receiver, RabbitTemplate rabbitTemplate) {  
        this.receiver = receiver;  
        this.rabbitTemplate = rabbitTemplate;  
    }  

    @Override  
    public void run(String... args) throws Exception {  
        log.info("Sending message...");  
        String message = "Hello from RabbitMQ!";  
        //TopicExchange
        rabbitTemplate.convertAndSend(RabbitMQConfig.topicExchangeName, "foo.bar.baz", message);  
        //DirectExchange
        rabbitTemplate.convertAndSend(RabbitMQConfig.directExchangeName, "direct", message);  
        //FanoutExchange
        rabbitTemplate.convertAndSend(RabbitMQConfig.fanoutExchangeName, "", message);  

        //HeadersExchange
        Message messageWithHeader = MessageBuilder.withBody(message.getBytes())  
                .setHeader("headers", "h")  
                .build();  
        rabbitTemplate.convertAndSend(RabbitMQConfig.headersExchangeName, "", messageWithHeader);  
        receiver.getLatch().await(10000, TimeUnit.MILLISECONDS);  
    }  
}

전체적인 흐름을 정리해보면 다음과 같다.

  1. 메시지 전송(rabbitTemplate.convertAndSend())
  2. exchange 타입에 따라 exchange name 또는 routing key를 가지고 binding
  3. binding 후 일치하는 queue를 탐색
  4. 일치하는 queue에 메시지 전송
  5. MessageListenerContainer에 등록된 MessageListner(위 예시에서 Receiver)가 전송된 메시지를 받아 처리한다.

전체 코드는 https://github.com/JadeKim042386/SpringWebSocket/tree/main/MessageWithRabbitMQ 에서 확인할 수 있다.


Appendix

About queue size

  • 이용 가능한 모든 resource를 사용하기 때문에 Queue 개수는 내부적으로 제한되어있지 않다고 한다.
  • 모든 worker들이 바쁘게 돌아간다면 Queue가 가득 차버릴 수 있다. 따라서 계속 지켜보며 더 많은 worker를 추가하거나 다른 전략을 가져야할 것이다.

Fair dispatch vs Round-robin dispatching

  • 기본적으로 RabbitMQ는 consumer에게 message를 순차적으로 전송한다. 따라서 평균적으로 모든 consumer들은 같은 개수의 message를 얻게된다. 이런 분산 message 방식을 round-robin이라 부른다.
    • 예를 들어 두 개의 worker들이 있고 모든 홀수 message들은 무거우며 짝수 message들은 가볍다고하자. 그러면 한 worker는 항상 바쁘게 돌아가고 다른 하나는 거의 동작하지 않을 것이다.
  • Spring AMQP에서 default로 Fair dispatch가 설정되어있다. 그리고 DEFAULT_PREFETCH_COUNT는 250이다. (DEFAULT_PREFETCH_COUNT는 worker에게 한 번에 건네는 message 수)
  • 만약 DEFAULT_PREFETCH_COUNT를 1로 설정한다면 round robin 방식으로 작동한다.

[reference]

728x90