|
//! Demo: EOF retry bug causing data duplication |
|
//! |
|
//! Run via: ./demo_eof_retry_bug.sh |
|
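//! Or directly: AWS_BUCKET=bucket AWS_KEY=key AWS_REGION=region rust-script demo_eof_retry_bug.rs
//! (AWS_KEY is the object path to read; the object is expected to contain exactly `hello`.)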
|
//! |
|
//! ```cargo |
|
//! [dependencies] |
|
//! tokio = { version = "1", features = ["full"] } |
|
//! object_store = { version = "0.13", features = ["aws"] } |
|
//! futures = "0.3" |
|
//! bytes = "1" |
|
//! tracing-subscriber = { version = "0.3", features = ["env-filter"] } |
|
//! ``` |
|
|
|
// Dependency line for the published 0.13 release:
|
// object_store = { version = "0.13", features = ["aws"] } |
|
// Dependency line for testing against a local object_store checkout:
|
// object_store = { path = ".", features = ["aws"] } |
|
|
|
use bytes::Bytes; |
|
use futures::StreamExt; |
|
use object_store::aws::AmazonS3Builder; |
|
use object_store::path::Path; |
|
use object_store::{ClientOptions, ObjectStoreExt}; |
|
use std::env; |
|
use std::time::Duration; |
|
|
|
#[tokio::main] |
|
async fn main() { |
|
// Enable object_store logs to see retry behavior |
|
tracing_subscriber::fmt() |
|
.with_env_filter("object_store=info") |
|
.init(); |
|
|
|
let bucket = env::var("AWS_BUCKET").expect("AWS_BUCKET required"); |
|
let key = env::var("AWS_KEY").expect("AWS_KEY required"); |
|
let region = env::var("AWS_REGION").unwrap_or_else(|_| "us-west-2".into()); |
|
|
|
// Client with 10s timeout - we'll sleep 15s to trigger timeout after reading |
|
let options = ClientOptions::new().with_timeout(Duration::from_secs(10)); |
|
let store = AmazonS3Builder::from_env() |
|
.with_bucket_name(&bucket) |
|
.with_region(®ion) |
|
.with_client_options(options) |
|
.build() |
|
.expect("Failed to build S3 client"); |
|
|
|
let path = Path::from(key); |
|
let expected_content = Bytes::from_static(b"hello"); |
|
|
|
// Get the file as a stream |
|
println!("--- Reading file with intentional delay ---"); |
|
let result = store.get(&path).await.expect("Failed to GET"); |
|
let mut stream = result.into_stream(); |
|
|
|
let mut all_bytes = Vec::new(); |
|
|
|
// Read first chunk (should be all 5 bytes) |
|
if let Some(chunk) = stream.next().await { |
|
let bytes = chunk.expect("Failed to read chunk"); |
|
println!( |
|
"Read {} bytes: {:?}", |
|
bytes.len(), |
|
String::from_utf8_lossy(&bytes) |
|
); |
|
all_bytes.extend_from_slice(&bytes); |
|
} |
|
|
|
// Sleep longer than timeout - simulates timeout after reading all content but before EOF |
|
println!("Sleeping 15s (longer than 10s timeout)..."); |
|
tokio::time::sleep(Duration::from_secs(15)).await; |
|
|
|
// Try to read more - this triggers the retry bug |
|
println!("Continuing to read..."); |
|
while let Some(chunk) = stream.next().await { |
|
match chunk { |
|
Ok(bytes) => { |
|
println!( |
|
"Read {} more bytes: {:?}", |
|
bytes.len(), |
|
String::from_utf8_lossy(&bytes) |
|
); |
|
all_bytes.extend_from_slice(&bytes); |
|
} |
|
Err(e) => { |
|
println!("Error reading: {e}"); |
|
break; |
|
} |
|
} |
|
} |
|
|
|
println!("\n--- Result ---"); |
|
println!("Total bytes read: {}", all_bytes.len()); |
|
println!("Content: {:?}", String::from_utf8_lossy(&all_bytes)); |
|
|
|
if all_bytes.len() > expected_content.len() { |
|
println!( |
|
"\nBUG: Data was duplicated! Expected {} bytes, got {}", |
|
expected_content.len(), |
|
all_bytes.len() |
|
); |
|
println!( |
|
"After timeout, retry_stream sent Range: bytes={}-{} (invalid).", |
|
expected_content.len(), |
|
expected_content.len() - 1 |
|
); |
|
println!("S3 ignored the invalid Range header and returned the full file (200 OK)."); |
|
println!("Without validation, the full file was appended again."); |
|
} else if all_bytes.len() == expected_content.len() { |
|
println!("\nOK: Correct number of bytes received"); |
|
} else { |
|
println!("\nWARNING: Received fewer bytes than expected"); |
|
} |
|
} |