Python运维之定时任务模块APScheduler

目录

定时任务模块APScheduler

一、安装及基本概念

1.1、APScheduler的安装

1.2、涉及概念

1.3、APScheduler的工作流程​编辑

二、配置调度器

三、启动调度器

四、调度事件监听


定时任务模块APScheduler

APScheduler提供了基于日期、固定时间间隔以及crontab类型的任务,我们可以在主程序的运行过程中快速增加新作业或删除旧作业。如果把作业存储在数据库中,那么作业的状态会被保存,当调度器重启时,不必重新添加作业,作业会恢复原状态继续执行。

一、安装及基本概念

1.1、APScheduler的安装

 pip install apscheduler

1.2、涉及概念

  • 触发器triggers:触发器包含调度逻辑描述一个任务何时被触发,有按日期按时间间隔按cronjob描述式三种触发方式。每个作业都有自己的触发器,除了初始配置之外,触发器是完全无状态的。
  • 作业存储器job stores:指定了作业被存放的位置,默认的作业存储器是内存,也可以将作业保存在各种数据库中。当作业被存放在数据库中时,它会被序列化;当重新被加载时,会反序列化。作业存储器充当保存、加载、更新和查找作业的中间商。在调度器之间不能共享作业存储
  • 执行器executors:执行器是将指定的作业调用函数提交到线程池或进程池中运行,当任务完成时,执行器通知调度器触发相应的事件
  • 调度器schedulers:任务调度器,控制器角色,通过它配置作业存储器、执行器和触发器、添加、修改和删除任务。调度器协调触发器、作业存储器、执行器的运行,通常只有一个调度程序运行在应用程序中,开发人员不需要直接处理作业存储器、执行器或触发器。配置作业存储器和执行器是通过调度器来完成的

1.3、APScheduler的工作流程

一个简单的间隔任务实例:

 import osfrom datetime import datetimefrom apscheduler.schedulers.blocking import BlockingScheduler​# 打印当前的时间def tick():print('Tick! The time is: %s' % datetime.now())​if __name__ == '__main__':scheduler = BlockingScheduler()# 添加一个作业rick,触发器为interval,每隔3秒执行一次scheduler.add_job(tick, 'interval', seconds=3)print('Press Ctrl+{0} to exit'.format('Break' if os.name == 'nt' else 'C'))try:scheduler.start()except (KeyboardInterrupt, SystemExit):pass

另外的触发器为date,cron。date按特定时间点触发cron则按固定的时间间隔触发

上述代码稍作修改可变为cron类的定时任务:

 import osfrom datetime import datetimefrom apscheduler.schedulers.blocking import BlockingScheduler​def tick():print('Tick! The time is: %s' % datetime.now())​if __name__ == '__main__':scheduler = BlockingScheduler()scheduler.add_job(tick, 'cron', hour=19,minute=23)print('Press Ctrl+{0} to exit'.format('Break' if os.name == 'nt' else 'C'))try:scheduler.start()except (KeyboardInterrupt, SystemExit):pass

定时cron任务也非常简单,直接给触发器trigger传入'cron'即可。hour=19,minute23,表示每天的19时23分执行任务

 hour=19,minute=23hour='19',minute='23'minute='*/3'  # 表示每3分钟执行一次hour='19-21',minute='23'  # 表示19:23、20:23、21:23各执行一次任务

二、配置调度器

调度器的主循环其实就是反复检查是否有到期需要执行的任务,具体分两步进行

  • 询问自己的每一个作业存储器,有没有到期需要执行的任务。如果有则计算这些作业中每个作业需要 运行的时间点;如果时间点有多个,就做coalesce检查
  • 提交给执行器按时间点运行

