APScheduler定时器使用【简易版】:django中使用apscheduler,使用mysql做存储后端

一、基本环境

python版本:3.8.5

APScheduler==3.10.4
Django==3.2.7
djangorestframework==3.15.1
SQLAlchemy==2.0.29
PyMySQL==1.1.0

二、django基本设置

2.1、新增一个app

该app用来写apscheduler相关的代码

python manage.py startapp gs_scheduler

2.2、修改配置文件settings.py

#使用pymysql做客户端
import pymysql
pymysql.install_as_MySQLdb()INSTALLED_APPS = ['rest_framework', #注册restful 应用'gs_scheduler', #注册新增的app
]#配置mysql
MYSQL_HOST = '127.0.0.1'
MYSQL_PORT = 3306
MYSQL_USER = 'root'
MYSQL_PASSWORD = 'admin-root'
MYSQL_NAME = 'study_websocket'DATABASES = {'default': {'ENGINE': 'django.db.backends.mysql','HOST': MYSQL_HOST,'PORT': MYSQL_PORT,'USER': MYSQL_USER,'PASSWORD': MYSQL_PASSWORD,'NAME': MYSQL_NAME,}
}

2.3、gs_scheduler创建urls.py

1、gs_scheduler/urls.py

from django.urls import path
from . import views
urlpatterns = []

2、根路由urls.py

from django.contrib import admin
from django.urls import path,includeurlpatterns = [path('admin/', admin.site.urls),path('api/scheduler/',include('gs_scheduler.urls')),
]

三、配置gs_scheduler应用

3.1、配置api接口

这些接口,用来展示定时任务的运行情况

1、urls.py

from django.urls import path
from . import views
urlpatterns = [path('run_next/', views.JobNextRunTimeAPIView.as_view()),#定时任务下次运行path('run_history/',views.JobRunTimeHistory.as_view()),#定时任务运行历史path('run_error/',views.JobRunErrorHistory.as_view()), #定时任务最近运行错误
]

2、models.py

