spark 事件总线listenerBus

事件总线基本流程

图片来源:https://blog.csdn.net/sinat_26781639/article/details/105012302

LiveListenerBus创建

在sparkContext初始化中创建LiveListenerBus对象。

主要变量有两个

  • queues:事件队列,里面存放四个队列,每个队列中有对应的listener注册
  • queuedEvents:发生的事件


注册listener

内部listener注册

举例DynamicAllocation,在SpackContext初始化的时候创建了ExecutorAllocationManager对象,再调用start方法。

在ExecutorAllocationManager的start方法中,调用listenerBus相关方法完成注册

LiveListenerBus中queues分成四个队列,分别对应不同的注册方法,最终都是调用addToQueue方法

addToQueue比较简单,就是从queues中获取,有AsyncEventQueue就加入listener,没有就创建再加入。加入是调用AsyncEventQueue的addListener方法。

AsyncEventQueue的addListener方法是父类ListenerBus的addListener方法。
将listener加入到listenersPlusTimers中


内部listener是放入了对应队列的listenersPlusTimers中。

外部listener注册

在内部listener注册完成后调用setupAndStartListenerBus注册外部listener

在setupAndStartListenerBus方法中,读取配置spark.extraListeners获取要注册的外部listener集合,使用反射创建listener对象,遍历调用addToSharedQueue加入share queue。最后调用listenerBus的start方法启动listenerBus。

事件总线启动

如上图,在listener全部完成注册后,调用listenerBus的start方法启动。
将started变量修改为true,标记listenerBus启动。遍历queues,启动有注册的队列AsyncEventQueue(总共四个队列,但不一定全部都启用)。遍历queuedEvents处理在listenerBus没有启动期间产生的event。最后不再需要缓存消息了,将queuedEvents置为空。

AsyncEventQueue启动
将started变量变成true,标记AsyncEventQueue启动。启动dispatchThread线程。

dispatchThread线程是调用dispatch方法。
在dispatch方法中,可以看到是循环读取eventQueue,从其中读取event,调用postToAll发送给全部的listener。
eventQueue是一个阻塞队列LinkedBlockingQueue


到此,listenerBus启动完成,其中的列队AsyncEventQueue也启动完成。AsyncEventQueue循环从eventQueue中获取event来处理(这里是阻塞的)

发送消息

发送消息的入口是调用LiveListenerBus的post方法。
如果还没有启动,就将消息先缓存到queueEvents中。
如果启动了,就调用postToQueues将消息发送给全部队列。
在postToQueues中是遍历queue,调用post方法。

AsyncEventQueue的post方法中,就是将消息放入eventQueue即可。
但是eventQueue是有容量大小的,超过的消息就会丢弃。

至此,发送消息完成,将消息放入到AsyncEventQueue的eventQueue中。

处理消息

在启动的时候,dispatch线程已经完成了启动,从eventQueue获取event来处理。
处理消息是调用父类postToAll方法

postToAll方法中是遍历该队列全部listener,调用doPostEvent方法。

doPostEvent对应是SparkListenerBus的doPostEvent方法,根据event的类型,调用listener的不同的方法。

listener是要实现的SparkListenerInterface的方法,可以看到方法很多。。。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://xiahunao.cn/news/3248358.html

如若内容造成侵权/违法违规/事实不符,请联系瞎胡闹网进行投诉反馈,一经查实,立即删除!

相关文章

零基础学习Python(三)

1. 多重继承 一个子类可以继承多个父类,这与一些编程语言的规则不通。 如果多个父类中有同名的变量和方法,子类访问的顺序是按照继承时小括号里书写的顺序进行访问的。 可以用issubclass(B, A)方法判断B是否为A的子类。 2. 绑定 类中的方法通过参数s…

Unity 导入MRTK,使用URP 升级材质,MRTK的材质还是洋红色

