-
-
Save westonpace/f7fc7bebb3deaeb6e187fdb9d2e2a7f4 to your computer and use it in GitHub Desktop.
Arrow vs Lance
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| // Write timings for 10 GiB of generated data (durations include on-the-fly batch generation): | |
| //// Observed: | |
| //// Arrow write duration: 35.340100258s | |
| //// Lance write duration: 112.428478707s | |
| // Read timings for the same 10 GiB of data: | |
| //// Observed: | |
| //// Arrow read: 2.873216313s | |
| //// Lance read: 4.404420162s | |
| //// Dependencies in Cargo.toml | |
| // [package] | |
| // name = "benchmark" | |
| // version = "0.1.0" | |
| // edition = "2024" | |
| // [dependencies] | |
| // arrow = "55" | |
| // flatbuffers = "25.2.10" | |
| // futures = "0.3.31" | |
| // lance = "0.32.0" | |
| // tokio = { version = "1.47.1", features = ["rt-multi-thread"] } | |
| // src/main.rs | |
| #![allow(unused)] | |
| use std::fs::File; | |
| use std::mem::MaybeUninit; | |
| use std::path::PathBuf; | |
| use std::str; | |
| use std::str::FromStr; | |
| use std::sync::Arc; | |
| use arrow::array::Array; | |
| use arrow::array::Float32Builder; | |
| use arrow::array::Int32Array; | |
| use arrow::array::Int32Builder; | |
| use arrow::array::LargeBinaryBuilder; | |
| use arrow::array::LargeStringBuilder; | |
| use arrow::array::RecordBatch; | |
| use arrow::array::RecordBatchIterator; | |
/// Byte budget (10 GiB, as estimated by the batch generator) after which no more batches are produced.
const TOTAL_BYTES: usize = 10 << 30;
| use arrow::datatypes::DataType; | |
| use arrow::datatypes::Field; | |
| use arrow::datatypes::Schema; | |
| use arrow::error::ArrowError; | |
| use arrow::ipc::reader::FileReader; | |
| use arrow::ipc::writer::FileWriter; | |
| use futures::StreamExt; | |
| use lance::Dataset; | |
| use tokio::runtime; | |
/// Minimal xorshift32 pseudo-random generator (shift triple 13 / 17 / 5).
///
/// Hand-rolled so the benchmark data is reproducible and transparent. Cloning
/// copies the state, so a clone replays the same sequence. A seed of 0 yields
/// an endless stream of zeros.
#[derive(Clone)]
pub struct Rng {
    state: u32,
}

impl Rng {
    /// Creates a generator starting from `seed`.
    fn init(seed: u32) -> Self {
        Rng { state: seed }
    }
}

impl Iterator for Rng {
    type Item = u32;

