SoFunction
Updated on 2024-10-28

A detailed look at APScheduler, a Python task scheduling tool

 

Task Scheduling Application Scenarios

Task scheduling means planning how a task is executed: when it runs, how it runs, and so on. It comes up constantly in real projects, especially data-driven ones. For example, to report a site's visit count in near real time every 5 minutes, a job has to extract the visit count from the log data every 5 minutes.
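As a rough illustration of the 5-minute statistics job, the sketch below groups visit timestamps into 5-minute windows using only the standard library. The function name `visits_per_window` and the sample timestamps are hypothetical, and a real job would first parse the timestamps out of the log file.

```python
from collections import Counter
from datetime import datetime

WINDOW_MINUTES = 5  # size of one statistics window


def visits_per_window(timestamps):
    """Count visits per 5-minute window, keyed by the start of the window."""
    counts = Counter()
    for ts in timestamps:
        # Round the timestamp down to the start of its 5-minute window
        window_start = ts.replace(
            minute=ts.minute - ts.minute % WINDOW_MINUTES,
            second=0,
            microsecond=0,
        )
        counts[window_start] += 1
    return dict(counts)


# Hypothetical visit times, as if already parsed from an access log
visits = [
    datetime(2020, 3, 21, 20, 0, 38),
    datetime(2020, 3, 21, 20, 3, 10),
    datetime(2020, 3, 21, 20, 5, 0),
]
stats = visits_per_window(visits)
print(stats)
```

A scheduler's only responsibility in this scenario is to call such a function every 5 minutes; the counting logic itself stays an ordinary Python function.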

Typical application scenarios for task scheduling:

  • Offline job scheduling: running a job at a fixed time granularity
  • Shared cache updates: refreshing a cache on a timer, e.g. a Redis cache holding data shared between processes

Task scheduling tools

  • crontab on Linux: runs tasks at minute/hour/day/month/weekday granularity
  • Quartz for Java
  • Task Scheduler on Windows

This article introduces APScheduler (Advanced Python Scheduler), a task scheduling library for Python. If you know Quartz, APScheduler will feel familiar: it was influenced by Quartz and offers most of its major features in Python. APScheduler can schedule jobs at fixed intervals, at a fixed point in time, or crontab-style, and it can be used as a cross-platform scheduling tool.

APScheduler

Components

APScheduler is built from the following parts: jobs, triggers, job stores, executors, events and the scheduler.

  • Job: the job id and the function the job executes
  • Triggers: determine when a job runs
  • Job stores: hold the state of the jobs
  • Executors: determine how a job is run
  • Events: allow monitoring of job execution, e.g. exceptions
  • Schedulers: tie together the whole life cycle of a job. The scheduler adds jobs to the job store and edits them there; when a job's run time arrives, the scheduler hands the job to the executor, which runs it and returns the result. The scheduler also dispatches events to the registered listeners so that job execution can be monitored.

 

Installation

pip install apscheduler

Simple example

from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.events import EVENT_JOB_EXECUTED, EVENT_JOB_ERROR
import logging
import datetime
import time
# Job function
def job_func(job_id):
    print('job %s is runed at %s' % (job_id, datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
# Event listener
def job_exception_listener(event):
    if event.exception:
        # todo: exception handling, alerts, etc.
        print('The job crashed :(')
    else:
        print('The job worked :)')
# Logging
logging.basicConfig()
logging.getLogger('apscheduler').setLevel(logging.DEBUG)
# Define a non-blocking scheduler that runs jobs in the background
scheduler = BackgroundScheduler()
# Add a job to the job store
# Trigger: trigger='interval', seconds=10 runs the job every 10s
# Executor: executor='default' runs the job in the thread pool
# Job store: jobstore='default' is the default in-memory store
# Maximum concurrency: max_instances
scheduler.add_job(job_func, trigger='interval', args=[1], id='1', name='a test job', max_instances=10, jobstore='default', executor='default', seconds=10)
# Register the job listener
scheduler.add_listener(job_exception_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)
# Start the scheduler
scheduler.start()
# start() returns immediately, so keep the main thread alive
try:
    while True:
        time.sleep(1)
except (KeyboardInterrupt, SystemExit):
    scheduler.shutdown()

Output:

job 1 is runed at 2020-03-21 20:00:38 
The job worked :) 
job 1 is runed at 2020-03-21 20:00:48 
The job worked :) 
job 1 is runed at 2020-03-21 20:00:58 
The job worked :)

Triggers

Triggers determine when a job is executed. APScheduler supports three types of trigger:

trigger='interval': runs at a fixed interval. Supports weeks, days, hours, minutes and seconds, and an optional time range (start_date and end_date).

sched.add_job(job_function, 'interval', hours=2, start_date='2010-10-10 09:30:00', end_date='2014-06-15 11:00:00') 

trigger='date': runs once at a fixed point in time.

sched.add_job(my_job, 'date', run_date=datetime(2009, 11, 6, 16, 30, 5), args=['text']) 

trigger='cron': runs jobs on a crontab-style schedule.

Parameters: the schedule can be set at second/minute/hour/day/week/month/year granularity, and a time range can also be specified.

year (int|str) – 4-digit year
month (int|str) – month (1-12)
day (int|str) – day of the month (1-31)
week (int|str) – ISO week (1-53)
day_of_week (int|str) – number or name of weekday (0-6 or mon,tue,wed,thu,fri,sat,sun)
hour (int|str) – hour (0-23)
minute (int|str) – minute (0-59)
second (int|str) – second (0-59)
start_date (datetime|str) – earliest possible date/time to trigger on (inclusive)
end_date (datetime|str) – latest possible date/time to trigger on (inclusive)

For example:

from apscheduler.triggers.cron import CronTrigger
# Run job_function at 05:30 from Monday to Friday, until 2014-05-30 00:00:00
sched.add_job(job_function, 'cron', day_of_week='mon-fri', hour=5, minute=30, end_date='2014-05-30')
# Schedule with a crontab expression (minute hour day month day_of_week); * matches every value
# Run job_function at 00:00 on the 1st through the 15th of May through August
sched.add_job(job_function, CronTrigger.from_crontab('0 0 1-15 may-aug *'))

Executors

The executor decides how a job is run. The four executors covered here are listed below; the most commonly used are pool (thread/process) and gevent (I/O multiplexing, supports high concurrency). The default is a thread pool. Executors are configured on the scheduler (see the Schedulers section).

  • apscheduler.executors.asyncio: runs jobs in an asyncio event loop (coroutine-based)
  • apscheduler.executors.gevent: runs jobs as greenlets (I/O multiplexing, non-blocking)
  • apscheduler.executors.pool: ThreadPoolExecutor and ProcessPoolExecutor
  • apscheduler.executors.twisted: event-driven, runs jobs through the Twisted reactor

Job stores

The job store determines how jobs are stored. By default, jobs are kept in memory (MemoryJobStore) and are lost when the program restarts. APScheduler supports the following job stores:

  • apscheduler.jobstores.memory: in memory
  • apscheduler.jobstores.mongodb: stored in MongoDB
  • apscheduler.jobstores.redis: stored in Redis
  • apscheduler.jobstores.rethinkdb: stored in RethinkDB
  • apscheduler.jobstores.sqlalchemy: any database supported by SQLAlchemy, such as MySQL or SQLite
  • apscheduler.jobstores.zookeeper: stored in ZooKeeper

Job stores are configured on the scheduler (see the Schedulers section).

Schedulers

APScheduler provides the following schedulers; the most commonly used are BlockingScheduler and BackgroundScheduler.

  • BlockingScheduler: for when the scheduler is the only thing running in the process. Calling start() blocks the current thread and does not return until the scheduler is shut down.
  • BackgroundScheduler: for when the scheduler runs in the background of an application. The main thread is not blocked after calling start().
  • AsyncIOScheduler: for applications that use the asyncio module.
  • GeventScheduler: for applications that use the gevent module.
  • TwistedScheduler: for Twisted applications.
  • QtScheduler: for Qt applications.

As the earlier example showed, the scheduler is where jobs are managed (each with its trigger, job store and executor) and monitored.

scheduler.add_job(job_func, trigger='interval', args=[1], id='1', name='a test job', max_instances=10, jobstore='default', executor='default', seconds=10)

Let's look at each part in detail.

Scheduler configuration: in add_job above, both jobstore and executor are 'default'. APScheduler lets you specify different job stores and executors, as well as default job parameters, when defining the scheduler.

import datetime
from pytz import utc
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.mongodb import MongoDBJobStore
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
# Configure job stores, executors and job defaults with dicts
jobstores = {
    'mongo': MongoDBJobStore(),
    'default': SQLAlchemyJobStore(url='sqlite:///')
}
executors = {
    'default': ThreadPoolExecutor(20),
    'processpool': ProcessPoolExecutor(5)
}
job_defaults = {
    'coalesce': False,
    'max_instances': 3
}
# Define the scheduler
scheduler = BackgroundScheduler(jobstores=jobstores, executors=executors, job_defaults=job_defaults, timezone=utc)
def job_func(job_id):
    print('job %s is runed at %s' % (job_id, datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
# Add a job
scheduler.add_job(job_func, trigger='interval', args=[1], id='1', name='a test job', jobstore='default', executor='processpool', seconds=10)
# Start the scheduler
scheduler.start()

Manipulating jobs: the scheduler can add, remove, pause, resume and modify jobs. Note that these operations only affect runs that have not started yet; runs that have already finished or are currently executing are not affected.

add_job: add a job to the scheduler.

scheduler.add_job(job_func, trigger='interval', args=[1], id='1', name='a test job', max_instances=10, jobstore='default', executor='default', seconds=10)

remove_job: remove a job by its unique id; the corresponding record in the job store is deleted as well.

scheduler.add_job(myfunc, 'interval', minutes=2, id='my_job_id')
scheduler.remove_job('my_job_id')

pause_job and resume_job: pause a job, then resume it later.

scheduler.add_job(myfunc, 'interval', minutes=2, id='my_job_id')
scheduler.pause_job('my_job_id')
scheduler.resume_job('my_job_id')

Modifying jobs: change the configuration of an existing job.

job = scheduler.add_job(myfunc, 'interval', minutes=2, id='my_job_id', max_instances=10)
# Modify the attributes of the job
job.modify(max_instances=6, name='Alternate name')
# Change the trigger of the job
scheduler.reschedule_job('my_job_id', trigger='cron', minute='*/5')

Monitoring jobs: the most commonly used event types are:

  • EVENT_JOB_ERROR: the job raised an exception during execution
  • EVENT_JOB_EXECUTED: the job was executed successfully
  • EVENT_JOB_MAX_INSTANCES: a run was skipped because the job had already reached its maximum number of concurrently running instances (max_instances)

scheduler.add_listener(job_exception_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)   
