MQ四兄弟:如何保证消息顺序性

在当今的分布式系统架构中,消息队列(MQ)是不可或缺的组成部分。它们在确保系统组件之间高效通信方面发挥着关键作用。特别是在金融交易、物流跟踪等对消息处理顺序有严格要求的场景中,消息队列的顺序性保证显得更为重要。接下来,我们将深入探讨RabbitMQ、RocketMQ、Kafka和Pulsar这四个广泛使用的消息队列系统,分析它们是如何确保消息的顺序性,并附上相应的代码示例。

RabbitMQ

RabbitMQ作为一款成熟的开源消息队列,,基于AMQP(Advanced Message Queuing Protocol)协议构建,广泛应用于企业级应用中。虽然RabbitMQ本身并不保证严格的全局顺序性,但可以通过特定的设计模式来实现消息顺序性。

  1. 单一队列和单一消费者模式:确保一个队列只被一个消费者消费,这样可以保证消息按照发送的顺序被处理。因为队列本身就是一个先进先出的结构。
  2. 消息排序:在消息生产者端,为消息添加序列号或时间戳,消费者端根据这些信息对消息进行排序。

以下是一个简单的Java代码片段,展示了如何在RabbitMQ中发送消息。请注意,这个例子没有包含消息排序的逻辑,因为它依赖于具体的业务场景和消息结构。


public class Send {private final static String QUEUE_NAME = "hello";public static void main(String[] argv) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {channel.queueDeclare(QUEUE_NAME, false, false, false, null);String message = "Hello World!";channel.basicPublish("", QUEUE_NAME, null, message.getBytes());System.out.println(" [x] Sent '" + message + "'");}}
}

创建了一个连接和一个通道,然后声明了一个队列。之后,我们发布了一个简单的消息到队列中。为了保证消息的顺序性,我们需要确保所有消息都是通过同一个通道发送,并且在消费端也是由同一个消费者按顺序接收处理。

RocketMQ

RocketMQ作为阿里巴巴开源的分布式消息队列,在保证消息顺序性方面提供了一种基于MessageQueueSelector的解决方案。其核心思路是将有序的消息写入特定的队列,从而使消费端固定消费某个队列时,就能够按顺序消费消息。

具体来说,RocketMQ中有两个重要概念:

  • Topic: 逻辑上的消息主题
  • MessageQueue: 物理上存储消息的队列

一个Topic包含多个MessageQueue,消息会根据其内容进行哈希计算,分配到不同的MessageQueue中。用户可以通过提供MessageQueueSelector,对特定类型的消息强制分配到同一个MessageQueue,从而保证顺序性。

示例代码:

生产者

// 实例化消息生产者Producer
DefaultMQProducer producer = new DefaultMQProducer("unique_group_name");
// 设置NameServer的地址
producer.setNamesrvAddr("nameserver:9876");
// 启动Producer实例
producer.start();
// 创建消息,并指定Topic,Tag和消息体
Message msg = new Message("TopicTest", "TagA", "OrderID" + orderId, ("Hello RocketMQ " + i).getBytes());
// 发送有序消息
producer.send(msg, new MessageQueueSelector() {@Overridepublic MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) {Integer orderId = (Integer) arg; // 订单ID作为选择器的参数int index = orderId % mqs.size(); // 根据订单ID计算MessageQueue索引return mqs.get(index); // 返回该索引对应的MessageQueue}
}, orderId);

通过上述代码,发送端可以将具有相同订单号的消息发送到同一个MessageQueue。

消费端

 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("unique_group_name");consumer.setNamesrvAddr("nameserver:9876");consumer.subscribe("TopicTest", "TagA");consumer.registerMessageListener(new MessageListenerOrderly() {@Overridepublic ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {context.setAutoCommit(true);for (MessageExt msg : msgs) {System.out.printf("Consumer: %s %n", new String(msg.getBody()));}return ConsumeOrderlyStatus.SUCCESS;}});