    /// Advances the state and yields it; never returns `None`.
    fn next(&mut self) -> Option<u32> {
        let mut x = self.state;
        x ^= x << 13;
        x ^= x >> 17;
        x ^= x << 5;
        self.state = x;
        Some(x)
    }
}
/// Granularity, in bytes, of generated text payloads: payload lengths are whole multiples of this.
const BUFFER_LEN: usize = 4 << 10;
/// Approximate byte budget for one generated record batch (1 GiB); a batch may overshoot it by one row.
const RB_SIZE: usize = 1 << 30;
/// Fixed RNG seed so every run generates the same data.
const SEED: u32 = 139_849_023;
| #[derive(Clone)] | |
| struct RBGen { | |
| r: Rng, | |
| total_bytes: usize, | |
| schema: Schema, | |
| } | |
| impl Iterator for RBGen { | |
| type Item = Result<RecordBatch, ArrowError>; | |
| fn next(&mut self) -> Option<Self::Item> { | |
| if self.total_bytes >= TOTAL_BYTES { | |
| return None; | |
| } | |
| let mut int32_builder = Int32Builder::new(); | |
| let mut f32_builder = Float32Builder::new(); | |
| let mut str_builder = LargeStringBuilder::new(); | |
| let mut blob_builder = LargeBinaryBuilder::new(); | |
| let value = self.r.next().unwrap(); | |
| let mut rb_size = 0; | |
| let mut is_null = true; | |
| while rb_size < RB_SIZE { | |
| if is_null { | |
| int32_builder.append_value(value as i32); | |
| f32_builder.append_value(f32::from_bits(value)); | |
| } else { | |
| int32_builder.append_null(); | |
| f32_builder.append_null(); | |
| } | |
| is_null = !is_null; | |
| rb_size += std::mem::size_of::<u32>(); | |
| rb_size += std::mem::size_of::<f32>(); | |
| // Create Random String | |
| let mut written_chars: u32 = 0; | |
| let u8_value = (value % 256) as u8; | |
| let n_chars = if value % 2 == 0 { | |
| 1024 * 1024 * 1024 | |
| } else { | |
| 1 | |
| }; | |
| let mut initial_char = std::cmp::min(std::cmp::max(u8_value, 0x21), 0x7E); | |
| let mut string = String::with_capacity(n_chars as usize); | |
| while written_chars < n_chars { | |
| let mut string_buffer = vec![0x30_u8; BUFFER_LEN]; | |
| unsafe { | |
| for char in string_buffer.iter_mut() { | |
| *char = initial_char; | |
| initial_char += 1; | |
| if initial_char == 0x7F { | |
| initial_char = 0x21; | |
| } | |
| } | |
| written_chars += string_buffer.len() as u32; | |
| let string_buffer_str = | |
| str::from_boxed_utf8_unchecked(string_buffer.into_boxed_slice()); | |
| string.push_str(&string_buffer_str); | |
| } | |
| } | |
| str_builder.append_value(&string); | |
| rb_size += string.len(); | |
| blob_builder.append_value(&string); | |
| rb_size += string.len(); | |
| } | |
| self.total_bytes += rb_size; | |
| let int32_arr = int32_builder.finish(); | |
| let f32_arr = f32_builder.finish(); | |
| let string_arr = str_builder.finish(); | |
| let blob_arr = blob_builder.finish(); | |
| let rb = RecordBatch::try_new( | |
| Arc::new(self.schema.clone()), | |
| vec![ | |
| Arc::new(int32_arr), | |
| Arc::new(f32_arr), | |
| Arc::new(string_arr), | |
| Arc::new(blob_arr), | |
| ], | |
| ) | |
| .unwrap(); | |
| Some(Ok(rb)) | |
| } | |
| } | |
| fn write_arrow_lance() { | |
| // Custom random generator for transparency | |
| let r = Rng::init(SEED); | |
| // 4 different datatyp columns: int32, f32, LargeUtf8 and LargeBinary | |
| let int32_field = Field::new("int32", DataType::Int32, true); | |
| let f32_field = Field::new("f32", DataType::Float32, true); | |
| let string_field = Field::new("string", DataType::LargeUtf8, true); | |
| let blob_field = Field::new("blob", DataType::LargeBinary, true); | |
| let schema = Schema::new(vec![int32_field, f32_field, string_field, blob_field]); | |
| // Generates Record Batches until 10GB | |
| let rb_gen = RBGen { | |
| r: r, | |
| total_bytes: 0, | |
| schema: schema.clone(), | |
| }; | |
| // Arrow Writes | |
| let mut file = File::create("./blob").unwrap(); | |
| let mut fw = FileWriter::try_new(&mut file, &schema).unwrap(); | |
| let arrow_file_start = std::time::Instant::now(); | |
| for rb in rb_gen.clone() { | |
| fw.write(&rb.unwrap()).unwrap(); | |
| } | |
| fw.finish().unwrap(); | |
| println!("Arrow write duration: {:?}", arrow_file_start.elapsed()); | |
| //Lance Writes | |
| let rb_iter = RecordBatchIterator::new(rb_gen, Arc::new(schema)); | |
| let lance_file = PathBuf::from_str("./blob.lance").unwrap(); | |
| if lance_file.exists() { | |
| std::fs::remove_dir_all(&lance_file).unwrap(); | |
| } | |
| let rt = runtime::Builder::new_multi_thread() | |
| .enable_all() | |
| .build() | |
| .unwrap(); | |
| let lance_file_start = std::time::Instant::now(); | |
| rt.block_on(Dataset::write(rb_iter, "./blob.lance", None)) | |
| .unwrap(); | |
| println!("Lance write duration: {:?}", lance_file_start.elapsed()); | |
| } | |
| fn read_arrow() { | |
| // let start_read = std::time::Instant::now(); | |
| let file = File::open("./blob").unwrap(); | |
| let mut fr = FileReader::try_new(file, None).unwrap(); | |
| // println!("===={:?}=====", start_read.elapsed()); | |
| loop { | |
| // let rb_read = std::time::Instant::now(); | |
| if let Some(rb_result) = fr.next() { | |
| // println!("{:?}", rb_read.elapsed()); | |
| // let finish = std::time::Instant::now(); | |
| let rb = rb_result.unwrap(); | |
| let n_rows = rb.num_rows(); | |
| let size = rb.get_array_memory_size(); | |
| let col = rb.column(0).as_any().downcast_ref::<Int32Array>().unwrap(); | |
| let nulls = col.null_count(); | |
| let value = arrow::compute::sum(col).unwrap(); | |
| // let end = finish.elapsed(); | |
| // println!("{},{},{:?},{},{}", n_rows, size, end, value, nulls); | |
| } else { | |
| break; | |
| } | |
| } | |
| } | |
| fn read_lance() { | |
| let block = async { | |
| // let open_time = std::time::Instant::now(); | |
| let dataset = Dataset::open("./blob.lance").await.unwrap(); | |
| // println!("{:?}", open_time.elapsed()); | |
| // let scanner_time = std::time::Instant::now(); | |
| let scanner = dataset.scan(); | |
| // println!("{:?}", scanner_time.elapsed()); | |
| // let stream_time = std::time::Instant::now(); | |
| let mut stream = scanner.try_into_stream().await.unwrap(); | |
| // println!("{:?}", stream_time.elapsed()); | |
| loop { | |
| // let rb_read = std::time::Instant::now(); | |
| if let Some(rb_result) = stream.next().await { | |
| // println!("{:?}", rb_read.elapsed()); | |
| // let finish = std::time::Instant::now(); | |
| let rb = rb_result.unwrap(); | |
| let n_rows = rb.num_rows(); | |
| let size = rb.get_array_memory_size(); | |
| let col = rb.column(0).as_any().downcast_ref::<Int32Array>().unwrap(); | |
| let nulls = col.null_count(); | |
| let value = arrow::compute::sum(col).unwrap(); | |
| // let end = finish.elapsed(); | |
| // println!("{},{},{:?},{},{}", n_rows, size, end, value, nulls); | |
| // pause(); | |
| } else { | |
| break; | |
| } | |
| } | |
| }; | |
| let rt = runtime::Builder::new_multi_thread() | |
| .enable_all() | |
| .build() | |
| .unwrap(); | |
| rt.block_on(block); | |
| } | |
| fn main() { | |
| // Write arrow and lance datasets from same source | |
| //// Observed: | |
| //// Arrow write duration: 35.340100258s | |
| //// Lance write duration: 112.428478707s | |
| write_arrow_lance(); | |
| // File Reads | |
| // Note: Uncomment the inner durations if needed. | |
| // Commented them not to interfere with actual intended operation. | |
| //// Observed: | |
| //// Arrow read: 2.873216313s | |
| //// Lance read: 4.404420162s | |
| //// Read Arrow File | |
| println!("==========ARROW READ==========="); | |
| let arrow_t = std::time::Instant::now(); | |
| read_arrow(); | |
| println!("{:?}", arrow_t.elapsed()); | |
| //// Read Lance Folder | |
| println!("==========LANCE READ==========="); | |
| let lance_t = std::time::Instant::now(); | |
| read_lance(); | |
| println!("{:?}", lance_t.elapsed()); | |
| } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment