Multiprocessing
Introduction
The multiprocessing module enables true parallel execution by spawning
separate Python processes, each with its own Python interpreter and memory
space. Unlike threads, processes bypass the Global Interpreter Lock (GIL),
making multiprocessing ideal for CPU-bound tasks that need to utilize multiple
CPU cores. The trade-off is higher overhead for process creation and
inter-process communication compared to threads.
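As a quick point of orientation (a minimal sketch, not part of the original examples), you can check how many cores are available and which start method your platform uses; both bear on the process-creation overhead mentioned above:

import multiprocessing

if __name__ == "__main__":
    # Cores available for parallel work; a common default for pool sizes
    print(f"CPU cores: {multiprocessing.cpu_count()}")
    # How child processes are created: "fork" (Unix) or "spawn" (Windows, macOS)
    print(f"Start method: {multiprocessing.get_start_method()}")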
Creating Processes
Creating processes is similar to creating threads. Each process runs in its
own memory space, so changes to variables in one process don’t affect others.
Use start() to begin execution and join() to wait for completion.
from multiprocessing import Process
import os

def worker(name):
    print(f"Worker {name}, PID: {os.getpid()}")

if __name__ == "__main__":
    processes = []
    for i in range(4):
        p = Process(target=worker, args=(i,))
        processes.append(p)
        p.start()

    for p in processes:
        p.join()

    print(f"Main process PID: {os.getpid()}")
Process Pool
A Pool manages a collection of worker processes and distributes tasks among
them. This is more efficient than creating a new process for each task, as
processes are reused. The pool provides methods like map(), apply(),
and their async variants for different use cases.
from multiprocessing import Pool
import time

def cpu_intensive(n):
    """Simulate CPU-bound work."""
    total = 0
    for i in range(n):
        total += i * i
    return total

if __name__ == "__main__":
    numbers = [10**6, 10**6, 10**6, 10**6]

    # Sequential execution
    start = time.time()
    results = [cpu_intensive(n) for n in numbers]
    print(f"Sequential: {time.time() - start:.2f}s")

    # Parallel execution with Pool
    start = time.time()
    with Pool(4) as pool:
        results = pool.map(cpu_intensive, numbers)
    print(f"Parallel: {time.time() - start:.2f}s")
Pool Methods
The Pool class provides several methods for distributing work. map() applies
a function to each item in an iterable and returns the results in order.
apply() calls a function with arguments and blocks until it completes.
starmap() is like map() but unpacks each item as an argument tuple. The
_async variants (apply_async(), map_async()) return immediately with an
AsyncResult object; call get() to block until the result is ready.
from multiprocessing import Pool

def square(x):
    return x * x

def add(a, b):
    return a + b

if __name__ == "__main__":
    with Pool(4) as pool:
        # map - apply function to iterable
        results = pool.map(square, range(10))
        print(f"map: {results}")

        # starmap - unpack arguments from iterable
        pairs = [(1, 2), (3, 4), (5, 6)]
        results = pool.starmap(add, pairs)
        print(f"starmap: {results}")

        # apply_async - non-blocking single call
        result = pool.apply_async(square, (10,))
        print(f"apply_async: {result.get()}")

        # map_async - non-blocking map
        result = pool.map_async(square, range(5))
        print(f"map_async: {result.get()}")
Process Synchronization
Multiprocessing provides the same synchronization primitives as threading:
Lock, RLock, Semaphore, Event, Condition, and Barrier.
These work across processes instead of threads.
from multiprocessing import Process, Lock, Value

def safe_increment(counter, lock):
    for _ in range(10000):
        with lock:
            counter.value += 1

if __name__ == "__main__":
    lock = Lock()
    counter = Value('i', 0)
    processes = [
        Process(target=safe_increment, args=(counter, lock))
        for _ in range(4)
    ]
    for p in processes:
        p.start()
    for p in processes:
        p.join()
    print(f"Counter: {counter.value}")  # 40000
Daemon Processes
Like daemon threads, daemon processes are terminated when the main process
exits. They’re useful for background tasks that shouldn’t prevent program
termination. Set daemon=True before calling start().
from multiprocessing import Process
import time

def background_task():
    while True:
        print("Background process running...")
        time.sleep(1)

if __name__ == "__main__":
    p = Process(target=background_task, daemon=True)
    p.start()
    time.sleep(3)
    print("Main process exiting, daemon will be terminated")
Handling Process Termination
A process can be stopped with terminate(), which sends SIGTERM on Unix, or
forcefully with kill(), which sends SIGKILL. Neither gives the child a
chance to run its own cleanup code, so release resources explicitly and
consider signal handlers or a shared shutdown flag for graceful shutdown
in production code.
from multiprocessing import Process
import time

def long_running_task():
    try:
        while True:
            print("Working...")
            time.sleep(1)
    except KeyboardInterrupt:
        # Raised on SIGINT (Ctrl+C); terminate() sends SIGTERM, which
        # ends the process without running this handler
        print("Graceful shutdown")

if __name__ == "__main__":
    p = Process(target=long_running_task)
    p.start()
    time.sleep(3)

    # Request termination (SIGTERM on Unix)
    p.terminate()
    p.join(timeout=2)

    # Force kill if still alive
    if p.is_alive():
        p.kill()
        p.join()

    print(f"Exit code: {p.exitcode}")
ProcessPoolExecutor
concurrent.futures.ProcessPoolExecutor provides a higher-level interface
for process pools that’s consistent with ThreadPoolExecutor. It’s often
easier to use than multiprocessing.Pool and integrates well with the
futures pattern.
from concurrent.futures import ProcessPoolExecutor, as_completed

def compute(n):
    return sum(i * i for i in range(n))

if __name__ == "__main__":
    numbers = [10**6, 10**6, 10**6, 10**6]

    with ProcessPoolExecutor(max_workers=4) as executor:
        # Submit individual tasks
        futures = [executor.submit(compute, n) for n in numbers]

        # Process results as they complete
        for future in as_completed(futures):
            print(f"Result: {future.result()}")

        # Or use map for ordered results
        results = list(executor.map(compute, numbers))
        print(f"All results: {results}")
Comparing Threads vs Processes
Choose threads for I/O-bound tasks (network, file I/O) where the GIL is released during waiting. Choose processes for CPU-bound tasks that need true parallelism. This example demonstrates the performance difference.
from threading import Thread
from multiprocessing import Pool
import time

def cpu_bound(n):
    """CPU-intensive task."""
    return sum(i * i for i in range(n))

if __name__ == "__main__":
    n = 10**7
    count = 4

    # Sequential
    start = time.time()
    for _ in range(count):
        cpu_bound(n)
    print(f"Sequential: {time.time() - start:.2f}s")

    # Threads (limited by GIL)
    start = time.time()
    threads = [Thread(target=cpu_bound, args=(n,)) for _ in range(count)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    print(f"Threads: {time.time() - start:.2f}s")

    # Processes (true parallelism)
    start = time.time()
    with Pool(count) as pool:
        pool.map(cpu_bound, [n] * count)
    print(f"Processes: {time.time() - start:.2f}s")