Distributed Task Processing using Celery
Bhaskar S | 09/18/2020
Overview
Recently, two new cryptocurrencies have entered the market and are all the rage - the BLU coin and the RED coin. These popular coins are traded on only three exchanges - CAT, DOG, and FOX. There is an API exposed through which buyers can query the current best (most favorable) rate offered by one of the three exchanges. Given the popularity of these coins, the API is being bombarded by consumers. How can one handle this situation? What if there were an easy way to distribute the processing, in an asynchronous way, on a single host (or across multiple hosts) using message passing, and with the ability to automatically retry on failures?
Welcome to Celery - the distributed task processing framework for Python!
Celery is a simple, flexible, and reliable distributed task queue processing framework for Python, with the following features:
Distributed task processing is initiated through message passing using a middleware broker such as RabbitMQ
Task processing is handled by worker(s) which are responsible for the execution of the task
Results from task processing can be stored in a backend store such as Redis
Task processing can be immediately triggered (real-time) or can be scheduled (batched)
Failed tasks will be automatically retried on errors
Complex pipelines of task(s) can be processed by distributing the task(s) across different host(s)
Architecture is based on a pluggable component model
The following illustration depicts the high-level architecture of the Celery task queue processing:
Before we proceed further, the following are some of the terms used in the context of Celery:
Term | Description |
---|---|
Task | A job that needs to be dispatched for execution |
Broker | The messaging middleware such as RabbitMQ |
Worker | The entity that executes task(s) |
Backend | The store where the worker(s) persist the result of an execution |
Now, we can explain the high-level flow using the architecture diagram from Figure.1 above. When a client (also referred to as a Producer) invokes a Python method annotated with the special Celery task decorator, the decorated task sends a message (containing the name of the method along with its arguments) to a designated task queue on the messaging Broker and returns an asynchronous result object to the caller. Task Worker(s) (also referred to as Consumer(s)) wait on the designated task queue. When a message arrives on a queue, a task Worker executes the specified Python method with the method arguments (from the message) and sends the result (of the task execution) to the configured Backend store. The caller can check whether the task execution has completed using the asynchronous result object. Once the task execution completes, the caller can get the result from the asynchronous result object, which in turn fetches the result from the Backend store.
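The following minimal sketch illustrates this flow with a hypothetical add task on a single host (the broker and backend URLs below assume local RabbitMQ and Redis instances; the actual cluster setup for our scenario follows in the next sections):

from celery import Celery

# Broker (RabbitMQ) carries the task messages; Backend (Redis) stores the results
app = Celery('demo',
             broker='amqp://guest:guest@localhost:5672//',
             backend='redis://localhost:6379/0')

# The task decorator marshals calls to this method into Broker messages
@app.task
def add(x, y):
    return x + y

# The caller immediately gets back an asynchronous result object ...
result = add.delay(4, 5)
print(result.ready())          # False until a Worker has executed the task

# ... and can later fetch the value, which is read from the Backend store
print(result.get(timeout=10))  # 9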
Installation
We can install, set up, and demonstrate Celery on a single host with VMs or on multiple hosts. For my setup, I will leverage a 6-node cluster consisting of five ODroid XU4s and one ODroid C2. The following illustration shows the 6-node cluster:
Ensure each of the nodes has the Armbian OS installed. Next, launch 6 terminal windows and log in to each of the nodes (assuming the user-id is alice). We will refer to these terminals as my-xu4-1 thru my-xu4-5 and my-c2-1 respectively.
We need to find the machine architecture for both the ODroid XU4 and the ODroid C2 SBCs. In the terminal my-xu4-1, execute the following command:
$ dpkg --print-architecture
The following would be the typical output:
armhf
Similarly, in the terminal my-c2-1, execute the above command.
The following would be the typical output:
arm64
In each of the 6 terminals, execute the following command:
$ sudo apt-get remove docker docker-engine docker.io containerd runc
The following would be the typical output:
Reading package lists... Done
Building dependency tree
Reading state information... Done
E: Unable to locate package docker-engine
In each of the 6 terminals, execute the following command:
$ sudo apt-get update
The following would be the typical output:
Hit:2 http://ports.ubuntu.com focal InRelease
Hit:3 http://ports.ubuntu.com focal-security InRelease
Hit:4 http://ports.ubuntu.com focal-updates InRelease
Hit:5 http://ports.ubuntu.com focal-backports InRelease
Hit:1 https://armbian.systemonachip.net/apt focal InRelease
Reading package lists... Done
In each of the 6 terminals, execute the following command:
$ sudo apt-get install apt-transport-https ca-certificates curl gnupg-agent software-properties-common
The following would be the typical output:
Reading package lists... Done
Building dependency tree
Reading state information... Done
ca-certificates is already the newest version (20190110ubuntu1.1).
curl is already the newest version (7.68.0-1ubuntu2.2).
software-properties-common is already the newest version (0.98.9.2).
apt-transport-https is already the newest version (2.0.2ubuntu0.1).
The following NEW packages will be installed:
  gnupg-agent
0 upgraded, 1 newly installed, 0 to remove and 0 not upgraded.
Need to get 5,236 B of archives.
After this operation, 46.1 kB of additional disk space will be used.
Do you want to continue? [Y/n] Y
Get:1 http://ports.ubuntu.com focal/universe arm64 gnupg-agent all 2.2.19-3ubuntu2 [5,236 B]
Fetched 5,236 B in 0s (18.1 kB/s)
Selecting previously unselected package gnupg-agent.
(Reading database ... 84665 files and directories currently installed.)
Preparing to unpack .../gnupg-agent_2.2.19-3ubuntu2_all.deb ...
Unpacking gnupg-agent (2.2.19-3ubuntu2) ...
Setting up gnupg-agent (2.2.19-3ubuntu2) ...
In each of the 6 terminals, execute the following command:
$ curl -fsSL https://download.docker.com/linux/ubuntu/gpg | sudo apt-key add -
The following would be the typical output:
OK
In each of the 5 terminals my-xu4-1 thru my-xu4-5, execute the following command:
$ sudo add-apt-repository "deb [arch=armhf] https://download.docker.com/linux/ubuntu $(lsb_release -cs) stable"
The following would be the typical output:
Get:1 https://download.docker.com/linux/ubuntu focal InRelease [36.2 kB]
Hit:3 http://ports.ubuntu.com focal InRelease
Hit:4 http://ports.ubuntu.com focal-security InRelease
Hit:5 http://ports.ubuntu.com focal-updates InRelease
Hit:6 http://ports.ubuntu.com focal-backports InRelease
Get:7 https://download.docker.com/linux/ubuntu focal/stable armhf Packages [2,743 B]
Hit:2 https://mirrors.netix.net/armbian/apt focal InRelease
Fetched 38.9 kB in 2s (17.8 kB/s)
Reading package lists... Done
In the terminal my-c2-1, execute the following command:
$ sudo add-apt-repository "deb [arch=arm64] https://download.docker.com/linux/ubuntu $(lsb_release -cs) stable"
The following would be the typical output:
Get:1 https://download.docker.com/linux/ubuntu focal InRelease [36.2 kB]
Hit:3 http://ports.ubuntu.com focal InRelease
Hit:4 http://ports.ubuntu.com focal-security InRelease
Hit:5 http://ports.ubuntu.com focal-updates InRelease
Hit:6 http://ports.ubuntu.com focal-backports InRelease
Get:7 https://download.docker.com/linux/ubuntu focal/stable arm64 Packages [2,743 B]
Hit:2 https://mirrors.netix.net/armbian/apt focal InRelease
Fetched 38.9 kB in 2s (17.8 kB/s)
Reading package lists... Done
In each of the 6 terminals, execute the following command:
$ sudo apt-get update
The following would be the typical output:
Hit:1 https://download.docker.com/linux/ubuntu focal InRelease
Hit:3 http://ports.ubuntu.com focal InRelease
Hit:4 http://ports.ubuntu.com focal-security InRelease
Hit:5 http://ports.ubuntu.com focal-updates InRelease
Hit:6 http://ports.ubuntu.com focal-backports InRelease
Hit:2 https://imola.armbian.com/apt focal InRelease
Reading package lists... Done
In each of the 5 terminals my-xu4-1 thru my-xu4-5, execute the following command:
$ sudo apt-get install docker-ce docker-ce-cli containerd.io
The following would be the typical output:
Reading package lists... Done
Building dependency tree
Reading state information... Done
Recommended packages:
  cgroupfs-mount | cgroup-lite pigz apparmor
The following NEW packages will be installed:
  containerd.io docker-ce docker-ce-cli
0 upgraded, 3 newly installed, 0 to remove and 0 not upgraded.
Need to get 60.2 MB of archives.
After this operation, 303 MB of additional disk space will be used.
Get:1 https://download.docker.com/linux/ubuntu focal/stable armhf containerd.io armhf 1.2.13-2 [16.2 MB]
Get:2 https://download.docker.com/linux/ubuntu focal/stable armhf docker-ce-cli armhf 5:19.03.12~3-0~ubuntu-focal [28.8 MB]
Get:3 https://download.docker.com/linux/ubuntu focal/stable armhf docker-ce armhf 5:19.03.12~3-0~ubuntu-focal [15.2 MB]
Fetched 60.2 MB in 8s (7,619 kB/s)
Selecting previously unselected package containerd.io.
(Reading database ... 84669 files and directories currently installed.)
Preparing to unpack .../containerd.io_1.2.13-2_armhf.deb ...
Unpacking containerd.io (1.2.13-2) ...
Selecting previously unselected package docker-ce-cli.
Preparing to unpack .../docker-ce-cli_5%3a19.03.12~3-0~ubuntu-focal_armhf.deb ...
Unpacking docker-ce-cli (5:19.03.12~3-0~ubuntu-focal) ...
Selecting previously unselected package docker-ce.
Preparing to unpack .../docker-ce_5%3a19.03.12~3-0~ubuntu-focal_armhf.deb ...
Unpacking docker-ce (5:19.03.12~3-0~ubuntu-focal) ...
Setting up containerd.io (1.2.13-2) ...
Created symlink /etc/systemd/system/multi-user.target.wants/containerd.service → /lib/systemd/system/containerd.service.
Setting up docker-ce-cli (5:19.03.12~3-0~ubuntu-focal) ...
Setting up docker-ce (5:19.03.12~3-0~ubuntu-focal) ...
Created symlink /etc/systemd/system/multi-user.target.wants/docker.service → /lib/systemd/system/docker.service.
Created symlink /etc/systemd/system/sockets.target.wants/docker.socket → /lib/systemd/system/docker.socket.
Processing triggers for man-db (2.9.1-1) ...
Processing triggers for systemd (245.4-4ubuntu3.2) ...
In the terminal my-c2-1, execute the following command:
$ sudo apt-get install docker-ce docker-ce-cli containerd.io
The following would be the typical output:
Reading package lists... Done
Building dependency tree
Reading state information... Done
Recommended packages:
  cgroupfs-mount | cgroup-lite pigz apparmor
The following NEW packages will be installed:
  containerd.io docker-ce docker-ce-cli
0 upgraded, 3 newly installed, 0 to remove and 0 not upgraded.
Need to get 60.2 MB of archives.
After this operation, 303 MB of additional disk space will be used.
Get:1 https://download.docker.com/linux/ubuntu focal/stable arm64 containerd.io arm64 1.2.13-2 [16.2 MB]
Get:2 https://download.docker.com/linux/ubuntu focal/stable arm64 docker-ce-cli arm64 5:19.03.12~3-0~ubuntu-focal [28.8 MB]
Get:3 https://download.docker.com/linux/ubuntu focal/stable arm64 docker-ce arm64 5:19.03.12~3-0~ubuntu-focal [15.2 MB]
Fetched 60.2 MB in 8s (7,619 kB/s)
Selecting previously unselected package containerd.io.
(Reading database ... 84669 files and directories currently installed.)
Preparing to unpack .../containerd.io_1.2.13-2_arm64.deb ...
Unpacking containerd.io (1.2.13-2) ...
Selecting previously unselected package docker-ce-cli.
Preparing to unpack .../docker-ce-cli_5%3a19.03.12~3-0~ubuntu-focal_arm64.deb ...
Unpacking docker-ce-cli (5:19.03.12~3-0~ubuntu-focal) ...
Selecting previously unselected package docker-ce.
Preparing to unpack .../docker-ce_5%3a19.03.12~3-0~ubuntu-focal_arm64.deb ...
Unpacking docker-ce (5:19.03.12~3-0~ubuntu-focal) ...
Setting up containerd.io (1.2.13-2) ...
Created symlink /etc/systemd/system/multi-user.target.wants/containerd.service → /lib/systemd/system/containerd.service.
Setting up docker-ce-cli (5:19.03.12~3-0~ubuntu-focal) ...
Setting up docker-ce (5:19.03.12~3-0~ubuntu-focal) ...
Created symlink /etc/systemd/system/multi-user.target.wants/docker.service → /lib/systemd/system/docker.service.
Created symlink /etc/systemd/system/sockets.target.wants/docker.socket → /lib/systemd/system/docker.socket.
Processing triggers for man-db (2.9.1-1) ...
Processing triggers for systemd (245.4-4ubuntu3.2) ...
In each of the 6 terminals, execute the following commands:
$ sudo usermod -aG docker $USER
$ sudo shutdown -r now
Since the nodes were rebooted, we need to log in to each of the nodes one more time.
For our Celery Broker, we will use RabbitMQ as the messaging middleware.
On Docker Hub, check for the current stable version for RabbitMQ. At the time of this article, the current stable version was rabbitmq:3.8.8.
In the terminal my-xu4-3, execute the following command:
$ docker pull rabbitmq:3.8.8-management
The following would be the typical output:
3.8.8-management: Pulling from library/rabbitmq
854ab59e811f: Pull complete
996b7ca18b13: Pull complete
50a08dcf8afc: Pull complete
d34a2e7cb38e: Pull complete
593701df30b6: Pull complete
030c2f3628ad: Pull complete
ad381249c522: Pull complete
577e81d7995c: Pull complete
e54080d86cca: Pull complete
7733ca7770b9: Pull complete
d08f278275c3: Pull complete
04f09e774af8: Pull complete
596b7f2fc950: Pull complete
93da642e6c0b: Pull complete
Digest: sha256:e18eb565a40a6c03156d76ee2597115e19d5cf632a0aa2cdf87724d94e86cfea
Status: Downloaded newer image for rabbitmq:3.8.8-management
docker.io/library/rabbitmq:3.8.8-management
For our Celery Backend, we will use Redis as the result store.
On Docker Hub, check for the current stable version for Redis. At the time of this article, the current stable version was redis:6.0.8.
In the terminal my-c2-1, execute the following command:
$ docker pull redis:6.0.8
The following would be the typical output:
6.0.8: Pulling from library/redis
a6d76de28f58: Pull complete
3573263a91cd: Pull complete
74acfbcef883: Pull complete
720e1be7fe14: Pull complete
bcb81e952db9: Pull complete
fa95093de04d: Pull complete
Digest: sha256:1cfb205a988a9dae5f025c57b92e9643ec0e7ccff6e66bc639d8a5f95bba928c
Status: Downloaded newer image for redis:6.0.8
docker.io/library/redis:6.0.8
In the terminals my-xu4-1, my-xu4-2, my-xu4-4, and my-xu4-5, execute the following command:
$ sudo apt install virtualenv -y
The following would be the typical output:
Reading package lists... Done
Building dependency tree
Reading state information... Done
The following additional packages will be installed:
  python3-virtualenv
The following NEW packages will be installed:
  python3-virtualenv virtualenv
0 upgraded, 2 newly installed, 0 to remove and 0 not upgraded.
Need to get 47.8 kB of archives.
After this operation, 171 kB of additional disk space will be used.
Get:1 http://ports.ubuntu.com focal/universe armhf python3-virtualenv all 15.1.0+ds-1.1 [43.4 kB]
Get:2 http://ports.ubuntu.com focal/universe armhf virtualenv all 15.1.0+ds-1.1 [4,476 B]
Fetched 47.8 kB in 0s (107 kB/s)
Selecting previously unselected package python3-virtualenv.
(Reading database ... 80113 files and directories currently installed.)
Preparing to unpack .../python3-virtualenv_15.1.0+ds-1.1_all.deb ...
Unpacking python3-virtualenv (15.1.0+ds-1.1) ...
Selecting previously unselected package virtualenv.
Preparing to unpack .../virtualenv_15.1.0+ds-1.1_all.deb ...
Unpacking virtualenv (15.1.0+ds-1.1) ...
Setting up python3-virtualenv (15.1.0+ds-1.1) ...
Setting up virtualenv (15.1.0+ds-1.1) ...
Processing triggers for man-db (2.8.3-2ubuntu0.1) ...
In the terminals my-xu4-1, my-xu4-2, my-xu4-4, and my-xu4-5, execute the following command:
$ virtualenv -p /usr/bin/python3 celery-venv
The following would be the typical output:
Already using interpreter /usr/bin/python3
Using base prefix '/usr'
New python executable in /home/alice/celery-venv/bin/python3
Also creating executable in /home/alice/celery-venv/bin/python
Installing setuptools, pkg_resources, pip, wheel...done.
In the terminals my-xu4-1, my-xu4-2, my-xu4-4, and my-xu4-5, execute the following command:
$ source celery-venv/bin/activate
The following would be the typical output:
(celery-venv) $
In the terminals my-xu4-1, my-xu4-2, my-xu4-4, and my-xu4-5, execute the following command:
(celery-venv) $ pip3 install celery[redis]
The following would be the typical output:
Collecting celery[redis]
  Downloading https://files.pythonhosted.org/packages/c8/0c/609e3611d20c9f8d883852d1be5516671f630fb08c8c1e56911567dfba7b/celery-4.4.7-py2.py3-none-any.whl (427kB)
    100% |--------------------------------| 430kB 1.1MB/s
Collecting kombu<4.7,>=4.6.10 (from celery[redis])
  Downloading https://files.pythonhosted.org/packages/9e/34/3eea6a3a9ff81b0c7ddbdceb22a1ffc1b5907d863f27ca19a68777d2211d/kombu-4.6.11-py2.py3-none-any.whl (184kB)
    100% |--------------------------------| 194kB 2.5MB/s
Collecting vine==1.3.0 (from celery[redis])
  Downloading https://files.pythonhosted.org/packages/7f/60/82c03047396126c8331ceb64da1dc52d4f1317209f32e8fe286d0c07365a/vine-1.3.0-py2.py3-none-any.whl
Collecting billiard<4.0,>=3.6.3.0 (from celery[redis])
  Downloading https://files.pythonhosted.org/packages/e8/5e/7591866ff45b370354bd20291cb6f87ddb2eef1f1c88c890a38412037e11/billiard-3.6.3.0-py3-none-any.whl (89kB)
    100% |--------------------------------| 92kB 4.2MB/s
Collecting pytz>dev (from celery[redis])
  Downloading https://files.pythonhosted.org/packages/4f/a4/879454d49688e2fad93e59d7d4efda580b783c745fd2ec2a3adf87b0808d/pytz-2020.1-py2.py3-none-any.whl (510kB)
    100% |--------------------------------| 512kB 1.0MB/s
Collecting redis>=3.2.0; extra == "redis" (from celery[redis])
  Downloading https://files.pythonhosted.org/packages/a7/7c/24fb0511df653cf1a5d938d8f5d19802a88cef255706fdda242ff97e91b7/redis-3.5.3-py2.py3-none-any.whl (72kB)
    100% |--------------------------------| 81kB 4.4MB/s
Collecting importlib-metadata>=0.18; python_version < "3.8" (from kombu<4.7,>=4.6.10->celery[redis])
  Downloading https://files.pythonhosted.org/packages/8e/58/cdea07eb51fc2b906db0968a94700866fc46249bdc75cac23f9d13168929/importlib_metadata-1.7.0-py2.py3-none-any.whl
Collecting amqp<2.7,>=2.6.0 (from kombu<4.7,>=4.6.10->celery[redis])
  Downloading https://files.pythonhosted.org/packages/bc/90/bb5ce93521772f083cb2d7a413bb82eda5afc62b4192adb7ea4c7b4858b9/amqp-2.6.1-py2.py3-none-any.whl (48kB)
    100% |--------------------------------| 51kB 4.2MB/s
Collecting zipp>=0.5 (from importlib-metadata>=0.18; python_version < "3.8"->kombu<4.7,>=4.6.10->celery[redis])
  Downloading https://files.pythonhosted.org/packages/b2/34/bfcb43cc0ba81f527bc4f40ef41ba2ff4080e047acb0586b56b3d017ace4/zipp-3.1.0-py3-none-any.whl
Installing collected packages: billiard, pytz, vine, zipp, importlib-metadata, amqp, kombu, redis, celery
Successfully installed amqp-2.6.1 billiard-3.6.3.0 celery-4.4.7 importlib-metadata-1.7.0 kombu-4.6.11 pytz-2020.1 redis-3.5.3 vine-1.3.0 zipp-3.1.0
In the terminal my-xu4-2, execute the following command:
(celery-venv) $ pip3 install flower
The following would be the typical output:
Collecting flower
  Downloading flower-0.9.5-py2.py3-none-any.whl (459 kB)
    |--------------------------------| 459 kB 1.9 MB/s
Requirement already satisfied: celery>=3.1.0; python_version < "3.7" in ./celery-venv/lib/python3.6/site-packages (from flower) (4.4.7)
Collecting tornado<7.0.0,>=5.0.0; python_version >= "3.5.2"
  Downloading tornado-6.0.4.tar.gz (496 kB)
    |--------------------------------| 496 kB 16.7 MB/s
Collecting prometheus-client==0.8.0
  Downloading prometheus_client-0.8.0-py2.py3-none-any.whl (53 kB)
    |--------------------------------| 53 kB 386 kB/s
Requirement already satisfied: pytz in ./celery-venv/lib/python3.6/site-packages (from flower) (2020.1)
Collecting humanize
  Downloading humanize-2.6.0-py3-none-any.whl (68 kB)
    |--------------------------------| 68 kB 1.5 MB/s
Requirement already satisfied: vine==1.3.0 in ./celery-venv/lib/python3.6/site-packages (from celery>=3.1.0; python_version < "3.7"->flower) (1.3.0)
Requirement already satisfied: kombu<4.7,>=4.6.10 in ./celery-venv/lib/python3.6/site-packages (from celery>=3.1.0; python_version < "3.7"->flower) (4.6.11)
Requirement already satisfied: billiard<4.0,>=3.6.3.0 in ./celery-venv/lib/python3.6/site-packages (from celery>=3.1.0; python_version < "3.7"->flower) (3.6.3.0)
Requirement already satisfied: importlib-metadata>=0.18; python_version < "3.8" in ./celery-venv/lib/python3.6/site-packages (from kombu<4.7,>=4.6.10->celery>=3.1.0; python_version < "3.7"->flower) (1.7.0)
Requirement already satisfied: amqp<2.7,>=2.6.0 in ./celery-venv/lib/python3.6/site-packages (from kombu<4.7,>=4.6.10->celery>=3.1.0; python_version < "3.7"->flower) (2.6.1)
Requirement already satisfied: zipp>=0.5 in ./celery-venv/lib/python3.6/site-packages (from importlib-metadata>=0.18; python_version < "3.8"->kombu<4.7,>=4.6.10->celery>=3.1.0; python_version < "3.7"->flower) (3.1.0)
Building wheels for collected packages: tornado
  Building wheel for tornado (setup.py) ... done
  Created wheel for tornado: filename=tornado-6.0.4-cp36-cp36m-linux_armv7l.whl size=415151 sha256=13730a4e10029a5f778cf9c661c6eb841a4fb99a5df9540ddcacbede6ca92413
  Stored in directory: /home/alice/.cache/pip/wheels/37/a7/db/2d592e44029ef817f3ef63ea991db34191cebaef087a96f505
Successfully built tornado
Installing collected packages: tornado, prometheus-client, humanize, flower
Successfully installed flower-0.9.5 humanize-2.6.0 prometheus-client-0.8.0 tornado-6.0.4
Setup
Now that we have completed the necessary installation, it is time to set up the nodes in our cluster for Celery.
We will run the Celery Broker (RabbitMQ) on the host my-xu4-3.
In the terminal my-xu4-3, execute the following commands:
$ mkdir rabbitmq
$ docker run -d --hostname celery-mq --name celery-mq -v $HOME/rabbitmq:/var/lib/rabbitmq -p 15672:15672 -p 5672:5672 -e RABBITMQ_DEFAULT_VHOST=celery_vhost -e RABBITMQ_DEFAULT_USER=celery -e RABBITMQ_DEFAULT_PASS=s3cr3t rabbitmq:3.8.8-management
The following would be the typical output:
6542cb6e3ebfabb42fcfe9649065e40dfa0ae2ba144ac489c3143bac93e97ea9
In the terminal my-xu4-3, execute the following command:
$ docker ps -a
The following would be the typical output:
CONTAINER ID   IMAGE                       COMMAND                    CREATED          STATUS         PORTS                                                                                                          NAMES
6542cb6e3ebf   rabbitmq:3.8.8-management   "docker-entrypoint.s..."   10 seconds ago   Up 6 seconds   4369/tcp, 5671/tcp, 0.0.0.0:5672->5672/tcp, 15671/tcp, 15691-15692/tcp, 25672/tcp, 0.0.0.0:15672->15672/tcp   celery-mq
Open a web browser and navigate to the URL http://my-xu4-3:15672. Enter the user-id celery and the password s3cr3t. The following illustration shows the RabbitMQ management screen:
In the terminal my-c2-1, execute the following command:
$ docker run -d --hostname celery-rd --name celery-rd -p 6379:6379 redis:6.0.8
The following would be the typical output:
337cb95c4bbf3bfb1534ce68911ec9747fdce24440f5c865d6838ecb74088214
In the terminal my-c2-1, execute the following command:
$ docker ps -a
The following would be the typical output:
CONTAINER ID   IMAGE         COMMAND                    CREATED         STATUS         PORTS                    NAMES
337cb95c4bbf   redis:6.0.8   "docker-entrypoint.s..."   7 seconds ago   Up 5 seconds   0.0.0.0:6379->6379/tcp   celery-rd
Hands-on Celery
Our Celery Python module is called MyCelery. The following illustration shows the directory contents of the module:
The following Python script defines the Celery configuration options:
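A minimal sketch of MyCelery/celery_config.py, consistent with the options described below, would look like the following (the Broker address 192.168.1.23 for my-xu4-3, the Backend address 192.168.1.31 for my-c2-1, and the credentials are assumed from the container setup above and the worker output shown later):

# MyCelery/celery_config.py - a minimal sketch; host addresses are assumed

# RabbitMQ messaging Broker (running on my-xu4-3)
BROKER_URL = 'amqp://celery:s3cr3t@192.168.1.23:5672/celery_vhost'

# Redis Backend store (running on my-c2-1)
CELERY_RESULT_BACKEND = 'redis://192.168.1.31:6379/0'

# Serialize both the task and the task execution result as JSON
CELERY_ACCEPT_CONTENT = ['json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'

# List of all the Celery task modules/scripts
CELERY_INCLUDE = ['MyCelery.celery_tasks']

# Acknowledge a task message *after* the task execution (not on receipt)
CELERY_ACKS_LATE = True

# Route test_task to test_task.Q and the rate tasks to get_rate.Q
CELERY_TASK_ROUTES = {
    'MyCelery.celery_tasks.test_task': {'queue': 'test_task.Q'},
    'MyCelery.celery_tasks.get_rate': {'queue': 'get_rate.Q'},
    'MyCelery.celery_tasks.get_exch_rate': {'queue': 'get_rate.Q'},
    'MyCelery.celery_tasks.get_avg_exch_rate': {'queue': 'get_rate.Q'},
}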
For our setup, we want both the task and the task execution result to be serialized using the JSON format. This is what is specified via CELERY_ACCEPT_CONTENT, CELERY_TASK_SERIALIZER, and CELERY_RESULT_SERIALIZER.
All our Celery tasks are defined in the Python script celery_tasks.py. The module name (MyCelery) dot the script name (celery_tasks) is what is specified in CELERY_INCLUDE. In other words, this configuration option is a list of all the Celery task modules/scripts.
By default, a Celery Worker acknowledges a task message immediately on receipt (before executing it). We want to change that behavior so the Worker acknowledges after the task execution completes. This is what is specified via CELERY_ACKS_LATE.
The default Celery queue is called celery and this is where all the task messages end up if not customized. For our demonstration, we have chosen two queues, namely, test_task.Q and get_rate.Q. The task test_task will go to the queue test_task.Q, while the tasks get_rate, get_exch_rate, and get_avg_exch_rate will be sent to the queue get_rate.Q. This is what is specified in CELERY_TASK_ROUTES.
The following Python script creates an instance of the Celery object and configures it using the options defined in the Python script celery_config.py:
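A minimal sketch of MyCelery/celery_base.py (the application name celery_tasks matches the app entry seen in the worker output later):

# MyCelery/celery_base.py - a minimal sketch

from celery import Celery

from MyCelery import celery_config

# Create the Celery application instance ...
celery_app = Celery('celery_tasks')

# ... and configure it using the options from MyCelery/celery_config.py
celery_app.config_from_object(celery_config)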
The following Python script defines all the Celery tasks that we want to distribute and execute on a task Worker:
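A minimal sketch of MyCelery/celery_tasks.py follows; the task names and signatures match those used throughout this article, while the task bodies simulate the (fictitious) exchange API with random rates for illustration:

# MyCelery/celery_tasks.py - a minimal sketch; the rates are randomly simulated

import random
import time

from celery.utils.serialization import jsonify

from MyCelery.celery_base import celery_app

PROVIDERS = ['CAT', 'DOG', 'FOX']
BASE_RATES = {'BLU': 0.009, 'RED': 0.002}    # assumed ballpark rates

@celery_app.task(name='MyCelery.celery_tasks.test_task')
def test_task(msg):
    # Trivial task used to smoke-test the setup
    return 'Received: %s' % msg

@celery_app.task(name='MyCelery.celery_tasks.get_rate')
def get_rate(crypto):
    # Simulate querying all three exchanges and picking the best (lowest) rate
    time.sleep(0.1)
    quotes = [(round(random.uniform(BASE_RATES[crypto], BASE_RATES[crypto] * 1.25), 4), provider)
              for provider in PROVIDERS]
    rate, provider = min(quotes)
    return jsonify({'crypto': crypto, 'rate': rate, 'provider': provider})

@celery_app.task(name='MyCelery.celery_tasks.get_exch_rate')
def get_exch_rate(provider, crypto):
    # Simulate querying one specific exchange for its current rate
    time.sleep(0.5)
    rate = round(random.uniform(BASE_RATES[crypto], BASE_RATES[crypto] * 1.3), 4)
    return jsonify({'provider': provider, 'crypto': crypto, 'rate': rate})

@celery_app.task(name='MyCelery.celery_tasks.get_avg_exch_rate')
def get_avg_exch_rate(results):
    # Aggregation step of a chain - receives the list of results from a group
    average = sum(r['rate'] for r in results) / len(results)
    return jsonify({'provider': results[0]['provider'],
                    'crypto': results[0]['crypto'],
                    'average': average})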
The following are brief descriptions for some of the keyword(s) and method(s) used in the Python script celery_tasks.py above:
@celery_app.task :: defines a task decorator using the Celery object instance celery_app defined in the Python script celery_base.py. The argument name specifies the task name. The task decorator is what marshals a call to the method (along with its arguments) into a message and sends it to the Broker
jsonify :: defined in the module celery.utils.serialization, is used to convert an object into a JSON-serializable form
The following Python script defines a Celery client:
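A minimal sketch of MyCelery/celery_client.py, reconstructed to produce output in the shape of the sample run shown later (the original logic may differ in details):

# MyCelery/celery_client.py - a minimal sketch

import sys
import time

from MyCelery.celery_tasks import get_rate, test_task

def run_test():
    # The simple delay() form - no execution options can be specified
    result = test_task.delay('Hello Celery !!!')
    print('test :: %s' % result.get(timeout=10))

def run_rate():
    for i in range(1, 6):
        print('Loop - # %d' % i)
        # apply_async() targets the configured queue explicitly; each call
        # immediately returns an instance of AsyncResult
        res_blu = get_rate.apply_async(args=('BLU',), queue='get_rate.Q')
        res_red = get_rate.apply_async(args=('RED',), queue='get_rate.Q')
        # Poll until both distributed tasks have completed on the Workers
        while not (res_blu.ready() and res_red.ready()):
            time.sleep(0.1)
        print('%d BLU :: %s' % (i, res_blu.get()))
        print('%d RED :: %s' % (i, res_red.get()))

if __name__ == '__main__':
    run_rate() if len(sys.argv) > 1 and sys.argv[1] == 'rate' else run_test()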
The following are brief descriptions for some of the keyword(s) and method(s) used in the Python script celery_client.py above:
AsyncResult :: defined in the module celery.result, an instance of this object represents the state of the distributed Celery task executing in a Worker
delay(*args, **kwargs) :: calling this method on a decorated Python method dispatches a task message to the default queue (named celery) on the Broker. This is the basic way to call a Celery task *WITHOUT* any option to specify execution parameters. This method returns an instance of AsyncResult
apply_async(args=(...,)[, kwargs={...}]) :: this method, like the delay method, dispatches a task message to the specified queue on the Broker. In addition, it allows one to specify certain execution parameters, such as when to dispatch the task, when it expires, etc. (see the snippet after this list). This method returns an instance of AsyncResult
ready() :: calling this method on an instance of AsyncResult returns True if the task has executed on the Worker, else False
get([timeout=X]) :: calling this method on an instance of AsyncResult waits for the distributed Celery task to complete execution and returns the result from the Backend. Specifying the timeout parameter makes this method give up after waiting the specified X seconds
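As a quick illustration of the difference between the two dispatch methods, consider the following snippet (it uses the test_task defined earlier; countdown and expires are two of the standard execution parameters accepted by apply_async):

from MyCelery.celery_tasks import test_task

# delay() - the simple form; no execution parameters can be specified
res1 = test_task.delay('ping')

# apply_async() - the same dispatch, but with execution parameters; here,
# countdown delays the dispatch by 5 seconds and expires discards the task
# if it has not executed within 60 seconds
res2 = test_task.apply_async(args=('ping',), countdown=5, expires=60)

print(res1.ready())          # False until a Worker has executed the task
print(res2.get(timeout=90))  # blocks until the result arrives in the Backend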
In the terminals my-xu4-1, my-xu4-2, my-xu4-4, and my-xu4-5, execute the following commands:
(celery-venv) $ mkdir MyCelery
(celery-venv) $ cd MyCelery
Copy the above mentioned Python scripts into the MyCelery directory on each of the hosts my-xu4-1, my-xu4-2, my-xu4-4, and my-xu4-5 respectively.
In the terminal my-xu4-4, execute the following command to start a Worker, listening for messages on the two queues named celery and test_task.Q:
$ PYTHONPATH=$HOME celery -A celery_tasks worker -l info -Q celery,test_task.Q -c 2
The following would be the typical output:
 -------------- celery@my-xu4-4 v4.4.7 (cliffs)
--- ***** -----
-- ******* ---- Linux-5.4.61-odroidxu4-armv7l-with-Ubuntu-20.04-focal 2020-09-18 13:23:36
- *** --- * ---
- ** ---------- [config]
- ** ---------- .> app:         celery_tasks:0xb5ed18f0
- ** ---------- .> transport:   amqp://celery:**@192.168.1.23:5672/celery_vhost
- ** ---------- .> results:     redis://192.168.1.31:6379/0
- *** --- * --- .> concurrency: 2 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery
                .> test_task.Q      exchange=test_task.Q(direct) key=test_task.Q

[tasks]
  . MyCelery.celery_tasks.get_avg_exch_rate
  . MyCelery.celery_tasks.get_exch_rate
  . MyCelery.celery_tasks.get_rate
  . MyCelery.celery_tasks.test_task

[2020-09-18 13:23:36,671: INFO/MainProcess] Connected to amqp://celery:**@192.168.1.23:5672/celery_vhost
[2020-09-18 13:23:36,699: INFO/MainProcess] mingle: searching for neighbors
[2020-09-18 13:23:37,783: INFO/MainProcess] mingle: sync with 1 nodes
[2020-09-18 13:23:37,784: INFO/MainProcess] mingle: sync complete
[2020-09-18 13:23:37,832: INFO/MainProcess] celery@my-xu4-4 ready.
The following are brief descriptions of the command-line options used above:
-A :: specifies the instance name and must be set to the Python module name that handles the tasks
worker :: indicates this process is a Celery worker
-l :: specifies the logging level (debug, info, warning, etc)
-Q :: indicates the list of queues (comma separated) on the Broker, from which this worker will process task messages
-c :: indicates the number of child processes on this Worker
In the terminal my-xu4-5, execute the following command to start a Worker, listening for messages on the queue named get_rate.Q:
$ PYTHONPATH=$HOME celery -A celery_tasks worker -l info -Q get_rate.Q -c 2
The following would be the typical output:
 -------------- celery@my-xu4-5 v4.4.7 (cliffs)
--- ***** -----
-- ******* ---- Linux-5.4.61-odroidxu4-armv7l-with-Ubuntu-20.04-focal 2020-09-18 13:23:07
- *** --- * ---
- ** ---------- [config]
- ** ---------- .> app:         celery_tasks:0xb5e8e7d0
- ** ---------- .> transport:   amqp://celery:**@192.168.1.23:5672/celery_vhost
- ** ---------- .> results:     redis://192.168.1.31:6379/0
- *** --- * --- .> concurrency: 2 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
 -------------- [queues]
                .> get_rate.Q       exchange=get_rate.Q(direct) key=get_rate.Q

[tasks]
  . MyCelery.celery_tasks.get_avg_exch_rate
  . MyCelery.celery_tasks.get_exch_rate
  . MyCelery.celery_tasks.get_rate
  . MyCelery.celery_tasks.test_task

[2020-09-18 13:23:07,679: INFO/MainProcess] Connected to amqp://celery:**@192.168.1.23:5672/celery_vhost
[2020-09-18 13:23:07,706: INFO/MainProcess] mingle: searching for neighbors
[2020-09-18 13:23:08,793: INFO/MainProcess] mingle: all alone
[2020-09-18 13:23:08,871: INFO/MainProcess] celery@my-xu4-5 ready.
The following illustration shows the RabbitMQ management screen after the screen refresh:
In the terminal my-xu4-2, execute the following command to start the Celery monitoring tool called Flower:
$ PYTHONPATH=$HOME celery -A celery_tasks flower -l info -c 2
The following would be the typical output:
[I 200918 13:24:21 command:140] Visit me at http://localhost:5555
[I 200918 13:24:21 command:145] Broker: amqp://celery:**@192.168.1.23:5672/celery_vhost
[I 200918 13:24:21 command:148] Registered tasks:
    ['MyCelery.celery_tasks.get_avg_exch_rate',
     'MyCelery.celery_tasks.get_exch_rate',
     'MyCelery.celery_tasks.get_rate',
     'MyCelery.celery_tasks.test_task',
     'celery.accumulate',
     'celery.backend_cleanup',
     'celery.chain',
     'celery.chord',
     'celery.chord_unlock',
     'celery.chunks',
     'celery.group',
     'celery.map',
     'celery.starmap']
[I 200918 13:24:22 mixins:229] Connected to amqp://celery:**@192.168.1.23:5672/celery_vhost
Open a web browser and navigate to the URL http://my-xu4-2:5555. The following illustration shows the Flower monitoring tool screen:
In the terminal my-xu4-1, execute the following command to start the Celery client to test our setup:
$ PYTHONPATH=$HOME python celery_client.py rate
The following would be the typical output:
Loop - # 1
1 BLU :: {'crypto': 'BLU', 'rate': 0.0092, 'provider': 'CAT'}
1 RED :: {'crypto': 'RED', 'rate': 0.002, 'provider': 'DOG'}
Loop - # 2
2 BLU :: {'crypto': 'BLU', 'rate': 0.0091, 'provider': 'CAT'}
2 RED :: {'crypto': 'RED', 'rate': 0.0022, 'provider': 'CAT'}
Loop - # 3
3 BLU :: {'crypto': 'BLU', 'rate': 0.0093, 'provider': 'DOG'}
3 RED :: {'crypto': 'RED', 'rate': 0.002, 'provider': 'DOG'}
Loop - # 4
4 BLU :: {'crypto': 'BLU', 'rate': 0.0091, 'provider': 'CAT'}
4 RED :: {'crypto': 'RED', 'rate': 0.0021, 'provider': 'FOX'}
Loop - # 5
5 BLU :: {'crypto': 'BLU', 'rate': 0.009, 'provider': 'CAT'}
5 RED :: {'crypto': 'RED', 'rate': 0.002, 'provider': 'DOG'}
The following illustration shows the Flower monitoring tool screen after the screen refresh:
In the terminal my-xu4-5, which is running a Celery task Worker, the following would be the typical output:
[2020-09-17 20:27:13,184: INFO/MainProcess] Received task: MyCelery.celery_tasks.get_rate[ade1c95c-de97-4e53-8a36-b9b85a7f984e]
[2020-09-17 20:27:13,380: INFO/ForkPoolWorker-2] Task MyCelery.celery_tasks.get_rate[ade1c95c-de97-4e53-8a36-b9b85a7f984e] succeeded in 0.18701653799871565s: {'crypto': 'BLU', 'rate': 0.0092, 'provider': 'CAT'}
[2020-09-17 20:27:13,394: INFO/MainProcess] Received task: MyCelery.celery_tasks.get_rate[4c2ba9ee-4de1-40d6-b471-51de7439a8f3]
[2020-09-17 20:27:13,556: INFO/ForkPoolWorker-2] Task MyCelery.celery_tasks.get_rate[4c2ba9ee-4de1-40d6-b471-51de7439a8f3] succeeded in 0.15665612700104248s: {'crypto': 'RED', 'rate': 0.002, 'provider': 'DOG'}
[2020-09-17 20:27:13,562: INFO/MainProcess] Received task: MyCelery.celery_tasks.get_rate[dfd36f77-d59d-4f96-ba06-0072195ed57d]
[2020-09-17 20:27:13,723: INFO/ForkPoolWorker-2] Task MyCelery.celery_tasks.get_rate[dfd36f77-d59d-4f96-ba06-0072195ed57d] succeeded in 0.1563869599995087s: {'crypto': 'BLU', 'rate': 0.0091, 'provider': 'CAT'}
[2020-09-17 20:27:13,730: INFO/MainProcess] Received task: MyCelery.celery_tasks.get_rate[509816a9-4c74-4b39-8e4b-0a6ab4263c19]
[2020-09-17 20:27:13,891: INFO/ForkPoolWorker-2] Task MyCelery.celery_tasks.get_rate[509816a9-4c74-4b39-8e4b-0a6ab4263c19] succeeded in 0.1563130859976809s: {'crypto': 'RED', 'rate': 0.0022, 'provider': 'CAT'}
[2020-09-17 20:27:13,897: INFO/MainProcess] Received task: MyCelery.celery_tasks.get_rate[6fce83fc-88fb-4d4b-86ea-a583a78edabb]
[2020-09-17 20:27:14,058: INFO/ForkPoolWorker-2] Task MyCelery.celery_tasks.get_rate[6fce83fc-88fb-4d4b-86ea-a583a78edabb] succeeded in 0.15644379399964237s: {'crypto': 'BLU', 'rate': 0.0093, 'provider': 'DOG'}
[2020-09-17 20:27:14,064: INFO/MainProcess] Received task: MyCelery.celery_tasks.get_rate[78141f9b-ccf8-4d5a-94dd-05b283edbcd0]
[2020-09-17 20:27:14,225: INFO/ForkPoolWorker-2] Task MyCelery.celery_tasks.get_rate[78141f9b-ccf8-4d5a-94dd-05b283edbcd0] succeeded in 0.15650325199749204s: {'crypto': 'RED', 'rate': 0.002, 'provider': 'DOG'}
[2020-09-17 20:27:14,231: INFO/MainProcess] Received task: MyCelery.celery_tasks.get_rate[d69dac38-de56-4344-961a-7f65a4191946]
[2020-09-17 20:27:14,392: INFO/ForkPoolWorker-2] Task MyCelery.celery_tasks.get_rate[d69dac38-de56-4344-961a-7f65a4191946] succeeded in 0.1563110860006418s: {'crypto': 'BLU', 'rate': 0.0091, 'provider': 'CAT'}
[2020-09-17 20:27:14,398: INFO/MainProcess] Received task: MyCelery.celery_tasks.get_rate[5b452c53-3c38-479b-85db-fc4a81de8bfd]
[2020-09-17 20:27:14,559: INFO/ForkPoolWorker-2] Task MyCelery.celery_tasks.get_rate[5b452c53-3c38-479b-85db-fc4a81de8bfd] succeeded in 0.15641754399985075s: {'crypto': 'RED', 'rate': 0.0021, 'provider': 'FOX'}
[2020-09-17 20:27:14,570: INFO/MainProcess] Received task: MyCelery.celery_tasks.get_rate[2da5c239-3e54-40f3-9e10-27d63c03ea8c]
[2020-09-17 20:27:14,731: INFO/ForkPoolWorker-2] Task MyCelery.celery_tasks.get_rate[2da5c239-3e54-40f3-9e10-27d63c03ea8c] succeeded in 0.15627242000118713s: {'crypto': 'BLU', 'rate': 0.009, 'provider': 'CAT'}
[2020-09-17 20:27:14,739: INFO/MainProcess] Received task: MyCelery.celery_tasks.get_rate[98dd6972-1494-4677-98ce-f4207e4985a3]
[2020-09-17 20:27:14,900: INFO/ForkPoolWorker-2] Task MyCelery.celery_tasks.get_rate[98dd6972-1494-4677-98ce-f4207e4985a3] succeeded in 0.15628821100108325s: {'crypto': 'RED', 'rate': 0.002, 'provider': 'DOG'}
The following illustration shows the executed tasks in the Flower monitoring tool screen after clicking on the navigation bar option Tasks:
Moving on to the next example, the following Python script defines a Celery client with two task pipelines:
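A minimal sketch of MyCelery/celery_client2.py, reconstructed to produce output in the shape of the sample run shown later (the original logic may differ in details):

# MyCelery/celery_client2.py - a minimal sketch

from celery import chain, group

from MyCelery.celery_tasks import get_avg_exch_rate, get_exch_rate

if __name__ == '__main__':
    # Pipeline 1: a group - three task signatures executed in parallel
    group_inst = group(get_exch_rate.s('FOX', 'RED'),
                       get_exch_rate.s('FOX', 'RED'),
                       get_exch_rate.s('FOX', 'RED'))
    print('group_inst = %s' % group_inst)
    group_res = group_inst.apply_async(queue='get_rate.Q')
    print('group res type: %s' % type(group_res))
    print('group ready ?: %s' % group_res.ready())
    print('response: %s' % group_res.get(timeout=30))
    print('group ready ?: %s' % group_res.ready())

    # Pipeline 2: a chain - the group's list of results feeds get_avg_exch_rate
    chain_inst = chain(group(get_exch_rate.s('DOG', 'BLU'),
                             get_exch_rate.s('DOG', 'BLU'),
                             get_exch_rate.s('DOG', 'BLU')),
                       get_avg_exch_rate.s())
    print('chain_inst = %s' % chain_inst)
    chain_res = chain_inst.apply_async(queue='get_rate.Q')
    print('chain res type: %s' % type(chain_res))
    print('chain ready ?: %s' % chain_res.ready())
    print('response: %s' % chain_res.get(timeout=30))
    print('chain ready ?: %s' % chain_res.ready())

Note that because the first link of the second pipeline is a group, Celery upgrades the chain to a chord behind the scenes; this is why the sample output later prints the chain instance using the % (chord) notation and why the result is an instance of AsyncResult rather than GroupResult.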
The following are brief descriptions for some of the keyword(s) and method(s) used in the Python script celery_client2.py above:
GroupResult :: defined in the module celery.result, an instance of this object represents the state of a group of Celery tasks executing in parallel on a set of Workers
TASK.s(*args, **kwargs) :: given a Celery task named TASK (with the Celery task decorator), the TASK.s method creates and returns a callable signature for TASK. Think of it as an alias or a reference for the TASK method that is callable like a normal Python method
group(list of TASK.s) :: this method takes a list of TASK.s (callable task signatures) and dispatches them as task messages to the specified queue(s) on the Broker for parallel execution by the task Workers. This method returns an instance of GroupResult
The following illustration shows the Celery group of tasks primitive:
chain(list of TASK.s) :: this method takes a list of TASK.s (callable task signatures) and dispatches the task messages, one after the other, to the specified queue(s) on the Broker for execution by the different task Workers. The result from the previous task is passed as an argument to the next task, forming a chain. This method returns an instance of AsyncResult
The following illustration shows the Celery chain of tasks primitive:
The Celery group and chain primitives support the execution of complex sequences of task processing pipelines.
In the terminal my-xu4-1, execute the following command to start the Celery client to test our complex task pipelines:
$ PYTHONPATH=$HOME python celery_client2.py
The following would be the typical output:
group_inst = group((MyCelery.celery_tasks.get_exch_rate('FOX', 'RED'), get_exch_rate('FOX', 'RED'), get_exch_rate('FOX', 'RED')))
group res type: <class 'celery.result.GroupResult'>
group ready ?: False
response: [{'provider': 'FOX', 'crypto': 'RED', 'rate': 0.0023}, {'provider': 'FOX', 'crypto': 'RED', 'rate': 0.0025}, {'provider': 'FOX', 'crypto': 'RED', 'rate': 0.0025}]
group ready ?: True
chain_inst = %MyCelery.celery_tasks.get_avg_exch_rate((get_exch_rate('DOG', 'BLU'), get_exch_rate('DOG', 'BLU'), get_exch_rate('DOG', 'BLU')))
chain res type: <class 'celery.result.AsyncResult'>
chain ready ?: False
response: {'provider': 'DOG', 'crypto': 'BLU', 'average': 0.009766666666666667}
chain ready ?: True
In the terminal my-xu4-5, which is running a Celery task Worker, the following would be the typical output:
[2020-09-18 13:24:48,082: INFO/MainProcess] Received task: MyCelery.celery_tasks.get_exch_rate[cf21d167-ab11-44ad-84e5-2fb75d044fb6]
[2020-09-18 13:24:48,088: INFO/MainProcess] Received task: MyCelery.celery_tasks.get_exch_rate[745d22bf-4991-40f4-85aa-9d18f3b8ce7f]
[2020-09-18 13:24:48,093: INFO/MainProcess] Received task: MyCelery.celery_tasks.get_exch_rate[b3c81594-5f20-44b3-ae77-9111329df7eb]
[2020-09-18 13:24:48,634: INFO/ForkPoolWorker-1] Task MyCelery.celery_tasks.get_exch_rate[745d22bf-4991-40f4-85aa-9d18f3b8ce7f] succeeded in 0.5376590699997905s: {'provider': 'FOX', 'crypto': 'RED', 'rate': 0.0025}
[2020-09-18 13:24:48,634: INFO/ForkPoolWorker-2] Task MyCelery.celery_tasks.get_exch_rate[cf21d167-ab11-44ad-84e5-2fb75d044fb6] succeeded in 0.534990074000234s: {'provider': 'FOX', 'crypto': 'RED', 'rate': 0.0023}
[2020-09-18 13:24:49,149: INFO/ForkPoolWorker-2] Task MyCelery.celery_tasks.get_exch_rate[b3c81594-5f20-44b3-ae77-9111329df7eb] succeeded in 0.5070336179996957s: {'provider': 'FOX', 'crypto': 'RED', 'rate': 0.0025}
[2020-09-18 13:24:49,159: INFO/MainProcess] Received task: MyCelery.celery_tasks.get_exch_rate[af761c51-3e76-4ddf-8997-c1eefc161f56]
[2020-09-18 13:24:49,163: INFO/MainProcess] Received task: MyCelery.celery_tasks.get_exch_rate[b8d5d4d2-7fa2-41d4-95e4-5bafca9a0840]
[2020-09-18 13:24:49,168: INFO/MainProcess] Received task: MyCelery.celery_tasks.get_exch_rate[8d853764-a866-46cc-a10f-19d555940d9b]
[2020-09-18 13:24:49,685: INFO/ForkPoolWorker-2] Task MyCelery.celery_tasks.get_exch_rate[af761c51-3e76-4ddf-8997-c1eefc161f56] succeeded in 0.5117993189996923s: {'provider': 'DOG', 'crypto': 'BLU', 'rate': 0.0097}
[2020-09-18 13:24:49,685: INFO/ForkPoolWorker-1] Task MyCelery.celery_tasks.get_exch_rate[b8d5d4d2-7fa2-41d4-95e4-5bafca9a0840] succeeded in 0.5109045700010029s: {'provider': 'DOG', 'crypto': 'BLU', 'rate': 0.009899999999999999}
[2020-09-18 13:24:50,316: INFO/ForkPoolWorker-2] Task MyCelery.celery_tasks.get_exch_rate[8d853764-a866-46cc-a10f-19d555940d9b] succeeded in 0.6247811409994029s: {'provider': 'DOG', 'crypto': 'BLU', 'rate': 0.0097}
[2020-09-18 13:24:50,318: INFO/MainProcess] Received task: MyCelery.celery_tasks.get_avg_exch_rate[fd3b5850-7ac6-43d0-872f-99a2ca2cd90d]
[2020-09-18 13:24:50,833: INFO/ForkPoolWorker-2] Task MyCelery.celery_tasks.get_avg_exch_rate[fd3b5850-7ac6-43d0-872f-99a2ca2cd90d] succeeded in 0.5043644559991662s: {'provider': 'DOG', 'crypto': 'BLU', 'average': 0.009766666666666667}
This completes our demonstration of the Celery distributed task queue processing system using Python.
The crypto coins and exchanges mentioned above in this article are purely *FICTITIOUS* and are for illustrative purposes only.