控制台显示信息 ToggleBackground material was not upgraded. There’s no upgrader to convert Mixed Reality Toolkit/Standard shader to selected pipeline UnityEditor.Rendering.Universal.UniversalRenderPipelineMaterialUpgrader:UpgradeProjectMaterials() (at 点击…

Windows 电脑部署 ollama3 并安装模型

Windows 电脑部署 ollama3 并安装模型 部署中为了尽可能减少对本地环境的污染,使用 Docker 安装! github: https://github.com/ollama/ollama 准备部署文件 version: 3.8services:ollama:volumes:- ./models:/root/.ollama # 将本地文件夹挂载到容器中…

window11 部署llama.cpp并运行Qwen2-0.5B-Instruct-GGUF

吾名爱妃,性好静亦好动。好编程,常沉浸于代码之世界,思维纵横,力求逻辑之严密,算法之精妙。亦爱篮球,驰骋球场,尽享挥洒汗水之乐。且喜跑步,尤钟马拉松,长途奔袭&#xf…

AWS与其他友商云相比的优势

亚马逊网络服务(AWS)作为全球领先的云计算平台,在激烈的市场竞争中一直保持着领先地位。尽管其他云服务提供商如微软Azure和谷歌云平台也在不断发展,但AWS仍然拥有一些显著的优势。本文将结合九河云的分析探讨AWS相较于其他友商云服务的主要优势。 1. 全面的服务生态系统 AWS…

spring boot(学习笔记第十三课)

spring boot(学习笔记第十三课) 传统后端开发模式和前后端分离模式的不同,Spring Security的logout,invalidateHttpSession不好用,bug? 学习内容: 传统后端开发模式 vs 前后端分离模式Spring Security的logout功能inv…

初学者如何通过建立个人博客盈利

建立个人博客不仅能让你在网上表达自己,还能与他人建立联系。通过博客,可以创建自己的空间,分享想法和故事,并与有相似兴趣和经历的人交流。 本文将向你展示如何通过建立个人博客来实现盈利。你将学习如何选择博客主题、挑选合适…

[C/C++入门][ifelse]19、制作一个简单计算器

简单的方法 我们将假设用户输入两个数字和一个运算符&#xff08;、-、*、/&#xff09;&#xff0c;然后根据所选的运算符执行相应的操作。 #include <iostream> using namespace std;int main() {double num1, num2;char op;cout << "输入 (,-,*,/): &quo…

git镜像链接

镜像链接https://registry.npmmirror.com/binary.html?pathgit-for-windows/ CNPM Binaries Mirror 1.git init 2.克隆 IDEA集成git git分支

springboot助农电商系统-计算机毕业设计源码 08655

基于移动端的助农电商系统的设计与实现 XXX专业XX级XX班&#xff1a;XXX 指导教师&#xff1a;XXX 摘要 近年来&#xff0c;电子商务的快速发展引起了行业和学术界的高度关注。基于移动端的助农电商系统旨在为用户提供一个简单、高效、便捷的农产品购物体验&#xff0c;它不…

SpringCloud教程 | 第九篇: 使用API Gateway

1、参考资料 SpringCloud基础篇-10-服务网关-Gateway_springcloud gateway-CSDN博客 2、先学习路由&#xff0c;参考了5.1 2.1、建了一个cloudGatewayDemo&#xff0c;这是用来配置网关的工程&#xff0c;配置如下&#xff1a; http://localhost:18080/aaa/name 该接口代码如…

关于思维和智能体模型的思考(3)

在前面的讨论中我们已经提出&#xff0c;基于Agent 的AI 应用软件是由一组Agent 和环境信息构成的。其中环境信息非常重要&#xff0c;它们是大模型完成目标的重要依据。他决定了大模型思维的脉络。本文我们讨论环境信息。 环境信息的主要内容 每一次对话而言&#xff0c;大语…

LLaMA-Factory

文章目录 一、关于 LLaMA-Factory项目特色性能指标 二、如何使用1、安装 LLaMA Factory2、数据准备3、快速开始4、LLaMA Board 可视化微调5、构建 DockerCUDA 用户&#xff1a;昇腾 NPU 用户&#xff1a;不使用 Docker Compose 构建CUDA 用户&#xff1a;昇腾 NPU 用户&#xf…

9款初学者也能上手的电脑录音软件,高质量录制不是梦

市面上的电脑录音软件多如牛毛&#xff0c;我们该如何挑选最适合自己的电脑录音软件呢&#xff1f;挑选录音软件其实是有技巧的&#xff0c;今天小编整理了2024年十款用户较为熟悉的电脑录音工具。通过软件兼容系统、产品功能特性、用户评价反馈这三种方面。轻松帮助大家解决电…

一、网络通信和tcp协议

一、网络协议 1、计算机网络 简单类说就是利用通信线路实现计算机和通信设备进行信息交互的系统&#xff1b; 2、网络分类 局域网&#xff08;LAN&#xff09;&#xff1a;一般为几十米到及时公里 域域网&#xff08;MAN&#xff09;&#xff1a;介于LAN与WAN之间 广域网&…

Gettler‘s Screep World 笔记 Ⅰ

夏促时候刚刚入坑&#xff0c;写个笔记叭~ 环境配置 参考 HoPGoldy 大佬的简书&#xff0c;先配置下开发环境 萌新去看大佬的详细教程&#xff0c;我这里比较简单&#xff0c;有前端基础的可以直接抄 VSCode 跳过 node 我配的是v18.18.2 换源 npm config set registry h…

【查看WIFI密码】:在window操作系统上查看已连接过的WIFI密码(两种方式)

前言 通常情况下&#xff0c;我们想要将已经连接过的wifi分享给好友&#xff0c;但不知道怎么查看&#xff0c;废话不多说&#xff0c;直接上干货 方式一&#xff1a;通过cmd命令 Step01&#xff1a;打开cmd WIN r 弹出运行框 输入&#xff1a;cmd&#xff0c;点击确定&…

打靶记录——靶机easy_cloudantivirus

靶机下载地址 链接&#xff1a;https://pan.baidu.com/s/1OfrqdNKbabAkMvmoM70gbQ?pwdgz0m 提取码&#xff1a;gz0m Vulnhub 的靶机都有一个特点&#xff0c;通常导入到 VMware Workstation 时都会获取不到 IP 地址&#xff0c;虽然可以进紧急模式中修改&#xff0c;但是太麻…

Android SurfaceView 组件介绍,挖洞原理详解

文章目录 组件介绍基本概念关键特性使用场景 SurfaceHolder介绍主要功能使用示例 SurfaceView 挖洞原理工作机制 使用SurfaceView展示图片示例创建一个自定义的 SurfaceView类在 Activity 中使用 ImageSurfaceView注意事项效果展示 组件介绍 在 Android 开发中&#xff0c;Sur…

【STM32 HAL库】全双工DMA双buffer的I2S使用

1、配置I2S 我们的有效数据是32位的&#xff0c;使用飞利浦格式。 2、配置DMA **这里需要注意&#xff1a;**i2s的DR寄存器是16位的&#xff0c;如果需要发送32位的数据&#xff0c;是需要写两次DR寄存器的&#xff0c;所以DMA的外设数据宽度设置16位&#xff0c;而不是32位。…