SSL/TLS Sockets#

Transport Layer Security (TLS), the successor to Secure Sockets Layer (SSL), is the standard protocol for encrypting network communication. TLS provides three essential security properties: confidentiality (data is encrypted and cannot be read by eavesdroppers), integrity (data cannot be modified in transit without detection), and authentication (parties can verify each other’s identity using certificates). Every HTTPS connection, most secure email transport, and many VPNs use TLS under the hood.

Python’s ssl module provides a comprehensive interface for TLS, allowing you to wrap regular sockets with encryption. This section covers creating TLS-enabled servers and clients, configuring cipher suites for security compliance, handling X.509 certificates, implementing mutual TLS (mTLS) for client authentication, and building non-blocking TLS servers for high-performance applications. Whether you’re building a secure API server, implementing certificate pinning, or debugging TLS handshake issues, these examples provide the foundation you need.

Simple TLS Echo Server#

A basic TLS server wraps accepted TCP connections with an SSL context to provide encryption. The server requires a certificate (which carries its public key) and the matching private key; the certificate can be self-signed for testing or obtained from a Certificate Authority (CA) for production use. The SSLContext object manages all TLS settings including protocol version, cipher suites, and certificate verification options.

import socket
import ssl

# Plain TCP listening socket; TLS is layered on top of each accepted
# connection, not on the listener itself.
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind(('localhost', 5566))
sock.listen(10)

# Server-side context holding the certificate chain and private key.
sslctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
sslctx.load_cert_chain(certfile='cert.pem', keyfile='key.pem')

try:
    while True:
        conn, addr = sock.accept()
        try:
            # wrap_socket() performs the TLS handshake. A port scan, a
            # plain-text client or a protocol mismatch makes it raise, and
            # that must not take the whole server down.
            # (ssl.SSLError is a subclass of OSError.)
            sslconn = sslctx.wrap_socket(conn, server_side=True)
        except OSError as exc:
            print(f"Handshake with {addr} failed: {exc}")
            conn.close()
            continue

        try:
            # The context manager closes the connection even if I/O fails.
            with sslconn:
                msg = sslconn.recv(1024)
                if msg:
                    # sendall() retries until every byte is written;
                    # send() may write only part of the buffer.
                    sslconn.sendall(msg)
        except OSError as exc:
            print(f"Connection with {addr} failed: {exc}")
finally:
    sock.close()

Generate self-signed certificate and test:

# Generate private key and self-signed certificate
$ openssl genrsa -out key.pem 2048
$ openssl req -x509 -new -nodes -key key.pem -days 365 -out cert.pem

# Run server
$ python3 ssl_server.py &

# Test with openssl client
$ openssl s_client -connect localhost:5566
Hello SSL
Hello SSL

TLS Server with Cipher Configuration#

Configure specific cipher suites for security compliance or compatibility.

import socket
import ssl
import json

sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind(('localhost', 5566))
sock.listen(10)

sslctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
sslctx.load_cert_chain(certfile='cert.pem', keyfile='key.pem')

# Restrict the cipher suites offered for TLS 1.2 and older.
# NOTE: set_ciphers() does not control TLS 1.3 suites; those stay enabled
# and will still show up in get_ciphers() below.
sslctx.set_ciphers('ECDHE+AESGCM:DHE+AESGCM')

# Print configured ciphers
print(json.dumps(sslctx.get_ciphers(), indent=2))

try:
    while True:
        conn, addr = sock.accept()
        try:
            # The handshake happens here; a client with no cipher in common
            # makes it raise, which must not stop the accept loop.
            # (ssl.SSLError is a subclass of OSError.)
            sslconn = sslctx.wrap_socket(conn, server_side=True)
        except OSError as exc:
            print(f"Handshake with {addr} failed: {exc}")
            conn.close()
            continue

        try:
            # The context manager closes the connection even if I/O fails.
            with sslconn:
                # cipher() reports the suite negotiated for this connection.
                print(f"Cipher: {sslconn.cipher()}")
                msg = sslconn.recv(1024)
                if msg:
                    # sendall() guarantees the whole reply is written.
                    sslconn.sendall(msg)
        except OSError as exc:
            print(f"Connection with {addr} failed: {exc}")
finally:
    sock.close()

TLS Client#

Connect to a TLS server with certificate verification.

import socket
import ssl

hostname = 'www.google.com'
port = 443

# Create default SSL context (verifies certificates against the system
# trust store and checks that the certificate matches the hostname)
context = ssl.create_default_context()

with socket.create_connection((hostname, port)) as sock:
    # server_hostname enables SNI and is the name the certificate is
    # checked against.
    with context.wrap_socket(sock, server_hostname=hostname) as ssock:
        print(f"TLS version: {ssock.version()}")
        print(f"Cipher: {ssock.cipher()}")

        # Send HTTP request. The Host header is derived from `hostname` so
        # the two cannot drift apart, and sendall() ensures the complete
        # request is written (send() may write only part of it).
        request = f"GET / HTTP/1.1\r\nHost: {hostname}\r\n\r\n"
        ssock.sendall(request.encode("ascii"))
        response = ssock.recv(4096)
        print(response[:200])

TLS Client with Custom CA#

Verify server certificate against a custom Certificate Authority.

import socket
import ssl

hostname = 'localhost'
port = 5566

context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
context.load_verify_locations('ca-cert.pem')  # CA certificate
# PROTOCOL_TLS_CLIENT already enables both of these; they are spelled out
# here so the verification policy is explicit.
context.check_hostname = True
context.verify_mode = ssl.CERT_REQUIRED

with socket.create_connection((hostname, port)) as sock:
    # The handshake fails unless the server certificate chains to
    # ca-cert.pem and matches `hostname`.
    with context.wrap_socket(sock, server_hostname=hostname) as ssock:
        # sendall() writes the full payload; send() may stop early.
        ssock.sendall(b"Hello")
        print(ssock.recv(1024))

Mutual TLS (mTLS)#

Both client and server present certificates for mutual authentication.

Server:

import socket
import ssl

sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind(('localhost', 5566))
sock.listen(10)

context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
context.load_cert_chain('server-cert.pem', 'server-key.pem')
# CA used to validate the certificates presented by clients.
context.load_verify_locations('ca-cert.pem')
context.verify_mode = ssl.CERT_REQUIRED  # Require client cert

try:
    while True:
        conn, addr = sock.accept()
        try:
            # With CERT_REQUIRED the handshake raises when the client sends
            # no certificate or one not signed by the CA. Reject that
            # client and keep serving instead of crashing.
            # (ssl.SSLError is a subclass of OSError.)
            sslconn = context.wrap_socket(conn, server_side=True)
        except OSError as exc:
            print(f"Rejected {addr}: {exc}")
            conn.close()
            continue

        try:
            # The context manager closes the connection even if I/O fails.
            with sslconn:
                # Get client certificate info
                cert = sslconn.getpeercert()
                print(f"Client: {cert.get('subject')}")
                msg = sslconn.recv(1024)
                if msg:
                    # sendall() guarantees the whole reply is written.
                    sslconn.sendall(msg)
        except OSError as exc:
            print(f"Connection with {addr} failed: {exc}")
finally:
    sock.close()

Client:

import socket
import ssl

# PROTOCOL_TLS_CLIENT verifies the server certificate and hostname by default.
context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
# Certificate and key this client presents to the server.
context.load_cert_chain('client-cert.pem', 'client-key.pem')
# CA used to validate the server's certificate.
context.load_verify_locations('ca-cert.pem')

with socket.create_connection(('localhost', 5566)) as sock:
    with context.wrap_socket(sock, server_hostname='localhost') as ssock:
        # sendall() writes the full payload; send() may stop early.
        ssock.sendall(b"Hello mTLS")
        print(ssock.recv(1024))

Non-blocking TLS with selectors#

Handle TLS handshake and I/O asynchronously using the selectors module.

import socket
import selectors
import ssl
from functools import partial

# Context for the server side of the connection: Purpose.CLIENT_AUTH selects
# server-role defaults. It is shared by every accepted connection.
sslctx = ssl.create_default_context(purpose=ssl.Purpose.CLIENT_AUTH)
sslctx.load_cert_chain("cert.pem", keyfile="key.pem")

def accept(sock, sel):
    """Accept a TCP connection and register it for a deferred TLS handshake.

    Args:
        sock: Listening socket that the selector reported as readable.
        sel: Selector the new TLS connection is registered with.
    """
    conn, _addr = sock.accept()
    # Accepted sockets are blocking by default. Without this, do_handshake(),
    # recv() and send() would block the whole event loop and never raise
    # SSLWantReadError / SSLWantWriteError. The SSL wrapper inherits the
    # blocking mode of the socket it wraps.
    conn.setblocking(False)
    sslconn = sslctx.wrap_socket(conn,
                                 server_side=True,
                                 do_handshake_on_connect=False)
    # The handshake is driven step by step from the event loop.
    sel.register(sslconn, selectors.EVENT_READ, do_handshake)

def do_handshake(sslconn, sel):
    """Advance the TLS handshake by one step.

    Called each time the selector reports the connection ready. Once the
    handshake completes the connection is handed over to ``read``. If it
    fails, the connection is dropped.

    Args:
        sslconn: Non-blocking SSL socket whose handshake is in progress.
        sel: Selector the socket is registered with.
    """
    try:
        sslconn.do_handshake()
    except ssl.SSLWantReadError:
        # Need more data from the peer. Re-arm for reading explicitly: an
        # earlier SSLWantWriteError may have left us waiting on EVENT_WRITE,
        # which would otherwise fire continuously (busy loop).
        sel.modify(sslconn, selectors.EVENT_READ, do_handshake)
    except ssl.SSLWantWriteError:
        sel.modify(sslconn, selectors.EVENT_WRITE, do_handshake)
    except OSError:
        # Handshake failed (ssl.SSLError is an OSError) or the peer went
        # away. Drop this connection but keep the server running.
        sel.unregister(sslconn)
        sslconn.close()
    else:
        sel.modify(sslconn, selectors.EVENT_READ, read)

def read(sslconn, sel):
    """Read one chunk from the client and schedule it to be echoed back.

    An empty read (peer closed) or an I/O error closes the connection.

    Args:
        sslconn: Non-blocking SSL socket reported ready by the selector.
        sel: Selector the socket is registered with.
    """
    try:
        msg = sslconn.recv(1024)
    except ssl.SSLWantReadError:
        # No complete TLS record yet; keep (or go back to) waiting for input.
        sel.modify(sslconn, selectors.EVENT_READ, read)
        return
    except ssl.SSLWantWriteError:
        # The TLS layer must write before it can read;
        # retry the read once the socket is writable.
        sel.modify(sslconn, selectors.EVENT_WRITE, read)
        return
    except OSError:
        # Reset connection or TLS error (ssl.SSLError is an OSError):
        # treat it like end-of-stream so only this client is dropped.
        msg = b""

    if msg:
        sel.modify(sslconn, selectors.EVENT_WRITE,
                   partial(write, msg=msg))
    else:
        sel.unregister(sslconn)
        sslconn.close()

