Asyncio Advanced#

Source:

src/basic/asyncio_.py

Introduction#

Beyond basic coroutines and networking, asyncio provides synchronization primitives, queues, subprocess management, and debugging tools. This section covers advanced patterns for building robust async applications, including producer-consumer patterns, rate limiting, graceful shutdown, and integration with synchronous code.

Locks#

asyncio.Lock prevents multiple coroutines from accessing a shared resource simultaneously. Unlike threading locks, async locks must be used with await and only work within the same event loop.

import asyncio

class SharedCounter:
    def __init__(self):
        self.value = 0
        self._lock = asyncio.Lock()

    async def increment(self):
        async with self._lock:
            current = self.value
            await asyncio.sleep(0.01)  # Simulate work
            self.value = current + 1

async def worker(counter, name, count):
    """Drive *count* increments against *counter*, then announce completion."""
    remaining = count
    while remaining > 0:
        await counter.increment()
        remaining -= 1
    print(f"{name} done")

async def main():
    """Run three workers concurrently; the lock keeps the final count exact."""
    counter = SharedCounter()
    jobs = [worker(counter, label, 100) for label in ("A", "B", "C")]
    await asyncio.gather(*jobs)
    print(f"Final value: {counter.value}")  # Should be 300

asyncio.run(main())

Semaphores for Rate Limiting#

asyncio.Semaphore limits the number of concurrent operations. This is essential for rate limiting API calls, limiting database connections, or controlling resource usage.

import asyncio

async def fetch(url, semaphore, delay=1.0):
    """Fetch *url* while holding *semaphore*, bounding concurrency.

    Args:
        url: Address to (pretend to) fetch.
        semaphore: asyncio.Semaphore limiting how many fetches run at once.
        delay: Simulated network latency in seconds. Defaults to 1.0, the
            value that was previously hard-coded, so existing callers are
            unaffected.

    Returns:
        A fake response string for *url*.
    """
    async with semaphore:
        print(f"Fetching {url}")
        await asyncio.sleep(delay)  # Simulate network request
        return f"Response from {url}"

async def main():
    """Fan out ten fetches but allow at most three in flight at a time."""
    # Limit to 3 concurrent requests
    semaphore = asyncio.Semaphore(3)

    urls = [f"https://api.example.com/{i}" for i in range(10)]
    results = await asyncio.gather(*(fetch(url, semaphore) for url in urls))

    for response in results:
        print(response)

asyncio.run(main())

Events for Signaling#

asyncio.Event allows coroutines to wait for a signal from another coroutine. This is useful for coordinating startup, shutdown, or state changes between multiple tasks.

import asyncio

async def waiter(event, name):
    """Block until *event* is set, logging before and after the wait."""
    print(f"{name} waiting for event")
    await event.wait()  # returns immediately if the event is already set
    print(f"{name} got the event!")

async def setter(event, delay=2.0):
    """Set *event* after *delay* seconds, releasing every waiter at once.

    Args:
        event: asyncio.Event that waiting tasks are blocked on.
        delay: Seconds to sleep before setting. Defaults to 2.0, the value
            that was previously hard-coded, so existing callers see the
            same behavior.
    """
    # {delay:g} renders 2.0 as "2", so the default message is unchanged.
    print(f"Setting event in {delay:g} seconds...")
    await asyncio.sleep(delay)
    event.set()
    print("Event set!")

async def main():
    """Start three waiters plus one setter; all waiters release together."""
    event = asyncio.Event()

    coros = [waiter(event, f"Task {i}") for i in (1, 2, 3)]
    coros.append(setter(event))
    await asyncio.gather(*coros)

asyncio.run(main())

Conditions for Complex Synchronization#

asyncio.Condition combines a lock with the ability to wait for a condition. This is useful for producer-consumer patterns where consumers need to wait for specific conditions.

import asyncio

class Buffer:
    """Bounded FIFO guarded by an asyncio.Condition.

    Producers block in put() while the buffer is full; consumers block in
    get() while it is empty.
    """

    def __init__(self, size):
        self.buffer = []      # FIFO storage; index 0 is the oldest item
        self.size = size      # capacity limit enforced by put()
        self.condition = asyncio.Condition()

    async def put(self, item):
        """Append *item*, waiting while the buffer is at capacity."""
        async with self.condition:
            # A while-loop (not `if`): another task may refill the buffer
            # between the notify and this coroutine re-acquiring the lock.
            while len(self.buffer) >= self.size:
                await self.condition.wait()
            self.buffer.append(item)
            self.condition.notify()

    async def get(self):
        """Remove and return the oldest item, waiting while empty."""
        async with self.condition:
            while not self.buffer:
                await self.condition.wait()
            oldest = self.buffer.pop(0)
            self.condition.notify()
            return oldest

async def producer(buffer, name):
    """Put five "<name>-<i>" items into *buffer*, pausing between puts."""
    for i in range(5):
        item = f"{name}-{i}"
        await buffer.put(item)
        print(f"Produced: {item}")
        await asyncio.sleep(0.1)

async def consumer(buffer, name):
    """Take five items from *buffer*, pausing between takes."""
    taken = 0
    while taken < 5:
        item = await buffer.get()
        print(f"{name} consumed: {item}")
        await asyncio.sleep(0.2)
        taken += 1

async def main():
    """Run producers and consumers against a small shared buffer.

    Bug fix: the original started ONE producer (5 items) but TWO consumers
    that each demand 5 items (10 total), so the second consumer waited on
    the condition forever and gather() never returned. A second producer
    balances supply with demand so the program terminates.
    """
    buffer = Buffer(size=2)
    await asyncio.gather(
        producer(buffer, "P1"),
        producer(buffer, "P2"),  # added: consumers need 10 items in total
        consumer(buffer, "C1"),
        consumer(buffer, "C2"),
    )

asyncio.run(main())

Queues for Producer-Consumer#

asyncio.Queue is the preferred way to implement producer-consumer patterns. It handles synchronization internally and provides blocking get/put operations with optional timeouts.

import asyncio

async def producer(queue, name, count=5, delay=0.5):
    """Put *count* labelled items on *queue*, sleeping *delay* between puts.

    Args:
        queue: asyncio.Queue; put() blocks while the queue is full.
        name: Label prefix for produced items.
        count: Number of items to produce. Defaults to 5, the value that
            was previously hard-coded.
        delay: Pause between puts in seconds. Defaults to 0.5, as before.
    """
    for i in range(count):
        item = f"{name}-item-{i}"
        await queue.put(item)
        print(f"Produced: {item}")
        await asyncio.sleep(delay)

async def consumer(queue, name):
    """Drain *queue* until no item arrives within 2 seconds, then exit."""
    while True:
        try:
            # Only the get() can time out, so only it sits in the try body.
            item = await asyncio.wait_for(queue.get(), timeout=2.0)
        except asyncio.TimeoutError:
            print(f"{name} timed out, exiting")
            break
        print(f"{name} consumed: {item}")
        queue.task_done()
        await asyncio.sleep(0.1)

async def main():
    """Two producers feed a bounded queue; two consumers drain it."""
    queue = asyncio.Queue(maxsize=3)

    producers = [asyncio.create_task(producer(queue, n)) for n in ("P1", "P2")]
    consumers = [asyncio.create_task(consumer(queue, n)) for n in ("C1", "C2")]

    # First wait until everything has been queued, then until every item
    # has been marked done by task_done().
    await asyncio.gather(*producers)
    await queue.join()  # Wait for all items to be processed

    for c in consumers:
        c.cancel()

asyncio.run(main())

Priority Queue#

asyncio.PriorityQueue processes items by priority. Lower priority values are processed first. Items must be comparable or wrapped in tuples with priority as the first element.

import asyncio

async def producer(queue):
    """Enqueue three (priority, payload) pairs, deliberately out of order."""
    for entry in ((3, "low priority"), (1, "high priority"), (2, "medium priority")):
        await queue.put(entry)
        priority, item = entry
        print(f"Added: {item} (priority {priority})")

async def consumer(queue):
    """Drain *queue*, processing the lowest priority value first."""
    while not queue.empty():
        entry = await queue.get()
        priority, item = entry
        print(f"Processing: {item} (priority {priority})")
        await asyncio.sleep(0.5)
        queue.task_done()

async def main():
    """Fill a PriorityQueue, then drain it in priority order."""
    pq = asyncio.PriorityQueue()
    await producer(pq)
    await consumer(pq)

