Created
November 29, 2025 15:37
-
-
Save sjchoi86/2e46015e1faf5fcac1719bbd0f22615b to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| class SitePositionIKSolver(object): | |
| """ | |
| Inverse Kinematics (IK) solver for site position targets using Damped Least Squares (DLS) method. | |
| This class provides methods to set up IK targets, configure solver parameters, and compute joint updates to achieve desired site positions. | |
| Usage: | |
| ------ | |
| # Create IK solver (use all rev joints by default) | |
| ik_solver = SitePositionIKSolver( | |
| ik_env = ik_env, | |
| max_ik_tick = 10, | |
| ik_stepsize = 1.0, | |
| ik_update_th = np.deg2rad(10), | |
| dls_damping = 1e-8, | |
| max_probe = np.deg2rad(3), | |
| k_null = 0.1, | |
| q_home_rev = env.get_qpos_joints(env.rev_joint_names), | |
| ) | |
| ... | |
| # Main loop | |
| while env.is_viewer_alive(): | |
| ... | |
| ik_solver.reset_buffers() | |
| ik_solver.add_ik_target(site_name1, p_site1_trgt) | |
| ik_solver.add_ik_target(site_name2, p_site2_trgt) | |
| ik_solver.set_ik_config(max_ik_tick=20,ik_stepsize=0.2) | |
| q_rev_best, info = ik_solver.compute_dq( | |
| env = env, | |
| joints_use = joints_use, | |
| joint_limit_handle_flag = True, | |
| nullspace_control_flag = True, | |
| ) | |
| ik_err_best = info['ik_err_best'] | |
| # Update env | |
| nv.forward(q=q_rev_best,joint_names=env.rev_joint_names) | |
| ... | |
| # Render | |
| ... | |
| ik_solver.render(env,r=0.01,alpha=0.5) | |
| ... | |
| env.render() | |
| """ | |
| def __init__( | |
| self, | |
| name = "Site Position IK Solver", | |
| ik_env = None, | |
| joints_use = None, | |
| max_ik_tick = 100, | |
| ik_stepsize = 0.1, | |
| ik_update_th = np.deg2rad(10.0), | |
| dls_damping = 1e-8, | |
| max_probe = np.deg2rad(3.0), | |
| k_null = 0.01, | |
| q_home_rev = None, | |
| ): | |
| """ | |
| Initialize the SitePositionIKSolver with the given parameters. | |
| Parameters: | |
| ---------- | |
| name (str): Name of the IK solver. | |
| ik_env: The environment object that provides methods for IK computations. | |
| joints_use (list, optional): List of joint names to be used for the IK computation. | |
| max_ik_tick (int): Maximum number of IK iterations. | |
| ik_stepsize (float): Stepsize scaling factor for IK updates. | |
| ik_update_th (float): Threshold for IK updates. | |
| dls_damping (float): Damping term for the least-squares computation. | |
| max_probe (float): Maximum probing angle for IK. | |
| k_null (float): Gain for nullspace posture control. | |
| q_home_rev (np.array, optional): Home configuration for reversible joints. | |
| """ | |
| self.name = name | |
| self.ik_env = ik_env | |
| if joints_use is None: | |
| self.joints_use = ik_env.rev_joint_names | |
| else: | |
| self.joints_use = joints_use | |
| # Basic IK information | |
| self.idxs_jac_rev = self.ik_env.get_idxs_jac(self.ik_env.rev_joint_names) | |
| self.idxs_rev_active = get_idxs(self.ik_env.rev_joint_names,self.ik_env.active_joint_names) | |
| # Set IK configiurations | |
| self.set_ik_config( | |
| max_ik_tick = max_ik_tick, | |
| ik_stepsize = ik_stepsize, | |
| ik_update_th = ik_update_th, | |
| dls_damping = dls_damping, | |
| max_probe = max_probe, | |
| k_null = k_null, | |
| q_home_rev = q_home_rev, | |
| ) | |
| # Reset buffers for IK info | |
| self.reset_buffers() | |
| # Timer | |
| self.tt = TicTocClass(name='%s'%(self.name)) | |
| def set_ik_config( | |
| self, | |
| max_ik_tick = None, | |
| ik_stepsize = None, | |
| ik_update_th = None, | |
| dls_damping = None, | |
| max_probe = None, | |
| k_null = None, | |
| q_home_rev = None, | |
| ): | |
| """ | |
| Set the IK solver configuration parameters. | |
| Parameters: | |
| ---------- | |
| max_ik_tick (int): Maximum number of IK iterations. | |
| ik_stepsize (float): Stepsize scaling factor for IK updates. | |
| ik_update_th (float): Threshold for IK updates. | |
| dls_damping (float): Damping term for the least-squares computation. | |
| max_probe (float): Maximum probing angle for IK. | |
| k_null (float): Gain for nullspace posture control. | |
| q_home_rev (np.array, optional): Home configuration for reversible joints. | |
| """ | |
| if max_ik_tick is not None: | |
| self.max_ik_tick = max_ik_tick | |
| if ik_stepsize is not None: | |
| self.ik_stepsize = ik_stepsize | |
| if ik_update_th is not None: | |
| self.ik_update_th = ik_update_th | |
| if dls_damping is not None: | |
| self.dls_damping = dls_damping | |
| if max_probe is not None: | |
| self.max_probe = max_probe | |
| if k_null is not None: | |
| self.k_null = k_null | |
| if q_home_rev is not None: | |
| self.q_home_rev = q_home_rev | |
| def reset_buffers(self): | |
| """ | |
| Reset the IK solver by clearing all stored IK target information. | |
| """ | |
| self.site_names = [] | |
| self.n_site = len(self.site_names) | |
| self.p_curr_list = [] | |
| self.p_trgt_list = [] | |
| self.p_err_list = [] | |
| self.J_p_list = [] | |
| def add_ik_target( | |
| self, | |
| site_name = None, | |
| p_trgt = None, | |
| ): | |
| """ | |
| Add inverse kinematics (IK) target information to the IK solver. | |
| Parameters: | |
| ---------- | |
| site_name (str, optional): Name of the site for which the IK target is defined. | |
| p_trgt (np.array, optional): Target position for IK. Expected shape: (3,). | |
| Updates: | |
| ------- | |
| p_trgt_list: List of target positions for IK. | |
| p_curr_list: List of current positions for IK. | |
| p_err_list: List of position errors for IK. | |
| J_p_list: List of position Jacobians for IK. | |
| """ | |
| # Append site name | |
| self.site_names.append(site_name) | |
| self.n_site = len(self.site_names) | |
| # Append IK target | |
| p_trgt = np.asarray(p_trgt).reshape(3) # (3,) | |
| self.p_trgt_list.append(p_trgt) | |
| # Append current position and error | |
| p_curr = self.ik_env.get_p_site(site_name=site_name) # (3,) | |
| self.p_curr_list.append(p_curr) | |
| self.p_err_list.append(p_trgt-p_curr) | |
| # Append Jacobian | |
| J_p = self.ik_env.get_J_site(site_name,return_type='pos') # (3 x nv) | |
| self.J_p_list.append(J_p) | |
| def update_ik_state(self): | |
| """ | |
| Update the current state of the IK solver by recalculating positions, errors, and Jacobians | |
| for all targets. | |
| Updates: | |
| ------- | |
| p_curr_list: Updated list of current positions for IK. | |
| p_err_list: Updated list of position errors for IK. | |
| J_p_list: Updated list of position Jacobians for IK. | |
| """ | |
| for site_idx,site_name in enumerate(self.site_names): | |
| # Update current position and error | |
| p_curr = self.ik_env.get_p_site(site_name=site_name) # (3,) | |
| self.p_curr_list[site_idx] = p_curr | |
| p_trgt = self.p_trgt_list[site_idx] | |
| self.p_err_list[site_idx] = p_trgt - p_curr | |
| # Update Jacobian | |
| J_p = self.ik_env.get_J_site(site_name,return_type='pos') # (3 x nv) | |
| self.J_p_list[site_idx] = J_p | |
| def compute_dq( | |
| self, | |
| env = None, # only for syncing 'ik_env' with 'env' | |
| joints_use = None, | |
| joint_limit_handle_flag = True, | |
| nullspace_control_flag = True, | |
| ): | |
| """ | |
| Compute the change in joint configuration (delta q) to minimize the IK error. | |
| Summarized steps: | |
| 1) Stack Jacobians and errors from all targets. | |
| 2) Handle joint coupling and compute initial Damped Least Squares (DLS) solution. | |
| 3) Handle joint limit violations and recompute DLS if necessary. | |
| 4) (Optional) Apply nullspace posture control toward home configuration. | |
| Parameters: | |
| ---------- | |
| env: The environment object to optionally sync with the IK environment. | |
| joints_use (list, optional): List of joint names to be used for the IK computation. | |
| joint_limit_handle_flag (bool): If True, handle joint limit violations during IK. | |
| nullspace_control_flag (bool): If True, apply nullspace posture control toward home configuration. | |
| """ | |
| # Start timer | |
| self.tt.tic() | |
| # Reset IK history buffers | |
| q_rev_list,ik_err_array = [],np.zeros(self.max_ik_tick+1) | |
| # (Optional) sync ik_env with env | |
| if env is not None: | |
| self.ik_env.forward(env.get_qpos()) | |
| # (Optional) update joints to use | |
| if joints_use is not None: | |
| self.joints_use = joints_use | |
| self.idxs_rev_use = get_idxs(self.ik_env.rev_joint_names, self.joints_use) | |
| # Update IK state | |
| self.update_ik_state() | |
| # Set the first buffer values | |
| q_rev_list.append(self.ik_env.get_qpos_joints(self.ik_env.rev_joint_names)) | |
| ik_err_array[0] = np.linalg.norm(np.hstack(self.p_err_list)) | |
| for ik_tick in range(self.max_ik_tick): | |
| # Stack Jacobians and errors to arrays | |
| J_p_array = np.vstack(self.J_p_list) | |
| p_curr_array = np.hstack(self.p_curr_list) | |
| p_trgt_array = np.hstack(self.p_trgt_list) | |
| p_err_array = np.hstack(self.p_err_list) | |
| # Extract revolute joint Jacobian | |
| J_rev = J_p_array[:,self.idxs_jac_rev] | |
| """ | |
| 1) Joint coupling handling and initial Damped Least Squares (DLS) | |
| For each joint coupling equation (eq_data), modify the Jacobian columns | |
| corresponding to the active and passive joints. The passive joint's | |
| column is set to zero after its effect is added to the active joint's column. | |
| Then, mask out the unused joints and compute the DLS solution for dq. | |
| Input: | |
| - J_rev: Revolute joint Jacobian (m x n_rev) | |
| Output: | |
| - J_rev_coupled_masked: Coupled and masked revolute joint Jacobian (m x n_rev) | |
| - dq_rev: Revolute joint position change (n_rev,) | |
| """ | |
| # Apply joint coupling to Jacobian | |
| J_rev_coupled = J_rev.copy() | |
| for eq_idx in range(self.ik_env.model.eq_data.shape[0]): | |
| passive_joint = self.ik_env.model.joint(self.ik_env.model.eq_obj1id[eq_idx]).name | |
| active_joint = self.ik_env.model.joint(self.ik_env.model.eq_obj2id[eq_idx]).name | |
| coef = self.ik_env.model.eq_data[eq_idx, :5] | |
| x0 = self.ik_env.model.joint(active_joint).qpos0[0] | |
| x = self.ik_env.data.joint(active_joint).qpos[0] | |
| idx_passive = self.ik_env.rev_joint_names.index(passive_joint) | |
| idx_active = self.ik_env.rev_joint_names.index(active_joint) | |
| dfdx = self.ik_env.evaluate_poly_derivative(x, coef, x0) | |
| # Modify Jacobian columns: J_active += J_passive * dfdx; J_passive = 0 | |
| J_rev_coupled[:, idx_active] += J_rev_coupled[:, idx_passive] * dfdx | |
| J_rev_coupled[:, idx_passive] = 0.0 | |
| # Mask out not-used joints | |
| J_rev_coupled_masked = np.zeros_like(J_rev_coupled) | |
| J_rev_coupled_masked[:, self.idxs_rev_use] = J_rev_coupled[:, self.idxs_rev_use] | |
| # Damped least squares: dq = J^T (J J^T + λ^2 I)^-1 e using masked Jacobian | |
| m, n_rev = J_rev_coupled_masked.shape | |
| JJt = J_rev_coupled_masked@J_rev_coupled_masked.T | |
| J_pinv = J_rev_coupled_masked.T@np.linalg.inv(JJt+self.dls_damping*np.eye(m)) | |
| dq_rev = J_pinv @ p_err_array | |
| dq_rev = self.ik_stepsize * dq_rev | |
| """ | |
| 2) Joint limit handling | |
| If a joint is predicted to exceed its limits in the next probe step, | |
| its corresponding Jacobian column is zeroed out and the IK step is | |
| recomputed without that joint. | |
| Input: | |
| - dq_rev: Initial revolute joint position change (n_rev,) | |
| Output: | |
| - dq_rev: Updated revolute joint position change (n_rev,) | |
| - J_mask_final: Final Jacobian used after joint limit handling (m x n_rev | |
| """ | |
| # Prepare current q and default violated idxs | |
| q_rev_curr = self.ik_env.get_qpos_joints(self.ik_env.rev_joint_names) | |
| violated_idxs = np.array([], dtype=int) | |
| J_mask_final = J_rev_coupled_masked.copy() | |
| # Joint limit violation handling | |
| if joint_limit_handle_flag: | |
| violated_idxs = [] | |
| q_use = q_rev_curr[self.idxs_rev_use] | |
| dq_use = dq_rev[self.idxs_rev_use] | |
| q_use_next_probe = q_use + self.max_probe*np.sign(dq_use) | |
| q_mins_use = self.ik_env.rev_joint_mins[self.idxs_rev_use] | |
| q_maxs_use = self.ik_env.rev_joint_maxs[self.idxs_rev_use] | |
| hit_max = (q_use_next_probe > q_maxs_use) & (dq_use > 0) | |
| hit_min = (q_use_next_probe < q_mins_use) & (dq_use < 0) | |
| violated_idxs = np.where(hit_max | hit_min)[0] | |
| if len(violated_idxs) > 0: # recompute dq without violated joints | |
| J_rev_coupled_masked2 = J_rev_coupled_masked.copy() | |
| for v_local in violated_idxs: | |
| v_global = self.idxs_rev_use[v_local] | |
| J_rev_coupled_masked2[:, v_global] = 0.0 | |
| JJt2 = J_rev_coupled_masked2 @ J_rev_coupled_masked2.T | |
| J_pinv2 = J_rev_coupled_masked2.T @ np.linalg.inv(JJt2 + self.dls_damping*np.eye(m)) | |
| dq_rev = J_pinv2 @ p_err_array | |
| dq_rev = self.ik_stepsize * dq_rev | |
| J_mask_final = J_rev_coupled_masked2 | |
| """ | |
| 3) Nullspace control | |
| Apply nullspace control to the valid joints (not violated in joint limit handling) | |
| to move towards home position while ensuring no leakage in end-effector space. | |
| Input: | |
| - dq_rev: Revolute joint position change after joint limit handling (n_rev,) | |
| - J_mask_final: Final Jacobian used after joint limit handling (m x n_rev) | |
| Output: | |
| - dq_rev: Updated revolute joint position change after nullspace control (n_rev,) | |
| """ | |
| # Nullspace control | |
| if nullspace_control_flag and self.q_home_rev is not None: | |
| k_use = len(self.idxs_rev_use) | |
| valid_mask = np.ones(k_use, dtype=bool) | |
| if joint_limit_handle_flag and len(violated_idxs) > 0: | |
| valid_mask[violated_idxs] = False | |
| valid_local = np.where(valid_mask)[0] | |
| if valid_local.size > 0: | |
| idxs_rev_use_valid = [self.idxs_rev_use[i] for i in valid_local] | |
| J_use = J_mask_final[:, idxs_rev_use_valid] # (m x k_valid) | |
| m_use,k_valid = J_use.shape | |
| JJt_use = J_use @ J_use.T | |
| J_use_pinv = J_use.T @ np.linalg.inv(JJt_use + self.dls_damping*np.eye(m_use)) | |
| N_use = np.eye(k_valid) - (J_use_pinv @ J_use) | |
| q_err_use = self.q_home_rev[idxs_rev_use_valid] - q_rev_curr[idxs_rev_use_valid] | |
| dq_use_null = self.k_null * (N_use @ q_err_use) | |
| dq_rev[idxs_rev_use_valid] += dq_use_null | |
| # Leakage correction | |
| leak = J_use @ dq_rev[idxs_rev_use_valid] - p_err_array | |
| if np.linalg.norm(leak) > 1e-9: | |
| dq_corr_use = -(J_use_pinv@leak) | |
| dq_rev[idxs_rev_use_valid] += dq_corr_use | |
| """ | |
| 4) Integratation and forward kinematics update | |
| Integrate only the active joints, fill in passive joints using joint | |
| coupling equations, and clip to joint limits. | |
| Input: | |
| - dq_rev: Revolute joint position change after nullspace control (n_rev,) | |
| Output: | |
| - q_rev_next: Updated revolute joint positions after integration (n_rev,) | |
| """ | |
| # Integrate revolute joint positions | |
| dq_active = dq_rev[self.idxs_rev_active] | |
| dq_active = trim_scale(x=dq_active,th=self.ik_update_th) | |
| q_active = self.ik_env.get_qpos_joints(self.ik_env.active_joint_names) + dq_active # integrate | |
| q_rev_next = q_rev_curr.copy() | |
| q_rev_next[self.idxs_rev_active] = q_active | |
| # Fill in passive joint from joint coupling information | |
| for eq_idx in range(self.ik_env.model.eq_data.shape[0]): | |
| passive_joint = self.ik_env.model.joint(self.ik_env.model.eq_obj1id[eq_idx]).name | |
| active_joint = self.ik_env.model.joint(self.ik_env.model.eq_obj2id[eq_idx]).name | |
| coef = self.ik_env.model.eq_data[eq_idx,:5] | |
| x0 = self.ik_env.model.joint(active_joint).qpos0[0] | |
| x = q_rev_next[self.ik_env.rev_joint_names.index(active_joint)] | |
| y = self.ik_env.evaluate_poly(x=x, coef=coef, x0=x0) | |
| q_rev_next[self.ik_env.rev_joint_names.index(passive_joint)] = y | |
| """ | |
| 5) Clip joint range and forward update | |
| Clip joint range after integration and update ik_env forward kinematics. | |
| Also compute and store the IK error after the update. | |
| Input: | |
| - q_rev_next: Updated revolute joint positions after integration (n_rev,) | |
| Output: | |
| - ik_env: Updated ik_env after forward kinematics update | |
| - q_rev_list: List of revolute joint positions at each tick | |
| - ik_err_array: Updated IK error array with error after this tick | |
| """ | |
| # Clip joint range after integration | |
| q_rev_next = np.clip(q_rev_next, self.ik_env.rev_joint_mins, self.ik_env.rev_joint_maxs) | |
| # Update ik_env | |
| self.ik_env.forward(q=q_rev_next,joint_names=self.ik_env.rev_joint_names) | |
| # Compute IK error after update and append error | |
| self.update_ik_state() | |
| q_rev_list.append(self.ik_env.get_qpos_joints(self.ik_env.rev_joint_names)) | |
| ik_err = np.linalg.norm(np.hstack(self.p_err_list)) | |
| # ik_err = np.max(np.linalg.norm(np.vstack(self.p_err_list),axis=1)) | |
| ik_err_array[ik_tick+1] = ik_err | |
| """ | |
| IK loop ends here, and select the best solution | |
| -------------- | |
| Input: | |
| - q_rev_list: List of revolute joint positions at each tick | |
| - ik_err_array: IK error array with error at each tick | |
| Output: | |
| - q_rev_best: Best revolute joint positions with the lowest IK error (n_rev,) | |
| - ik_err_best: Lowest IK error achieved (scalar) | |
| """ | |
| # Select the best one | |
| idx_best = np.argmin(ik_err_array) | |
| q_rev_best = q_rev_list[idx_best] | |
| ik_err_best = ik_err_array[idx_best] | |
| # End timer (for whole compute_dq) | |
| elapsed_time = self.tt.toc() # elapsed time in seconds | |
| # Return | |
| info = { | |
| 'ik_err_array': ik_err_array, | |
| 'idx_best': idx_best, | |
| 'ik_err_best': ik_err_best, | |
| 'elapsed_time': elapsed_time, | |
| } | |
| return q_rev_best, info | |
| def render(self, env, r=0.01, alpha=1.0, cmap_name='rainbow'): | |
| """ | |
| Render the current and target positions of the IK targets in the environment. | |
| """ | |
| # Plot IK current and target positions | |
| n_trgt = len(self.site_names) | |
| colors = get_colors(cmap_name=cmap_name,n_color=n_trgt,alpha=alpha) | |
| for ik_idx,site_name in enumerate(self.site_names): | |
| p_trgt = self.p_trgt_list[ik_idx] | |
| # p_curr = self.p_curr_list[ik_idx] | |
| p_curr = env.get_p_site(site_name=site_name) | |
| rgba = colors[ik_idx] | |
| # Plot current position | |
| env.plot_sphere(p_curr, r, rgba) | |
| # Plot target position | |
| env.plot_sphere(p_trgt, r, rgba) | |
| # Plot line between current and target | |
| env.plot_line_fr2to(p_fr=p_curr,p_to=p_trgt,rgba=rgba) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment