13518219792

建站动态

根据您的个性需求进行定制 先人一步 抢占小程序红利时代

Python任务调度利器:APScheduler

任务调度应用场景

创新互联公司从2013年创立,先为天宁等服务建站,天宁等地企业,进行企业商务咨询服务。为天宁企业网站制作PC+手机+微官网三网同步一站式服务解决您的所有建站问题。

所谓的任务调度是指安排任务的执行计划,即何时执行,怎么执行等。在现实项目中经常出现它们的身影;特别是数据类项目,比如实时统计每5分钟网站的访问量,就需要每5分钟定时从日志数据分析访问量。

总结下任务调度应用场景:

任务调度工具

本文介绍的是python中的任务调度库,APScheduler(advance python scheduler)。如果你了解Quartz的话,可以看出APScheduler是Quartz的python实现;APScheduler提供了基于时间,固定时间点和crontab方式的任务调用方案, 可以当作一个跨平台的调度工具来使用。

APScheduler

组件介绍

APScheduler由5个部分组成:触发器、调度器、任务存储器、执行器和任务事件。

安装

 
 
 
 
  1. pip install apscheduler 

简单例子

 
 
 
 
  1. from apscheduler.schedulers.background import BackgroundScheduler  
  2. from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor  
  3. from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore  
  4. from apscheduler.events import EVENT_JOB_EXECUTED, EVENT_JOB_ERROR  
  5. import logging  
  6. import datetime  
  7. # 任务执行函数  
  8. def job_func(job_id):  
  9.     print('job %s is runed at %s' % (job_id, datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')))  
  10. # 事件监听  
  11. def job_exception_listener(event):  
  12.     if event.exception:  
  13.         # todo:异常处理, 告警等  
  14.         print('The job crashed :(')  
  15.     else:  
  16.         print('The job worked :)')  
  17. # 日志  
  18. logging.basicConfig()  
  19. logging.getLogger('apscheduler').setLevel(logging.DEBUG)  
  20. # 定义一个后台任务非阻塞调度器  
  21. scheduler = BackgroundScheduler()  
  22. # 添加一个任务到内存中   
  23. # 触发器:trigger='interval' seconds=10 每10s触发执行一次  
  24. # 执行器:executor='default' 线程执行  
  25. # 任务存储器:jobstore='default' 默认内存存储  
  26. # 最大并发数:max_instances  
  27. scheduler.add_job(job_func, trigger='interval', args=[1], id='1', name='a test job', max_instances=10, jobstore='default', executor='default', seconds=10)  
  28. # 设置任务监听  
  29. scheduler.add_listener(job_exception_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)  
  30. # 启动调度器  
  31. scheduler.start() 

运行情况:

 
 
 
 
  1. job 1 is runed at 2020-03-21 20:00:38  
  2. The job worked :)  
  3. job 1 is runed at 2020-03-21 20:00:48  
  4. The job worked :)  
  5. job 1 is runed at 2020-03-21 20:00:58  
  6. The job worked :) 

触发器

触发器决定何时执行任务,APScheduler支持的触发器有3种

 
 
 
 
  1. sched.add_job(job_function, 'interval', hours=2, start_date='2010-10-10 09:30:00', end_date='2014-06-15 11:00:00') 
 
 
 
 
  1. sched.add_job(my_job, 'date', run_date=datetime(2009, 11, 6, 16, 30, 5), args=['text']) 
 
 
 
 
  1. year (int|str) – 4-digit year  
  2.       month (int|str) – month (1-12)  
  3.       day (int|str) – day of the (1-31)  
  4.       week (int|str) – ISO week (1-53)  
  5.       day_of_week (int|str) – number or name of weekday (0-6 or mon,tue,wed,thu,fri,sat,sun)  
  6.       hour (int|str) – hour (0-23)  
  7.       minute (int|str) – minute (0-59)  
  8.       second (int|str) – second (0-59)  
  9.       start_date (datetime|str) – earliest possible date/time to trigger on (inclusive)  
  10.       end_date (datetime|str) – latest possible date/time to trigger on (inclusive) 
 
 
 
 
  1. # 星期一到星期五,5点30执行任务job_function,直到2014-05-30 00:00:00  
  2.            sched.add_job(job_function, 'cron', day_of_week='mon-fri', hour=5, minute=30, end_date='2014-05-30')  
  3.            # 按照crontab格式执行, 格式为:分钟 小时 天 月 周,*表示所有  
  4.            # 5月到8月的1号到15号,0点0分执行任务job_function  
  5.            sched.add_job(job_function, CronTrigger.from_crontab('0 0 1-15 may-aug *')) 

执行器

执行器决定如何执行任务;APScheduler支持4种不同执行器,常用的有pool(线程/进程)和gevent(io多路复用,支持高并发),默认为pool中线程池, 不同的执行器可以在调度器的配置中进行配置(见调度器)

任务存储器

任务存储器决定任务的保存方式, 默认存储在内存中(MemoryJobStore),重启后就没有了。APScheduler支持的任务存储器有:

不同的任务存储器可以在调度器的配置中进行配置(见调度器)

调度器

APScheduler支持的调度器方式如下,比较常用的为BlockingScheduler和BackgroundScheduler

从前面的例子,我们可以看到,调度器可以操作任务(并为任务指定触发器、任务存储器和执行器)和监控任务。

 
 
 
 
  1. scheduler.add_job(job_func, trigger='interval', args=[1], id='1', name='a test job', max_instances=10, jobstore='default', executor='default', seconds=10) 

我们来详细看下各个部分

 
 
 
 
  1. from pytz import utc  
  2.    from apscheduler.schedulers.background import BackgroundScheduler  
  3.    from apscheduler.jobstores.mongodb import MongoDBJobStore  
  4.    from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore  
  5.    from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor  
  6.    # 通过dict方式执行不同的jobstores、executors和默认的参数  
  7.    jobstores = {  
  8.        'mongo': MongoDBJobStore(),  
  9.        'default': SQLAlchemyJobStore(url='sqlite:///jobs.sqlite')  
  10.    }  
  11.    executors = {  
  12.        'default': ThreadPoolExecutor(20),  
  13.        'processpool': ProcessPoolExecutor(5)  
  14.    }  
  15.    job_defaults = {  
  16.        'coalesce': False,  
  17.        'max_instances': 3  
  18.    }  
  19.    # 定义调度器  
  20.    scheduler = BackgroundScheduler(jobstoresjobstores=jobstores, executorsexecutors=executors, job_defaultsjob_defaults=job_defaults, timezone=utc)  
  21.    def job_func(job_id):  
  22.        print('job %s is runed at %s' % (job_id, datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')))  
  23.    # 添加任务  
  24.    scheduler.add_job(job_func, trigger='interval', args=[1], id='1', name='a test job', jobstore='default', executor='processpool', seconds=10)  
  25.    # 启动调度器  
  26.    scheduler.start() 
 
 
 
 
  1. scheduler.add_job(job_func, trigger='interval', args=[1], id='1', name='a test job', max_instances=10, jobstore='default', executor='default', seconds=10) 
 
 
 
 
  1. scheduler.add_job(myfunc, 'interval', minutes=2, id='my_job_id')  
  2.  scheduler.remove_job('my_job_id') 
 
 
 
 
  1. scheduler.add_job(myfunc, 'interval', minutes=2, id='my_job_id')  
  2.         scheduler.pause_job('my_job_id')  
  3.         scheduler.resume_job('my_job_id') 
 
 
 
 
  1. job = scheduler.add_job(myfunc, 'interval', minutes=2, id='my_job_id', max_instances=10)  
  2.        # 修改任务的属性  
  3.        job.modify(max_instances=6, name='Alternate name')  
  4.        # 修改任务的触发器  
  5.        scheduler.reschedule_job('my_job_id', trigger='cron', minute='*/5') 
 
 
 
 
  1. scheduler.add_listener(job_exception_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)    

分享名称:Python任务调度利器:APScheduler
转载来源:http://cdbrznjsb.com/article/dpipeei.html

其他资讯

让你的专属顾问为你服务