from django.db import models# Create your models here.
import sqlite3
from gs_scheduler.management.commands.config import MYSQL_HOST,MYSQL_NAME,MYSQL_PORT,MYSQL_PASSWORD,MYSQL_USER,MYSQL_CHARSET
from datetime import datetime,timedelta
from django.conf import settingsimport pymysql
from concurrent.futures import ThreadPoolExecutor#时间戳转时间字符串
def timestamp_to_time_str(timestamp):# 使用 datetime 模块将时间戳转换为 datetime 对象dt = datetime.fromtimestamp(timestamp)# 将 datetime 对象格式化为时间字符串time_str = dt.strftime('%Y-%m-%d %H:%M:%S')return time_strclass MysqlDB:DATETIME_FORMAT = '%Y-%m-%d %H:%M:%S'# 创建数据库连接池def __init__(self):self.conn = pymysql.connect(host=MYSQL_HOST,port=MYSQL_PORT,user=MYSQL_USER,password=MYSQL_PASSWORD,db=MYSQL_NAME,charset=MYSQL_CHARSET,cursorclass=pymysql.cursors.DictCursor)# 执行数据库增删改查def _execute_sql(self,query):sql,args = querytry:with self.conn.cursor() as cursor:# 执行sql语句if args:if isinstance(args,(tuple,list)):# args = (value1,value2)cursor.execute(sql,args)else:# args = value1cursor.execute(sql,(args,))else:cursor.execute(sql)# 不同类型sql,设置不同返回值if sql.strip().lower()[:6] == 'select':#查询语句rows = cursor.fetchall() or []return rowselif sql.strip().lower()[:4] == 'show':#查看表是否存在result = cursor.fetchall()return resultelse:#增删改语句self.conn.commit()return Trueexcept Exception as e:print('sql执行失败',e)pass# 线程池执行sql语句def start_sql_with_pool(self, sql:str,args=None):with ThreadPoolExecutor() as executor:result = executor.submit(self._execute_sql, (sql,args)).result()return result#创建记录定时任务历史表(调度器使用)def create_apscheduler_history(self):#创建表语句:表名不能是占位符sql = """CREATE TABLE apscheduler_history (id INTEGER AUTO_INCREMENT PRIMARY KEY,job_id VARCHAR(128),run_time DATETIME,is_error TINYINT,error_msg VARCHAR(256) NULL)"""#判断表存在不,不存在再创建results = self.start_sql_with_pool('SHOW TABLES')is_exist = Falsefor dic in results:if dic['Tables_in_{}'.format(MYSQL_NAME,)] == 'apscheduler_history':is_exist = True#表不存在才创建if not is_exist:# print('表不存在,执行创建表')create = self.start_sql_with_pool(sql)# 创建索引self.start_sql_with_pool(db.conn.cursor().execute("CREATE INDEX run_time_index ON apscheduler_history (run_time)"))return True# 将任务运行的结果记录到数据库中(调度器使用)def insert_into_apscheduler_history(self, data_list: list):if len(data_list) == 4:  # 异常执行的任务sql = """INSERT INTO apscheduler_history (job_id,run_time,is_error,error_msg) VALUES (%s,%s,%s,%s)"""else:  # 正常执行的任务sql = """INSERT INTO apscheduler_history (job_id,run_time,is_error) VALUES (%s,%s,%s)"""#使用线程池执行插入语句self.start_sql_with_pool(sql,data_list)# 删除8小时前的历史记录(调度器使用)def delete_8hour_before_history(self):before_8hour = (datetime.now() - timedelta(hours=8)).strftime('%Y-%m-%d %H:%M:%S')sql = '''DELETE FROM apscheduler_history WHERE run_time < %s'''self.start_sql_with_pool(sql,(before_8hour,))# 查询每个任务的下次运行时间(api展示)def fetch_all_next_run_time(self):sql = 'SELECT id,next_run_time FROM apscheduler_jobs'# 获取查询结果rows = self.start_sql_with_pool(sql)for row in rows:row['next_run_time'] = timestamp_to_time_str(row['next_run_time'])return rows# 查询所有任务最近10次运行记录(api展示)def fetch_all_run_history(self):''':param query:'SELECT id,next_run_time,job_state FROM apscheduler_jobs':return:'''#获取所有定时任务iddata_list = self.fetch_all_next_run_time()id_list = (dic.get('id') for dic in data_list)# 创建一个游标对象sql = """SELECT id,job_id,run_time FROM apscheduler_history WHERE is_error=0 AND job_id = %s ORDER BY run_time DESC LIMIT 10"""ret_list = []for id in id_list:# 执行查询rows = self.start_sql_with_pool(sql,(id,)) or []dic = {'id': id, 'last_run_time': [], 'msg': '最近10次运行时间'}for row in rows:run_time = row.get('run_time')run_time = run_time.strftime(self.DATETIME_FORMAT) if isinstance(run_time,datetime) else run_timedic['last_run_time'].append(run_time)ret_list.append(dic)return ret_list# 获取任务最近运行失败情况(api展示)def fetch_all_error_history(self):data_list = self.fetch_all_next_run_time()id_list = (dic.get('id') for dic in data_list)sql = """SELECT run_time,error_msg FROM apscheduler_history WHERE is_error=1 AND job_id = %s ORDER BY id DESC LIMIT 5"""ret_list = []for id in id_list:# 获取查询结果rows = self.start_sql_with_pool(sql, (id,)) or []dic = {'id': id, 'last_run_time': [], 'msg': '最近5次执行失败'}for row in rows:run_time = row.get('run_time')run_time = run_time.strftime(self.DATETIME_FORMAT) if isinstance(run_time,datetime) else run_timeerror = row.get('error_msg')dic['last_run_time'].append({'run_time': run_time, 'error': error})ret_list.append(dic)return ret_list# 关闭连接def close(self):self.conn.close()if __name__ == '__main__':db = MysqlDB()db.create_apscheduler_history()# for i in range(1,10):#     db.insert_into_apscheduler_history(['send_to_big_data','2024-05-01 13:{}:12'.format(str(i).zfill(2)),1,'执行失败了'])# db.delete_8hour_before_history()# ret = db.fetch_all_run_history()# print(ret,'history')# ret = db.fetch_all_next_run_time()# print(ret,'next')# ret = db.fetch_all_error_history()# print(ret,'error')db.close()

