-
Notifications
You must be signed in to change notification settings - Fork 561
/
Copy pathdata_acquisition_plugin_service.py
615 lines (508 loc) · 27.6 KB
/
data_acquisition_plugin_service.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
# Copyright (c) 2023 Boston Dynamics, Inc. All rights reserved.
#
# Downloading, reproducing, distributing or otherwise using the SDK Software
# is subject to the terms and conditions of the Boston Dynamics Software
# Development Kit License (20191101-BDSDK-SL).
"""A set of helper functions for implementing a Data Acquisition plugin service.
The DataAcquisitionPluginService class in this module is the recommended way to create a
plugin service for data acquisition. To use it, you must define a function for performing the
data collection with the following signature:
``def data_collect_fn(request:AcquirePluginDataRequest, store_helper: DataAcquisitionStoreHelper)``
This function is responsible for collecting all of your plugin's data and storing it through
the store_helper. The store_helper (DataAcquisitionStoreHelper) kicks off asynchronous data store calls
for each piece of data, metadata, or images. The plugin service class has an internal helper function
which calls the DataAcquisitionStoreHelper.wait_for_stores_complete method, which blocks until all saving
to the data acquisition store is finished. The helper class will monitor the futures for completion and
update the state status and errors appropriately.
If errors occur during the data collection+saving process, use state.add_errors to report which
intended DataIdentifiers had problems:
``state.add_errors([DataIdentifiers], 'Failure to collect data 1')``
Long-running acquisitions should be sure to call state.cancel_check() occasionally to exit early
and cleanly if the acquisition has been cancelled by the user or a timeout. If your plugin has
extra cleanup that it needs to perform in the event of a cancelled request, wrap your check
in a try-except block that can catch the RequestCancelledError thrown by the cancel_check function,
then perform the cleanup and re-raise the exception. For example::

    try:
        # Data collection and storage here
        state.cancel_check()
    except RequestCancelledError:
        # Perform cleanup here
        raise

Note, the data acquisition plugin service helper class will monitor and respond to the GetStatus RPC.
However, the data_collect_fn function should update the status to STATUS_SAVING when it transitions to
storing the data.
"""
import logging
import threading
import time
from concurrent.futures import ThreadPoolExecutor
from bosdyn.api import data_acquisition_pb2, data_acquisition_plugin_service_pb2_grpc, header_pb2
from bosdyn.api.data_acquisition_pb2 import DataAcquisitionCapability as Capability
from bosdyn.client import Robot
from bosdyn.client.data_acquisition_store import DataAcquisitionStoreClient
from bosdyn.client.data_buffer import DataBufferClient
from bosdyn.client.server_util import ResponseContext, populate_response_header
from bosdyn.client.service_customization_helpers import create_value_validator
# Module-level logger; used directly by RequestState and as the fallback logger of
# DataAcquisitionPluginService when no logger is supplied.
_LOGGER = logging.getLogger(__name__)
# How long (in seconds) should completed requests be queryable? RequestManager.cleanup_requests
# uses this as its default age cutoff when discarding finished requests.
kDefaultRequestExpiration = 30
class RequestCancelledError(Exception):
    """Signals that an acquisition request was cancelled and must not be processed further.

    Raised by the cancellation checks of RequestState; data collection code may catch it to
    perform cleanup and should then re-raise it.
    """
def make_error(data_id, error_msg, error_data=None):
    """Build a DataError message suitable for passing to RequestState.add_errors.

    Args:
        data_id (DataIdentifier): The proto for the data identifier which has an error.
        error_msg (string): The error message to associate with the data id.
        error_data (google.protobuf.Any): Optional additional data to pack into the error.

    Returns:
        The DataError protobuf message for this errored data id.
    """
    data_error = data_acquisition_pb2.DataError(data_id=data_id, error_message=error_msg)
    if error_data is None:
        # Nothing extra to attach; the identifier and message are the whole error.
        return data_error
    data_error.error_data.Pack(error_data)
    return data_error
class RequestState(object):
    """Lock-protected progress record for a single acquisition request.

    One instance is created per AcquireData RPC handled by the plugin service. It tracks the
    request's current status, the data that was saved, any data errors, and whether the
    request has been cancelled. Every mutating method (and has_data_errors/cancel_check)
    raises RequestCancelledError once the request has been cancelled.
    """

    # Statuses for the GetStatus RPC which mean acquisition/saving is still in progress,
    # i.e. the request has neither completed nor failed.
    kNonError = {
        data_acquisition_pb2.GetStatusResponse.STATUS_ACQUIRING,
        data_acquisition_pb2.GetStatusResponse.STATUS_SAVING,
    }

    def __init__(self):
        # Guards every field below.
        self._lock = threading.Lock()
        # Current status of the request, including saved data ids and data errors.
        self._status_proto = data_acquisition_pb2.GetStatusResponse(
            status=data_acquisition_pb2.GetStatusResponse.STATUS_ACQUIRING)
        # True once a CancelAcquisition RPC has been received for this request.
        self._cancelled = False
        # Time at which the request finished; used by the RequestManager for cleanup.
        self._completion_time = None

    def set_status(self, status):
        """Replace the status enum stored in the request's GetStatusResponse.

        Args:
            status (GetStatusResponse.Status): The new status value.

        Raises:
            RequestCancelledError: The request has been cancelled.
        """
        with self._lock:
            self._cancel_check_locked()
            self._status_proto.status = status

    def set_complete_if_no_error(self, logger=None):
        """Mark the request complete, provided it is still in a non-error status.

        Args:
            logger (logging.Logger): Optional logger that receives the full status proto when
                the request is not in a non-error status.

        Returns:
            True if the status was switched to STATUS_COMPLETE, False otherwise.

        Raises:
            RequestCancelledError: The request has been cancelled.
        """
        with self._lock:
            self._cancel_check_locked()
            still_in_progress = self._status_proto.status in self.kNonError
            if still_in_progress:
                self._status_proto.status = self._status_proto.STATUS_COMPLETE
            elif logger:
                # Intentionally formatting while holding the lock, so the proto cannot change
                # underneath us.
                logger.error('Error encountered during request:\n{}'.format(self._status_proto))
            return still_in_progress

    def add_saved(self, data_ids):
        """Append successfully saved data ids to the status.

        Args:
            data_ids (Iterable[DataId]): Data IDs that have been successfully saved.

        Raises:
            RequestCancelledError: The request has been cancelled.
        """
        with self._lock:
            self._cancel_check_locked()
            self._status_proto.data_saved.extend(data_ids)

    def add_errors(self, data_errors):
        """Append data errors to the status and switch it to STATUS_DATA_ERROR.

        Use the make_error function to simplify creating data errors.

        Args:
            data_errors (Iterable[DataError]): data errors to include as errors in the status.

        Raises:
            RequestCancelledError: The request has been cancelled.
        """
        with self._lock:
            self._cancel_check_locked()
            status_proto = self._status_proto
            status_proto.data_errors.extend(data_errors)
            status_proto.status = data_acquisition_pb2.GetStatusResponse.STATUS_DATA_ERROR
        _LOGGER.error('Errors occurred during acquisition:\n%s', data_errors)

    def has_data_errors(self):
        """Return True if at least one data error has been recorded.

        Raises:
            RequestCancelledError: The request has been cancelled.
        """
        with self._lock:
            self._cancel_check_locked()
            return len(self._status_proto.data_errors) > 0

    def cancel_check(self):
        """Raises RequestCancelledError if the request has already been cancelled."""
        with self._lock:
            self._cancel_check_locked()

    def is_cancelled(self):
        """Return whether the request has been cancelled, without raising."""
        with self._lock:
            return self._cancelled

    def _cancel_check_locked(self):
        """Raise RequestCancelledError if cancelled; the caller must already hold self._lock."""
        if self._cancelled:
            raise RequestCancelledError()