asyncio.run(main())

Running Subprocesses#

Asyncio can run and communicate with subprocesses asynchronously. This is useful for running shell commands, external tools, or parallel processes without blocking the event loop.

import asyncio

async def run_command(cmd):
    """Run *cmd* in a shell and capture its output without blocking the loop.

    Returns:
        Dict with the command, its return code, and decoded, stripped
        stdout/stderr.
    """
    proc = await asyncio.create_subprocess_shell(
        cmd,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )

    # communicate() waits for exit and collects both streams at once,
    # avoiding pipe-buffer deadlocks.
    out_bytes, err_bytes = await proc.communicate()

    return {
        'cmd': cmd,
        'returncode': proc.returncode,
        'stdout': out_bytes.decode().strip(),
        'stderr': err_bytes.decode().strip(),
    }

async def main():
    """Run several shell commands concurrently and print their output."""
    commands = [
        "echo 'Hello World'",
        "python --version",
        "date",
    ]

    results = await asyncio.gather(*(run_command(c) for c in commands))

    for r in results:
        print(f"Command: {r['cmd']}")
        print(f"Output: {r['stdout']}")
        print()

asyncio.run(main())

Subprocess with Streaming Output#

For long-running processes, you can stream output line by line instead of waiting for the process to complete. This is useful for monitoring logs or progress.

import asyncio

async def stream_subprocess(cmd):
    """Run *cmd*, echoing each output line as it arrives; return the exit code.

    stderr is merged into stdout so one stream carries all output.
    """
    proc = await asyncio.create_subprocess_shell(
        cmd,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.STDOUT,
    )

    # readline() yields b"" at EOF, which ends the loop.
    while line := await proc.stdout.readline():
        print(f"[{cmd[:20]}] {line.decode().strip()}")

    await proc.wait()
    return proc.returncode

async def main():
    """Stream the output of two shell loops concurrently."""
    # Run multiple commands and stream their output
    shell_loops = (
        "for i in 1 2 3; do echo $i; sleep 1; done",
        "for i in a b c; do echo $i; sleep 0.5; done",
    )
    await asyncio.gather(*(stream_subprocess(s) for s in shell_loops))

asyncio.run(main())

Graceful Shutdown#

Proper shutdown handling ensures all tasks complete cleanly and resources are released. Use signal handlers to catch SIGINT/SIGTERM and cancel tasks gracefully. Note that loop.add_signal_handler() is only supported on Unix; on Windows, catch KeyboardInterrupt instead.

import asyncio
import signal

async def worker(name):
    """Loop forever; on cancellation, perform mock cleanup and re-raise."""
    try:
        while True:
            print(f"{name} working...")
            await asyncio.sleep(1)
    except asyncio.CancelledError:
        print(f"{name} cancelled, cleaning up...")
        await asyncio.sleep(0.5)  # Cleanup time
        print(f"{name} cleanup done")
        # Re-raise so the task ends in the "cancelled" state, not "done".
        raise

async def main():
    """Run two workers until SIGINT/SIGTERM, then cancel them gracefully.

    Fix: the original called asyncio.get_event_loop(), which is deprecated
    inside a coroutine since Python 3.10; asyncio.get_running_loop() is the
    correct call here and never implicitly creates a new loop.
    """
    loop = asyncio.get_running_loop()
    tasks = [
        asyncio.create_task(worker("Worker-1")),
        asyncio.create_task(worker("Worker-2")),
    ]

    def shutdown():
        # Cancelling the tasks unwinds the gather() below.
        print("\nShutdown requested...")
        for task in tasks:
            task.cancel()

    # NOTE: add_signal_handler() is only available on Unix event loops.
    loop.add_signal_handler(signal.SIGINT, shutdown)
    loop.add_signal_handler(signal.SIGTERM, shutdown)

    try:
        await asyncio.gather(*tasks)
    except asyncio.CancelledError:
        print("All tasks cancelled")

asyncio.run(main())

Running Async Code in Threads#

When you need to run async code from synchronous code (e.g., in a callback or from another thread), use asyncio.run_coroutine_threadsafe().

import asyncio
import threading
import time

async def async_task(value):
    """Sleep one second, then return double *value*."""
    await asyncio.sleep(1)
    doubled = value * 2
    return doubled

def thread_function(loop):
    """Submit async_task() to *loop* from this non-loop thread and wait.

    run_coroutine_threadsafe() is the thread-safe bridge into a running
    loop; it returns a concurrent.futures.Future this thread can block on.
    """
    future = asyncio.run_coroutine_threadsafe(async_task(21), loop)
    result = future.result(timeout=5)
    print(f"Thread got result: {result}")

async def main():
    """Launch a thread that schedules work back onto this event loop.

    Fix: the original called asyncio.get_event_loop(), which is deprecated
    inside a coroutine since Python 3.10; asyncio.get_running_loop() is the
    correct way to obtain the loop this coroutine runs on.
    """
    loop = asyncio.get_running_loop()

    # Start a thread that will call async code
    thread = threading.Thread(target=thread_function, args=(loop,))
    thread.start()

    # Keep the event loop running so the thread's coroutine can execute.
    await asyncio.sleep(2)
    thread.join()  # NOTE: join() briefly blocks the loop; fine for a demo.

asyncio.run(main())

Debugging Asyncio#

Enable debug mode to catch common mistakes like blocking calls, unawaited coroutines, and slow callbacks. Debug mode adds overhead so use it only during development.

import asyncio
import logging

# Enable debug logging
# Root logger at DEBUG so asyncio's debug-mode diagnostics are visible.
logging.basicConfig(level=logging.DEBUG)

async def slow_callback():
    """Deliberately block the event loop to trip the debug-mode warning."""
    import time  # local import, kept as in the original example
    time.sleep(0.2)  # This will trigger a warning in debug mode

async def main():
    """Await the deliberately-blocking coroutine."""
    return await slow_callback()

# Method 1: Environment variable
# PYTHONASYNCIODEBUG=1 python script.py

# Method 2: asyncio.run with debug=True
# debug=True enables slow-callback warnings and unawaited-coroutine reports.
asyncio.run(main(), debug=True)

Custom Event Loop#

You can customize the event loop behavior by subclassing or patching. This is useful for debugging, profiling, or adding custom functionality.

import asyncio

class DebugEventLoop(asyncio.SelectorEventLoop):
    """Selector event loop that prints callback counts on each iteration.

    NOTE(review): this relies on private loop internals (``_run_once``,
    ``_ready``, ``_scheduled``), which are undocumented and may change
    between Python versions — verify against the target version.
    """

    def _run_once(self):
        # Track number of scheduled callbacks
        num_ready = len(self._ready)          # callbacks runnable right now
        num_scheduled = len(self._scheduled)  # timer callbacks (call_later/call_at)
        if num_ready or num_scheduled:
            print(f"Ready: {num_ready}, Scheduled: {num_scheduled}")
        super()._run_once()

async def task(n):
    """Sleep *n* seconds and announce completion."""
    await asyncio.sleep(n)
    print(f"Task {n} done")

# Use custom event loop
# Install the custom loop as the current one, drive three tasks to
# completion, and always close the loop to release its resources.
loop = DebugEventLoop()
asyncio.set_event_loop(loop)

try:
    loop.run_until_complete(asyncio.gather(
        task(0.1),
        task(0.2),
        task(0.3),
    ))
finally:
    loop.close()

Timeout Patterns#

Different timeout patterns for various use cases: per-operation timeout, overall timeout, and timeout with fallback.

import asyncio

async def fetch(url, delay):
    """Pretend to fetch *url*, taking *delay* seconds."""
    await asyncio.sleep(delay)
    response = f"Response from {url}"
    return response

async def fetch_with_timeout(url, delay, timeout):
    """Per-operation timeout: give up on one fetch after *timeout* seconds."""
    request = fetch(url, delay)
    try:
        return await asyncio.wait_for(request, timeout)
    except asyncio.TimeoutError:
        return f"Timeout for {url}"

async def fetch_all_with_timeout(urls, timeout):
    """Overall timeout: bound the combined time of all fetches together."""
    async def fetch_all():
        # i doubles as each fake request's delay, as in the original.
        requests = [fetch(u, i) for i, u in enumerate(urls)]
        return await asyncio.gather(*requests)

    try:
        return await asyncio.wait_for(fetch_all(), timeout)
    except asyncio.TimeoutError:
        return ["Overall timeout"]

