Skip to content

Instantly share code, notes, and snippets.

@vyeevani
Created October 10, 2024 02:04
Show Gist options
  • Select an option

  • Save vyeevani/6a3d3d695d30ff41b3dbe0268abd7304 to your computer and use it in GitHub Desktop.

Select an option

Save vyeevani/6a3d3d695d30ff41b3dbe0268abd7304 to your computer and use it in GitHub Desktop.
Basic code to drive a Feetech sts3125 servo from Rust.
use serialport::{self, SerialPort};
use std::time::Duration;
use std::io::{self, Write};
// Instruction codes. `send_message` places one of these in the command byte
// of every outgoing packet. Listed in ascending numeric order.

/// Instruction sent by `ping`.
const PING: u8 = 0x01;
/// Instruction sent by `read_two_bytes_register`.
const READ: u8 = 0x02;
/// Instruction sent by `write_register` and `write_two_bytes_register`.
const WRITE: u8 = 0x03;
/// Not used by the driver code in this file.
const REGWRITE: u8 = 0x04;
/// Not used by the driver code in this file.
const ACTION: u8 = 0x05;
/// Not used by the driver code in this file.
const RESET: u8 = 0x06;
/// Not used by the driver code in this file.
const SYNCWRITE: u8 = 0x83;
/// Register addresses that the driver passes as `register_id` when reading
/// from or writing to a servo.
///
/// Each variant's discriminant is the address byte itself, so a variant is
/// turned into a wire value with `as u8`.
///
/// NOTE(review): gaps between consecutive addresses presumably mark registers
/// that are wider than one byte (the driver accesses `TargetPosition` and
/// `CurrentPosition` as two-byte values) — confirm against the servo's
/// memory table before relying on it for other registers.
// Most variants are not referenced by the driver yet; the full map is kept
// for completeness, so silence the unused-variant warning.
#[allow(dead_code)]
#[repr(u8)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum STSRegisters {
    FirmwareMajor = 0x00,
    FirmwareMinor = 0x01,
    ServoMajor = 0x03,
    ServoMinor = 0x04,
    Id = 0x05,
    BaudRate = 0x06,
    ResponseDelay = 0x07,
    ResponseStatusLevel = 0x08,
    MinimumAngle = 0x09,
    MaximumAngle = 0x0B,
    MaximumTemperature = 0x0D,
    MaximumVoltage = 0x0E,
    MinimumVoltage = 0x0F,
    MaximumTorque = 0x10,
    UnloadingCondition = 0x13,
    LedAlarmCondition = 0x14,
    PosProportionalGain = 0x15,
    PosDerivativeGain = 0x16,
    PosIntegralGain = 0x17,
    MinimumStartupForce = 0x18,
    CkInsensitiveArea = 0x1A,
    CckInsensitiveArea = 0x1B,
    CurrentProtectionTh = 0x1C,
    AngularResolution = 0x1E,
    PositionCorrection = 0x1F,
    OperationMode = 0x21,
    TorqueProtectionTh = 0x22,
    TorqueProtectionTime = 0x23,
    OverloadTorque = 0x24,
    SpeedProportionalGain = 0x25,
    OvercurrentTime = 0x26,
    SpeedIntegralGain = 0x27,
    TorqueSwitch = 0x28,
    TargetAcceleration = 0x29,
    TargetPosition = 0x2A,
    RunningTime = 0x2C,
    RunningSpeed = 0x2E,
    TorqueLimit = 0x30,
    WriteLock = 0x37,
    CurrentPosition = 0x38,
    CurrentSpeed = 0x3A,
    CurrentDriveVoltage = 0x3C,
    CurrentVoltage = 0x3E,
    CurrentTemperature = 0x3F,
    AsynchronousWriteSt = 0x40,
    Status = 0x41,
    MovingStatus = 0x42,
    CurrentCurrent = 0x45,
}
/// Servo operating modes, with each discriminant being the raw byte value.
///
/// NOTE(review): presumably these are the values written to
/// `STSRegisters::OperationMode`; nothing in this file uses the enum yet, so
/// confirm against the servo documentation. The value 2 is intentionally left
/// unassigned here, matching the original definition.
// Not referenced by the driver yet; kept for completeness.
#[allow(dead_code)]
#[repr(u8)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum STSMode {
    Position = 0,
    Velocity = 1,
    Step = 3,
}
/// Driver state: the serial connection to the servo bus plus a per-id
/// servo-type table.
struct STSServoDriver {
/// Serial port that every packet is written to and every reply is read from.
port: Box<dyn SerialPort>,
/// One entry for each possible `u8` servo id. `new` fills the table with
/// `ServoType::Unknown`; no code visible in this file reads or updates it
/// afterwards.
servo_type: [ServoType; 256],
}
/// Kind of servo recorded for a given id in `STSServoDriver::servo_type`.
///
/// `Unknown` is the value every id starts with. Nothing visible in this file
/// assigns `Scs` or `Sts` yet.
// `Scs` and `Sts` are never constructed by the visible code.
#[allow(dead_code)]
#[derive(Debug, Copy, Clone, PartialEq, Eq)]
enum ServoType {
    Unknown,
    Scs,
    Sts,
}
impl STSServoDriver {
    /// Wraps an already-opened serial port.
    ///
    /// Every servo id starts out as `ServoType::Unknown`.
    fn new(port: Box<dyn SerialPort>) -> Self {
        STSServoDriver {
            port,
            servo_type: [ServoType::Unknown; 256],
        }
    }

    /// Sends a ping to `servo_id` and checks that the reply carries a zero
    /// status byte.
    ///
    /// # Panics
    /// Panics if the packet cannot be sent, the reply cannot be read or is
    /// malformed, or the status byte is not `0x00`.
    fn ping(&mut self, servo_id: u8) {
        // A ping carries no parameters, so the frame is the 6-byte minimum.
        if self.send_message(servo_id, PING, 0, &mut []) != 6 {
            panic!("Failed to send message");
        }
        let mut response = [0xFF];
        if self.receive_message(servo_id, 1, &mut response) != 0 {
            panic!("Failed to receive message");
        }
        if response[0] != 0x00 {
            panic!("Invalid response format");
        }
    }

    /// Builds and transmits one instruction packet.
    ///
    /// Frame layout: `0xFF 0xFF`, servo id, length (`param_length + 2`),
    /// command, the first `param_length` bytes of `parameters`, checksum.
    /// The checksum is the bitwise NOT of the wrapping sum of every byte
    /// after the two header bytes, mirroring what `receive_message` verifies.
    ///
    /// Returns the number of bytes transmitted, which is always
    /// `param_length + 6`.
    ///
    /// # Panics
    /// Panics if `param_length` exceeds 253 (the length byte would overflow),
    /// if `parameters` holds fewer than `param_length` bytes, or if the
    /// serial write fails.
    fn send_message(&mut self, servo_id: u8, command_id: u8, param_length: u8, parameters: &mut [u8]) -> usize {
        let params = &parameters[..usize::from(param_length)];
        let length = param_length
            .checked_add(2)
            .expect("param_length must be at most 253");

        let mut message = Vec::with_capacity(params.len() + 6);
        message.extend_from_slice(&[0xFF, 0xFF, servo_id, length, command_id]);
        message.extend_from_slice(params);

        // Wrapping arithmetic: the sum routinely exceeds 255 (for example
        // when a parameter byte is 0xFF), and plain `+` would panic in debug
        // builds.
        let checksum = message[2..]
            .iter()
            .fold(0u8, |sum, &byte| sum.wrapping_add(byte));
        message.push(!checksum);

        // `write_all` keeps going after a short write, so a successful return
        // means the whole frame went out.
        self.port
            .write_all(&message)
            .unwrap_or_else(|_| panic!("Failed to send message"));
        message.len()
    }

    /// Reads and validates one reply packet from `servo_id`.
    ///
    /// Expected layout: `0xFF 0xFF`, servo id, length (`read_length + 1`),
    /// `read_length` payload bytes, checksum. The payload is copied into
    /// `output_buffer`. Callers in this file treat the first payload byte as
    /// a status byte: `ping` requires it to be zero and
    /// `read_two_bytes_register` skips over it.
    ///
    /// Always returns `0`; every failure panics instead.
    ///
    /// # Panics
    /// Panics if `output_buffer.len() != read_length`, if `read_length` is
    /// 255, if the read fails or times out, if the header does not match, or
    /// if the checksum is wrong.
    fn receive_message(&mut self, servo_id: u8, read_length: u8, output_buffer: &mut [u8]) -> i32 {
        let payload_len = usize::from(read_length);
        // Validate the caller's arguments before consuming bytes from the port.
        assert_eq!(
            output_buffer.len(),
            payload_len,
            "output_buffer must hold exactly read_length bytes"
        );
        let expected_length = read_length
            .checked_add(1)
            .expect("read_length must be at most 254");

        let mut packet = vec![0u8; payload_len + 5];
        self.port
            .read_exact(&mut packet)
            .unwrap_or_else(|_| panic!("Failed to receive message"));

        if packet[0] != 0xFF || packet[1] != 0xFF || packet[2] != servo_id || packet[3] != expected_length {
            panic!(
                "Invalid response format: expected header [0xFF, 0xFF, {}, {}], got [{:X}, {:X}, {:X}, {:X}]",
                servo_id,
                expected_length,
                packet[0],
                packet[1],
                packet[2],
                packet[3]
            );
        }

        // Checksum covers everything between the header and the final byte.
        let checksum_index = payload_len + 4;
        let checksum = !packet[2..checksum_index]
            .iter()
            .fold(0u8, |sum, &byte| sum.wrapping_add(byte));
        if packet[checksum_index] != checksum {
            panic!("Checksum mismatch");
        }

        output_buffer.copy_from_slice(&packet[4..checksum_index]);
        0
    }

    /// Writes `1` to the `TorqueSwitch` register and consumes the reply.
    ///
    /// # Panics
    /// Panics if sending or receiving fails.
    fn enable_torque(&mut self, servo_id: u8) {
        self.write_register(servo_id, STSRegisters::TorqueSwitch as u8, 1);
        self.read_write_reply(servo_id);
    }

    /// Writes `0` to the `TorqueSwitch` register and consumes the reply.
    ///
    /// The reply is read for the same reason as in `enable_torque` and
    /// `set_position`: leaving it unread would put stale bytes in front of
    /// the next reply and break its header check.
    ///
    /// # Panics
    /// Panics if sending or receiving fails.
    fn disable_torque(&mut self, servo_id: u8) {
        self.write_register(servo_id, STSRegisters::TorqueSwitch as u8, 0);
        self.read_write_reply(servo_id);
    }

    /// Reads the `CurrentPosition` register of `servo_id`.
    ///
    /// # Panics
    /// Panics if flushing, sending or receiving fails.
    fn get_position(&mut self, servo_id: u8) -> i16 {
        // `flush` only pushes pending output to the port; it does not discard
        // unread input.
        // NOTE(review): if the intent was to drop stale replies, an input
        // buffer clear would be needed instead — confirm.
        self.port
            .flush()
            .unwrap_or_else(|_| panic!("Failed to flush the serial port"));
        self.read_two_bytes_register(servo_id, STSRegisters::CurrentPosition as u8)
    }

    /// Writes `position` to the `TargetPosition` register and consumes the
    /// reply.
    ///
    /// # Panics
    /// Panics if sending or receiving fails.
    fn set_position(&mut self, servo_id: u8, position: i16) {
        let mut params = [0, 0];
        self.convert_int_to_bytes(servo_id, position, &mut params);
        self.write_two_bytes_register(servo_id, STSRegisters::TargetPosition as u8, params);
        self.read_write_reply(servo_id);
    }

    // Helper functions

    /// Reads the reply that follows a write instruction: a single payload
    /// byte, whose value is not inspected.
    fn read_write_reply(&mut self, servo_id: u8) {
        let mut response = [0; 1];
        if self.receive_message(servo_id, 1, &mut response) != 0 {
            panic!("Failed to receive message");
        }
    }

    /// Sends a WRITE instruction storing one byte at `register_id`.
    ///
    /// Does not read the reply; the caller is responsible for that.
    fn write_register(&mut self, servo_id: u8, register_id: u8, value: u8) {
        let mut params = [register_id, value];
        // 6 bytes of framing + 2 parameter bytes.
        if self.send_message(servo_id, WRITE, 2, &mut params) != 8 {
            panic!("Failed to send message");
        }
    }

    /// Sends a WRITE instruction storing two bytes starting at `register_id`,
    /// in the order given by `value`.
    ///
    /// Does not read the reply; the caller is responsible for that.
    fn write_two_bytes_register(&mut self, servo_id: u8, register_id: u8, value: [u8; 2]) {
        let mut params = [register_id, value[0], value[1]];
        // 6 bytes of framing + 3 parameter bytes.
        if self.send_message(servo_id, WRITE, 3, &mut params) != 9 {
            panic!("Failed to send message");
        }
    }

    /// Reads two bytes starting at `register_id` and decodes them as a signed
    /// value.
    ///
    /// Decoding is the inverse of `convert_int_to_bytes`: low byte first,
    /// bit 15 is the sign, the remaining 15 bits are the magnitude.
    ///
    /// # Panics
    /// Panics if sending or receiving fails.
    fn read_two_bytes_register(&mut self, servo_id: u8, register_id: u8) -> i16 {
        if self.send_message(servo_id, READ, 2, &mut [register_id, 2]) != 8 {
            panic!("Failed to send message");
        }
        // The reply payload is one status byte followed by the two data
        // bytes, hence a read length of 2 + 1.
        let mut response = [0, 0, 0];
        if self.receive_message(servo_id, 2 + 1, &mut response) != 0 {
            panic!("Failed to receive message");
        }
        let raw = u16::from_le_bytes([response[1], response[2]]);
        // Lossless cast: the mask keeps the value within 0..=0x7FFF.
        let magnitude = (raw & 0x7FFF) as i16;
        if raw & 0x8000 != 0 {
            -magnitude
        } else {
            magnitude
        }
    }

    /// Encodes `value` into `result` as two bytes, low byte first, using
    /// sign-magnitude form: bit 15 is set for negative values and the lower
    /// bits hold the absolute value.
    ///
    /// `i16::MIN` has no sign-magnitude representation in 16 bits; it encodes
    /// as `0x8000` instead of panicking.
    // `servo_id` is accepted but not consulted by the current encoding.
    #[allow(unused_variables)]
    fn convert_int_to_bytes(&self, servo_id: u8, value: i16, result: &mut [u8; 2]) {
        // `unsigned_abs` cannot overflow, unlike `abs` on `i16::MIN`.
        let magnitude = value.unsigned_abs();
        let servo_value = if value < 0 { 0x8000 | magnitude } else { magnitude };
        *result = servo_value.to_le_bytes();
    }
}
// Hardware tests: they talk to a real servo with id 1 through the serial
// adapter at the hard-coded device path below.
#[cfg(test)]
mod tests {
    use super::*;
    use serialport::SerialPort;
    use std::time::Duration;

    /// Opens the serial adapter (1 Mbaud, 8 data bits, 10 s timeout) and
    /// wraps it in a driver.
    fn open_driver() -> STSServoDriver {
        let port = serialport::new("/dev/tty.usbserial-110", 1000000)
            .timeout(Duration::from_secs(10))
            .data_bits(serialport::DataBits::Eight)
            .open()
            .expect("Failed to open port");
        let driver = STSServoDriver::new(port);
        println!("Servo driver created");
        driver
    }

    #[test]
    fn test_ping() {
        let mut driver = open_driver();
        driver.ping(1);
        println!("successful ping");
    }

    #[test]
    fn test_enable_torque_and_read_position() {
        let mut driver = open_driver();

        // Enable torque on motor id 1, then report where it currently is.
        driver.enable_torque(1);
        println!("Current position: {}", driver.get_position(1));

        // Command each target in turn, wait a second, and report the position.
        for target in [3096, 4096] {
            driver.set_position(1, target);
            std::thread::sleep(Duration::from_secs(1));
            println!("Current position: {}", driver.get_position(1));
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment