-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathstream_to_cbf.cpp
276 lines (241 loc) · 9.27 KB
/
stream_to_cbf.cpp
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
#include <cassert>
#include <errno.h>
#include <inttypes.h>
#include <iostream>
#include <memory>
#include <sstream>
#include <stdexcept>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string>
#include <string.h>
#include <system_error>
#include <cbflib/cbf.h>
#include <bitshuffle.h>
#include <lz4.h>
#include "bigpicture_utils.h"
#include "dectris_stream.h"
#include "dectris_utils.h"
#include "stream_to_cbf.h"
using namespace bigpicture;
/*
 * Feed one message part of the detector stream into the frame-assembly
 * state machine.
 *
 * Each call consumes exactly one message and advances m_parse_state:
 *   global_header -> new_frame -> midframe_part2 -> midframe_part3
 *   -> midframe_part4 [-> midframe_appendix, if m_using_image_appendix]
 *   -> new_frame -> ...
 * When the last part of a frame arrives the frame is written out via flush().
 *
 * @param data  Pointer to the raw message bytes.
 * @param len   Number of bytes at data.
 * @return true if this message was the series-end record (reset() has then
 *         been called), false otherwise.
 * @throws std::logic_error if m_parse_state holds an unrecognized value.
 *         Exceptions thrown by the part parsers and by flush() propagate.
 */
bool stream_to_cbf::parse(const void* data, size_t len) {
    bool received_series_end = false;
    switch (m_parse_state) {
    case parse_state_t::global_header:
        if (m_global.parse(data, len)) {
            m_parse_state = parse_state_t::new_frame;
            // Size the decode buffer for one full detector image.
            m_buffer.reset((m_global.config().bit_depth_image/8) *
                           m_global.config().x_pixels_in_detector *
                           m_global.config().y_pixels_in_detector);
        }
        break;
    case parse_state_t::new_frame:
        if (parse_part1_or_series_end(data, len)) {
            // Parsed series end
            received_series_end = true;
            reset(); // sets state to global_header
        } else {
            // Parsed part 1
            build_cbf_header();
            m_parse_state = parse_state_t::midframe_part2;
        }
        break;
    case parse_state_t::midframe_part2:
        parse_part2(data, len);
        m_parse_state = parse_state_t::midframe_part3;
        break;
    case parse_state_t::midframe_part3:
        parse_part3(data, len);
        build_cbf_data();
        m_parse_state = parse_state_t::midframe_part4;
        break;
    case parse_state_t::midframe_part4:
        parse_part4(data, len);
        if (m_using_image_appendix) {
            m_parse_state = parse_state_t::midframe_appendix;
        } else {
            flush(); // TODO: remove me, call flush() in dectris_streamer
            m_parse_state = parse_state_t::new_frame;
        }
        break;
    case parse_state_t::midframe_appendix:
        parse_appendix(data, len);
        flush(); // TODO: remove me, call flush() in dectris_streamer
        m_parse_state = parse_state_t::new_frame;
        break;
    default:
        assert(false && "stream_to_cbf is in an unknown state");
        // assert() compiles away under NDEBUG; throw so release builds do not
        // silently drop the message and keep running in a corrupt state.
        throw std::logic_error("stream_to_cbf is in an unknown state");
    }
    // Return whether or not we're starting a new series.
    return received_series_end;
}
namespace {

// Throw std::runtime_error if `received` (the "series" field of an incoming
// message) differs from `expected` (the series id of the current global
// header). `message_kind` is spliced into the error text, e.g. "series end".
template <typename SeriesId>
void require_series_id(const SeriesId& expected, int64_t received,
                       const char* message_kind) {
    if (received != expected) {
        std::stringstream ss;
        ss << "Invalid " << message_kind << " message, expected series id: " << expected
           << ", received " << received << '\n';
        throw std::runtime_error(ss.str());
    }
}

}  // namespace

/*
 * Parse the first message of a frame, which is either a frame part 1
 * ("dimage-1.0") or an end-of-series record ("dseries_end-1.0").
 *
 * For part 1, m_frame_id is set from the "frame" field and a new CBF
 * datablock is started.
 *
 * @param data  Pointer to the raw JSON message bytes.
 * @param len   Number of bytes at data.
 * @return true for an end-of-series record, false for a frame part 1.
 * @throws std::runtime_error if the htype is neither of the two expected
 *         values, or if the message's series id does not match the id from
 *         the global header.
 */
bool stream_to_cbf::parse_part1_or_series_end(const void* data, size_t len) {
    /*
    As with all other message parts containing json, we are required to copy
    the data into simdjson's "padded string" machinery in order for it to parse.
    Because all our JSON messages are so small, the cost is negligible compared
    to even simdjson's extraordinary parsing speed, and especially relative to
    optimizations around I/O.
    */
    simdjson::padded_string padded(static_cast<const char*>(data), len);
    json_obj json = m_parser.parse(padded).get<json_obj>();

    std::string htype;
    extract_json_value(htype, json, "htype");

    int64_t series_id = 0;
    if (htype == "dseries_end-1.0") { // series end
        extract_json_value(series_id, json, "series");
        require_series_id(m_global.series_id(), series_id, "series end");
        std::clog << "INFO: series end record - " << padded << std::endl;
        return true;
    }
    if (htype != "dimage-1.0") { // not part 1
        std::stringstream ss;
        ss << "Expected either a \"dimage-1.0\" (\"Frame Part 1\") or \"dseries_end-1.0\""
           << " (\"End of Series\") message, received \"" << htype << "\"";
        throw std::runtime_error(ss.str());
    }

    /*
    Received a part 1 message. Validate that the series id matches before
    touching any member state: if the metadata is incorrect for an image, we
    have no predictable way to find the correct metadata, and the entire
    minicbf is useless.
    */
    extract_json_value(series_id, json, "series");
    require_series_id(m_global.series_id(), series_id, "frame part 1");
    extract_json_value(m_frame_id, json, "frame");

    // Start a new frame.
    // NOTE(review): build_cbf_header(), which parse() calls right after this,
    // creates and fills the "image_1" datablock; nothing visible writes into
    // this "image" datablock, so it may be emitted empty in every file.
    // Confirm whether it is still needed.
    cbf_new_datablock(m_cbf, "image");
    return false;
}
/*
 * Handle frame part 2 (htype "dimage_d-1.0").
 *
 * Nothing to do except validate the message type in debug builds: the image
 * dimensions are already known from the global config parameters. Under
 * NDEBUG the message is ignored, so both parameters are marked
 * [[maybe_unused]] to keep release builds free of unused-parameter warnings.
 *
 * @param data  Pointer to the raw JSON message bytes.
 * @param len   Number of bytes at data.
 */
inline void stream_to_cbf::parse_part2([[maybe_unused]] const void* data,
                                       [[maybe_unused]] size_t len) {
#ifndef NDEBUG
    simdjson::padded_string padded(static_cast<const char*>(data), len);
    json_obj record = m_parser.parse(padded).get<json_obj>();
    validate_htype(record, "dimage_d-1.0");
#endif
}
/*
 * Handle frame part 3, the pixel payload.
 *
 * The payload is decoded into m_buffer using the compression scheme and the
 * per-pixel byte width (bit_depth_image / 8) from the global configuration.
 */
inline void stream_to_cbf::parse_part3(const void* data, size_t len) {
    const auto& cfg = m_global.config();
    m_buffer.decode(cfg.compression, data, len, cfg.bit_depth_image / 8);
}
/*
 * Handle frame part 4 (htype "dconfig-1.0").
 *
 * Nothing to do except validate the message type in debug builds. The
 * per-frame exposure/start/stop times are not needed: the configured
 * exposure time from the global data is used instead, and the measured
 * exposure time per image does not vary significantly. Under NDEBUG the
 * message is ignored, so both parameters are marked [[maybe_unused]].
 *
 * @param data  Pointer to the raw JSON message bytes.
 * @param len   Number of bytes at data.
 */
inline void stream_to_cbf::parse_part4([[maybe_unused]] const void* data,
                                       [[maybe_unused]] size_t len) {
#ifndef NDEBUG
    simdjson::padded_string padded(static_cast<const char*>(data), len);
    json_obj record = m_parser.parse(padded).get<json_obj>();
    validate_htype(record, "dconfig-1.0");
#endif
}
/*
 * Handle the optional per-image appendix.
 *
 * The raw bytes are stored verbatim in m_appendix; this general-purpose
 * class does nothing further with them. User-specific implementations may
 * use the appendix, e.g. to choose a landing directory or a file-naming
 * convention.
 */