class DataAcquisitionStoreHelper(object):
    """Tracks the asynchronous data acquisition store calls issued for one request.

    Each store_* method starts an async store RPC and remembers the resulting future together
    with its data identifier. wait_for_stores_complete then folds the outcome of every future
    into the request state.

    Args:
        store_client (bosdyn.client.DataAcquisitionStoreClient): A data acquisition store client.
        state (RequestState): state of the request, to be modified with errors or completion.
        cancel_interval (float): How often to check for cancellation of the request while
            waiting for the futures to complete.

    Attributes:
        store_client (bosdyn.client.DataAcquisitionStoreClient): A data acquisition store client.
        state (RequestState): state of the request, to be modified with errors or completion.
        cancel_interval (float): How often to check for cancellation of the request while
            waiting for the futures to complete.
        data_id_future_pairs (List[Pair(DataIdentifier, Future)]): The data identifier and the
            associated future which results from the async store data rpc.
    """

    def __init__(self, store_client, state, cancel_interval=1):
        self.data_id_future_pairs = []
        self.cancel_interval = cancel_interval
        self.state = state
        self.store_client = store_client

    def _track(self, data_id, future):
        """Remember an in-flight store future alongside the data id it belongs to."""
        self.data_id_future_pairs.append((data_id, future))

    def store_metadata(self, metadata, data_id):
        """Start an asynchronous metadata store with the data acquisition store service.

        Args:
            metadata (bosdyn.api.AssociatedMetadata): Metadata message to store.
            data_id (bosdyn.api.DataIdentifier) : Data identifier to use for storing this data.

        Raises:
            RPCError: Problem communicating with the robot.
        """
        self._track(data_id, self.store_client.store_metadata_async(metadata, data_id))

    def store_image(self, image_capture, data_id):
        """Start an asynchronous image store with the data acquisition store service.

        Args:
            image_capture (bosdyn.api.ImageCapture): Image to store.
            data_id (bosdyn.api.DataIdentifier) : Data identifier to use for storing this data.

        Raises:
            RPCError: Problem communicating with the robot.
        """
        self._track(data_id, self.store_client.store_image_async(image_capture, data_id))

    def store_data(self, message, data_id, file_extension=None):
        """Start an asynchronous data store with the data acquisition store service.

        Args:
            message (bytes): Data to store.
            data_id (bosdyn.api.DataIdentifier) : Data identifier to use for storing this data.
            file_extension (string) : File extension to use for writing the data to a file.

        Raises:
            RPCError: Problem communicating with the robot.
        """
        self._track(data_id, self.store_client.store_data_async(message, data_id, file_extension))

    def store_data_as_chunks(self, message, data_id, file_extension=None):
        """Start an asynchronous, streamed data store with the data acquisition store service.

        Args:
            message (bytes): Data to store.
            data_id (bosdyn.api.DataIdentifier) : Data identifier to use for storing this data.
            file_extension (string) : File extension to use for writing the data to a file.

        Raises:
            RPCError: Problem communicating with the robot.
        """
        self._track(
            data_id,
            self.store_client.store_data_as_chunks_async(message, data_id, file_extension))

    def store_file(self, file_path, data_id, file_extension=None):
        """Start an asynchronous file store with the data acquisition store service.

        Args:
            file_path (string): Path to the file to store.
            data_id (bosdyn.api.DataIdentifier) : Data identifier to use for storing this file.
            file_extension (string) : File extension to use for writing the data to the file.

        Raises:
            RPCError: Problem communicating with the robot.
        """
        self._track(data_id, self.store_client.store_file_async(file_path, data_id,
                                                                file_extension))

    def cancel_check(self):
        """Raises RequestCancelledError if the request has already been cancelled."""
        self.state.cancel_check()

    def wait_for_stores_complete(self):
        """Block until every tracked store finishes, then record successes and failures.

        Returns:
            True if the request state holds no data errors afterwards, False otherwise.

        Raises:
            RequestCancelledError: The data acquisition request was cancelled.
        """
        self.state.cancel_check()

        # Poll until no future is outstanding, checking for cancellation between polls.
        while any(not pending.done() for _, pending in self.data_id_future_pairs):
            time.sleep(self.cancel_interval)
            self.state.cancel_check()

        # Translate each finished future into either a saved id or a data error.
        for data_id, finished in self.data_id_future_pairs:
            failure = finished.exception()
            if failure is None:
                self.state.add_saved([data_id])
                continue
            self.state.add_errors(
                [make_error(data_id, 'Failed to store data: {}'.format(failure))])

        return not self.state.has_data_errors()
class DataAcquisitionPluginService(
        data_acquisition_plugin_service_pb2_grpc.DataAcquisitionPluginServiceServicer):
    """Implementation of a data acquisition plugin. It relies on the provided data_collect_fn
    to implement the heart of the data collection and storage.

    Args:
        robot: Authenticated robot object.
        capabilities: List of DataAcquisitionCapability that describe what this plugin can do.
        data_collect_fn: Function that performs the data collection and storage. Ordered input
            arguments (to data_collect_fn): data_acquisition_pb2.AcquirePluginDataRequest,
            DataAcquisitionStoreHelper. Output (of data_collect_fn): None
        acquire_response_fn: Optional function that can validate a request and provide a timeout
            deadline. Function returns a boolean indicating if the request is valid; if False,
            the response is returned immediately without calling the data collection function
            or saving any data. Ordered input arguments (to acquire_response_fn):
            data_acquisition_pb2.AcquirePluginDataRequest,
            data_acquisition_pb2.AcquirePluginDataResponse. Output (of acquire_response_fn):
            Boolean
        executor: Optional thread pool. Defaults to a ThreadPoolExecutor with two workers.
        logger: Optional logger. Defaults to this module's logger.
        live_response_fn: Optional function that sends signals data to the robot for purposes
            of displaying it on the tablet and Orbit during teleoperation. Input argument (to
            live_response_fn): data_acquisition_pb2.LiveDataRequest.

    Attributes:
        logger (logging.Logger): Logger used by the service.
        capabilities (List[DataAcquisitionCapability]): List of capabilities that describe what
            this plugin can do.
        value_validators (dict): Maps each capability name to the validator built from its
            custom parameter spec.
        data_collect_fn: Function that performs the data collection and storage.
        acquire_response_fn: Function that can validate a request and provide a timeout deadline.
        live_response_fn: Function that sends signals data to the robot for purposes of
            displaying it on the tablet and Orbit during teleoperation.
        request_manager (RequestManager): Helper class which manages the RequestStates created
            with each acquisition RPC.
        executor (ThreadPoolExecutor): Thread pool to run the plugin service on.
        robot (Robot): Authenticated robot object.
        store_client (DataAcquisitionStoreClient): Client for the data acquisition store service.
        data_buffer_client (DataBufferClient): Client handed to every ResponseContext.
    """

    service_type = 'bosdyn.api.DataAcquisitionPluginService'

    def __init__(self, robot, capabilities, data_collect_fn, acquire_response_fn=None,
                 executor=None, logger=None, live_response_fn=None):
        super(DataAcquisitionPluginService, self).__init__()
        self.logger = logger or _LOGGER
        self.capabilities = capabilities
        # One validator per capability, keyed by capability name; see validate_params.
        self.value_validators = {
            capture.name: create_value_validator(capture.custom_params)
            for capture in self.capabilities
        }
        self.data_collect_fn = data_collect_fn
        self.acquire_response_fn = acquire_response_fn
        self.live_response_fn = live_response_fn
        self.request_manager = RequestManager()
        self.executor = executor or ThreadPoolExecutor(max_workers=2)
        self.robot = robot
        self.store_client = robot.ensure_client(DataAcquisitionStoreClient.default_service_name)
        self.data_buffer_client = robot.ensure_client(DataBufferClient.default_service_name)

    def validate_params(self, request, response):
        """Validate that any parameters set in the request are valid according the spec.

        Args:
            request (data_acquisition_pb2.AcquirePluginDataRequest): The data acquisition request.
            response (data_acquisition_pb2.AcquirePluginDataResponse): Response whose status (and
                custom_param_error) is filled in on failure.

        Returns:
            True if every requested capture is known and its custom params validate; False
            otherwise, in which case the response status has been set.
        """
        for capture in request.acquisition_requests.data_captures:
            try:
                error = self.value_validators[capture.name](capture.custom_params)
                if error is not None:
                    response.custom_param_error.CopyFrom(error)
                    response.status = response.STATUS_CUSTOM_PARAMS_ERROR
                    return False
            except KeyError:
                # No validator registered under this name: the capture is not one of ours.
                response.status = response.STATUS_UNKNOWN_CAPTURE_TYPE
                return False
        return True

    def _data_collection_wrapper(self, request_id, request, state):
        """Helper function which initiates the data collection and storage in sequence.

        Runs on the executor. Whatever happens, the request is marked finished at the end so
        that the request manager can clean it up later.

        Args:
            request_id (int): The request_id for the acquisition request being inspected.
            request (DataAcquisitionPluginRequest): The data acquisition request.
            state (RequestState): The associated internal request state for the data.
        """
        try:
            store_helper = DataAcquisitionStoreHelper(self.store_client, state)
            self.data_collect_fn(request, store_helper)
            store_helper.wait_for_stores_complete()
            state.set_complete_if_no_error(logger=self.logger)
        except RequestCancelledError:
            # Cannot use set_status because it will raise the exception again.
            with state._lock:
                state._status_proto.status = state._status_proto.STATUS_ACQUISITION_CANCELLED
            self.logger.info('Request %d cancelled', request_id)
        except Exception as e:  # pylint: disable=broad-except
            self.logger.exception("Failed during call to user function")
            with state._lock:
                state._status_proto.status = state._status_proto.STATUS_INTERNAL_ERROR
                state._status_proto.header.error.message = str(e)
        finally:
            self.request_manager.mark_request_finished(request_id)
            self.logger.info('Finished request %d', request_id)

    def AcquirePluginData(self, request, context):
        """Trigger a data acquisition and store results in the data acquisition store service.

        Args:
            request (data_acquisition_pb2.AcquirePluginDataRequest): The data acquisition request.
            context (GRPC ClientContext): tracks internal grpc statuses and information.

        Returns:
            An AcquirePluginDataResponse containing a request_id to use with GetStatus.
        """
        response = data_acquisition_pb2.AcquirePluginDataResponse()
        with ResponseContext(response, request, self.data_buffer_client):
            self._start_plugin_acquire(request, response)
        return response

    def _start_plugin_acquire(self, request, response):
        """Initiates the data collection function and mutates the AcquirePluginDataResponse rpc.

        Args:
            request (data_acquisition_pb2.AcquirePluginDataRequest): The data acquisition request.
            response (data_acquisition_pb2.AcquirePluginDataResponse): The data acquisition
                response.

        Returns:
            Mutates the AcquirePluginDataResponse proto and also returns it.
        """
        if not self.validate_params(request, response):
            return response

        if self.acquire_response_fn is not None:
            try:
                if not self.acquire_response_fn(request, response):
                    return response
            except Exception as e:  # pylint: disable=broad-except
                self.logger.exception('Failed during call to user acquire response function')
                # Use the same CommonError code as GetLiveData for failures in user callbacks.
                populate_response_header(
                    response, request,
                    error_code=header_pb2.CommonError.CODE_INTERNAL_SERVER_ERROR,
                    error_msg=str(e))
                return response

        # Drop expired finished requests before registering the new one.
        self.request_manager.cleanup_requests()
        response.request_id, state = self.request_manager.add_request()
        self.logger.info('Beginning request %d for %s', response.request_id,
                         [capture.name for capture in request.acquisition_requests.data_captures])
        self.executor.submit(self._data_collection_wrapper, response.request_id, request, state)
        # NOTE(review): this takes STATUS_OK from AcquireDataResponse although the response is
        # an AcquirePluginDataResponse; presumably the numeric values agree -- confirm.
        response.status = data_acquisition_pb2.AcquireDataResponse.STATUS_OK
        populate_response_header(response, request)
        return response

    def GetStatus(self, request, context):
        """Query the status of a data acquisition by ID.

        Args:
            request (data_acquisition_pb2.GetStatusRequest): The get status request.
            context (GRPC ClientContext): tracks internal grpc statuses and information.

        Returns:
            An GetStatusResponse containing the details of the data acquisition.
        """
        response = data_acquisition_pb2.GetStatusResponse()
        with ResponseContext(response, request, self.data_buffer_client):
            try:
                # Note: this needs to be a copy from and not '=' such that the response that is
                # logged in the request context gets updated.
                response.CopyFrom(self.request_manager.get_status_proto(request.request_id))
            except KeyError:
                response.status = response.STATUS_REQUEST_ID_DOES_NOT_EXIST
            populate_response_header(response, request)
        return response

    def GetServiceInfo(self, request, context):
        """Get a list of data acquisition capabilities.

        Args:
            request (data_acquisition_pb2.GetServiceInfoRequest): The get service info request.
            context (GRPC ClientContext): tracks internal grpc statuses and information.

        Returns:
            An GetServiceInfoResponse containing the list of data acquisition capabilities for
            the plugin.
        """
        response = data_acquisition_pb2.GetServiceInfoResponse()
        with ResponseContext(response, request, self.data_buffer_client):
            response.capabilities.data_sources.extend(self.capabilities)
            populate_response_header(response, request)
        return response

    def CancelAcquisition(self, request, context):
        """Cancel a data acquisition by ID.

        Args:
            request (data_acquisition_pb2.CancelAcquisitionRequest): The cancel acquisition
                request.
            context (GRPC ClientContext): tracks internal grpc statuses and information.

        Returns:
            An CancelAcquisitionResponse containing the status of the cancel operation.
        """
        response = data_acquisition_pb2.CancelAcquisitionResponse()
        with ResponseContext(response, request, self.data_buffer_client):
            try:
                self.request_manager.mark_request_cancelled(request.request_id)
                self.logger.info('Cancelling request %d', request.request_id)
            except KeyError:
                response.status = response.STATUS_REQUEST_ID_DOES_NOT_EXIST
            else:
                response.status = response.STATUS_OK
            populate_response_header(response, request)
        return response

    def GetLiveData(self, request, context):
        """Get the live data available from this plugin.

        Args:
            request (data_acquisition_pb2.LiveDataRequest): The live data request with params.
            context (GRPC ClientContext): tracks internal grpc statuses and information (unused).

        Returns:
            response (data_acquisition_pb2.LiveDataResponse): Result of live_response_fn, or a
            response whose header carries an error if live_response_fn is unset or fails.
        """
        response = data_acquisition_pb2.LiveDataResponse()
        with ResponseContext(response, request, self.data_buffer_client):
            try:
                if self.live_response_fn is not None:
                    # Copy into the existing message instead of rebinding `response`, so the
                    # object handed to ResponseContext is the one that gets populated (same
                    # reasoning as in GetStatus). A callback returning something that is not a
                    # LiveDataResponse now lands in the error path below.
                    response.CopyFrom(self.live_response_fn(request))
                    populate_response_header(response, request)
                else:
                    populate_response_header(
                        response, request,
                        error_code=header_pb2.CommonError.CODE_INTERNAL_SERVER_ERROR,
                        error_msg="live_response_fn is None")
            except Exception as general_error:  # pylint: disable=broad-exception-caught
                self.logger.exception("Failed during call to user live response function")
                populate_response_header(
                    response, request, error_code=header_pb2.CommonError.CODE_INTERNAL_SERVER_ERROR,
                    error_msg=str(general_error))
        return response
