A guide for Go developers learning Python's multiprocessing module.
- Process vs Goroutine
- Queue vs Channel
- Communication Patterns
- Event vs WaitGroup/Context
- Complete Example
- Key Gotchas
- When to Use What
- Data Serialization
## Process vs Goroutine

```go
// Lightweight goroutine (green thread)
go func() {
    // runs concurrently
}()
```

```python
# Full OS process (heavier than a goroutine)
from multiprocessing import Process

def worker():
    # runs in a separate process
    pass

p = Process(target=worker)
p.start()  # Like the go keyword
p.join()   # Wait for completion
```

| Aspect | Go Goroutine | Python Process |
|---|---|---|
| Weight | ~2KB stack | ~10MB overhead |
| Scalability | Thousands easily | Dozens typically |
| Memory | Shared memory space | Separate memory space |
| Startup | Microseconds | Milliseconds |
| Use Case | Concurrency | True parallelism |
**Why the difference?**
- Python has the GIL (Global Interpreter Lock) - threads can't run Python code in parallel
- Processes bypass the GIL by running in separate memory spaces
- Each process is like a separate Python interpreter
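You can see the "separate interpreter" point directly: each `Process` is a real OS process with its own PID and its own copy of memory. A minimal sketch:

```python
import os
from multiprocessing import Process

def show_pid():
    # Runs in a child process: its own interpreter, its own memory
    print(f"child pid:  {os.getpid()}")

if __name__ == "__main__":
    print(f"parent pid: {os.getpid()}")
    p = Process(target=show_pid)
    p.start()
    p.join()
```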
## Queue vs Channel

```go
// Buffered channel
ch := make(chan Job, 100)

// Send
ch <- job

// Receive
job := <-ch

// Close
close(ch)

// Iterate until closed
for job := range ch {
    process(job)
}
```

```python
# Queue (similar to a buffered channel)
from multiprocessing import Queue

q = Queue()  # Unbounded by default

# Send (put)
q.put(job)

# Receive (get)
job = q.get()             # Blocks until available
job = q.get(timeout=1.0)  # With timeout
job = q.get_nowait()      # Non-blocking (raises queue.Empty if empty)

# Close - use the "poison pill" pattern
q.put(None)  # Signal end

# Iterate until poison pill
while True:
    job = q.get()
    if job is None:
        break
    process(job)
```

| Feature | Go Channel | Python Queue |
|---|---|---|
| Buffering | Optional (unbuffered or buffered) | Always buffered (unbounded by default) |
| Closing | `close(ch)` built-in | Poison pill pattern (`put(None)`) |
| Iteration | `for x := range ch` | Manual loop with `break` |
| Select | `select` statement | No native equivalent |
| Implementation | Language primitive | Uses locks internally |
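There is no native `select`, but you can approximate selecting over two queues by polling each with `get_nowait()`. A minimal sketch (the helper `select_one` and its polling interval are illustrative, not a standard API):

```python
import queue  # stdlib module; provides the queue.Empty exception
import time
from multiprocessing import Queue

def select_one(q1, q2, poll_interval=0.05):
    """Return (index, item) from whichever queue yields first.

    A rough stand-in for Go's select over two channels; unlike Go's
    select, it polls rather than blocking on both queues at once.
    """
    while True:
        for i, q in enumerate((q1, q2)):
            try:
                return i, q.get_nowait()
            except queue.Empty:
                pass
        time.sleep(poll_interval)  # back off to avoid busy-spinning
```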
## Communication Patterns

```go
func worker(jobs <-chan Job, results chan<- Result) {
    for job := range jobs { // Iterate until channel closed
        result := process(job)
        results <- result
    }
}

func main() {
    jobs := make(chan Job, 100)
    results := make(chan Result, 100)

    // Start workers
    for i := 0; i < 4; i++ {
        go worker(jobs, results)
    }

    // Send jobs
    for _, job := range jobList {
        jobs <- job
    }
    close(jobs) // Signal no more jobs

    // Collect results
    for i := 0; i < len(jobList); i++ {
        result := <-results
        handleResult(result)
    }
}
```

```python
import queue  # stdlib module, for the queue.Empty exception
from multiprocessing import Process, Queue

def worker(job_queue, result_queue):
    while True:
        try:
            job = job_queue.get(timeout=1.0)
        except queue.Empty:
            continue  # Nothing yet - keep waiting for a job or poison pill
        if job is None:  # Poison pill
            break
        result = process(job)
        result_queue.put(result)

def main():
    job_queue = Queue()
    result_queue = Queue()

    # Start workers
    workers = []
    for i in range(4):
        p = Process(target=worker, args=(job_queue, result_queue))
        p.start()
        workers.append(p)

    # Send jobs
    for job in job_list:
        job_queue.put(job)

    # Send poison pills (one per worker)
    for _ in workers:
        job_queue.put(None)

    # Collect results
    results = []
    for _ in job_list:
        results.append(result_queue.get())

    # Wait for workers to finish
    for p in workers:
        p.join()
```
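For this fan-out/fan-in shape, the standard library's `concurrent.futures.ProcessPoolExecutor` packages the whole pattern for you. A minimal sketch (the doubling `process` function is a placeholder):

```python
from concurrent.futures import ProcessPoolExecutor

def process(job):
    return job * 2  # placeholder for real CPU-bound work

if __name__ == "__main__":
    job_list = [1, 2, 3, 4]
    # map() fans jobs out to 4 worker processes and yields results in order
    with ProcessPoolExecutor(max_workers=4) as pool:
        results = list(pool.map(process, job_list))
    print(results)  # [2, 4, 6, 8]
```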
## Event vs WaitGroup/Context

```go
// WaitGroup for synchronization
var wg sync.WaitGroup
wg.Add(1)
go func() {
    defer wg.Done()
    // work
}()
wg.Wait()

// Context for cancellation
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go func() {
    select {
    case <-ctx.Done():
        return
    case <-time.After(time.Second):
        // work
    }
}()
cancel() // Signal cancellation
```

```python
import time
from multiprocessing import Event, Process

# Event for signaling (like context cancellation)
stop_event = Event()

def worker(stop_event):
    while not stop_event.is_set():
        # work
        time.sleep(1)
    print("Worker stopping...")

p = Process(target=worker, args=(stop_event,))
p.start()

# Signal stop (like cancel())
stop_event.set()
p.join()

# Check if set
if stop_event.is_set():
    print("Event is set")

# Clear the event
stop_event.clear()
```

| Go | Python | Purpose |
|---|---|---|
| `sync.WaitGroup` | `Process.join()` | Wait for completion |
| `context.Context` | `Event` | Cancellation signal |
| `ctx.Done()` | `event.is_set()` | Check if cancelled |
| `cancel()` | `event.set()` | Trigger cancellation |
## Complete Example

```python
import multiprocessing as mp
import queue  # stdlib module, for the queue.Empty exception
from multiprocessing import Process, Queue

# Create queues (like Go channels)
job_queue = mp.Queue()     # Jobs to process
result_queue = mp.Queue()  # Results from worker
stop_event = mp.Event()    # Signal to stop (like context.Done())

# Worker function (runs in a separate process)
def indexing_worker(job_queue: Queue, result_queue: Queue, stop_event):
    """
    Go equivalent:

    func worker(jobs <-chan Job, results chan<- Result, ctx context.Context) {
        for {
            select {
            case <-ctx.Done():
                return
            case job, ok := <-jobs:
                if !ok { return }
                result := process(job)
                results <- result
            }
        }
    }
    """
    while not stop_event.is_set():  # Like: for { select { case <-ctx.Done(): return } }
        try:
            job = job_queue.get(timeout=1.0)  # Like: select { case job := <-jobCh: }
        except queue.Empty:
            continue  # Timeout - re-check stop_event and keep waiting
        if job is None:  # Poison pill (Go: if !ok after close(ch))
            break
        # Process the job
        result = process(job)
        result_queue.put(result)  # Like: resultCh <- result

# Note: on Windows/macOS (spawn start method), everything below must sit
# under an `if __name__ == "__main__":` guard so child processes can
# re-import this module safely.

# Start the worker process (like: go worker())
worker_process = Process(
    target=indexing_worker,
    args=(job_queue, result_queue, stop_event),
    daemon=True,  # Dies when the parent dies (like a goroutine)
)
worker_process.start()

# Submit a job (like: jobCh <- job)
job_queue.put({"task": "index_pdf", "paper_id": "2301.12345"})

# Get a result (like: result := <-resultCh)
result = result_queue.get(timeout=60.0)

# Stop the worker (like: cancel())
stop_event.set()     # Signal stop
job_queue.put(None)  # Poison pill
worker_process.join(timeout=5.0)  # Wait with a timeout

# Force kill if still alive
if worker_process.is_alive():
    worker_process.terminate()
    worker_process.join(timeout=2.0)
```

| Go | Python | Notes |
|---|---|---|
| `go func()` | `Process(target=func)` | Much heavier in Python |
| `ch <- val` | `queue.put(val)` | Always succeeds (unbounded by default) |
| `val := <-ch` | `val = queue.get()` | Blocks until available |
| `val, ok := <-ch` | `try: val = q.get_nowait()` | No built-in "ok" pattern |
| `close(ch)` | `queue.put(None)` | Use poison pill pattern |
| `select {}` | `queue.get(timeout=1.0)` | No native select |
| Shared memory | Separate memory | Must serialize data between processes |
| `sync.Mutex` | `mp.Lock()` | Works across processes |
| `context.Context` | `Event` | Simpler, just set/clear |
| `defer` | `try/finally` | Different syntax, same concept |
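One row above deserves a demo: `mp.Lock()` really does synchronize across process boundaries when paired with shared memory such as `mp.Value`. A minimal sketch:

```python
import multiprocessing as mp

def increment(counter, lock):
    for _ in range(10_000):
        with lock:  # like mu.Lock() / defer mu.Unlock() in Go
            counter.value += 1

if __name__ == "__main__":
    counter = mp.Value("i", 0)  # an int living in shared memory
    lock = mp.Lock()            # works across process boundaries
    procs = [mp.Process(target=increment, args=(counter, lock)) for _ in range(4)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    print(counter.value)  # 40000 - no lost updates
```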
## Key Gotchas

```python
from multiprocessing import Process, Queue

# ❌ WRONG: Trying to share objects directly
class MyClass:
    def __init__(self):
        self.data = []

obj = MyClass()

def worker(obj):
    obj.data.append(1)  # Won't work! Separate memory space

p = Process(target=worker, args=(obj,))
p.start()
p.join()
print(obj.data)  # Still empty! Changes happened in the child process

# ✅ CORRECT: Use a Queue to communicate
def worker(result_queue):
    result_queue.put([1, 2, 3])

result_queue = Queue()
p = Process(target=worker, args=(result_queue,))
p.start()
data = result_queue.get()
p.join()
print(data)  # [1, 2, 3]
```
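If you genuinely need a shared mutable object rather than message passing, `multiprocessing.Manager` hands out proxy objects that forward operations to a manager process. A minimal sketch:

```python
from multiprocessing import Manager, Process

def worker(shared_list):
    shared_list.append(1)  # the proxy forwards this call to the manager process

if __name__ == "__main__":
    with Manager() as manager:
        shared_list = manager.list()  # proxy to a list owned by the manager
        p = Process(target=worker, args=(shared_list,))
        p.start()
        p.join()
        print(list(shared_list))  # [1]
```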
```python
# Python threads DON'T run in parallel due to the GIL
from threading import Thread

# This won't use multiple CPU cores for CPU-bound work
def cpu_intensive_work():
    total = 0
    for i in range(10_000_000):
        total += i
    return total

# ❌ These threads will run one at a time (GIL)
t1 = Thread(target=cpu_intensive_work)
t2 = Thread(target=cpu_intensive_work)
t1.start()
t2.start()
t1.join()
t2.join()

# ✅ These processes will run in parallel
from multiprocessing import Process

p1 = Process(target=cpu_intensive_work)
p2 = Process(target=cpu_intensive_work)
p1.start()
p2.start()
p1.join()
p2.join()
```

## When to Use What

| Use Case | Go | Python |
|---|---|---|
| CPU-bound work | `go func()` | `Process` |
| I/O-bound work | `go func()` | `Thread` or `asyncio` |
| Many concurrent I/O | `go func()` | `asyncio` |
| Shared state | Channels + mutexes | `Manager()` or `Queue` |
```python
# Python equivalents to Go patterns

# Go: thousands of goroutines for I/O
# Python: use asyncio
import asyncio

async def fetch_url(url):
    # async I/O
    pass

async def main():
    tasks = [fetch_url(url) for url in urls]
    await asyncio.gather(*tasks)

asyncio.run(main())

# Go: CPU-bound parallel work
# Python: use multiprocessing
from multiprocessing import Pool

def process_data(item):
    # CPU-intensive work
    pass

with Pool(processes=4) as pool:
    results = pool.map(process_data, items)
```

## Data Serialization

**Important:** Unlike Go channels, Python queues serialize data using pickle.
```python
# ✅ Picklable (can be sent through a Queue)
queue.put(42)                # Primitives
queue.put("string")          # Strings
queue.put([1, 2, 3])         # Lists
queue.put({"key": "value"})  # Dicts
queue.put(MyClass())         # Class instances (if picklable)

# ❌ NOT picklable (will raise an exception)
queue.put(lambda x: x)       # Lambdas
queue.put(open("file.txt"))  # File handles
queue.put(threading.Lock())  # Thread locks
queue.put(socket.socket())   # Sockets
```

```go
// Go: can send pointers, interfaces, anything
type MyStruct struct {
    Data []int
    Fn   func()
}

ch <- &MyStruct{
    Data: []int{1, 2, 3},
    Fn:   func() { fmt.Println("hello") },
}
```

```python
# Python: must be serializable
from dataclasses import dataclass

@dataclass
class MyStruct:
    data: list
    # Can't include functions!
queue.put(MyStruct(data=[1, 2, 3]))  # ✅ Works
```

```python
import threading

class MyClass:
    def __init__(self, value):
        self.value = value
        self.lock = threading.Lock()  # Not picklable!

    def __getstate__(self):
        # Called when pickling
        state = self.__dict__.copy()
        del state['lock']  # Remove the unpicklable attribute
        return state

    def __setstate__(self, state):
        # Called when unpickling
        self.__dict__.update(state)
        self.lock = threading.Lock()  # Recreate the lock

# Now it can be sent through a Queue
queue.put(MyClass(42))
```
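You can sanity-check such a class without spinning up a process by round-tripping it through pickle directly (a quick sketch, assuming the `MyClass` defined above):

```python
import pickle

obj = MyClass(42)
clone = pickle.loads(pickle.dumps(obj))  # exercises __getstate__/__setstate__
print(clone.value)  # 42
print(clone.lock)   # a fresh lock, recreated in __setstate__
```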
## Quick Reference

```python
from multiprocessing import Process

# Method 1: Function
def worker(arg1, arg2):
    print(f"Working with {arg1}, {arg2}")

p = Process(target=worker, args=("hello", 42))
p.start()
p.join()

# Method 2: Class
class Worker(Process):
    def __init__(self, arg):
        super().__init__()
        self.arg = arg

    def run(self):
        print(f"Working with {self.arg}")

w = Worker("hello")
w.start()
w.join()
```

```python
from multiprocessing import Queue

q = Queue(maxsize=10)  # Bounded queue

# Put (send)
q.put(item)               # Block if full
q.put(item, block=False)  # Raises queue.Full if full
q.put(item, timeout=1.0)  # Wait up to 1 second

# Get (receive)
item = q.get()             # Block if empty
item = q.get(block=False)  # Raises queue.Empty if empty
item = q.get(timeout=1.0)  # Wait up to 1 second

# Check state
q.empty()  # True if empty (unreliable in multiprocessing)
q.full()   # True if full (unreliable in multiprocessing)
q.qsize()  # Approximate size (raises NotImplementedError on macOS)
```

```python
p = Process(target=worker)
p.start()            # Start the process
p.join()             # Wait for completion
p.join(timeout=5.0)  # Wait with a timeout
p.is_alive()         # Check if running
p.terminate()        # Send SIGTERM
p.kill()             # Send SIGKILL (Python 3.7+)
p.pid                # Process ID
p.exitcode           # Exit code (None if still running)
```

## Main Takeaways
- **Python `Process` ≈ Go goroutine, but heavier**
  - Use for CPU-bound work
  - Each process has separate memory
  - Startup overhead is significant

- **Python `Queue` ≈ Go channel, but always buffered**
  - Data is serialized (pickled)
  - No native `select` statement
  - Use the poison pill pattern for closing

- **Python `Event` ≈ Go `Context`**
  - Simpler than context
  - Just set/clear/check
  - Good for cancellation signals

- **Use the right tool:**
  - CPU-bound: `multiprocessing.Process`
  - I/O-bound: `threading.Thread` or `asyncio`
  - Many concurrent I/O: `asyncio`

- **Remember the GIL:**
  - Threads don't run Python code in parallel
  - Processes bypass the GIL
  - This is why multiprocessing exists