PolarSPARC

Introduction to Asyncio in Python


Bhaskar S 07/26/2020


Overview

asyncio is a popular library for writing concurrent asynchronous applications in Python.

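One may wonder why we need one more addition to the existing list of choices - multiprocessing, threading, concurrent.futures, etc. The simple answer - asyncio is a safer and less error-prone concurrency paradigm compared to the non-trivial threading model, which is susceptible to race conditions. With asyncio, a task can only be suspended at an explicit await, so the code between two await points is never interleaved with another task. Threads, by contrast, can be switched out at almost any point.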

Before we go further, it is worth clarifying the difference between Concurrency and Parallelism, as it is often a source of some confusion.

In our fictitious example, Alice has to run a few errands - go to a Pharmacy to get a prescription filled, get take-out pizza for dinner from Bob's Pizza, and get some items from the Grocery store. Each of the three tasks takes some time.

The following illustration depicts the act of Alice performing each of the three tasks sequentially:

Sequential
Figure.1

A better approach for Alice would be to go to the Pharmacy to drop off the prescription, then head to Bob's Pizza to place an order for take-out, and then go to the Grocery store to pick up the items. Once she has completed grocery shopping, she will come back to pick up the pizza, as it will be ready by then, and then finally pick up the prescription. This is an example of Concurrency.

The following illustration depicts the act of Alice performing each of the three tasks concurrently:

Concurrent
Figure.2

If Alice gets help from Charlie to get the prescription and the pizza while she gets the groceries, that is an example of Parallelism. The following illustration depicts the act of both Alice and Charlie dividing and performing the tasks in parallel:

Parallelism
Figure.3

In summary, Concurrency means multiple tasks can run in an overlapping manner, while Parallelism means multiple tasks run at the same time independently.

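The recommended pattern is to use Concurrency (threading, or concurrent.futures.ThreadPoolExecutor) for I/O-intensive workloads (networking, storage, etc.) and Parallelism (multiprocessing, or concurrent.futures.ProcessPoolExecutor) for compute-intensive workloads. In the standard CPython interpreter, the Global Interpreter Lock (GIL) allows only one thread at a time to execute Python bytecode, so threads help mainly while they are waiting on I/O, whereas separate processes can keep multiple CPU cores busy.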
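A minimal sketch of the threading side of this advice, using concurrent.futures.ThreadPoolExecutor. The fake_io() function and its delays are hypothetical stand-ins for blocking I/O: time.sleep() releases the GIL the same way a blocking socket or file read does, so the three calls overlap and should finish in roughly 0.3 seconds instead of 0.9.

```python
import time
from concurrent.futures import ThreadPoolExecutor

def fake_io(name, delay):
    # Hypothetical stand-in for a blocking I/O call; like real blocking I/O,
    # time.sleep() releases the GIL so other threads can run meanwhile
    time.sleep(delay)
    return name

def run_io_tasks():
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=3) as pool:
        # map() submits all calls up front and yields results in input order
        results = list(pool.map(fake_io,
                                ["pharmacy", "pizza", "grocery"],
                                [0.3, 0.3, 0.3]))
    return results, time.perf_counter() - start

if __name__ == "__main__":
    results, elapsed = run_io_tasks()
    # Roughly 0.3 secs rather than 0.9, since the three waits overlap
    print(results, "in %.2f secs" % elapsed)
```

For compute-bound work, concurrent.futures.ProcessPoolExecutor offers the same map()/submit() interface, with the added requirement that the worker function be importable (picklable) by the child processes.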

Now that we have clarified the difference between Concurrency and Parallelism, we turn our attention to Asynchronous programming. So, what is it?

Asynchronous programming is a simpler concurrency paradigm that uses a single thread within a single process, along with cooperative (non-preemptive) multitasking to let the different tasks take turns making progress. A task runs until it reaches a point where it would have to wait (for example, on network or disk I/O), at which point it voluntarily yields control so that another ready task can move forward. In other words, tasks overlap each other, giving the illusion that they are all running at the same time - it is *NOT* parallel, but concurrent.
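To make cooperative multitasking concrete, here is a small sketch (the worker() and main() coroutines are illustrative, not part of the examples that follow). Each task gives up control only at its await asyncio.sleep(0), so the two tasks strictly alternate, and every step runs on the same OS thread.

```python
import asyncio
import threading

async def worker(name, steps, log):
    for i in range(steps):
        # Record which task ran, which step, and on which OS thread
        log.append((name, i, threading.get_ident()))
        # The await is the only point where this task gives up control;
        # sleep(0) yields to the event loop so another ready task can run
        await asyncio.sleep(0)

async def main():
    log = []
    await asyncio.gather(worker("A", 3, log), worker("B", 3, log))
    return log

if __name__ == "__main__":
    for entry in asyncio.run(main()):
        print(entry)
```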

From our example, once Alice gets the groceries, she picks up the pizza since it is ready, then goes to check on the prescription and realizes it is not ready yet. So she heads off to get a cup of coffee and checks back again later. The following illustration depicts the act of Alice performing the three tasks in an asynchronous manner:

Asynchronous
Figure.4

Now that we have explained what we mean by Asynchronous, we will start to dig a little deeper into the asyncio library in Python.

The core components of asyncio are as follows:

| Component | Description |
|---|---|
| Event Loop | Runs a set of coroutines and callbacks on a single thread, switching between them as they suspend and become ready again |
| Coroutines | Functions defined with async def. Like generators, they can be suspended and resumed; at each await on an operation that is not yet complete, they hand control back to the event loop |
| Tasks & Futures | A Future is a low-level object representing the eventual result of an asynchronous operation. A Task (asyncio.Task is a subclass of asyncio.Future) wraps a coroutine and schedules it on the event loop; a task object can be used to monitor the status of the underlying coroutine, get its result, or cancel it |
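The relationship between these components can be seen in a short sketch, assuming Python 3.8 or later. The coroutine compute() and the hard-coded values are hypothetical; asyncio.create_task(), loop.create_future() and loop.call_later() are standard asyncio APIs.

```python
import asyncio

async def compute():
    # Hypothetical coroutine; calling compute() only creates a coroutine object
    await asyncio.sleep(0.01)
    return 42

async def main():
    loop = asyncio.get_running_loop()
    coro = compute()                     # nothing has executed yet
    is_coro = asyncio.iscoroutine(coro)
    task = asyncio.create_task(coro)     # wrap it in a Task and schedule it
    started_done = task.done()           # False: the task has not run yet
    result = await task                  # suspend main() until the task finishes
    # A bare Future has no coroutine behind it; something else must set its result
    fut = loop.create_future()
    loop.call_later(0.01, fut.set_result, "ready")
    fut_result = await fut
    return is_coro, started_done, result, task.done(), fut_result

if __name__ == "__main__":
    print(issubclass(asyncio.Task, asyncio.Future))  # True
    print(asyncio.run(main()))  # (True, False, 42, True, 'ready')
```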

Setup

For the demonstration, we will be using Python version 3.8 or above.

Hands-on Asyncio

The following is the first simple example demonstrating the use of asyncio in Python:

example-01.py
#
# @Author: Bhaskar S
# @Blog:   https://www.polarsparc.com
# @Date:   19 Jul 2020
#

import asyncio
import time

async def sub_task():
    print(f'{time.ctime()} sub_task - World')
    await asyncio.sleep(0.5)
    print(f'{time.ctime()} sub_task - Hello')

async def main_task():
    print(f'{time.ctime()} main_task - Hello')
    await sub_task()
    print(f'{time.ctime()} main_task - World')

if __name__ == '__main__':
    asyncio.run(main_task())

Executing example-01.py produces the following output:

Output.1

Sun Jul 19 13:20:07 2020 main_task - Hello
Sun Jul 19 13:20:07 2020 sub_task - World
Sun Jul 19 13:20:07 2020 sub_task - Hello
Sun Jul 19 13:20:07 2020 main_task - World

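The following are brief descriptions for some of the keyword(s) and method(s) used in example-01.py above:

- async def: defines a coroutine function. Calling it does not execute its body; it returns a coroutine object that must be awaited or scheduled on an event loop
- await: suspends the enclosing coroutine until the awaited object (a coroutine, task, or future) completes. Whenever the awaited operation is not ready, control returns to the event loop, which can run other tasks in the meantime
- asyncio.sleep(delay): a coroutine that suspends the caller for delay seconds without blocking the thread, unlike time.sleep()
- asyncio.run(coro): creates a new event loop, runs the given coroutine until it completes, closes the loop, and returns the coroutine's result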

The following illustration depicts the execution of example-01.py:

example-01
Figure.5
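Note that example-01.py does not actually overlap anything: await sub_task() runs the sub-task to completion before main_task() continues. The following sketch (hypothetical coroutine names and delays) times two awaits in a row against the same two coroutines passed to asyncio.gather(), which example-03.py uses. The first should take about 0.4 secs and the second about 0.2 secs.

```python
import asyncio
import time

async def sub_task(delay):
    # Simulates a non-blocking wait, e.g. on network I/O
    await asyncio.sleep(delay)

async def sequential():
    start = time.perf_counter()
    # Each await runs its coroutine to completion before the next line runs
    await sub_task(0.2)
    await sub_task(0.2)
    return time.perf_counter() - start

async def overlapped():
    start = time.perf_counter()
    # gather() schedules both coroutines as tasks, so their waits overlap
    await asyncio.gather(sub_task(0.2), sub_task(0.2))
    return time.perf_counter() - start

if __name__ == "__main__":
    print("sequential: %.2f secs" % asyncio.run(sequential()))
    print("overlapped: %.2f secs" % asyncio.run(overlapped()))
```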

The following example demonstrates a variation of the previous example but using the event loop explicitly:

example-02.py
#
# @Author: Bhaskar S
# @Blog:   https://www.polarsparc.com
# @Date:   19 Jul 2020
#

import asyncio
import time

async def sub_task():
    print(f'{time.ctime()} sub_task - Mundo')
    await asyncio.sleep(0.5)
    print(f'{time.ctime()} sub_task - Hola')

async def main_task():
    print(f'{time.ctime()} main_task - Hola')
    await sub_task()
    print(f'{time.ctime()} main_task - Mundo')

if __name__ == '__main__':
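    # Create a brand new event loop explicitly and make sure it gets closed
    loop = asyncio.new_event_loop()
    try:
        loop.run_until_complete(main_task())
    finally:
        loop.close()

Executing example-02.py produces the following output:

Output.2

Sun Jul 19 13:25:44 2020 main_task - Hola
Sun Jul 19 13:25:44 2020 sub_task - Mundo
Sun Jul 19 13:25:45 2020 sub_task - Hola
Sun Jul 19 13:25:45 2020 main_task - Mundo

The following are brief descriptions for some of the keyword(s) and method(s) used in example-02.py above:

- asyncio.new_event_loop(): creates and returns a new event loop object. Older code often calls asyncio.get_event_loop() here instead; relying on that call to create a loop implicitly is deprecated in recent Python versions and raises a RuntimeError as of Python 3.14
- loop.run_until_complete(future): runs the event loop until the given future completes (a coroutine is first wrapped in a Task) and returns its result
- loop.close(): closes the event loop. The loop must not be running, and any pending callbacks are discarded. Wrapping the run in try/finally ensures the loop is closed even if the coroutine raises

Under the hood, the call to asyncio.run() performs roughly these same steps: it creates a new event loop, runs the coroutine with run_until_complete(), and finally closes the loop with close(). Before closing, it also cancels any tasks that are still pending and shuts down asynchronous generators, which is why asyncio.run() is the recommended entry point for most programs.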
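As a rough illustration of what asyncio.run() does internally, the sketch below defines a hypothetical run_like_asyncio_run() helper. It is an approximation, not the real implementation, which also cancels leftover tasks and shuts down the default executor. The demonstration also checks that each asyncio.run() call gets its own fresh loop, which is closed by the time the call returns.

```python
import asyncio

async def current_loop():
    # Return the loop object that is running this coroutine
    return asyncio.get_running_loop()

def run_like_asyncio_run(coro):
    """Hypothetical helper: a rough approximation of asyncio.run().

    The real asyncio.run() also cancels leftover tasks, shuts down the
    default executor (3.9+), and installs the loop as the current loop.
    """
    loop = asyncio.new_event_loop()
    try:
        return loop.run_until_complete(coro)
    finally:
        try:
            # Let any unfinished async generators run their cleanup code
            loop.run_until_complete(loop.shutdown_asyncgens())
        finally:
            loop.close()

if __name__ == "__main__":
    first = asyncio.run(current_loop())
    second = asyncio.run(current_loop())
    # Each call gets its own brand new loop, which is closed on return
    print(first is second, first.is_closed(), second.is_closed())  # False True True
    third = run_like_asyncio_run(current_loop())
    print(third.is_closed())  # True
```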

The following example demonstrates three tasks executed concurrently by the event loop:

example-03.py
#
# @Author: Bhaskar S
# @Blog:   https://www.polarsparc.com
# @Date:   19 Jul 2020
#

import asyncio
import time
import random

async def process(name):
    delay = (random.random() * 10) / 10.0
    print("%s:: %s [%.1f] - step - 1" % (time.ctime(), name, delay))
    await asyncio.sleep(delay)
    print("%s:: %s [%.1f] - step - 2" % (time.ctime(), name, delay))

async def main_process():
    tasks = [process("P1"), process("P2"), process("P3")]
    await asyncio.gather(*tasks)

if __name__ == "__main__":
    print("Starting ...")
    asyncio.run(main_process())
    print("Done !!!")

Executing example-03.py produces the following output:

Output.3

Starting ...
Sun Jul 19 13:31:29 2020:: P1 [0.6] - step - 1
Sun Jul 19 13:31:29 2020:: P2 [0.9] - step - 1
Sun Jul 19 13:31:29 2020:: P3 [0.3] - step - 1
Sun Jul 19 13:31:29 2020:: P3 [0.3] - step - 2
Sun Jul 19 13:31:30 2020:: P1 [0.6] - step - 2
Sun Jul 19 13:31:30 2020:: P2 [0.9] - step - 2
Done !!!

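The following is a brief description of the method used in example-03.py above:

- asyncio.gather(*aws): runs the given awaitables concurrently, wrapping any coroutines in tasks, and returns a list of their results in the same order as the arguments (not in completion order). By default, the first exception raised by any awaitable is propagated to the caller; passing return_exceptions=True collects exceptions into the result list instead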

The following illustration depicts the execution of example-03.py:

example-03
Figure.6
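By default, if any coroutine passed to asyncio.gather() raises, the exception propagates to the awaiting caller. A sketch with return_exceptions=True, assuming a hypothetical process() that fails for P2, shows how exceptions can be collected alongside the successful results instead:

```python
import asyncio

async def process(name, fail=False):
    # Hypothetical coroutine that raises instead of returning when fail=True
    await asyncio.sleep(0.01)
    if fail:
        raise ValueError("%s failed" % name)
    return name

async def main_process():
    # return_exceptions=True stores the exception object in the result list
    # (at the failing coroutine's position) instead of raising it
    return await asyncio.gather(process("P1"),
                                process("P2", fail=True),
                                process("P3"),
                                return_exceptions=True)

if __name__ == "__main__":
    for res in asyncio.run(main_process()):
        print(repr(res))
```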

The following example demonstrates returning values from coroutine(s):

example-04.py
#
# @Author: Bhaskar S
# @Blog:   https://www.polarsparc.com
# @Date:   19 Jul 2020
#

import asyncio
import time
import random

async def process(name):
    delay = (random.random() * 10) / 10.0
    print("%s:: %s [%.1f] - step - 1" % (time.ctime(), name, delay))
    await asyncio.sleep(delay)
    print("%s:: %s [%.1f] - step - 2" % (time.ctime(), name, delay))
    res = random.randint(0, 9)
    print("%s:: %s [%.1f] - result - %d" % (time.ctime(), name, delay, res))
    return res

async def main_process():
    t1 = asyncio.create_task(process("P1"))
    t2 = asyncio.create_task(process("P2"))
    t3 = asyncio.create_task(process("P3"))

    print('Ready to start tasks t1, t2, and t3...')

    r1 = await t1
    r2 = await t2
    r3 = await t3

    print('Result from t1: %d' % r1)
    print('Result from t2: %d' % r2)
    print('Result from t3: %d' % r3)

    print('Completed Tasks t1, t2, and t3 !!!')

if __name__ == "__main__":
    print("Starting ...")
    loop = asyncio.new_event_loop()
    try:
        loop.run_until_complete(main_process())
    finally:
        loop.close()
    print("Done !!!")

Executing example-04.py produces the following output:

Output.4

Starting ...
Ready to start tasks t1, t2, and t3...
Sun Jul 19 13:36:58 2020:: P1 [0.1] - step - 1
Sun Jul 19 13:36:58 2020:: P2 [0.7] - step - 1
Sun Jul 19 13:36:58 2020:: P3 [0.8] - step - 1
Sun Jul 19 13:36:58 2020:: P1 [0.1] - step - 2
Sun Jul 19 13:36:58 2020:: P1 [0.1] - result - 5
Sun Jul 19 13:36:58 2020:: P2 [0.7] - step - 2
Sun Jul 19 13:36:58 2020:: P2 [0.7] - result - 5
Sun Jul 19 13:36:59 2020:: P3 [0.8] - step - 2
Sun Jul 19 13:36:59 2020:: P3 [0.8] - result - 7
Result from t1: 5
Result from t2: 5
Result from t3: 7
Completed Tasks t1, t2, and t3 !!!
Done !!!

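The following are brief descriptions for some of the keyword(s) and method(s) used in example-04.py above:

- asyncio.create_task(coro): wraps the coroutine in a Task, schedules it on the running event loop, and returns the Task object. The task does not start running until the current coroutine yields control, which is why "Ready to start tasks t1, t2, and t3..." appears before any "step - 1" line in Output.4
- await task: suspends the caller until the task finishes and evaluates to the coroutine's return value. All three tasks make progress while main_process() waits on t1, so the total run time is governed by the slowest task rather than the sum of the three delays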
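The scheduling behavior behind Output.4 can be checked with a small sketch (hypothetical names and delays). It records the order in which things happen: tasks created with asyncio.create_task() do not start until main_process() reaches its first await, and because both tasks wait concurrently, the total time is close to the longest delay (0.3 secs) rather than the sum (0.4 secs).

```python
import asyncio
import time

async def process(name, delay, log):
    log.append("start " + name)
    await asyncio.sleep(delay)
    log.append("end " + name)
    return name

async def main_process():
    log = []
    start = time.perf_counter()
    t1 = asyncio.create_task(process("P1", 0.3, log))
    t2 = asyncio.create_task(process("P2", 0.1, log))
    # Both tasks are scheduled, but neither has run: main_process() has not yielded
    log.append("created")
    r1 = await t1   # P2 runs (and finishes) while we wait on P1
    r2 = await t2   # already done, so this returns immediately
    return log, [r1, r2], time.perf_counter() - start

if __name__ == "__main__":
    log, results, elapsed = asyncio.run(main_process())
    print(log, results, "%.2f secs" % elapsed)
```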

The following example demonstrates returning values from coroutine(s) as they complete, the quickest one first, then the next quickest, and so on:

example-05.py
#
# @Author: Bhaskar S
# @Blog:   https://www.polarsparc.com
# @Date:   19 Jul 2020
#

import asyncio
import time
import random

async def process(name):
    delay = (random.random() * 10) / 10.0
    print("%s:: %s [%.1f] - step - 1" % (time.ctime(), name, delay))
    await asyncio.sleep(delay)
    print("%s:: %s [%.1f] - step - 2" % (time.ctime(), name, delay))
    res = random.randint(0, 9)
    print("%s:: %s [%.1f] - result - %d" % (time.ctime(), name, delay, res))
    return name, res

async def main_process():
    t1 = asyncio.create_task(process("P1"))
    t2 = asyncio.create_task(process("P2"))
    t3 = asyncio.create_task(process("P3"))

    print('Ready to get values from tasks t1, t2, and t3...')

    for task in asyncio.as_completed([t1, t2, t3]):
        n, r = await task
        print('Result from %s: %d' % (n, r))

    print('Completed Tasks t1, t2, and t3 !!!')

if __name__ == "__main__":
    print("Starting ...")
    loop = asyncio.new_event_loop()
    try:
        loop.run_until_complete(main_process())
    finally:
        loop.close()
    print("Done !!!")

Executing example-05.py produces the following output:

Output.5

Starting ...
Ready to get values from tasks t1, t2, and t3...
Sun Jul 19 13:44:57 2020:: P1 [1.0] - step - 1
Sun Jul 19 13:44:57 2020:: P2 [0.6] - step - 1
Sun Jul 19 13:44:57 2020:: P3 [0.9] - step - 1
Sun Jul 19 13:44:58 2020:: P2 [0.6] - step - 2
Sun Jul 19 13:44:58 2020:: P2 [0.6] - result - 8
Result from P2: 8
Sun Jul 19 13:44:58 2020:: P3 [0.9] - step - 2
Sun Jul 19 13:44:58 2020:: P3 [0.9] - result - 6
Result from P3: 6
Sun Jul 19 13:44:58 2020:: P1 [1.0] - step - 2
Sun Jul 19 13:44:58 2020:: P1 [1.0] - result - 3
Result from P1: 3
Completed Tasks t1, t2, and t3 !!!
Done !!!

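The following is a brief description of the method used in example-05.py above:

- asyncio.as_completed(aws, timeout=None): runs the awaitables concurrently and returns an iterator; each item it yields is an awaitable that produces the result of the next one to finish. Since the yielded objects are not the original tasks, process() returns its name along with the result so the caller can tell which task completed. If a timeout is given and not everything finishes in time, asyncio.TimeoutError is raised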
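A deterministic variant of example-05.py, with fixed hypothetical delays instead of random ones, makes the completion order predictable. It also passes a timeout to asyncio.as_completed(), so the loop would raise asyncio.TimeoutError if the tasks took longer than 1 second in total:

```python
import asyncio

async def process(name, delay):
    await asyncio.sleep(delay)
    # Return the name alongside the value: as_completed() yields new awaitables,
    # not the original task objects, so the result must identify its source
    return name, delay

async def main_process():
    tasks = [asyncio.create_task(process(n, d))
             for n, d in [("slow", 0.3), ("fast", 0.1), ("medium", 0.2)]]
    order = []
    for next_done in asyncio.as_completed(tasks, timeout=1.0):
        name, _ = await next_done
        order.append(name)
    return order

if __name__ == "__main__":
    print(asyncio.run(main_process()))
```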

The following example demonstrates returning values from coroutine(s) within a specific duration, or else timing out:

example-06.py
#
# @Author: Bhaskar S
# @Blog:   https://www.polarsparc.com
# @Date:   19 Jul 2020
#

import asyncio
import time
import random

async def process(name):
    delay = (random.random() * 10) / 10.0
    print("%s:: %s [%.1f] - step - 1" % (time.ctime(), name, delay))
    await asyncio.sleep(delay)
    print("%s:: %s [%.1f] - step - 2" % (time.ctime(), name, delay))
    res = random.randint(0, 9)
    print("%s:: %s [%.1f] - result - %d" % (time.ctime(), name, delay, res))
    return name, res

async def gather_process():
    t1 = asyncio.create_task(process("P1"))
    t2 = asyncio.create_task(process("P2"))
    t3 = asyncio.create_task(process("P3"))

    print('Ready to get values from tasks t1, t2, and t3 ...')

    res = await asyncio.gather(t1, t2, t3)

    print('Completed Tasks t1, t2, and t3 - %s' % res)

async def main_process():
    print('Ready to get values within 0.5 secs ...')

    try:
        await asyncio.wait_for(gather_process(), timeout=0.5)
    except asyncio.TimeoutError:
        print('Could not complete the tasks in under 0.5 secs')

    print('Got all values !!!')

if __name__ == "__main__":
    print("Starting ...")
    loop = asyncio.new_event_loop()
    try:
        loop.run_until_complete(main_process())
    finally:
        loop.close()
    print("Done !!!")

Executing example-06.py produces the following output for a successful run:

Output.6

Starting ...
Ready to get values within 0.5 secs ...
Ready to get values from tasks t1, t2, and t3 ...
Sun Jul 19 13:52:36 2020:: P1 [0.3] - step - 1
Sun Jul 19 13:52:36 2020:: P2 [0.4] - step - 1
Sun Jul 19 13:52:36 2020:: P3 [0.1] - step - 1
Sun Jul 19 13:52:36 2020:: P3 [0.1] - step - 2
Sun Jul 19 13:52:36 2020:: P3 [0.1] - result - 2
Sun Jul 19 13:52:36 2020:: P1 [0.3] - step - 2
Sun Jul 19 13:52:36 2020:: P1 [0.3] - result - 6
Sun Jul 19 13:52:36 2020:: P2 [0.4] - step - 2
Sun Jul 19 13:52:36 2020:: P2 [0.4] - result - 6
Completed Tasks t1, t2, and t3 - [('P1', 6), ('P2', 6), ('P3', 2)]
Got all values !!!
Done !!!

Executing example-06.py produces the following output for a timeout run:

Output.7

Starting ...
Ready to get values within 0.5 secs ...
Ready to get values from tasks t1, t2, and t3 ...
Sun Jul 19 13:53:02 2020:: P1 [1.0] - step - 1
Sun Jul 19 13:53:02 2020:: P2 [0.7] - step - 1
Sun Jul 19 13:53:02 2020:: P3 [0.2] - step - 1
Sun Jul 19 13:53:02 2020:: P3 [0.2] - step - 2
Sun Jul 19 13:53:02 2020:: P3 [0.2] - result - 4
Could not complete the tasks in under 0.5 secs
Got all values !!!
Done !!!

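The following is a brief description of the method used in example-06.py above:

- asyncio.wait_for(aw, timeout): waits for the awaitable to complete, wrapping a coroutine in a task first. If the timeout expires before it completes, the task is cancelled and asyncio.TimeoutError is raised. In example-06.py, cancelling gather_process() also cancels the pending gather() and, with it, tasks t1, t2, and t3, which is why no "step - 2" lines appear for P1 and P2 in Output.7

Note that 'Got all values !!!' is printed even on the timeout run, because that print() sits after the try/except block rather than inside the try.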
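asyncio.wait_for() is all-or-nothing: on timeout, the results of tasks that had already finished (such as P3 in Output.7) are discarded along with the rest. If partial results are useful, asyncio.wait() with a timeout is an alternative worth considering. The sketch below (hypothetical delays) keeps whatever finished within 0.5 secs and explicitly cancels the rest. The tasks are created with asyncio.create_task() because passing bare coroutines to asyncio.wait() is no longer allowed as of Python 3.11.

```python
import asyncio

async def process(name, delay):
    await asyncio.sleep(delay)
    return name

async def main_process():
    tasks = [asyncio.create_task(process(n, d))
             for n, d in [("P1", 0.1), ("P2", 0.2), ("P3", 1.0)]]
    # Unlike wait_for(), wait() does not cancel anything on timeout; it just
    # reports which tasks finished and which are still pending
    done, pending = await asyncio.wait(tasks, timeout=0.5)
    for task in pending:
        task.cancel()  # here we choose to cancel the stragglers ourselves
    # Wait for the cancellations to take effect without raising CancelledError
    await asyncio.gather(*pending, return_exceptions=True)
    return sorted(t.result() for t in done), [t.cancelled() for t in pending]

if __name__ == "__main__":
    print(asyncio.run(main_process()))
```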

The following example demonstrates the ability to cancel a task that wraps a coroutine if it exceeds an SLA (here, a processing time budget of 6 seconds):

example-07.py
#
# @Author: Bhaskar S
# @Blog:   https://www.polarsparc.com
# @Date:   19 Jul 2020
#

import asyncio
import time
import random

async def data_process():
    try:
        delay = random.randint(0, 9)
        print("%s:: [%d] - retrieve from DB..." % (time.ctime(), delay))
        await asyncio.sleep(delay)
        delay = random.randint(1, 3)
        print("%s:: [%d] - process dataset..." % (time.ctime(), delay))
        await asyncio.sleep(delay)
        res = random.sample(range(1, 101), 10)
        print("%s:: [%d] - result - %d" % (time.ctime(), delay, len(res)))
        return res
    except asyncio.CancelledError:
        print('Task processing cancelled !!!')
        # Re-raise after cleanup so the task is marked as cancelled
        raise

async def main_process():
    print('Ready to start data processing...')
    task = asyncio.create_task(data_process())
    await asyncio.sleep(6.0)
    if not task.done():
        task.cancel()
        print('Cancelling data processing due to SLA breach...')
    else:
        print('Completed data processing: %s' % task.result())

if __name__ == "__main__":
    loop = asyncio.new_event_loop()
    try:
        loop.run_until_complete(main_process())
    finally:
        loop.close()

Executing example-07.py produces the following output for a successful run:

Output.8

Ready to start data processing...
Sun Jul 19 14:03:49 2020:: [0] - retrieve from DB...
Sun Jul 19 14:03:49 2020:: [2] - process dataset...
Sun Jul 19 14:03:51 2020:: [2] - result - 10
Completed data processing: [1, 53, 30, 18, 65, 27, 5, 51, 64, 37]

Executing example-07.py produces the following output for a timeout run:

Output.9

Ready to start data processing...
Sun Jul 19 14:04:18 2020:: [9] - retrieve from DB...
Cancelling data processing due to SLA breach...
Task processing cancelled !!!

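The following are brief descriptions of some of the method(s) used in example-07.py above:

- task.done(): returns True if the task has finished, whether it returned a value, raised an exception, or was cancelled
- task.cancel(): requests cancellation of the task. A CancelledError is thrown into the wrapped coroutine at the await where it is suspended, the next time the event loop runs it. This is why "Task processing cancelled !!!" is printed after "Cancelling data processing due to SLA breach..." in Output.9
- task.result(): returns the task's return value. It raises the task's exception if the coroutine failed, CancelledError if the task was cancelled, or asyncio.InvalidStateError if the task is not done yet
- asyncio.CancelledError: the exception raised inside a coroutine when its task is cancelled. A coroutine may catch it to clean up, but should normally re-raise it so the task is correctly marked as cancelled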
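A variation of example-07.py that follows the usual cancellation pattern is sketched below, with a hypothetical 0.1 second SLA so it runs quickly. The coroutine re-raises CancelledError after its cleanup, and main_process() awaits the cancelled task, so the cancellation has fully taken effect (task.cancelled() is True) before the function returns:

```python
import asyncio

async def data_process(log):
    try:
        await asyncio.sleep(10)   # simulated slow DB query
        return "data"
    except asyncio.CancelledError:
        log.append("cleanup")     # release resources here
        raise                     # re-raise so the task ends up cancelled

async def main_process():
    log = []
    task = asyncio.create_task(data_process(log))
    await asyncio.sleep(0.1)      # the hypothetical SLA budget
    if not task.done():
        task.cancel()
    try:
        await task                # wait for the cancellation to complete
    except asyncio.CancelledError:
        log.append("cancelled")
    return log, task.cancelled()

if __name__ == "__main__":
    print(asyncio.run(main_process()))
```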



