From e0bfb56b3f591c7d68b96890257913efe14801a8 Mon Sep 17 00:00:00 2001 From: Amin Yahyaabadi Date: Sun, 29 Dec 2024 02:58:34 -0800 Subject: [PATCH 1/3] feat: enable Draft API support --- CMakeLists.txt | 2 +- README.md | 3 +++ src/module.h | 3 ++- src/outgoing_msg.cc | 17 +++++++------- src/socket.cc | 55 ++++++++++++++++++++++++++------------------- 5 files changed, 47 insertions(+), 33 deletions(-) diff --git a/CMakeLists.txt b/CMakeLists.txt index 82ccc5d1..1380b8ee 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -28,7 +28,7 @@ macro(set_option_from_env OPTION_NAME) message(STATUS "${OPTION_NAME}: ${${OPTION_NAME}}") endmacro() -option(ZMQ_DRAFT "Build and install draft APIs" OFF) +option(ZMQ_DRAFT "Build and install draft APIs (e.g. `server-client`, `radio-dish`, `scatter-gather`)" ON) set_option_from_env(ZMQ_DRAFT) option(ZMQ_CURVE "Enable CURVE security" ON) diff --git a/README.md b/README.md index bfe0c16f..8f193756 100644 --- a/README.md +++ b/README.md @@ -12,6 +12,7 @@ - Fully usable with TypeScript (3+). - Compatible with Zeromq 4/5 via "zeromq/v5-compat" - Secure Curve protocol with Libsodium +- Zeromq Draft API support ## Useful links @@ -142,6 +143,8 @@ etc. #### Draft support +(Enabled by default) + By default `libzmq` is built with support for `Draft` patterns (e.g. `server-client`, `radio-dish`, `scatter-gather`). If you want to build `libzmq` without support for `Draft`, you can specify the following in `.npmrc`: diff --git a/src/module.h b/src/module.h index 063e5674..b28d789c 100644 --- a/src/module.h +++ b/src/module.h @@ -32,7 +32,8 @@ struct Terminator { }); using namespace std::chrono_literals; - if (terminate.wait_for(500ms) == std::future_status::timeout) { + const auto timeout = 500ms; + if (terminate.wait_for(timeout) == std::future_status::timeout) { /* We can't use process.emitWarning, because the Node.js runtime has already shut down. So we mimic it instead. */ (void)fprintf(stderr, diff --git a/src/outgoing_msg.cc b/src/outgoing_msg.cc index 3585ee38..458ca799 100644 --- a/src/outgoing_msg.cc +++ b/src/outgoing_msg.cc @@ -119,18 +119,18 @@ bool OutgoingMsg::Parts::SetGroup(Napi::Value value) { auto group = [&]() { if (value.IsString()) { return std::string(value.As()); - } else if (value.IsBuffer()) { - Napi::Object buf = value.As(); + } + if (value.IsBuffer()) { + auto buf = value.As(); auto length = buf.As>().Length(); - auto value = buf.As>().Data(); + auto* value = buf.As>().Data(); return std::string(value, length); - } else { - return std::string(); } + return std::string(); }(); for (auto& part : parts) { - if (zmq_msg_set_group(part, group.c_str()) < 0) { + if (zmq_msg_set_group(part.get(), group.c_str()) < 0) { ErrnoException(value.Env(), zmq_errno()).ThrowAsJavaScriptException(); return false; } @@ -141,14 +141,15 @@ bool OutgoingMsg::Parts::SetGroup(Napi::Value value) { bool OutgoingMsg::Parts::SetRoutingId(Napi::Value value) { if (value.IsUndefined()) { + // https://clang.llvm.org/extra/clang-tidy/checks/readability/identifier-length.html ErrnoException(value.Env(), EINVAL).ThrowAsJavaScriptException(); return false; } - auto id = value.As().Uint32Value(); + auto routing_id = value.As().Uint32Value(); for (auto& part : parts) { - if (zmq_msg_set_routing_id(part, id) < 0) { + if (zmq_msg_set_routing_id(part.get(), routing_id) < 0) { ErrnoException(value.Env(), zmq_errno()).ThrowAsJavaScriptException(); return false; } diff --git a/src/socket.cc b/src/socket.cc index e66768e8..8f22ebd1 100644 --- a/src/socket.cc +++ b/src/socket.cc @@ -103,7 +103,6 @@ Socket::Socket(const Napi::CallbackInfo& info) } uv_os_sock_t file_descriptor = 0; - std::function const finalize = nullptr; const auto error = [this]() { [[maybe_unused]] auto err = zmq_close(socket); @@ -125,12 +124,14 @@ Socket::Socket(const Napi::CallbackInfo& info) } #endif + std::function finalize = nullptr; + /* Currently only some DRAFT sockets are threadsafe. */ if (thread_safe) { #ifdef ZMQ_HAS_POLLABLE_THREAD_SAFE /* Threadsafe sockets do not expose an FD we can integrate into the event loop, so we have to construct one by creating a zmq_poller. */ - auto poll = zmq_poller_new(); + auto* poll = zmq_poller_new(); if (poll == nullptr) { ErrnoException(Env(), zmq_errno()).ThrowAsJavaScriptException(); error(); @@ -138,7 +139,7 @@ Socket::Socket(const Napi::CallbackInfo& info) /* Callback to free the underlying poller. Move the poller to transfer ownership after the constructor has completed. */ - finalize = [=]() mutable { + finalize = [&]() { [[maybe_unused]] auto err = zmq_poller_destroy(&poll); assert(err == 0); }; @@ -149,7 +150,7 @@ Socket::Socket(const Napi::CallbackInfo& info) error(); } - if (zmq_poller_fd(poll, &fd) < 0) { + if (zmq_poller_fd(poll, &file_descriptor) < 0) { ErrnoException(Env(), zmq_errno()).ThrowAsJavaScriptException(); finalize(); error(); @@ -327,17 +328,17 @@ void Socket::Receive(const Napi::Promise::Deferred& res) { switch (type) { case ZMQ_SERVER: { auto meta = Napi::Object::New(Env()); - meta.Set("routingId", zmq_msg_routing_id(part)); - list[i++] = meta; + meta.Set("routingId", zmq_msg_routing_id(part.get())); + list[i_part++] = meta; break; } case ZMQ_DISH: { auto meta = Napi::Object::New(Env()); - auto data = zmq_msg_group(part); + const auto* data = zmq_msg_group(part.get()); auto length = strnlen(data, ZMQ_GROUP_MAX_LENGTH); meta.Set("group", Napi::Buffer::Copy(Env(), data, length)); - list[i++] = meta; + list[i_part++] = meta; break; } } @@ -610,7 +611,9 @@ Napi::Value Socket::Send(const Napi::CallbackInfo& info) { Arg::Required("Options must be an object"), }; - if (args.ThrowIfInvalid(info)) return Env().Undefined(); + if (args.ThrowIfInvalid(info)) { + return Env().Undefined(); + } break; } @@ -752,19 +755,22 @@ void Socket::Join([[maybe_unused]] const Napi::CallbackInfo& info) { Arg::Required("Group must be a string or buffer"), }; - if (args.ThrowIfInvalid(info)) return; + if (args.ThrowIfInvalid(info)) { + return; + } - if (!ValidateOpen()) return; + if (!ValidateOpen()) { + return; + } auto str = [&]() { if (info[0].IsString()) { return std::string(info[0].As()); - } else { - Napi::Object buf = info[0].As(); - auto length = buf.As>().Length(); - auto value = buf.As>().Data(); - return std::string(value, length); } + auto buf = info[0].As(); + auto length = buf.As>().Length(); + auto* value = buf.As>().Data(); + return std::string(value, length); }(); if (zmq_join(socket, str.c_str()) < 0) { @@ -780,19 +786,22 @@ void Socket::Leave([[maybe_unused]] const Napi::CallbackInfo& info) { Arg::Required("Group must be a string or buffer"), }; - if (args.ThrowIfInvalid(info)) return; + if (args.ThrowIfInvalid(info)) { + return; + } - if (!ValidateOpen()) return; + if (!ValidateOpen()) { + return; + } auto str = [&]() { if (info[0].IsString()) { return std::string(info[0].As()); - } else { - Napi::Object buf = info[0].As(); - auto length = buf.As>().Length(); - auto value = buf.As>().Data(); - return std::string(value, length); } + auto buf = info[0].As(); + auto length = buf.As>().Length(); + auto* value = buf.As>().Data(); + return std::string(value, length); }(); if (zmq_leave(socket, str.c_str()) < 0) { From d79406ba247c7717701c30c56586e8d59130dedc Mon Sep 17 00:00:00 2001 From: Amin Yahyaabadi Date: Sun, 29 Dec 2024 04:13:39 -0800 Subject: [PATCH 2/3] fix: support arrays in C++ join/leave --- src/draft.ts | 23 ++++++++--------- src/outgoing_msg.cc | 14 ++--------- src/socket.cc | 49 ++++++++++++++----------------------- src/socket.h | 3 +++ src/util/arguments.h | 3 ++- src/util/string_or_buffer.h | 22 +++++++++++++++++ 6 files changed, 59 insertions(+), 55 deletions(-) create mode 100644 src/util/string_or_buffer.h diff --git a/src/draft.ts b/src/draft.ts index cd159b20..d67cd808 100644 --- a/src/draft.ts +++ b/src/draft.ts @@ -41,27 +41,28 @@ interface RadioGroupOptions { export interface Radio extends Writable {} allowMethods(Radio.prototype, ["send"]) -const join = (Socket.prototype as any).join -const leave = (Socket.prototype as any).leave +const join = ( + Socket.prototype as Socket & { + join: (value: Array) => void + } +).join +const leave = ( + Socket.prototype as Socket & { + leave: (value: Array) => void + } +).leave export class Dish extends Socket { constructor(options?: SocketOptions) { super(SocketType.Dish, options) } - /* TODO: These methods might accept arrays in their C++ implementation for - the sake of simplicity. */ - join(...values: Array): void { - for (const value of values) { - join(value) - } + join(values) } leave(...values: Array): void { - for (const value of values) { - leave(value) - } + leave(values) } } diff --git a/src/outgoing_msg.cc b/src/outgoing_msg.cc index 458ca799..77d868d7 100644 --- a/src/outgoing_msg.cc +++ b/src/outgoing_msg.cc @@ -5,6 +5,7 @@ #include "./module.h" #include "util/error.h" +#include "util/string_or_buffer.h" namespace zmq { OutgoingMsg::OutgoingMsg(Napi::Value value, std::reference_wrapper module) { @@ -116,18 +117,7 @@ bool OutgoingMsg::Parts::SetGroup(Napi::Value value) { return false; } - auto group = [&]() { - if (value.IsString()) { - return std::string(value.As()); - } - if (value.IsBuffer()) { - auto buf = value.As(); - auto length = buf.As>().Length(); - auto* value = buf.As>().Data(); - return std::string(value, length); - } - return std::string(); - }(); + const auto group = convert_string_or_buffer(value); for (auto& part : parts) { if (zmq_msg_set_group(part.get(), group.c_str()) < 0) { diff --git a/src/socket.cc b/src/socket.cc index 8f22ebd1..43176cb2 100644 --- a/src/socket.cc +++ b/src/socket.cc @@ -14,6 +14,7 @@ #include "util/async_scope.h" #include "util/error.h" #include "util/object.h" +#include "util/string_or_buffer.h" #include "util/take.h" #include "util/uvdelayed.h" #include "util/uvwork.h" @@ -102,7 +103,7 @@ Socket::Socket(const Napi::CallbackInfo& info) return; } - uv_os_sock_t file_descriptor = 0; + auto file_descriptor = uv_os_sock_t{}; const auto error = [this]() { [[maybe_unused]] auto err = zmq_close(socket); @@ -751,27 +752,20 @@ Napi::Value Socket::Receive(const Napi::CallbackInfo& info) { void Socket::Join([[maybe_unused]] const Napi::CallbackInfo& info) { #ifdef ZMQ_HAS_POLLABLE_THREAD_SAFE - Arg::Validator args{ - Arg::Required("Group must be a string or buffer"), - }; - - if (args.ThrowIfInvalid(info)) { - return; + for (size_t i_value = 0; i_value < info.Length(); ++i_value) { + const auto& value = info[i_value]; + this->JoinElement(value); } +#endif +} +void Socket::JoinElement([[maybe_unused]] const Napi::Value& value) { +#ifdef ZMQ_HAS_POLLABLE_THREAD_SAFE if (!ValidateOpen()) { return; } - auto str = [&]() { - if (info[0].IsString()) { - return std::string(info[0].As()); - } - auto buf = info[0].As(); - auto length = buf.As>().Length(); - auto* value = buf.As>().Data(); - return std::string(value, length); - }(); + const auto str = convert_string_or_buffer(value); if (zmq_join(socket, str.c_str()) < 0) { ErrnoException(Env(), zmq_errno()).ThrowAsJavaScriptException(); @@ -782,27 +776,20 @@ void Socket::Join([[maybe_unused]] const Napi::CallbackInfo& info) { void Socket::Leave([[maybe_unused]] const Napi::CallbackInfo& info) { #ifdef ZMQ_HAS_POLLABLE_THREAD_SAFE - Arg::Validator args{ - Arg::Required("Group must be a string or buffer"), - }; - - if (args.ThrowIfInvalid(info)) { - return; + for (size_t i_value = 0; i_value < info.Length(); ++i_value) { + const auto& value = info[i_value]; + this->LeaveElement(value); } +#endif +} +void Socket::LeaveElement([[maybe_unused]] const Napi::Value& value) { +#ifdef ZMQ_HAS_POLLABLE_THREAD_SAFE if (!ValidateOpen()) { return; } - auto str = [&]() { - if (info[0].IsString()) { - return std::string(info[0].As()); - } - auto buf = info[0].As(); - auto length = buf.As>().Length(); - auto* value = buf.As>().Data(); - return std::string(value, length); - }(); + const auto str = convert_string_or_buffer(value); if (zmq_leave(socket, str.c_str()) < 0) { ErrnoException(Env(), zmq_errno()).ThrowAsJavaScriptException(); diff --git a/src/socket.h b/src/socket.h index ed1e0cc9..68beb0ef 100644 --- a/src/socket.h +++ b/src/socket.h @@ -73,6 +73,9 @@ class Socket : public Napi::ObjectWrap, public Closable { force_inline void Send(const Napi::Promise::Deferred& res, OutgoingMsg::Parts& parts); force_inline void Receive(const Napi::Promise::Deferred& res); + inline void JoinElement(const Napi::Value& value); + inline void LeaveElement(const Napi::Value& value); + class Poller : public zmq::Poller { std::reference_wrapper socket; std::optional read_deferred; diff --git a/src/util/arguments.h b/src/util/arguments.h index bbda1cca..3dba23b7 100644 --- a/src/util/arguments.h +++ b/src/util/arguments.h @@ -88,7 +88,8 @@ class Validator { if constexpr (I == NumArgs) { if (info.Length() > NumArgs) { auto msg = "Expected " + std::to_string(NumArgs) + " argument" - + (NumArgs != 1 ? "s" : ""); + + (NumArgs != 1 ? "s" : "") + " but received " + + std::to_string(info.Length()); return Napi::TypeError::New(info.Env(), msg); } diff --git a/src/util/string_or_buffer.h b/src/util/string_or_buffer.h new file mode 100644 index 00000000..6c43b113 --- /dev/null +++ b/src/util/string_or_buffer.h @@ -0,0 +1,22 @@ +#pragma once + +#include + +#include + +namespace zmq { + +inline std::string convert_string_or_buffer(const Napi::Value& value) { + if (value.IsString()) { + return std::string(value.As()); + } + if (value.IsBuffer()) { + auto buf = value.As(); + auto length = buf.As>().Length(); + auto* value = buf.As>().Data(); + return {value, length}; + } + throw Napi::TypeError::New(value.Env(), "Value must be a string or buffer"); +} + +} // namespace zmq From ad38ee1e80ae41429c0f6a3df75419a654121999 Mon Sep 17 00:00:00 2001 From: Amin Yahyaabadi Date: Mon, 30 Dec 2024 15:30:09 -0800 Subject: [PATCH 3/3] fix: fix finalization of thread-safe poller --- .github/workflows/docs.yml | 1 - src/draft.ts | 17 ++++++----------- src/socket.cc | 9 ++++++--- 3 files changed, 12 insertions(+), 15 deletions(-) diff --git a/.github/workflows/docs.yml b/.github/workflows/docs.yml index b7cbb54a..53de9edb 100644 --- a/.github/workflows/docs.yml +++ b/.github/workflows/docs.yml @@ -27,7 +27,6 @@ jobs: steps: - uses: actions/checkout@v4 - - name: Cache uses: actions/cache@v4 with: diff --git a/src/draft.ts b/src/draft.ts index d67cd808..d0ee38e4 100644 --- a/src/draft.ts +++ b/src/draft.ts @@ -41,27 +41,22 @@ interface RadioGroupOptions { export interface Radio extends Writable {} allowMethods(Radio.prototype, ["send"]) -const join = ( - Socket.prototype as Socket & { - join: (value: Array) => void - } -).join -const leave = ( - Socket.prototype as Socket & { - leave: (value: Array) => void - } -).leave - export class Dish extends Socket { constructor(options?: SocketOptions) { super(SocketType.Dish, options) } join(...values: Array): void { + const {join} = Socket.prototype as Socket & { + join: (value: Array) => void + } join(values) } leave(...values: Array): void { + const {leave} = Socket.prototype as Socket & { + leave: (value: Array) => void + } leave(values) } } diff --git a/src/socket.cc b/src/socket.cc index 43176cb2..95385db2 100644 --- a/src/socket.cc +++ b/src/socket.cc @@ -140,9 +140,12 @@ Socket::Socket(const Napi::CallbackInfo& info) /* Callback to free the underlying poller. Move the poller to transfer ownership after the constructor has completed. */ - finalize = [&]() { - [[maybe_unused]] auto err = zmq_poller_destroy(&poll); - assert(err == 0); + finalize = [poll]() mutable { + if (poll != nullptr) { + [[maybe_unused]] auto err = zmq_poller_destroy(&poll); + assert(err == 0); + poll = nullptr; + } }; if (zmq_poller_add(poll, socket, nullptr, ZMQ_POLLIN | ZMQ_POLLOUT) < 0) {