Asynchronous Task Queue with `asyncio` and `asyncio.Queue`
This snippet demonstrates how to create a simple asynchronous task queue using `asyncio.Queue`. The queue is used to enqueue tasks (in this case, simulating work with `asyncio.sleep`), and multiple worker coroutines consume tasks from the queue concurrently. This is a fundamental pattern for distributing work across multiple asynchronous workers.
Code Implementation
The code defines a `worker` coroutine that continuously gets tasks (simulated delays) from the queue, sleeps for the specified delay, and signals task completion. The `main` function creates the queue, populates it with random delays representing work items, creates multiple worker coroutines, and then waits for all tasks in the queue to be processed using `queue.join()`. Finally, it cancels the worker tasks to ensure the program terminates cleanly.
import asyncio
import random


async def worker(name, queue):
    while True:
        # Get a "work item" out of the queue.
        delay = await queue.get()
        print(f'{name}: Working on {delay} second(s)...')

        # Simulate doing the work
        await asyncio.sleep(delay)

        # Signal to the queue that the "work item" has been processed.
        queue.task_done()
        print(f'{name}: Finished {delay} second(s)')


async def main():
    # Create a queue that we will use to store our "workload".
    queue = asyncio.Queue()

    # Generate random work and put it in the queue.
    total_tasks = 10
    for i in range(total_tasks):
        delay = random.randint(1, 5)
        await queue.put(delay)

    # Create three worker tasks to process the queue concurrently.
    tasks = []
    for i in range(3):
        task = asyncio.create_task(worker(f'worker-{i+1}', queue))
        tasks.append(task)

    # Wait until the queue is fully processed.
    await queue.join()

    # Cancel our worker tasks.
    for task in tasks:
        task.cancel()

    # Wait until all worker tasks are cancelled.
    await asyncio.gather(*tasks, return_exceptions=True)
    print('Finished!')


if __name__ == "__main__":
    asyncio.run(main())
Concepts Behind the Snippet
This is an asynchronous variant of the classic producer-consumer pattern: `main` acts as the producer, the worker coroutines are the consumers, and `asyncio.Queue` coordinates them. Every call to `queue.get()` must be matched by a call to `queue.task_done()`, and `queue.join()` blocks until each enqueued item has been marked done. Because the workers loop forever, they must be shut down explicitly (here, by cancellation) once the queue is drained.
Real-Life Use Case
This pattern is used in many real-world applications, including:
- Web crawlers that enqueue URLs and fetch them with a pool of concurrent workers
- Rate-limited API clients, where a fixed number of workers caps the number of in-flight requests
- Background processing in servers, such as sending emails or resizing images
- Pipelines that fan I/O-bound work out to several concurrent consumers
Best Practices
- Prefer a bounded queue (`asyncio.Queue(maxsize=...)`) so producers apply backpressure instead of growing memory without limit.
- Wrap the work inside each worker in `try`/`except` so one bad item does not kill the worker, but let `asyncio.CancelledError` propagate so shutdown still works.
- Call `queue.task_done()` exactly once per `queue.get()`, ideally in a `finally` block; otherwise `queue.join()` may hang forever.
- Shut workers down deliberately, either by cancelling them (as in the snippet above) or by sending sentinel values through the queue.
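An alternative to cancelling workers is a sentinel-based shutdown: enqueue one sentinel per worker, and have each worker exit cleanly when it sees one. A minimal sketch, assuming a `STOP` sentinel and worker names that are illustrative rather than part of the original snippet:

```python
import asyncio

STOP = object()  # Hypothetical sentinel telling a worker to exit.

async def worker(name, queue, processed):
    while True:
        item = await queue.get()
        try:
            if item is STOP:
                return  # Sentinel received: exit cleanly instead of being cancelled.
            await asyncio.sleep(item)  # Simulate I/O-bound work.
            processed.append((name, item))
        finally:
            # Always mark the item done, even on error or shutdown,
            # so queue.join() cannot hang.
            queue.task_done()

async def main():
    queue = asyncio.Queue()
    for delay in (0.01, 0.02, 0.01):
        await queue.put(delay)
    processed = []
    workers = [asyncio.create_task(worker(f'w{i}', queue, processed))
               for i in range(2)]
    # One sentinel per worker so every worker eventually sees a STOP.
    for _ in workers:
        await queue.put(STOP)
    await queue.join()
    await asyncio.gather(*workers)  # No cancellation needed.
    return processed

if __name__ == "__main__":
    print(asyncio.run(main()))
```

Because the workers return normally, `asyncio.gather` needs no `return_exceptions=True` here, and no `CancelledError` handling is involved.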
Interview Tip
Be prepared to discuss the benefits of using a task queue for asynchronous processing, the different types of queues available in `asyncio`, and the trade-offs between different concurrency models. Understanding how to handle errors and gracefully shut down asynchronous workers is also important. Be familiar with `asyncio.create_task`, `asyncio.gather`, and `queue.join()`.
When to Use Them
Use asynchronous task queues when you need to distribute work across multiple workers and process tasks concurrently in an I/O-bound environment. They are particularly useful when the tasks are independent and can be processed in any order.
Memory Footprint
The memory footprint depends on the size of the queue and the amount of data associated with each task. Using a bounded queue can help limit the memory usage. `asyncio` generally has lower overhead than threading or multiprocessing for task queues.
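To make the memory bound concrete, here is a small sketch of a bounded queue. With `maxsize=2`, the producer's `put()` suspends whenever the queue is full, which is exactly how backpressure caps memory usage (the item values and sleep times are illustrative):

```python
import asyncio

async def producer(queue, sizes):
    for delay in range(6):
        # With maxsize=2, put() suspends until a worker frees a slot,
        # capping the memory held by pending items (backpressure).
        await queue.put(delay)
        sizes.append(queue.qsize())

async def worker(queue):
    while True:
        await queue.get()
        await asyncio.sleep(0.01)  # Simulate work.
        queue.task_done()

async def main():
    queue = asyncio.Queue(maxsize=2)  # Bounded: at most 2 pending items.
    sizes = []
    w = asyncio.create_task(worker(queue))
    await producer(queue, sizes)
    await queue.join()
    w.cancel()
    return max(sizes)

if __name__ == "__main__":
    # The queue depth never exceeds the bound, no matter how fast
    # the producer runs.
    print(asyncio.run(main()))
```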
Alternatives
Alternatives to `asyncio.Queue` for task queues include:
- `concurrent.futures.ThreadPoolExecutor` or `ProcessPoolExecutor` for blocking or CPU-bound work
- `multiprocessing.Queue` for distributing work across processes
- Distributed task queues such as Celery or RQ, backed by a broker like Redis or RabbitMQ, when tasks must survive process restarts or span multiple machines
Pros
- Lightweight: coroutines are far cheaper than threads or processes, so one thread can run many workers.
- Built-in backpressure and completion tracking via `maxsize`, `task_done()`, and `join()`.
- No locks required: `asyncio.Queue` is safe to use from multiple coroutines within a single event loop.
Cons
- Single process, single thread: CPU-bound tasks block the event loop and defeat the concurrency.
- Tasks live only in memory and are lost if the process dies.
- Not thread-safe or process-safe; sharing work across threads or machines requires extra infrastructure.
FAQ
- What happens if a worker crashes?
If a worker crashes, the task it was processing is lost unless you have implemented a mechanism for retrying failed tasks. Robust error handling inside the worker is essential to keep one bad item from taking down the whole queue.
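One way to avoid losing a task when work fails is to re-enqueue it with an attempt counter. A minimal sketch, where the `flaky_work` function, the `MAX_RETRIES` limit, and the give-up behaviour are illustrative assumptions rather than part of the original snippet:

```python
import asyncio
import random

MAX_RETRIES = 3  # Illustrative retry limit.

async def flaky_work(item):
    # Hypothetical task that fails roughly half the time.
    if random.random() < 0.5:
        raise RuntimeError(f'task {item} failed')

async def worker(queue, results):
    while True:
        item, attempt = await queue.get()
        try:
            await flaky_work(item)
            results.append(item)        # Success.
        except RuntimeError:
            if attempt < MAX_RETRIES:
                # Re-enqueue the failed item with a bumped attempt count
                # instead of silently losing it.
                await queue.put((item, attempt + 1))
            else:
                results.append(None)    # Give up after MAX_RETRIES attempts.
        finally:
            queue.task_done()

async def main():
    queue = asyncio.Queue()
    for item in range(5):
        await queue.put((item, 1))
    results = []
    workers = [asyncio.create_task(worker(queue, results)) for _ in range(2)]
    await queue.join()
    for w in workers:
        w.cancel()
    return results

if __name__ == "__main__":
    # Every item produces exactly one outcome: its value, or None if abandoned.
    print(asyncio.run(main()))
```

Re-enqueueing keeps `queue.join()` accurate, because each re-put adds one unfinished item that is later matched by its own `task_done()`.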
- How do I handle task dependencies?
For tasks with dependencies, use a more sophisticated scheduling mechanism or model the dependencies as a directed acyclic graph (DAG). Libraries such as Airflow can help manage complex task dependencies.
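For small in-process graphs, one possible sketch uses `asyncio.Event` to gate each step on its prerequisites; the DAG and step names below are hypothetical:

```python
import asyncio

async def step(name, deps, done, results):
    # Wait until every prerequisite step has finished.
    await asyncio.gather(*(done[d].wait() for d in deps))
    results.append(name)  # "Run" the step.
    done[name].set()      # Unblock any dependents.

async def main():
    # Hypothetical DAG: c depends on a and b; d depends on c.
    dag = {'a': [], 'b': [], 'c': ['a', 'b'], 'd': ['c']}
    done = {name: asyncio.Event() for name in dag}
    results = []
    await asyncio.gather(*(step(name, deps, done, results)
                           for name, deps in dag.items()))
    return results

if __name__ == "__main__":
    print(asyncio.run(main()))
```

Every run produces a valid topological order: `c` never runs before `a` and `b`, and `d` never runs before `c`.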
- How can I monitor the progress of the task queue?
Track the number of tasks waiting in the queue, the number of tasks completed, and the number of workers currently active. Logging and metrics can be used to gather this information.
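A simple way to do this in-process is a monitor coroutine that periodically samples `queue.qsize()` and a shared completion counter. A sketch, where the sampling interval and the `stats` dictionary are illustrative choices:

```python
import asyncio

async def worker(queue, stats):
    while True:
        delay = await queue.get()
        await asyncio.sleep(delay)
        stats['completed'] += 1  # Completed-task counter for monitoring.
        queue.task_done()

async def monitor(queue, stats, total, snapshots):
    # Periodically sample progress; the 0.01 s interval is illustrative.
    while stats['completed'] < total:
        snapshots.append((queue.qsize(), stats['completed']))
        await asyncio.sleep(0.01)

async def main():
    total = 4
    queue = asyncio.Queue()
    for _ in range(total):
        await queue.put(0.01)
    stats = {'completed': 0}
    snapshots = []
    workers = [asyncio.create_task(worker(queue, stats)) for _ in range(2)]
    mon = asyncio.create_task(monitor(queue, stats, total, snapshots))
    await queue.join()
    await mon
    for w in workers:
        w.cancel()
    return stats['completed'], snapshots

if __name__ == "__main__":
    completed, snapshots = asyncio.run(main())
    print(f'completed {completed} tasks, {len(snapshots)} progress samples')
```

In production, the snapshots would typically be exported to a metrics system rather than kept in a list.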