-
Notifications
You must be signed in to change notification settings - Fork 24
/
Copy pathneedle_pick.py
139 lines (121 loc) · 6.07 KB
/
needle_pick.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
import os
import time
import numpy as np
import pybullet as p
from surrol.tasks.psm_env import PsmEnv
from surrol.utils.pybullet_utils import (
get_link_pose,
wrap_angle
)
from surrol.const import ASSET_DIR_PATH
class NeedlePick(PsmEnv):
POSE_TRAY = ((0.55, 0, 0.6751), (0, 0, 0))
WORKSPACE_LIMITS = ((0.50, 0.60), (-0.05, 0.05), (0.685, 0.745)) # reduce tip pad contact
SCALING = 5.
# TODO: grasp is sometimes not stable; check how to fix it
def _env_setup(self):
super(NeedlePick, self)._env_setup()
# np.random.seed(4) # for experiment reproduce
self.has_object = True
self._waypoint_goal = True
# robot
workspace_limits = self.workspace_limits1
pos = (workspace_limits[0][0],
workspace_limits[1][1],
(workspace_limits[2][1] + workspace_limits[2][0]) / 2)
orn = (0.5, 0.5, -0.5, -0.5)
joint_positions = self.psm1.inverse_kinematics((pos, orn), self.psm1.EEF_LINK_INDEX)
self.psm1.reset_joint(joint_positions)
self.block_gripper = False
# physical interaction
self._contact_approx = False
# tray pad
obj_id = p.loadURDF(os.path.join(ASSET_DIR_PATH, 'tray/tray_pad.urdf'),
np.array(self.POSE_TRAY[0]) * self.SCALING,
p.getQuaternionFromEuler(self.POSE_TRAY[1]),
globalScaling=self.SCALING)
self.obj_ids['fixed'].append(obj_id) # 1
# needle
yaw = (np.random.rand() - 0.5) * np.pi
obj_id = p.loadURDF(os.path.join(ASSET_DIR_PATH, 'needle/needle_40mm.urdf'),
(workspace_limits[0].mean() + (np.random.rand() - 0.5) * 0.1, # TODO: scaling
workspace_limits[1].mean() + (np.random.rand() - 0.5) * 0.1,
workspace_limits[2][0] + 0.01),
p.getQuaternionFromEuler((0, 0, yaw)),
useFixedBase=False,
globalScaling=self.SCALING)
p.changeVisualShape(obj_id, -1, specularColor=(80, 80, 80))
self.obj_ids['rigid'].append(obj_id) # 0
self.obj_id, self.obj_link1 = self.obj_ids['rigid'][0], 1
def _sample_goal(self) -> np.ndarray:
""" Samples a new goal and returns it.
"""
workspace_limits = self.workspace_limits1
goal = np.array([workspace_limits[0].mean() + 0.01 * np.random.randn() * self.SCALING,
workspace_limits[1].mean() + 0.01 * np.random.randn() * self.SCALING,
workspace_limits[2][1] - 0.04 * self.SCALING])
return goal.copy()
def _sample_goal_callback(self):
""" Define waypoints
"""
super()._sample_goal_callback()
self._waypoints = [None, None, None, None] # four waypoints
pos_obj, orn_obj = get_link_pose(self.obj_id, self.obj_link1)
self._waypoint_z_init = pos_obj[2]
orn = p.getEulerFromQuaternion(orn_obj)
orn_eef = get_link_pose(self.psm1.body, self.psm1.EEF_LINK_INDEX)[1]
orn_eef = p.getEulerFromQuaternion(orn_eef)
yaw = orn[2] if abs(wrap_angle(orn[2] - orn_eef[2])) < abs(wrap_angle(orn[2] + np.pi - orn_eef[2])) \
else wrap_angle(orn[2] + np.pi) # minimize the delta yaw
# # for physical deployment only
# print(" -> Needle pose: {}, {}".format(np.round(pos_obj, 4), np.round(orn_obj, 4)))
# qs = self.psm1.get_current_joint_position()
# joint_positions = self.psm1.inverse_kinematics(
# (np.array(pos_obj) + np.array([0, 0, (-0.0007 + 0.0102)]) * self.SCALING,
# p.getQuaternionFromEuler([-90 / 180 * np.pi, -0 / 180 * np.pi, yaw])),
# self.psm1.EEF_LINK_INDEX)
# self.psm1.reset_joint(joint_positions)
# print("qs: {}".format(joint_positions))
# print("Cartesian: {}".format(self.psm1.get_current_position()))
# self.psm1.reset_joint(qs)
self._waypoints[0] = np.array([pos_obj[0], pos_obj[1],
pos_obj[2] + (-0.0007 + 0.0102 + 0.005) * self.SCALING, yaw, 0.5]) # approach
self._waypoints[1] = np.array([pos_obj[0], pos_obj[1],
pos_obj[2] + (-0.0007 + 0.0102) * self.SCALING, yaw, 0.5]) # approach
self._waypoints[2] = np.array([pos_obj[0], pos_obj[1],
pos_obj[2] + (-0.0007 + 0.0102) * self.SCALING, yaw, -0.5]) # grasp
self._waypoints[3] = np.array([self.goal[0], self.goal[1],
self.goal[2] + 0.0102 * self.SCALING, yaw, -0.5]) # lift up
def _meet_contact_constraint_requirement(self):
# add a contact constraint to the grasped block to make it stable
if self._contact_approx:
return True # mimic the dVRL setting
else:
pose = get_link_pose(self.obj_id, self.obj_link1)
return pose[0][2] > self._waypoint_z_init + 0.005 * self.SCALING
def get_oracle_action(self, obs) -> np.ndarray:
"""
Define a human expert strategy
"""
# four waypoints executed in sequential order
action = np.zeros(5)
action[4] = -0.5
for i, waypoint in enumerate(self._waypoints):
if waypoint is None:
continue
delta_pos = (waypoint[:3] - obs['observation'][:3]) / 0.01 / self.SCALING
delta_yaw = (waypoint[3] - obs['observation'][5]).clip(-0.4, 0.4)
if np.abs(delta_pos).max() > 1:
delta_pos /= np.abs(delta_pos).max()
scale_factor = 0.4
delta_pos *= scale_factor
action = np.array([delta_pos[0], delta_pos[1], delta_pos[2], delta_yaw, waypoint[4]])
if np.linalg.norm(delta_pos) * 0.01 / scale_factor < 1e-4 and np.abs(delta_yaw) < 1e-2:
self._waypoints[i] = None
break
return action
if __name__ == "__main__":
env = NeedlePick(render_mode='human') # create one process and corresponding env
env.test()
env.close()
time.sleep(2)