forked from JetsonHacksNano/CSI-Camera
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathros2_simple_camera.py
106 lines (86 loc) · 3.19 KB
/
ros2_simple_camera.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
#!/usr/bin/python3
# https://github.com/Devanthro/csi_camera/blob/main/csi_camera/ros2_simple_camera.py
import cv2
import rclpy
from rclpy.node import Node
from sensor_msgs.msg import CompressedImage
from builtin_interfaces.msg import Time
from cv_bridge import CvBridge
def gstreamer_pipeline(
sensor_id=0,
capture_width=3840,
capture_height=2160,
display_width=3840,
display_height=2160,
framerate=20,
flip_method=1,
):
return (
"nvarguscamerasrc sensor_mode=1 sensor-id=%d !"
"video/x-raw(memory:NVMM), width=(int)%d, height=(int)%d, framerate=(fraction)%d/1 ! "
"nvvidconv flip-method=%d ! "
"video/x-raw, width=(int)%d, height=(int)%d, format=(string)BGRx ! "
"videoconvert ! "
"video/x-raw, format=(string)BGR ! appsink"
% (
sensor_id,
capture_width,
capture_height,
framerate,
flip_method,
display_width,
display_height,
)
)
class CameraNode(Node):
def __init__(self, **kwargs):
super().__init__("camera_node")
image_topic_ = self.declare_parameter(
"image_topic", "/image/left/image_compressed").value
self.frame_id_ = self.declare_parameter("frame_id", "camera").value
self.image_publisher_ = self.create_publisher(CompressedImage, image_topic_, 1)
self.br = CvBridge()
self.get_logger().info(
f"Starting publisher:\n {gstreamer_pipeline(flip_method=0)}")
self.video_capture = cv2.VideoCapture(
gstreamer_pipeline(flip_method=1,framerate=60), cv2.CAP_GSTREAMER)
self.timer = self.create_timer(1.0/60, self.image_callback)
self.publish_timer = self.create_timer(1/10.0, self.publish_callback)
self.last_image = None
def close_videocapture(self):
self.video_capture.release()
def image_callback(self):
if self.video_capture.isOpened():
try:
ret_val, self.last_image = self.video_capture.read()
except Exception as e:
print(e)
def publish_callback(self):
time_msg = self.get_time_msg()
img_msg = self.get_image_msg(self.last_image, time_msg)
self.image_publisher_.publish(img_msg)
def get_time_msg(self):
time_msg = Time()
msg_time = self.get_clock().now().seconds_nanoseconds()
time_msg.sec = int(msg_time[0])
time_msg.nanosec = int(msg_time[1])
return time_msg
def get_image_msg(self, image, time):
"""
Get image message, takes image as input and returns CvBridge image message
:param image: cv2 image
:return: sensor_msgs/Imag
"""
img_msg = self.br.cv2_to_compressed_imgmsg(image) # , encoding="bgr8")
img_msg.header.stamp = time
img_msg.header.frame_id = self.frame_id_
return img_msg
def main(args=None):
rclpy.init(args=args)
camera_node = CameraNode()
rclpy.spin(camera_node)
camera_node.close_videocapture()
camera_node.destroy_node()
rclpy.shutdown()
if __name__ == "__main__":
main()