RabbitMQ从原理到实战—基于Golang【万字详解】

文章目录

  • 前言
  • 一、MQ是什么?
    • 优势
    • 劣势
  • 二、MQ的用途
    • 1、应用解耦
    • 2、异步加速
    • 3、削峰填谷
    • 4、消息分发
  • 三、RabbitMQ是什么
    • 1、AMQP 协议
    • 2、RabbitMQ 包含的要素
    • 3、RabbitMQ 基础架构
  • 四、实战
    • 1、Simple模式(即最简单的收发模式)
    • 2、Work Queues 模型
    • 3、Publish/Subscribe 模型
    • 4、Routing 模型
    • 5、Topics 模型


前言

最近秋招开始找工作,顺便回顾消息队列并且总结。

一、MQ是什么?

消息队列(Message Queue)是一种在应用程序之间传递消息的通信模式。它通过在发送者和接收者之间建立一个消息队列来实现异步通信和解耦。

在消息队列模式中,发送者(Producer)将消息发送到一个中间件(Message Broker)中的消息队列,而接收者(Consumer)则从该队列中接收和处理消息。这种方式使得发送者和接收者可以独立地进行处理,而无需直接交互,从而实现解耦。发送者和接收者只需要知道如何与消息队列进行通信,而不需要知道彼此的存在。

优势

1. 异步通信:发送者将消息放入队列后即可继续进行其他操作,无需等待接收者的响应。接收者可以在合适的时候从队列中获取消息进行处理,实现了异步通信模式。

2. 解耦:发送者和接收者之间通过消息队列进行通信,彼此之间不直接耦合。发送者只需将消息发送到队列中,而不需要知道消息是如何被处理的。接收者只需从队列中获取消息进行处理,而不需要知道消息的来源。

3. 可靠性传输:消息队列通常提供持久化机制,确保消息在发送和接收过程中不会丢失。即使接收者暂时不可用,消息也会在队列中等待,直到接收者准备好接收为止。

4. 扩展性:消息队列可以支持多个发送者和接收者,实现系统的扩展性和高并发处理能力。

5. 缓冲和削峰填谷:通过将消息缓存到队列中,可以平衡发送者和接收者之间的处理速度差异,从而避免系统过载。

消息队列在分布式系统、微服务架构、异步任务处理、事件驱动架构等场景中被广泛应用。一些常见的消息队列系统包括RabbitMQ、Apache Kafka、ActiveMQ、Amazon SQS等。它们提供了丰富的功能和配置选项,可以根据应用需求选择合适的消息队列实现。

劣势

系统可用性降低
系统引入的外部依赖越多,系统稳定性越差。一旦 MQ 宕机,就会对业务造成影响。如何保证MQ的高可用?
系统复杂度提高
MQ 的加入大大增加了系统的复杂度,以前系统间是同步的远程调用,现在是通过 MQ 进行异步调用。如何保证消息没有被重复消费?怎么处理消息丢失情况?那么保证消息传递的顺序性?
一致性问题
A 系统处理完业务,通过 MQ 给B、C、D三个系统发消息数据,如果 B 系统、C 系统处理成功,D 系统处理失败。如何保证消息数据处理的一致性?

二、MQ的用途

四个用途
应用解耦:提高系统容错性和可维护性
异步提速:提升用户体验和系统吞吐量
削峰填谷:提高系统稳定性
消息分发:提高系统灵活性

1、应用解耦

应用解耦是指通过使用消息队列等中间件来降低应用程序之间的直接依赖性,从而实现独立开发、部署和升级的能力。通过解耦,每个应用程序可以通过消息队列发送和接收消息,而不需要了解其他应用程序的具体实现细节。通过应用解耦,可以实现系统的松耦合架构,提高系统的可维护性、扩展性和容错性。
没有使用MQ:
在这里插入图片描述

  • 系统的耦合性越高,容错性就越低,可维护性就越低。
  • 在这里插入图片描述使用 MQ 使得应用间解耦,提升容错性和可维护性。

2、异步加速

异步提速是指通过将耗时的操作转化为异步执行,从而提高系统的响应速度和吞吐量。通过异步处理,应用程序可以在等待某个操作完成的同时继续执行其他任务,而不需要阻塞等待结果返回。
例如,当一个应用程序需要进行网络请求并等待响应时,如果采用同步方式,应用程序会被阻塞,直到响应返回才能继续执行其他任务。而通过异步方式,应用程序可以继续执行其他任务,不需要等待网络请求的结果返回。这样可以提高系统的响应速度,使用户获得更好的体验。
没有使用MQ:
在这里插入图片描述
一个下单操作耗时:20 + 300 + 300 + 300 = 920ms
用户点击完下单按钮后,需要等待920ms才能得到下单响应,太慢!
使用MQ:
在这里插入图片描述

用户点击完下单按钮后,只需等待25ms就能得到下单响应 (20 + 5 = 25ms)。
提升用户体验和系统吞吐量(单位时间内处理请求的数目)。不需要的等待完成

3、削峰填谷

削峰填谷是一种通过平衡系统负载,减轻峰值压力和填充低谷时的资源利用率的技术。它的目标是在系统负载波动较大的情况下,合理利用资源,确保系统的稳定性和高效性。
没有使用MQ:
在这里插入图片描述

使用MQ:
在这里插入图片描述在这里插入图片描述
使用了 MQ 之后,限制消费消息的速度为1000,这样一来,高峰期产生的数据势必会被积压在 MQ中,高峰就被“削”掉了,但是因为消息积压,在高峰期过后的一段时间内,消费消息的速度还是会维持在1000,直到消费完积压的消息,这就叫做填谷。简单来说就是慢慢分发
使用MQ后,可以提高系统稳定性。

4、消息分发

消息分发是一种将消息从发送者传递到接收者的机制,它在异步系统和事件驱动架构中起着重要的作用。消息分发可以实现解耦和灵活性,允许不同组件或模块之间通过消息进行通信,从而实现系统的松耦合和可扩展性。
下面是消息分发的一些关键概念和示例:

发布者(Publisher):发布者是消息分发系统中的发送者,它负责生成并发布消息。发布者将消息发送到消息分发系统,而不需要知道消息的具体接收者。

订阅者(Subscriber):订阅者是消息分发系统中的接收者,它通过订阅特定的消息或消息类型来表明自己对消息的兴趣。当有匹配的消息到达时,消息分发系统会将消息传递给订阅者。

主题(Topic):主题是消息分发系统中用于分类和组织消息的标识符或名称。发布者可以将消息发布到特定的主题,而订阅者可以选择订阅感兴趣的主题。通过主题,可以实现消息的细粒度过滤和选择性订阅。

三、RabbitMQ是什么

RabbitMQ是使用Erlang语言开发的开源消息队列系统,基于AMQP协议来实现。AMQP的主要特征是面向消息、队列、路由(包括点对点和发布/订阅)、可靠性、安全。AMQP协议更多用在企业系统内对数据一致性、稳定性和可靠性要求很高的场景,对性能和吞吐量的要求可能比较低了。

1、AMQP 协议

AMQP,即 Advanced Message Queuing Protocol(高级消息队列协议),是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。2006年,AMQP规范发布。类比HTTP。
AMQP三层协议:
Module Layer:协议最高层,主要定义了一些客户端调用的命令,客户端可以用这些命令实现自己的业务逻辑。
Session Layer:中间层,主要负责客户端命会发送给服务器,再将服务端应答返回客户端,提供可靠性同步机制和错误处理。
TransportLayer:最底层,主要传输二进制数据流,提供帧的处理、信道服用、错误检测和数据表示等。

AMQP组件:
交换器(Exchange):消息代理服务器中用于把消息路由到队列的组件。
队列(queue):用来存储消息的数据结构,位于硬盘或内存中。
绑定(Binding):一套规则,告知交换器消息应该将消息投递给哪个队列。

2、RabbitMQ 包含的要素

生产者:消息队列创建者,发送消息到MQ
消费者:连接到RabbitMQ,订阅到队列上,消费消息,持续订阅和单条订阅
消息:包含有效载荷和标签,有效载荷指要传输的数据,标签描述了有效载荷,并且RabbitMQ用它来决定谁获得消息,消费者只能拿到有效载荷,并不知道生产者是谁

3、RabbitMQ 基础架构

在这里插入图片描述
Broker:接收和分发消息的应用,RabbitMQ Server就是 Message Broker
Virtual host:出于多租户和安全因素设计的,把 AMQP 的基本组件划分到一个虚拟的分组中,类似于网络中的 namespace概念。当多个不同的用户使用同一个 RabbitMQ server 提供的服务时,可以划分出多个vhost,每个用户在自己的 vhost创建 exchange/queue 等
Connection:publisher/consumer 和 broker 之间的 TCP 连接
Channel:Channel 作为轻量级的 Connection 极大减少了操作系统建立 TCP connection 的开销。是生产者、消费者与RabbitMQ通信的渠道,生产者publish或是消费者subscribe 一个队列都是通过信道来通信的。
信道是建立在TCP连接上的虚拟连接,就是说RabbitMQ在一条TCP上建立成百上千个信道来达到多个线程处理,这个TCP被多个线程共享,每个线程对应一个信道,信道在RabbitMQ都有一个唯一的ID,保证了信道私有性,对应上唯一的线程使用。
Exchange交换机:message 到达 broker 的第一站**,根据分发规则,匹配查询表中的 routing key,分发消息到queue中去。生产者将消息发送到交换器,有交换器将消息路由到一个或者多个队中。当路由不到时,或返回给生产者或直接丟弃。
Queue:消息最终被送到这里等待 consumer 取走
Binding:exchange 和 queue 之间的虚拟连接,binding 中可以包含 routing key。Binding信息被保存到 exchange 中的查询表中,用于 message 的分发依据

