RabbitMQ - 以 MQ 为例,手写一个 RPC 框架 demo

 

目录

前言 

一、再谈自定义应用层协议

二、再谈 BrokerServer

三、再谈 Connection、Channel

四、Demo

a)启动服务器

b)客户端连接


前言 


本篇文章来自于笔者之前写过的一个系列 —— “根据源码,模拟实现 RabbitMQ” 系列,不妨可以去看看~

一、再谈自定义应用层协议

a)这个自定义应用层协议实际上就是在描述将来 客户端 和 服务器 之间通讯的消息格式长啥样

b)首先是一个 Int 类型的 type,描述了这个消息到底是用来干什么的(要调用服务器这边的哪一个服务).

c)然后就是 payload 的数据载荷,承载着将来调用 VirtualHost 中的具体的服务所需要的参数(例如创建交换机所需要的参数就有:交换机名字、交换机类型、是否自动删除、是否持久化、扩展参数).

因为 TCP 是面向字节流的(IO 流中主要提供的就是二进制数据的读写),因此这里不太适合使用 JSON 格式数据进行网络传输(可读性不好,效率不高),因此这里 payload 是一个 字节数组,将具体的数据序列化成 byte 数组放进来.

d)这里要注意的一点是,TCP 是面向字节流的,因此会出现粘包问题,那么为了解决这个问题,由两种办法,第一种就是约定分割符(读到指定分隔符就截止),第二种就是描述好 payload 的长度.

这里我采用的就是第二种办法,只需要在协议里面在添加一个 length 字段,用来描述 payload 的长度.

import java.io.Serializable//Socket 自定义应用层协议(请求)
data class Request(val type: Int,val length: Int,val payload: ByteArray,
): Serializable//Socket 自定义应用层协议(响应)
data class Response(val type: Int,val length: Int,val payload: ByteArray,
): Serializable//基本参数(每个请求都会携带的参数,这里进行了一个封住)
open class ReqBaseArguments(open val rid: String = "",open val channelId: String = "",
): Serializable//基本响应参数(每个响应都会携带的参数),主要是为了应对 mq 回调响应处理
open class RespBaseArguments(open val rid: String,open val channelId: String,open val ok: Boolean,
): Serializable//主要的请求: 创建交换机、删除交换机、创建队列
data class ExchangeDeclareReq(val name: String,val type: ExchangeType,val durable: Boolean,val autoDelete: Boolean,val arguments: MutableMap<String, Any>,override val rid: String,override val channelId: String,
): ReqBaseArguments(), Serializabledata class ExchangeDeleteReq(val name: String,override val rid: String,override val channelId: String,
): ReqBaseArguments(), Serializabledata class QueueDeclareReq(val name: String,val durable: Boolean,val exclusive: Boolean,val autoDelete: Boolean,val arguments: MutableMap<String, Any>,override val rid: String,override val channelId: String,
): ReqBaseArguments(), Serializable

 

二、再谈 BrokerServer

a)BrokerServer 就是一个中间服务,也可以简单理解为 VirtualHost 的代理(BrokerServer 接收客户端请求,调用 VirtualHost 中具体的服务).

b)BrokerServer 启动的时候,就会通过 accept 阻塞等待客户端这边的 TCP 连接,连接成功之后只需要为该客户端其分配一个线程,处理之后的任务.

c)此时这个线程就会处于一个死循环循环,通过 IO 流读取到 客户端 请求中的 type、length、payload ,并按照约定的格式进行解析 payload,得到具体数据(这里不仅包含了 VirtualHost 服务中所需要的具体的参数,还携带了 channelId 和 rid)

d)此时,只需要根据 IO 流中读取出的 type,调用对应 VirtualHost 中的服务即可.

e)最后再将 VirtualHost 处理后得到的响应封装成 我们约定的应用层协议格式,通过 IO 写入到流中,让客户端去读取.