消费端只需固定消费指定的MessageQueue,即可以保证消息按顺序被消费。

Kafka

Kafka通过Partition(分区)的概念来保证消息的顺序性。同一个Partition中的消息是有序的,但不同Partition之间是无序的。Producer在发送消息时可以指定消息要发送到的分区。Kafka默认提供了基于key的分区策略,确保具有相同key的消息会被发送到同一个分区,从而保证这些消息在这个分区内的顺序性。

以下是一个简单的 Java 代码示例,展示了如何在 Kafka 中发送和消费有序消息:

生产者代码

public class OrderProducer {public static void main(String[] args) {Properties props = new Properties();props.put("bootstrap.servers", "localhost:9092");KafkaProducer<String, String> producer = new KafkaProducer<>(props);// 假设我们有10个订单,每个订单的消息需要顺序处理for (int orderId = 0; orderId < 10; orderId++) {for (int i = 0; i < 5; i++) { // 每个订单发送5条消息String message = String.format("Order %d, Message %d", orderId, i);producer.send(new ProducerRecord<>("OrderTopic", Integer.toString(orderId), message));}}producer.close();}
}

producer.send(new ProducerRecord<>("OrderTopic", Integer.toString(orderId), message));

第二个参数是消息的键(key),这里使用订单ID作为键,确保相同订单ID的消息发送到同一个分区

消费者代码


Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "order_consumer_group");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("OrderTopic"));

在生产者代码中,我们使用了相同的 key(即订单ID)来确保消息被发送到同一个 Partition。在消费者代码中,我们订阅了整个 Topic,但由于我们使用了相同的 key 来发送消息,Kafka 会自动将具有相同 key 的消息路由到同一个 Partition,从而保证顺序性。

Pulsar

Apache Pulsar 通过分区主题(Partitioned Topics)来保证消息的顺序性。在Pulsar中,每个分区可以看作是一个独立的消息队列,分区内的消息保持发送顺序。为了确保消息的顺序性,生产者在发送消息时需要指定一个键(Key),Pulsar会根据这个键将消息路由到特定的分区。这样,具有相同键的消息就会被发送到同一个分区,并且按照发送的顺序进行消费。

生产者代码示例:


public class PulsarOrderProducer {public static void main(String[] args) throws Exception {PulsarClient client = PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();Producer<String> producer = client.newProducer(Schema.STRING).topic("persistent://public/default/my-topic").create();for (int i = 0; i < 100; i++) {String key = "OrderID" + (i % 10); // 假设OrderID是业务键String value = "Message" + i;producer.newMessage().key(key).value(value).send();}producer.close();client.close();}
}

消费者代码示例:

public class PulsarOrderConsumer {public static void main(String[] args) throws Exception {PulsarClient client = PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();Consumer<String> consumer = client.newConsumer(Schema.STRING).topic("persistent://public/default/my-topic").subscriptionName("my-subscription").subscriptionType(SubscriptionType.Exclusive).subscribe();while (true) {Message<String> msg = consumer.receive();try {// 处理消息System.out.printf("Message with key %s: %s", msg.getKey(), msg.getValue());consumer.acknowledge(msg);} catch (Exception e) {consumer.negativeAcknowledge(msg);}}}
}

在消费者代码中,我们使用了SubscriptionType.Exclusive,使订阅被独占,确保只有一个消费者能够消费分区内的消息,从而保证了消息的顺序性。

总结

尽管RabbitMQ、RocketMQ、Kafka和Pulsar这些消息队列系统虽然在实现细节上有所不同,但它们保证消息顺序性的核心思想都是相似的,即确保具有相同特征的消息被发送到同一队列或分区中,由于队列数据结构本身就是先进先出的结构,因此只需要消费者从该队列按顺序消费,就能够保证消息的有序性。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://xiahunao.cn/news/3225783.html

如若内容造成侵权/违法违规/事实不符,请联系瞎胡闹网进行投诉反馈,一经查实,立即删除!

相关文章

本地电脑连接FTP服务器,显示无权限连接?

问题&#xff1a; 打开文档&#xff0c;在这一栏输入ftp服务器地址&#xff0c;按下回车&#xff0c;弹出无权限提示。 解决方案&#xff1a; 1、系统设置——网络和Internet——网络和共享中心 2、Internet选项 3、高级——取消“使用被动FTP”的勾选

@RequestBody注解的使用及源码解析

前言 RequestBody 注解是我们进行JavaEE开发&#xff0c;最常见的几个注解之一&#xff0c;这篇博文我们以案例和源码相结合&#xff0c;帮助大家更好的了解 RequestBody 注解 使用案例 1.自定义实体类 Data NoArgsConstructor AllArgsConstructor public class User {priv…

如何判断一个js对象为数组类型

如何判断一个js对象为数组类型? 能想到的最常见的intanceof是吗?开始是这么认为,但是不是哈,看下面的解释,也没有太明白,暂且记住吧 综上,判断js对象为数组的两种方式 Array.isArray([]) // trueObject.prototype.toString.call([]) ‘[object Array]’ //true

Oracle基础以及一些‘方言’(一)

1、什么是Oracle ORACLE数据库系统是美国ORACLE公司&#xff08;甲骨文&#xff09;提供的以分布式数据库为核心的一组软件产品&#xff0c;是最流行的客户/服务器(CLIENT/SERVER)或B/S体系结构的数据库之一。 ORACLE 通常应用于大型系统的数据库产品。 ORACLE 数据库是目前世界…

CSDN回顾与前行:我的创作之旅——2048天的技术成长与感悟

CSDN回顾与前行&#xff1a;我的创作之旅——2048天的技术成长与感悟 &#x1f496;The Begin&#x1f496;点点关注&#xff0c;收藏不迷路&#x1f496; 前言 时光荏苒&#xff0c;岁月如梭。转眼间&#xff0c;从我在CSDN上写下第一篇技术博客《2-6 带头结点的链式表操作集…

从头开始学习扩散模型 Stable Diffusion

今天我们来揭开 Stable Diffusion 技术的神秘面纱。 1.稳定扩散原理 Stable Diffusion 在2022年发表&#xff0c;一种基于Latent Diffusion Models的新兴机器学习技术。它基于扩散过程&#xff0c;利用数学模型将机器学习中的高维度数据降低到低维度空间&#xff0c;并在该空间…

安卓 无线投屏 sink端 RTSP报 “505 RTSP Version not supported“

最近做安卓无线投屏的sink端&#xff0c;自己手搓RTSP协议&#xff0c;自己手搓容易出错&#xff0c;然后报了上面505&#xff0c;这个RTSP文档的意思是版本不一致&#xff0c;但是出现 "505 RTSP Version not supported"不一定是版本不一致&#xff0c;可能是 消息错…

AI究竟是在帮助开发者还是取代他们?来看大佬的观点你就明白了

AI&#xff08;人工智能&#xff09;在现代社会中扮演着越来越重要的角色&#xff0c;其在软件开发领域的应用也日益广泛。关于AI是在帮助开发者还是取代他们&#xff0c;V 哥个人认为&#xff0c;一半一半吧&#xff0c;为什么这么说&#xff0c;先不用噴&#xff0c;我们需要…

一个使用Go语言和现代Web技术构建跨平台桌面应用程序开源项目

大家好&#xff0c;今天给大家分享一个使用Go语言和现代Web技术构建跨平台桌面应用程序开源项目Wails。 Wails是一个允许开发者使用Go和Web技术编写桌面应用程序的项目。 它被设计为Go的快速且轻量的Electron替代品&#xff0c;旨在提供一个平台&#xff0c;让开发者可以利用Go…

excel PivotTable 透视表

开发数据导出excel功能&#xff0c;设置导出透视表 数据源&#xff1a; 透视表&#xff1a; 使用插件EPPlus 数据源&#xff1a; IF OBJECT_ID(tempdb..#temptable) IS NOT NULLDROP TABLE #temptable; CREATE TABLE #temptable ( [PROJECT] varchar(50), [PRODUCT_CODE] var…

Java NIO合并多个文件

NIO API java.nio (Java Platform SE 8 ) 直接上代码 package com.phil.aoplog.util;import lombok.extern.slf4j.Slf4j;import java.io.FileInputStream; import java.io.FileOutputStream; import java.io.IOException; import java.nio.channels.FileChannel;Slf4j public…

深入剖析C++的 “属性“(Attribute specifier sequence)

引言 在阅读开源项目源代码是&#xff0c;发现了一个有趣且特殊的C特性&#xff1a;属性。 属性&#xff08;attribute specifier sequences&#xff09;是在C11标准引入的。在C11之前&#xff0c;编译器特有的扩展被广泛用来提供额外的代码信息。例如&#xff0c;GNU编译器&…

无菌隔离器内操作规范性的验证之气流流型验证-北京中邦兴业

无菌隔离器在制药行业的使用愈加广泛&#xff0c;但已有的研究更多地聚焦于设计布局、物料状态等方面&#xff0c;对人员操作因素的影响方面关注较少。以冻干制剂生产车间为例&#xff0c;设计了一系列合理的无菌隔离器内干预操作&#xff0c;并在操作人员实行干预操作的基础上…

面试题 21:解释 Python 中的 help() 函数和 dir() 函数?

欢迎莅临我的博客 &#x1f49d;&#x1f49d;&#x1f49d;&#xff0c;很高兴能够在这里和您见面&#xff01;希望您在这里可以感受到一份轻松愉快的氛围&#xff0c;不仅可以获得有趣的内容和知识&#xff0c;也可以畅所欲言、分享您的想法和见解。 推荐:「stormsha的主页」…

8.6结构体函数参数

代码 #include <iostream> using namespace std; #include <string>//结构体函数参数//定义学生结构体 struct student {string name;int age;int score; };//打印学生信息的函数 //1、值传递 void printStudent1(struct student s) {cout << "子函数1…

如何查看GD32 Keil和IAR工程的map文件

我们在设计调试程序时&#xff0c;往往需要知道一个函数或一个变量它在MCU中具体所在的地址以及所占用的空间大小&#xff0c;这时候就需要查看map文件。 那么什么是map文件呢&#xff1f;map文件是编译器编译工程后生成的一个文件&#xff0c;文件会有很多信息&#xff0c;比…

小米恢复联系人,跟着这2个步骤,让你的社交重回巅峰

当你突然发现小米手机里的联系人列表变得空空如也&#xff0c;是不是感觉就像失去了与外界沟通的“秘密武器”&#xff1f;别担心&#xff0c;这并不意味着你真的失去了他们。他们可能只是藏在了手机里的某个神秘角落&#xff0c;等待着你的召唤。接下来&#xff0c;小编将会介…

连续6年夺冠 6项细分领域第一,中电金信持续领跑中国银行业IT解决方案市场

7月9日&#xff0c;工信部赛迪顾问发布《2023年度中国银行业IT解决方案市场分析报告》&#xff08;简称《报告》&#xff09;。中电金信以7.38%的市场份额再度蝉联2023中国银行业IT解决方案市场份额第一&#xff0c;以显著优势持续领跑中国银行业IT解决方案市场。在细分领域&am…

园区电表4G/Lora远程无线通讯-安科瑞自助缴费系统

项目案例&#xff1a;张江高科产业园 背景 上海张江高科技园区自1992年成立以来&#xff0c;经过近二十年的开发&#xff0c; 园区构筑了生物医药创新链&#xff0c;集成电路产业链和软件产业链的框架。园区建有国家上海生物医药科技产业基地、国家信息产业基地、国家集成电路…