async def fetch_with_fallback(url, delay, timeout, fallback):
    """Timeout with fallback: return *fallback* instead of an error string."""
    try:
        result = await asyncio.wait_for(fetch(url, delay), timeout)
    except asyncio.TimeoutError:
        return fallback
    return result

async def main():
    """Demonstrate the per-operation and fallback timeout helpers."""
    # Per-operation timeout
    print(await fetch_with_timeout("slow.com", 5, 1))

    # Timeout with fallback
    print(await fetch_with_fallback("slow.com", 5, 1, "cached response"))

asyncio.run(main())

Retry Pattern#

Implement retry logic for transient failures with exponential backoff. This is essential for robust network clients.

import asyncio
import random

class RetryError(Exception):
    """Raised when an operation keeps failing after every allowed attempt."""

async def unreliable_operation():
    """Simulate flaky work: raises ConnectionError about 70% of the time."""
    roll = random.random()
    if roll < 0.7:
        raise ConnectionError("Network error")
    return "Success!"

async def retry(coro_func, max_retries=3, base_delay=1.0):
    """Call *coro_func* until it succeeds, with exponential backoff.

    Args:
        coro_func: Zero-argument callable returning a coroutine.
        max_retries: Maximum number of attempts.
        base_delay: First backoff delay; doubles each attempt, plus jitter.

    Raises:
        RetryError: When every attempt failed; chained to the last error.
    """
    last_exception = None

    for attempt in range(max_retries):
        try:
            return await coro_func()
        except Exception as exc:
            last_exception = exc
            is_final_attempt = attempt == max_retries - 1
            if not is_final_attempt:
                delay = base_delay * (2 ** attempt)
                jitter = random.uniform(0, 0.1 * delay)
                print(f"Attempt {attempt + 1} failed, retrying in {delay:.2f}s")
                await asyncio.sleep(delay + jitter)

    raise RetryError(f"Failed after {max_retries} attempts") from last_exception

async def main():
    """Try the flaky operation with up to five attempts."""
    try:
        result = await retry(unreliable_operation, max_retries=5)
    except RetryError as e:
        print(f"All retries failed: {e}")
    else:
        print(f"Result: {result}")

asyncio.run(main())

Async Context Variable#

Context variables (Python 3.7+) provide task-local storage, similar to thread-local storage but for async tasks. Useful for request IDs, user context, or database connections.

import asyncio
import contextvars

# Create context variable
# Each asyncio task runs in its own Context copy, so set() in one task is
# invisible to concurrently running tasks.
request_id = contextvars.ContextVar('request_id', default=None)

async def process_request(rid):
    """Tag this task's context with *rid*, then run both pipeline steps."""
    request_id.set(rid)
    await step1()
    await step2()

async def step1():
    """Log step 1 under the current task's request id."""
    current = request_id.get()
    print(f"[{current}] Step 1")
    await asyncio.sleep(0.1)

async def step2():
    """Log step 2 under the current task's request id."""
    current = request_id.get()
    print(f"[{current}] Step 2")
    await asyncio.sleep(0.1)

async def main():
    """Handle three requests concurrently; each keeps its own request id."""
    requests = [process_request(f"req-{n:03d}") for n in (1, 2, 3)]
    await asyncio.gather(*requests)

asyncio.run(main())

TaskGroup (Python 3.11+)#

TaskGroup provides structured concurrency, ensuring all tasks complete or are cancelled together. Exceptions in any task cancel all other tasks in the group.

import asyncio

async def task(name, delay, should_fail=False):
    """Sleep *delay* seconds, then either raise ValueError or report done."""
    await asyncio.sleep(delay)
    if should_fail:
        raise ValueError(f"{name} failed!")
    return f"{name} done"

async def main():
    try:
        async with asyncio.TaskGroup() as tg:
            tg.create_task(task("A", 1))
            tg.create_task(task("B", 2))
            tg.create_task(task("C", 0.5, should_fail=True))
    except* ValueError as eg:
        for exc in eg.exceptions:
            print(f"Caught: {exc}")

# Python 3.11+
# TaskGroup and except* both require Python 3.11 or newer.
asyncio.run(main())