【Java】一文搞懂生产者和消费者模型

  • 阻塞队列的概念
  • 生产者消费者模式
    • 消息队列
    • 消息队列的作用
  • JDK中的阻塞队列
  • 实现阻塞队列
  • 实现生产者消费者模型

阻塞队列的概念

之前介绍过队列,是一种数据结构,先进先出FIFO。阻塞队列也满足队列的特性,不过有特殊之处:
入队元素时,先判断一下队列是否已满,如果满了就等待(阻塞),当有空余空间时再插入;
出队元素时,先判断一下队列是否空了,如果空了就等待(阻塞),当队列中有元素时再取出。

现实生活中的例子:包饺子
1.每个人各擀各的饺子皮,各包各的饺子
这种情况大概率会出现争抢擀面杖的现象,在多线程环境下就是锁竞争。
2.一个人专门来擀饺子皮,其他人负责包饺子
当饺子皮多的时候,擀饺子皮的人就可以休息一会儿;当没有饺子皮的时候就得一直擀;
当饺子皮多的时候,包饺子的人就要一直去包饺子;当没有饺子皮的时候就停下来休息;
在这里插入图片描述

上述这个例子中,擀皮的人可以成为生产者,包饺子的人称为消费者,放饺子皮的地方就是一个交易场所,这个场所就可以用阻塞队列实现。这就是阻塞队列的一个典型应用场景—— “生产者消费者模型”,这是一种非常典型的开发模型。

生产者消费者模式

生产者消费者模式就是通过一个容器来解决生产者和消费者的强耦合问题。生产者和消费者彼此之间不直接通讯,而通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,直接扔给阻塞队列,消费者不找生产者要数据,而是直接从阻塞队列里取。在具体项目中,使用消息队列这样一个“中间件”来实现这个功能。

消息队列

在基础的数据结构上,做了一些针对应用场景的优化和实现,把这样的一些框架和软件称为“中间件”。消息队列就是一个“中间件”,本质上是一个阻塞队列,在此基础上把放入阻塞队列的消息打了一个标签。比如卖包子场景:
在这里插入图片描述
假如说刚出锅的全是豆沙包,那么只需要处理买豆沙包的消息,消息的标签就可以实现分组的作用

消息队列的作用

1.解耦
比如下面这种情况,服务器A和服务器B必须知道对方的存在,且参数必须要约定好,如果有其中一方挂了,那么整个流程都会受影响。这种情况下如果某一个功能需要修改,涉及的服务器可能都需要修改代码。这种情况就说耦合度非常高,不建议这种系统组织方式。
在这里插入图片描述
通过消息队列把三个系统进行解耦,是他们不再进行直接通信,起到了单独维护,单独运行的效果
在这里插入图片描述
在设计程序的时候提出过一些要求:比如高内聚,低耦合。高内聚就是把功能强相关的代码写在一起,维护起来非常方便。这是一种组织代码的方式。低耦合就是不要把相同的代码写的到处都是,一般是通过抽象的方式把代码封装成方法,使用的时候调用即可。良好的代码组织方式,可以有效的降低维护成本。

2.削峰填谷
峰和谷是指消息的密集程度。
比如双十一期间流量会暴增,在这个链路中,任何一个节点出现问题都会影响整个业务流程。
在这里插入图片描述
这时你可能会想,可以使用多组链路,不同的用户可以通过不同的链路来访问(负载均衡),这样可以解决流量暴增带来的问题。确实可以解决这个问题,但是需要思考的是,流量不会一直在峰值,大部分时间流量都是正常状态, 此时部署的其他链路就用不上了,这无疑增加了好几倍的花销。

假设银行一秒只能处理200个订单,物流公司一秒只能处理100个订单,当调用第三方接口时,如果调用次数达到了上限就阻塞一会儿。此时使用消息队列,在流量激增的时候用消息队列缓冲(削峰)在流量减少的时候,把消息队列中存储的消息一点点消费(填谷)。最终让系统和硬件配置达到平衡。
在这里插入图片描述