class BrokerServer(port: Int
) {private val socket = ServerSocket(port)private val clientPool = Executors.newFixedThreadPool(5)//key: channelId ,value: Socket//注意:这里的 Channel 只表示一个 "逻辑" 上的连接(创建,销毁 channel),这个 Map 是为了后台信息统计private val channelSession = ConcurrentHashMap<String, Socket>()private val virtualHost = VirtualHost()fun start() {println("[BrokerServer] 启动!")while (true) {val client = socket.accept()clientPool.submit {clientProcess(client)}}}private fun clientProcess(client: Socket) {println("[BrokerServer] 客户端上线!ip: ${client.inetAddress}, port: ${client.port}")try {client.getInputStream().use { inputStream ->client.getOutputStream().use { outputStream ->DataInputStream(inputStream).use { dataInputStream ->DataOutputStream(outputStream).use { dataOutputStream ->while (true) {val request = readRequest(dataInputStream)val response = process(request, client)writeResponse(response, dataOutputStream)}}}}}} catch (e: EOFException) {println("[BrokerServer] 客户端正常下线!ip: ${client.inetAddress}, port: ${client.port}")} catch (e: Exception) {println("[BrokerServer] 客户端连接异常!ip: ${client.inetAddress}, port: ${client.port}")} finally {client.close()removeChannelSession(client)}}private fun process(request: Request, client: Socket) = with(request) {//1.解析请求val req = BinaryTool.bytesToAny(payload)//2.获取请求中的 channelId,记录和 Socket 的关系(让每个 channel 都对应自己的 Socket,类似于 Session)val reqBase = req as ReqBaseArguments//3.根据 type 类型执行不同的服务(创建 Channel、销毁 Channel、创建交换机、删除交换机...)val ok = when(type) {1 -> {channelSession[reqBase.channelId] = clientprintln("[BrokerServer] channel 创建成功!channelId: ${reqBase.channelId}")true}2 -> {channelSession.remove(reqBase.channelId)println("[BrokerServer] channel 销毁成功!channelId: ${reqBase.channelId}")true}3 -> virtualHost.exchangeDeclare(req as ExchangeDeclareReq)4 -> virtualHost.exchangeDelete(req as ExchangeDeleteReq)5 -> virtualHost.queueDeclare(req as QueueDeclareReq)//...else -> throw RuntimeException("[BrokerServer] 客户端请求 type 非法!type: $type")}//4.返回响应val respBase = RespBaseArguments(reqBase.rid, reqBase.channelId, ok)val payload = BinaryTool.anyToBytes(respBase)Response(type, payload.size, payload)}/*** 读取客户端请求* 使用 DataInputStream 的主要原因就是有多种读取方式,例如 readInt()、readLong(),这些都是原生 InputStream 没有的*/private fun readRequest(dataInputStream: DataInputStream) = with(dataInputStream) {val type = readInt()val length = readInt()val payload = ByteArray(length)val n = read(payload)if (n != length) throw RuntimeException("[BrokerServer] 读取客户端请求异常!")Request(type, length, payload)}/*** 将响应写回给客户端*/private fun writeResponse(response: Response, outputStream: DataOutputStream) = with(outputStream) {writeInt(response.type)writeInt(response.length)write(response.payload)flush()}//删除所有和这个 clientSocket 有关的 Channelprivate fun removeChannelSession(client: Socket) {val channelIdList = mutableListOf<String>()//这里不能直接删除,会破坏迭代器结构for (entry in channelSession) {if (entry.value == client) channelIdList.add(entry.key)}for (channelId in channelIdList) {channelSession.remove(channelId)}}}

class VirtualHost {fun exchangeDeclare(req: ExchangeDeclareReq): Boolean {//执行业务逻辑//...println("[VirtualHost] 创建交换机成功!")return true}fun exchangeDelete(req: ExchangeDeleteReq): Boolean {//执行业务逻辑//...println("[VirtualHost] 删除交换机成功!")return true}fun queueDeclare(req: QueueDeclareReq): Boolean {//执行业务逻辑//...println("[VirtualHost] 创建队列成功!")return true}}

 

三、再谈 Connection、Channel

a)一个 Connection 就是一个 TCP 连接,因此频繁 建立/断开连接(三次握手、四次挥手...)的开销也是相当大的,因此就引入了 Channel. 

b)一个 Connection 下可以有多个 Channel(此处使用 map 来维护).  Channel 只是简单的表示一个逻辑上的连接,可以理解为一个大的项目下被拆分成的多个小的微服务. 实现了 TCP 连接的复用.

c)起初,我们需要先创建出 Connection 与服务端建立连接,初始化构造中只需要写一个死循环,不断的从服务端这边读取响应.

