Task Scheduling Application Scenarios
Task scheduling means planning when and how a task is executed. It comes up all the time in real projects, especially data-driven ones: for example, to report the number of visits to a site in real time every 5 minutes, you need a job that analyzes the visit counts from the log data every 5 minutes.
Typical task scheduling scenarios:
- Offline job scheduling: executing a task at a given time granularity
- Shared cache updates: periodically refreshing a cache (e.g. Redis) that is shared between different processes
Task scheduling tools
- crontab on Linux, which runs tasks at minute/hour/day/month/weekday granularity
- Quartz for Java
- Task Scheduler on Windows
This article introduces APScheduler (Advanced Python Scheduler), a task scheduling library for Python. If you know Quartz, you can think of APScheduler as a Python counterpart of Quartz. APScheduler supports interval-based, fixed-date and crontab-style scheduling, and can be used as a cross-platform scheduling tool.
APScheduler
Component Introduction
APScheduler consists of five parts: triggers, schedulers, job stores, executors and events.
- Job: a task, identified by its id and the function to execute
- Trigger: determines when a task starts executing
- Job store: stores the state of jobs
- Executor: determines how a task is executed
- Event: lets you monitor task execution, e.g. to catch exceptions
- Scheduler: ties the whole task lifecycle together; it adds and edits jobs in the job store, hands a job to an executor when its run time arrives and collects the result, and dispatches events to registered listeners.
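As a rough orientation, here is a minimal sketch of how each component shows up in code (assumes APScheduler 3.x; the function name tick and the 10-second interval are arbitrary). The full, runnable example follows in the next section.

from apscheduler.schedulers.background import BackgroundScheduler  # scheduler
from apscheduler.triggers.interval import IntervalTrigger          # trigger
from apscheduler.jobstores.memory import MemoryJobStore            # job store
from apscheduler.executors.pool import ThreadPoolExecutor          # executor
from apscheduler.events import EVENT_JOB_EXECUTED                  # events

def tick():
    print('tick')

# The scheduler is configured with named job stores and executors
scheduler = BackgroundScheduler(
    jobstores={'default': MemoryJobStore()},
    executors={'default': ThreadPoolExecutor(10)},
)

# The job ties the function to a trigger, a job store and an executor
scheduler.add_job(tick, IntervalTrigger(seconds=10), id='tick',
                  jobstore='default', executor='default')

# Events let listeners observe what happens to jobs
scheduler.add_listener(lambda event: print('job ran'), EVENT_JOB_EXECUTED)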
Installation
pip install apscheduler
Simple example
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.events import EVENT_JOB_EXECUTED, EVENT_JOB_ERROR
import logging
import datetime

# Task execution function
def job_func(job_id):
    print('job %s is run at %s' % (job_id, datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')))

# Event listener
def job_exception_listener(event):
    if event.exception:
        # todo: exception handling, alerting, etc.
        print('The job crashed :(')
    else:
        print('The job worked :)')

# Logging
logging.basicConfig()
logging.getLogger('apscheduler').setLevel(logging.DEBUG)

# Define a non-blocking scheduler that runs jobs in the background
scheduler = BackgroundScheduler()

# Add a job
# Trigger: trigger='interval', seconds=10 -> run every 10 seconds
# Executor: executor='default' -> thread pool
# Job store: jobstore='default' -> default in-memory store
# Maximum number of concurrently running instances: max_instances
scheduler.add_job(job_func, trigger='interval', args=[1], id='1', name='a test job',
                  max_instances=10, jobstore='default', executor='default', seconds=10)

# Register the event listener
scheduler.add_listener(job_exception_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)

# Start the scheduler
scheduler.start()
Running it produces:
job 1 is run at 2020-03-21 20:00:38
The job worked :)
job 1 is run at 2020-03-21 20:00:48
The job worked :)
job 1 is run at 2020-03-21 20:00:58
The job worked :)
Triggers
A trigger determines when a task is executed. APScheduler supports three kinds of triggers:
trigger='interval': execute at a fixed interval; supports weeks, days, hours, minutes and seconds, and a time range (start_date/end_date) can be specified.
sched.add_job(job_function, 'interval', hours=2, start_date='2010-10-10 09:30:00', end_date='2014-06-15 11:00:00')
trigger='date': run once at a fixed point in time
from datetime import datetime
sched.add_job(my_job, 'date', run_date=datetime(2009, 11, 6, 16, 30, 5), args=['text'])
trigger='cron': crontab-style execution
Parameters: year, month, day, week, day_of_week, hour, minute and second granularity; a time range can also be specified:
- year (int|str) – 4-digit year
- month (int|str) – month (1-12)
- day (int|str) – day of the month (1-31)
- week (int|str) – ISO week (1-53)
- day_of_week (int|str) – number or name of weekday (0-6 or mon,tue,wed,thu,fri,sat,sun)
- hour (int|str) – hour (0-23)
- minute (int|str) – minute (0-59)
- second (int|str) – second (0-59)
- start_date (datetime|str) – earliest possible date/time to trigger on (inclusive)
- end_date (datetime|str) – latest possible date/time to trigger on (inclusive)
For example:
from apscheduler.triggers.cron import CronTrigger

# Mon-Fri at 5:30, run job_function until 2014-05-30 00:00:00
sched.add_job(job_function, 'cron', day_of_week='mon-fri', hour=5, minute=30, end_date='2014-05-30')

# crontab format: minute hour day month day_of_week, where * means "every"
# At 00:00 on days 1-15 of May through August, run job_function
sched.add_job(job_function, CronTrigger.from_crontab('0 0 1-15 may-aug *'))
Executors
The executor decides how a task is executed. APScheduler supports four different executors; the most commonly used are pool (thread/process) and gevent (IO multiplexing, supports high concurrency). The default is the thread pool. The executor to use can be configured on the scheduler (see Schedulers below):
- apscheduler.executors.asyncio: runs jobs in the asyncio event loop
- apscheduler.executors.gevent: gevent-based, IO multiplexing, non-blocking
- apscheduler.executors.pool: ThreadPoolExecutor and ProcessPoolExecutor
- apscheduler.executors.twisted: event-driven, based on Twisted
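For illustration, a minimal sketch of registering an extra executor on an existing scheduler and binding a job to it (the alias 'processpool', the pool size and the cpu_heavy_job function are arbitrary choices, not part of the example above):

from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.executors.pool import ProcessPoolExecutor

def cpu_heavy_job():
    # Placeholder for CPU-bound work that would tie up a thread pool
    print(sum(i * i for i in range(100000)))

scheduler = BackgroundScheduler()  # default executor: a thread pool
# Register a process pool under the alias 'processpool'
scheduler.add_executor(ProcessPoolExecutor(4), alias='processpool')
# Run this job in the process pool instead of the default thread pool
scheduler.add_job(cpu_heavy_job, 'interval', seconds=30, executor='processpool')
scheduler.start()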
Job stores
The job store determines how tasks are stored. By default jobs are kept in memory (MemoryJobStore) and are lost after a restart. The job stores supported by APScheduler are:
- apscheduler.jobstores.memory: in memory
- apscheduler.jobstores.mongodb: stored in MongoDB
- apscheduler.jobstores.redis: stored in Redis
- apscheduler.jobstores.rethinkdb: stored in RethinkDB
- apscheduler.jobstores.sqlalchemy: any database supported by SQLAlchemy, e.g. MySQL, SQLite
- apscheduler.jobstores.zookeeper: stored in ZooKeeper
The job store to use can be configured on the scheduler (see Schedulers below).
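For example, a minimal sketch of attaching a persistent SQLite-backed job store with add_jobstore (requires the sqlalchemy package; the file name jobs.sqlite, the alias 'sqlite' and the report function are arbitrary):

from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore

def report():
    print('generating report')

scheduler = BackgroundScheduler()
# Persist jobs in a local SQLite database so they survive a restart
scheduler.add_jobstore(SQLAlchemyJobStore(url='sqlite:///jobs.sqlite'), alias='sqlite')
# replace_existing=True avoids a duplicate-id conflict when the job was already persisted
scheduler.add_job(report, 'interval', hours=1, id='report', jobstore='sqlite',
                  replace_existing=True)
scheduler.start()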
Schedulers
APScheduler provides the following schedulers; the most commonly used are BlockingScheduler and BackgroundScheduler:
- BlockingScheduler: for when the scheduler is the only thing running in the process; calling start() blocks the current thread and does not return immediately.
- BackgroundScheduler: for when the scheduler runs in the background of an application; the main thread is not blocked after calling start().
- AsyncIOScheduler: for applications that use the asyncio module.
- GeventScheduler: for applications that use the gevent module.
- TwistedScheduler: for building Twisted applications.
- QtScheduler: for building Qt applications.
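To make the difference between the two common schedulers concrete, here is a minimal sketch (the tick function, the intervals and the 10-second sleep are arbitrary):

import time
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.schedulers.blocking import BlockingScheduler

def tick():
    print('tick')

# BackgroundScheduler: start() returns immediately, so the application
# itself must keep the main thread alive.
bg = BackgroundScheduler()
bg.add_job(tick, 'interval', seconds=3)
bg.start()
try:
    time.sleep(10)   # the application does its own work here
finally:
    bg.shutdown()

# BlockingScheduler: start() blocks; use it when the scheduler is the whole process.
blocking = BlockingScheduler()
blocking.add_job(tick, 'interval', seconds=3)
blocking.start()     # does not return until shutdown() is called or the process exits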
From the simple example above, we can see that the scheduler can manipulate tasks (and specify triggers, job stores and executors for them) and monitor tasks.
scheduler.add_job(job_func, trigger='interval', args=[1], id='1', name='a test job', max_instances=10, jobstore='default', executor='default', seconds=10)
Let's look at each part in detail
Scheduler configuration: in add_job above, both jobstore and executor are 'default'. APScheduler lets you configure different job stores and executors, as well as default job parameters, when defining the scheduler:
from pytz import utc
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.jobstores.mongodb import MongoDBJobStore
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
import datetime

# Configure the job stores, executors and default job parameters via dicts
jobstores = {
    'mongo': MongoDBJobStore(),
    'default': SQLAlchemyJobStore(url='sqlite:///')
}
executors = {
    'default': ThreadPoolExecutor(20),
    'processpool': ProcessPoolExecutor(5)
}
job_defaults = {
    'coalesce': False,
    'max_instances': 3
}

# Define the scheduler
scheduler = BackgroundScheduler(jobstores=jobstores, executors=executors,
                                job_defaults=job_defaults, timezone=utc)

def job_func(job_id):
    print('job %s is run at %s' % (job_id, datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')))

# Add a job
scheduler.add_job(job_func, trigger='interval', args=[1], id='1', name='a test job',
                  jobstore='default', executor='processpool', seconds=10)

# Start the scheduler
scheduler.start()
Manipulating tasks: the scheduler can add, remove, pause, resume and modify tasks. Note that these operations only affect runs that have not yet started; runs that have already finished or are currently executing are not affected.
add_job
scheduler.add_job(job_func, trigger='interval', args=[1], id='1', name='a test job', max_instances=10, jobstore='default', executor='default', seconds=10)
remove_job: remove a job by its unique id; the corresponding record in the job store is deleted as well.
scheduler.add_job(myfunc, 'interval', minutes=2, id='my_job_id')
scheduler.remove_job('my_job_id')
Pausing and resuming jobs: pause_job and resume_job pause and resume a job
scheduler.add_job(myfunc, 'interval', minutes=2, id='my_job_id')
scheduler.pause_job('my_job_id')
scheduler.resume_job('my_job_id')
Modifying jobs: modify a job's attributes or its trigger
job = scheduler.add_job(myfunc, 'interval', minutes=2, id='my_job_id', max_instances=10)
# Modify the job's attributes
job.modify(max_instances=6, name='Alternate name')
# Modify the job's trigger
scheduler.reschedule_job('my_job_id', trigger='cron', minute='*/5')
Monitoring tasks: the more commonly used task event types are:
- EVENT_JOB_ERROR: the job raised an exception during execution
- EVENT_JOB_EXECUTED: the job was executed successfully
- EVENT_JOB_MAX_INSTANCES: a scheduled run was skipped because the job already has the configured maximum number of concurrently running instances
scheduler.add_listener(job_exception_listener, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)
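Building on the listener from the simple example, a sketch of a listener that also reacts to EVENT_JOB_MAX_INSTANCES could look like this (the messages are only illustrative; 'scheduler' is the scheduler created earlier):

from apscheduler.events import (EVENT_JOB_ERROR, EVENT_JOB_EXECUTED,
                                EVENT_JOB_MAX_INSTANCES)

def job_listener(event):
    if event.code == EVENT_JOB_MAX_INSTANCES:
        # A run was skipped because max_instances was reached
        print('job %s skipped: too many running instances' % event.job_id)
    elif event.code == EVENT_JOB_ERROR:
        # event.exception and event.traceback hold the failure details
        print('job %s crashed: %s' % (event.job_id, event.exception))
    else:
        # EVENT_JOB_EXECUTED: event.retval holds the job's return value
        print('job %s finished, return value: %s' % (event.job_id, event.retval))

scheduler.add_listener(job_listener,
                       EVENT_JOB_EXECUTED | EVENT_JOB_ERROR | EVENT_JOB_MAX_INSTANCES)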
Summary
That concludes this detailed introduction to APScheduler, the Python task scheduling library. For more on Python task scheduling with APScheduler, search my previous posts or browse the related articles below. I hope you will continue to support me.