有勇气的牛排博客

Python APScheduler 定时任务调度教程:实现高效任务管理的最佳实践

有勇气的牛排 821 Python 2023-05-18 23:29:32

1 前言

APScheduler (Advanced Python Scheduler) 是一个 Python 库,用于在指定的时间执行调度任务。它支持多种调度方式,包括固定间隔、固定日期和时间间隔等。APScheduler 常用于 Web 开发、数据定时采集、任务自动化等场景。

1.1 APScheduler 基本概念

  • Job: 一个任务,即需要定时执行的函数。
  • Trigger: 决定任务何时执行的触发器。APScheduler 支持多种触发器,例如 dateintervalcron
  • Scheduler: 任务调度器,负责管理和执行任务。

2 安装

apscheduler==3.10.4

测试环境:Python 3.10.9

3 APScheduler 触发器类型

3.1 指定时间 ymd hms

scheduler.add_job(func, 'date', run_date='2024-11-01 10:00:00')

3.2 固定时间 间隔 执行任务

# 每60秒执行一次 scheduler.add_job(func, 'interval', seconds=60)

3.3 cron 表达式

# 每天 19:00 执行一次 scheduler.add_job(func, 'cron', hour=19, minute=0)
# 每周一至周五的 6:30 执行 scheduler.add_job(my_job, 'cron', day_of_week='mon-fri', hour=6, minute=30)

4 基本示例: 每2s执行一次

from apscheduler.schedulers.blocking import BlockingScheduler from datetime import datetime # 初始化調度器 scheduler = BlockingScheduler() def my_job(): print("任务运行时间:", datetime.now()) def start_cron(): # 每 2s 運行一次 scheduler.add_job(my_job, "interval", seconds=2) # 启动调度器 try: scheduler.start() except (KeyboardInterrupt, SystemExit): pass if __name__ == '__main__': start_cron()

Python定时任务 基本示例: 每2s执行一次

5 进阶功能

APScheduler 支持任务参数传递、任务暂停与恢复、错误处理和多种调度器类型。下面是一些进阶功能。

5.1 任务参数传递

def print_msg(msg, url): print(f"{msg}: {url}") # 每 1s 運行一次 scheduler.add_job(print_msg, "interval", seconds=1, args=['有勇氣的牛排定時任務', "https://www.couragesteak.com"])

Python定时任务 任务参数传递

5.2 任务暂停与恢复

job = scheduler.add_job(print_msg, 'interval', seconds=5, args=['有勇氣的牛排定時任務']) job.pause() # 暂停任务 job.resume() # 恢复任务

5.3 错误处理与任务日志

可以使用 job_defaults 配置任务出错后的重试次数、任务日志等。

scheduler.configure(job_defaults={'misfire_grace_time': 30})

6 协成异步支持

import asyncio from apscheduler.schedulers.asyncio import AsyncIOScheduler from datetime import datetime # 定义一个异步任务 async def print_msg(msg, url): print(f"{datetime.now()} - {msg}: {url}") # 模拟异步操作 await asyncio.sleep(1) print("异步任务执行完成") # 启动调度器并添加异步任务 def start_async_cron(): scheduler = AsyncIOScheduler() # 每 5 秒运行一次异步任务 scheduler.add_job( print_msg, 'interval', seconds=1, args=['有勇氣的牛排定時任務', "https://www.couragesteak.com"], max_instances=3 # 允许最多 3 个实例同时运行 ) # 启动调度器 scheduler.start() # 启动 asyncio 事件循环 try: asyncio.get_event_loop().run_forever() except (KeyboardInterrupt, SystemExit): print("正在关闭调度器...") scheduler.shutdown() print("调度器已关闭") if __name__ == '__main__': start_async_cron()

Python定时任务 协成异步支持


留言

专栏
文章
加入群聊