d)接着,通过 Connection 创建出 Channel 来完成具体的业务(Channel 中就提供了一系列方法,就像调用本地的方法一样,调用到远程服务器的接口).

e)例如 Channel 中提供的创建叫交换机方法(channel.exchangeDeclare(...)),这个方法中具体要做的就是将传入的参数,封装到一个对象中,序列化成 二进制 数据,这就是将来协议中要传输的 payload.   进一步的,协议 Request 就构造出来了,通过 IO 写到流中,供服务端读取.

d)为了能够让每次请求和响应都能对的上,Channel 这里我维护了一个 map(key 是 rid、value 是具体的响应),客户端和服务端之间的每个请求和响应都会携带上这个 rid 这个参数,这样将来 Connection 客户端接受到响应的时候,就可以直接把 响应中的 rid 提取出来,交给 Channel 的 map 中(响应来之前,Channel 一直阻塞等待,直到响应来了 -> 能通过 rid  从 map 中得到).

class ConnectionFactory(private val host: String,private val port: Int,
) {fun newConnection() = Connection(host, port)}
class Connection(ip: String,port: Int,
) {private val socket = Socket(ip, port)private val channelMap = ConcurrentHashMap<String, Channel>()//下述这样提前创建好,是为了将来 Channel 在读写请求的时候的方便(Channel 就不用获取输入输出流了)private val inputStream = socket.getInputStream()private val outputStream = socket.getOutputStream()private val dataInputStream = DataInputStream(inputStream)private val dataOutputStream = DataOutputStream(outputStream)init {//此线程负责不停的从服务器这边获取响应Thread {try {while (!socket.isClosed) {//读取服务器响应val resp = readResp()//将响应交给对应的 ChannelputRespToChannel(resp)}} catch (e: SocketException) {println("[Connection] 客户端正常断开连接")} catch (e: Exception) {println("[Connection] 客户端异常断开连接")e.printStackTrace()}}.start()}/*** 将客户端 Connection 接收到的请求,交给对应的 Channel 处理(此时 Channel 还在阻塞等待服务端响应)*/private fun putRespToChannel(resp: Response) {//这里由于不涉及回调,所以每个 type 类型的响应都长一样,就按照一样的方式解析了val baseResp = BinaryTool.bytesToAny(resp.payload) as RespBaseArgumentsval channel = channelMap[baseResp.channelId]?: throw RuntimeException("[Connection] 该响应对应的 Channel 不存在!channelId: ${baseResp.channelId}")//将响应交给 Channelchannel.notifyResp(baseResp)}/*** 创建 Channel*/fun createChannel(): Channel { //1.创建 Channel,保存到 map 种val channelId = "C-${UUID.randomUUID()}"val channel = Channel(channelId, this)channelMap[channelId] = channel//2.告知服务端 Channel 创建val ok = channel.createChannel()//3.如果 Channel 创建不成功,客户端这边也应该要删除对应的 Channel 信息if (!ok) channelMap.remove(channelId)return channel}private fun readResp() = with(dataInputStream) {val type = readInt()val length = readInt()val payload = ByteArray(length)val n = read(payload)if (n != length) throw RuntimeException("[Connection] 客户端读取响应异常!")Response(type, length, payload)}fun writeReq(request: Request) = with(dataOutputStream) {writeInt(request.type)writeInt(request.length)write(request.payload)flush()}}
class Channel(private val channelId: String,private val connection: Connection,  //自己当前属于哪个 Channel
) {//key: rid(为了能让每个 Channel 对应上自己的响应)//value: RespBaseArguments(具体的响应)//当 Connection 的扫描线程接收到响应之后,就会将响应传给这个 mapprivate val ridRespMap = ConcurrentHashMap<String, RespBaseArguments>()//这个锁是用来阻塞等待服务端响应的(避免轮询),当服务端传来响应时,Connection 就会唤醒锁private val locker = Object()private fun generateRid() = "R-${UUID.randomUUID()}"private fun waitResp(rid: String): RespBaseArguments {val respBase: RespBaseArgumentswhile (ridRespMap[rid] == null) { // 如果为空,说明此时服务端还没有传来响应synchronized(locker) { //为了避免轮询,就让其阻塞等待locker.wait()}}//出了这个循环,那么 ridRespMap[rid] 一定不为空return ridRespMap[rid]!!}fun notifyResp(respBase: RespBaseArguments) {ridRespMap[respBase.rid] = respBasesynchronized(locker) {//当前也不直到有多少线程在等待响应,就全部唤醒locker.notifyAll()}}/*** 创建 Channel*/fun createChannel(): Boolean {//1.创建基本请求val reqBase = ReqBaseArguments(rid = generateRid(),channelId = channelId)//2.构造 TCP 通信请求val payload = BinaryTool.anyToBytes(reqBase)val req = Request(type = 1,length = payload.size,payload = payload)//3.发送请求connection.writeReq(req)//4.等待客户端响应val respBase = waitResp(reqBase.rid)return respBase.ok}fun removeChannel(): Boolean {//1.创建基本请求val reqBase = ReqBaseArguments(rid = generateRid(),channelId = channelId)//2.构造 TCP 通信请求val payload = BinaryTool.anyToBytes(reqBase)val req = Request(type = 2,length = payload.size,payload = payload)//3.发送请求connection.writeReq(req)//4.等待客户端响应val respBase = waitResp(reqBase.rid)return respBase.ok}fun exchangeDeclare(name: String,type: ExchangeType,durable: Boolean,autoDelete: Boolean,arguments: MutableMap<String, Any>,): Boolean {val exchangeDeclareReq = ExchangeDeclareReq(name = name,type = type,durable = durable,autoDelete = autoDelete,arguments = arguments,rid = generateRid(),channelId = channelId,)val payload = BinaryTool.anyToBytes(exchangeDeclareReq)val req = Request(type = 3,length = payload.size,payload = payload,)connection.writeReq(req)val respBase = waitResp(exchangeDeclareReq.rid)return respBase.ok}fun exchangeDelete(name: String): Boolean {val exchangeDeleteReq = ExchangeDeleteReq(name = name,rid = generateRid(),channelId = channelId,)val payload = BinaryTool.anyToBytes(exchangeDeleteReq)val req = Request(type = 4,length = payload.size,payload = payload,)connection.writeReq(req)val respBase = waitResp(exchangeDeleteReq.rid)return respBase.ok}fun queueDeclare(name: String,durable: Boolean,exclusive: Boolean,autoDelete: Boolean,arguments: MutableMap<String, Any>,): Boolean {val queueDeclareReq = QueueDeclareReq(name = name,durable = durable,exclusive = exclusive,autoDelete = autoDelete,arguments = arguments,rid = generateRid(),channelId = channelId,)val payload = BinaryTool.anyToBytes(queueDeclareReq)val req = Request(type = 5,length = payload.size,payload = payload,)connection.writeReq(req)val resp = waitResp(queueDeclareReq.rid)return resp.ok}}

 

四、Demo

a)启动服务器

fun main() {val server = BrokerServer(9000)server.start()
}

b)客户端连接