四、实战

RabbitMQ 提供了 6 种工作模式:简单模式、work queues、Publish/Subscribe 发布与订阅模式、Routing 路由模式、Topics 主题模式、RPC 远程调用模式(远程调用,不太算 MQ;暂不作介绍)。

1、Simple模式(即最简单的收发模式)

消息的消费者监听消息队列,如果队列中有消息,就消费掉,消息被拿走后,自动从队列中删除(隐患消息可能没有被消费者正确处理,已经从队列中消失了,造成消息的丢失,这里可以设置成手动的ack,但如果设置成手动ack,处理完后要及时发送ack消息给队列,否则会造成内存溢出)。
消费者:

package mainimport ("log""github.com/streadway/amqp"
)func main() {// 连接到RabbitMQ服务器conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")if err != nil {log.Fatalf("无法连接到RabbitMQ服务器:%s", err)}defer conn.Close()// 创建一个通道ch, err := conn.Channel()if err != nil {log.Fatalf("无法创建通道:%s", err)}defer ch.Close()// 声明一个队列queue, err := ch.QueueDeclare("hello", // 队列名false,   // 持久性false,   // 自动删除false,   // 独占false,   // 等待服务器确认nil,     // 参数)if err != nil {log.Fatalf("无法声明队列:%s", err)}// 消费消息msgs, err := ch.Consume(queue.Name, // 队列名"",         // 消费者标签true,       // 自动确认false,      // 独占false,      // 不等待服务器确认false,      // 参数)if err != nil {log.Fatalf("无法注册消费者:%s", err)}// 处理接收到的消息for msg := range msgs {log.Printf("接收到消息:%s", msg.Body)}
}

上述代码首先建立了与RabbitMQ服务器的连接,然后创建了一个通道和一个名为"heo"的队列。接下来,通过ch.Consume函数注册一个消费者,用于从队列中接收消息。在fo循环中,我们处理接收到的消息,这里只是简单地打印出来。
生产者:

package mainimport ("log""github.com/streadway/amqp"
)func main() {// 连接到RabbitMQ服务器conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")if err != nil {log.Fatalf("无法连接到RabbitMQ服务器:%s", err)}defer conn.Close()// 创建一个通道ch, err := conn.Channel()if err != nil {log.Fatalf("无法创建通道:%s", err)}defer ch.Close()// 声明一个队列queue, err := ch.QueueDeclare("hello", // 队列名false,   // 持久性false,   // 自动删除false,   // 独占false,   // 等待服务器确认nil,     // 参数)if err != nil {log.Fatalf("无法声明队列:%s", err)}// 发送消息body := "Hello, RabbitMQ!"err = ch.Publish("",         // 交换机queue.Name, // 队列名false,      // 必须发送到队列false,      // 不等待服务器确认amqp.Publishing{ContentType: "text/plain",Body:        []byte(body),},)if err != nil {log.Fatalf("无法发送消息:%s", err)}log.Printf("消息已发送:%s", body)
}

上述代码与消费者程序类似,首先建立了与RabbitMQ服务器的连接,然后创建了一个通道和一个名为"hello"的队列。接下来,通过ch.Publishi函数向队列发送一条消息。

2、Work Queues 模型

消息产生者将消息放入队列消费者可以有多个,消费者1,消费者2同时监听同一个队列,消息被消费。C1 C2共同争抢当前的消息队列内容,谁先拿到谁负责消费消息(隐患:高并发情况下,默认会产生某一个消息被多个消费者共同使用,可以设置一个开关[syncronize]保证一条消息只能被一个消费者使用)。
让多个消费者绑定到一个队列,共同消费队列中的消息。队列中的消息一旦消费,就会消失,因此任务是不会被重复执行的。
消费者:

package mainimport ("fmt""log""math/rand""time""github.com/streadway/amqp"
)func main() {// 连接到RabbitMQ服务器conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")if err != nil {log.Fatalf("无法连接到RabbitMQ服务器:%s", err)}defer conn.Close()// 创建一个通道ch, err := conn.Channel()if err != nil {log.Fatalf("无法创建通道:%s", err)}defer ch.Close()// 启动多个消费者并行处理任务for i := 1; i <= 3; i++ {go startConsumer(i, ch)}// 阻塞主进程select {}
}func generateTask(id int) string {time.Sleep(time.Duration(rand.Intn(3)) * time.Second)return fmt.Sprintf("Task %d", id)
}func startConsumer(id int, ch *amqp.Channel) {// 声明一个队列queue, err := ch.QueueDeclare("tasks_queue", // 队列名true,          // 持久性false,         // 自动删除false,         // 独占false,         // 等待服务器确认nil,           // 参数)if err != nil {log.Fatalf("无法声明队列:%s", err)}// 消费任务msgs, err := ch.Consume(queue.Name, // 队列名"",         // 消费者标签false,      // 手动确认false,      // 不等待服务器确认false,      // 不使用内置的参数false,      // 参数nil,           // 参数)if err != nil {log.Fatalf("无法注册消费者:%s", err)}for msg := range msgs {task := string(msg.Body)log.Printf("消费者 %d 接收到任务:%s", id, task)log.Printf("消费者 %d 完成任务:%s", id, task)// 手动确认任务已处理msg.Ack(false)}
}

利用协城启动多个消费者进行消费。
结果如下:
在这里插入图片描述

3、Publish/Subscribe 模型

每个消费者监听自己的队列。
生产者将消息发给broker,由交换机将消息转发到绑定此交换机的每个队列,每个绑定交换机的队列都将接收到消息。
在RabbitMQ的Publish/Subscribe模型中,生产者将消息发送到交换机,交换机负责将消息广播给所有绑定到它上面的队列。消费者创建队列并将其绑定到交换机上,从而接收交换机发送的消息。这样,一个消息可以被多个消费者接收。
在这里插入图片描述

在订阅模型中,多了一个 Exchange 角色,而且过程略有变化:

P:生产者,也就是要发送消息的程序,但是不再发送到队列中,而是发给X(交换机)
C:消费者,消息的接收者,会一直等待消息到来
Queue:消息队列,接收消息、缓存消息
Exchange:交换机(X)。一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。Exchange有常见以下3种类型:
Fanout:广播,将消息交给所有绑定到交换机的队列
Direct:定向,把消息交给符合指定routing key 的队列
Topic:通配符,把消息交给符合routing pattern(路由模式) 的队列
Exchange(交换机)只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与 Exchange 绑定,或者没有符合路由规则的队列,那么消息会丢失!

package mainimport ("log""github.com/streadway/amqp"
)func failOnError(err error, msg string) {if err != nil {log.Fatalf("%s: %s", msg, err)}
}func main() {// 连接到RabbitMQ服务器conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")failOnError(err, "Failed to connect to RabbitMQ")defer conn.Close()// 创建一个通道ch, err := conn.Channel()failOnError(err, "Failed to open a channel")defer ch.Close()// 声明一个交换机err = ch.ExchangeDeclare("logs",   // 交换机名称"fanout", // 交换机类型true,     // 是否持久化false,    // 是否自动删除false,    // 是否内部使用false,    // 是否等待服务器响应nil,      // 其他属性)failOnError(err, "Failed to declare an exchange")// 发布消息到交换机body := "Hello, RabbitMQ!"err = ch.Publish("logs", // 交换机名称"",     // 路由键,留空表示广播给所有队列false,  // 是否等待服务器响应false,  // 其他属性amqp.Publishing{ContentType: "text/plain",Body:        []byte(body),},)failOnError(err, "Failed to publish a message")log.Printf("Message sent: %s", body)
}

连接到RabbitMQ服务器,声明了一个名为"logs"的交换机,并通过调用ch.Publish方法将消息发布到交换机上。
在示例代码中,通过指定交换机名称为"logs",路由键为空字符串,消息将被广播给所有绑定到该交换机的队列。

package mainimport ("fmt""log""github.com/streadway/amqp"
)func failOnError(err error, msg string) {if err != nil {log.Fatalf("%s: %s", msg, err)}
}func main() {// 连接到RabbitMQ服务器conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")failOnError(err, "Failed to connect to RabbitMQ")defer conn.Close()// 创建一个通道ch, err := conn.Channel()failOnError(err, "Failed to open a channel")defer ch.Close()// 声明一个交换机err = ch.ExchangeDeclare("logs",   // 交换机名称"fanout", // 交换机类型true,     // 是否持久化false,    // 是否自动删除false,    // 是否内部使用false,    // 是否等待服务器响应nil,      // 其他属性)failOnError(err, "Failed to declare an exchange")// 声明一个临时队列q, err := ch.QueueDeclare("",    // 队列名称,留空表示由RabbitMQ自动生成false, // 是否持久化false, // 是否自动删除(当没有任何消费者连接时)true,  // 是否排他队列(仅限于当前连接)false, // 是否等待服务器响应nil,   // 其他属性)failOnError(err, "Failed to declare a queue")// 将队列绑定到交换机上err = ch.QueueBind(q.Name, // 队列名称"",     // 路由键,留空表示接收交换机的所有消息"logs", // 交换机名称false,  // 是否等待服务器响应nil,    // 其他属性)failOnError(err, "Failed to bind a queue")// 订阅消息msgs, err := ch.Consume(q.Name, // 队列名称"",     // 消费者标识符,留空表示由RabbitMQ自动生成true,   // 是否自动应答false,  // 是否独占模式(仅限于当前连接)false,  // 是否等待服务器响应false,  // 其他属性nil,    // 其他属性)failOnError(err, "Failed to register a consumer")// 接收消息的goroutinego func() {for d := range msgs {log.Printf("Received a message: %s", d.Body)}}()log.Printf("Waiting for messages. To exit press CTRL+C")<-make(chan struct{}) // 阻塞主goroutine
}

它连接到RabbitMQ服务器,声明一个fanout类型的交换机(Exchange),创建一个临时队列,将队列绑定到交换机上,并订阅消息。

在示例代码中,创建的交换机名为"logs",交换机类型为"fanout",表示消息将被广播给所有绑定到该交换机的队列。

消费者创建了一个临时队列,并将其绑定到交换机上,这样交换机就会将消息发送到该队列中。

4、Routing 模型

在fanout模式中,一条消息,会被所有订阅的队列都消费。但是,在某些场景下,我们希望不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange。
在这里插入图片描述

在Direct模型下:

1、队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)
2、消息的发送方在 向 Exchange发送消息时,也必须指定消息的 RoutingKey。
3、Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Routingkey与消息的 Routing key完全一致,才会接收到消息

消息生产者将消息发送给交换机按照路由判断,路由是字符串(info)当前产生的消息携带路由字符(对象的方法),交换机根据路由的key,只能匹配上路由key对应的消息队列,对应的消费者才能消费消息。
生产者

package mainimport ("log""os""strings""github.com/streadway/amqp"
)func failOnError(err error, msg string) {if err != nil {log.Fatalf("%s: %s", msg, err)}
}func main() {// 连接到RabbitMQ服务器conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")failOnError(err, "Failed to connect to RabbitMQ")defer conn.Close()// 创建一个通道ch, err := conn.Channel()failOnError(err, "Failed to open a channel")defer ch.Close()// 声明一个交换机err = ch.ExchangeDeclare("logs_direct", // 交换机名称"direct",      // 交换机类型true,          // 是否持久化false,         // 是否自动删除false,         // 是否内部使用false,         // 是否等待服务器响应nil,           // 其他属性)failOnError(err, "Failed to declare an exchange")// 从命令行参数获取要发送的路由键和消息内容if len(os.Args) < 3 {log.Fatalf("Usage: %s [info] [message]", os.Args[0])}severity := os.Args[1]message := strings.Join(os.Args[2:], " ")// 发布消息到交换机,并指定路由键err = ch.Publish("logs_direct", // 交换机名称severity,      // 路由键false,         // 是否等待服务器响应false,         // 是否立即将消息写入磁盘amqp.Publishing{ContentType: "text/plain",Body:        []byte(message),},)failOnError(err, "Failed to publish a message")log.Printf("Sent message: %s", message)
}

它连接到RabbitMQ服务器,声明一个direct类型的交换机(Exchange),并通过指定路由键将消息发布到交换机。

在示例代码中,创建的交换机名为"logs_direct",交换机类型为"direct",表示消息将根据指定的路由键进行选择性地发送给队列。

生产者从命令行参数获取要发送的路由键和消息内容。路由键可以是任意字符串,用于标识消息的类型或者级别。消息内容可以是任意文本。
消费者

package mainimport ("fmt""log""os""github.com/streadway/amqp"
)func failOnError(err error, msg string) {if err != nil {log.Fatalf("%s: %s", msg, err)}
}func main() {// 连接到RabbitMQ服务器conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")failOnError(err, "Failed to connect to RabbitMQ")defer conn.Close()// 创建一个通道ch, err := conn.Channel()failOnError(err, "Failed to open a channel")defer ch.Close()// 声明一个交换机err = ch.ExchangeDeclare("logs_direct", // 交换机名称"direct",      // 交换机类型true,          // 是否持久化false,         // 是否自动删除false,         // 是否内部使用false,         // 是否等待服务器响应nil,           // 其他属性)failOnError(err, "Failed to declare an exchange")// 声明一个临时队列q, err := ch.QueueDeclare("",    // 队列名称,留空表示由RabbitMQ自动生成false, // 是否持久化false, // 是否自动删除(当没有任何消费者连接时)true,  // 是否排他队列(仅限于当前连接)false, // 是否等待服务器响应nil,   // 其他属性)failOnError(err, "Failed to declare a queue")// 从命令行参数获取要绑定的路由键if len(os.Args) < 2 {log.Fatalf("Usage: %s [info] [warning] [error]", os.Args[0])}severities := os.Args[1:]// 将队列绑定到交换机上,并指定要接收的路由键for _, severity := range severities {err = ch.QueueBind(q.Name,        // 队列名称severity,      // 路由键"logs_direct", // 交换机名称false,         // 是否等待服务器响应nil,           // 其他属性)failOnError(err, "Failed to bind a queue")}// 订阅消息msgs, err := ch.Consume(q.Name, // 队列名称"",     // 消费者标识符,留空表示由RabbitMQ自动生成true,   // 是否自动应答false,  // 是否独占模式(仅限于当前连接)false,  // 是否等待服务器响应false,  // 其他属性nil,    // 其他属性)failOnError(err, "Failed to register a consumer")// 接收消息的goroutinego func() {for d := range msgs {log.Printf("Received a message: %s", d.Body)}}()log.Printf("Waiting for messages. To exit press CTRL+C")<-make(chan struct{}) // 阻塞主goroutine
}

上述代码实现了一个Routing模型的消费者。它连接到RabbitMQ服务器,声明一个direct类型的交换机(Exchange),创建一个临时队列,并将队列绑定到交换机上,同时指定要接收的路由键。

在RabbitMQ的Routing模型中,生产者将消息发送到交换机,并在发送消息时指定一个路由键(routing key)。交换机根据路由键将消息发送给与之绑定的队列。消费者创建队列并将其绑定到交换机上,并通过指定要接收的路由键来选择性地接收消息。

在示例代码中,创建的交换机名为"logs_direct",交换机类型为"direct",表示消息将根据指定的路由键进行选择性地发送给队列。

消费者创建了一个临时队列,并通过循环将该队列绑定到交换机上,并指定要接收的路由键。路由键可以是任意字符串,用于标识消息的类型或者级别。在示例中,我们通过命令行参数传入要绑定的路由键。

最后,消费者通过调用ch.Consume方法订阅消息。该方法返回一个消息通道msgs,消费者可以从该通道接收到消息。在示例中,我们使用一个goroutine来异步接收消息,并在收到消息时打印出来。

5、Topics 模型

在这里插入图片描述
Topic类型的Exchange与Direct相比,都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型Exchange可以让队列在绑定Routing key 的时候使用通配符!

这种模型Routingkey 一般都是由一个或多个单词组成,多个单词之间以”.”分割,例如: item.insert

统配符
* 匹配不多不少恰好1个词
# 匹配一个或多个词

如:
fan.# 匹配 fan.one.two 或者 fan.one 等
fan.* 只能匹配 fan.one
生产者

func failOnError(err error, msg string) {if err != nil {log.Fatalf("%s: %s", msg, err)}
}func main() {// 连接到RabbitMQ服务器conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")failOnError(err, "Failed to connect to RabbitMQ")defer conn.Close()// 创建一个通道ch, err := conn.Channel()failOnError(err, "Failed to open a channel")defer ch.Close()// 声明一个交换机err = ch.ExchangeDeclare("logs_topic", // 交换机名称"topic",      // 交换机类型true,         // 是否持久化false,        // 是否自动删除false,        // 是否内部使用false,        // 是否等待服务器响应nil,          // 其他属性)failOnError(err, "Failed to declare an exchange")// 定义要发送的消息的路由键和内容routingKey := "example.key.das"message := "Hello, RabbitMQ!"// 发布消息到交换机,并指定路由键err = ch.Publish("logs_topic", // 交换机名称routingKey,   // 路由键false,        // 是否等待服务器响应false,        // 是否立即发送amqp.Publishing{ContentType: "text/plain",Body:        []byte(message),},)failOnError(err, "Failed to publish a message")log.Printf("Sent message: %s", message)
}

消费者


