`
yimeng528
  • 浏览: 184246 次
  • 性别: Icon_minigender_1
  • 来自: 北京
社区版块
存档分类
最新评论

rabbitmq学习6:RPC

阅读更多

在《rabbitmq学习2:Work Queues 》中我们已经知道了在多个worker如何分配耗时的任务。如果我现在要在远程的机器上运行然后得到结果,那应当怎么做呢?那就要用到RPC(Remote Procedure Call or RPC )了!

   关于RPC的介绍请参考百度百科里的关于RPC的介绍:http://baike.baidu.com/view/32726.htm#sub32726

   现在来看看来看看Rabbitmq中RPC吧!RPC的工作示意图如下:


   上图中的C代表客户端,S表示服务器端;Rabbitmq中的RPC流程如下:

1、首先客户端发送一个reply_to和corrention_id的请求,发布到RPC队列中;

2、服务器端处理这个请求,并把处理结果发布到一个回调Queue,此Queue的名称应当与reply_to的名称一致

3、客户端从回调Queue中得到先前corrention_id设定的值的处理结果。如果碰到和先前不一样的corrention_id的值,将会忽略而不是抛出异常。

 

  对于上面所提到的回调Queue中的消费处理使用的是BasicProperties类;而消息 属性在AMQP的协议中规定有14个;而很多大部分我们没有用到。常用的几个属性有:

English代码  收藏代码
  1. Message properties  
  2. The AMQP protocol predefine a set of 14 properties that go with a message. Most of the properties are rarely used, with the exception of the following:  
  3.   
  4. delivery_mode: Marks a message as persistent (with a value of 2) or transient (any other value). You may remember this property from the second tutorial.   
  5. content_type: Used to describe the mime-type of the encoding. For example for the often used JSON encoding it is a good practice to set this property to: application/json.   
  6. reply_to: Commonly used to name a callback queue.   
  7. correlation_id: Useful to correlate RPC responses with requests.   

 

 delivery_mode : 标记消息是持久性消息还是瞬态信息。在前面的“Work Queue”中我们已经提到过;   

  content_type : 用来描述MIME的类型。如把其类型设定为JSON;

  reply_to : 用于命名一个回调Queue;

  correlation_id : 用于与相关联的请求的RPC响应.

  现在我们就开始RPC的程序吧!

client的代码如下:

Java代码  收藏代码
  1. package com.abin.rabbitmq;  
  2.   
  3. import java.util.UUID;  
  4.   
  5. import com.rabbitmq.client.AMQP.BasicProperties;  
  6. import com.rabbitmq.client.Channel;  
  7. import com.rabbitmq.client.Connection;  
  8. import com.rabbitmq.client.ConnectionFactory;  
  9. import com.rabbitmq.client.QueueingConsumer;  
  10.   
  11. public class RPCClient {  
  12.     private Connection connection;  
  13.     private Channel channel;  
  14.     private String requestQueueName = "rpc_queue";  
  15.     private String replyQueueName;  
  16.     private QueueingConsumer consumer;  
  17.   
  18.     public RPCClient() throws Exception {  
  19.         ConnectionFactory factory = new ConnectionFactory();  
  20.         factory.setHost("localhost");  
  21.         connection = factory.newConnection();  
  22.         channel = connection.createChannel();  
  23.   
  24.         replyQueueName = channel.queueDeclare().getQueue();  
  25.         consumer = new QueueingConsumer(channel);  
  26.         channel.basicConsume(replyQueueName, true, consumer);  
  27.     }  
  28.   
  29.     public String call(String message) throws Exception {  
  30.         String response = null;  
  31.         String corrId = UUID.randomUUID().toString();  
  32.   
  33.         BasicProperties props = new BasicProperties();  
  34.         props.setReplyTo(replyQueueName);  
  35.         props.setCorrelationId(corrId);  
  36.   
  37.         channel.basicPublish("", requestQueueName, props, message.getBytes());  
  38.   
  39.         while (true) {  
  40.             QueueingConsumer.Delivery delivery = consumer.nextDelivery();  
  41.             if (delivery.getProperties().getCorrelationId().equals(corrId)) {  
  42.                 response = new String(delivery.getBody(), "UTF-8");  
  43.                 break;  
  44.             }  
  45.         }  
  46.   
  47.         return response;  
  48.     }  
  49.   
  50.     public void close() throws Exception {  
  51.         connection.close();  
  52.     }  
  53.   
  54.     public static void main(String[] argv) {  
  55.         RPCClient fibonacciRpc = null;  
  56.         String response = null;  
  57.         try {  
  58.             fibonacciRpc = new RPCClient();  
  59.   
  60.             System.out.println(" [x] Requesting fib(30)");  
  61.             response = fibonacciRpc.call("30");  
  62.             System.out.println(" [.] Got '" + response + "'");  
  63.             System.out.println(" [x] Requesting fib(-1)");  
  64.             response = fibonacciRpc.call("-1");  
  65.             System.out.println(" [.] Got '" + response + "'");  
  66.             System.out.println(" [x] Requesting fib(a)");  
  67.             response = fibonacciRpc.call("a");  
  68.             System.out.println(" [.] Got '" + response + "'");  
  69.         } catch (Exception e) {  
  70.             e.printStackTrace();  
  71.         } finally {  
  72.             if (fibonacciRpc != null) {  
  73.                 try {  
  74.                     fibonacciRpc.close();  
  75.                 } catch (Exception ignore) {  
  76.                 }  
  77.             }  
  78.         }  
  79.     }  
  80. }  

 

 server的代码如下:

Java代码  收藏代码
  1. package com.abin.rabbitmq;  
  2.   
  3. import com.rabbitmq.client.AMQP.BasicProperties;  
  4. import com.rabbitmq.client.Channel;  
  5. import com.rabbitmq.client.Connection;  
  6. import com.rabbitmq.client.ConnectionFactory;  
  7. import com.rabbitmq.client.QueueingConsumer;  
  8.   
  9. public class RPCServer {  
  10.     private static final String RPC_QUEUE_NAME = "rpc_queue";  
  11.   
  12.     private static int fib(int n) {  
  13.         if (n > 1)  
  14.             return fib(n - 1) + fib(n - 2);  
  15.         else  
  16.             return n;  
  17.     }  
  18.   
  19.     public static void main(String[] argv) {  
  20.         Connection connection = null;  
  21.         Channel channel = null;  
  22.         try {  
  23.             ConnectionFactory factory = new ConnectionFactory();  
  24.             factory.setHost("localhost");  
  25.   
  26.             connection = factory.newConnection();  
  27.             channel = connection.createChannel();  
  28.   
  29.             channel.queueDeclare(RPC_QUEUE_NAME, falsefalsefalsenull);  
  30.   
  31.             channel.basicQos(1);  
  32.   
  33.             QueueingConsumer consumer = new QueueingConsumer(channel);  
  34.             channel.basicConsume(RPC_QUEUE_NAME, false, consumer);  
  35.   
  36.             System.out.println(" [x] Awaiting RPC requests");  
  37.   
  38.             while (true) {  
  39.                 String response = null;  
  40.   
  41.                 QueueingConsumer.Delivery delivery = consumer.nextDelivery();  
  42.   
  43.                 BasicProperties props = delivery.getProperties();  
  44.                 BasicProperties replyProps = new BasicProperties();  
  45.                 replyProps.setCorrelationId(props.getCorrelationId());  
  46.   
  47.                 try {  
  48.                     String message = new String(delivery.getBody(), "UTF-8");  
  49.                     int n = Integer.parseInt(message);  
  50.   
  51.                     System.out.println(" [.] fib(" + message + ")");  
  52.                     response = "" + fib(n);  
  53.                 } catch (Exception e) {  
  54.                     System.out.println(" [.] " + e.toString());  
  55.                     response = "";  
  56.                 } finally {  
  57.                     channel.basicPublish("", props.getReplyTo(), replyProps,  
  58.                             response.getBytes("UTF-8"));  
  59.   
  60.                     channel.basicAck(delivery.getEnvelope().getDeliveryTag(),  
  61.                             false);  
  62.                 }  
  63.             }  
  64.         } catch (Exception e) {  
  65.             e.printStackTrace();  
  66.         } finally {  
  67.             if (connection != null) {  
  68.                 try {  
  69.                     connection.close();  
  70.                 } catch (Exception ignore) {  
  71.                 }  
  72.             }  
  73.         }  
  74.     }  
  75. }  

 

先运行服务器端,运行结果如下:

Java代码  收藏代码
  1. [x] Awaiting RPC requests  

   再运行运行客户端,运行结果如下:

Java代码  收藏代码
  1. [x] Requesting fib(30)  
  2. [.] Got '832040'  
  3. [x] Requesting fib(-1)  
  4. [.] Got '-1'  
  5. [x] Requesting fib(a)  
  6. [.] Got ''  

   在服务器还可以出现:

Java代码  收藏代码
  1. [.] fib(30)  
  2. [.] fib(-1)  
  3. [.] java.lang.NumberFormatException: For input string: "a"  

 

 

 

分享到:
评论

相关推荐

    java队列源码-rabbitmq-repository:RabbitMQ消息队列学习的源码记录

    rabbitmq-java-rpc rabbitmq PRC通信示例 rabbitmq-spring-helloworld spring boot 使用rabbitmq的第一个demo rabbitmq-spring-work-queue spring boot使用rabbitmq的队列示例 rabbitmq-spring-fanout spring boot...

    使用RabbitMQ实现RPC

    作为中间件实现的RPC模式,希望对您的学习有所帮助。RabbitMQ RabbitMQ是基于AMQP协议实现的一个消息队列(MessageQueue),Message Queue是一个典型的生产者/消费者模式。生产者发布消息,消费者消费消息,生产者和...

    rabbitmq-demo.rar

    RabbitMQ原生最详细demo,深度理解RabbitMQ原理,...RabbitMQ提供了6种消息模型,但是第6种其实是RPC,并不是MQ,因此不予学习。那么也就剩下5种。 但是其实3、4、5这三种都属于订阅模型,只不过进行路由的方式不同

    JAVA核心知识点全集

    第一章:jvm、第二章:java集合、第三章:java多线程并发、第四章:java基础、第五章:Spring原理、第六章:微服务、第七章:Netty 与 RPC、第八章:网络、第九章:日志、第十章:Zookeeper、第十一章:Kafka、第十...

    基于maven的springboot项目自动化搭建依赖包+源代码+文档说明

    ### 6.kylin-redis 对redis进行自动化配置 ### 7.kylin-rpc 包含三个子模块 1. kylin-rpc-protocol rpc(hessian)调用协议 2. kylin-rpc-client rpc(hessian)调用客户端依赖包 3. kylin-rpc-client rpc(hessian)...

    IT|Java|面试|宝典

    多线程并发 4.java基础 5.spring原理 6.微服务 7.netty与rpc 及实现 8.网络 9.zookeeper 10.kafka 11.rabbitmq 12.hbase 13.mongodb 14.cassandra 15.设计模式 16.负载均衡 17.数据库 18.一致性算法 19.java算法 20....

    java后端学习笔记

    activeMq,rabbitMq,activity工作流,docker,dubbo,netty,rpc,springcloud,zookeeper学习笔记

    JAVA核心知识点整理[微信公众号:bugstack虫洞栈]=>推荐.pdf

    jvm、java集合、java多线程并发、java基础、spring原理、微服务、netty与rpc、网络、日志、zookeeper、kafka、rabbitmq、hbase、mongodb、cassandra、设计模式、负载均衡、数据库、一致性算法、java算法、数据结构、...

    JAVA面试很全的一个资料,不过仅针对面试哦,日常学习不合适。内容以问答形式。

    包含JVM、JAVA集合、多线程并发、JAVA基础、Spring原理、微服务、Netty与RPC、网络、日志、Zookeeper、Kafka、RabbitMQ、HBASE、MongoDB、Cassandra、设计模式、负载均衡、数据库、加密算法、分布式缓存、Hadoop、...

    Java 技术文档整理汇总大全

    6.Spring 原理 7.微服务 8.Netty 与RPC 9.网络10.日志 11.Zookeeper 12.Kafka 13.RabbitMQ 14.Hbase 15.MongoDB 16.Cassandra 17.设计模式 18.负载均衡 19.数据库 20.一致性算法 21.JAVA算法 22.数据结构 23.加密...

    JAVA核心面试知识整理

    6. SPRING 原理 7. 微服务 8. NETTY 与 RPC 9. 网络 10. 日志 11. ZOOKEEPER 12. KAFKA 13. RABBITMQ 14. HBASE 15. MONGODB 16. CASSANDRA 17. 设计模式 18. 负载均衡 19. 数据库 20. 一致性算法 21. JAVA 算法 22....

    Java核心知识点.PDF

    6.Spring原理 7.微服务 8.Netty与RPC 9.网络 10.日志 11.Zookeeper 12.Kafka 13.RabbitMQ 14.Hbase 15.MongoDB 16.Cassandra 17.设计模式 18.负裁均衡 19.数据库 20.一致性算法 21.JAVA算法 22.数据结构 23.加密算法...

    java面试笔试资料包括JAVA基础核心知识点深度学习Spring面试题等资料合集.zip

    RPC (Remote Procedure Call)即远程过程调用.doc Spring 面试问题 TOP 50(干货推荐收藏必备).doc springboot常见面试题.doc svn和git的区别及适用场景.doc ZooKeeper.doc 为什么分布式一定要有Redis.doc 分布式、...

    280多页超详细的JAVA核心知识点整理 包含了JVM、java集合、多线程并发、java基础、spring生态原理、微服务等等

    包含了JVM、java集合、多线程并发、java基础、spring生态原理、微服务、设计模式、NETTY与RPC、网络、日志、ZOOKEEPER、KAFKA、RABBITMQ、HBASE、MONGDB、CASSANDRA、负载均衡、数据库、一致性算法、java算法、数据...

    【白雪红叶】JAVA学习技术栈梳理思维导图.xmind

    RabbitMQ ActiveMQ 常用开源框架 Spring Spring MVC Spring WebFlow spring tx aop ioc Struts ibatis Mybatis CAS Dubbo 工作能力 软实力 应急能力 创新能力 管理能力 分享能力 学习能力 ...

    最新Python3.5零基础+高级+完整项目(28周全)培训视频学习资料

    RabbitMQ rpc实现 Redis hash操作 Redis 集合set 和有序集合操作 Redis 集合操作补充 Redis 发布订阅及本节作业 第12周 上节回顾 数据库介绍 mysql基本使用 mysql数据类型与创建表 mysql 增删改查 mysql 外键关联 ...

    JAVA面试题及知识点整理

    Spring原理、微服务架构、Netty与RPC、网络相关、日志相关、Zookeeper、Kafka、RabbitMQ、Hbase、MongoDB、Cassandra、设计模式、负载均衡相关、数据库理论、一致性算法讲解、JAVA基础算法、数据结构、加密算法、...

    一体式终端app.iml

    RabbitMQ的基本用法,有异步请求服务还有同步请求服务、RPC模式,各种请求服务的方法都有规范,用于新手学习也可以用于回顾MQ的知识

    Java思维导图xmind文件+导出图片

    分布式架构 漫谈分布式架构 初识分布式架构与意义 如何把应用从单机扩展到分布式 大型分布式架构演进过程 分布式架构设计 主流架构模型-SOA架构和微服务架构 领域驱动设计及业务驱动... 手写实现多协议RPC框架

    leetcode分类-learning-golang::memo:Go学习之路

    leetcode 分类 Learning Golang 知识清单 基础 变量 常量 类型 函数 包 数组 切片 指针 结构 方法 接口 ...RabbitMQ Apache Kafka 消息总线 Message-Bus 框架 RPC 区块链 目录文档说明 consensus --

Global site tag (gtag.js) - Google Analytics