class Test2 {
}fun main() {val factory = ConnectionFactory("127.0.0.1", 9000)val connection = factory.newConnection()val channel = connection.createChannel()val ok1 = channel.createChannel()val ok2 = channel.exchangeDeclare("e1", ExchangeType.DIRECT, false, false, mutableMapOf())val ok3 = channel.removeChannel()println("ok1: $ok1, ok2: $ok2, ok3: $ok3")
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://xiahunao.cn/news/3019303.html

如若内容造成侵权/违法违规/事实不符,请联系瞎胡闹网进行投诉反馈,一经查实,立即删除!

相关文章

C语言 | Leetcode C语言题解之第75题颜色分类

题目&#xff1a; 题解&#xff1a; void swap(int *a, int *b) {int t *a;*a *b, *b t; }void sortColors(int *nums, int numsSize) {int p0 0, p2 numsSize - 1;for (int i 0; i < p2; i) {while (i < p2 && nums[i] 2) {swap(&nums[i], &num…

ERROR 1045 (28000) Access denied for user ‘root‘@‘IP‘(using password YES/NO)

查看权限 要查看MySQL用户的权限&#xff0c;您可以使用SHOW GRANTS语句。这将列出用户的权限&#xff0c;包括授予的权限和可以授予其他用户的权限。 以下是查看当前用户权限的SQL命令&#xff1a; SHOW GRANTS; 如果您想查看特定用户的权限&#xff0c;可以使用以下命令&…

【每日力扣】543. 二叉树的直径与101. 对称二叉树

&#x1f525; 个人主页: 黑洞晓威 &#x1f600;你不必等到非常厉害&#xff0c;才敢开始&#xff0c;你需要开始&#xff0c;才会变的非常厉害 543. 二叉树的直径 给你一棵二叉树的根节点&#xff0c;返回该树的 直径 。 二叉树的 直径 是指树中任意两个节点之间最长路径的…

nginx--系统参数优化telenct

系统参数 在生产环境中&#xff0c;根据自己的需求在/etc/sysctl.conf来更改内核参数 net.ipv4.ip_nonlocal_bind 1 允许非本地IP地址socket监听 net.ipv4.ip_forward 1 开启IPv4转发 net.ipv4.tcp_timestamps 0 是否开启数据包时间戳 net.ipv4.tcp_tw_reuse 0 端⼝口复⽤…

idea2023专业版安装破解+maven配置教程

前言 上一篇文章已经介绍了maven在Win10系统的安装配置教程。基于Win10的maven配置环境&#xff0c;本篇文章将介绍idea2023的安装破解教程及maven在idea2023的配置教程&#xff08;同时会将maven在idea2023的配置教程内容补充至上一篇文章&#xff09;。 一、idea2023下载安…

Java零拷贝技术实战

文章目录 引入传统IO内存映射mmap文件描述符sendFile测试总结 引入 为什么要使用零拷贝技术&#xff1f; 传统写入数据需要4次拷贝&#xff0c;如下图&#xff1a; 传统IO import java.io.*; import java.net.Socket;public class TranditionIOClient {private static fina…

sh包装脚本

两个脚本,运行的时间间隔分别是一分钟和五分钟,放入到sh文件中,挂在后代,脚本里面的路径最好是绝对路径。 新建sh文件 新建 run_test.sh 文件,使其可以运行两个不同的 Python 脚本,一个每分钟运行一次,另一个每五分钟运行一次。下面是修改后的 run_test.sh 文件的示例:…

机器学习:人工智能中实现自动化决策与精细优化的核心驱动力

&#x1f512;文章目录: &#x1f4a5;1.概述 ❤️2.机器学习基本原理 &#x1f6e4;️2.1定义与关键概念 &#x1f6e3;️2.2 机器学习算法 ☔3.自动化决策中的机器学习应用 &#x1f6b2;4.精细优化与机器学习的结合 &#x1f44a;5.挑战与前景 &#x1f4a5;1.概述 …

【WEB前端2024】简单几步制作web3d《萌宠星球》智体节点模板(2)

【WEB前端2024】简单几步制作web3d《萌宠星球》智体节点模板&#xff08;2&#xff09; 使用dtns.network德塔世界&#xff08;开源的智体世界引擎&#xff09;&#xff0c;策划和设计《乔布斯超大型的开源3D纪念馆》的系列教程。dtns.network是一款主要由JavaScript编写的智体…

21 使用Hadoop Java API读取序列化文件

在上一个实验中我们筛选了竞赛网站日志数据中2021/1和2021/2的数据以序列化的形式写到了hdfs上。 接下来我们使用Java API 读取序列化的数据保存到磁盘中。 其他命令操作请参考&#xff1a;16 Java API操作HDFS-CSDN博客 1.我直接在上一个项目中test/java目录下创建com.maidu.s…

鸿蒙 Next 模拟器 体验

参加华为社区相关Next 的活动&#xff0c;只要申请通过就可以下载模拟器。整个过程稍微慢些&#xff0c;大家可以根据活动相关信息&#xff0c;加入微信群。跟踪催促进度。争取早日体验 next 。 目前模拟器里边还是空空的&#xff0c;没有什么内置 APP &#xff0c;但是足够大…

net7部署经历

1、linux安装dotnet命令&#xff1a; sudo yum install dotnet-sdk-7.0 或者直接在商店里安装 2、配置反向代理 127.0.0.1:5000》localhost 访问后报错 原因&#xff1a;数据表驼峰名&#xff0c; 在windows的数据表不区分大小写&#xff0c;但是在linux里面是默认区分的&…

【xxl-job | 第三篇】SpringBoot整合xxl-job

文章目录 3.SpringBoot整合xxl-job3.1定时任务服务配置3.1.1导入maven依赖3.1.2yml配置3.1.3XxlJobConfig配置类3.1.4定时任务类 3.2xxl-job配置3.2.1新增执行器3.2.2新增任务3.2.3执行任务3.2.4查看日志3.2.5查看任务后台日志 3.3小结 3.SpringBoot整合xxl-job 3.1定时任务服…

第一个C++项目

文章目录 一、新建项目1.打开软件&#xff0c;选择“创建新项目”2.新建项目栏中&#xff0c;按自己的需求来设置项目模板&#xff0c;项目名称和文件存放位置&#xff0c;设置好后点击“确认”3. 点击“Next”4. 按照自己需求设置&#xff0c;设置完后&#xff0c;点击“Next”…

【数据治理】指标体系

文章目录 1. 如何进行体系化建模2. 高层模型设计3. 派生指标3.1 派生指标体系的架构、概念一种可行的指标构造方式&#xff1a;举例&#xff1a; 3.2 派生指标体系的规范细则 1. 如何进行体系化建模 体系化建模流程图&#xff1a; 指标相关设计流程&#xff1a; 首先&#x…

基于大数据+Hadoop的豆瓣电子图书推荐系统实现

&#x1f339;作者主页&#xff1a;青花锁 &#x1f339;简介&#xff1a;Java领域优质创作者&#x1f3c6;、Java微服务架构公号作者&#x1f604; &#x1f339;简历模板、学习资料、面试题库、技术互助 &#x1f339;文末获取联系方式 &#x1f4dd; 系列文章目录 基于大数…

手动实现简易版RPC(四)

手动实现简易版RPC(四) 往期内容 手动实现简易版RPC&#xff08;一&#xff09;&#xff1a;RPC简介及系统架构 手动实现简易版RPC&#xff08;二&#xff09;&#xff1a;简单RPC框架实现 手动实现简易版RPC(三)&#xff1a;mock数据生成 前言 接上几篇博客我们实现了最…

便签怎么设置不同的标签 便签创建不同分组标签的方法

在日常工作和生活中&#xff0c;便签已成为我随身携带的小助手。每当灵感闪现&#xff0c;或是需要临时记录一些重要事项&#xff0c;我都会随手打开便签&#xff0c;快速记录下来。然而&#xff0c;随着记录的内容越来越多&#xff0c;如何高效地管理和查找这些信息成为了一个…

联丰策略股票炒股市场港股恒生指数止步“10连阳”

查查配港股市场今日未能持续之前的上涨趋势。恒生指数在经历了4月22日至5月6日的“十日连阳”罕见行情后,其反弹动能有所减弱。与此同时,恒生科技指数也遭遇了回调。截至收盘,恒生指数跌0.53%,报收18479.37点;科技指数跌2.13%,报收3922.54点;国企指数跌0.70%,报收6526.67点。 …

【保姆级详细步骤教学用DOSBoxV0.74写出一个汇编语言程序输出Hello World!】

使用任何文本编辑器创建一个名为 HELLO.ASM 的文件&#xff0c;并将以下代码粘贴到文件中&#xff1a; .MODEL SMALL .STACK 100H.DATAMSG DB Hello, World!, $PROMPT DB 13, 10, Press any key to exit..., $.CODEMAIN PROCMOV AX, DATAMOV DS, AXMOV AH, 09HLEA DX, MSGINT …