Skip to content

Instantly share code, notes, and snippets.

@suchoudh
Forked from antuneza/main.rs
Created April 4, 2024 07:22
Show Gist options
  • Select an option

  • Save suchoudh/c9cad300451adf1139f4b89b9085590e to your computer and use it in GitHub Desktop.

Select an option

Save suchoudh/c9cad300451adf1139f4b89b9085590e to your computer and use it in GitHub Desktop.
One Billion Row Challenge, Part I: The Basics
use std::collections::{BTreeMap};
use std::fs::File;
use std::io::{self, BufRead, Read, Seek, SeekFrom};
use std::ops::AddAssign;
use fnv::FnvHashMap;
/// Running aggregate for one weather station: extremes, running total and sample count.
///
/// NOTE(review): `sum` is an `f32`; accumulating a very large number of readings in
/// single precision loses accuracy in the derived mean — consider `f64` if that matters.
#[derive(Debug, Clone, Copy)]
struct TemperatureStats {
    /// Lowest reading folded in so far.
    min: f32,
    /// Highest reading folded in so far.
    max: f32,
    /// Total of all readings; divided by `count` to obtain the mean.
    sum: f32,
    /// Number of readings folded in.
    count: usize,
}

impl TemperatureStats {
    /// Folds a single reading into the aggregate.
    fn update(&mut self, temperature: f32) {
        let Self { min, max, sum, count } = self;
        *min = f32::min(*min, temperature);
        *max = f32::max(*max, temperature);
        *count += 1;
        *sum += temperature;
    }
}

impl Default for TemperatureStats {
    /// An empty aggregate. The extremes start at opposite ends of the finite `f32`
    /// range so that the first reading replaces both of them.
    fn default() -> Self {
        Self {
            min: f32::MAX,
            max: f32::MIN,
            sum: 0.0,
            count: 0,
        }
    }
}