func failOnError(err error, msg string) {if err != nil {log.Fatalf("%s: %s", msg, err)}
}func main() {// 连接到RabbitMQ服务器conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")failOnError(err, "Failed to connect to RabbitMQ")defer conn.Close()// 创建一个通道ch, err := conn.Channel()failOnError(err, "Failed to open a channel")defer ch.Close()// 声明一个交换机err = ch.ExchangeDeclare("logs_topic", // 交换机名称"topic",      // 交换机类型true,         // 是否持久化false,        // 是否自动删除false,        // 是否内部使用false,        // 是否等待服务器响应nil,          // 其他属性)failOnError(err, "Failed to declare an exchange")// 声明一个临时队列q, err := ch.QueueDeclare("",    // 队列名称,留空表示由RabbitMQ自动生成false, // 是否持久化false, // 是否自动删除(当没有任何消费者连接时)true,  // 是否排他队列(仅限于当前连接)false, // 是否等待服务器响应nil,   // 其他属性)failOnError(err, "Failed to declare a queue")// 将队列绑定到交换机上,并指定要接收的路由键err = ch.QueueBind(q.Name,       // 队列名称"example.#",  // 路由键,可以使用通配符*匹配多个单词"logs_topic", // 交换机名称false,        // 是否等待服务器响应nil,          // 其他属性)failOnError(err, "Failed to bind a queue")// 创建一个消费者通道msgs, err := ch.Consume(q.Name, // 队列名称"",     // 消费者标识符,留空表示由RabbitMQ自动生成true,   // 是否自动应答false,  // 是否排他消费者false,  // 是否阻塞false,  // 是否等待服务器响应nil,    // 其他属性)failOnError(err, "Failed to register a consumer")// 接收和处理消息forever := make(chan bool)go func() {for d := range msgs {log.Printf("Received a message: %s", d.Body)}}()log.Printf("Waiting for messages...")<-forever
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://xiahunao.cn/news/1621525.html

如若内容造成侵权/违法违规/事实不符,请联系瞎胡闹网进行投诉反馈,一经查实,立即删除!

相关文章

lnmp架构-mysql

1.MySQL数据库编译 make完之后是这样的 mysql 初始化 所有这种默认不在系统环境中的路径里 就这样加 这样就可以直接调用 不用输入路径调用 2.初始化 重置密码 3.mysql主从复制 配置master 配置slave 当master 端中还没有插入数据时 在server2 上配slave 此时master 还没进…

Tornado服务器连接数据库

环境 python3.6.1 vscode mysql navicat 安装需要的包 pip install torndb_for_python3 pip install pymysql0.8.0 #请安装pymysql的0.8.0版本&#xff0c;否则可能出现一个FLAG不存在的bug。亲测0.8.0可用。 tornadb不适用于python3&#xff0c;torndb_for_python3 是修改过的…

Tornado框架入门教程

Tornado框架入门教程 Tornado在知乎广为使用&#xff0c;当你用Chrome打开网页版本的知乎&#xff0c;使用开发者工具仔细观察Network里面的请求&#xff0c;就会发现有一个特别的状态码为101的请求&#xff0c;它是用浏览器的websocket技术和后端服务器建立了长连接用来接收服…

Tornado自定义路由

默认路由规则 代码 由下述代码可知&#xff0c;路由规则都在放在 tornado.web.Application 中&#xff0c;强迫症的我看着不舒服不说&#xff0c;而且如果有多级路由的话就很难搞了&#xff0c;比如说用户模块、不同视图模块等等&#xff0c;前一级路由是固定的&#xff0c;后…

Tornado重定向(三)

为什么需要重定向呢&#xff1f;比较常见的就是当网站内部结构变动&#xff0c;如栏目、网址等等的变动就需要进行301重定向&#xff0c;还有内部一些错误的页面也可以做301重定向&#xff0c;提高用户体验。当因网站内部结构变动&#xff0c;需要删除网站中的某些目录时&#…

Tornado介绍

文章目录 特点结构Tornado实现异步原理模块Tornado服务器的三个底层核心模块设计模型 Tornado龙卷风是一个开源的网络服务器框架&#xff0c;它是基于社交聚合网站FriendFeed的实时信息服务开发而来的 Tornado是使用Python编写的Web服务器兼Web应用框架与主流Web服务器框架不同…

Tornado初见(一)

在之前的其他博客中介绍了Django&#xff0c;这里介绍一下Tornado。两者的区别可以总结为&#xff1a; django大而全&#xff0c;适合小型的压力不大的项目&#xff0c;一旦压力上来其是扛不住的&#xff0c;毕竟一是太重&#xff0c;而是非异步。 但是好处就是什么都有&#…

Tornado框架学习

目录 底层原理 select epoll 运行机制 Tornado中的Ioloop模块的作用 获取请求方式 文件上传与展示 通过请求头信息的判断来进行反爬验证 注册功能demo 重定向 用户登录 以及自己设置的错误界面跳转 Cookie Tornado 异步服务器端方式 客户端异步请求 Tornado基于epoll…

tornado入门必备知识总结——异步事件循环与协程

文章目录 前言同步、异步、阻塞和非阻塞socket的非阻塞io请求htmlselect、poll和epoll协程异步http请求tornado实现高并发的爬虫 前言 要想走得远&#xff0c;基础就得牢&#xff0c;路漫漫其修远兮&#xff0c;吾将上下而求索。 tornado简介 python web编程三剑客Django&…

【Tornado】Tornado入门教程

目录 Tornado特点结构三个底层核心模块 安装1. 安装python32. 安装tornado3. 编写简单server4. 运行流程 核心组件1. ioloop实例2. app实例3. urls路由表4. handler类 异步协程async 和 await如何工作的怎样调用 协程模式结合 callbacks调用阻塞函数交叉存取技术循环在后台运行…

【数据结构与算法篇】手撕八大排序算法之交换排序

​&#x1f47b;内容专栏&#xff1a; 《数据结构与算法篇》 &#x1f428;本文概括&#xff1a;常见交换排序包括冒泡排序与快速排序&#xff0c;本篇讲述冒泡排序与快速排序的思想及实现、复杂度分析。 &#x1f43c;本文作者&#xff1a; 花 蝶 &#x1f438;发布时间&#…

ie ajax十分卡,解决jquery .ajax 在IE下卡死问题的解决方法

解决jquery .ajax 在IE下卡死问题的解决方法 解决IE编码问题第一步&#xff1a; dataType:($.browser.msie) ? "text" : "xml" 先这样做让IE 识别返回的是text 还是xml 第二步&#xff1a; 复制代码 代码如下: function parseXml(xml) { //XML IE编码问题…

数学分析:场论

我们之前知道的是里斯表示定理。 这里看到&#xff0c;对于多重线性映射&#xff0c;里斯表示定理会从内积变成混合积。当然我们还是只考虑三维以内的情况。 于是我们可以把不同的1形式和2形式的下标写上&#xff0c;表示他们相当于内积或者混合积对应的那个向量。 然后还差0形…

设计师:设计师之家装材料知识之家装八项(吊顶材料、门窗材料、五金材料、墙面材料、地面材料、胶粘材料、油漆材料、水电材料等)之详细攻略

设计师&#xff1a;设计师之家装材料知识之家装八项(吊顶材料、门窗材料、五金材料、墙面材料、地面材料、胶粘材料、油漆材料、水电材料等)之详细攻略 目录 家装八项 吊顶材料 门窗材料 五金材料 墙面材料 地面材料 胶粘材料 油漆材料 水电材料 参考文章&#xff1a;…

材料封样信息流程指引

材料封样信息流程指引 一、材料封样流程 1、新项目中标后&#xff0c;由设计院根据合同、招标文件、技术要求填写材料送样清单&#xff08;格式按公司标准化标格填写&#xff09;&#xff0c;并正式下发给采购部、工程管理部及项目部安排后续送样确认&#xff1b; 2、幕墙工程…

EyouCMS响应式木材板材公司模板/易优CMS装修材料类企业网站模板

☑️ 编号&#xff1a;ym247 ☑️ 品牌&#xff1a;EyouCMS ☑️ 语言&#xff1a;PHP ☑️ 大小&#xff1a;22.4MB ☑️ 类型&#xff1a;木材板材公司模板 ☑️ 支持&#xff1a;pcwap &#x1f389; 欢迎免费领取&#xff08;注明编号&#xff09; &#x1f389; ✨ 源码介…

2023最新装修材料石膏线品牌加盟类模板源码+织梦内核开发的

正文: 装修材料石膏线品牌加盟类织梦模板&#xff0c;带手机版数据同步。 效果相当的炫酷&#xff0c;相当简洁大气高端。适用于建材网站源码、石英石网站模版&#xff0c;有兴趣的自行去安装体验吧&#xff0c;其它就没什么好介绍的了。 程序: wweohd.lanzouo.com/iRCs80t…

Linux内核学习(十一)—— 进程地址空间(基于Linux 2.6内核)

目录 一、地址空间 二、内存描述符 三、虚拟内存区域 四、操作内存区域 find_vma() mmap() 和 do_mmap()&#xff1a;创建地址区间 五、页表 一、地址空间 进程地址空间由进程可寻址并且允许进程使用的虚拟内存组成&#xff0c; 每个进程都有一个 32 位或 64 位的平坦&…

在云原生环境中构建可扩展的大数据平台:方法和策略

文章目录 1. **选择适当的云提供商&#xff1a;**2. **采用容器化和微服务架构&#xff1a;**3. **分层架构设计&#xff1a;**4. **弹性计算资源&#xff1a;**5. **使用分布式计算框架&#xff1a;**6. **数据分区和分片&#xff1a;**7. **使用列式存储&#xff1a;**8. **缓…

java 高级面试题整理(薄弱技术-2023)

session 和cookie的区别和联系 session1.什么是session Session是另一种记录客户状态的机制&#xff0c;不同的是Cookie保存在客户端浏览器中&#xff0c;而Session保存在服务器上。客户端浏览器访问服务器的时候&#xff0c;服务器把客户端信息以某种形式记录在服务器上。这就…