3、views.py

from django.shortcuts import render# Create your views here.
from rest_framework.views import APIView
from rest_framework.response import Response
from .models import MysqlDB#任务下次运行时间
class JobNextRunTimeAPIView(APIView):authentication_classes = []def get(self,request):db = MysqlDB()data = db.fetch_all_next_run_time()db.close()ret = {'code':200,'status':'success','data':data,}return Response(ret)#任务最近运行历史
class JobRunTimeHistory(APIView):authentication_classes = []def get(self,request):db = MysqlDB()data = db.fetch_all_run_history()db.close()ret = {'code':200,'status':'success','data':data}return Response(ret)#任务最近运行错误
class JobRunErrorHistory(APIView):authentication_classes = []def get(self,request):db = MysqlDB()data = db.fetch_all_error_history()db.close()ret = {'code': 200,'status': 'success','data': data}return Response(ret)

3.2、在gs_scheduler创建

1、config.py 代码

该文件存放的是启动APScheduler调度器的一些配置数据

import os
from apscheduler.jobstores.memory import MemoryJobStore #内存做后端存储
#from apscheduler.jobstores.redis import RedisJobStore #redis做后端存储
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore #mysql等做后端存储
# from django.conf import settings
from study_apscheduler import settings
#mysql://root:ldc-root@127.0.0.1:3306/jobs?charset=utf8
MYSQL_CONFIG = settings.DATABASES.get('default')
MYSQL_USER = MYSQL_CONFIG.get('USER')
MYSQL_PASSWORD = MYSQL_CONFIG.get('PASSWORD')
MYSQL_HOST = MYSQL_CONFIG.get('HOST')
MYSQL_PORT = MYSQL_CONFIG.get('PORT')
MYSQL_NAME = MYSQL_CONFIG.get('NAME')
MYSQL_CHARSET = 'utf8mb4'
URL = 'mysql://{}:{}@{}:{}/{}?charset={}'.format(MYSQL_USER,MYSQL_PASSWORD,MYSQL_HOST,MYSQL_PORT,MYSQL_NAME,MYSQL_CHARSET)
#时区
TIME_ZONE = 'Asia/Shanghai'
#job的默认配置
JOB_DEFAULTS =  {'coalesce': True, #系统挂掉,任务积攒多次为执行,True是合并成一次执行,False是执行所有的次数。 持久化存储才有效'max_instances': 3 # 同一个任务同一时间最多只能有3个实例在运行。}
#job的存储后端
JOB_STORE = {'default': SQLAlchemyJobStore(url=URL)
}#监听事件对应的情况
LISTENER={1:'调度程序启动',2:'调度程序关闭',4:'调度程序中任务处理暂停',64:'将任务存储添加到调度程序中',8192:'任务在执行期间引发异常',4096:'任务执行成功',
}

2、task.py

所有的定时任务都存放在这里

import os
from datetime import datetime, timedelta, date
from gs_scheduler.models import MysqlDB
from utils.log_util import info_log
from utils.send_monitor_data import SendData#推送到数据仓的告警信息:5分钟执行一次
send_to_big_data = SendData().send#清除定时任务历史运行记录
def delete_apscheduler_history():db = MysqlDB()db.delete_8hour_before_history()if __name__ == '__main__':print(os.path.dirname(os.path.dirname(os.path.dirname(__file__))))

3、crontab.py

实例化调度器

# 导入所需的调度器类和触发器类
from apscheduler.schedulers.background import BackgroundScheduler #后台运行
from apscheduler.schedulers.blocking import BlockingScheduler  #主进程运行,需要单独运行
from apscheduler.triggers.interval import IntervalTrigger #时间间隔
from apscheduler.triggers.cron import CronTrigger #复杂的定时任务
from apscheduler.triggers.date import DateTrigger #一次性定时任务
from django.core.management.base import BaseCommand
from apscheduler import events
from pytz import timezone
from threading import RLock
from datetime import datetime, timedelta
from gs_scheduler.models import MysqlDB
#定时任务
from .task import delete_apscheduler_history
from .task import send_to_big_data
#日志
from utils.log_util import info_log
from .config import LISTENER
from .config import TIME_ZONE,JOB_DEFAULTS,JOB_STORE#脚本运行:python manage.py crontab
class Command(BaseCommand):TIME_FORMAT = '%Y-%m-%d %H:%M:%S'#初始化调度器def _scheduler_obj(self):scheduler = BlockingScheduler()scheduler.configure(timezone = TIME_ZONE, #时区job_defaults=JOB_DEFAULTS, #job的默认配置jobstores=JOB_STORE, #job的存储后端)return scheduler#添加任务def _add_job(self,scheduler:BlockingScheduler):#每5分钟执行一次推送告警到大数据仓scheduler.add_job(send_to_big_data,trigger=IntervalTrigger(minutes=1),id='send_to_big_data',replace_existing=True,coalesce=True,)#每隔8个小时,清除历史的记录scheduler.add_job(delete_apscheduler_history,trigger=IntervalTrigger(hours=8),id='delete_apscheduler_history',replace_existing=True,coalesce=True,)#添加监听器def _listener(self,event:events):code = event.coderun_time = datetime.now().strftime(self.TIME_FORMAT)msg = LISTENER.get(code)db = MysqlDB()if msg:if code == 4096:#成功运行job_id = event.job_id#记录到数据库中db.insert_into_apscheduler_history([job_id,run_time,0])elif code == 8192:#运行异常了job_id = event.job_id#记录到数据库中db.insert_into_apscheduler_history([job_id,run_time,1,msg])else:info_log(msg)db.close()def start(self):scheduler = self._scheduler_obj()# 创建记录运行记录db = MysqlDB()db.create_apscheduler_history()db.close()# 设置监听器scheduler.add_listener(self._listener)# 设置定时任务self._add_job(scheduler)try:# print('{},定时器启动成功,等待定时任务执行...'.format(datetime.now().strftime(self.TIME_FORMAT)))scheduler.start()except KeyboardInterrupt:scheduler.shutdown()# python manage.py crontab运行 就是调用该方法def handle(self, *args, **options):self.start()#伴随django,在后台运行的。 在wsgi.py 文件中,调用BackRunScheduler().start()
class BackRunScheduler(Command):# 初始化调度器def _scheduler_obj(self):scheduler = BackgroundScheduler()scheduler.configure(timezone=TIME_ZONE,  # 时区job_defaults=JOB_DEFAULTS,  # job的默认配置jobstores=JOB_STORE,  # job的存储后端)return scheduler

3.3、启动方式:

方式一: python manage.py  crontab    (生产环境,推荐使用此方式,单独运行)

方式二:在settings.py中,调用BackRunScheduler().start()  , 后台运行

四、测试

4.1、启动

启动django项目:python manage.py runserver 8080

启动定时器:python manage.py crontab

4.2、等待一段时间

1、查询任务下次执行时间

请求:http://127.0.0.1:8080/api/scheduler/run_next/

2、查询任务最近运行情况

请求:http://127.0.0.1:8080/api/scheduler/run_next/

3、查询任务最近异常情况

请求:http://127.0.0.1:8080/api/scheduler/run_error/

五、源代码下载

码云地址:

django应用定时器: django下使用定时器的方法icon-default.png?t=N7T8https://gitee.com/liuhaizhang/django-application-timer/

目前有两套代码:一个以mysql存储任务,一个用sqlite存储任务

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://xiahunao.cn/news/3018994.html

如若内容造成侵权/违法违规/事实不符,请联系瞎胡闹网进行投诉反馈,一经查实,立即删除!

相关文章

20232810 2023-2024-2 《网络攻防实践》实验八

一、实践内容 1.1 恶意代码 1.1.1 简介 定义&#xff1a;恶意代码&#xff08;Malware,或Malicious Code&#xff09;指的是使计算机按照攻击者的意图执行以达到恶意目标的指令集。 指令集合&#xff1a;二进制执行文件、脚本语言代码、宏代码、寄生在文件或者启动扇区的指令…

想要买到心仪的旋转式孔板流量计吗?

选择旋转式孔板流量计可不能云里雾里的乱选择呀&#xff0c;煤矿对产品质量要求很严格的。所以我们要先了解产品的再决定才是对的选择。 旋转式孔板流量计技术参数【1--5--9】 规格&#xff1a;DN15&#xff5e;DN1000 孔径比(βd/D)&#xff1a;β0&#xff0e;2—0&#xff…

深圳车间厂房降温用什么设备好?

环保水空调&#xff08;也被称为水冷空调或蒸发式降温换气机组&#xff09;的特点主要体现在以下几个方面&#xff1a; 节能环保&#xff1a;环保水空调使用水作为冷媒介&#xff0c;相比传统空调的制冷方式&#xff0c;它能在制冷过程中节约更多的能源&#xff0c;减少碳排放…

常用的外贸软件有哪些

常用的外贸软件涵盖了多个方面&#xff0c;包括客户开发、订单管理、库存控制、客户关系管理(CRM)、财务管理以及跨境电商平台等。以下是一些代表性的外贸软件和平台&#xff1a; 客户开发与营销软件: 大镜山谷歌搜索大师易谷歌地图数据采集大师米贸搜 外贸管理软件 (ERP): 神卓…

美国硅谷裸机云大宽带服务器在哪些行业中应用最广泛?

美国硅谷裸机云大宽带服务器在视频流媒体、实时数据分析和金融交易等行业中应用最广泛。关于美国硅谷裸机云大带宽服务器的各行业应用&#xff0c;rak部落小编为您做出详细的阐述。 美国硅谷裸机云大宽带服务器因其结合了高性能物理服务器和大带宽网络连接的特点&#xff0c;成…

H7-TOOL的双硬件串口同时运行Modbus主机和从机方法,方便大家Modbus测试验证(2024-05-06)

H7-TOOL的双硬件串口同时运行Modbus主机和从机方法&#xff0c;方便大家Modbus测试验证&#xff08;2024-05-06&#xff09; 使用这种方法&#xff0c;仅使用一个TOOL就可以方便同时运行Modbus主机和从机。 【Modbus专题视频】 可以用来熟悉Modbus协议 BSP视频教程第23期…

torch教程

一 基本用法 1 torch.autograd.Function PyTorch 74.自定义操作torch.autograd.Function - 知乎 (zhihu.com)https://zhuanlan.zhihu.com/p/344802526 虽然pytorch可以自动求导,但是有时候一些操作是不可导的,这时候你需要自定义求导方式。也就是所谓的 "Extending t…

【0DAY】瑞友天翼应用虚拟化系统index.php接口处存在SQL注入漏洞导致程RCE

免责声明&#xff1a;文章来源互联网收集整理&#xff0c;请勿利用文章内的相关技术从事非法测试&#xff0c;由于传播、利用此文所提供的信息或者工具而造成的任何直接或者间接的后果及损失&#xff0c;均由使用者本人负责&#xff0c;所产生的一切不良后果与文章作者无关。该…

进行交直流充电桩检测的步骤和规范

交直流充电桩检测是为了保证充电桩的安全、稳定和高效运行&#xff0c;对充电桩的各项性能进行检测的过程。以下是进行交直流充电桩检测的步骤和规范&#xff1a; 1. 准备工作&#xff1a;在进行充电桩检测之前&#xff0c;需要准备好相关的检测设备和工具&#xff0c;如电压表…

揭秘速卖通自养号测评:如何提升账号权重与转化率?

速卖通自养号测评深度解析 在速卖通平台上&#xff0c;自养号测评是一个精细且复杂的过程&#xff0c;它涵盖了资源准备、环境搭建、账号运营、测评实施等多个环节。下面&#xff0c;我将详细解析这一过程&#xff0c;并分享我的原创见解。 一、资源与环境的基础搭建 首先&…

uni-app 多列picker切换列显示对应内容

html部分&#xff1a; <view class"uni-list"><view class"uni-list-cell"><view class"uni-list-cell-left">选择用户</view><view class"uni-list-cell-db"><picker mode"multiSelector"…

【设计模式】之观察者模式

系列文章目录 【设计模式】之装饰器模式【设计模式】之工厂模式&#xff08;三种&#xff09;【设计模式】之工厂模式&#xff08;三种&#xff09; 前言 今天给大家介绍另一种设计模式--观察者模式&#xff0c;有了解webscoket实现原理的小伙伴应该对这个设计模式不陌生。不清…

618有什么值得入手的电器?618家电狂欢不容错过的电器分享!

618电器节&#xff0c;作为年中最大的购物狂欢之一&#xff0c;汇聚了众多知名品牌和优质产品&#xff0c;为您提供了丰富的选择和超值的优惠。无论是您家中急需更换的老旧电器&#xff0c;还是您心仪已久的高端智能家电&#xff0c;都能在这个节日里找到心仪的选择。在这里&am…

手把手教数据结构与算法:有序线性表设计(表合并)

个人主页&#xff1a; 想转码的电筒人 专栏&#xff1a; 手把手教数据结构与算法 文章目录 有序线性表 概念 结构 问题描述 输入 输出 样例 解题步骤 结点类 链表类 insert函数 printAll函数 merge函数 test函数 完整代码 有序线性表 概念 单链表是一种物…

面试题:返回倒数第k个节点(简单)

面试题&#xff1a;返回倒数第k个节点&#xff08;简单&#xff09; 面试题 02.02. 返回倒数第 k 个节点 - 力扣&#xff08;LeetCode&#xff09;https://leetcode.cn/problems/kth-node-from-end-of-list-lcci/description/ 这一题是很简单的当做试手题 题目 题目分析 1&…

hive日常使用时忘记部分补充(不定时)

1、date_formate、unix_timestamp、from_unixtime用法&#xff1a; 2、lag&#xff08;&#xff09;、lead()用法&#xff1a; lag&#xff08;)窗口函数返回分区中当前行之前行&#xff08;可以指定第几行&#xff09;的值。 如果没有行&#xff0c;则返回null。 lead()窗口…

windows端口复用

1. 概述 使用 HTTP.sys 中的 Net.tcp Port Sharing 服务&#xff0c;配合 WinRM 实现端口复用。 优点&#xff1a; HTTP.sys 为 windows 原生机制&#xff0c; WinRM 为 windows 自带功能&#xff0c;动作较小&#xff0c;不易触发主 动防御。 需要管理员权限。 2. 原理 (…

定时任务相关:克戎表达式

克戎表达式的历史和概念 克戎表达式&#xff08;Cron Expression&#xff09;是一种用于表示定时任务的字符串格式&#xff0c;在计算机领域被广泛应用。它的历史可以追溯到UNIX系统&#xff0c;最早由Brian Kernighan与其他UNIX开发者在1970年代末和1980年代初开发。 克戎表…

matlab绘制时间序列图,横坐标轴如何标注为月-日

Excel表格中有类似于如下 年月日对应的数据 导入 matlab中&#xff0c;为数值矩阵&#xff1b;了解该表格中的时间跨度为从2021年1月2日至2021年12月31日&#xff0c;中间没有缺失&#xff0c;绘图代码&#xff1a; % clear; timespan1[20210102 20211231]; datenn1datenum(da…

[虚拟机+单机]梦幻契约H5修复版_附GM工具

本教程仅限学习使用&#xff0c;禁止商用&#xff0c;一切后果与本人无关&#xff0c;此声明具有法律效应&#xff01;&#xff01;&#xff01;&#xff01; 教程是本人亲自搭建成功的&#xff0c;绝对是完整可运行的&#xff0c;踩过的坑都给你们填上了 视频演示 [虚拟机单…