impl std::ops::AddAssign for TemperatureStats {
    /// Merges another aggregate into this one (used to combine per-chunk results).
    fn add_assign(&mut self, other: Self) {
        let Self { min, max, sum, count } = other;
        self.min = f32::min(self.min, min);
        self.max = f32::max(self.max, max);
        self.count += count;
        self.sum += sum;
    }
}
/// Parses the temperature at the end of a `station;temperature` record.
///
/// `buf` holds one record, optionally followed by `\n` or `\r\n`; the line ending is
/// ignored. The temperature must have the fixed shape `[-]D.D` or `[-]DD.D` (exactly one
/// fractional digit). The value is written to `*t` and the index of the `;` separator is
/// returned, so the station name is `&buf[..idx]`.
///
/// The digits are accumulated as an integer number of tenths and divided once, so `*t`
/// is the `f32` closest to the written decimal (no compounded rounding).
///
/// # Panics
/// Panics if `buf` is too short to contain a temperature of that shape, or if the
/// temperature is not preceded by a separator byte.
fn parse_temperature(buf: &[u8], t: &mut f32) -> usize {
    // Accept records with or without a line terminator (the last line of a file may
    // lack one).
    let record = buf.strip_suffix(b"\n").unwrap_or(buf);
    let record = record.strip_suffix(b"\r").unwrap_or(record);
    // Shortest parseable record is ";D.D".
    assert!(
        record.len() >= 4,
        "record too short to hold a temperature: {:?}",
        String::from_utf8_lossy(buf)
    );

    let digit = |i: usize| i32::from(record[i]) - i32::from(b'0');
    let mut idx = record.len() - 1;
    let mut tenths = digit(idx); // fractional digit
    idx -= 2; // skip the decimal point
    tenths += 10 * digit(idx); // units digit
    idx -= 1;
    if record[idx] != b';' && record[idx] != b'-' {
        tenths += 100 * digit(idx); // tens digit
        idx = idx
            .checked_sub(1)
            .expect("temperature is not preceded by a ';' separator");
    }
    if record[idx] == b'-' {
        tenths = -tenths;
        idx = idx
            .checked_sub(1)
            .expect("temperature is not preceded by a ';' separator");
    }
    *t = tenths as f32 / 10.0;
    idx
}
/// A byte range `[start, start + size)` of the input file.
///
/// As produced by `split_file_complete_chunks`, a chunk begins at the start of a line and
/// ends just after a newline (or at end of file), so it holds only whole records.
#[derive(Debug, Copy, Clone)]
struct FileFullChunk {
    /// Byte offset at which the chunk begins.
    start: u64,
    /// Length of the chunk in bytes.
    size: u64,
}
/// Splits the file at `file_path` into at most `n` byte ranges that each contain only
/// whole lines.
///
/// Each boundary is placed just after the last newline in the `MAXLINESIZE`-byte window
/// ending `filesize / n` bytes past the previous boundary. Because no record is longer
/// than `MAXLINESIZE` bytes, a well-formed file always has a newline in that window. If a
/// window contains no newline (tiny or malformed files), that boundary is skipped and its
/// bytes fold into the following chunk, so fewer than `n` chunks may be returned. The
/// final chunk always runs to end of file. `n == 0` is treated as 1.
///
/// # Panics
/// Panics if the file cannot be opened, stat'ed, seeked or read.
fn split_file_complete_chunks(file_path: &str, n: u64) -> Vec<FileFullChunk> {
    // 100 bytes for the station name + ';' + up to 5 bytes of temperature + '\n'.
    const MAXLINESIZE: usize = 107;
    let mut f = File::open(file_path).expect("failed to open input file");
    let filesize = f
        .metadata()
        .expect("failed to read input file metadata")
        .len();
    let n = n.max(1); // avoid division by zero
    let chunksize = filesize / n;
    let mut result = Vec::with_capacity(n as usize);
    let mut buf = [0u8; MAXLINESIZE];
    let mut offset: u64 = 0;
    for _ in 1..n {
        // Search window [window_start, window_end), clamped so it never starts before the
        // current chunk and never extends past end of file; computed with saturating
        // arithmetic so small chunk sizes cannot underflow.
        let window_end = offset.saturating_add(chunksize).min(filesize);
        let window_start = window_end
            .saturating_sub(MAXLINESIZE as u64)
            .max(offset);
        let window_len = (window_end - window_start) as usize;
        f.seek(SeekFrom::Start(window_start))
            .expect("failed to seek in input file");
        // Only the bytes actually read are searched, so stale buffer contents from a
        // previous iteration can never be mistaken for a newline.
        let bytes_read = f
            .read(&mut buf[..window_len])
            .expect("failed to read input file");
        if let Some(nl) = buf[..bytes_read].iter().rposition(|&b| b == b'\n') {
            let chunk_end = window_start + nl as u64 + 1; // one past the newline
            result.push(FileFullChunk {
                start: offset,
                size: chunk_end - offset,
            });
            offset = chunk_end;
        }
    }
    result.push(FileFullChunk {
        start: offset,
        size: filesize - offset,
    });
    result
}
/// Aggregates per-station statistics for the records in one chunk of the file.
///
/// Opens `file_path`, seeks to `chunk.start` and reads newline-terminated records until
/// `chunk.size` bytes have been consumed or end of file is reached. Every record is
/// expected to be `station;temperature`. Each record, including the first one for a
/// station and the last one in the chunk, contributes to the result. Blank lines are
/// skipped, and a final record without a trailing newline is still counted.
///
/// # Panics
/// Panics on I/O errors, on station names that are not valid UTF-8, and on records
/// `parse_temperature` cannot handle.
fn read_data(file_path: &str, chunk: &FileFullChunk) -> FnvHashMap<String, TemperatureStats> {
    let mut file = File::open(file_path).expect("failed to open input file");
    file.seek(SeekFrom::Start(chunk.start))
        .expect("failed to seek to chunk start");
    // Large (32 MiB) buffer to keep the number of underlying reads low.
    let mut reader = io::BufReader::with_capacity(33_554_432, file);
    let mut line = Vec::with_capacity(128);
    let mut stats_map: FnvHashMap<String, TemperatureStats> = FnvHashMap::default();
    let chunk_size = chunk.size as usize;
    let mut total_bytes_read: usize = 0;
    // Check the budget before reading so the line that reaches the chunk end is processed.
    while total_bytes_read < chunk_size {
        let bytes_read = reader
            .read_until(b'\n', &mut line)
            .expect("failed to read input file");
        if bytes_read == 0 {
            break; // end of file before the chunk was exhausted
        }
        total_bytes_read += bytes_read;
        // parse_temperature expects a newline-terminated record; the last line of the
        // file may lack one.
        if line.last() != Some(&b'\n') {
            line.push(b'\n');
        }
        if line.len() > 1 {
            let mut temperature: f32 = 0.0;
            let idx_colon = parse_temperature(&line, &mut temperature);
            // Checked conversion: the file is external input, so invalid UTF-8 must not
            // become undefined behaviour.
            let name = std::str::from_utf8(&line[..idx_colon])
                .expect("station name is not valid UTF-8");
            if let Some(entry) = stats_map.get_mut(name) {
                entry.update(temperature);
            } else {
                // First sighting of this station: record the reading, not just the key.
                let mut stats = TemperatureStats::default();
                stats.update(temperature);
                stats_map.insert(name.to_owned(), stats);
            }
        }
        line.clear();
    }
    stats_map
}
/// Entry point: computes per-station min/mean/max over a measurements file.
///
/// The input path is taken from the first command-line argument and defaults to
/// `data/measurements.txt`. The file is split into line-aligned chunks (one per available
/// CPU, or a single one if that cannot be queried). Each chunk is aggregated on its own
/// thread. The partial results are then merged in chunk order and printed as
/// `{station=min/mean/max, ...}`, sorted by station name.
fn main() {
    let file_path = std::env::args()
        .nth(1)
        .unwrap_or_else(|| String::from("data/measurements.txt"));
    let cpu_num = std::thread::available_parallelism()
        .map(|n| n.get())
        .unwrap_or(1);
    let chunks = split_file_complete_chunks(&file_path, cpu_num as u64);

    // Collect eagerly so every worker is spawned before any join below.
    let thread_handles: Vec<_> = chunks
        .into_iter()
        .map(|chunk| {
            let path = file_path.clone();
            std::thread::spawn(move || read_data(&path, &chunk))
        })
        .collect();

    // Merge each partial map as its worker finishes; keys are moved rather than cloned.
    let mut combined_result: FnvHashMap<String, TemperatureStats> = FnvHashMap::default();
    for handle in thread_handles {
        let partial_result = handle.join().expect("worker thread panicked");
        for (station, stats) in partial_result {
            combined_result
                .entry(station)
                .or_default()
                .add_assign(stats);
        }
    }

    let stats_map_ordered: BTreeMap<_, _> = combined_result.into_iter().collect();
    let formatted_output: String = stats_map_ordered
        .iter()
        .map(|(station, stats)| {
            format!(
                "{}={:.1}/{:.1}/{:.1}",
                station,
                stats.min,
                stats.sum / stats.count as f32,
                stats.max,
            )
        })
        .collect::<Vec<String>>()
        .join(", ");
    println!("{{{}}}", formatted_output);
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment