RabbitMQ通信模型之路由模型

频道:行业资讯 日期: 浏览:985
大家好,我是了不起。

今天了不起带领大家接着学习RabbitMQ,了解RabbitMQ的五大通信模型之一的路由模型;接下来还会有关于RabbitMQ的系列教程,对你有帮助的话记得关注哦~

往期传送门

??RabbitMQ(一)hello world??

??RabbitMQ(二)通信模型之work模型??

??RabbitMQ(三)通信模型之发布订阅模型??

路由模型

RabbitMQ 提供了五种不同的通信模型,上一篇文章中,简单的介绍了一下RabbitMQ的发布订阅模型模型。这篇文章来学习一下RabbitMQ中的路由模型(direct)。

路由模型(direct):路由模式相当于是分布订阅模式的升级版,多了一个 路由key来约束队列与交换机的绑定。

在路由模型中,生产者将消息发送到交换机,交换机根据消息的路由键将消息转发到对应的队列中。每个队列可以绑定多个路由键,每个路由键可以绑定到多个队列中。消费者从队列中接收消息并处理。当一个路由键被多个队列绑定时,交换机会将消息发送到所有绑定的队列中。当一个队列绑定多个路由键时,该队列将能够接收到所有路由键对应的消息。

适用场景

路由模型适用于需要点对点通信的场景,例如:

系统监控告警通知;

任务分发;

用户私信系统;

订单确认通知等。

演示

生产者

复制

// 生产者

public class Producer {

private static final String EXCHANGE_NAME = "exchange_direct_1";

// 定义路由的key,key值是可以随意定义的

private static final String EXCHANGE_ROUTING_KEY1 = "direct_km1";

private static final String EXCHANGE_ROUTING_KEY2 = "direct_km2";

public static void main(String[] args) throws IOException, TimeoutException {

Connection connection = ConnectionUtils.getConnection();

Channel channel = connection.createChannel();

channel.exchangeDeclare(EXCHANGE_NAME, "direct");

for (int i = 0; i < 100; i++) {

if (i % 2 == 0) {

channel.basicPublish(EXCHANGE_NAME, EXCHANGE_ROUTING_KEY1, null, ("路由模型发送的第 " + i + " 条信息").getBytes());

} else {

channel.basicPublish(EXCHANGE_NAME, EXCHANGE_ROUTING_KEY2, null, ("路由模型发送的第 " + i + " 条信息").getBytes());

}

}

channel.close();

connection.close();

}

}

1.

2.

3.

4.

5.

6.

7.

8.

9.

10.

11.

12.

13.

14.

15.

16.

17.

18.

19.

20.

21.

22.

消费者

复制

// 消费者1

public class Consumer {

private static final String QUEUE_NAME = "queue_direct_1";

private static final String EXCHANGE_NAME = "exchange_direct_1";

private static final String EXCHANGE_ROUTING_KEY1 = "direct_km1";

public static void main(String[] args) throws IOException, TimeoutException {

Connection connection = ConnectionUtils.getConnection();

Channel channel = connection.createChannel();

channel.queueDeclare(QUEUE_NAME, false, false, false, null);

channel.exchangeDeclare(EXCHANGE_NAME, "direct");

channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, EXCHANGE_ROUTING_KEY1);

DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {

@Override

public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

System.out.println("消费者1接收到的消息是:" + new String(body));

}

};

channel.basicConsume(QUEUE_NAME, true, defaultConsumer);

}

}

1.

2.

3.

4.

5.

6.

7.

8.

9.

10.

11.

12.

13.

14.

15.

16.

17.

18.

19.

20.

21.

复制

// 消费者2

public class Consumer2 {

private static final String QUEUE_NAME = "queue_direct_2";

private static final String EXCHANGE_NAME = "exchange_direct_1";

private static final String EXCHANGE_ROUTING_KEY2 = "direct_km2";

public static void main(String[] args) throws IOException, TimeoutException {

Connection connection = ConnectionUtils.getConnection();

Channel channel = connection.createChannel();

channel.queueDeclare(QUEUE_NAME, false, false, false, null);

channel.exchangeDeclare(EXCHANGE_NAME, "direct");

channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, EXCHANGE_ROUTING_KEY2);

DefaultConsumer defaultConsumer = new DefaultConsumer(channel) {

@Override

public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {

System.out.println("消费者2接收到的消息是:" + new String(body));

}

};

channel.basicConsume(QUEUE_NAME, true, defaultConsumer);

}

}

1.

2.

3.

4.

5.

6.

7.

8.

9.

10.

11.

12.

13.

14.

15.

16.

17.

18.

19.

20.

21.

测试

先启动2个消费者,再启动生产者

可以得到结果是消费者1得到了序号是偶数的消息

消费者2得到了序号是奇数的消息

小结

本文介绍了 RabbitMQ 通信模型中的路由模型的使用,通过交换机和路由键实现点对点通信,适合于需要点对点通信的场景。在实际使用过程中,需要注意以下几点:

路由键必须要与消费者绑定队列时的路由键相同,否则无法接收到消息;

可以通过多个交换机和路由键来实现更灵活的消息路由。

后续了不起还会继续更新RabbitMQ的系列文章,感兴趣的小伙伴持续关注哦~~~

0 留言

评论

◎欢迎参与讨论,请在这里发表您的看法、交流您的观点。