def write(sslconn, sel, msg=None):
    """Send ``msg`` back to the client, then resume reading.

    Unsent data is kept and retried on the next readiness event. An I/O
    error closes the connection.

    Args:
        sslconn: Non-blocking SSL socket reported ready by the selector.
        sel: Selector the socket is registered with.
        msg: Bytes still to be sent; ``None`` or empty means nothing to send.
    """
    try:
        sent = sslconn.send(msg) if msg else 0
    except ssl.SSLWantWriteError:
        # Socket buffer full: retry the same data when writable.
        sel.modify(sslconn, selectors.EVENT_WRITE, partial(write, msg=msg))
        return
    except ssl.SSLWantReadError:
        # The TLS layer needs input before it can send; retry the same
        # data once the socket becomes readable.
        sel.modify(sslconn, selectors.EVENT_READ, partial(write, msg=msg))
        return
    except OSError:
        # Broken pipe or TLS error (ssl.SSLError is an OSError): drop this
        # client only.
        sel.unregister(sslconn)
        sslconn.close()
        return

    remaining = msg[sent:] if msg else b""
    if remaining:
        # send() may accept fewer bytes than offered; do not lose the rest.
        sel.modify(sslconn, selectors.EVENT_WRITE,
                   partial(write, msg=remaining))
        return

    sel.modify(sslconn, selectors.EVENT_READ, read)
    # Decrypted bytes may already sit in the TLS layer (a record larger than
    # the last recv size). The selector only sees the OS socket and would
    # never report them, so drain them now.
    if sslconn.pending():
        read(sslconn, sel)

# Main server loop
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind(('localhost', 5566))
sock.listen(10)

sel = selectors.DefaultSelector()
# The data attached to each registration is the callback to run when the
# socket is ready; it is invoked as handler(fileobj, selector).
sel.register(sock, selectors.EVENT_READ, accept)

try:
    while True:
        for key, _mask in sel.select():
            handler = key.data
            handler(key.fileobj, sel)
except KeyboardInterrupt:
    pass
finally:
    # Close every socket still registered (the listener and any client
    # connections in flight) so none are left open at shutdown. Iterate
    # over a snapshot because unregister() mutates the selector's map.
    for key in list(sel.get_map().values()):
        sel.unregister(key.fileobj)
        key.fileobj.close()
    sock.close()  # harmless if already closed above
    sel.close()

Get Certificate Information#

Retrieve and inspect server certificate details.

import socket
import ssl
import pprint

hostname = 'www.google.com'
port = 443

context = ssl.create_default_context()

with socket.create_connection((hostname, port)) as sock:
    with context.wrap_socket(sock, server_hostname=hostname) as ssock:
        # Parsed certificate as a dict (available because the default
        # context validates the peer certificate).
        cert = ssock.getpeercert()
        pprint.pprint(cert)

        # Get specific fields.
        # 'subject' and 'issuer' are tuples of RDNs, and each RDN is itself
        # a tuple of (name, value) pairs. Flatten every pair so that
        # multi-valued RDNs are not silently truncated to their first entry.
        subject = dict(pair for rdn in cert['subject'] for pair in rdn)
        issuer = dict(pair for rdn in cert['issuer'] for pair in rdn)
        print(f"Subject: {subject}")
        print(f"Issuer: {issuer}")
        print(f"Not Before: {cert['notBefore']}")
        print(f"Not After: {cert['notAfter']}")

        # Get certificate in DER format
        der_cert = ssock.getpeercert(binary_form=True)
        print(f"Certificate size: {len(der_cert)} bytes")

TLS Version and Security Settings#

Configure minimum TLS version and security options.

import ssl

context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)

# Set minimum TLS version (TLS 1.2+).
# This single setting rules out SSLv2, SSLv3, TLS 1.0 and TLS 1.1. The
# per-protocol OP_NO_SSLv2 / OP_NO_SSLv3 / OP_NO_TLSv1 / OP_NO_TLSv1_1 flags
# are deprecated in favour of minimum_version and are not needed.
context.minimum_version = ssl.TLSVersion.TLSv1_2

# Disable compression (CRIME attack mitigation)
context.options |= ssl.OP_NO_COMPRESSION

# Use server's cipher preference
context.options |= ssl.OP_CIPHER_SERVER_PREFERENCE

# Load certificate
context.load_cert_chain('cert.pem', 'key.pem')

# Set strong ciphers only. This restricts the suites used for TLS 1.2;
# TLS 1.3 suites are not controlled by set_ciphers() and remain enabled.
context.set_ciphers('ECDHE+AESGCM:DHE+AESGCM:!aNULL:!MD5:!DSS')

print(f"Min version: {context.minimum_version}")
print(f"Ciphers: {len(context.get_ciphers())}")