-
-
Save suchoudh/c9cad300451adf1139f4b89b9085590e to your computer and use it in GitHub Desktop.
One Billion Row Challenge, Part I: The Basics
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| use std::collections::{BTreeMap}; | |
| use std::fs::File; | |
| use std::io::{self, BufRead, Read, Seek, SeekFrom}; | |
| use std::ops::AddAssign; | |
| use fnv::FnvHashMap; | |
/// Running aggregate of the temperature readings observed for one station.
///
/// NOTE(review): `sum` is accumulated in `f32`; with very many readings per
/// station the mean may lose precision — confirm whether that is acceptable.
#[derive(Debug, Clone, Copy)]
struct TemperatureStats {
    /// Smallest reading folded in so far (`f32::MAX` while empty).
    min: f32,
    /// Largest reading folded in so far (`f32::MIN` while empty).
    max: f32,
    /// Sum of all readings folded in so far.
    sum: f32,
    /// Number of readings folded in so far.
    count: usize,
}

impl TemperatureStats {
    /// Folds a single reading into the aggregate.
    fn update(&mut self, temperature: f32) {
        self.sum += temperature;
        self.count += 1;
        self.max = f32::max(self.max, temperature);
        self.min = f32::min(self.min, temperature);
    }
}

impl Default for TemperatureStats {
    /// Returns the empty aggregate: the identity element for both
    /// [`TemperatureStats::update`] and `+=`.
    fn default() -> Self {
        Self {
            sum: 0.0,
            count: 0,
            min: f32::MAX,
            max: f32::MIN,
        }
    }
}

impl std::ops::AddAssign for TemperatureStats {
    /// Merges another aggregate into this one, as if every reading that went
    /// into `rhs` had been folded into `self`.
    fn add_assign(&mut self, rhs: Self) {
        let Self { min, max, sum, count } = rhs;
        self.sum += sum;
        self.count += count;
        self.max = f32::max(self.max, max);
        self.min = f32::min(self.min, min);
    }
}
/// Parses the temperature at the end of one `name;temperature` record.
///
/// The temperature must have the shape `[-]d.d` or `[-]dd.d` (exactly one
/// fractional digit). `buf` may end with `\n`, `\r\n`, or no line terminator
/// at all, so the last line of a file without a trailing newline parses too.
///
/// The parsed value is written to `t`. The return value is the index of the
/// `;` separator in `buf`, i.e. the length of the station name.
///
/// # Panics
///
/// Panics with a descriptive message if the record does not end in a
/// well-formed temperature preceded by `;`.
fn parse_temperature(buf: &[u8], t: &mut f32) -> usize {
    // Ignore an optional line terminator so the digit positions below are
    // always counted from the last character of the number.
    let mut end = buf.len();
    if end > 0 && buf[end - 1] == b'\n' {
        end -= 1;
    }
    if end > 0 && buf[end - 1] == b'\r' {
        end -= 1;
    }
    let line = &buf[..end];

    // The shortest valid record is ";d.d" (empty name), hence four bytes.
    assert!(
        line.len() >= 4 && line[line.len() - 2] == b'.',
        "malformed measurement line: {:?}",
        String::from_utf8_lossy(buf)
    );

    let digit_at = |i: usize| -> u32 {
        let byte = line[i];
        assert!(
            byte.is_ascii_digit(),
            "expected a digit at byte {} of measurement line",
            i
        );
        u32::from(byte - b'0')
    };

    // Accumulate the value as integer tenths so that only the final division
    // rounds, instead of rounding after every partial float addition.
    let mut idx = line.len() - 1;
    let mut tenths = digit_at(idx);
    idx -= 2; // step over the decimal point
    tenths += 10 * digit_at(idx);
    idx -= 1;
    if line[idx].is_ascii_digit() {
        tenths += 100 * digit_at(idx);
        idx = idx
            .checked_sub(1)
            .expect("measurement line has no ';' separator");
    }

    let negative = line[idx] == b'-';
    if negative {
        idx = idx
            .checked_sub(1)
            .expect("measurement line has no ';' separator");
    }
    assert!(
        line[idx] == b';',
        "expected ';' before the temperature in measurement line"
    );

    // Negate the float (not the integer) so that "-0.0" still yields -0.0.
    let magnitude = tenths as f32 / 10.0;
    *t = if negative { -magnitude } else { magnitude };
    idx
}
/// A contiguous byte range of the input file that starts at the beginning of
/// a line and (except possibly for the final chunk) ends right after a `\n`.
#[derive(Debug, Copy, Clone)]
struct FileFullChunk {
    /// Byte offset of the first byte of the chunk.
    start: u64,
    /// Length of the chunk in bytes.
    size: u64,
}

/// Splits the file at `file_path` into at most `n` chunks of roughly equal
/// size whose boundaries fall on line ends, so no line straddles two chunks.
///
/// The returned chunks are contiguous, start at offset 0 and together cover
/// the whole file. Fewer than `n` chunks are returned when the file is too
/// small to be split that many times; `n == 0` is treated as `1`. The result
/// always contains at least one chunk (of size 0 for an empty file).
///
/// # Panics
///
/// Panics if the file cannot be opened, inspected, seeked or read.
fn split_file_complete_chunks(file_path: &str, n: u64) -> Vec<FileFullChunk> {
    // 100 bytes for station + semicolon + 5 for number + new line
    const MAX_LINE_SIZE: usize = 107;

    let mut file = File::open(file_path).expect("failed to open the measurements file");
    let file_size = file
        .metadata()
        .expect("failed to read the measurements file metadata")
        .len();

    // Guard against a division by zero below.
    let n = n.max(1);
    let chunk_size = file_size / n;

    let mut chunks = Vec::with_capacity(n as usize);
    let mut window = [0u8; MAX_LINE_SIZE];
    let mut offset: u64 = 0;

    for _ in 1..n {
        // Ideal end of this chunk; the real end is snapped to a newline.
        let target_end = offset + chunk_size;
        // Inspect the last MAX_LINE_SIZE bytes before the ideal end, but never
        // look before the start of the current chunk (tiny files/chunks).
        let window_start = target_end
            .saturating_sub(MAX_LINE_SIZE as u64)
            .max(offset);
        if window_start >= file_size {
            break;
        }

        file.seek(SeekFrom::Start(window_start))
            .expect("failed to seek to a chunk boundary");

        // `read` may return fewer bytes than requested, so fill the window in
        // a loop and remember how much of it is valid.
        let mut filled = 0;
        while filled < window.len() {
            match file.read(&mut window[filled..]) {
                Ok(0) => break,
                Ok(read) => filled += read,
                Err(err) if err.kind() == io::ErrorKind::Interrupted => continue,
                Err(err) => panic!("failed to read a chunk boundary window: {}", err),
            }
        }

        // `target_end - window_start` is at most MAX_LINE_SIZE, so the cast
        // cannot truncate.
        let limit = ((target_end - window_start) as usize).min(filled);

        // Prefer the last newline before the ideal end; if the chunk is
        // shorter than one line, fall back to the first newline after it.
        let newline = window[..limit]
            .iter()
            .rposition(|&byte| byte == b'\n')
            .or_else(|| {
                window[limit..filled]
                    .iter()
                    .position(|&byte| byte == b'\n')
                    .map(|pos| limit + pos)
            });

        let newline = match newline {
            Some(pos) => pos,
            // No line end nearby (line longer than MAX_LINE_SIZE or no more
            // newlines): leave the remainder to the final chunk.
            None => break,
        };

        let chunk_end = window_start + newline as u64;
        chunks.push(FileFullChunk {
            start: offset,
            size: chunk_end - offset + 1,
        });
        offset = chunk_end + 1;
    }

    // The final chunk takes whatever is left; skip it only when it would be an
    // empty trailing chunk after real ones.
    let remaining = file_size - offset;
    if remaining > 0 || chunks.is_empty() {
        chunks.push(FileFullChunk {
            start: offset,
            size: remaining,
        });
    }
    chunks
}
| fn read_data(file_path: &str, chunk: &FileFullChunk) -> FnvHashMap<String, TemperatureStats> { | |
| let mut file = File::open(file_path).unwrap(); | |
| let _ = file.seek(SeekFrom::Start(chunk.start)); | |
| let mut reader = io::BufReader::with_capacity(33_554_432, file); | |
| let mut buf_vec = Vec::with_capacity(128); | |
| let mut stats_map: FnvHashMap<String, TemperatureStats> = FnvHashMap::default(); | |
| let mut total_bytes_read : usize = 0; | |
| loop { | |
| total_bytes_read += reader.read_until(b'\n',&mut buf_vec).unwrap(); | |
| if total_bytes_read >= chunk.size as usize { | |
| break; | |
| } | |
| let mut temperature : f32 = 0.0; | |
| let idx_colon = parse_temperature(&buf_vec, &mut temperature); | |
| let name = unsafe {std::str::from_utf8_unchecked(&buf_vec[0..idx_colon])}; | |
| if let Some(entry) = stats_map.get_mut(name) { | |
| entry.update(temperature); | |
| } else { | |
| stats_map.insert(name.to_string(), TemperatureStats::default()); | |
| } | |
| buf_vec.clear(); | |
| } | |
| stats_map | |
| } | |
| fn main() { | |
| let file_path = "data/measurements.txt"; | |
| let cpu_num = std::thread::available_parallelism().unwrap().get(); | |
| let chunks = split_file_complete_chunks(file_path, cpu_num as u64); | |
| let mut thread_handles = vec![]; | |
| for chunk in chunks { | |
| let handle = std::thread::spawn(move || { | |
| read_data(&file_path, &chunk) | |
| }); | |
| thread_handles.push(handle); | |
| } | |
| let mut partial_results : Vec<FnvHashMap<String, TemperatureStats>> = vec![]; | |
| for handle in thread_handles { | |
| partial_results.push(handle.join().unwrap()); | |
| } | |
| let mut combined_result: FnvHashMap<String, TemperatureStats> = FnvHashMap::default(); | |
| for partial_result in partial_results { | |
| for (key, value) in partial_result.iter() { | |
| combined_result.entry(key.clone()).or_insert_with(TemperatureStats::default).add_assign(value.clone()); | |
| } | |
| } | |
| let stats_map_ordered: BTreeMap<_, _> = combined_result.into_iter().collect(); | |
| let formatted_output: String = stats_map_ordered | |
| .iter() | |
| .map(|(station, stats)| { | |
| format!("{}={:.1}/{:.1}/{:.1}", | |
| station, | |
| stats.min, | |
| stats.sum / stats.count as f32, | |
| stats.max, | |
| ) | |
| }) | |
| .collect::<Vec<String>>() | |
| .join(", "); | |
| println!("{{{}}}", formatted_output); | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment