The with Statement
The with statement guarantees that cleanup code runs - even when exceptions occur. Without it, every file open needs a matching close in a finally block.
# Old way - easy to forget close, especially on exception
f = open('data.txt', 'r')
try:
content = f.read()
finally:
f.close() # must always call this
# With statement - automatic cleanup
with open('data.txt', 'r') as f:
content = f.read()
# f is closed here automatically, even if read() raises
Multiple context managers can be combined in one with statement. They are entered left-to-right and exited right-to-left:
# Multiple context managers on one line
with open('input.txt') as src, open('output.txt', 'w') as dst:
dst.write(src.read())
# Equivalent nested form
with open('input.txt') as src:
with open('output.txt', 'w') as dst:
dst.write(src.read())
# Multi-line form (parenthesized, Python 3.10+)
with (
open('input.txt') as src,
open('output.txt', 'w') as dst,
):
dst.write(src.read())
The as clause captures the return value of __enter__. It is optional - some context managers are used only for side effects:
import threading
lock = threading.Lock()
# as clause not needed - lock just needs acquire/release
with lock:
shared_resource += 1
# as clause useful when __enter__ returns a useful object
with open('data.json') as f: # f is the file object returned by __enter__
data = json.load(f)
__enter__ and __exit__
Any object with __enter__ and __exit__ methods works as a context manager. This is the class-based approach:
class ManagedFile:
def __init__(self, path, mode='r'):
self.path = path
self.mode = mode
self.file = None
def __enter__(self):
self.file = open(self.path, self.mode)
return self.file # value bound to 'as' variable
def __exit__(self, exc_type, exc_val, exc_tb):
if self.file:
self.file.close()
# Return False (or None) to propagate exceptions
# Return True to suppress exceptions
return False
with ManagedFile('data.txt') as f:
print(f.read())
The __exit__ method receives exception info when the block raises. Returning a truthy value suppresses the exception:
class SuppressValueError:
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
if exc_type is ValueError:
print(f'Suppressed ValueError: {exc_val}')
return True # suppress - execution continues after with block
return False # reraise all other exceptions
with SuppressValueError():
raise ValueError('bad input') # suppressed
print('this is not reached')
print('execution continues here') # this runs
Suppressed ValueError: bad input execution continues here
A database transaction manager is a classic example - commit on success, rollback on failure:
class Transaction:
def __init__(self, connection):
self.conn = connection
def __enter__(self):
self.conn.begin()
return self.conn
def __exit__(self, exc_type, exc_val, exc_tb):
if exc_type is None:
self.conn.commit()
else:
self.conn.rollback()
return False # always reraise exceptions
# Usage
with Transaction(db_conn) as conn:
conn.execute('UPDATE users SET balance = balance - 100 WHERE id = 1')
conn.execute('UPDATE users SET balance = balance + 100 WHERE id = 2')
# commits if both succeed, rolls back if either raises
contextlib.contextmanager
The @contextmanager decorator turns a generator function into a context manager. Code before yield is setup; code after yield is teardown. The yielded value becomes the as target:
from contextlib import contextmanager
@contextmanager
def managed_file(path, mode='r'):
f = open(path, mode)
try:
yield f # execution of the with block happens here
finally:
f.close() # always runs - even on exception
with managed_file('data.txt') as f:
print(f.read())
The generator-based form maps directly to __enter__/__exit__: everything before yield is __enter__, the yielded value is returned by __enter__, and everything after (in the finally) is __exit__.
Handling exceptions in a @contextmanager:
from contextlib import contextmanager
@contextmanager
def safe_open(path):
try:
f = open(path)
yield f
except FileNotFoundError:
print(f'File {path!r} not found - using empty string')
yield '' # NOT VALID - can only yield once
finally:
f.close()
# Correct pattern: catch inside, yield once
@contextmanager
def safe_section(label):
print(f'[{label}] starting')
try:
yield
except Exception as e:
print(f'[{label}] failed: {e}')
raise # reraise after logging
else:
print(f'[{label}] success')
finally:
print(f'[{label}] done')
with safe_section('import'):
import json
[import] starting [import] success [import] done
A @contextmanager generator must yield exactly once. Yielding zero times raises RuntimeError on entry; yielding more than once raises RuntimeError on exit. Use try/finally instead of try/except/yield patterns that might yield multiple times.
Practical timer context manager:
import time
from contextlib import contextmanager
@contextmanager
def timer(label=''):
start = time.perf_counter()
try:
yield
finally:
elapsed = time.perf_counter() - start
print(f'{label}: {elapsed:.4f}s')
with timer('sorting 1M items'):
data = sorted(range(1_000_000), reverse=True)
sorting 1M items: 0.1823s
Temporary directory that cleans up automatically:
import tempfile, shutil, os
from contextlib import contextmanager
@contextmanager
def temp_directory():
path = tempfile.mkdtemp()
try:
yield path
finally:
shutil.rmtree(path, ignore_errors=True)
with temp_directory() as tmpdir:
config_path = os.path.join(tmpdir, 'config.json')
with open(config_path, 'w') as f:
f.write('{"debug": true}')
# process config...
# tmpdir and all its contents are deleted here
contextlib Utilities
The contextlib module includes several ready-made context managers and helpers:
contextlib.suppress
Silently swallows the specified exceptions - cleaner than a try/except/pass block:
from contextlib import suppress
import os
# Without suppress
try:
os.remove('temp.txt')
except FileNotFoundError:
pass # ok if already gone
# With suppress - same effect, more readable
with suppress(FileNotFoundError):
os.remove('temp.txt')
# Multiple exception types
with suppress(FileNotFoundError, PermissionError):
os.remove('locked_file.txt')
contextlib.redirect_stdout / redirect_stderr
Redirects stdout or stderr to any file-like object - useful for capturing output in tests:
from contextlib import redirect_stdout
import io
# Capture print() output
buffer = io.StringIO()
with redirect_stdout(buffer):
print('hello')
print('world')
output = buffer.getvalue()
print(repr(output)) # 'hello\nworld\n'
# Redirect to a file
with open('output.log', 'w') as f:
with redirect_stdout(f):
print('this goes to the file')
contextlib.ExitStack
ExitStack manages a dynamic number of context managers - useful when the count is not known at compile time:
from contextlib import ExitStack
filenames = ['a.txt', 'b.txt', 'c.txt']
# Open a dynamic number of files
with ExitStack() as stack:
files = [stack.enter_context(open(f)) for f in filenames]
contents = [f.read() for f in files]
# all files closed here
# Also useful for conditional context managers
def process(path, use_lock=False):
lock = threading.Lock()
with ExitStack() as stack:
if use_lock:
stack.enter_context(lock)
f = stack.enter_context(open(path))
return f.read()
contextlib.nullcontext
nullcontext is a no-op context manager - useful as a default when a context manager is optional:
from contextlib import nullcontext
import threading
def process_data(data, lock=None):
ctx = lock if lock is not None else nullcontext()
with ctx:
# do work - same code path whether lock is given or not
return [x * 2 for x in data]
# No lock
result = process_data([1, 2, 3])
# With lock
my_lock = threading.Lock()
result = process_data([1, 2, 3], lock=my_lock)
Practical Patterns
Managed Database Connection
from contextlib import contextmanager
import sqlite3
@contextmanager
def get_db(db_path):
conn = sqlite3.connect(db_path)
conn.row_factory = sqlite3.Row
try:
yield conn
conn.commit()
except Exception:
conn.rollback()
raise
finally:
conn.close()
with get_db('app.db') as db:
db.execute('INSERT INTO users (name) VALUES (?)', ('Alice',))
rows = db.execute('SELECT * FROM users').fetchall()
Thread-Safe Counter
import threading
from contextlib import contextmanager
class Counter:
def __init__(self):
self._value = 0
self._lock = threading.Lock()
@contextmanager
def locked(self):
with self._lock:
yield self
def increment(self):
with self.locked() as c:
c._value += 1
@property
def value(self):
with self._lock:
return self._value
Changing Directory Temporarily
import os
from contextlib import contextmanager
@contextmanager
def chdir(path):
original = os.getcwd()
os.chdir(path)
try:
yield
finally:
os.chdir(original)
# Python 3.11+: contextlib.chdir() is built-in
# from contextlib import chdir
with chdir('/tmp'):
os.system('ls') # runs in /tmp
# back to original directory
Environment Variable Patch (for testing)
import os
from contextlib import contextmanager
@contextmanager
def env(**kwargs):
old = {k: os.environ.get(k) for k in kwargs}
os.environ.update({k: v for k, v in kwargs.items() if v is not None})
for k, v in kwargs.items():
if v is None:
os.environ.pop(k, None)
try:
yield
finally:
for k, v in old.items():
if v is None:
os.environ.pop(k, None)
else:
os.environ[k] = v
with env(DEBUG='true', API_KEY='test-key'):
# tests run with these env vars set
assert os.environ['DEBUG'] == 'true'
# original env is restored
contextlib.contextmanager-decorated generators can also be used as function decorators (they implement __call__). Write @timer('my_func') above a function definition and the function body becomes the with-block. Use this for consistent profiling or logging across many functions.
Async Context Managers
Async context managers use __aenter__ and __aexit__ and are used with async with. They allow async setup/teardown - useful for async database pools, HTTP sessions, and locks:
import asyncio
import aiohttp
# aiohttp.ClientSession is an async context manager
async def fetch(url):
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
return await response.text()
# Class-based async context manager
class AsyncDB:
async def __aenter__(self):
self.conn = await connect_to_db()
return self.conn
async def __aexit__(self, exc_type, exc_val, exc_tb):
await self.conn.close()
return False
async def main():
async with AsyncDB() as db:
rows = await db.fetchall('SELECT * FROM users')
The @asynccontextmanager decorator from contextlib creates async context managers from async generators:
from contextlib import asynccontextmanager
import asyncio
@asynccontextmanager
async def managed_task(coro):
task = asyncio.create_task(coro)
try:
yield task
finally:
if not task.done():
task.cancel()
try:
await task
except asyncio.CancelledError:
pass
async def background_work():
while True:
await asyncio.sleep(1)
print('working...')
async def main():
async with managed_task(background_work()) as task:
await asyncio.sleep(3) # let it run for 3s
# task is cancelled here
For managing a dynamic set of async context managers, use contextlib.AsyncExitStack. It is the async counterpart of ExitStack and supports await stack.enter_async_context(cm) alongside regular stack.enter_context(cm) in the same stack.