在前面的已经提到了一对一的情况;现在一个生产者与多个消费者的情况(Work Queues)。
Work Queues的示意图如下:
对于上图的模型中对于c端的worker来说。RabbitMQ服务器可能一直发送多个消息给一个worker,而另一个可能几乎不做任何事情。这样就会导致一个worker很忙,而另一个却很空闲。这种情况可能都不想出现。如何解决这个问题呢。当然最理想的情况是均匀分配消息给每个worker。我们可能通过channel . basicQos(1)方法(prefetchCount = 1 )来设置同一时间每次发给一个消息给一个worker。示意图如下:
P端的程序如下:
- package com.abin.rabbitmq;
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
- import com.rabbitmq.client.MessageProperties;
- public class NewTask {
- private static final String TASK_QUEUE_NAME = "task_queue";
- public static void main(String[] argv) throws Exception {
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost("localhost");
- Connection connection = factory.newConnection();
- Channel channel = connection.createChannel();
- //声明此队列并且持久化
- channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
- String message = getMessage(argv);
- channel.basicPublish("", TASK_QUEUE_NAME,
- MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());//持久化消息
- System.out.println(" [x] Sent '" + message + "'");
- channel.close();
- connection.close();
- }
- private static String getMessage(String[] strings) {
- if (strings.length < 1)
- return "Hello World!";
- return joinStrings(strings, " ");
- }
- private static String joinStrings(String[] strings, String delimiter) {
- int length = strings.length;
- if (length == 0)
- return "";
- StringBuilder words = new StringBuilder(strings[0]);
- for (int i = 1; i < length; i++) {
- words.append(delimiter).append(strings[i]);
- }
- return words.toString();
- }
- }
多次运行此程序并传入的参数分别为“First message ”,“Secondmessage ”,“Third message ”,“Fourth message ”,“Fifth message ”
C端的程序如下:
- package com.abin.rabbitmq;
- import com.rabbitmq.client.Channel;
- import com.rabbitmq.client.Connection;
- import com.rabbitmq.client.ConnectionFactory;
- import com.rabbitmq.client.QueueingConsumer;
- public class Worker {
- private static final String TASK_QUEUE_NAME = "task_queue";
- public static void main(String[] argv) throws Exception {
- ConnectionFactory factory = new ConnectionFactory();
- factory.setHost("localhost");
- Connection connection = factory.newConnection();
- Channel channel = connection.createChannel();
- //声明此队列并且持久化
- channel.queueDeclare(TASK_QUEUE_NAME, true, false, false, null);
- System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
- channel.basicQos(1);//告诉RabbitMQ同一时间给一个消息给消费者
- /* We're about to tell the server to deliver us the messages from the queue.
- * Since it will push us messages asynchronously,
- * we provide a callback in the form of an object that will buffer the messages
- * until we're ready to use them. That is what QueueingConsumer does.*/
- QueueingConsumer consumer = new QueueingConsumer(channel);
- /*
- 把名字为TASK_QUEUE_NAME的Channel的值回调给QueueingConsumer,即使一个worker在处理消息的过程中停止了,这个消息也不会失效
- */
- channel.basicConsume(TASK_QUEUE_NAME, false, consumer);
- while (true) {
- QueueingConsumer.Delivery delivery = consumer.nextDelivery();//得到消息传输信息
- String message = new String(delivery.getBody());
- System.out.println(" [x] Received '" + message + "'");
- doWork(message);
- System.out.println(" [x] Done");
- channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);//下一个消息
- }
- }
- private static void doWork(String task) throws InterruptedException {
- for (char ch : task.toCharArray()) {
- if (ch == '.')
- Thread.sleep(1000);//这里是假装我们很忙
- }
- }
- }
开启两个worker分别运行。运行结果如:
c1的结果:
- [*] Waiting for messages. To exit press CTRL+C
- [x] Received 'First message'
- [x] Received 'Third message'
- [x] Received 'Fifth message'
c2的结果
- [*] Waiting for messages. To exit press CTRL+C
- [x] Received 'Second message'
- [x] Received 'Fourth message'
相关推荐
RabbitMQ学习笔记:Connections、Channels、Queues之state状态连接、信道、队列状态如下:GitHub地址:https://gi
rabbitmq-3.10.6:management
RabbitMQ分发方式:主题模式思维脑图
rabbitmq延迟插件:rabbitmq_delayed_message_exchange-20171215-3.6.x.ez
NULL 博文链接:https://wubin850219.iteye.com/blog/1076093
适用于RabbitMQ的Docker 描述 该存储库使使用docker构建RabbitMQ变得简单。 先决条件 快速开始 docker-compose up -d 设置 步骤1:将节点添加到您docker-compose.yml ... 步骤2:在端口中添加默认端口
NULL 博文链接:https://wubin850219.iteye.com/blog/1050328
RabbitMQ实战: 高效部署分布式消息队列,高质量文档分享,请珍惜!
RabbitMQ
The default behaviour for RabbitMQ when a maximum queue length or size is set an
rabbitmq安装目录 work_home: /data/test erlang_rpm安装文件名(放在files文件夹内) erlang_rpm: erlang-20.3.7-1.el7.centos.x86_64.rpm rabbitmq安装文件名(放在files文件夹内) rabbitmq_rpm: rabbitmq-server...
RabbitMQ示例2:工作原理 RabbitMQ示例3:发布与订阅【fanout转换器】 RabbitMQ示例4:路由【直接交换机】 RabbitMQ示例5:主题【topic切换】 RabbitMQ示例6:远程过程调用RPC Pom.xml <? xml version = " ...
kubernetes-rabbitmq-cluster:适用于kubernetes的可部署的Rabbitmq集群
【课程目录】:---第一章:RabbitMQ介绍----1-什么是消息中间件.mp4----2-RabbitMQ消息队列安装:window环境.mp4----3-RabbitMQ消息队列安装 :Linux环境.mp4----4-Rabbitmq入口示例:server.mp4----5-rabbitmq入口...
Work Queues与入门程序的简单模式相比,多了一个或一些消费端,多个消费端共同消费同一个队列 中的消息。 应用场景:对于任务过重或任务较多情况使用工作队列可以提高任务处理的速度。
rabbitmq_demo:学习rabbitmq
rabbitmq-test:rabbitmq测试
rabbitmq实例 包括: 基于java的websocket消息推送,以及spring boot集成方式的消息推送 基于html5的websocke协议实现,html直接与rabbitmq建立链接,进行消息推送和接收 ...技术学习交流:635278789