Created
November 26, 2025 21:16
-
-
Save darko-mesaros/dea185369131e060e01456d03bd08357 to your computer and use it in GitHub Desktop.
CSV File Processor from Amazon S3
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| use std::sync::{atomic::{AtomicUsize, Ordering}, Arc}; | |
| use aws_sdk_s3::Client; | |
| use futures::{stream, StreamExt, TryStreamExt}; | |
| use serde::Deserialize; | |
| use tokio::{sync::Semaphore}; | |
| use tracing_subscriber::EnvFilter; | |
| #[derive(Debug, Deserialize)] | |
| struct Record { | |
| timestamp: String, | |
| temp: String, | |
| humidity: String, | |
| measure_point: String, | |
| } | |
| async fn process_csv_parallel(s3: &Client, bucket: &str, csv_keys: Vec<String>) -> Result<Vec<Record>, anyhow::Error> { | |
| // NOTE: | |
| // Not using multi-threading here as the bottleneck is IO not CPU | |
| // For displaying progress | |
| let total_files = csv_keys.len(); | |
| let completed = Arc::new(AtomicUsize::new(0)); | |
| // Limit concurrent downloads to 750 - this is 750 S3 API calls (to avoid throttling) | |
| let download_semaphore = Arc::new(Semaphore::new(500)); | |
| // Convert Vec<String> into a stream for paraller processing | |
| let results: Result<Vec<Vec<Record>>, anyhow::Error> = stream::iter(csv_keys) | |
| .map(|key| { | |
| // Bunch of cloning | |
| let s3 = s3.clone(); | |
| let bucket = bucket.to_string(); | |
| let sem = download_semaphore.clone(); | |
| let completed = completed.clone(); | |
| // Return async closure for each async task (each key in this case) | |
| async move { | |
| // Get the semaphore permit (blocks if 50 downloads are already running) | |
| let permit = sem.acquire().await?; | |
| // Download CSV | |
| let response = s3.get_object() | |
| .bucket(&bucket) | |
| .key(&key) | |
| .send() | |
| .await?; | |
| // Collect response body and convert to UTF-8 | |
| let body = response.body.collect().await?; | |
| // We are done with the API, drop the permit | |
| drop(permit); | |
| let csv_content = String::from_utf8(body.to_vec())?; | |
| // Parse CSV in Memory into the Record struct | |
| let mut reader = csv::Reader::from_reader(csv_content.as_bytes()); | |
| let records: Result<Vec<Record>, _> = reader | |
| .deserialize() // Convert each CSV row to Record | |
| .collect(); // Collect all rows | |
| // Progress tracking | |
| let done = completed.fetch_add(1, Ordering::Relaxed) + 1; | |
| if done.is_multiple_of(100) || done == total_files { | |
| println!("Progress: {}/{} files ({:.1}%)", | |
| done, total_files, (done as f64 / total_files as f64) * 100.0); | |
| } | |
| Ok(records?) | |
| } | |
| }) | |
| .buffer_unordered(1000) // Run up to 1000 async tasks concurrently (task scheduling) | |
| .try_collect() // Collect all results, die on first error | |
| .await; | |
| // Flatten Vec<Vec<Record>> into Vec<Record> | |
| Ok(results?.into_iter().flatten().collect()) | |
| } | |
| async fn list_all_csv_objects(s3: &Client, bucket: &str, prefix: &str) -> Result<Vec<String>, anyhow::Error> { | |
| let mut objects = Vec::new(); | |
| let mut continuation_token = None; | |
| loop { | |
| let mut request = s3.list_objects_v2() | |
| .bucket(bucket) | |
| .prefix(prefix) | |
| .max_keys(1000); | |
| if let Some(token) = continuation_token { | |
| request = request.continuation_token(token) | |
| } | |
| let response = request.send().await?; | |
| if let Some(contents) = response.contents { | |
| objects.extend( | |
| contents.into_iter() | |
| .filter_map(|obj| { | |
| obj.key() | |
| .filter(|key| key.ends_with(".csv")) | |
| .map(|key| key.to_string()) | |
| }) | |
| ); | |
| } | |
| continuation_token = response.next_continuation_token; | |
| if continuation_token.is_none() { | |
| break; | |
| } | |
| } | |
| Ok(objects) | |
| } | |
| #[tokio::main] | |
| async fn main() -> Result<(), anyhow::Error> { | |
| // Tracing | |
| tracing_subscriber::fmt() | |
| .with_env_filter(EnvFilter::from_default_env()) | |
| .with_writer(std::io::stderr) | |
| .with_ansi(false) | |
| .init(); | |
| // Config AWS | |
| let config = aws_config::load_from_env().await; | |
| let s3_client = aws_sdk_s3::Client::new(&config); | |
| let files = list_all_csv_objects(&s3_client, "darko-rust-csv-demo", "data/").await?; | |
| //println!("{:?}", files); | |
| let records = process_csv_parallel(&s3_client, "darko-rust-csv-demo", files).await?; | |
| //println!("{:?}", records); | |
| Ok(()) | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment