Using Python APScheduler
Bhaskar S | 09/04/2022
Introduction
Often, there is a need for an application to fetch some data from a source (be it internal or external) on a regular interval (say, daily at around 7:00 AM EST). This is where the Python APScheduler library comes in handy.
APScheduler (short for Advanced Python Scheduler) is an open source Python library that allows one to schedule jobs (or tasks), which can be executed on a regular periodic basis (such as hourly, daily, weekly, etc.).
By default, a scheduled job's data in APScheduler is serialized and stored in-memory. This implies that a scheduled job does NOT survive a system crash or a system restart. To ensure a scheduled job survives a crash or restart, APScheduler can be configured to use a database, so that the data related to the scheduled jobs is persisted to disk.
The APScheduler library comprises the following core components:
Job :: Encapsulates a Python function (with function arguments) that will be executed by the scheduler
Jobstore :: A repository where a scheduled job's data and state are saved. By default, APScheduler uses an in-memory store. One can specify a disk based persistence (such as a database) to be used as the jobstore
Executor :: Runs the scheduled job in a thread or a process from a pool
Trigger :: Encapsulates the plan for when the job should be executed. Each job has its own scheduling trigger.
The following are two commonly used triggers supported by the framework (a minimal sketch using both follows this list of components):
interval :: Used when a job needs to periodically execute on scheduled intervals. One can specify the weeks, days, hours, minutes, and seconds as options
cron :: Mimics the capabilities of the Cron utility found in most Linux systems. One can specify a year, month, day, week, day_of_week, hour, minute, and second as options
Scheduler :: The core engine that schedules, tracks, and executes jobs. It provides the necessary API for configuring the executor, the jobstore, etc., and for adding jobs (at runtime) to be executed.
The most commonly used scheduler is the BackgroundScheduler, which runs as a background daemon thread in the application
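To make these components concrete, the following is a minimal sketch (not one of this article's samples) that builds the two trigger types explicitly and hands them to a BackgroundScheduler; the job functions heartbeat and daily_report are hypothetical placeholders:

from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.triggers.interval import IntervalTrigger
from apscheduler.triggers.cron import CronTrigger


def heartbeat():
    print('heartbeat ...')


def daily_report():
    print('generating the daily report ...')


scheduler = BackgroundScheduler()

# Interval trigger - run the job every 30 minutes
scheduler.add_job(heartbeat, trigger=IntervalTrigger(minutes=30))

# Cron trigger - run the job on weekdays at 7:00 AM in the New York timezone
scheduler.add_job(daily_report, trigger=CronTrigger(day_of_week='mon-fri', hour=7, minute=0,
                                                    timezone='America/New_York'))

scheduler.start()

The string shortcuts trigger='interval' and trigger='cron' used in the samples later in this article are equivalent to constructing these trigger objects explicitly.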
Installation and Setup
Installation and setup will be on a Linux desktop running Ubuntu 22.04 LTS. Note that the stable Python version on Ubuntu is 3.10.
For our demonstration, we will create a directory called APScheduler under the user's home directory by executing the following command in a terminal window:
$ mkdir -p $HOME/APScheduler
Next, we will create a project specific Python virtual environment using the venv module. In order to do that, we first need to install the package for venv by executing the following command in a terminal window:
$ sudo apt install -y python3.10-venv
The Python venv module allows one to create lightweight virtual environments, each with its own directory structure, isolated from the system-wide Python installation. To create a Python virtual environment, execute the following command(s) in the terminal window:
$ cd $HOME/APScheduler
$ python3 -m venv venv
This will create a directory called venv under the current directory. One needs to activate the newly created virtual environment by executing the following command in the terminal window:
$ source venv/bin/activate
On successful virtual environment activation, the prompt will be prefixed with (venv).
We will now install the following Python modules:
apscheduler :: the advanced python scheduler
sqlalchemy :: the popular SQL database abstraction layer
Execute the following command(s) in the terminal window (with venv activated):
$ pip install apscheduler sqlalchemy
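To quickly confirm the two modules were installed into the virtual environment, one could run the following short check (a minimal sketch, assuming both packages expose a __version__ attribute, which recent releases do):

import apscheduler
import sqlalchemy

# Print the installed versions of the two modules
print('apscheduler :', apscheduler.__version__)
print('sqlalchemy  :', sqlalchemy.__version__)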
Next, we will install SQLite, a small, fast, self-contained, highly-reliable, full-featured, open-source SQL database engine (along with a GUI browser for it), by executing the following command in a terminal window:
$ sudo apt install -y sqlite3 sqlitebrowser
For storing the sqlite database file, we will create a data directory under the directory APScheduler by executing the following command in the terminal window:
$ mkdir -p $HOME/APScheduler/data
Hands-on Python APScheduler
The following is the Python script called sample-1.py that demonstrates a simple scheduling application:
#
# @Author: Bhaskar S
# @Blog: https://www.polarsparc.com
# @Date: 04 Sep 2022
#

import logging
import time

from apscheduler.schedulers.background import BackgroundScheduler

logging.basicConfig(format='%(levelname)s %(asctime)s - %(message)s', level=logging.INFO)
logger = logging.getLogger('sample-1')


def task():
    logger.info('Started sample-1 task...')
    time.sleep(1)
    logger.info('Completed sample-1 task !!!')


def main():
    scheduler = BackgroundScheduler(daemon=True)
    scheduler.add_job(task, trigger='interval', seconds=60, misfire_grace_time=5*60)
    scheduler.start()
    try:
        while True:
            time.sleep(5)
    except KeyboardInterrupt:
        scheduler.shutdown()


if __name__ == '__main__':
    main()
Some aspects of sample-1.py from the above need a little explanation.
BackgroundScheduler :: a scheduler that runs in the background (as a daemon thread) and executes a given task per the given schedule trigger
add_job() :: is the method that adds the specified job to the job list for execution. The first argument is the scheduled job (a callable function). The option trigger='interval' specifies the interval trigger, which will execute every 60 seconds (seconds=60). The option misfire_grace_time=5*60 allows a grace period (of 5 minutes) for jobs that failed to execute at their scheduled trigger time due to various reasons (no executor thread available in the pool, the job was in the persistent store and the scheduler was restarted, etc.)
start() :: is the method that starts the scheduler thread in the background
The try-except block with the sleep in a loop is important, as it keeps the main thread alive so that the background scheduler thread can continue to run. Without it, the program would terminate immediately and nothing would happen.
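As a side note, if the application has nothing else to do in the foreground, APScheduler also provides a BlockingScheduler, whose start() method itself blocks, making the explicit sleep loop unnecessary. The following is a minimal sketch of that variation (not part of sample-1.py):

import logging
import time

from apscheduler.schedulers.blocking import BlockingScheduler

logging.basicConfig(format='%(levelname)s %(asctime)s - %(message)s', level=logging.INFO)
logger = logging.getLogger('blocking-sample')


def task():
    logger.info('Started task...')
    time.sleep(1)
    logger.info('Completed task !!!')


scheduler = BlockingScheduler()
scheduler.add_job(task, trigger='interval', seconds=60)

try:
    # start() blocks the main thread until the scheduler is shut down
    scheduler.start()
except (KeyboardInterrupt, SystemExit):
    pass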
To test out the Python scheduler application, execute the following command in the venv terminal window and wait for about 3 minutes before interrupting by pressing CTRL-C:
$ python3 sample-1.py
The following would be a typical output:
INFO 2022-09-04 14:14:18,803 - Adding job tentatively -- it will be properly scheduled when the scheduler starts
INFO 2022-09-04 14:14:18,803 - Added job "task" to job store "default"
INFO 2022-09-04 14:14:18,803 - Scheduler started
INFO 2022-09-04 14:15:18,803 - Running job "task (trigger: interval[0:01:00], next run at: 2022-09-04 14:15:18 EDT)" (scheduled at 2022-09-04 14:15:18.803045-04:00)
INFO 2022-09-04 14:15:18,803 - Started sample-1 task...
INFO 2022-09-04 14:15:19,805 - Completed sample-1 task !!!
INFO 2022-09-04 14:15:19,805 - Job "task (trigger: interval[0:01:00], next run at: 2022-09-04 14:16:18 EDT)" executed successfully
INFO 2022-09-04 14:16:18,804 - Running job "task (trigger: interval[0:01:00], next run at: 2022-09-04 14:17:18 EDT)" (scheduled at 2022-09-04 14:16:18.803045-04:00)
INFO 2022-09-04 14:16:18,804 - Started sample-1 task...
INFO 2022-09-04 14:16:19,805 - Completed sample-1 task !!!
INFO 2022-09-04 14:16:19,805 - Job "task (trigger: interval[0:01:00], next run at: 2022-09-04 14:17:18 EDT)" executed successfully
<CTRL-C>
INFO 2022-09-04 14:16:23,814 - Scheduler has been shut down
As was indicated in the introduction, the default jobstore used by APScheduler is in-memory. So, if sample-1.py crashes and restarts, it will appear as though it is a fresh start.
For the next demonstration, we will configure the sqlite database as the jobstore.
The following is the Python script sample-2.py that is functionally the same as the previous case, except that it uses a persistent jobstore:
#
# @Author: Bhaskar S
# @Blog: https://www.polarsparc.com
# @Date: 04 Sep 2022
#

import logging
import time

from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.schedulers.background import BackgroundScheduler

logging.basicConfig(format='%(levelname)s %(asctime)s - %(message)s', level=logging.INFO)
logger = logging.getLogger('sample-2')

jobstores = {
    'default': SQLAlchemyJobStore(url='sqlite:////home/alice/APScheduler/data/jobs.db')
}


def task():
    logger.info('Started sample-2 task...')
    time.sleep(2)
    logger.info('Completed sample-2 task !!!')


def main():
    job_id = 'sample-2'
    scheduler = BackgroundScheduler(jobstores=jobstores, daemon=True)
    scheduler.add_job(task, id=job_id, trigger='interval', seconds=60, misfire_grace_time=5*60)
    scheduler.start()
    try:
        while True:
            scheduler.print_jobs(jobstore="default")
            time.sleep(15)
    except KeyboardInterrupt:
        scheduler.remove_job(job_id)
        scheduler.shutdown()


if __name__ == '__main__':
    main()
Some aspects of sample-2.py from the above need a little explanation.
SQLAlchemyJobStore :: allows one to configure a persistent jobstore in a database table using the Python SQLAlchemy framework. The job table will be created in the database if it does not already exist
add_job() :: notice that one can associate a job ID (id=job_id) when a job is scheduled and later use it to identify this same job
print_jobs() :: is the method that lists all the currently scheduled jobs from the specified jobstore (jobstore='default')
remove_job() :: is the method that is used to remove a scheduled job given its ID
Notice how the jobstore is configured during the instantiation of the BackgroundScheduler using the option jobstores, which takes in a Python dictionary.
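For reference, the jobstores dictionary can be combined with executors and job_defaults in the same constructor call, which is the standard way to configure the scheduler in one place. The following is a minimal sketch; the pool sizes and default values shown are arbitrary choices, not requirements:

from apscheduler.executors.pool import ThreadPoolExecutor, ProcessPoolExecutor
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.schedulers.background import BackgroundScheduler

jobstores = {
    'default': SQLAlchemyJobStore(url='sqlite:////home/alice/APScheduler/data/jobs.db')
}

executors = {
    'default': ThreadPoolExecutor(20),       # thread pool for regular jobs
    'processpool': ProcessPoolExecutor(5)    # process pool for CPU-bound jobs
}

job_defaults = {
    'coalesce': True,             # collapse multiple missed runs into a single run
    'max_instances': 1,           # do not run the same job concurrently
    'misfire_grace_time': 5*60    # allow up to 5 minutes of lateness
}

scheduler = BackgroundScheduler(jobstores=jobstores, executors=executors,
                                job_defaults=job_defaults, daemon=True)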
To test out the Python scheduler application, execute the following command in the venv terminal window and wait for about 3 minutes before interrupting by pressing CTRL-C:
$ python3 sample-2.py
The following would be a typical output:
INFO 2022-09-04 19:55:36,543 - Adding job tentatively -- it will be properly scheduled when the scheduler starts
INFO 2022-09-04 19:55:36,554 - Added job "task" to job store "default"
INFO 2022-09-04 19:55:36,554 - Scheduler started
Jobstore default: task (trigger: interval[0:01:00], next run at: 2022-09-04 19:56:36 EDT)
Jobstore default: task (trigger: interval[0:01:00], next run at: 2022-09-04 19:56:36 EDT)
Jobstore default: task (trigger: interval[0:01:00], next run at: 2022-09-04 19:56:36 EDT)
Jobstore default: task (trigger: interval[0:01:00], next run at: 2022-09-04 19:56:36 EDT)
INFO 2022-09-04 19:56:36,551 - Running job "task (trigger: interval[0:01:00], next run at: 2022-09-04 19:56:36 EDT)" (scheduled at 2022-09-04 19:56:36.543060-04:00)
INFO 2022-09-04 19:56:36,551 - Started sample-2 task...
Jobstore default: task (trigger: interval[0:01:00], next run at: 2022-09-04 19:57:36 EDT)
INFO 2022-09-04 19:56:38,555 - Completed sample-2 task !!!
INFO 2022-09-04 19:56:38,555 - Job "task (trigger: interval[0:01:00], next run at: 2022-09-04 19:57:36 EDT)" executed successfully
Jobstore default: task (trigger: interval[0:01:00], next run at: 2022-09-04 19:57:36 EDT)
<CTRL-C>
INFO 2022-09-04 19:56:52,103 - Removed job sample-2
INFO 2022-09-04 19:56:52,103 - Scheduler has been shut down
Launch the sqlite browser and access the database /home/alice/APScheduler/data/jobs.db (before pressing CTRL-C) to see the row for the scheduled job in the job table.
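If the sqlite browser is not available, the persisted row can also be inspected directly from Python. The following is a minimal sketch, assuming SQLAlchemyJobStore's default table name apscheduler_jobs and that the next_run_time column holds a POSIX timestamp:

import sqlite3
from datetime import datetime, timezone

conn = sqlite3.connect('/home/alice/APScheduler/data/jobs.db')
for job_id, next_run_time in conn.execute('SELECT id, next_run_time FROM apscheduler_jobs'):
    # Convert the stored timestamp into a readable UTC datetime
    print(job_id, '->', datetime.fromtimestamp(next_run_time, tz=timezone.utc))
conn.close()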
What if we want to be notified of either the success or the failure of a job? This is where the job event listener comes to the rescue.
For the next demonstration, we will register a job event listener, which will get invoked after a job executes.
The following is the Python script sample-3.py that is functionally the same as the previous case, except that it uses a job status listener:
#
# @Author: Bhaskar S
# @Blog: https://www.polarsparc.com
# @Date: 04 Sep 2022
#

import logging
import time

from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.events import EVENT_JOB_EXECUTED, EVENT_JOB_ERROR

logging.basicConfig(format='%(levelname)s %(asctime)s - %(message)s', level=logging.INFO)
logger = logging.getLogger('sample-3')

jobstores = {
    'default': SQLAlchemyJobStore(url='sqlite:////home/alice/APScheduler/data/jobs.db')
}


def task():
    logger.info('Started sample-3 task...')
    time.sleep(3)
    logger.info('Completed sample-3 task !!!')


def job_status_listener(event):
    if event.exception:
        logger.error('The job [%s] encountered exception ...' % event.job_id)
    else:
        logger.info('The job [%s] succeed !!!' % event.job_id)


def main():
    job_id = 'sample-3'
    scheduler = BackgroundScheduler(jobstores=jobstores, daemon=True)
    scheduler.add_job(task, id=job_id, trigger='interval', seconds=60, misfire_grace_time=5 * 60)
    scheduler.add_listener(job_status_listener, EVENT_JOB_ERROR | EVENT_JOB_EXECUTED)
    scheduler.start()
    try:
        while True:
            scheduler.print_jobs(jobstore="default")
            time.sleep(30)
    except KeyboardInterrupt:
        scheduler.remove_job(job_id)
        scheduler.shutdown()


if __name__ == '__main__':
    main()
Some aspects of sample-3.py from the above need a little explanation.
job_status_listener(event) :: is the user defined Python function that is invoked after a job executes. The event parameter carries the event type code, the job id, and the exception (if any) raised by the job
add_listener() :: is the method that registers the specified listener function (first parameter) as the callback for the specified event mask (EVENT_JOB_ERROR | EVENT_JOB_EXECUTED). In this example, we are only interested in whether the job succeeds (EVENT_JOB_EXECUTED) or fails (EVENT_JOB_ERROR)
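The same mechanism can watch for other event types as well. For example, the following hypothetical drop-in variation of the listener from sample-3.py also reacts when a job misses its scheduled run time; EVENT_JOB_MISSED is another constant from apscheduler.events (logger and scheduler here are the ones already defined in sample-3.py):

from apscheduler.events import EVENT_JOB_EXECUTED, EVENT_JOB_ERROR, EVENT_JOB_MISSED


def job_status_listener(event):
    if event.code == EVENT_JOB_MISSED:
        logger.warning('The job [%s] missed its scheduled run time !!!' % event.job_id)
    elif event.exception:
        logger.error('The job [%s] encountered exception ...' % event.job_id)
    else:
        logger.info('The job [%s] succeed !!!' % event.job_id)


scheduler.add_listener(job_status_listener, EVENT_JOB_ERROR | EVENT_JOB_EXECUTED | EVENT_JOB_MISSED)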
To test out the Python scheduler application, execute the following command in the venv terminal window and wait for about 3 minutes before interrupting by pressing CTRL-C:
$ python3 sample-3.py
The following would be a typical output:
INFO 2022-09-04 20:24:59,910 - Adding job tentatively -- it will be properly scheduled when the scheduler starts
INFO 2022-09-04 20:24:59,915 - Added job "task" to job store "default"
INFO 2022-09-04 20:24:59,915 - Scheduler started
Jobstore default: task (trigger: interval[0:01:00], next run at: 2022-09-04 20:25:59 EDT)
Jobstore default: task (trigger: interval[0:01:00], next run at: 2022-09-04 20:25:59 EDT)
INFO 2022-09-04 20:25:59,918 - Running job "task (trigger: interval[0:01:00], next run at: 2022-09-04 20:25:59 EDT)" (scheduled at 2022-09-04 20:25:59.909965-04:00)
INFO 2022-09-04 20:25:59,919 - Started sample-3 task...
Jobstore default: task (trigger: interval[0:01:00], next run at: 2022-09-04 20:26:59 EDT)
INFO 2022-09-04 20:26:02,920 - Completed sample-3 task !!!
INFO 2022-09-04 20:26:02,920 - Job "task (trigger: interval[0:01:00], next run at: 2022-09-04 20:26:59 EDT)" executed successfully
INFO 2022-09-04 20:26:02,920 - The job [sample-3] succeed !!!
Jobstore default: task (trigger: interval[0:01:00], next run at: 2022-09-04 20:26:59 EDT)
<CTRL-C>
INFO 2022-09-04 20:26:31,947 - Removed job sample-3
INFO 2022-09-04 20:26:31,947 - Scheduler has been shut down
Now that we have a basic understanding and working knowledge of the core parts of APScheduler, it is time to tackle a real use-case. Typically, a job (or a task) needs some input parameters (arguments to the task function). For example, the job may be to process a daily feed from a particular location in the filesystem. Also, let us assume that the daily feed arrives at around 7 AM EST. In reality, it is possible that the feed may be delayed a little bit. In such cases, one may have to adjust the job's schedule to retry after a shorter fixed interval until the feed is successfully processed. This is the use-case for our next demonstration.
The following is the Python script sample-4.py that will look for a file (dummy.dat in this case) in a specific folder (/tmp in this case) and adjust the job's schedule on exception to run more frequently:
#
# @Author: Bhaskar S
# @Blog: https://www.polarsparc.com
# @Date: 04 Sep 2022
#

import logging
import time
import os
import zoneinfo

from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.events import EVENT_JOB_EXECUTED, EVENT_JOB_ERROR

logging.basicConfig(format='%(levelname)s %(asctime)s - %(message)s', level=logging.INFO)
logger = logging.getLogger('sample-4')

jobstores = {
    'default': SQLAlchemyJobStore(url='sqlite:////home/alice/APScheduler/data/jobs.db')
}

tz_NYC = zoneinfo.ZoneInfo('America/New_York')

# Default misfire grace time for all jobs is specified via job_defaults
scheduler = BackgroundScheduler(jobstores=jobstores, job_defaults={'misfire_grace_time': 5*60}, daemon=True)


def task(jid, root, file):
    logger.info('Started [%s] task...' % jid)
    time.sleep(2)
    if not os.path.exists(os.path.join(root, file)):
        raise FileNotFoundError
    logger.info('Completed [%s] task !!!' % jid)


def job_status_listener(event):
    if event.exception:
        logger.error('*** The job [%s] encountered exception !!!' % event.job_id)
        # Failure - reschedule for sooner
        scheduler.reschedule_job(event.job_id, trigger='interval', seconds=15)
    else:
        logger.info('The job [%s] succeed.' % event.job_id)
        # Success - Back to default
        scheduler.reschedule_job(event.job_id, trigger='cron', day_of_week='mon-fri', minute='*/1', timezone=tz_NYC)


def main():
    job_id = 'sample-4'
    root = '/tmp'
    file = 'dummy.dat'
    scheduler.add_job(task, id=job_id, args=[job_id, root, file], trigger='cron', day_of_week='mon-fri',
                      minute='*/1', timezone=tz_NYC)
    scheduler.add_listener(job_status_listener, EVENT_JOB_ERROR | EVENT_JOB_EXECUTED)
    scheduler.start()
    try:
        while True:
            scheduler.print_jobs(jobstore="default")
            time.sleep(30)
    except KeyboardInterrupt:
        scheduler.remove_job(job_id)
        scheduler.shutdown()


if __name__ == '__main__':
    main()
Some aspects of sample-4.py from the above need a little explanation.
BackgroundScheduler(...) :: an instance of the background scheduler that is configured to use the sqlite jobstore and a default misfire_grace_time of 5*60 seconds (specified via job_defaults) that applies to all the jobs handled by this scheduler
task(jid, root, file) :: is the job function that will be executed on the scheduled job trigger. The function takes 3 input parameters - a job ID, a root directory, and a file name
reschedule_job(...) :: is the method that updates the trigger for the specified job ID. When the file is not found, the job trigger is adjusted to run more frequently (every 15 seconds), while on success it reverts to the original cron schedule
add_job(...) :: is the method that adds the specified job to the job list for execution. Notice that it uses a cron trigger (trigger='cron') which will execute on all weekdays (day_of_week='mon-fri') every minute (minute='*/1') in the America/New_York timezone
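To tie this back to the stated use-case (a feed that arrives at around 7 AM Eastern), the every-minute demo schedule would be replaced with a 7:00 AM cron specification. The following is a minimal sketch of just that add_job() call, reusing the names from sample-4.py; on success, the listener's reschedule_job() call would similarly revert to this 7 AM trigger instead of the every-minute one:

# Hypothetical production schedule for the sample-4 job: weekdays at 7:00 AM Eastern,
# with a 5 minute misfire grace period (task, job_id, root, file and tz_NYC are from sample-4.py)
scheduler.add_job(task, id=job_id, args=[job_id, root, file],
                  trigger='cron', day_of_week='mon-fri', hour=7, minute=0,
                  timezone=tz_NYC, misfire_grace_time=5*60)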
To test out the Python scheduler application, execute the following command in the venv terminal window and wait for about a minute or so:
$ python3 sample-4.py
The following would be a typical output:
INFO 2022-09-04 20:32:41,726 - Adding job tentatively -- it will be properly scheduled when the scheduler starts
INFO 2022-09-04 20:32:41,733 - Added job "task" to job store "default"
INFO 2022-09-04 20:32:41,733 - Scheduler started
Jobstore default: task (trigger: cron[day_of_week='mon-fri', minute='*/1'], next run at: 2022-09-04 20:33:00 EDT)
INFO 2022-09-04 20:33:00,008 - Running job "task (trigger: cron[day_of_week='mon-fri', minute='*/1'], next run at: 2022-09-04 20:33:00 EDT)" (scheduled at 2022-09-04 20:33:00-04:00)
INFO 2022-09-04 20:33:00,009 - Started [sample-4] task...
ERROR 2022-09-04 20:33:02,011 - Job "task (trigger: cron[day_of_week='mon-fri', minute='*/1'], next run at: 2022-09-04 20:34:00 EDT)" raised an exception
Traceback (most recent call last):
  File "/home/alice/APScheduler/venv/lib/python3.10/site-packages/apscheduler/executors/base.py", line 125, in run_job
    retval = job.func(*job.args, **job.kwargs)
  File "/home/alice/APScheduler/sample-4.py", line 31, in task
    raise FileNotFoundError
FileNotFoundError
ERROR 2022-09-04 20:33:02,012 - *** The job [sample-4] encountered exception !!!
Jobstore default: task (trigger: interval[0:00:15], next run at: 2022-09-04 20:33:17 EDT)
INFO 2022-09-04 20:33:17,017 - Running job "task (trigger: interval[0:00:15], next run at: 2022-09-04 20:33:32 EDT)" (scheduled at 2022-09-04 20:33:17.013184-04:00)
INFO 2022-09-04 20:33:17,018 - Started [sample-4] task...
ERROR 2022-09-04 20:33:19,020 - Job "task (trigger: interval[0:00:15], next run at: 2022-09-04 20:33:32 EDT)" raised an exception
Traceback (most recent call last):
  File "/home/alice/APScheduler/venv/lib/python3.10/site-packages/apscheduler/executors/base.py", line 125, in run_job
    retval = job.func(*job.args, **job.kwargs)
  File "/home/alice/APScheduler/sample-4.py", line 31, in task
    raise FileNotFoundError
FileNotFoundError
ERROR 2022-09-04 20:33:19,021 - *** The job [sample-4] encountered exception !!!
Open another terminal window and execute the following commands:
$ cd /tmp
$ > dummy.dat
Wait for a few seconds and then press CTRL-C on the terminal running sample-4.py and we will observe the following output:
INFO 2022-09-04 20:33:34,025 - Running job "task (trigger: interval[0:00:15], next run at: 2022-09-04 20:33:49 EDT)" (scheduled at 2022-09-04 20:33:34.021253-04:00)
INFO 2022-09-04 20:33:34,026 - Started [sample-4] task...
INFO 2022-09-04 20:33:36,028 - Completed [sample-4] task !!!
INFO 2022-09-04 20:33:36,028 - Job "task (trigger: interval[0:00:15], next run at: 2022-09-04 20:33:49 EDT)" executed successfully
INFO 2022-09-04 20:33:36,028 - The job [sample-4] succeed.
Jobstore default: task (trigger: cron[day_of_week='mon-fri', minute='*/1'], next run at: 2022-09-04 20:34:00 EDT)
<CTRL-C>
INFO 2022-09-04 20:33:46,254 - Removed job sample-4
INFO 2022-09-04 20:33:46,254 - Scheduler has been shut down
The following is the link to the Github Repo that provides all the code samples from this article:
References