Skip to content

Commit

Permalink
Merge pull request #683 from zeromq/draft
Browse files Browse the repository at this point in the history
  • Loading branch information
aminya authored Dec 30, 2024
2 parents b817aac + ad38ee1 commit 0afa859
Show file tree
Hide file tree
Showing 10 changed files with 88 additions and 73 deletions.
1 change: 0 additions & 1 deletion .github/workflows/docs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ jobs:
steps:
- uses: actions/checkout@v4


- name: Cache
uses: actions/cache@v4
with:
Expand Down
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ macro(set_option_from_env OPTION_NAME)
message(STATUS "${OPTION_NAME}: ${${OPTION_NAME}}")
endmacro()

option(ZMQ_DRAFT "Build and install draft APIs" OFF)
option(ZMQ_DRAFT "Build and install draft APIs (e.g. `server-client`, `radio-dish`, `scatter-gather`)" ON)
set_option_from_env(ZMQ_DRAFT)

option(ZMQ_CURVE "Enable CURVE security" ON)
Expand Down
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
- Fully usable with TypeScript (3+).
- Compatible with Zeromq 4/5 via "zeromq/v5-compat"
- Secure Curve protocol with Libsodium
- Zeromq Draft API support

## Useful links

Expand Down Expand Up @@ -142,6 +143,8 @@ etc.

#### Draft support

(Enabled by default)