inline void stream_to_cbf::parse_appendix(const void* data, size_t len) {
    const char* const first = static_cast<const char*>(data);
    const char* const last = first + len;
    m_appendix = std::string(first, last);
}
void stream_to_cbf::build_cbf_header() {
// FIXME: Is it really necessary to convert the pixel size to an integer number?
// eiger2cbf does it, but surely there's some documentation that can decisively
// say one way or another whether this is needed or unnecessary loss of precision.
static const char header_format[] =
"\n"
"# Detector: %s, S/N %s\n"
"# Pixel_size %" PRId64 "e-6 m x %" PRId64 "e-6 m\n"
"# Silicon sensor, thickness %.6lf m\n"
"# Exposure_time %lf s\n"
"# Exposure_period %lf s\n"
"# Count_cutoff %" PRId64 " counts\n"
"# Wavelength %lf A\n"
"# Detector_distance %lf m\n"
"# Beam_xy (%d, %d) pixels\n"
"# Start_angle %lf deg.\n"
"# Angle_increment %lf deg.\n";
static char header_content[4096];
const detector_config_t& config = m_global.config();
snprintf(static_cast<char*>(header_content), sizeof(header_content), header_format,
config.description.c_str(), config.detector_number.c_str(),
(int64_t)(config.x_pixel_size * 1E6), (int64_t)(config.y_pixel_size * 1E6),
config.sensor_thickness,
config.count_time,
config.frame_time,
config.countrate_correction_count_cutoff,
config.wavelength,
config.detector_distance,
(int)config.beam_center_x, (int)config.beam_center_y,
config.omega_start + ((double)(m_frame_id-1))*config.omega_increment,
config.omega_increment);
cbf_new_datablock(m_cbf, "image_1");
cbf_new_category(m_cbf, "array_data");
cbf_new_column(m_cbf, "header_convention");
cbf_set_value(m_cbf, "SLS_1.0");
cbf_new_column(m_cbf, "header_contents");
cbf_set_value(m_cbf, header_content);
#ifndef NDEBUG
std::clog << "DEBUG: minicbf header" << header_content << "\n";
#endif
}
inline void stream_to_cbf::build_cbf_data() {
const detector_config_t& config = m_global.config();
cbf_new_category(m_cbf, "array_data");
cbf_new_column(m_cbf, "data");
cbf_set_integerarray_wdims_fs(m_cbf,
CBF_BYTE_OFFSET,
1, // binary id
m_buffer.get(),
m_global.config().bit_depth_image/8, // bytes per pixel
1, // signed?
config.x_pixels_in_detector * config.y_pixels_in_detector,
"little_endian",
config.x_pixels_in_detector,
config.y_pixels_in_detector,
0,
0); //padding
}
/*
 * Write the CBF assembled in m_cbf to "<series_id>-<frame_id>.cbf" in the
 * current working directory of the process.
 *
 * The stream is owned by this function for its whole life. It is written by
 * cbf_write_file() with readable = 0, which the CBFlib documentation requires
 * for a stream CBFlib cannot read back (it is opened "wb"). In that mode,
 * closing the stream is the caller's job. The stream is closed on every exit
 * path, and on success it is closed explicitly so that buffered-write
 * failures are reported.
 *
 * @throws std::system_error if the file cannot be opened or closed.
 * @throws std::runtime_error if cbf_write_file() reports an error.
 */
void stream_to_cbf::flush() {
    // Build a filepath and open the output file.
    // TODO: The current implementation litters output files in the cwd of the process.
    //       We need to determine a sufficiently general-purpose directory structure
    //       which is relatively neat and orderly.
    std::stringstream ss_filename;
    ss_filename << m_global.series_id() << "-" << m_frame_id << ".cbf";
    const std::string filename = ss_filename.str();

    FILE* const raw_handle = fopen(filename.c_str(), "wb");
    if (raw_handle == nullptr) {
        // Capture errno immediately: building the message below may allocate
        // and overwrite it before std::system_error reads it.
        const int open_errno = errno;
        std::stringstream ss;
        ss << "libc error: " << filename << " - " << strerror(open_errno) << "\n";
        throw std::system_error(open_errno, std::system_category(), ss.str());
    }
    // Guarantees the stream is closed if cbf_write_file() fails or throws.
    auto close_file = [](FILE* f) { fclose(f); };
    std::unique_ptr<FILE, decltype(close_file)> file_handle(raw_handle, close_file);

    // readable = 0: CBFlib does not keep or close the stream, so ownership
    // stays with file_handle.
    const int cbf_err = cbf_write_file(m_cbf, file_handle.get(), /*readable*/0, /*format*/CBF,
                                       MSG_DIGEST|MIME_HEADERS|PAD_4K, /*encoding*/ENC_BASE64);
    if (cbf_err != 0) {
        std::stringstream ss;
        ss << "libcbf error code " << cbf_err << ": " << filename
           << " - " << cbf_strerror(cbf_err) << "\n";
        throw std::runtime_error(ss.str());
    }

    // Close explicitly so that errors surfacing at close time (e.g. a full
    // disk while flushing stdio buffers) are not silently lost.
    if (fclose(file_handle.release()) != 0) {
        const int close_errno = errno;
        std::stringstream ss;
        ss << "libc error: " << filename << " - " << strerror(close_errno) << "\n";
        throw std::system_error(close_errno, std::system_category(), ss.str());
    }
#ifndef NDEBUG
    std::clog << "DEBUG: " << filename << " committed to storage\n";
#endif
}