Python Async / Await

Write concurrent I/O-bound code with async/await, the asyncio event loop, Tasks, and gather() - without threads.

Intermediate

Coroutines and async/await

An async def function is a coroutine function. Calling it returns a coroutine object - it does not run until awaited or scheduled as a Task:

Python
import asyncio

async def greet(name, delay):
    await asyncio.sleep(delay)   # suspend here, let others run
    print(f'Hello, {name}!')
    return f'greeted {name}'

# asyncio.run() starts the event loop and runs the coroutine
async def main():
    result = await greet('Alice', 1)
    print(result)

asyncio.run(main())
Output (after ~1 second)
Hello, Alice!
greeted Alice
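
The point that calling a coroutine function does not execute its body can be checked directly. The following is a minimal sketch; the `record` function and the `calls` list are hypothetical names used only for illustration.

```python
import asyncio
import inspect

calls = []

async def record():
    calls.append('ran')
    return 'done'

coro = record()                      # creates a coroutine object; the body has not run
print(inspect.iscoroutine(coro))     # True
print(calls)                         # []

result = asyncio.run(coro)           # the body runs only once the loop drives it
print(calls, result)                 # ['ran'] done
```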

The key insight: await asyncio.sleep(1) suspends only the current coroutine, not the program. Other coroutines can run during that sleep. Compare sequential vs concurrent execution:

Python
import asyncio, time

async def fetch_data(url, delay):
    await asyncio.sleep(delay)   # simulates network I/O
    return f'data from {url}'

async def sequential():
    start = time.perf_counter()
    r1 = await fetch_data('api/users',    2)
    r2 = await fetch_data('api/products', 3)
    print(f'Sequential: {time.perf_counter() - start:.1f}s')   # ~5s

async def concurrent():
    start = time.perf_counter()
    r1, r2 = await asyncio.gather(
        fetch_data('api/users',    2),
        fetch_data('api/products', 3),
    )
    print(f'Concurrent: {time.perf_counter() - start:.1f}s')   # ~3s

asyncio.run(sequential())
asyncio.run(concurrent())
Output
Sequential: 5.0s
Concurrent: 3.0s
Never call blocking code in async functions

Calling time.sleep(1), requests.get(url), or any other blocking call inside an async function freezes the entire event loop: no other coroutine can run until the call returns. Use asyncio.sleep() and async libraries (aiohttp, aiofiles) instead. For unavoidable blocking calls, offload the work to a thread pool with await loop.run_in_executor(None, blocking_func) or, on Python 3.9+, await asyncio.to_thread(blocking_func).
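
As a rough illustration of the offloading advice, the sketch below runs a blocking function in the default thread pool while another coroutine keeps making progress. The names `blocking_io` and `heartbeat` are hypothetical; the blocking function stands in for any call you cannot replace with an async equivalent.

```python
import asyncio
import time

def blocking_io(seconds):
    time.sleep(seconds)              # blocks its worker thread, not the event loop
    return 'blocking done'

async def heartbeat(ticks):
    for _ in range(3):
        await asyncio.sleep(0.01)
        ticks.append('tick')         # keeps running while blocking_io sleeps

async def main():
    ticks = []
    loop = asyncio.get_running_loop()
    # run_in_executor returns a Future, so it can be gathered with coroutines
    result, _ = await asyncio.gather(
        loop.run_in_executor(None, blocking_io, 0.1),
        heartbeat(ticks),
    )
    return result, ticks

print(asyncio.run(main()))
```

If `blocking_io` were called directly inside a coroutine, the heartbeat could not tick until it returned.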

The Event Loop

The event loop is the scheduler that runs coroutines. asyncio.run() is the standard way to start it. For advanced use, you can get and control the loop directly:

Python
import asyncio
import time

async def main():
    # Inside a coroutine, get_running_loop() returns the loop that is executing it
    loop = asyncio.get_running_loop()
    print(f'running: {loop.is_running()}')   # True

    # Schedule a plain callback (not a coroutine) for the next loop iteration
    loop.call_soon(print, 'scheduled callback')

    # Run a blocking function in the default thread pool
    await loop.run_in_executor(None, time.sleep, 0.1)

    # Read the loop's internal monotonic clock (not wall-clock time)
    now = loop.time()
    print(f'loop time: {now:.3f}')

asyncio.run(main())

Running async code from synchronous code (e.g., inside a framework that doesn't support async):

Python
import asyncio

async def fetch():
    await asyncio.sleep(0.1)
    return 42

# From synchronous code - creates a new event loop
result = asyncio.run(fetch())
print(result)   # 42

# From a thread that already has an event loop running (e.g., Jupyter),
# asyncio.run() raises RuntimeError:
# "asyncio.run() cannot be called from a running event loop"
# In a notebook, use `await fetch()` directly, or install nest_asyncio:
# pip install nest_asyncio, then: import nest_asyncio; nest_asyncio.apply()

# Calling asyncio.get_event_loop() with no running loop is deprecated; use asyncio.run()

Tasks and gather()

A Task wraps a coroutine and schedules it to run concurrently. asyncio.create_task() schedules the coroutine immediately and returns the Task; asyncio.gather() wraps each coroutine it is given in a Task and waits for all of them:

Python
import asyncio

async def worker(n, delay):
    print(f'worker {n} starting')
    await asyncio.sleep(delay)
    print(f'worker {n} done')
    return n * 10

async def main():
    # create_task schedules immediately - runs concurrently
    task1 = asyncio.create_task(worker(1, 2))
    task2 = asyncio.create_task(worker(2, 1))

    # Tasks are already running - await just waits for completion
    result1 = await task1
    result2 = await task2
    print(result1, result2)   # 10 20

asyncio.run(main())
Output
worker 1 starting
worker 2 starting
worker 2 done
worker 1 done
10 20
Python
import asyncio

async def worker(n, delay):   # same worker as in the previous example
    await asyncio.sleep(delay)
    return n * 10

async def failing():
    raise ZeroDivisionError('bad')   # simulated failure

async def main():
    # gather() runs all coroutines concurrently, returns list of results
    results = await asyncio.gather(
        worker(1, 2),
        worker(2, 1),
        worker(3, 3),
    )
    print(results)   # [10, 20, 30] - in submission order, not completion order

    # return_exceptions=True - failed tasks return exception objects instead of raising
    results = await asyncio.gather(
        worker(1, 1),
        failing(),
        return_exceptions=True
    )
    for r in results:
        if isinstance(r, Exception):
            print(f'task failed: {r}')
        else:
            print(f'task result: {r}')

asyncio.run(main())

Cancelling tasks:

Python
import asyncio

async def long_task():
    try:
        await asyncio.sleep(100)
    except asyncio.CancelledError:
        print('task was cancelled - cleaning up')
        raise   # always reraise CancelledError

async def main():
    task = asyncio.create_task(long_task())
    await asyncio.sleep(1)
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        print('confirmed cancelled')

asyncio.run(main())

Waiting with timeout using asyncio.wait_for():

Python
import asyncio

async def slow_fetch():
    await asyncio.sleep(10)
    return 'data'

async def main():
    try:
        result = await asyncio.wait_for(slow_fetch(), timeout=2.0)
    except asyncio.TimeoutError:
        print('request timed out')

    # Python 3.11+ - asyncio.timeout context manager
    try:
        async with asyncio.timeout(2.0):
            result = await slow_fetch()
    except asyncio.TimeoutError:
        print('request timed out')

asyncio.run(main())

TaskGroup (Python 3.11+)

asyncio.TaskGroup provides structured concurrency - if any task fails, all others are automatically cancelled:

Python
import asyncio

async def fetch(url, delay):
    await asyncio.sleep(delay)
    return f'response from {url}'

async def failing_task():
    await asyncio.sleep(0.5)
    raise ValueError('something went wrong')

async def main():
    # All tasks succeed
    async with asyncio.TaskGroup() as tg:
        t1 = tg.create_task(fetch('api/users',    1))
        t2 = tg.create_task(fetch('api/orders',   2))
        t3 = tg.create_task(fetch('api/products', 1))
    # All three tasks are complete here
    print(t1.result(), t2.result(), t3.result())

    # One task fails - others are cancelled
    try:
        async with asyncio.TaskGroup() as tg:
            t1 = tg.create_task(fetch('api/users', 3))
            t2 = tg.create_task(failing_task())     # fails after 0.5s
    except* ValueError as eg:   # ExceptionGroup - Python 3.11+
        print(f'Errors: {eg.exceptions}')

asyncio.run(main())
except* - ExceptionGroup handling

When one or more tasks fail inside a TaskGroup, Python raises an ExceptionGroup containing every exception that was raised. Use except* ExceptionType (the starred form, Python 3.11+) to handle it. except* ValueError runs if the group contains any ValueError and receives a sub-group holding only those; exceptions of other types continue on to later except* clauses or propagate to the caller.

Async I/O Patterns

Async file I/O with aiofiles

Python
import asyncio
import aiofiles   # pip install aiofiles

async def read_file(path):
    async with aiofiles.open(path, 'r') as f:
        return await f.read()

async def write_file(path, content):
    async with aiofiles.open(path, 'w') as f:
        await f.write(content)

async def main():
    # Read multiple files concurrently
    contents = await asyncio.gather(
        read_file('file1.txt'),
        read_file('file2.txt'),
        read_file('file3.txt'),
    )
    combined = '\n'.join(contents)
    await write_file('combined.txt', combined)

Semaphore - limit concurrency

Python
import asyncio

async def fetch_with_limit(sem, url):
    async with sem:   # only N concurrent fetches at a time
        await asyncio.sleep(0.1)   # simulates HTTP request
        return f'data from {url}'

async def main():
    sem = asyncio.Semaphore(10)   # max 10 concurrent requests
    urls = [f'api/item/{i}' for i in range(100)]

    # One coroutine per URL; the semaphore caps how many run their body at once
    coros = [fetch_with_limit(sem, url) for url in urls]
    results = await asyncio.gather(*coros)
    print(f'fetched {len(results)} items')

asyncio.run(main())
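
To see that the semaphore really caps concurrency, one option is to track how many jobs are inside the guarded section at once. This is a sketch with hypothetical names (`limited_job`, `stats`); the limit of 3 and the job count of 20 are arbitrary.

```python
import asyncio

async def limited_job(sem, stats):
    async with sem:
        stats['active'] += 1
        stats['peak'] = max(stats['peak'], stats['active'])
        await asyncio.sleep(0.01)     # simulated I/O while holding a slot
        stats['active'] -= 1

async def main(limit=3, jobs=20):
    sem = asyncio.Semaphore(limit)
    stats = {'active': 0, 'peak': 0}
    await asyncio.gather(*(limited_job(sem, stats) for _ in range(jobs)))
    return stats['peak']

print(asyncio.run(main()))   # peak never exceeds the limit
```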

Async Queue - producer/consumer

Python
import asyncio

async def producer(queue, n):
    for i in range(n):
        await asyncio.sleep(0.1)
        await queue.put(i)
        print(f'produced {i}')
    await queue.put(None)   # sentinel

async def consumer(queue):
    while True:
        item = await queue.get()
        if item is None:
            break
        print(f'consumed {item}')
        queue.task_done()

async def main():
    # maxsize=5 makes put() wait when the consumer falls behind (backpressure)
    queue = asyncio.Queue(maxsize=5)
    await asyncio.gather(
        producer(queue, 10),
        consumer(queue),
    )

asyncio.run(main())

Synchronisation Primitives

asyncio provides async versions of threading synchronisation tools. Use these - not their threading equivalents - in async code:

Python
import asyncio

# Lock - mutual exclusion
lock = asyncio.Lock()
shared = []

async def safe_append(val):
    async with lock:
        shared.append(val)

# Event - signal between coroutines
event = asyncio.Event()

async def waiter():
    print('waiting for event...')
    await event.wait()
    print('event fired!')

async def setter():
    await asyncio.sleep(1)
    event.set()

# Condition - wait for a condition with notification
condition = asyncio.Condition()
items = []

async def wait_for_items():
    async with condition:
        while not items:
            await condition.wait()
        return items.pop()

async def add_item(val):
    async with condition:
        items.append(val)
        condition.notify_all()
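
Even on a single thread, a lock matters whenever a read-modify-write sequence spans an await. The sketch below contrasts an unguarded counter with one protected by asyncio.Lock; the function names and the `state` dictionaries are hypothetical.

```python
import asyncio

async def unsafe_increment(state):
    current = state['count']
    await asyncio.sleep(0)            # other coroutines run here and read the same value
    state['count'] = current + 1

async def safe_increment(state, lock):
    async with lock:                  # the whole read-modify-write is one critical section
        current = state['count']
        await asyncio.sleep(0)
        state['count'] = current + 1

async def main():
    unsafe = {'count': 0}
    await asyncio.gather(*(unsafe_increment(unsafe) for _ in range(10)))

    safe = {'count': 0}
    lock = asyncio.Lock()
    await asyncio.gather(*(safe_increment(safe, lock) for _ in range(10)))
    return unsafe['count'], safe['count']

print(asyncio.run(main()))   # the unsafe counter loses updates; the safe one reaches 10
```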

aiohttp HTTP Requests

aiohttp is the async alternative to requests. Install with pip install aiohttp:

Python
import asyncio
import aiohttp

async def fetch_json(session, url):
    async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as resp:
        resp.raise_for_status()
        return await resp.json()

async def main():
    async with aiohttp.ClientSession() as session:
        data = await fetch_json(session, 'https://api.example.com/users')
        print(data)

asyncio.run(main())

Fetching many URLs concurrently with a cap on in-flight requests (a semaphore limits concurrency, not requests per second):

Python
import asyncio, aiohttp

async def fetch(session, sem, url):
    async with sem:
        try:
            async with session.get(url, timeout=aiohttp.ClientTimeout(total=5)) as r:
                r.raise_for_status()
                return {'url': url, 'data': await r.json()}
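        # A total-timeout expiry raises asyncio.TimeoutError, which is not a ClientError
        except (aiohttp.ClientError, asyncio.TimeoutError) as e:
            return {'url': url, 'error': str(e) or type(e).__name__}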

async def fetch_all(urls, concurrency=20):
    sem = asyncio.Semaphore(concurrency)
    async with aiohttp.ClientSession() as session:
        tasks = [fetch(session, sem, url) for url in urls]
        return await asyncio.gather(*tasks)

urls = [f'https://api.example.com/item/{i}' for i in range(100)]
results = asyncio.run(fetch_all(urls))

POST request with JSON body:

Python
import asyncio, aiohttp

async def create_user(name, email):
    async with aiohttp.ClientSession() as session:
        payload = {'name': name, 'email': email}
        headers = {'Authorization': 'Bearer my-token'}
        async with session.post(
            'https://api.example.com/users',
            json=payload,
            headers=headers,
        ) as resp:
            resp.raise_for_status()
            return await resp.json()

user = asyncio.run(create_user('Alice', 'alice@example.com'))
Reuse the ClientSession

Create one aiohttp.ClientSession per application and reuse it for all requests. The session maintains a connection pool, which avoids a new TCP and TLS handshake for repeated requests to the same host; creating a new session per request defeats that pooling. Opening the session with async with aiohttp.ClientSession() at the top of main() and passing it to helper functions is the standard pattern. Helpers that open their own session, as the POST example above does for brevity, are only suitable for one-off calls.

Frequently Asked Questions

What is the difference between asyncio and threading?

Threading uses OS threads: multiple threads run concurrently and are preempted by the OS, but Python's GIL prevents them from executing Python bytecode in parallel, so threads do not speed up CPU-bound work. Asyncio is single-threaded cooperative multitasking: coroutines voluntarily yield control at await points. Asyncio is better for I/O-bound work (network, files) with thousands of concurrent operations; threading is better when you must use existing blocking libraries. For CPU-bound parallelism, use multiprocessing.

What does await do?

await expr suspends the current coroutine and gives control back to the event loop, which can then run other coroutines. When the awaited object (another coroutine, a Task, or a Future) completes, the event loop resumes the suspended coroutine with the result. Crucially, await pauses only the current coroutine, not the whole program.
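
The hand-off at each await can be made visible by logging from two coroutines that yield with asyncio.sleep(0). This is a minimal sketch; `step_through` and `log` are hypothetical names.

```python
import asyncio

async def step_through(name, log):
    for i in range(2):
        log.append(f'{name}{i}')
        await asyncio.sleep(0)        # suspend here and let the loop run the other coroutine

async def main():
    log = []
    await asyncio.gather(step_through('a', log), step_through('b', log))
    return log

print(asyncio.run(main()))   # ['a0', 'b0', 'a1', 'b1']
```

The entries alternate because each coroutine runs only until its next await, then the other one gets a turn.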

Should I use asyncio.gather() or asyncio.TaskGroup?

asyncio.gather() has been available since Python 3.4; asyncio.TaskGroup was added in Python 3.11. Prefer TaskGroup when you can, because it has better error semantics: if one task raises, the remaining tasks are cancelled automatically (structured concurrency). With gather() and the default return_exceptions=False, the first exception propagates to the caller, but the other awaitables are not cancelled and keep running in the background.
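
The gather() behaviour on failure can be observed with a short sketch. The coroutines `fails` and `slow` are hypothetical; the point is that the slow task is still alive after gather() has already raised.

```python
import asyncio

async def fails():
    await asyncio.sleep(0.01)
    raise ValueError('boom')

async def slow(log):
    await asyncio.sleep(0.05)
    log.append('slow finished')

async def main():
    log = []
    slow_task = asyncio.create_task(slow(log))
    try:
        await asyncio.gather(fails(), slow_task)
    except ValueError:
        log.append('gather raised')   # raised while slow_task is still pending
    await slow_task                   # the task was not cancelled; it completes normally
    return log, slow_task.cancelled()

print(asyncio.run(main()))   # (['gather raised', 'slow finished'], False)
```

With a TaskGroup, the equivalent slow task would have been cancelled as soon as the failing one raised.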

Can I call a regular function from an async function?

Yes - you can call any regular (synchronous) function directly from an async function. Just don't call blocking functions (like requests.get() or time.sleep()) without wrapping them, because they block the entire event loop for as long as they run. Use asyncio.sleep() instead of time.sleep(), and loop.run_in_executor() to run blocking functions in a thread pool while the event loop keeps working.

What does asyncio.run() do?

asyncio.run(coro) is the standard entry point for asyncio programs. It creates a new event loop, runs the given coroutine to completion, closes the loop, and returns the coroutine's result. Use it once, at the top level of your program. Do not call it from inside an already-running event loop; use await there instead. In Jupyter notebooks the event loop is already running, so use await coro directly.
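
The restriction on nested calls is easy to reproduce. In this sketch (with hypothetical `inner` and `outer` coroutines), calling asyncio.run() from code that is already running on the loop raises RuntimeError, while a plain await works.

```python
import asyncio

async def inner():
    return 42

async def outer():
    coro = inner()
    try:
        asyncio.run(coro)             # not allowed: a loop is already running
    except RuntimeError as exc:
        coro.close()                  # avoid a "coroutine was never awaited" warning
        message = str(exc)
    value = await inner()             # the correct way inside a coroutine
    return message, value

print(asyncio.run(outer()))
```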