By default `libzmq` is built with support for `Draft` patterns (e.g.
`server-client`, `radio-dish`, `scatter-gather`). If you want to build `libzmq`
without support for `Draft`, you can specify the following in `.npmrc`:
Expand Down
16 changes: 6 additions & 10 deletions src/draft.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,27 +41,23 @@ interface RadioGroupOptions {
export interface Radio extends Writable<MessageLike, [RadioGroupOptions]> {}
allowMethods(Radio.prototype, ["send"])

const join = (Socket.prototype as any).join
const leave = (Socket.prototype as any).leave

export class Dish extends Socket {
constructor(options?: SocketOptions<Dish>) {
super(SocketType.Dish, options)
}

/* TODO: These methods might accept arrays in their C++ implementation for
the sake of simplicity. */

join(...values: Array<Buffer | string>): void {
for (const value of values) {
join(value)
const {join} = Socket.prototype as Socket & {
join: (value: Array<string | Buffer>) => void
}
join(values)
}

leave(...values: Array<Buffer | string>): void {
for (const value of values) {
leave(value)
const {leave} = Socket.prototype as Socket & {
leave: (value: Array<string | Buffer>) => void
}
leave(values)
}
}

Expand Down
3 changes: 2 additions & 1 deletion src/module.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ struct Terminator {
});

using namespace std::chrono_literals;
if (terminate.wait_for(500ms) == std::future_status::timeout) {
const auto timeout = 500ms;
if (terminate.wait_for(timeout) == std::future_status::timeout) {
/* We can't use process.emitWarning, because the Node.js runtime
has already shut down. So we mimic it instead. */
(void)fprintf(stderr,
Expand Down
21 changes: 6 additions & 15 deletions src/outgoing_msg.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

#include "./module.h"
#include "util/error.h"
#include "util/string_or_buffer.h"

namespace zmq {
OutgoingMsg::OutgoingMsg(Napi::Value value, std::reference_wrapper<Module> module) {
Expand Down Expand Up @@ -116,21 +117,10 @@ bool OutgoingMsg::Parts::SetGroup(Napi::Value value) {
return false;
}

auto group = [&]() {
if (value.IsString()) {
return std::string(value.As<Napi::String>());
} else if (value.IsBuffer()) {
Napi::Object buf = value.As<Napi::Object>();
auto length = buf.As<Napi::Buffer<char>>().Length();
auto value = buf.As<Napi::Buffer<char>>().Data();
return std::string(value, length);
} else {
return std::string();
}
}();
const auto group = convert_string_or_buffer(value);

for (auto& part : parts) {
if (zmq_msg_set_group(part, group.c_str()) < 0) {
if (zmq_msg_set_group(part.get(), group.c_str()) < 0) {
ErrnoException(value.Env(), zmq_errno()).ThrowAsJavaScriptException();
return false;
}
Expand All @@ -141,14 +131,15 @@ bool OutgoingMsg::Parts::SetGroup(Napi::Value value) {

bool OutgoingMsg::Parts::SetRoutingId(Napi::Value value) {
if (value.IsUndefined()) {
// https://clang.llvm.org/extra/clang-tidy/checks/readability/identifier-length.html
ErrnoException(value.Env(), EINVAL).ThrowAsJavaScriptException();
return false;
}

auto id = value.As<Napi::Number>().Uint32Value();
auto routing_id = value.As<Napi::Number>().Uint32Value();

for (auto& part : parts) {
if (zmq_msg_set_routing_id(part, id) < 0) {
if (zmq_msg_set_routing_id(part.get(), routing_id) < 0) {
ErrnoException(value.Env(), zmq_errno()).ThrowAsJavaScriptException();
return false;
}
Expand Down
87 changes: 43 additions & 44 deletions src/socket.cc
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#include "util/async_scope.h"
#include "util/error.h"
#include "util/object.h"
#include "util/string_or_buffer.h"
#include "util/take.h"
#include "util/uvdelayed.h"
#include "util/uvwork.h"
Expand Down Expand Up @@ -102,8 +103,7 @@ Socket::Socket(const Napi::CallbackInfo& info)
return;
}

uv_os_sock_t file_descriptor = 0;
std::function<void()> const finalize = nullptr;
auto file_descriptor = uv_os_sock_t{};

const auto error = [this]() {
[[maybe_unused]] auto err = zmq_close(socket);
Expand All @@ -125,22 +125,27 @@ Socket::Socket(const Napi::CallbackInfo& info)
}
#endif

std::function<void()> finalize = nullptr;

/* Currently only some DRAFT sockets are threadsafe. */
if (thread_safe) {
#ifdef ZMQ_HAS_POLLABLE_THREAD_SAFE
/* Threadsafe sockets do not expose an FD we can integrate into the
event loop, so we have to construct one by creating a zmq_poller. */
auto poll = zmq_poller_new();
auto* poll = zmq_poller_new();
if (poll == nullptr) {
ErrnoException(Env(), zmq_errno()).ThrowAsJavaScriptException();
error();
}

/* Callback to free the underlying poller. Move the poller to transfer
ownership after the constructor has completed. */
finalize = [=]() mutable {
[[maybe_unused]] auto err = zmq_poller_destroy(&poll);
assert(err == 0);
finalize = [poll]() mutable {
if (poll != nullptr) {
[[maybe_unused]] auto err = zmq_poller_destroy(&poll);
assert(err == 0);
poll = nullptr;
}
};

if (zmq_poller_add(poll, socket, nullptr, ZMQ_POLLIN | ZMQ_POLLOUT) < 0) {
Expand All @@ -149,7 +154,7 @@ Socket::Socket(const Napi::CallbackInfo& info)
error();
}

if (zmq_poller_fd(poll, &fd) < 0) {
if (zmq_poller_fd(poll, &file_descriptor) < 0) {
ErrnoException(Env(), zmq_errno()).ThrowAsJavaScriptException();
finalize();
error();
Expand Down Expand Up @@ -327,17 +332,17 @@ void Socket::Receive(const Napi::Promise::Deferred& res) {
switch (type) {
case ZMQ_SERVER: {
auto meta = Napi::Object::New(Env());
meta.Set("routingId", zmq_msg_routing_id(part));
list[i++] = meta;
meta.Set("routingId", zmq_msg_routing_id(part.get()));
list[i_part++] = meta;
break;
}

case ZMQ_DISH: {
auto meta = Napi::Object::New(Env());
auto data = zmq_msg_group(part);
const auto* data = zmq_msg_group(part.get());
auto length = strnlen(data, ZMQ_GROUP_MAX_LENGTH);
meta.Set("group", Napi::Buffer<char>::Copy(Env(), data, length));
list[i++] = meta;
list[i_part++] = meta;
break;
}
}
Expand Down Expand Up @@ -610,7 +615,9 @@ Napi::Value Socket::Send(const Napi::CallbackInfo& info) {
Arg::Required<Arg::Object>("Options must be an object"),
};

if (args.ThrowIfInvalid(info)) return Env().Undefined();
if (args.ThrowIfInvalid(info)) {
return Env().Undefined();
}

break;
}
Expand Down Expand Up @@ -748,24 +755,20 @@ Napi::Value Socket::Receive(const Napi::CallbackInfo& info) {

void Socket::Join([[maybe_unused]] const Napi::CallbackInfo& info) {
#ifdef ZMQ_HAS_POLLABLE_THREAD_SAFE
Arg::Validator args{
Arg::Required<Arg::String, Arg::Buffer>("Group must be a string or buffer"),
};
for (size_t i_value = 0; i_value < info.Length(); ++i_value) {
const auto& value = info[i_value];
this->JoinElement(value);
}
#endif
}

if (args.ThrowIfInvalid(info)) return;
void Socket::JoinElement([[maybe_unused]] const Napi::Value& value) {
#ifdef ZMQ_HAS_POLLABLE_THREAD_SAFE
if (!ValidateOpen()) {
return;
}

if (!ValidateOpen()) return;

auto str = [&]() {
if (info[0].IsString()) {
return std::string(info[0].As<Napi::String>());
} else {
Napi::Object buf = info[0].As<Napi::Object>();
auto length = buf.As<Napi::Buffer<char>>().Length();
auto value = buf.As<Napi::Buffer<char>>().Data();
return std::string(value, length);
}
}();
const auto str = convert_string_or_buffer(value);

if (zmq_join(socket, str.c_str()) < 0) {
ErrnoException(Env(), zmq_errno()).ThrowAsJavaScriptException();
Expand All @@ -776,24 +779,20 @@ void Socket::Join([[maybe_unused]] const Napi::CallbackInfo& info) {

void Socket::Leave([[maybe_unused]] const Napi::CallbackInfo& info) {
#ifdef ZMQ_HAS_POLLABLE_THREAD_SAFE
Arg::Validator args{
Arg::Required<Arg::String, Arg::Buffer>("Group must be a string or buffer"),
};

if (args.ThrowIfInvalid(info)) return;
for (size_t i_value = 0; i_value < info.Length(); ++i_value) {
const auto& value = info[i_value];
this->LeaveElement(value);
}
#endif
}

if (!ValidateOpen()) return;
void Socket::LeaveElement([[maybe_unused]] const Napi::Value& value) {
#ifdef ZMQ_HAS_POLLABLE_THREAD_SAFE
if (!ValidateOpen()) {
return;
}

auto str = [&]() {
if (info[0].IsString()) {
return std::string(info[0].As<Napi::String>());
} else {
Napi::Object buf = info[0].As<Napi::Object>();
auto length = buf.As<Napi::Buffer<char>>().Length();
auto value = buf.As<Napi::Buffer<char>>().Data();
return std::string(value, length);
}
}();
const auto str = convert_string_or_buffer(value);

if (zmq_leave(socket, str.c_str()) < 0) {
ErrnoException(Env(), zmq_errno()).ThrowAsJavaScriptException();
Expand Down
3 changes: 3 additions & 0 deletions src/socket.h
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,9 @@ class Socket : public Napi::ObjectWrap<Socket>, public Closable {
force_inline void Send(const Napi::Promise::Deferred& res, OutgoingMsg::Parts& parts);
force_inline void Receive(const Napi::Promise::Deferred& res);

inline void JoinElement(const Napi::Value& value);
inline void LeaveElement(const Napi::Value& value);

class Poller : public zmq::Poller<Poller> {
std::reference_wrapper<Socket> socket;
std::optional<Napi::Promise::Deferred> read_deferred;
Expand Down
3 changes: 2 additions & 1 deletion src/util/arguments.h
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,8 @@ class Validator {
if constexpr (I == NumArgs) {
if (info.Length() > NumArgs) {
auto msg = "Expected " + std::to_string(NumArgs) + " argument"
+ (NumArgs != 1 ? "s" : "");
+ (NumArgs != 1 ? "s" : "") + " but received "
+ std::to_string(info.Length());
return Napi::TypeError::New(info.Env(), msg);
}

Expand Down
22 changes: 22 additions & 0 deletions src/util/string_or_buffer.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
#pragma once

#include <napi.h>

#include <string>

namespace zmq {

inline std::string convert_string_or_buffer(const Napi::Value& value) {
if (value.IsString()) {
return std::string(value.As<Napi::String>());
}
if (value.IsBuffer()) {
auto buf = value.As<Napi::Object>();
auto length = buf.As<Napi::Buffer<char>>().Length();
auto* value = buf.As<Napi::Buffer<char>>().Data();
return {value, length};
}
throw Napi::TypeError::New(value.Env(), "Value must be a string or buffer");
}

} // namespace zmq

0 comments on commit 0afa859

Please sign in to comment.