Async Socket I/O#
Asynchronous I/O is essential for building high-performance network servers that can handle thousands of concurrent connections efficiently. Traditional blocking I/O requires one thread per connection, which doesn’t scale well due to memory overhead and context switching costs. Asynchronous I/O solves this by allowing a single thread to handle multiple connections using I/O multiplexing—the program monitors multiple sockets simultaneously and processes whichever ones are ready for reading or writing.
This section covers the evolution of I/O multiplexing in Python, from the classic
select() system call (portable but limited) to modern high-performance mechanisms
like epoll (Linux) and kqueue (BSD/macOS) that can efficiently handle tens of
thousands of connections. We also cover the selectors module, which provides a
high-level, platform-independent interface that automatically uses the best available
mechanism. Understanding these primitives is valuable even if you use higher-level
frameworks like asyncio, as they build upon these same concepts.
Async TCP Server - select#
select() is the oldest and most portable I/O multiplexing mechanism, available on
virtually all platforms including Windows, Linux, and macOS. It monitors file descriptors
for three conditions: readability (data available to read), writability (buffer space
available to write), and exceptional conditions (errors). While portable, select()
has limitations: it typically supports only up to 1024 file descriptors and has O(n)
performance as it must scan all monitored descriptors on each call.
from select import select
import socket
host = ('localhost', 5566)
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind(host)
sock.listen(5)
read_list = [sock]
write_list = []
messages = {}
try:
while True:
readable, writable, _ = select(read_list, write_list, [])
for s in readable:
if s == sock:
conn, addr = sock.accept()
read_list.append(conn)
else:
msg = s.recv(1024)
if msg:
messages[s.fileno()] = msg
write_list.append(s)
else:
read_list.remove(s)
s.close()
for s in writable:
msg = messages.pop(s.fileno(), None)
if msg:
s.send(msg)
write_list.remove(s)
except KeyboardInterrupt:
sock.close()
Async TCP Server - poll#
poll() is similar to select() but more efficient for large numbers of
file descriptors. Available on Unix systems.
import socket
import select
import contextlib
host = 'localhost'
port = 5566
connections = {}
requests = {}
responses = {}
@contextlib.contextmanager
def create_server(host, port):
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
s.setblocking(False)
s.bind((host, port))
s.listen(10)
try:
yield s
finally:
s.close()
def accept(server, poll):
conn, addr = server.accept()
conn.setblocking(False)
fd = conn.fileno()
poll.register(fd, select.POLLIN)
requests[fd] = conn
connections[fd] = conn
def recv(fd, poll):
conn = requests.pop(fd, None)
if not conn:
return
msg = conn.recv(1024)
if msg:
responses[fd] = msg
poll.modify(fd, select.POLLOUT)
else:
poll.unregister(fd)
conn.close()
connections.pop(fd, None)
def send(fd, poll):
conn = connections.get(fd)
msg = responses.pop(fd, None)
if conn and msg:
conn.send(msg)
requests[fd] = conn
poll.modify(fd, select.POLLIN)
with create_server(host, port) as server:
poll = select.poll()
poll.register(server.fileno(), select.POLLIN)
try:
while True:
events = poll.poll(1000)
for fd, event in events:
if fd == server.fileno():
accept(server, poll)
elif event & (select.POLLIN | select.POLLPRI):
recv(fd, poll)
elif event & select.POLLOUT:
send(fd, poll)
except KeyboardInterrupt:
pass
Async TCP Server - epoll#
epoll is Linux-specific and the most efficient for handling thousands of
connections. It uses edge-triggered or level-triggered notifications.
import socket
import select
import contextlib
host = 'localhost'
port = 5566
connections = {}
requests = {}
responses = {}
@contextlib.contextmanager
def create_server(host, port):
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
s.setblocking(False)
s.bind((host, port))
s.listen(10)
try:
yield s
finally:
s.close()
def accept(server, epoll):
conn, addr = server.accept()
conn.setblocking(False)
fd = conn.fileno()
epoll.register(fd, select.EPOLLIN)
requests[fd] = conn
connections[fd] = conn
def recv(fd, epoll):
conn = requests.pop(fd, None)
if not conn:
return
msg = conn.recv(1024)
if msg:
responses[fd] = msg
epoll.modify(fd, select.EPOLLOUT)
else:
epoll.unregister(fd)
conn.close()
connections.pop(fd, None)
def send(fd, epoll):
conn = connections.get(fd)
msg = responses.pop(fd, None)
if conn and msg:
conn.send(msg)
requests[fd] = conn
epoll.modify(fd, select.EPOLLIN)
with create_server(host, port) as server:
epoll = select.epoll()
epoll.register(server.fileno(), select.EPOLLIN)
try:
while True:
events = epoll.poll(1)
for fd, event in events:
if fd == server.fileno():
accept(server, epoll)
elif event & select.EPOLLIN:
recv(fd, epoll)
elif event & select.EPOLLOUT:
send(fd, epoll)
except KeyboardInterrupt:
pass
finally:
epoll.close()
Async TCP Server - kqueue#
kqueue is the BSD/macOS equivalent of epoll, providing efficient event
notification for large numbers of file descriptors.
import socket
import select
import contextlib
if not hasattr(select, 'kqueue'):
print("kqueue not supported on this platform")
exit(1)
host = 'localhost'
port = 5566
connections = {}
requests = {}
responses = {}
@contextlib.contextmanager
def create_server(host, port):
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
s.setblocking(False)
s.bind((host, port))
s.listen(10)
try:
yield s
finally:
s.close()
def accept(server, kq):
conn, addr = server.accept()
conn.setblocking(False)
fd = conn.fileno()
ke = select.kevent(fd, select.KQ_FILTER_READ, select.KQ_EV_ADD)
kq.control([ke], 0)
requests[fd] = conn
connections[fd] = conn
def recv(fd, kq):
conn = requests.pop(fd, None)
if not conn:
return
msg = conn.recv(1024)
if msg:
responses[fd] = msg
# Switch from read to write
ke_del = select.kevent(fd, select.KQ_FILTER_READ, select.KQ_EV_DELETE)
ke_add = select.kevent(fd, select.KQ_FILTER_WRITE, select.KQ_EV_ADD)
kq.control([ke_del, ke_add], 0)
requests[fd] = conn
else:
ke = select.kevent(fd, select.KQ_FILTER_READ, select.KQ_EV_DELETE)
kq.control([ke], 0)
conn.close()
connections.pop(fd, None)
def send(fd, kq):
conn = connections.get(fd)
msg = responses.pop(fd, None)
if conn and msg:
conn.send(msg)
# Switch from write to read
ke_del = select.kevent(fd, select.KQ_FILTER_WRITE, select.KQ_EV_DELETE)
ke_add = select.kevent(fd, select.KQ_FILTER_READ, select.KQ_EV_ADD)
kq.control([ke_del, ke_add], 0)
requests[fd] = conn
with create_server(host, port) as server:
kq = select.kqueue()
ke = select.kevent(server.fileno(), select.KQ_FILTER_READ, select.KQ_EV_ADD)
kq.control([ke], 0)
try:
while True:
events = kq.control(None, 1024, 1)
for e in events:
fd = e.ident
if fd == server.fileno():
accept(server, kq)
elif e.filter == select.KQ_FILTER_READ:
recv(fd, kq)
elif e.filter == select.KQ_FILTER_WRITE:
send(fd, kq)
except KeyboardInterrupt:
pass
finally:
kq.close()
High-Level API - selectors#
The selectors module (Python 3.4+) provides a high-level, platform-independent
interface that automatically uses the best available mechanism (epoll, kqueue, etc.).
import selectors
import socket
import contextlib
@contextlib.contextmanager
def create_server(host, port):
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
s.bind((host, port))
s.listen(10)
sel = selectors.DefaultSelector()
try:
yield s, sel
finally:
s.close()
sel.close()
def accept_handler(sock, sel):
conn, addr = sock.accept()
sel.register(conn, selectors.EVENT_READ, read_handler)
def read_handler(conn, sel):
msg = conn.recv(1024)
if msg:
conn.send(msg)
else:
sel.unregister(conn)
conn.close()
host = 'localhost'
port = 5566
with create_server(host, port) as (sock, sel):
sel.register(sock, selectors.EVENT_READ, accept_handler)
try:
while True:
events = sel.select()
for key, mask in events:
handler = key.data
handler(key.fileobj, sel)
except KeyboardInterrupt:
pass
Comparison of I/O Multiplexing Methods#
Method |
Platform |
Scalability |
Notes |
|---|---|---|---|
select |
All |
O(n) - Limited |
Max ~1024 FDs |
poll |
Unix |
O(n) - Better |
No FD limit |
epoll |
Linux |
O(1) - Excellent |
Edge/level triggered |
kqueue |
BSD/macOS |
O(1) - Excellent |
Similar to epoll |
selectors |
All |
Best available |
Recommended for new code |
Note
For new code, use the selectors module or asyncio for async I/O.
The low-level APIs (select, poll, epoll, kqueue) are mainly useful for
understanding how async I/O works or when you need fine-grained control.