Skip to content

Instantly share code, notes, and snippets.

@shivamMg
Last active March 1, 2026 15:17
Show Gist options
  • Select an option

  • Save shivamMg/b5b04925ec69a17b3b40c8d530f3afd8 to your computer and use it in GitHub Desktop.

Select an option

Save shivamMg/b5b04925ec69a17b3b40c8d530f3afd8 to your computer and use it in GitHub Desktop.
MapReduce Algorithm
import shutil
from collections import Counter
from pathlib import Path
from typing import List
def print_file(file_path):
    """Demonstrate cursor positioning with seek() by printing three 10-byte slices.

    Prints the first 10 bytes, 10 bytes starting at offset 510, and the last
    10 bytes of the file, each prefixed with the cursor position.

    :param file_path: path to the file to sample (must be at least ~520 bytes
        for the middle read to return content).
    """
    with open(file_path, "rb") as f:  # binary mode: seek() with whence 1/2 requires it
        f.seek(0, 0)  # whence=0: absolute — move cursor to the beginning
        # read() advances the cursor automatically.
        # errors="replace" guards against a slice boundary splitting a
        # multi-byte UTF-8 sequence, which would otherwise raise.
        print(f"Content at {f.tell()}:", f.read(10).decode("utf-8", errors="replace"))
        f.seek(500, 1)  # whence=1: relative — 500 past current position 10 (=510)
        print(f"Content at {f.tell()}:", f.read(10).decode("utf-8", errors="replace"))
        f.seek(-10, 2)  # whence=2: from end — 10 bytes before EOF
        print(f"Content at {f.tell()}:", f.read(10).decode("utf-8", errors="replace"))
class MapReduce:
    """Single-machine word-count MapReduce over one text file.

    Pipeline: split the input into fixed-size chunks, map each chunk to
    per-partition "word<TAB>count" files, reduce each partition into final
    counts, then merge all partitions to report the top-k words.
    """

    def __init__(self, input_file, output_dir):
        self.input_file = input_file  # Path to the input text file
        self.output_dir = output_dir  # Path to an existing directory for all stage outputs
        self.num_partitions = 4  # number of partitions for reduce phase

    @staticmethod
    def read_chunks(input_file, chunk_size=250_000):
        """Yield successive chunks of at most chunk_size characters from input_file."""
        with open(input_file) as f:
            while True:
                chunk = f.read(chunk_size)
                if not chunk:  # read() returns "" at EOF
                    break
                yield chunk

    def get_partition_id(self, word) -> int:
        """Map a word to a partition index in [0, num_partitions).

        NOTE: hash() on str is salted per process (PYTHONHASHSEED), so the
        assignment is stable within one run but not across runs.
        """
        return hash(word) % self.num_partitions

    def chunk(self, input_file) -> List[Path]:
        """Split input_file into chunk files under output_dir/input_chunks.

        :returns: list of chunk file paths, in order.
        """
        chunk_dir = self.output_dir / "input_chunks"  # renamed from `dir`: don't shadow the builtin
        chunk_dir.mkdir()
        chunk_files = []
        for i, chunk in enumerate(self.read_chunks(input_file)):
            chunk_file = chunk_dir / f"chunk_{i}.txt"
            chunk_file.write_text(chunk)
            chunk_files.append(chunk_file)
        return chunk_files

    def map(self, chunk_files) -> List[Path]:
        """Count words in each chunk and append counts to per-partition map files.

        :returns: list of the num_partitions map file paths.
        """
        map_dir = self.output_dir / "map_output"
        map_dir.mkdir()
        map_files = {}  # partition_id -> (file path, file handle)
        for i in range(self.num_partitions):
            map_file = map_dir / f"partition_{i}.txt"
            map_files[i] = (map_file, open(map_file, "w"))
        try:
            for chunk_file in chunk_files:
                chunk = chunk_file.read_text()
                for word, count in Counter(chunk.split()).items():
                    partition_id = self.get_partition_id(word)
                    # Every occurrence of a word is routed to the same
                    # partition file, so reduce sees all of its counts.
                    map_files[partition_id][1].write(f"{word}\t{count}\n")
        finally:
            # Plain loop instead of a side-effect comprehension; the
            # try/finally ensures handles close even if a write fails.
            for _, handle in map_files.values():
                handle.close()
        return [map_file for map_file, _ in map_files.values()]

    def reduce(self, map_files) -> List[Path]:
        """Aggregate each partition's map file into a final per-partition count file.

        :returns: list of reduce file paths, one per partition.
        """
        reduce_dir = self.output_dir / "reduce_output"
        reduce_dir.mkdir()
        reduce_files = []
        for map_file in map_files:
            word_counts = Counter()
            with open(map_file) as f:
                for line in f:
                    word, count = line.strip().split("\t")
                    word_counts[word] += int(count)
            reduce_file = reduce_dir / map_file.name
            with open(reduce_file, "w") as f:
                f.writelines(f"{word}\t{count}\n" for word, count in word_counts.items())
            reduce_files.append(reduce_file)
        return reduce_files

    def execute(self):
        """Run the full chunk -> map -> reduce pipeline and return the top-k words."""
        chunk_files = self.chunk(self.input_file)
        map_files = self.map(chunk_files)
        reduce_files = self.reduce(map_files)
        return self.topk(reduce_files)

    def topk(self, reduce_files, k=5):
        """Merge all reduce outputs and return the k most common (word, count) pairs."""
        word_counts = Counter()
        for reduce_file in reduce_files:
            with open(reduce_file) as f:
                for line in f:
                    word, count = line.strip().split("\t")
                    word_counts[word] += int(count)
        return word_counts.most_common(k)
if __name__ == "__main__":
    INPUT_FILE = Path("Tartan.md")
    OUTPUT_DIR = Path("output")

    # Start every run from a clean output directory.
    shutil.rmtree(OUTPUT_DIR, ignore_errors=True)
    OUTPUT_DIR.mkdir()

    # Demonstrate seek()-based sampling of the raw input. Expected output:
    # Content at 0: Tartan (Sc
    # Content at 510:  kilt. Spe
    # Content at 1114799: nal links
    print_file(INPUT_FILE)

    map_reduce = MapReduce(INPUT_FILE, OUTPUT_DIR)
    topk_words = map_reduce.execute()
    # Intermediate layout produced under output/ (illustrative contents):
    #   input_chunks/
    #     chunk_0.txt, chunk_1.txt, chunk_2.txt, ...
    #   map_output/
    #     partition_0.txt -> hello\t1, world\t1, hello\t2
    #     partition_1.txt -> my\t1, apple\t1
    #   reduce_output/
    #     partition_0.txt -> hello\t3, world\t1
    #     partition_1.txt -> my\t1, apple\t1
    print(topk_words)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment