@darko-mesaros
Created November 26, 2025 21:16
CSV File Processor from Amazon S3
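// CSV file processor: lists all .csv objects under an S3 prefix, downloads them
// concurrently, and parses each one in memory into Record structs.
//
// Assumed Cargo dependencies (not pinned in the original gist): anyhow, aws-config,
// aws-sdk-s3, csv, futures, serde (with the "derive" feature), tokio (with the "macros"
// and "rt-multi-thread" features), and tracing-subscriber (with the "env-filter" feature).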
use std::sync::{atomic::{AtomicUsize, Ordering}, Arc};
use aws_sdk_s3::Client;
use futures::{stream, StreamExt, TryStreamExt};
use serde::Deserialize;
use tokio::sync::Semaphore;
use tracing_subscriber::EnvFilter;
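// One parsed row of a CSV file; field names must match the CSV header columns,
// and every value is kept as a String (no numeric parsing is done here).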
#[derive(Debug, Deserialize)]
struct Record {
    timestamp: String,
    temp: String,
    humidity: String,
    measure_point: String,
}
async fn process_csv_parallel(s3: &Client, bucket: &str, csv_keys: Vec<String>) -> Result<Vec<Record>, anyhow::Error> {
    // NOTE:
    // Not using multi-threading here, as the bottleneck is I/O, not CPU
    // Counters for displaying progress
    let total_files = csv_keys.len();
    let completed = Arc::new(AtomicUsize::new(0));
    // Limit concurrent downloads to 500 - at most 500 in-flight S3 API calls (to avoid throttling)
    let download_semaphore = Arc::new(Semaphore::new(500));
    // Convert Vec<String> into a stream for parallel processing
    let results: Result<Vec<Vec<Record>>, anyhow::Error> = stream::iter(csv_keys)
        .map(|key| {
            // Clone everything the async task needs to own
            let s3 = s3.clone();
            let bucket = bucket.to_string();
            let sem = download_semaphore.clone();
            let completed = completed.clone();
            // Return an async block for each task (each key in this case)
            async move {
                // Get a semaphore permit (waits if 500 downloads are already running)
                let permit = sem.acquire().await?;
                // Download the CSV
                let response = s3.get_object()
                    .bucket(&bucket)
                    .key(&key)
                    .send()
                    .await?;
                // Collect the response body into memory
                let body = response.body.collect().await?;
                // We are done with the API call, drop the permit
                drop(permit);
                // Convert the body to UTF-8
                let csv_content = String::from_utf8(body.to_vec())?;
                // Parse the CSV in memory into Record structs
                let mut reader = csv::Reader::from_reader(csv_content.as_bytes());
                let records: Result<Vec<Record>, _> = reader
                    .deserialize() // Convert each CSV row to a Record
                    .collect(); // Collect all rows
                // Progress tracking
                let done = completed.fetch_add(1, Ordering::Relaxed) + 1;
                if done.is_multiple_of(100) || done == total_files {
                    println!("Progress: {}/{} files ({:.1}%)",
                        done, total_files, (done as f64 / total_files as f64) * 100.0);
                }
                Ok(records?)
            }
        })
        .buffer_unordered(1000) // Run up to 1000 async tasks concurrently (task scheduling)
        .try_collect() // Collect all results, die on first error
        .await;
    // Flatten Vec<Vec<Record>> into Vec<Record>
    Ok(results?.into_iter().flatten().collect())
}
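// Lists every object key under `prefix` that ends in ".csv", paging through
// ListObjectsV2 results with continuation tokens (up to 1000 keys per page).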
async fn list_all_csv_objects(s3: &Client, bucket: &str, prefix: &str) -> Result<Vec<String>, anyhow::Error> {
    let mut objects = Vec::new();
    let mut continuation_token = None;
    loop {
        let mut request = s3.list_objects_v2()
            .bucket(bucket)
            .prefix(prefix)
            .max_keys(1000);
        if let Some(token) = continuation_token {
            request = request.continuation_token(token);
        }
        let response = request.send().await?;
        if let Some(contents) = response.contents {
            objects.extend(
                contents.into_iter()
                    .filter_map(|obj| {
                        obj.key()
                            .filter(|key| key.ends_with(".csv"))
                            .map(|key| key.to_string())
                    })
            );
        }
        continuation_token = response.next_continuation_token;
        if continuation_token.is_none() {
            break;
        }
    }
    Ok(objects)
}
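// Entry point: set up tracing to stderr, build the S3 client from the default AWS
// configuration, then list and process the CSV files. Bucket name and prefix are hard-coded.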
#[tokio::main]
async fn main() -> Result<(), anyhow::Error> {
    // Tracing
    tracing_subscriber::fmt()
        .with_env_filter(EnvFilter::from_default_env())
        .with_writer(std::io::stderr)
        .with_ansi(false)
        .init();
    // AWS config and S3 client
    let config = aws_config::load_from_env().await;
    let s3_client = aws_sdk_s3::Client::new(&config);
    let files = list_all_csv_objects(&s3_client, "darko-rust-csv-demo", "data/").await?;
    //println!("{:?}", files);
    let records = process_csv_parallel(&s3_client, "darko-rust-csv-demo", files).await?;
    //println!("{:?}", records);
    println!("Parsed {} records in total", records.len());
    Ok(())
}
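Usage note (an assumption, not part of the original gist): point main at a bucket and prefix you own, then cargo run; setting RUST_LOG=debug enables AWS SDK tracing through the EnvFilter configured above.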