各调度器的适用场景

  • BlockingSchduler:适用于调度程序,是进程中唯一运行的进程,调用start函数会阻塞当前线程,不能立即返回
  • BackgroundScheduler:适用于调度程序,在应用程序的后台运行调用start后主线程不会阻塞
  • AsyncIOScheduler:适用于使用了asyncio模块的应用程序
  • GeventScheduler:适用于使用了gevent模块的应用程序
  • TwistedScheduler:适用于构建Twisted的应用程序
  • QtSchuduler:适用于构建Qt的应用程序。

作业存储器的选择:一是内存( 默认),而是数据库

执行器的选择:默认的ThreadPoolExecutor足够OK,如果作业负载涉及CPU密集型操作,那么考虑使用ProcessPoolExecutor,甚至同时使用,将其作为二级执行器。

APScheduler可以使用字典,关键字参数传递配置调度器。首先实例化调度程序添加作业,然后配置调度器,获得最大的灵活性。

如果调度程序在应用程序的后台运行,则选择BackgroundScheduler,并使用默认的jobstore和executor

 from apscheduler.schedulers.blocking import BlockingSchedulerscheduler = BlockingScheduler()

如果想配置更多的信息,就可设置两个执行器、两个作业存储器、调整新作业的默认值,并设置不同的时区。配置详情:

  • 配置名为mongo的MongoDBjobStore作业存储器
  • 配置名为default的SQLAlchemyJobStore(使用SQLite)
  • 配置名为default的ThreadPoolExecutor,最大进程数为5
  • UTC作为调度器的时区
  • coalesce默认情况下关闭
  • 作业的默认最大运行实例限制为3