3.异步
同步是指请求方必须死等对方的响应。
异步是指发出请求之后,自己去干别的事情,有响应时会接收到通知从而处理响应

JDK中的阻塞队列

JDK提供了多种不同的阻塞队列,可以根据不同的业务场景选择不同的阻塞队列实现方式。

public class Demo01_BlockingQueue {public static void main(String[] args) {//定义阻塞队列BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(3);BlockingQueue<Integer> queue1 = new ArrayBlockingQueue<>(3);BlockingQueue<Integer> queue2 = new PriorityBlockingQueue<>(3);}
}

1.添加元素
在定义阻塞队列时可以给定初始化容量。下面这个示例中,由于当前的阻塞容量是3,所以当插入第四个元素时就会发生阻塞。

public class Demo01_BlockingQueue {public static void main(String[] args) throws InterruptedException {//定义一个阻塞队列BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(3);//往队列中写入元素queue.put(1);queue.put(2);queue.put(3);System.out.println("已经插入了三个元素");queue.put(4);System.out.println("已经插入了第四个元素");}
}

在这里插入图片描述
2.取出元素

阻塞队列中获取元素不使用poll()方法,而是使用take()方法,会产生阻塞效果
在这里插入图片描述

public class Demo01_BlockingQueue {public static void main(String[] args) throws InterruptedException {//定义一个阻塞队列BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(3);//往队列中写入元素queue.put(1);queue.put(2);queue.put(3);System.out.println("已经插入了三个元素");System.out.println(queue);//阻塞队列中获取元素使用take方法,会产生阻塞效果System.out.println("开始获取元素");System.out.println(queue.take());System.out.println(queue);}
}

在这里插入图片描述
当获取完阻塞队列中所有元素时,此时阻塞队列为空,再继续获取元素时,会进入阻塞状态。

public class Demo01_BlockingQueue {public static void main(String[] args) throws InterruptedException {//定义一个阻塞队列BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(3);//往队列中写入元素queue.put(1);queue.put(2);queue.put(3);System.out.println("已经插入了三个元素");System.out.println(queue);//阻塞队列中获取元素使用take方法,会产生阻塞效果System.out.println("开始获取元素");System.out.println(queue.take());System.out.println(queue.take());System.out.println(queue.take());System.out.println("已经获取了三个元素");System.out.println(queue.take());System.out.println("已经获取了四个元素");}
}

在这里插入图片描述

实现阻塞队列

在学习数据结构时,实现一个普通队列底层用到了两种数据结构:循环数组和链表。阻塞队列就是在普通队列上加入了阻塞等待的操作。这个阻塞等待的操作就是等待(wait)和唤醒(notify)。
确定加锁的范围:
这两个方法和synchronized是强相关的,所以要加入synchronized,此时要先确定加锁的范围。在put()和take()方法中,根据修改共享变量的范围,加入synchronized。由于整个方法都在修改共享变量,所以给整个方法加锁。如果一个对象需要new出来使用,那么锁对象一般是this,此时的锁对象是this即可。
确定等待的时机:
在添加元素中,当前数组已满时,此时要阻塞等待;在取元素时,当阻塞队列为空时,此时要阻塞等待。
确定唤醒的时机:
在添加元素的方法中,执行put操作的最后一步之后再执行唤醒操作,把取元素的线程唤醒;在取元素的方法中,当前队列有空余位置的时候,唤醒添加元素的线程。

代码实现

public class MyBlockingQueue {//定义一个保存元素的数组private int[] elementData = new int[100];//定义队首下标private int head;//定义队尾下标private int tail;//定义有效元素的个数private int size;//插入一个元素public void put(int value) throws InterruptedException {//根据修改共享变量的范围加锁//锁对象是this即可synchronized (this){//判断数组是否已满while(size >= elementData.length){//阻塞等待this.wait();}//向队尾插入元素elementData[tail] = value;//移动队尾下标tail ++;//修正队尾下标if (tail>=elementData.length) {tail = 0;}//修改有效元素个数size ++;//做唤醒操作this.notifyAll();}}//获取一个元素public int take() throws InterruptedException {//根据修改共享变量的范围加锁//锁对象是this即可synchronized (this){//判断队列是否为空while (size<=0) {this.wait();}//从队首出队int value = elementData[head];//移动队首下标head ++;//修正队首下标if (head>=elementData.length) {head = 0;}//修改有效元素个数size --;//唤醒操作this.notifyAll();//返回队首元素return value;}}
}

⚠️⚠️⚠️注意:在wait方法的官方文档中指出:“线程可以在没有通知、中断或超时的情况下唤醒,即所谓的虚假唤醒。虽然在这种情况下在实践中很少发生,但是应用程序必须通过测试导致线程被唤醒的条件来防止这种情况,如果条件不满足,则继续等待。换句话说,等待应该总是出现在循环中”。
也就是说,第一次满足wait条件时,线程进入阻塞状态,被唤醒之后,这期间会发生很多事情,有一种可能是被唤醒后,等待的条件依然成立,所以需要再次检查等待条件,如果满足就继续阻塞等待。即就是在实现上述阻塞队列时,检查wait的判断条件时,用while来判断,而非if
在这里插入图片描述

测试自定义阻塞队列

public class Demo02_MyBlockingQueue {public static void main(String[] args) throws InterruptedException {//定义一个阻塞队列MyBlockingQueue queue = new MyBlockingQueue();//往队列中写入元素queue.put(1);queue.put(2);queue.put(3);System.out.println("已经插入了三个元素");System.out.println("开始获取元素");System.out.println(queue.take());System.out.println(queue.take());System.out.println(queue.take());System.out.println("已经获取了三个元素");System.out.println(queue.take());System.out.println("已经获取了四个元素");}
}

在这里插入图片描述

实现生产者消费者模型

分别用一个线程模拟生产者和消费者,实现一个简单的生产者消费者模型。让生产者线程每10ms就生产一次,让消费者线程每1s再消费一次。
在这里插入图片描述
代码实现

public class Demo03_ProducerConsumer {// 定义一个阻塞队列,初始容量为100private static MyBlockingQueue queue = new MyBlockingQueue();public static void main(String[] args) {// 创建生产者线程Thread producer = new Thread(() -> {//记录消息编号int num = 1;while (true) {// 生产一条就打印一条日志System.out.println("生产了元素 " + num);try {// 把消息放入阻塞队列中queue.put(num);num++;// 休眠10msTimeUnit.MILLISECONDS.sleep(10);} catch (InterruptedException e) {e.printStackTrace();}}});// 启动生产者producer.start();// 创建消费者线程Thread consumer = new Thread(() -> {while (true) {try {// 从队列中获取元素(消息)int num = queue.take();// 打印一下消费日志System.out.println("消费了元素 :" + num);// 休眠1秒TimeUnit.SECONDS.sleep(1);} catch (InterruptedException e) {e.printStackTrace();}}});// 启动消费者consumer.start();}
}

测试结果:
让生产者线程每10ms就生产一次,让消费者线程每1s再消费一次。此时阻塞队列中消息满了再消费,但生产和消费还是同时进行的。
在这里插入图片描述

相反,如果让生产的线程慢,消费的线程快,则每生产一个消息就消费一个。阻塞队列永远不会满。
在这里插入图片描述


继续加油~
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://xiahunao.cn/news/351035.html

如若内容造成侵权/违法违规/事实不符,请联系瞎胡闹网进行投诉反馈,一经查实,立即删除!

相关文章

泪崩!测试面试技术面过了却挂在了——“谈谈你的职业生涯规划”

前不久&#xff0c;软件测试交流群里面有一个成员吐槽&#xff0c;说今天的面试技术已经面过了&#xff0c;可HR却问了她“未来的职业发展目标是什么&#xff1f;”然后&#xff0c;挂了&#xff01;这个问题我们平时在交流群里都有讲过&#xff0c;可是这丫头比较疯&#xff0…

kml或kmz文件用什么软件打开

下载安装 bigemap GIS office软件&#xff08;免费就可以) 2、 安装好下载的bigemap软件&#xff0c;直接将kml kmz拖到软件里面就打开了&#xff0c;或者左上角文件打开 选择 kml/kmz 然后选择你的文件 打开记性了。 BIGEMAP支持所有文件格式的打开和保存&#xff0c;如下图…

解读DXF文件

转自&#xff1a;http://blog.csdn.net/yingmutongxue/article/details/8226104 C语言代码读取DXF&#xff1a; http://www.docin.com/p-346248233.html?nb1 组码详解&#xff1a;http://www.doc88.com/p-21997575043.html DXF文件就是以文档的格式描述图形数据的。可以用写…

GS2972(3G-SDI)视频输出驱动调试

GS2972视频输出调试 GS2972的硬件初始化GS2972的驱动时序GS2972的驱动RTL代码GS2972输出彩条GS2972驱动易出bug GS2972的硬件初始化 GS2972是HD-SDI/3G-SDI视频、音频串化器。其使用非常简单&#xff0c;但是要想把该芯片驱动起来&#xff0c;真心不容易。需要了解相关视频标准…

如何在Moonbeam设置多重签名钱包,加固资产安全

Moonbeam Safe是以太坊上Safe&#xff08;先前名为Gnosis Safe&#xff09;的分叉。Safe于2018年正式推出&#xff0c;并发展成为了以太坊上知名的去中心化托管协议和集体资产管理平台。 Moonbeam Safe可用于创建多重签名Safe钱包&#xff0c;通过配置一个多签&#xff08;mul…

【Protobuf速成指南】Win/Centos7下Protobuf安装教程

文章目录 安装教程一、Windows1.1 下载编译器1.2 配置PATH1.3 其他依赖项 二、Centos72.1 安装必要的工具2.2 下载安装包2.3 安装 安装教程 以版本为V21.11为例说明 一、Windows 1.1 下载编译器 下载地址&#xff1a;链接&#xff0c;一直往下翻找到 V21.11版本 win用户根据…

去除迅雷极速版 提示升级的广告

转载于:https://www.cnblogs.com/x-huihui/p/10926954.html

迅雷精简版 4.0.0 Mac中文版

迅雷精简版是一款非常实用的下载工具&#xff0c;体积轻巧但是下载速度却依然很快&#xff0c;采用与浏览器结合的方式&#xff0c;让用户在享受极速下载模式的同时也能流畅上网&#xff0c;系统性能依然稳定&#xff0c;而且迅雷精简版没有广告&#xff0c;给用户更好的上网体…

迅雷精简版 for Mac 去除多余无用功能

推一款非常给力、速度又快的专业下载工具mac迅雷精简版&#xff0c;普通的非VIP迅雷软件往往会出现一些下载限速&#xff0c;体验上相对并不是很好&#xff0c;而迅雷极速精简版则更好的解决了这个问题&#xff0c;其界面清爽简约&#xff0c;无广告&#xff0c;下载速度快&…

[Android] 安卓迅雷带云盘内测版7.0 简洁 无广告 官方版

迅雷内测版本 界面 简洁 无广告 带云盘功能 我也不多说 大家都知道迅雷APP 版本德性 全是广告 直接上图 跟链接 对于我说 内测版本 属实香 看不到一点广告 下载地址: https://n802.com/file/349707-458153240 http://www.yimuhe.com/file-4770885.html http://www.369pan.c…

2015 年一月联考逻辑真题

2015 年一月联考逻辑真题 真题&#xff08;2015-26&#xff09; 26.晴朗的夜晚我们可以看到满天星斗&#xff0c;其中有些是自身发光的恒星&#xff0c;有些是自身不发光但可以反射附近恒星光的行星。恒星尽管遥远&#xff0c;但是有些可以被现有的光学望远镜“看到”。和恒星不…

python使用requests+excel进行接口自动化测试

在当今的互联网时代中&#xff0c;接口自动化测试越来越成为软件测试的重要组成部分。Python是一种简单易学&#xff0c;高效且可扩展的语言&#xff0c;自然而然地成为了开发人员的首选开发语言。而requests和xlwt这两个常用的Python标准库&#xff0c;能够帮助我们轻松地开发…

navicat与SQLyog的区别

在之前的学习中由于先学的SQL Server&#xff0c;后来才学的MySQL&#xff0c;导致我刚学习的时候冥冥之中感觉到那有点不对劲&#xff0c;但是又说不出来。通过进行深入的学习解除到了Navicat Premium和SQLyog这两个工具&#xff0c;才让我明白了MySQL与之前学习的内容是有所出…

usb2.0-crw出现黄色叹号

最近点开设备管理器发现了许多黄色感叹号&#xff1a; 解决方案&#xff1a; 进入电脑官网&#xff0c;我的是DELL&#xff0c;搜索"驱动与下载dell"&#xff0c;输入自己的电脑型号&#xff0c;在手动搜索的模块里找主板芯片组&#xff0c;下载并安装。

postgrsql 增加字段

alter table 表名 add 字段名 数据类型&#xff1b; 不支持指定位置增加列&#xff0c;增加的列在末尾。 实例:alter table crw_it.ncs_ccs_stmt_zdhk add stmt_no varchar; crw_it是模式名&#xff08;schema&#xff09;

STM32单片机蓝牙APP语音识别取暖器GSM短信超温报警

实践制作DIY- GC0141-蓝牙APP语音识别取暖器 基于STM32单片机设计---蓝牙APP语音识别取暖器 二、功能介绍&#xff1a; 电路&#xff1a;STM32F103C最小系统DS18B20温度传感器 多个按键 LCD1602显示器 1个串口语音识别模块1个5V 加热片 模拟加热蜂鸣器SIM800 GSM短信模块 HC0…

ubuntu下依靠guvcview使用摄像头

1.检验系统是否可以检测到设备 $ lsusb Bus 001 Device 040: ID 046d:0825 Logitech, Inc. Webcam C270 如果没有相关的信息说明系统的驱动未安装。 2.检验摄像头的端口 ls -la /dev/vid* crw-rw---- 1 root video 81, 0 Feb 12 2016 /dev/video0 crw-rw---- 1 root vid…

降本增效,StarRocks 在同程旅行的实践

作者&#xff1a;周涛 同程旅行数据中心大数据研发工程师 同程旅行是中国在线旅游行业的创新者和市场领导者。作为一家一站式平台&#xff0c;同程旅行致力于满足用户旅游需求&#xff0c;秉持 "让旅行更简单、更快乐" 的使命&#xff0c;主要通过包括微信小程序、AP…

CRC16浅析

CRC即循环冗余校验码&#xff08;Cyclic Redundancy Check&#xff09;&#xff0c;是数据通信领域中最常用的一种查错校验码。奇偶校验虽然简单&#xff0c;但是漏检率太高&#xff0c;而CRC则要低的多&#xff0c;所以大多数都是使用CRC来校验。CRC也称为多项式码。 任何一个…

【Linux】关于Linux中的权限

文章目录 前言Linux权限文件访问者的分类&#xff08;人&#xff09;文件类型和访问权限&#xff08;事物属性&#xff09;文件类型基本权限 目录的权限粘滞位权限的总结 前言 前面我们已经知道。Linux下有两种用户&#xff1a;超级用户&#xff08;root&#xff09;、普通用户…