# pylint: disable=protected-access
class RequestManager:
    """Manage request lifecycles and status.

    The RequestManager manages some internals of the RequestState class, so it will access its
    protected variables. We leave those variables protected so that users of the RequestState
    class are less tempted to fiddle with them incorrectly, but we turn off the linting for the
    rest of this file.
    """

    def __init__(self):
        # Guards _requests and _counter. Individual RequestState objects carry their own lock.
        self._lock = threading.Lock()
        # Maps request_id (int) -> RequestState.
        self._requests = {}
        # Last request id handed out; ids start at 1.
        self._counter = 0

    def add_request(self):
        """Create a new request to manage.

        Returns:
            A (request_id, RequestState) tuple for the newly registered request.
        """
        with self._lock:
            self._counter += 1
            state = RequestState()
            self._requests[self._counter] = state
            return self._counter, state

    def get_request_state(self, request_id):
        """Get the RequestState object for managing a request.

        Args:
            request_id (int): The request_id for the acquisition request being inspected.

        Raises:
            KeyError: No request with that id is currently tracked.
        """
        with self._lock:
            return self._requests[request_id]

    def get_status_proto(self, request_id):
        """Get a copy of the current status for the specified request.

        Args:
            request_id (int): The request_id for the acquisition request being inspected.

        Raises:
            KeyError: No request with that id is currently tracked.
        """
        state = self.get_request_state(request_id)
        status = data_acquisition_pb2.GetStatusResponse()
        with state._lock:
            # Copy under the state's lock so the snapshot is internally consistent.
            status.CopyFrom(state._status_proto)
        return status

    def mark_request_cancelled(self, request_id):
        """Mark a request as cancelled, and no longer able to be updated.

        Args:
            request_id (int): The request_id for the acquisition request being cancelled.

        Raises:
            KeyError: No request with that id is currently tracked.
        """
        state = self.get_request_state(request_id)
        with state._lock:
            # NOTE(review): this also overwrites the status of a request that has already
            # finished -- confirm that is intended.
            state._cancelled = True
            state._status_proto.status = state._status_proto.STATUS_CANCEL_IN_PROGRESS

    def mark_request_finished(self, request_id):
        """Mark a request as finished, and able to be removed later.

        Args:
            request_id (int): The request_id for the acquisition request being completed.

        Raises:
            KeyError: No request with that id is currently tracked.
        """
        state = self.get_request_state(request_id)
        with state._lock:
            state._completion_time = time.time()

    def cleanup_requests(self, older_than_time=None):
        """Remove all requests that were completed farther in the past than older_than_time.

        Defaults to removing anything older than 30 seconds.

        Args:
            older_than_time (float): Optional cutoff timestamp (seconds, same clock as
                time.time()); requests that finished before it are removed. Requests that have
                not finished are never removed.
        """
        # Compare against None explicitly so that an explicit cutoff of 0 is honoured instead
        # of being silently replaced by the default.
        if older_than_time is None:
            older_than_time = time.time() - kDefaultRequestExpiration

        with self._lock:
            # Grab the contents to iterate through outside of the lock
            requests = list(self._requests.items())

        to_remove = []
        for request_id, state in requests:
            with state._lock:
                if state._completion_time is not None and state._completion_time < older_than_time:
                    to_remove.append(request_id)

        with self._lock:
            for key in to_remove:
                # Won't fail even if the key was already removed
                self._requests.pop(key, None)