方法一:

 from pytz import utc​from apscheduler.schedulers.background import BlockingSchedulerfrom apscheduler.jobstores.mongodb import MongoDBJobStorefrom apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStorefrom apscheduler.executors.pool import ThreadPoolExecutor,ProcessPoolExecutor​jobstores = {'mongo':MongoDBJobStore(),'default':SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')}executors = {'default':ThreadPoolExecutor(20),'processpool':ProcessPoolExecutor(5)}job_defaults = {'coalesce':False,'max_instances':3}scheduler = BlockingScheduler(jobstores=jobstores,executors=executors,job_defaults=job_defaults,timezone=utc)

方法二:

 from apscheduler.schedulers.background import BlockingSchedulerscheduler = BlockingScheduler({'apscheduler.jobstores.mongo':{'type':'mongodb'},'apscheduler.jobstores.default':{'type':'sqlalchemy','url':'sqlite:///jobs.sqlite'},'apscheduler.executors.default':{'class':'apscheduler.executors.pool:ThreadPoolExecutor','max_workers':'5'},'apscheduler.job_defaults.coalesce':'fasle','apscheduler.job_defaults.max_instances':'3','apscheduler.timezone':'UTC',})

方法三:

 from pytz import utcfrom apscheduler.schedulers.background import BlockingSchedulerfrom apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStorefrom apscheduler.executors.pool import ThreadPoolExecutor,ProcessPoolExecutor​jobstores = {'mongo':{'type':'mongodb'},'default':SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')}executors = {'default':{'type':'threadpool','max_workers':20},'processpool':ProcessPoolExecutor(max_workers=5)}job_defaults = {'coalesce':False,'max_instances':3}scheduler = BlockingScheduler()scheduler.configure(jobstores=jobstores,executors=executors,job_defaults=job_defaults,timezone=utc)

三、启动调度器

启动调度器前需要先添加作业,有两种方法可以向调度器添加作业:一是通过接口add_job();二是通过使用函数装饰器,其中add_job()返回一个apscheduler.job.Job类的实例,用于后续修改或删除作业。

可以随时在调度器上调度作业。如果在添加作业时,调度器还没有启动,那么任务不会运行,并且它的第一次运行时间在调度器启动时计算。

调用调度器的start()方法启动调度器,下面用不同的作业存储器来举例:

 from apscheduler.schedulers.blocking import BlockingSchedulerimport datetimefrom apscheduler.jobstores.memory import MemoryJobStorefrom apscheduler.executors.pool import ThreadPoolExecutor,ProcessPoolExecutorf​def my_job(id='my_job'):print(id,'-->',datetime.datetime.now())​jobstores = {'default':MemoryJobStore()}executors = {'default':ThreadPoolExecutor(20),'processpool':ProcessPoolExecutor(10)}job_defaults = {'coalesce':False,'max_instance':3}scheduler =BlockingScheduler(jobstores=jobstores,executors=executors,job_defaults=job_defaults)scheduler.add_job(my_job,args=['job_interval',],id='job_interval',trigger='interval',seconds=5,replace_existing=True)scheduler.add_job(my_job,args=['job_cron',],id='job_cron',trigger='cron',month='4-8,5-6',hour='7-11',second='*/10',end_date='2024-06-06')scheduler.add_job(my_job,args=['job_once_now',],id='job_once_now')scheduler.add_job(my_job,args=['job_date_once',],id='job_date_once',trigger='date',run_date='2024-01-01 00:00:00')try:scheduler.start()except SystemExit:print('exit')exit()

方法二:使用数据库作为作业存储器(修改第5行和11行)

 from apscheduler.schedulers.blocking import BlockingSchedulerimport datetimefrom apscheduler.jobstores.memory import MemoryJobStorefrom apscheduler.executors.pool import ThreadPoolExecutor,ProcessPoolExecutorfrom apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore​def my_job(id='my_job'):print(id,'-->',datetime.datetime.now())​jobstores = {'default':SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')}executors = {'default':ThreadPoolExecutor(20),'processpool':ProcessPoolExecutor(10)}job_defaults = {'coalesce':False,'max_instance':3}scheduler =BlockingScheduler(jobstores=jobstores,executors=executors,job_defaults=job_defaults)scheduler.add_job(my_job,args=['job_interval',],id='job_interval',trigger='interval',seconds=5,replace_existing=True)scheduler.add_job(my_job,args=['job_cron',],id='job_cron',trigger='cron',month='4-8,5-6',hour='7-11',second='*/10',end_date='2024-06-06')scheduler.add_job(my_job,args=['job_once_now',],id='job_once_now')scheduler.add_job(my_job,args=['job_date_once',],id='job_date_once',trigger='date',run_date='2024-01-01 00:00:00')try:scheduler.start()except SystemExit:print('exit')exit()

运行过之后,如果不注释添加作业的代码,则作业会重新添加到数据库中,这样就有了两个作业,为了避免这样的情况:设置(replace_existing=True)

 scheduler.add_job(my_job,args=['job_interval',],id='job_interval',trigger='interval',seconds=5,replace_existing=True)

如果想运行错过运行的作业,则使用misfire_grace_time

 scheduler.add_job(my_job,args=['job_cron',],id='job_cron',trigger='cron',month='4-8,5-6',hour='7-11',second='*/10',coalesce=True,misfire_grace_time=30,replace_existing=True,end_date='2024-06-06')

其他操作如下:

 scheduler.remove_job(job_id,jobstore=None)      # 删除作业scheduler.remove_all_jobs(jobstore=None)        # 删除所有作业scheduler.pause_job(job_id,jobstore=None)       # 暂停作业scheduler.resume_job(job_id,jobstore=None)      # 恢复作业scheduler.modify_job(job_id,jobstore=None,**changes)    # 修改单个作业属性配置scheduler.reschedule_job(job_id,jobstore=None,trigger=None,**trigger_args) # 修改单个作业的触发器并更新下次运行时间scheduler.print_jobs(jobstore=None,out=sys.stdout)      # 输出作业信息

四、调度事件监听

日志记录和事件监听:

 from apscheduler.schedulers.blocking import BlockingSchedulerfrom apscheduler.events import EVENT_JOB_EXECUTED,EVENT_JOB_ERRORimport datetimeimport logging​# 配置日志记录信息 logging.basicConfig(level=logging.INFO,format='%(asctime)s %(filename)s[line:%(lineno)d %(levelname)s %(message)s',datefmt='%Y-%m-%d %H:%M:%S',filename='log1.txt',filemode='a')​def aps_test(x):print(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'),x)​def date_test(x):print(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S'),x)print(1/0)​def my_listener(event):if event.exception:print('任务出错了!!!!')else:print('任务照常运行...')​scheduler = BlockingScheduler()scheduler.add_job(func=date_test,args=('一次性任务,会出错',),next_run_time=datetime.datetime.now() + datetime.timedelta(seconds=15),id='date_task')scheduler.add_job(func=aps_test,args=('循环任务',),trigger='interval',seconds=3,id='interval_task')scheduler.add_listener(my_listener,EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)scheduler._logger = logging​scheduler.start()

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://xiahunao.cn/news/3029512.html

如若内容造成侵权/违法违规/事实不符,请联系瞎胡闹网进行投诉反馈,一经查实,立即删除!

相关文章

机器学习——6.模型训练案例: 预测儿童神经缺陷分类TD/ADHD

案例目的 有一份EXCEL标注数据,如下,训练出合适的模型来预测儿童神经缺陷分类。 参考文章:机器学习——5.案例: 乳腺癌预测-CSDN博客 代码逻辑步骤 读取数据训练集与测试集拆分数据标准化数据转化为Pytorch张量label维度转换定义模型定义损…

Navicat连接远程数据库时,隔一段时间不操作出现的卡顿问题

使用 Navicat 连接服务器上的数据库时,如果隔一段时间没有使用,再次点击就会出现卡顿的问题。 如:隔一段时间再查询完数据会出现: 2013 - Lost connection to MySQL server at waiting for initial communication packet, syste…

pydev debugger: process **** is connecting

目录 解决方案一解决方案二 1、调试时出现pydev debugger: process **** is connecting 解决方案一 File->settings->build,execution,deployment->python debugger 下面的attach to subprocess automatically while debugging取消前面的勾选(默认状态为勾…

Linux系统调用过程详解:应用程序调用驱动过程

Linux下应用程序调用驱动程序过程: (1)加载一个驱动模块(.ko),产生一个设备文件,有唯一对应的inode结构体 a、每个设备文件都有一个对应的’inode‘结构体,包含了设备的主次设备号,是设备的唯一…

Qt复习第二天

1、菜单栏工具栏状态栏 #include "mainwindow.h" #include "ui_mainwindow.h" #pragma execution_character_set("utf-8"); MainWindow::MainWindow(QWidget *parent): QMainWindow(parent), ui(new Ui::MainWindow) {ui->setupUi(this);//菜…

重生我是嵌入式大能之串口调试UART

什么是串口 串口是一种在数据通讯中广泛使用的通讯接口,通常我们叫做UART (通用异步收发传输器Universal Asynchronous Receiver/Transmitter),其具有数据传输速度稳定、可靠性高、适用范围广等优点。在嵌入式系统中,串口常用于与外部设备进…

【数据结构与算法】常见的排序算法

文章目录 排序的概念冒泡排序(Bubble Sort)插入排序(Insert Sort)选择排序(Select Sort)希尔排序(Shell Sort)写法一写法二 快速排序(Quick Sort)hoare版本&a…

从零开始搭建Ubuntu CTF-pwn环境

下面就将介绍如何从零搭建一个CTF-pwn环境(由于学习仍在进行,故一些环境如远程执行环境还没有搭建的经历,如今后需要搭建,会在最后进行补充) 可以在ubuntu官方网站上下载最新的长期支持版本:(我下载的是22.04版本) h…

AXI4写时序在AXI Block RAM (BRAM) IP核中的应用

在本文中将展示描述了AXI从设备(slave)AXI BRAM Controller IP核与Xilinx AXI Interconnect之间的写时序关系。 1 Single Write 图1是一个关于32位宽度的BRAM(Block RAM)的单次写入操作的例子。这个例子展示了如何向地址0x1000h…

如何查看centos7中Java在哪些路径下

在 CentOS 7 上,你可以通过几种方式查找安装的 Java 版本及其路径。以下是一些常用的方法: 1. 使用 alternatives 命令 CentOS 使用 alternatives 系统来管理同一命令的多个版本。你可以使用以下命令来查看系统上所有 Java 安装的配置: su…

【JVM】了解JVM规范中的虚拟机结构

目录 JVM规范的主要内容 1)字节码指令集(相当于中央处理器CPU) JVM指令分类 2)Class文件的格式 3)数据类型和值 4)运行时数据区 5)栈帧 6)特殊方法 7)类库 JVM规范的主要内容 1&#…

小程序如何确定会员身份并批量设置会员积分或余额

因为一些原因,商家需要从其它系统里面批量导入会员,确定会员身份,然后给他们设置对应的账户余额。下面,就具体介绍如何进行这种操作。 一、客户进入小程序并绑定手机号 进入小程序:客户打开小程序,系统会自…

在51单片机里面学习C语言

在开始前我有一些资料,是我根据网友给的问题精心整理了一份「C语言的资料从专业入门到高级教程」, 点个关注在评论区回复“888”之后私信回复“888”,全部无偿共享给大家!!! 说出来你们可能都…

创新案例|搜索新王Perplexity如何构建生成式AI产品开发的新模式

Perplexity AI:生成式搜索的颠覆者 刚刚成立满两年,Perplexity AI已经变成了我日常频繁使用的工具,甚至取代了我对 Google搜索的依赖 —— 而我并非个案。该公司仅凭不到 50 名员工,已经吸引了数千万用户。他们目前的年收入超过 …

浅析扩散模型与图像生成【应用篇】(二十三)——Imagic

23. Imagic: Text-Based Real Image Editing with Diffusion Models 该文提出一种基于文本的真实图像编辑方法,能够根据纯文本提示,实现复杂的图像编辑任务,如改变一个或多个物体的位姿和组成,并且保持其他特征不变。相比于其他文…

