Coroutines and async/await
An async def function is a coroutine function. Calling it returns a coroutine object - it does not run until awaited or scheduled as a Task:
import asyncio
async def greet(name, delay):
await asyncio.sleep(delay) # suspend here, let others run
print(f'Hello, {name}!')
return f'greeted {name}'
# asyncio.run() starts the event loop and runs the coroutine
async def main():
result = await greet('Alice', 1)
print(result)
asyncio.run(main())
Hello, Alice! greeted Alice
The key insight: await asyncio.sleep(1) suspends only the current coroutine, not the program. Other coroutines can run during that sleep. Compare sequential vs concurrent execution:
import asyncio, time
async def fetch_data(url, delay):
await asyncio.sleep(delay) # simulates network I/O
return f'data from {url}'
async def sequential():
start = time.perf_counter()
r1 = await fetch_data('api/users', 2)
r2 = await fetch_data('api/products', 3)
print(f'Sequential: {time.perf_counter() - start:.1f}s') # ~5s
async def concurrent():
start = time.perf_counter()
r1, r2 = await asyncio.gather(
fetch_data('api/users', 2),
fetch_data('api/products', 3),
)
print(f'Concurrent: {time.perf_counter() - start:.1f}s') # ~3s
asyncio.run(sequential())
asyncio.run(concurrent())
Sequential: 5.0s Concurrent: 3.0s
Calling time.sleep(1), requests.get(url), or any blocking I/O inside an async function freezes the entire event loop - no other coroutines can run. Use asyncio.sleep() and async libraries (aiohttp, aiofiles). For unavoidable blocking calls, use await loop.run_in_executor(None, blocking_func) to offload to a thread pool.
The Event Loop
The event loop is the scheduler that runs coroutines. asyncio.run() is the standard way to start it. For advanced use, you can get and control the loop directly:
import asyncio
async def main():
loop = asyncio.get_event_loop()
print(f'running: {loop.is_running()}') # True
# Schedule a callback (non-coroutine, fire-and-forget)
loop.call_soon(print, 'scheduled callback')
# Run a blocking function in a thread pool
import time
result = await loop.run_in_executor(None, time.sleep, 0.1)
# Get current time from event loop (avoids system call)
now = loop.time()
print(f'loop time: {now:.3f}')
asyncio.run(main())
Running async code from synchronous code (e.g., inside a framework that doesn't support async):
import asyncio
async def fetch():
await asyncio.sleep(0.1)
return 42
# From synchronous code - creates a new event loop
result = asyncio.run(fetch())
print(result) # 42
# From a thread that already has an event loop running (e.g., Jupyter)
# asyncio.run() would raise RuntimeError: "cannot run nested event loop"
# Instead, install nest_asyncio: pip install nest_asyncio
# import nest_asyncio; nest_asyncio.apply()
# asyncio.get_event_loop() is deprecated for top-level use; use asyncio.run()
Tasks and gather()
A Task wraps a coroutine and schedules it to run concurrently. asyncio.create_task() schedules immediately; asyncio.gather() creates multiple tasks and waits for all:
import asyncio
async def worker(n, delay):
print(f'worker {n} starting')
await asyncio.sleep(delay)
print(f'worker {n} done')
return n * 10
async def main():
# create_task schedules immediately - runs concurrently
task1 = asyncio.create_task(worker(1, 2))
task2 = asyncio.create_task(worker(2, 1))
# Tasks are already running - await just waits for completion
result1 = await task1
result2 = await task2
print(result1, result2) # 10 20
asyncio.run(main())
worker 1 starting worker 2 starting worker 2 done worker 1 done 10 20
import asyncio
async def main():
# gather() runs all coroutines concurrently, returns list of results
results = await asyncio.gather(
worker(1, 2),
worker(2, 1),
worker(3, 3),
)
print(results) # [10, 20, 30] - in submission order, not completion order
# return_exceptions=True - failed tasks return exception objects instead of raising
results = await asyncio.gather(
worker(1, 1),
asyncio.sleep(0, result=ZeroDivisionError('bad')), # simulated failure
return_exceptions=True
)
for r in results:
if isinstance(r, Exception):
print(f'task failed: {r}')
else:
print(f'task result: {r}')
Cancelling tasks:
import asyncio
async def long_task():
try:
await asyncio.sleep(100)
except asyncio.CancelledError:
print('task was cancelled - cleaning up')
raise # always reraise CancelledError
async def main():
task = asyncio.create_task(long_task())
await asyncio.sleep(1)
task.cancel()
try:
await task
except asyncio.CancelledError:
print('confirmed cancelled')
asyncio.run(main())
Waiting with timeout using asyncio.wait_for():
import asyncio
async def slow_fetch():
await asyncio.sleep(10)
return 'data'
async def main():
try:
result = await asyncio.wait_for(slow_fetch(), timeout=2.0)
except asyncio.TimeoutError:
print('request timed out')
# Python 3.11+ - asyncio.timeout context manager
try:
async with asyncio.timeout(2.0):
result = await slow_fetch()
except asyncio.TimeoutError:
print('request timed out')
asyncio.run(main())
TaskGroup (Python 3.11+)
asyncio.TaskGroup provides structured concurrency - if any task fails, all others are automatically cancelled:
import asyncio
async def fetch(url, delay):
await asyncio.sleep(delay)
return f'response from {url}'
async def failing_task():
await asyncio.sleep(0.5)
raise ValueError('something went wrong')
async def main():
results = []
# All tasks succeed
async with asyncio.TaskGroup() as tg:
t1 = tg.create_task(fetch('api/users', 1))
t2 = tg.create_task(fetch('api/orders', 2))
t3 = tg.create_task(fetch('api/products', 1))
# All three tasks are complete here
print(t1.result(), t2.result(), t3.result())
# One task fails - others are cancelled
try:
async with asyncio.TaskGroup() as tg:
t1 = tg.create_task(fetch('api/users', 3))
t2 = tg.create_task(failing_task()) # fails after 0.5s
except* ValueError as eg: # ExceptionGroup - Python 3.11+
print(f'Errors: {eg.exceptions}')
asyncio.run(main())
When multiple tasks fail inside a TaskGroup, Python raises an ExceptionGroup containing all exceptions. Use except* ExceptionType (the starred form, Python 3.11+) to handle groups. except* ValueError catches the group if it contains any ValueError instances; other exception types propagate.
Async I/O Patterns
Async file I/O with aiofiles
import asyncio
import aiofiles # pip install aiofiles
async def read_file(path):
async with aiofiles.open(path, 'r') as f:
return await f.read()
async def write_file(path, content):
async with aiofiles.open(path, 'w') as f:
await f.write(content)
async def main():
# Read multiple files concurrently
contents = await asyncio.gather(
read_file('file1.txt'),
read_file('file2.txt'),
read_file('file3.txt'),
)
combined = '\n'.join(contents)
await write_file('combined.txt', combined)
Semaphore - limit concurrency
import asyncio
async def fetch_with_limit(sem, url):
async with sem: # only N concurrent fetches at a time
await asyncio.sleep(0.1) # simulates HTTP request
return f'data from {url}'
async def main():
sem = asyncio.Semaphore(10) # max 10 concurrent requests
urls = [f'api/item/{i}' for i in range(100)]
tasks = [fetch_with_limit(sem, url) for url in urls]
results = await asyncio.gather(*tasks)
print(f'fetched {len(results)} items')
Async Queue - producer/consumer
import asyncio
async def producer(queue, n):
for i in range(n):
await asyncio.sleep(0.1)
await queue.put(i)
print(f'produced {i}')
await queue.put(None) # sentinel
async def consumer(queue):
while True:
item = await queue.get()
if item is None:
break
print(f'consumed {item}')
queue.task_done()
async def main():
queue = asyncio.Queue(maxsize=5)
await asyncio.gather(
producer(queue, 10),
consumer(queue),
)
Synchronisation Primitives
asyncio provides async versions of threading synchronisation tools. Use these - not their threading equivalents - in async code:
import asyncio
# Lock - mutual exclusion
lock = asyncio.Lock()
shared = []
async def safe_append(val):
async with lock:
shared.append(val)
# Event - signal between coroutines
event = asyncio.Event()
async def waiter():
print('waiting for event...')
await event.wait()
print('event fired!')
async def setter():
await asyncio.sleep(1)
event.set()
# Condition - wait for a condition with notification
condition = asyncio.Condition()
items = []
async def wait_for_items():
async with condition:
while not items:
await condition.wait()
return items.pop()
async def add_item(val):
async with condition:
items.append(val)
condition.notify_all()
aiohttp HTTP Requests
aiohttp is the async alternative to requests. Install with pip install aiohttp:
import asyncio
import aiohttp
async def fetch_json(session, url):
async with session.get(url, timeout=aiohttp.ClientTimeout(total=10)) as resp:
resp.raise_for_status()
return await resp.json()
async def main():
async with aiohttp.ClientSession() as session:
data = await fetch_json(session, 'https://api.example.com/users')
print(data)
asyncio.run(main())
Fetching many URLs concurrently with rate limiting:
import asyncio, aiohttp
async def fetch(session, sem, url):
async with sem:
try:
async with session.get(url, timeout=aiohttp.ClientTimeout(total=5)) as r:
r.raise_for_status()
return {'url': url, 'data': await r.json()}
except aiohttp.ClientError as e:
return {'url': url, 'error': str(e)}
async def fetch_all(urls, concurrency=20):
sem = asyncio.Semaphore(concurrency)
async with aiohttp.ClientSession() as session:
tasks = [fetch(session, sem, url) for url in urls]
return await asyncio.gather(*tasks)
urls = [f'https://api.example.com/item/{i}' for i in range(100)]
results = asyncio.run(fetch_all(urls))
POST request with JSON body:
import asyncio, aiohttp
async def create_user(name, email):
async with aiohttp.ClientSession() as session:
payload = {'name': name, 'email': email}
headers = {'Authorization': 'Bearer my-token'}
async with session.post(
'https://api.example.com/users',
json=payload,
headers=headers,
) as resp:
resp.raise_for_status()
return await resp.json()
user = asyncio.run(create_user('Alice', 'alice@example.com'))
Create one aiohttp.ClientSession per application and reuse it for all requests - it maintains a connection pool that dramatically reduces latency for repeated requests to the same host. Creating a new session per request defeats connection pooling. The async with aiohttp.ClientSession() pattern at the top of your main() is the standard pattern.