Skip to content

Commit

Permalink
Startup thread pool on start (#188)
Browse files Browse the repository at this point in the history
- Updates acquire-core-libs, acquire-video-runtime, and acquire-driver
common submodules to current main versions. (Closes #179. Closes #184).
- Starts the thread pool on `Zarr::start()`. Destroys the thread pool on
`Zarr::stop()`.
  • Loading branch information
aliddell authored Dec 11, 2023
1 parent 2eda2f5 commit eb6018c
Show file tree
Hide file tree
Showing 7 changed files with 184 additions and 6 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,12 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## Unreleased

### Changed

- The thread pool starts on `Zarr::start()` and shuts down on `Zarr::stop()`.

## [0.1.6](https://github.com/acquire-project/acquire-driver-zarr/compare/v0.1.5...v0.1.6) - 2023-11-28

### Fixed
Expand Down
11 changes: 8 additions & 3 deletions src/zarr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,10 @@ zarr::Zarr::start()
write_all_array_metadata_();
write_external_metadata_();

thread_pool_ = std::make_shared<common::ThreadPool>(
std::thread::hardware_concurrency(),
[this](const std::string& err) { this->set_error(err); });

error_ = false;
}

Expand All @@ -438,7 +442,10 @@ zarr::Zarr::stop() noexcept
}
writers_.clear();

// call await_stop() before destroying to give jobs a chance to
// finish
thread_pool_->await_stop();
thread_pool_ = nullptr;

is_ok = 1;
} catch (const std::exception& exc) {
Expand Down Expand Up @@ -548,9 +555,7 @@ zarr::Zarr::Zarr()
.destroy = ::zarr_destroy,
.reserve_image_shape = ::zarr_reserve_image_shape,
}
, thread_pool_{ std::make_shared<common::ThreadPool>(
std::thread::hardware_concurrency(),
[this](const std::string& err) { this->set_error(err); }) }
, thread_pool_{ nullptr }
, pixel_scale_um_{ 1, 1 }
, planes_per_chunk_{ 0 }
, enable_multiscale_{ false }
Expand Down
1 change: 1 addition & 0 deletions tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ else ()
list-devices
external-metadata-with-whitespace-ok
get-meta
restart-stopped-zarr-resets-threadpool
unit-tests
multiscale-with-trivial-tile-size
no-set-chunking
Expand Down
2 changes: 1 addition & 1 deletion tests/acquire-driver-common
166 changes: 166 additions & 0 deletions tests/restart-stopped-zarr-resets-threadpool.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,166 @@
/// @file tests/reuse-zarr-writer-resets-thread-pool
/// @brief Test that restarting a previously stopped Zarr writer resets the
/// thread pool.

#include "platform.h" // lib
#include "logger.h"
#include "device/kit/driver.h"
#include "device/hal/driver.h"
#include "device/hal/storage.h"
#include "device/props/storage.h"

#include <cstdio>
#include <string>
#include <stdexcept>
#include <vector>

#define containerof(P, T, F) ((T*)(((char*)(P)) - offsetof(T, F)))

/// Helper for passing size static strings as function args.
/// For a function: `f(char*,size_t)` use `f(SIZED("hello"))`.
/// Expands to `f("hello",5)`.
#define SIZED(str) str, sizeof(str) - 1

#define L (aq_logger)
#define LOG(...) L(0, __FILE__, __LINE__, __FUNCTION__, __VA_ARGS__)
#define ERR(...) L(1, __FILE__, __LINE__, __FUNCTION__, __VA_ARGS__)
#define EXPECT(e, ...) \
do { \
if (!(e)) { \
char buf[1 << 8] = { 0 }; \
ERR(__VA_ARGS__); \
snprintf(buf, sizeof(buf) - 1, __VA_ARGS__); \
throw std::runtime_error(buf); \
} \
} while (0)
#define CHECK(e) EXPECT(e, "Expression evaluated as false: %s", #e)
#define DEVOK(e) CHECK(Device_Ok == (e))
#define OK(e) CHECK(AcquireStatus_Ok == (e))

void
reporter(int is_error,
const char* file,
int line,
const char* function,
const char* msg)
{
fprintf(is_error ? stderr : stdout,
"%s%s(%d) - %s: %s\n",
is_error ? "ERROR " : "",
file,
line,
function,
msg);
}

typedef struct Driver* (*init_func_t)(void (*reporter)(int is_error,
const char* file,
int line,
const char* function,
const char* msg));

struct Storage*
get_zarr(lib* lib)
{

CHECK(lib_open_by_name(lib, "acquire-driver-zarr"));

auto init = (init_func_t)lib_load(lib, "acquire_driver_init_v0");
auto driver = init(reporter);
CHECK(driver);

struct Storage* zarr = nullptr;
for (uint32_t i = 0; i < driver->device_count(driver); ++i) {
DeviceIdentifier id;
DEVOK(driver->describe(driver, &id, i));
std::string dev_name{ id.name };

if (id.kind == DeviceKind_Storage && dev_name == "Zarr") {
struct Device* device = nullptr;

DEVOK(driver_open_device(driver, i, &device));
zarr = containerof(device, struct Storage, device);
break;
}
}

return zarr;
}

void
configure(struct Storage* zarr)
{
struct StorageProperties props = { 0 };
storage_properties_init(&props, 0, SIZED(TEST ".zarr"), nullptr, 0, { 0 });

CHECK(DeviceState_Armed == zarr->set(zarr, &props));
}

void
start_write_stop(struct Storage* zarr)
{
CHECK(DeviceState_Running == zarr->start(zarr));
struct ImageShape shape = {
.dims = {
.channels = 1,
.width = 64,
.height = 48,
.planes = 1,
},
.strides = {
.channels = 1,
.width = 1,
.height = 64,
.planes = 64 * 48
},
.type = SampleType_u8,
};
zarr->reserve_image_shape(zarr, &shape);

auto* frame = (struct VideoFrame*)malloc(sizeof(VideoFrame) + 64 * 48);
frame->bytes_of_frame = sizeof(*frame) + 64 * 48;

frame->shape = shape;
frame->frame_id = 0;
frame->hardware_frame_id = 0;
frame->timestamps = { 0, 0 };

// if the thread pool is not available, this will fail
size_t nbytes{ frame->bytes_of_frame };
CHECK(DeviceState_Running == zarr->append(zarr, frame, &nbytes));
CHECK(nbytes == 64 * 48 + sizeof(*frame));

CHECK(DeviceState_Running == zarr->append(zarr, frame, &nbytes));
CHECK(nbytes == 64 * 48 + sizeof(*frame));

free(frame);

CHECK(DeviceState_Armed == zarr->stop(zarr));
}

int
main()
{
logger_set_reporter(reporter);
lib lib{};

try {
struct Storage* zarr = get_zarr(&lib);
CHECK(zarr);

configure(zarr);

start_write_stop(zarr);
start_write_stop(zarr); // thread pool should reset here

lib_close(&lib);
return 0;
} catch (std::exception& e) {
ERR("%s", e.what());
} catch (...) {
ERR("Unknown exception");
}

lib_close(&lib);
return 1;
}

0 comments on commit eb6018c

Please sign in to comment.