YOLO系列笔记(十)—— 基础:卷积层及其计算公式

卷积层及其计算公式 前言定义与功能计算过程与输出尺寸没有填充的情况有填充的情况 网络结构中的表示分析一:数字的含义分析二:分支的含义 前言 卷积层是在深度学习领域中非常常见、基础且重要的一种神经网络层。许多初学者可能会对卷积层的功能、其计算…

JDK不同版本里中国夏令时时间

什么是夏令时? 夏令时,(Daylight Saving Time:DST),也叫夏时制,又称“日光节约时制”和“夏令时间”,是一种为节约能源而人为规定地方时间的制度,在这一制度实行期间所采…

部署xwiki服务需要配置 hibernate.cfg.xml如何配置?

1. 定位 hibernate.cfg.xml 文件 首先,确保您可以在 Tomcat 的 XWiki 部署目录中找到 hibernate.cfg.xml 文件: cd /opt/tomcat/latest/webapps/xwiki/WEB-INF ls -l hibernate.cfg.xml如果文件存在,您可以继续编辑它。如果不存在&#xff…

KaiwuDB 参编的《分析型数据库技术要求》标准正式发布

近期,中国电子工业标准化技术协会正式发布团体标准《分析型数据库技术要求》(项目号:T-CESA 2023-006)。该标准由中国电子技术标准化研究院、KaiwuDB(上海沄熹科技有限公司) 等国内 16 家企业联合起草&…

Win11安装Docker Desktop运行Oracle 11g 【详细版】

oracle docker版本安装教程 步骤拉取镜像运行镜像进入数据库配置连接数据库,修改密码Navicat连接数据库 步骤 拉取镜像 docker pull registry.cn-hangzhou.aliyuncs.com/helowin/oracle_11g运行镜像 docker run -d -p 1521:1521 --name oracle11g registry.cn-ha…