Skip to content

Instantly share code, notes, and snippets.

@Michaelliv
Created September 15, 2020 19:57
Show Gist options
  • Select an option

  • Save Michaelliv/7c8e14f57de08f410e063139841e14ac to your computer and use it in GitHub Desktop.

Select an option

Save Michaelliv/7c8e14f57de08f410e063139841e14ac to your computer and use it in GitHub Desktop.
import multiprocessing as mp
from collections import Counter
from typing import Dict
# Sentinel pushed onto the work queue after the last real item; a worker that
# receives it puts it back so the remaining workers also see it and stop.
POISON: str = "POISON"
# Lock guarding the merge of per-worker tallies into the shared counter.
# NOTE: this is created at import time, so it is only shared by child
# processes that inherit it via fork; under the spawn start method each child
# re-imports the module and constructs its own, unrelated lock.
LOCK = mp.Lock()
class MultiprocessTermCounter:
    """Count lower-cased, whitespace-separated terms using worker processes.

    Text is fed in with :meth:`consume`. Each worker pulls items from a shared
    queue and tallies terms in a private ``Counter``. When a worker receives
    the ``POISON`` sentinel it re-queues it (so every other worker stops too),
    merges its tally into the shared ``self.counter`` under a lock, and exits.

    Typical use::

        tc = MultiprocessTermCounter()
        for line in lines:
            tc.consume(line)
        tc.consume(POISON)
        tc.join()
        counts = tc.counter.copy()
    """

    def __init__(self, num_workers: int = mp.cpu_count()):
        """Create the shared queue/counter and start the worker processes.

        Args:
            num_workers: Number of worker processes to start; defaults to the
                CPU count observed when the class was defined.

        Raises:
            ValueError: If ``num_workers`` is less than 1, since no worker
                would ever drain the queue.
        """
        if num_workers < 1:
            raise ValueError(f"num_workers must be >= 1, got {num_workers}")
        self.queue = mp.Queue()
        # Keep a handle on the Manager so the owner can call
        # ``self.manager.shutdown()`` once the counts have been read.
        self.manager = mp.Manager()
        self.counter = self.manager.dict()
        # Created here and passed to every worker explicitly so all workers
        # share one lock under every start method. A module-level lock is
        # re-created per child when processes are spawned rather than forked,
        # which would leave the merge into the shared dict unsynchronised.
        self.lock = mp.Lock()
        self.process_pool = [
            mp.Process(
                target=self.count_terms,
                args=(i, self.queue, self.counter, self.lock),
            )
            for i in range(num_workers)
        ]
        for process in self.process_pool:
            process.start()

    def consume(self, text: str):
        """Queue ``text`` for counting; send ``POISON`` once to stop workers."""
        self.queue.put(text)

    @staticmethod
    def count_terms(worker_id: int, queue: mp.Queue, mp_counter: Dict, lock=None):
        """Worker loop: tally terms from ``queue`` until ``POISON`` arrives.

        Args:
            worker_id: Index used only in the start-up log line.
            queue: Source of text items; ``POISON`` ends the loop.
            mp_counter: Shared mapping the local tally is merged into on exit.
            lock: Lock guarding the merge into ``mp_counter``. Defaults to the
                module-level ``LOCK`` for callers that pass three arguments;
                that fallback only synchronises workers created via fork.
        """
        if lock is None:
            lock = LOCK
        local_counts = Counter()
        print(f"worker #{worker_id}({mp.current_process().pid})")
        # A process always reports itself as alive, so simply loop until the
        # sentinel arrives.
        while True:
            text = queue.get()
            if text == POISON:
                # Re-queue the sentinel so every other worker sees it as well.
                queue.put(POISON)
                MultiprocessTermCounter._merge_counts(mp_counter, local_counts, lock)
                return
            local_counts.update(MultiprocessTermCounter._tokenize(text))

    @staticmethod
    def _tokenize(text: str) -> list:
        """Lower-case ``text`` and split it on runs of whitespace.

        ``str.split()`` with no argument drops the trailing newline and never
        yields empty strings, unlike ``split(' ')``.
        """
        return text.lower().split()

    @staticmethod
    def _merge_counts(mp_counter: Dict, local_counts: Dict, lock) -> None:
        """Add ``local_counts`` into ``mp_counter`` atomically under ``lock``."""
        if not local_counts:
            return
        with lock:
            # Read once and write once instead of two round-trips per term;
            # holding the lock makes this read-modify-write atomic.
            snapshot = mp_counter.copy()
            mp_counter.update(
                {term: snapshot.get(term, 0) + n for term, n in local_counts.items()}
            )

    def join(self):
        """Wait for every worker to exit (call after ``consume(POISON)``)."""
        for process in self.process_pool:
            process.join()
def run(data_path=None):
    """Count the terms of a text file with ``MultiprocessTermCounter``.

    Each line of the file is queued as one work item.

    Args:
        data_path: Path of the UTF-8 text file to read. When omitted, falls
            back to the module-level ``DATA_PATH``, which must be defined
            elsewhere in the module.

    Returns:
        A plain ``dict`` mapping each term to its total count.
    """
    if data_path is None:
        data_path = DATA_PATH
    term_counter = MultiprocessTermCounter()
    try:
        with open(data_path, encoding="utf8") as f:
            for line in f:
                term_counter.consume(line)
    finally:
        # Always release the workers, even if opening/reading fails; otherwise
        # they block on queue.get() forever and the interpreter cannot exit.
        term_counter.consume(POISON)
        term_counter.join()
    # One IPC round-trip to snapshot the Manager-backed dict.
    return term_counter.counter.copy()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment