Skip to content

Commit

Permalink
Modify example to use optas
Browse files Browse the repository at this point in the history
  • Loading branch information
cmower committed Jul 12, 2023
1 parent 225b25a commit bcd4d4e
Show file tree
Hide file tree
Showing 2 changed files with 175 additions and 70 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,25 +8,85 @@

from .admittance_controller import AdmittanceController

np.set_printoptions(precision=3, suppress=True, linewidth=1000)


class EndEffectorWrenchEstimator:
def __init__(self, Nmax, smooth, end_effector_link, robot_model):
self.jac = robot_model.get_global_link_geometric_jacobian_function(
end_effector_link, numpy_output=True
)
self.data = []
self.Nmax = Nmax
self.smooth = smooth
self.offset = None
self.wrench_estimate = np.zeros(6)

def append(self, joint_position, external_torque):
ndata = len(self.data)
if ndata < self.Nmax:
# self.data.append(self.estimate_wrench(joint_position, external_torque))
self.data.append(external_torque)
elif ndata == self.Nmax:
self.offset = np.mean(self.data, axis=0)

def ready(self):
return self.offset is not None

def estimate_wrench(self, joint_position, external_torque):
J = self.jac(joint_position)
Jinv = np.linalg.pinv(J, rcond=0.05)
tau_ext = np.asarray(external_torque)
f_ext = Jinv.T @ tau_ext
return f_ext.tolist()

def estimate_offset_wrench(self, joint_position, external_torque):
# Estimate offset external torque
external_torque = np.array(external_torque) - self.offset

# Estimate wrench from raw joint state
wrench_estimate = np.array(
self.estimate_wrench(joint_position, external_torque)
)

# Remove offset
# wrench_estimate -= self.offset

# Smooth estimate
self.wrench_estimate = (
self.smooth * wrench_estimate + (1.0 - self.smooth) * self.wrench_estimate
)

return self.wrench_estimate.tolist()


class AdmittanceControlNode(Node):
def __init__(self, node_name="admittance_control_node") -> None:
super().__init__(node_name=node_name)

# parameters
self.declare_parameter("robot_description", "")
self.declare_parameter("base_link", "lbr_link_0")
self.declare_parameter("end_effector_link", "lbr_link_ee")

self.init_ = False
self.lbr_state_ = LBRState()
# Setup controller
end_effector_link = str(self.get_parameter("end_effector_link").value)
robot_description = str(self.get_parameter("robot_description").value)
self.controller = AdmittanceController(
self,
robot_description,
end_effector_link,
)

self.controller_ = AdmittanceController(
robot_description=str(self.get_parameter("robot_description").value),
base_link=str(self.get_parameter("base_link").value),
end_effector_link=str(self.get_parameter("end_effector_link").value),
# Setup wrench estimator
Nmax = 500
smooth = 0.1
self.wrench_estimator = EndEffectorWrenchEstimator(
Nmax, smooth, end_effector_link, self.controller.robot_model
)

# Setup joint position
self.joint_position = None

# publishers and subscribers
self.lbr_state_sub_ = self.create_subscription(
LBRState,
Expand All @@ -49,26 +109,41 @@ def __init__(self, node_name="admittance_control_node") -> None:
)

def on_lbr_state_(self, lbr_state: LBRState) -> None:
self.smooth_lbr_state_(lbr_state, 0.95)
# Extract joint state
q, tau_ext = lbr_state.measured_joint_position, lbr_state.external_torque
if self.joint_position is None:
self.joint_position = np.array(q)

lbr_command = self.controller_(self.lbr_state_)
self.lbr_command_pub_.publish(lbr_command)
# Update wrench estimator (return if not ready)
self.wrench_estimator.append(q, tau_ext)

def smooth_lbr_state_(self, lbr_state: LBRState, alpha: float):
if not self.init_:
self.lbr_state_ = lbr_state
self.init_ = True
if not self.wrench_estimator.ready():
return

self.lbr_state_.measured_joint_position = (
(1 - alpha) * np.array(self.lbr_state_.measured_joint_position.tolist())
+ alpha * np.array(lbr_state.measured_joint_position.tolist())
).data
# Retrieve wrench estimate
f_ext = np.array(self.wrench_estimator.estimate_offset_wrench(q, tau_ext))
self.get_logger().info("f_ext=" + repr(f_ext))

# Retrieve current joint state (overwrite q from lbr_state)
q = self.joint_position.copy()

# Retrieve sample time
dt = lbr_state.sample_time

# Compute goal joint position using admittance controller
qg = self.controller(q, f_ext, dt)

# Send command
lbr_command = LBRCommand()
lbr_command.joint_position = qg.tolist()
self.lbr_command_pub_.publish(lbr_command)

dq = (qg - q) / dt
self.get_logger().info("dq=" + repr(dq))
# self.get_logger().info("qg=" + repr(qg))

self.lbr_state_.external_torque = (
(1 - alpha) * np.array(self.lbr_state_.external_torque.tolist())
+ alpha * np.array(lbr_state.external_torque.tolist())
).data
# Update joint position
self.joint_position = qg.copy()


def main(args=None):
Expand Down
Original file line number Diff line number Diff line change
@@ -1,63 +1,93 @@
import kinpy
import optas
import numpy as np

from lbr_fri_msgs.msg import LBRCommand, LBRState


class AdmittanceController(object):
def __init__(
self,
robot_description: str,
base_link: str = "lbr_link_0",
end_effector_link: str = "lbr_link_ee",
f_ext_th: np.ndarray = np.array([2.0, 2.0, 2.0, 0.5, 0.5, 0.5]),
dq_gain: np.ndarray = np.array([1.5, 1.5, 1.5, 1.5, 1.5, 1.5, 1.5]),
dx_gain: np.ndarray = np.array([1.0, 1.0, 1.0, 20.0, 40.0, 60.0]),
node,
robot_description,
end_effector_link,
) -> None:
self.lbr_command_ = LBRCommand()
# Setup class attributes
self.node = node

self.chain_ = kinpy.build_serial_chain_from_urdf(
data=robot_description,
root_link_name=base_link,
end_link_name=end_effector_link,
# Setup robot
self.robot_model = optas.RobotModel(
urdf_string=robot_description, time_derivs=[1]
)
self.name = self.robot_model.get_name()

self.dof_ = len(self.chain_.get_joint_parameter_names())
self.jacobian_ = np.zeros((6, self.dof_))
self.jacobian_inv_ = np.zeros((self.dof_, 6))
self.q = np.zeros(self.dof_)
self.dq_ = np.zeros(self.dof_)
self.tau_ext_ = np.zeros(6)
self.dq_gain_ = np.diag(dq_gain)
self.dx_gain_ = np.diag(dx_gain)
self.f_ext_ = np.zeros(6)
self.f_ext_th_ = f_ext_th
self.alpha_ = 0.99

def __call__(self, lbr_state: LBRState) -> LBRCommand:
self.q_ = np.array(lbr_state.measured_joint_position.tolist())
self.tau_ext_ = np.array(lbr_state.external_torque.tolist())

self.jacobian_ = self.chain_.jacobian(self.q_)

self.jacobian_inv_ = np.linalg.pinv(self.jacobian_, rcond=0.05)
self.f_ext_ = self.jacobian_inv_.T @ self.tau_ext_

self.f_ext_ = np.where(
abs(self.f_ext_) > self.f_ext_th_,
self.dx_gain_ @ np.sign(self.f_ext_) * (abs(self.f_ext_) - self.f_ext_th_),
0.0,
# Setup optimization builder
T = 1
builder = optas.OptimizationBuilder(
T, robots=self.robot_model, derivs_align=True
)

# additional smoothing required in python
self.dq_ = (
self.alpha_ * self.dq_
+ (1 - self.alpha_) * self.dq_gain_ @ self.jacobian_inv_ @ self.f_ext_
# Setup parameters
f_ext = builder.add_parameter("f_ext", 6)
qc = builder.add_parameter("qc", self.robot_model.ndof)
dt = builder.add_parameter("dt")

# Get model state
dq = builder.get_model_state(self.name, 0, time_deriv=1)
q = qc + dt * dq

# Compute jacobian at current state
Jc = self.robot_model.get_global_link_geometric_jacobian(end_effector_link, qc)

# Compute current eff position
pc = self.robot_model.get_global_link_position(end_effector_link, qc)

# Compute goal end-effector velocity
gain = optas.DM([0.01, 0.01, 0.01, 0.25, 0.25, 0.25])
dx_lim = optas.DM([0.04, 0.04, 0.04, 0.2, 0.2, 0.2])
dxg = optas.clip(gain * f_ext, -dx_lim, dx_lim)
self.goal_eff_velocity = optas.Function("dxg", [f_ext], [dxg])

# Cost: goal end-effector velocity
dx = Jc @ dq
builder.add_cost_term("goal_eff_velocity", optas.sumsqr(dx - dxg))

# Compute next eff position
p = pc + dt * dx[:3]

# Constraint: y limit
py = p[1]
builder.add_bound_inequality_constraint("y_bound", -0.3, py, 0.3)

# Cost: minimize joint velocity
w = 0.01
builder.add_cost_term("min_dq", w * optas.sumsqr(dq))

# Constraint: joint position limits
builder.add_bound_inequality_constraint(
"joint_position_limits",
self.robot_model.lower_actuated_joint_limits,
q,
self.robot_model.upper_actuated_joint_limits,
)

self.lbr_command_.joint_position = (
np.array(lbr_state.measured_joint_position.tolist())
+ lbr_state.sample_time * self.dq_
).data
# Setup solver
self.solver = optas.CasADiSolver(builder.build()).setup("qpoases")
self.solution = None

def __call__(self, q, f_ext, dt):
# Compute/report goal end-effector velocity
# dxg = self.goal_eff_velocity(f_ext).toarray().flatten()
# self.node.get_logger().info("dxg=" + repr(dxg))

# Reset optimization problem
if self.solution is not None:
self.solver.reset_initial_seed(self.solution)

self.solver.reset_parameters({"f_ext": f_ext, "dt": dt, "qc": q})

# Solve problem
self.solution = self.solver.solve()

# Compute goal state
dq = self.solution[f"{self.name}/dq"].toarray().flatten()
qg = q + dt * dq

return self.lbr_command_
return qg

0 comments on commit bcd4d4e

Please sign in to comment.