-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathbag_data.py
executable file
·148 lines (112 loc) · 4.12 KB
/
bag_data.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
#!/usr/bin/env python2
# -*- coding: utf-8 -*-
"""
Created on Fri Sep 8 11:05:28 2017
@author: Robin Amsters
@email: [email protected]
General purpose file for obtaining data from rosbag files
"""
import rosbag
import numpy as np
from file_select_gui import get_file_path
from tf.transformations import quaternion_from_euler, euler_from_quaternion
# ==============================================================================
def get_topic_data(bagFile, topic, return_t=False):
"""
Return all messages from a specific topic
"""
all_msg = []
if return_t:
all_t = []
# Initialize rosbag object
bag = rosbag.Bag(bagFile)
for topic, msg, t in bag.read_messages(topics=[topic]):
all_msg = np.append(all_msg, msg)
if return_t:
all_t = np.append(all_t, t.to_sec())
if return_t:
return all_msg, all_t
else:
return all_msg
# ==============================================================================
def get_joint_data(bagFile, joint_name, duration=True):
"""
Function that filters bag files to obtain data from a joint that is
published to the /tf topic.
Only x-, y- and theta-coordinates are returned
if convert_to_sec is set to True (default), then the first timestamp
will be taken as zero seconds, and the following timesteps that are
retured will be relative to this timestamp.
"""
# INITIALIZATION
x = np.array([])
y = np.array([])
theta = np.array([])
all_t = np.array([])
bag = rosbag.Bag(bagFile) # Initialize rosbag object
first = True # True on first iteration to take first timestamp
# Add message values to collections
for topic, msg, t in bag.read_messages(topics=['/tf']):
joint = msg.transforms[0].child_frame_id
translation = msg.transforms[0].transform.translation
orientation = msg.transforms[0].transform.rotation
euler = euler_from_quaternion(
[orientation.x, orientation.y, orientation.z, orientation.w])
if joint == joint_name:
# Get timestamp in seconds
t = msg.transforms[0].header.stamp
t_sec = t.to_sec()
if duration:
if first:
t_0 = t_sec
first = False
all_t = np.append(all_t, t_sec - t_0)
else:
all_t = np.append(all_t, t_sec)
# Get x, y and z coordinates
x = np.append(x, translation.x)
y = np.append(y, translation.y)
theta = np.append(theta, euler[2])
pose = [x, y, theta]
return pose, all_t
# ==============================================================================
# DEMO
# ==============================================================================
if __name__ == "__main__":
# Open bagfile and obtain robot position
bagFile = get_file_path('Select bag file').name
pose, all_t = get_joint_data(bagFile, 'base_footprint')
# Get laptop charge data from rostopic
charge_msgs = get_topic_data(bagFile, '/laptop_charge')
t_charge = []
voltage_charge = []
first = True
for charge_msg in charge_msgs:
# Get timestamp in seconds
t = charge_msg.header.stamp
t_sec = t.to_sec()
# Take first timestep as 0 seconds
if first:
t_0 = t_sec
first = False
# Add data to collections
t_charge = np.append(t_charge, t_sec - t_0)
voltage_charge = np.append(voltage_charge, charge_msg.voltage)
# Plotting
import matplotlib.pyplot as plt
fig = plt.figure(1)
# Plot robot pose
ax = fig.add_subplot(211)
ax.scatter(pose[0], pose[1], color='black', s=5, label='robot pose')
ax.set_xlabel('X-coordinate [m]')
ax.set_ylabel('Y-coordinate [m]')
plt.legend()
plt.grid()
# Plot laptop charge
ax = fig.add_subplot(212)
ax.plot(t_charge, voltage_charge, color='black', label='laptop charge')
ax.set_xlabel('Time [s]')
ax.set_ylabel('Voltage [V]')
plt.legend()
plt.grid()
plt.show()