Skip to content

Instantly share code, notes, and snippets.

@corecode
Created August 7, 2025 10:49
Show Gist options
  • Select an option

  • Save corecode/5c0459a9fb4f39577f274ace96c957b5 to your computer and use it in GitHub Desktop.

Select an option

Save corecode/5c0459a9fb4f39577f274ace96c957b5 to your computer and use it in GitHub Desktop.
File Device #rs
use std::fs::File;
use std::io::{BufReader, Read, Seek, SeekFrom};
use std::path::PathBuf;
use std::time::{Duration, Instant};
use num_complex::Complex32;
use seify::{Args, DeviceTrait, Direction, Error, Range, RxStreamer, TxStreamer};
pub struct FileDevice {
path: PathBuf,
sample_rate: f64,
}
pub struct FileRxStreamer {
reader: BufReader<File>,
sample_rate: f64,
start_time: Option<Instant>,
samples_emitted: usize,
}
// Dummy TX streamer
pub struct FileTxStreamer;
impl TxStreamer for FileTxStreamer {
fn mtu(&self) -> Result<usize, Error> {
Err(Error::NotSupported)
}
fn activate_at(&mut self, _time_ns: Option<i64>) -> Result<(), Error> {
Err(Error::NotSupported)
}
fn deactivate_at(&mut self, _time_ns: Option<i64>) -> Result<(), Error> {
Err(Error::NotSupported)
}
fn write(
&mut self,
_buffers: &[&[Complex32]],
_at_ns: Option<i64>,
_end_burst: bool,
_timeout_us: i64,
) -> Result<usize, Error> {
Err(Error::NotSupported)
}
fn write_all(
&mut self,
_buffers: &[&[Complex32]],
_at_ns: Option<i64>,
_end_burst: bool,
_timeout_us: i64,
) -> Result<(), Error> {
Err(Error::NotSupported)
}
}
impl FileDevice {
pub fn new(path: impl Into<PathBuf>, sample_rate: f64) -> Self {
Self {
path: path.into(),
sample_rate,
}
}
}
impl DeviceTrait for FileDevice {
type RxStreamer = FileRxStreamer;
type TxStreamer = FileTxStreamer;
fn as_any(&self) -> &dyn std::any::Any {
self
}
fn as_any_mut(&mut self) -> &mut dyn std::any::Any {
self
}
fn driver(&self) -> seify::Driver {
seify::Driver::Dummy
}
fn id(&self) -> Result<String, Error> {
Ok(self.path.to_string_lossy().to_string())
}
fn info(&self) -> Result<Args, Error> {
let mut args = Args::new();
args.set("driver", "file");
args.set("path", self.path.to_string_lossy().to_string());
args.set("sample_rate", self.sample_rate.to_string());
Ok(args)
}
fn num_channels(&self, direction: Direction) -> Result<usize, Error> {
match direction {
Direction::Rx => Ok(1),
Direction::Tx => Ok(0),
}
}
fn full_duplex(&self, _direction: Direction, _channel: usize) -> Result<bool, Error> {
Ok(false)
}
fn rx_streamer(&self, channels: &[usize], _args: Args) -> Result<Self::RxStreamer, Error> {
if channels != [0] {
return Err(Error::NotSupported);
}
let file = File::open(&self.path).map_err(Error::Io)?;
Ok(FileRxStreamer::new(BufReader::new(file), self.sample_rate))
}
fn tx_streamer(&self, _channels: &[usize], _args: Args) -> Result<Self::TxStreamer, Error> {
Err(Error::NotSupported)
}
// The following methods are required by the trait but not implemented for simplicity.
// We return default values or errors.
fn antennas(&self, _direction: Direction, _channel: usize) -> Result<Vec<String>, Error> {
Ok(vec!["FILE".to_string()])
}
fn antenna(&self, _direction: Direction, _channel: usize) -> Result<String, Error> {
Ok("FILE".to_string())
}
fn set_antenna(
&self,
_direction: Direction,
_channel: usize,
_name: &str,
) -> Result<(), Error> {
Ok(())
}
fn supports_agc(&self, _direction: Direction, _channel: usize) -> Result<bool, Error> {
Ok(false)
}
fn enable_agc(&self, _direction: Direction, _channel: usize, _agc: bool) -> Result<(), Error> {
Ok(())
}
fn agc(&self, _direction: Direction, _channel: usize) -> Result<bool, Error> {
Ok(false)
}
fn gain_elements(&self, _direction: Direction, _channel: usize) -> Result<Vec<String>, Error> {
Ok(vec![])
}
fn set_gain(&self, _direction: Direction, _channel: usize, _gain: f64) -> Result<(), Error> {
Ok(())
}
fn gain(&self, _direction: Direction, _channel: usize) -> Result<Option<f64>, Error> {
Ok(None)
}
fn gain_range(&self, _direction: Direction, _channel: usize) -> Result<Range, Error> {
Ok(Range::new(vec![]))
}
fn set_gain_element(
&self,
_direction: Direction,
_channel: usize,
_name: &str,
_gain: f64,
) -> Result<(), Error> {
Ok(())
}
fn gain_element(
&self,
_direction: Direction,
_channel: usize,
_name: &str,
) -> Result<Option<f64>, Error> {
Ok(None)
}
fn gain_element_range(
&self,
_direction: Direction,
_channel: usize,
_name: &str,
) -> Result<Range, Error> {
Ok(Range::new(vec![]))
}
fn frequency_range(&self, _direction: Direction, _channel: usize) -> Result<Range, Error> {
Ok(Range::new(vec![]))
}
fn frequency(&self, _direction: Direction, _channel: usize) -> Result<f64, Error> {
Ok(0.0)
}
fn set_frequency(
&self,
_direction: Direction,
_channel: usize,
_frequency: f64,
_args: Args,
) -> Result<(), Error> {
Ok(())
}
fn frequency_components(
&self,
_direction: Direction,
_channel: usize,
) -> Result<Vec<String>, Error> {
Ok(vec![])
}
fn component_frequency_range(
&self,
_direction: Direction,
_channel: usize,
_name: &str,
) -> Result<Range, Error> {
Ok(Range::new(vec![]))
}
fn component_frequency(
&self,
_direction: Direction,
_channel: usize,
_name: &str,
) -> Result<f64, Error> {
Ok(0.0)
}
fn set_component_frequency(
&self,
_direction: Direction,
_channel: usize,
_name: &str,
_frequency: f64,
) -> Result<(), Error> {
Ok(())
}
fn sample_rate(&self, _direction: Direction, _channel: usize) -> Result<f64, Error> {
Ok(self.sample_rate)
}
fn set_sample_rate(
&self,
_direction: Direction,
_channel: usize,
_rate: f64,
) -> Result<(), Error> {
Ok(())
}
fn get_sample_rate_range(
&self,
_direction: Direction,
_channel: usize,
) -> Result<Range, Error> {
Ok(Range::new(vec![]))
}
fn bandwidth(&self, _direction: Direction, _channel: usize) -> Result<f64, Error> {
Ok(0.0)
}
fn set_bandwidth(&self, _direction: Direction, _channel: usize, _bw: f64) -> Result<(), Error> {
Ok(())
}
fn get_bandwidth_range(&self, _direction: Direction, _channel: usize) -> Result<Range, Error> {
Ok(Range::new(vec![]))
}
fn has_dc_offset_mode(&self, _direction: Direction, _channel: usize) -> Result<bool, Error> {
Ok(false)
}
fn set_dc_offset_mode(
&self,
_direction: Direction,
_channel: usize,
_automatic: bool,
) -> Result<(), Error> {
Ok(())
}
fn dc_offset_mode(&self, _direction: Direction, _channel: usize) -> Result<bool, Error> {
Ok(false)
}
}
impl FileRxStreamer {
fn new(reader: BufReader<File>, sample_rate: f64) -> Self {
Self {
reader,
sample_rate,
start_time: None,
samples_emitted: 0,
}
}
}
impl RxStreamer for FileRxStreamer {
fn mtu(&self) -> Result<usize, Error> {
Ok(4096)
}
fn activate_at(&mut self, _time_ns: Option<i64>) -> Result<(), Error> {
self.start_time = Some(Instant::now());
self.samples_emitted = 0;
Ok(())
}
fn deactivate_at(&mut self, _time_ns: Option<i64>) -> Result<(), Error> {
self.start_time = None;
Ok(())
}
fn read(&mut self, buffers: &mut [&mut [Complex32]], _timeout_us: i64) -> Result<usize, Error> {
let channel0 = &mut buffers[0];
let num_samples = channel0.len();
// Throttle the reading to simulate real-time
if let Some(start_time) = self.start_time {
let elapsed = start_time.elapsed();
let expected_samples = (elapsed.as_secs_f64() * self.sample_rate) as usize;
if expected_samples < self.samples_emitted + num_samples {
let delay = Duration::from_secs_f64(
((self.samples_emitted + num_samples - expected_samples) as f64)
/ self.sample_rate,
);
std::thread::sleep(delay);
}
}
let sample_size = std::mem::size_of::<Complex32>();
let mut total_bytes_read = 0;
let byte_len = std::mem::size_of_val(*channel0);
let dest =
unsafe { std::slice::from_raw_parts_mut(channel0.as_mut_ptr() as *mut u8, byte_len) };
let mut has_reset = false;
while total_bytes_read < byte_len {
match self.reader.read(&mut dest[total_bytes_read..]) {
Ok(0) => {
// Reached EOF, reset to the beginning
self.reader.seek(SeekFrom::Start(0))?;
if has_reset {
// If we've already reset and still get 0, break to avoid infinite loop
break;
}
has_reset = true;
}
Ok(n) => {
total_bytes_read += n;
has_reset = false;
}
Err(e) => {
if e.kind() == std::io::ErrorKind::Interrupted {
continue;
} else {
return Err(Error::Io(e));
}
}
}
}
let read_samples = total_bytes_read / sample_size;
self.samples_emitted += read_samples;
Ok(read_samples)
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::io::Write;
use tempfile::NamedTempFile;
#[test]
fn test_file_device() {
// Create a temporary file with some samples
let mut file = NamedTempFile::new().unwrap();
let samples: Vec<Complex32> = vec![
Complex32::new(1.0, 0.0),
Complex32::new(0.0, 1.0),
Complex32::new(-1.0, 0.0),
Complex32::new(0.0, -1.0),
];
let bytes = unsafe {
std::slice::from_raw_parts(
samples.as_ptr() as *const u8,
samples.len() * std::mem::size_of::<Complex32>(),
)
};
file.write_all(bytes).unwrap();
let path = file.path().to_path_buf();
let sample_rate = 1e6;
let device = FileDevice::new(path, sample_rate);
// Open the device and get a streamer
let mut rx = device.rx_streamer(&[0], Args::new()).unwrap();
// Activate the streamer
rx.activate_at(None).unwrap();
// Read one sample
let mut buffer = [Complex32::new(0.0, 0.0); 1];
{
let mut buffers = [&mut buffer[..]];
let num_read = rx.read(&mut buffers, 0).unwrap();
assert_eq!(num_read, 1);
}
let sample1 = buffer[0];
assert_eq!(sample1, Complex32::new(1.0, 0.0));
// Read the next sample
{
let mut buffers = [&mut buffer[..]];
let num_read = rx.read(&mut buffers, 0).unwrap();
assert_eq!(num_read, 1);
}
let sample2 = buffer[0];
assert_eq!(sample2, Complex32::new(0.0, 1.0));
rx.deactivate_at(None).unwrap();
}
#[test]
fn test_file_device_looping() {
// Create a temporary file with some samples
let mut file = NamedTempFile::new().unwrap();
let samples: Vec<Complex32> = vec![
Complex32::new(1.0, 0.0),
Complex32::new(0.0, 1.0),
Complex32::new(-1.0, 0.0),
Complex32::new(0.0, -1.0),
];
let bytes = unsafe {
std::slice::from_raw_parts(
samples.as_ptr() as *const u8,
samples.len() * std::mem::size_of::<Complex32>(),
)
};
file.write_all(bytes).unwrap();
let path = file.path().to_path_buf();
let sample_rate = 1e6;
let device = FileDevice::new(path, sample_rate);
// Open the device and get a streamer
let mut rx = device.rx_streamer(&[0], Args::new()).unwrap();
// Activate the streamer
rx.activate_at(None).unwrap();
// Read the entire file (4 samples)
let mut buffer = [Complex32::new(0.0, 0.0); 4];
{
let mut buffers = [&mut buffer[..]];
let num_read = rx.read(&mut buffers, 0).unwrap();
assert_eq!(num_read, 4);
}
assert_eq!(buffer[0], Complex32::new(1.0, 0.0));
assert_eq!(buffer[1], Complex32::new(0.0, 1.0));
assert_eq!(buffer[2], Complex32::new(-1.0, 0.0));
assert_eq!(buffer[3], Complex32::new(0.0, -1.0));
// Read one more sample, which should be the first sample again (looping)
let mut buffer2 = [Complex32::new(0.0, 0.0); 1];
{
let mut buffers = [&mut buffer2[..]];
let num_read = rx.read(&mut buffers, 0).unwrap();
assert_eq!(num_read, 1);
}
assert_eq!(buffer2[0], Complex32::new(1.0, 0.0));
rx.deactivate_at(None).unwrap();
}
#[test]
fn test_file_device_throttle() {
// Create a temporary file with enough samples for 0.1 second of playback at 1e6 sps
let num_samples = 100000;
let sample_rate = 1e6; // 1e6 samples per second
let samples = vec![Complex32::new(0.0, 0.0); num_samples];
let bytes = unsafe {
std::slice::from_raw_parts(
samples.as_ptr() as *const u8,
samples.len() * std::mem::size_of::<Complex32>(),
)
};
let mut file = NamedTempFile::new().unwrap();
file.write_all(bytes).unwrap();
let path = file.path().to_path_buf();
let device = FileDevice::new(path, sample_rate);
let mut rx = device.rx_streamer(&[0], Args::new()).unwrap();
// Activate the streamer
rx.activate_at(None).unwrap();
// Read all samples and measure time
let start = std::time::Instant::now();
let mut buffer = Vec::new();
buffer.resize(rx.mtu().unwrap_or(1024), Complex32::new(0.0, 0.0));
let mut total_samples = 0;
while total_samples < 200000 {
// Read 200000 samples (2 loops)
let mut buffers = [&mut buffer[..]];
let num_read = rx.read(&mut buffers, 0).unwrap();
total_samples += num_read;
}
let elapsed = start.elapsed();
// Verify reading took at least 0.2 second (for 200000 samples at 1e6 sps)
let min_expected = std::time::Duration::from_secs_f64(0.2);
let max_expected = min_expected.mul_f64(1.1);
assert!(
elapsed >= min_expected,
"Reading took {elapsed:?} but expected at least {min_expected:?}"
);
assert!(
elapsed < max_expected,
"Reading took {elapsed:?} but expected at least {max_expected:?}"
);
rx.deactivate_at(None).unwrap();
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment