在前面的Work Queue中的消息是均匀分配消息给消费者;如果我想把消息分发给所有的消费者呢?那应当怎么操作呢?这就是要下面提到的Publish/Subscribe(分布/订阅)。让我们开始Publish/Subscribe之旅吧!
Publish/Subscribe的工作示意图如下:
在上图中的X表示Exchange(交换区);Exchange的类型有:direct , topic , headers 和 fanout
Publish/Subscribe的Exchang的类型为fanout;声明Publish/Subscribe的Exchang代码如下:
- channel.exchangeDeclare("logs", "fanout");
对于Work Queue中提到的发布消息的代码如下:
- channel.basicPublish("", queueName, null, message.getBytes());
但对于Publish/Subscribe中发布消息中的Queue的使用的是默认的;代码如下:
- channel.basicPublish( "logs", "", null, message.getBytes());
Exchange和各Queue之间是如何通信的呢?主要是通过把Exchange和各Queue绑定(binding);示意代码如下:
- channel.queueBind(queueName, exchangeName, "");
Publish/Subscribe加入绑定的工作示意图如下:
那我们就开始程序代码吧:P端的代码如下:
- package com.abin.rabbitmq;
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
- public class EmitLog {
- private static final String EXCHANGE_NAME = "logs";
- public static void main(String[] argv) throws Exception {
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost("localhost");
- Connection connection = factory.newConnection();
- Channel channel = connection.createChannel();
- channel.exchangeDeclare(EXCHANGE_NAME, "fanout");//声明Exchange
- for (int i = 0; i <= 2; i++) {
- String message = "hello word!" + i;
- channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());
- System.out.println(" [x] Sent '" + message + "'");
- }
- channel.close();
- connection.close();
- }
- }
运行结果如下:
- [x] Sent 'hello word!0'
- [x] Sent 'hello word!1'
- [x] Sent 'hello word!2'
C端的代码如下:
- package com.abin.rabbitmq;
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
- import com.rabbitmq.client.QueueingConsumer;
- public class ReceiveLogsOne {
- private static final String EXCHANGE_NAME = "logs";
- public static void main(String[] argv) throws Exception {
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost("localhost");
- Connection connection = factory.newConnection();
- Channel channel = connection.createChannel();
- channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
- String queueName = "log-fb1";
- channel.queueDeclare(queueName, false, false, false, null);
- channel.queueBind(queueName, EXCHANGE_NAME, "");//把Queue、Exchange绑定
- QueueingConsumer consumer = new QueueingConsumer(channel);
- channel.basicConsume(queueName, true, consumer);
- while (true) {
- QueueingConsumer.Delivery delivery = consumer.nextDelivery();
- String message = new String(delivery.getBody());
- System.out.println(" [x] Received '" + message + "'");
- }
- }
- }
对于C端的代码我写了二个差不多的程序,只需要修改一下queueName。这样就形成了二个Queue;运行结果相同;
运行结果可能如下:
- [x] Received 'hello word!0'
- [x] Received 'hello word!1'
- [x] Received 'hello word!2'
相关推荐
在本教程的前面部分,我们发送和接收到队列中的消息,现在是时候在 RabbitMQ 中引入完整的消息传递模式了。 让我们快速回顾一下之前了解的内容: 生产者(producer):发送消息的程序 队列(queue):存储消息的缓冲...
文章目录rabbitmq7种实现方式搭建maven项目引入依赖创建连接简单队列消息生产者消息消费者work queues 工作队列生产者消费者能者多劳(公平分发):消费能力强则消费更多消息Publish/Subscribe 发布订阅模式生产者...
本例为.net下使用EasyNetQ操作RabbitMQ的Demo,例子采用MVC架构,包含完整的Publish/Subscribe。详情请看:https://www.cnblogs.com/imstrive/p/11078335.html
3. 发布/订阅模式(Publish/Subscribe Mode): - 发布/订阅模式用于将消息广播到多个消费者。每个消费者都有自己的队列,并且订阅相同的交换机。生产者发送消息到交换机,然后交换机将消息广播到所有与之绑定的...
rabbitMq简单介绍还有下载,安装及配置,包括一些队列案例(Simble简单队列,.work queues 工作队列 公平分发 轮询分发,订阅模式 publish/subscribe,routing路由模式,Topic 主题模式,rabbitMq的消息确认机制)
ps publish/subscribe 发布订阅模式 routing topic 消息确认机制 基于AMQP事务实现的消息确认 同步confirm确认 异步confirm消息确认 延迟消息队列 基于插件rabbitmq_delayed_message_exchange去实现 后续待补充...
一个简单的laravel的Rabbitmq库,基于Publish-Subscribe模式,其中订阅者是Consumer。 目录 2.1。 2.2。 2.3。 2.4。 2.5. 3.1。 3.2。 3.3。 可用的 CLI 命令 3.4. 自定义消息处理器 贡献 1.安装 跑步: ...
rabbit的各种工作模式的demo,包括work queues模式,publish/Subscribe模式,Routing模式等
MQTT 全称(Message Queue Telemetry Transport):一种基于发布/订阅(publish/subscribe)模式的轻量级通讯协议,通过订阅相应的主题来获取消息,是物联网(Internet of Thing)中的一个标准传输协议。 该协议将...
兔子MQ RabbitMQ实践包括6种做法,例如01:Hello World 02:Work Queues 03:Publish Subscribe 04:Routing 05:Topics 06:RPC
网络视频资源,如有侵权请留言/举报,资源过大上传乃是...7、Publish_Subscribe发布订阅模式3 d) y5 t6 A1 R 8、route 路由模式1 W. q/ p. b0 C 9、topic模式 10、整合spring方式-1 11、整合sring方式-2 12、消息持久化
rabbitMQ官方Demo,包含01HelloWorld,02WorkQueues,03PublishSubscribe,04Routing,05Topics,06RPC六个示例
请在此处创建功能请求或错误:)用法阅读./docs文件夹中的文档能力 连接(CONNECT / CONACK) 平(PINGREQ / PINGRESP) 发布(发布/回传) 订阅(SUBSCRIBE / SUBACK) 退订 QoS 0消息 QoS 1消息 QoS 2消息 保留在...
easy_bunny_rpc 通用RPC客户端/工人库,用于处理基于兔子的数据序列... subscribe do | payload | publish_success ( payload ) # Send a success message to the client # publish_failure(payload) # Send a failu
publish/subscribe communication styles. Chapter 12, Securing an API, will describe varius ways of securing your microservices. We will implement a system consisting of all previously introduced ...
The publish/subscribe model Running a sample system Scaling and grouping Running multiple instances Consumer groups Partitioning Configuration options Spring Cloud Stream properties Binding properties...
目前支持redis和amqp ,例如 RabbitMQ。目录渠道() 关闭() 渠道活动发布() 订阅活动关闭() 免责声明执照贡献例子 用法 发布 var queue = require ( 'message-queue' ) ( 'redis' ) ;var pub = queue . ...
使用RabbitMQ为Laravel发布事件 Nuwber的事件提供了一个简单的观察器实现,使您可以侦听当前应用程序和另一个应用程序中发生的各种事件。 例如,如果您需要对从另一个API发布的某个事件做出React。 不要将此软件包...