Last active
March 1, 2026 15:17
-
-
Save shivamMg/b5b04925ec69a17b3b40c8d530f3afd8 to your computer and use it in GitHub Desktop.
MapReduce Algorithm
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import shutil | |
| from collections import Counter | |
| from pathlib import Path | |
| from typing import List | |
def print_file(file_path):
    """Demonstrate random access within a file using seek()/tell().

    The file is opened in binary mode because relative seeks
    (whence=1 and whence=2) are only permitted on binary streams.
    Prints three 10-byte snippets: from the start, from offset 510,
    and from 10 bytes before the end.
    """
    with open(file_path, "rb") as handle:
        # whence=0: absolute offset -> jump to the beginning
        handle.seek(0, 0)
        print(f"Content at {handle.tell()}:", handle.read(10).decode("utf-8"))
        # whence=1: offset relative to the current position; the read above
        # advanced the cursor to 10, so this lands at 510
        handle.seek(500, 1)
        print(f"Content at {handle.tell()}:", handle.read(10).decode("utf-8"))
        # whence=2: offset relative to the end of the file
        handle.seek(-10, 2)
        print(f"Content at {handle.tell()}:", handle.read(10).decode("utf-8"))
class MapReduce:
    """Toy single-machine MapReduce that counts words in a text file.

    Intermediate artifacts are materialized on disk under ``output_dir``
    (which must already exist): ``input_chunks/`` for the split input,
    ``map_output/`` for the partitioned map files, and ``reduce_output/``
    for the per-partition aggregated counts.
    """

    def __init__(self, input_file, output_dir):
        self.input_file = input_file    # path to the input text file
        self.output_dir = output_dir    # pathlib.Path; must already exist
        self.num_partitions = 4         # number of partitions for the reduce phase

    @staticmethod
    def read_chunks(input_file, chunk_size=250_000):
        """Yield ~chunk_size-character chunks, never splitting a word.

        Every yielded chunk ends on whitespace (or at EOF), so per-chunk
        word counts sum exactly to the whole-file counts.  A plain
        fixed-size ``f.read(chunk_size)`` could cut a word in half at a
        chunk boundary, counting its two halves as separate words.
        """
        carry = ""  # partial tail held over from the previous read
        with open(input_file) as f:
            while True:
                data = f.read(chunk_size)
                if not data:
                    if carry:
                        yield carry
                    return
                data = carry + data
                # Cut just after the last whitespace character.
                cut = 0
                for i in range(len(data), 0, -1):
                    if data[i - 1].isspace():
                        cut = i
                        break
                if cut == 0:
                    # A single word longer than chunk_size: keep accumulating
                    # until whitespace or EOF appears.
                    carry = data
                    continue
                carry = data[cut:]
                yield data[:cut]

    def get_partition_id(self, word) -> int:
        """Route a word to a partition.

        NOTE: hash() is salted per process (PYTHONHASHSEED), so the
        word->partition assignment differs between runs; within a single
        run it is consistent, which is all map/reduce requires.
        """
        return hash(word) % self.num_partitions

    def chunk(self, input_file) -> List[Path]:
        """Split the input into word-aligned chunk files; return their paths."""
        out_dir = self.output_dir / "input_chunks"
        out_dir.mkdir()
        chunk_files = []
        for i, chunk in enumerate(self.read_chunks(input_file)):
            chunk_file = out_dir / f"chunk_{i}.txt"
            with open(chunk_file, "w") as f:
                f.write(chunk)
            chunk_files.append(chunk_file)
        return chunk_files

    def map(self, chunk_files) -> List[Path]:
        """Count words per chunk and write "word<TAB>count" lines into the
        map file of the word's partition.  Returns the partition file paths.
        """
        out_dir = self.output_dir / "map_output"
        out_dir.mkdir()
        paths = [out_dir / f"partition_{i}.txt" for i in range(self.num_partitions)]
        handles = [open(p, "w") for p in paths]
        try:
            for chunk_file in chunk_files:
                with open(chunk_file) as f:
                    chunk = f.read()
                for word, count in Counter(chunk.split()).items():
                    # write to the corresponding partitioned map file
                    handles[self.get_partition_id(word)].write(f"{word}\t{count}\n")
        finally:
            # Close every handle even if a chunk read fails mid-way
            # (the original leaked all handles on error).
            for handle in handles:
                handle.close()
        return paths

    def reduce(self, map_files) -> List[Path]:
        """Aggregate counts within each partition file; return result paths."""
        out_dir = self.output_dir / "reduce_output"
        out_dir.mkdir()
        reduce_files = []
        for map_file in map_files:
            word_counts = Counter()
            with open(map_file) as f:
                for line in f:
                    word, count = line.strip().split("\t")
                    word_counts[word] += int(count)
            reduce_file = out_dir / map_file.name
            with open(reduce_file, "w") as f:
                for word, count in word_counts.items():
                    f.write(f"{word}\t{count}\n")
            reduce_files.append(reduce_file)
        return reduce_files

    def execute(self):
        """Run chunk -> map -> reduce and return the top-k word counts."""
        chunk_files = self.chunk(self.input_file)
        map_files = self.map(chunk_files)
        reduce_files = self.reduce(map_files)
        return self.topk(reduce_files)

    def topk(self, reduce_files, k=5):
        """Merge all reduce outputs and return the k most common words
        as (word, count) pairs, most frequent first."""
        word_counts = Counter()
        for reduce_file in reduce_files:
            with open(reduce_file) as f:
                for line in f:
                    word, count = line.strip().split("\t")
                    word_counts[word] += int(count)
        return word_counts.most_common(k)
if __name__ == "__main__":
    input_path = Path("Tartan.md")
    output_path = Path("output")

    # Start each run from a clean output directory.
    shutil.rmtree(output_path, ignore_errors=True)
    output_path.mkdir()

    # Demonstrate seek()-based random access on the input file.
    print_file(input_path)
    # Sample output for Tartan.md:
    #   Content at 0: Tartan (Sc
    #   Content at 510:  kilt. Spe
    #   Content at 1114799: nal links

    job = MapReduce(input_path, output_path)
    top_words = job.execute()
    # On-disk layout produced by the run (counts illustrative):
    #   input_chunks/
    #       chunk_0.txt, chunk_1.txt, chunk_2.txt, ...
    #   map_output/
    #       partition_0.txt:  hello\t1  world\t1  hello\t2
    #       partition_1.txt:  my\t1     apple\t1
    #   reduce_output/
    #       partition_0.txt:  hello\t3  world\t1
    #       partition_1.txt:  my\t1     apple\t1
    print(top_words)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment