RabbitMQ学习整理————基于RabbitMQ实现RPC

基于RabbitMQ实现RPC

  • 前言
  • 什么是RPC
  • RabbitMQ如何实现RPC
  • RPC简单示例
  • 通过Spring AMQP实现RPC

前言

这边参考了RabbitMQ的官网,想整理一篇关于RabbitMQ实现RPC调用的博客,打算把两种实现RPC调用的都整理一下,一个是使用官方提供的一个Java client,还有一个是Spring AMQP的整合使用。
代码路径:https://github.com/yzh19961031/blogDemo/tree/master/rabbitmq

什么是RPC

RPC是远程过程调用(Remote Procedure Call)的缩写形式,简单说就是一个节点去请求另一个节点上面的服务并获得响应结果。
我们之前总结的工作模式都是发送消息到指定的队列,再由相关的消费者进行消费,如果存在这样的场景,比如消费者消费完消息需给生产者一个具体的响应,然后生产者再根据这个响应进行其他的业务逻辑,这样就需要使用到RabbitMQ提供的RPC能力。

RabbitMQ如何实现RPC

官方有很详细的介绍文档,这边贴一下地址:https://www.rabbitmq.com/tutorials/tutorial-six-java.html
RabbitMQ实现RPC很简单,正常的流程就是请求以及响应,我们只需要在请求的消息的属性里面添加一个响应队列的地址,这边需要使用到一个BasicProperties这个类。具体配置如下:

// 指定一个回调队列
callbackQueueName = channel.queueDeclare().getQueue();
// 设置replyTo的属性为指定的回调队列
BasicProperties props = new BasicProperties.Builder().replyTo(callbackQueueName).build();channel.basicPublish("", "rpc_queue", props, message.getBytes());

BasicProperties这个类中提供了很多的属性,有14个,很多基本上很少用到,常用的就是几个,我这边也贴一下,其实在我上一篇文章中基于RabbitMQ实现的一个RPC工具里面都有用到这些属性。

  1. contentType 这个属性用来表明消息的类型,默认是"application/octet-stream"这种流的类型,还有常用的比如"application/json","text/plain"等,这些在我的RPC工具里面都有用到。
  2. replyTo 这个就是上面指定的回调队列。
  3. correlationId 这个id可以用来进行消息的确认,将相应与请求相关联。主要是可以确认服务端收到的消息是不是指定客户端发过来的,用于确认。

首先先贴一张官方提供的图,这个是RabbitMQ实现RPC的主要工作流程:
在这里插入图片描述
实现RPC的具体工作流程:

  1. 首先客户端发送一个请求消息,这个请求消息里面有两个属性,一个是replyTo回调队列的地址,一个是correlationId用于标识当前消息唯一的id信息。
  2. 这个消息是发送到指定的rpc_queue这个队列上面。
  3. 对应我们的服务端Server就会等待rpc_queue上面的请求消息,当请求消息来得时候,服务端会进行处理,处理完成会将相应的消息再发送到请求消息属性中的replyTo回调的队列上面。
  4. 客户端发送消息之后,会等待replyTo队列中的消息。当有消息来得时候,会检查响应消息中correlationId属性和请求消息中correlationId是否一致,完成一次PRC调用。

RPC简单示例

我这边根据官网上面提供的例子简单修改整理了一下,这边提供一个大小写转换的功能,就是客户端发送一段小写的字符串,服务端将字符串转为大写再响应过来。详细逻辑可以看下代码中注释,具体代码如下:
首先服务端:

/*** RPC服务端** @author yuanzhihao* @since 2020/11/21*/
public class RPCServer {public static void main(String[] args) throws IOException, TimeoutException {// 首先还是正常获得connection以及channel对象ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.1.108");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel();// 定义一个rpc的队列String queueName = "test_rpc";channel.queueDeclare(queueName, false, false, false, null);Object monitor = new Object();// 具体的消费代码里面实现DeliverCallback deliverCallback = (consumerTag, delivery) -> {// 消费者将请求消息中的correlationId信息再作为响应传回replyTo队列AMQP.BasicProperties replyProps = new AMQP.BasicProperties.Builder().correlationId(delivery.getProperties().getCorrelationId()).build();String response = "";try {// 提供一个大小写转换的方法String message = new String(delivery.getBody(), StandardCharsets.UTF_8);System.out.println("toUpperCase(" + message + ")");response = toUpperCase(message);} catch (RuntimeException e) {System.out.println(e.toString());} finally {// 将响应传回replyTo队列channel.basicPublish("", delivery.getProperties().getReplyTo(), replyProps, response.getBytes(StandardCharsets.UTF_8));// 设置了手动应答 需要手动确认消息channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);// 执行完成会释放主线程的锁// RabbitMq consumer worker thread notifies the RPC server owner threadsynchronized (monitor) {monitor.notify();}}};// 监听"test_rpc"队列channel.basicConsume(queueName, false, deliverCallback, (consumerTag -> { }));// 这个锁对象是确保我们server的调用逻辑执行完成 首先挂起主线程// Wait and be prepared to consume the message from RPC client.while (true) {synchronized (monitor) {try {monitor.wait();} catch (InterruptedException e) {e.printStackTrace();}}}}// 提供一个大小写转换的方法private static String toUpperCase(String msg) {return msg.toUpperCase();}
}

客户端:

/*** RPC客户端** @author yuanzhihao* @since 2020/11/21*/public class RPCClient {public static void main(String[] args) throws IOException, TimeoutException, InterruptedException {// 创建connection以及channel对象ConnectionFactory connectionFactory = new ConnectionFactory();connectionFactory.setHost("192.168.1.108");connectionFactory.setPort(5672);connectionFactory.setVirtualHost("/");try ( Connection connection = connectionFactory.newConnection();Channel channel = connection.createChannel()) {// 声明一个队列String queueName = "test_rpc";// 请求消息中需要带一个唯一标识ID String corrId = UUID.randomUUID().toString();// 声明一个回调队列String replayQueueName = channel.queueDeclare().getQueue();// 将correlationId以及回调队列设置在消息的属性中AMQP.BasicProperties properties = new AMQP.BasicProperties.Builder().correlationId(corrId).replyTo(replayQueueName).build();// 具体消息内容String msg = "hello rpc";// 发送请求消息channel.basicPublish("",queueName,properties,msg.getBytes());// 设置一个阻塞队列  等待服务端的响应final BlockingQueue<String> response = new ArrayBlockingQueue<>(1);String ctag = channel.basicConsume(replayQueueName, true, (consumerTag, message) -> {// 注意 这边根据correlationId进行下判断if (message.getProperties().getCorrelationId().equals(corrId)) {response.offer(new String(message.getBody(), StandardCharsets.UTF_8));}}, consumerTag -> {});// 获取响应结果String take = response.take();System.out.println("rpc result is "+ take);channel.basicCancel(ctag);}}
}

执行代码,具体的客户端与服务端运行结果在这里插入图片描述 在这里插入图片描述

通过Spring AMQP实现RPC

通过Spring来实现RPC也很简单,主要通过spring提供的一个RabbitTemplate对象中sendAndReceive方法来实现,这个方法是发送消息然后一直等待响应。监听器里面实现的和之前的逻辑大致相同,都需要将response响应消息发送到对应的replyTo回调队列上。下面直接贴一下代码。
首先是服务端,我这边直接是使用配置类的形式,具体一些的配置项可以参考下我之前的那篇博客或者上网搜一下~

/*** 主配置类** @author yuanzhihao* @since 2021/1/9*/
@Configuration
public class RabbitMQConfig {private static final Logger log = LoggerFactory.getLogger(RabbitMQConfig.class);// 注入connectionFactory对象@Beanpublic ConnectionFactory connectionFactory() {CachingConnectionFactory connectionFactory = new CachingConnectionFactory();connectionFactory.setAddresses("192.168.1.108:5672");connectionFactory.setUsername("guest");connectionFactory.setPassword("guest");connectionFactory.setVirtualHost("/");return connectionFactory;}// 声明队列@Beanpublic Queue rpcQueue() {return new Queue("test_rpc",false);}@Beanpublic RabbitTemplate rabbitTemplate() {return new RabbitTemplate(connectionFactory());}// 创建初始化RabbitAdmin对象@Beanpublic RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);// 只有设置为 true,spring 才会加载 RabbitAdmin 这个类rabbitAdmin.setAutoStartup(true);return rabbitAdmin;}// 消息监听器@Beanpublic SimpleMessageListenerContainer simpleMessageListenerContainer(RabbitTemplate rabbitTemplate) {SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory());// 监听的队列container.setQueues(rpcQueue());MessageListener messageListener = message -> {String receiveMsg = new String(message.getBody(), StandardCharsets.UTF_8);log.info("Receive a message message is {}", receiveMsg);// 执行对应逻辑String responseMsg = toUpperCase(receiveMsg);MessageProperties messageProperties = MessagePropertiesBuilder.newInstance().setCorrelationId(message.getMessageProperties().getCorrelationId()).build();// 响应消息 这边就是如果没有绑定交换机和队列的话 消息应该直接传到对应的队列上面rabbitTemplate.send("", message.getMessageProperties().getReplyTo(), new Message(responseMsg.getBytes(StandardCharsets.UTF_8), messageProperties));};// 设置监听器container.setMessageListener(messageListener);return container;}// 提供一个大小写转换的方法private String toUpperCase(String msg) {return msg.toUpperCase();}
}

客户端我采用test单元测试的形式

/*** spring amqp rpc 测试类** @author yuanzhihao* @since 2021/1/9*/
@ContextConfiguration(classes = {RabbitMQConfig.class})
@RunWith(SpringRunner.class)
public class RabbitMQRpcTest {private static final Logger log = LoggerFactory.getLogger(RabbitMQConfig.class);@Autowiredprivate RabbitTemplate rabbitTemplate;// 测试RPC客户端@Testpublic void testRpcClient() {// 设置correlationIdString corrId = UUID.randomUUID().toString();String msg = "hello rpc";MessageProperties messageProperties = MessagePropertiesBuilder.newInstance().setCorrelationId(corrId).build();// 注意 这边如果使用sendAndReceive不指定replyTo回调队列 spring会默认帮我们添加一个回调队列// 格式默认 "amq.rabbitmq.reply-to" 前缀Message message = rabbitTemplate.sendAndReceive("", "test_rpc", new Message(msg.getBytes(StandardCharsets.UTF_8), messageProperties));log.info("The response is {}", new String(message.getBody(), StandardCharsets.UTF_8));}
}

具体实现可以看下代码的注释
代码执行结果:
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://xiahunao.cn/news/2803461.html

如若内容造成侵权/违法违规/事实不符,请联系瞎胡闹网进行投诉反馈,一经查实,立即删除!

相关文章

开源博客项目Blog .NET Core源码学习(9:Autofac使用浅析)

开源博客项目Blog使用Autofac注册并管理组件和服务&#xff0c;Autofac是面向.net 的开源IOC容器&#xff0c;支持通过接口、实例、程序集等方式注册组件和服务&#xff0c;同时支持属性注入、方法注入等注入方式。本文学习并记录Blog项目中Autofac的使用方式。   整个Blog解…

NXP实战笔记(六):S32K3xx基于RTD-SDK在S32DS上配置PWM发波

目录 1、概述 2、SDK配置 2.1、Port配置 2.2、Emios_Mcl_Ip 2.3、Emios_Pwm 2.4、代码示例 1、概述 针对S32K3xx芯片&#xff0c;产生PWM的硬件支持单元仅有两个&#xff0c;分别是eMiosx与Flexio. 生成PWM的顺序&#xff0c;按照单片机所用资源进行初始化执行如下 初始化…

UTONMOS开启数智龙年,打造元宇宙游戏圈新名片

新年已过&#xff0c;全国各个城市早已客流涌动、热闹非凡。这种繁华景象不仅存在于现实世界&#xff0c;也被复刻到元宇宙的虚拟空间中。 据介绍&#xff0c;UTONMOS“源起山海-神念无界”元宇宙游戏是以原创IP玄幻神话故事“元宇宙史纪”为蓝本打造的元宇宙游戏空间&#xf…

【DataTable.js】02.DataTable参考

一、Option 1.1 Features 属性类型默认值描述autoWidthbooleantrue是否自动调节单元格宽度&#xff0c;若传入了columns.width&#xff0c;可禁用该选项orderingbooleantrue是否支持排序pagingbooleantrue是否支持分页scrollXbooleanfalse是否支持横向滚动条scrollYstring启用…

2024年必备原型设计工具盘点,助你成为设计大神

原型设计是 UI/UX 设计中至关重要的一步&#xff0c;就像用户体验中的其他环节一样&#xff0c;有无数的原型工具可以帮助你完成原型设计。 如果市场上有太多的原型设计工具让你不知所措&#xff0c;不知道选择哪一个&#xff0c;恭喜你&#xff0c;这个原型设计工具的集合是为…

云打印app下载,云打印app在哪下载?

随着互联网技术的发展&#xff0c;云打印技术已经逐渐成熟。而对于用户来说&#xff0c;很多有打印需求的用户都需要用到云打印&#xff0c;那么云打印App怎么下载&#xff0c;云打印app在哪下载呢&#xff1f;今天带大家来了解一下。 云打印app在哪下载&#xff1f; 很多有打…

银河麒麟桌面系统的文件保护箱的使用

在开始菜单搜索文件保护箱 输入密码进行认证&#xff0c;授权后方可使用 首次使用&#xff0c;选择新建 填写保护目录的名字以及设置目录保护密码 密钥文件保存到桌面 我这里保存到桌面&#xff0c;你可千万别&#xff0c;这个很重要&#xff0c;丢失的话&#xff0c;若忘记密…

动态规划-

关键词&#xff1a; 重叠子问题&#xff1b;每一个状态一定是由上一个状态推导出来(类似数列a^n f(a^n-1,a^n-2)) 步骤&#xff1a; 确定dp数组&#xff08;dp table&#xff09;以及下标的含义确定递推公式dp数组如何初始化确定遍历顺序举例推导dp数组 题目&#…

【数据结构】顺序表实现的层层分析!!

关注小庄 顿顿解馋◍˃ ᗜ ˂◍ 引言&#xff1a;本篇博客我们来认识数据结构其中之一的顺序表&#xff0c;我们将认识到什么是顺序表以及顺序表的实现&#xff0c;请放心食用~ 文章目录 一.什么是顺序表&#x1f3e0; 线性表&#x1f3e0; 顺序表 二.顺序表的实现&#x1f3e0…

【教3妹学编程-算法题】匹配模式数组的子数组数目 I

3妹&#xff1a;2哥2哥&#xff0c;你有没有看到上海女老师出轨男学生的瓜啊。 2哥 : 看到 了&#xff0c;真的是太毁三观了&#xff01; 3妹&#xff1a;是啊&#xff0c; 老师本是教书育人的职业&#xff0c;明确规定不能和学生谈恋爱啊&#xff0c;更何况是出轨。 2哥 : 是啊…

导出本地环境venv包whl文件。

把python环境 venv 对应包的文件导出成whl文件 将 Python 虚拟环境中包导出到文件&#xff0c;可以方便地在其他电脑上安装相同的环境&#xff0c;无需重复下载。 使用 pip freeze 和 pip download 使用 pip freeze 命令列出所有已安装的包和版本号保存到 requirements.txt …

Sora----打破虚实之间的最后一根枷锁----这扇门的背后是人类文明的晟阳还是最后的余晖

目录 一.Sora出道即巅峰 二.为何说Sora是该领域的巨头 三.Sora无敌的背后究竟有怎样先进的处理技术 1.Spacetime Latent Patches 潜变量时空碎片&#xff0c;建构视觉语言系统 2.扩散模型与Diffusion Transformer&#xff0c;组合成强大的信息提取器 3.DiT应用于潜变量时…

小区视频汇聚与智能监管方案:老破小升级改造与小区智慧化建设

一、需求背景 在当今数字化时代&#xff0c;智慧小区已成为城市建设的必然趋势。加快小区智能化改造&#xff0c;不断完善小区管理和服务&#xff0c;彻底改变粗放型管理方式已经成为当前小区智慧化趋势的重要任务。其中&#xff0c;智能视频监控系统在提高小区安全性和管理效…

T-Dongle-S3开发笔记——分区表

参考&#xff1a; ESP32之 ESP-IDF 教学&#xff08;十三&#xff09;—— 分区表_esp32分区表-CSDN博客 分区表 - ESP32 - — ESP-IDF 编程指南 latest 文档 (espressif.com) 分区表是 ESP32 划分内部 flash 闪存的清单&#xff0c;它将 flash 划分为多个不同功能的区域用于…

交通强国,数字引领|易知微数字孪生智慧港口,探索未来港口的无限可能

点击下载了解易知微数字孪生智慧港口解决方案&#x1f449;https://easyv.cloud/solution/port/?tcsdn 2023年12月&#xff0c;为推动智慧港口和智慧航道建设发展&#xff0c;加快建设交通强国水运&#xff0c;交通运输部发布《关于加快智慧港口和智慧航道建设的意见》。到20…

哪种游泳耳机品牌更好?2024四款甄选高评分榜单好物!

在繁忙的都市生活中&#xff0c;游泳已经成为了许多人释放压力、保持健康的重要方式。而随着科技的进步&#xff0c;游泳耳机也逐渐走进了人们的视野&#xff0c;让音乐与游泳完美结合&#xff0c;为游泳爱好者带来了全新的运动体验。然而&#xff0c;在琳琅满目的游泳耳机市场…

vite为什么编译比webpack快

启动速度&#xff1a;Vite在启动时不需要打包&#xff0c;因为它支持ES模块加载&#xff0c;不需要编译和打包所有模块的依赖。这意味着Vite在启动时不需要像Webpack那样构建整个项目的文件&#xff0c;因此启动速度更快。 1 vite 采用 es 新规范 vite 中的 main.ts 中可以直…

camunda源代码编译运行(三):验证camunda API接口功能

接上一篇文章&#xff1a;camunda源代码编译运行&#xff08;二&#xff09;&#xff1a;构建并运行camunda源代码工程 4.1、发布流程模型 先通过camunda的流程设计器设计一个流程&#xff0c;命名为&#xff1a;UserTask Flow1&#xff0c;然后发布流程&#xff0c;发布流程…

C语言第二十九弹---浮点数在内存中的存储

✨个人主页&#xff1a; 熬夜学编程的小林 &#x1f497;系列专栏&#xff1a; 【C语言详解】 【数据结构详解】 目录 1、浮点数在内存中的存储 1.1、练习 1.2、浮点数怎么转化为二进制 1.3、浮点数的存储 1.3.1、浮点数存的过程 1.3.2、浮点数取的过程 1.3、题目解析…

jquery 简介与解析

jQuery是一个快速、小巧且功能丰富的JavaScript库。它简化了诸如HTML文档遍历和操作、事件处理、动画以及Ajax操作等任务。jQuery的设计理念是“写得更少&#xff0c;做得更多”&#xff0c;这意味着通过jQuery&#xff0c;可以用更少的代码完成更多的工作。 主要特点&#xff…