diff --git a/connector/README.md b/connector/README.md
new file mode 100644
index 0000000..1b24935
--- /dev/null
+++ b/connector/README.md
@@ -0,0 +1,5 @@
+# IoT Protocol Connector
+
+## Overview
+
+Defines IoT protocol `connector` interface
diff --git a/connector/build.gradle.kts b/connector/build.gradle.kts
new file mode 100644
index 0000000..0937a96
--- /dev/null
+++ b/connector/build.gradle.kts
@@ -0,0 +1,9 @@
+dependencies {
+ api(project(":data"))
+ api(ZeroLibs.qwe_protocol)
+ api(ZeroLibs.qwe_http_metadata)
+ api(ZeroLibs.qwe_micro_rpc)
+ api(ZeroLibs.qwe_scheduler_metadata)
+
+ testImplementation(testFixtures(ZeroLibs.qwe_base))
+}
diff --git a/connector/src/main/java/io/github/zero88/qwe/iot/connector/BaseProtocol.java b/connector/src/main/java/io/github/zero88/qwe/iot/connector/BaseProtocol.java
new file mode 100644
index 0000000..f09a0a2
--- /dev/null
+++ b/connector/src/main/java/io/github/zero88/qwe/iot/connector/BaseProtocol.java
@@ -0,0 +1,20 @@
+package io.github.zero88.qwe.iot.connector;
+
+import io.github.zero88.qwe.component.HasSharedData;
+import io.github.zero88.qwe.component.SharedDataLocalProxy;
+import io.github.zero88.qwe.protocol.HasProtocol;
+
+import lombok.Getter;
+import lombok.NonNull;
+import lombok.RequiredArgsConstructor;
+import lombok.experimental.Accessors;
+
+@Getter
+@Accessors(fluent = true)
+@RequiredArgsConstructor
+public abstract class BaseProtocol implements HasSharedData, HasProtocol {
+
+ @NonNull
+ private final SharedDataLocalProxy sharedData;
+
+}
diff --git a/connector/src/main/java/io/github/zero88/qwe/iot/connector/BaseService.java b/connector/src/main/java/io/github/zero88/qwe/iot/connector/BaseService.java
new file mode 100644
index 0000000..599236e
--- /dev/null
+++ b/connector/src/main/java/io/github/zero88/qwe/iot/connector/BaseService.java
@@ -0,0 +1,13 @@
+package io.github.zero88.qwe.iot.connector;
+
+import io.github.zero88.qwe.component.SharedDataLocalProxy;
+
+import lombok.NonNull;
+
+public abstract class BaseService extends BaseProtocol implements ConnectorService {
+
+ protected BaseService(@NonNull SharedDataLocalProxy sharedData) {
+ super(sharedData);
+ }
+
+}
diff --git a/connector/src/main/java/io/github/zero88/qwe/iot/connector/ConnectorService.java b/connector/src/main/java/io/github/zero88/qwe/iot/connector/ConnectorService.java
new file mode 100644
index 0000000..97ab972
--- /dev/null
+++ b/connector/src/main/java/io/github/zero88/qwe/iot/connector/ConnectorService.java
@@ -0,0 +1,22 @@
+package io.github.zero88.qwe.iot.connector;
+
+import io.github.zero88.qwe.component.HasSharedData;
+import io.github.zero88.qwe.event.EventListener;
+import io.github.zero88.qwe.protocol.HasProtocol;
+
+/**
+ * Represents a connector protocol service
+ */
+public interface ConnectorService extends EventListener, HasProtocol, HasSharedData {
+
+ /**
+ * Defines service domain name that will be used to distinguish to other domain services.
+ *
+ * One {@code domain} service can group multiple {@code function} services. Check {@link FunctionService}.
+ *
+ * @return domain name
+ * @apiNote It is used to generated HTTP path and Event address then it must be unique
+ */
+ String domain();
+
+}
diff --git a/connector/src/main/java/io/github/zero88/qwe/iot/connector/ConnectorServiceApis.java b/connector/src/main/java/io/github/zero88/qwe/iot/connector/ConnectorServiceApis.java
new file mode 100644
index 0000000..3c886f9
--- /dev/null
+++ b/connector/src/main/java/io/github/zero88/qwe/iot/connector/ConnectorServiceApis.java
@@ -0,0 +1,57 @@
+package io.github.zero88.qwe.iot.connector;
+
+import java.util.Collections;
+import java.util.Set;
+
+import io.github.zero88.qwe.micro.http.ActionMethodMapping;
+import io.github.zero88.qwe.micro.http.EventHttpService;
+import io.github.zero88.qwe.micro.http.EventMethodDefinition;
+import io.github.zero88.utils.Urls;
+
+import lombok.NonNull;
+
+public interface ConnectorServiceApis extends ConnectorService, EventHttpService {
+
+ @Override
+ default String api() {
+ return String.join(".", domain(), protocol().type().toLowerCase(), getClass().getSimpleName());
+ }
+
+ @Override
+ default Set definitions() {
+ return Collections.singleton(EventMethodDefinition.create(fullServicePath(), paramPath(), eventMethodMap()));
+ }
+
+ /**
+ * Full HTTP service path
+ *
+ * @return full HTTP service path
+ */
+ default String fullServicePath() {
+ return Urls.combinePath(domain(), protocol().type().toLowerCase(), servicePath());
+ }
+
+ /**
+ * Service discovery HTTP path for a specific protocol resource
+ *
+ * @return path
+ */
+ @NonNull String servicePath();
+
+ /**
+ * Parameter path for manipulating a specific protocol resource
+ *
+ * @return param path
+ */
+ String paramPath();
+
+ /**
+ * Event action and HTTP method mapping
+ *
+ * @return event method map
+ * @see ActionMethodMapping
+ * @see ConnectorService#getAvailableEvents()
+ */
+ @NonNull ActionMethodMapping eventMethodMap();
+
+}
diff --git a/connector/src/main/java/io/github/zero88/qwe/iot/connector/FunctionService.java b/connector/src/main/java/io/github/zero88/qwe/iot/connector/FunctionService.java
new file mode 100644
index 0000000..6554e45
--- /dev/null
+++ b/connector/src/main/java/io/github/zero88/qwe/iot/connector/FunctionService.java
@@ -0,0 +1,14 @@
+package io.github.zero88.qwe.iot.connector;
+
+import lombok.NonNull;
+
+/**
+ * Connector function service that interacts with only one particular {@code protocol object} per one time
+ *
+ * @see ConnectorService
+ */
+public interface FunctionService extends ConnectorService {
+
+ @NonNull String function();
+
+}
diff --git a/connector/src/main/java/io/github/zero88/qwe/iot/connector/FunctionServiceApis.java b/connector/src/main/java/io/github/zero88/qwe/iot/connector/FunctionServiceApis.java
new file mode 100644
index 0000000..411e024
--- /dev/null
+++ b/connector/src/main/java/io/github/zero88/qwe/iot/connector/FunctionServiceApis.java
@@ -0,0 +1,16 @@
+package io.github.zero88.qwe.iot.connector;
+
+import io.github.zero88.utils.Urls;
+
+public interface FunctionServiceApis extends FunctionService, ConnectorServiceApis {
+
+ default String fullServicePath() {
+ return Urls.combinePath(domain(), protocol().type().toLowerCase(), servicePath(), function());
+ }
+
+ @Override
+ default String paramPath() {
+ return null;
+ }
+
+}
diff --git a/connector/src/main/java/io/github/zero88/qwe/iot/connector/Subject.java b/connector/src/main/java/io/github/zero88/qwe/iot/connector/Subject.java
new file mode 100644
index 0000000..2daf0b8
--- /dev/null
+++ b/connector/src/main/java/io/github/zero88/qwe/iot/connector/Subject.java
@@ -0,0 +1,20 @@
+package io.github.zero88.qwe.iot.connector;
+
+import io.github.zero88.qwe.dto.JsonData;
+import io.vertx.core.json.JsonObject;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+/**
+ * Represents for the thing that is being discovered or watched or supervised
+ */
+public interface Subject extends JsonData {
+
+ @JsonProperty("key")
+ String key();
+
+ default JsonObject toDetail() {
+ return JsonData.super.toJson();
+ }
+
+}
diff --git a/connector/src/main/java/io/github/zero88/qwe/iot/connector/command/Commander.java b/connector/src/main/java/io/github/zero88/qwe/iot/connector/command/Commander.java
new file mode 100644
index 0000000..8310cae
--- /dev/null
+++ b/connector/src/main/java/io/github/zero88/qwe/iot/connector/command/Commander.java
@@ -0,0 +1,36 @@
+package io.github.zero88.qwe.iot.connector.command;
+
+import java.util.Collection;
+import java.util.Collections;
+
+import io.github.zero88.qwe.dto.msg.RequestData;
+import io.github.zero88.qwe.event.EventAction;
+import io.github.zero88.qwe.event.EventContractor;
+import io.github.zero88.qwe.iot.connector.ConnectorService;
+import io.github.zero88.qwe.iot.connector.FunctionService;
+import io.reactivex.Single;
+import io.vertx.core.json.JsonObject;
+
+import lombok.NonNull;
+
+/**
+ * Represents for ad-hoc command that send a specific request to a particular {@code protocol device}
+ *
+ * @see ConnectorService
+ */
+public interface Commander extends FunctionService {
+
+ @Override
+ default String domain() {
+ return "command";
+ }
+
+ @EventContractor(action = "SEND", returnType = Single.class)
+ Single send(@NonNull RequestData requestData);
+
+ @Override
+ default @NonNull Collection getAvailableEvents() {
+ return Collections.singletonList(EventAction.SEND);
+ }
+
+}
diff --git a/connector/src/main/java/io/github/zero88/qwe/iot/connector/command/CommanderApis.java b/connector/src/main/java/io/github/zero88/qwe/iot/connector/command/CommanderApis.java
new file mode 100644
index 0000000..50c71ae
--- /dev/null
+++ b/connector/src/main/java/io/github/zero88/qwe/iot/connector/command/CommanderApis.java
@@ -0,0 +1,19 @@
+package io.github.zero88.qwe.iot.connector.command;
+
+import java.util.Collections;
+
+import io.github.zero88.qwe.event.EventAction;
+import io.github.zero88.qwe.iot.connector.FunctionServiceApis;
+import io.github.zero88.qwe.micro.http.ActionMethodMapping;
+import io.vertx.core.http.HttpMethod;
+
+import lombok.NonNull;
+
+public interface CommanderApis extends Commander, FunctionServiceApis {
+
+ @Override
+ default @NonNull ActionMethodMapping eventMethodMap() {
+ return ActionMethodMapping.create(Collections.singletonMap(EventAction.SEND, HttpMethod.POST));
+ }
+
+}
diff --git a/connector/src/main/java/io/github/zero88/qwe/iot/connector/coordinator/Coordinator.java b/connector/src/main/java/io/github/zero88/qwe/iot/connector/coordinator/Coordinator.java
new file mode 100644
index 0000000..82983be
--- /dev/null
+++ b/connector/src/main/java/io/github/zero88/qwe/iot/connector/coordinator/Coordinator.java
@@ -0,0 +1,117 @@
+package io.github.zero88.qwe.iot.connector.coordinator;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+import io.github.zero88.qwe.dto.ErrorMessage;
+import io.github.zero88.qwe.dto.msg.RequestData;
+import io.github.zero88.qwe.event.EventAction;
+import io.github.zero88.qwe.event.EventContractor;
+import io.github.zero88.qwe.event.EventContractor.Param;
+import io.github.zero88.qwe.event.EventPattern;
+import io.github.zero88.qwe.event.Waybill;
+import io.github.zero88.qwe.iot.connector.ConnectorService;
+import io.github.zero88.qwe.iot.connector.FunctionService;
+import io.github.zero88.qwe.iot.connector.Subject;
+import io.reactivex.Single;
+import io.vertx.core.json.JsonObject;
+
+import lombok.NonNull;
+
+/**
+ * Represents for an {@code coordinator service} that supervises a particular {@code subject} then notifying it to the
+ * registered {@code subscribers}.
+ *
+ * The end-to-end process is named as a {@code coordinator channel}
+ *
+ * @param Type of subject
+ * @see Subject
+ * @see ConnectorService
+ */
+public interface Coordinator extends FunctionService {
+
+ @Override
+ default String domain() {
+ return "coordinator";
+ }
+
+ /**
+ * Register a {@code coordinator channel}
+ *
+ * @param requestData request data
+ * @return coordinator channel
+ * @see #parseCoordinatorInput(RequestData)
+ * @see CoordinatorChannel
+ */
+ @EventContractor(action = "CREATE_OR_UPDATE", returnType = Single.class)
+ Single register(@NonNull RequestData requestData);
+
+ /**
+ * Unregister a {@code coordinator channel}
+ *
+ * @param requestData request data
+ * @return coordinator output
+ * @see CoordinatorChannel
+ */
+ @EventContractor(action = "REMOVE", returnType = Single.class)
+ Single unregister(@NonNull RequestData requestData);
+
+ /**
+ * Query a {@code coordinator channel} by {@code subject}
+ *
+ * @param requestData request data
+ * @return coordinator output
+ * @see CoordinatorChannel
+ */
+ @EventContractor(action = "GET_ONE", returnType = Single.class)
+ Single get(@NonNull RequestData requestData);
+
+ /**
+ * Defines a handler that listens an event of {@code subject} then notifies to list of {@code subscribers}
+ *
+ * @param data subject event data
+ * @param error error message if any error
+ * @return true as ack
+ */
+ @EventContractor(action = "MONITOR", returnType = boolean.class)
+ boolean superviseThenNotify(@Param("data") JsonObject data, @Param("error") ErrorMessage error);
+
+ /**
+ * Parse a coordinator input from request to prepare {@code coordinator channel}
+ *
+ * @param requestData request data
+ * @return coordinator input
+ */
+ @NonNull CoordinatorInput parseCoordinatorInput(@NonNull RequestData requestData);
+
+ @Override
+ default @NonNull Collection getAvailableEvents() {
+ return Arrays.asList(EventAction.CREATE_OR_UPDATE, EventAction.REMOVE, EventAction.GET_ONE, EventAction.PATCH,
+ EventAction.MONITOR);
+ }
+
+ /**
+ * Declares a coordinator address to register a callback address when a {@code subject} notifies an event or change
+ *
+ * @return waybill
+ * @see #superviseThenNotify(JsonObject, ErrorMessage)
+ * @see Waybill
+ */
+ default Waybill coordinatorInfo() {
+ return Waybill.builder()
+ .address(this.getClass().getName())
+ .action(EventAction.MONITOR)
+ .pattern(EventPattern.PUBLISH_SUBSCRIBE)
+ .build();
+ }
+
+ /**
+ * Defines an {@code subject} address that want to supervise is where a {@code coordinator} can ask
+ *
+ * @param payload request data that send to {@code subject} address
+ * @return waybill
+ * @see Waybill
+ */
+ Waybill subjectInfo(JsonObject payload);
+
+}
diff --git a/connector/src/main/java/io/github/zero88/qwe/iot/connector/coordinator/CoordinatorApis.java b/connector/src/main/java/io/github/zero88/qwe/iot/connector/coordinator/CoordinatorApis.java
new file mode 100644
index 0000000..a4a2115
--- /dev/null
+++ b/connector/src/main/java/io/github/zero88/qwe/iot/connector/coordinator/CoordinatorApis.java
@@ -0,0 +1,16 @@
+package io.github.zero88.qwe.iot.connector.coordinator;
+
+import io.github.zero88.qwe.iot.connector.FunctionServiceApis;
+import io.github.zero88.qwe.iot.connector.Subject;
+import io.github.zero88.qwe.micro.http.ActionMethodMapping;
+
+import lombok.NonNull;
+
+public interface CoordinatorApis extends Coordinator, FunctionServiceApis {
+
+ @Override
+ default @NonNull ActionMethodMapping eventMethodMap() {
+ return ActionMethodMapping.by(ActionMethodMapping.CRD_MAP, getAvailableEvents());
+ }
+
+}
diff --git a/connector/src/main/java/io/github/zero88/qwe/iot/connector/coordinator/CoordinatorChannel.java b/connector/src/main/java/io/github/zero88/qwe/iot/connector/coordinator/CoordinatorChannel.java
new file mode 100644
index 0000000..51b676e
--- /dev/null
+++ b/connector/src/main/java/io/github/zero88/qwe/iot/connector/coordinator/CoordinatorChannel.java
@@ -0,0 +1,83 @@
+package io.github.zero88.qwe.iot.connector.coordinator;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+import io.github.zero88.qwe.dto.JsonData;
+import io.github.zero88.qwe.iot.connector.Subject;
+import io.github.zero88.qwe.iot.connector.subscriber.Subscriber;
+import io.github.zero88.qwe.iot.connector.watcher.WatcherOption;
+import io.github.zero88.qwe.iot.connector.watcher.WatcherType;
+import io.vertx.core.json.JsonObject;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+
+import lombok.Builder;
+import lombok.Data;
+import lombok.NonNull;
+import lombok.Setter;
+import lombok.Singular;
+import lombok.experimental.Accessors;
+import lombok.experimental.FieldNameConstants;
+import lombok.extern.jackson.Jacksonized;
+
+/**
+ * Represents for a coordinator channel
+ */
+@Data
+@Builder
+@Jacksonized
+@FieldNameConstants
+public final class CoordinatorChannel implements JsonData {
+
+ private final String key;
+ private final JsonObject subject;
+ private final WatcherType watcherType;
+ private final WatcherOption watcherOption;
+ private final String watcherKey;
+ @Singular
+ private final List subscribers;
+ @Setter
+ @Accessors(fluent = true)
+ @JsonProperty(Fields.watcherOutput)
+ private JsonObject watcherOutput;
+
+ public static @NonNull CoordinatorChannel from(@NonNull CoordinatorInput extends Subject> input,
+ @NonNull WatcherType watcherType, @NonNull String watcherKey,
+ JsonObject watcherOutput) {
+ return CoordinatorChannel.builder()
+ .key(input.getSubject().key())
+ .watcherType(watcherType)
+ .watcherKey(watcherKey)
+ .watcherOutput(watcherOutput)
+ .watcherOption(input.getWatcherOption())
+ .subject(input.getSubject().toDetail())
+ .subscribers(input.getSubscribers()
+ .stream()
+ .map(Subscriber::toJson)
+ .collect(Collectors.toList()))
+ .build();
+ }
+
+ @JsonProperty(Fields.watcherType)
+ public String watcherType() {
+ return watcherType.type();
+ }
+
+ private String getKey() {
+ return key;
+ }
+
+ @JsonProperty(Fields.key)
+ public String key() {
+ return Optional.ofNullable(getKey())
+ .orElseGet(() -> Optional.ofNullable(subject).map(s -> s.getString("key")).orElse(null));
+ }
+
+ public JsonObject persist() {
+ return toJson(Collections.singleton(Fields.watcherOutput));
+ }
+
+}
diff --git a/connector/src/main/java/io/github/zero88/qwe/iot/connector/coordinator/CoordinatorInput.java b/connector/src/main/java/io/github/zero88/qwe/iot/connector/coordinator/CoordinatorInput.java
new file mode 100644
index 0000000..8db6dc7
--- /dev/null
+++ b/connector/src/main/java/io/github/zero88/qwe/iot/connector/coordinator/CoordinatorInput.java
@@ -0,0 +1,48 @@
+package io.github.zero88.qwe.iot.connector.coordinator;
+
+import java.util.List;
+
+import io.github.zero88.qwe.dto.JsonData;
+import io.github.zero88.qwe.iot.connector.Subject;
+import io.github.zero88.qwe.iot.connector.subscriber.Subscriber;
+import io.github.zero88.qwe.iot.connector.watcher.WatcherOption;
+
+import lombok.Builder;
+import lombok.Builder.Default;
+import lombok.Data;
+import lombok.NonNull;
+import lombok.Singular;
+import lombok.experimental.FieldNameConstants;
+import lombok.extern.jackson.Jacksonized;
+
+/**
+ * Coordinator input to prepare a {@code coordinator channel}
+ *
+ * @param Type of Subject
+ * @see Subject
+ * @see Subscriber
+ * @see WatcherOption
+ */
+@Data
+@Builder
+@Jacksonized
+@FieldNameConstants
+public final class CoordinatorInput implements JsonData {
+
+ @NonNull
+ private final S subject;
+ @Default
+ @NonNull
+ private final WatcherOption watcherOption = WatcherOption.builder().build();
+ @NonNull
+ @Singular
+ private final List subscribers;
+
+ public CoordinatorInput validate() {
+ if (getSubscribers().isEmpty()) {
+ throw new IllegalArgumentException("Must provides at least subscribers");
+ }
+ return this;
+ }
+
+}
diff --git a/connector/src/main/java/io/github/zero88/qwe/iot/connector/coordinator/InboundCoordinator.java b/connector/src/main/java/io/github/zero88/qwe/iot/connector/coordinator/InboundCoordinator.java
new file mode 100644
index 0000000..c173678
--- /dev/null
+++ b/connector/src/main/java/io/github/zero88/qwe/iot/connector/coordinator/InboundCoordinator.java
@@ -0,0 +1,134 @@
+package io.github.zero88.qwe.iot.connector.coordinator;
+
+import io.github.zero88.qwe.dto.ErrorMessage;
+import io.github.zero88.qwe.dto.msg.RequestData;
+import io.github.zero88.qwe.event.EventAction;
+import io.github.zero88.qwe.event.EventContractor;
+import io.github.zero88.qwe.event.EventContractor.Param;
+import io.github.zero88.qwe.iot.connector.Subject;
+import io.github.zero88.qwe.iot.connector.rpc.persistence.HasPersistenceClient;
+import io.github.zero88.qwe.iot.connector.rpc.persistence.PersistenceClient;
+import io.github.zero88.qwe.iot.connector.rpc.scheduler.HasSchedulerClient;
+import io.github.zero88.qwe.iot.connector.watcher.WatcherOption;
+import io.github.zero88.qwe.iot.connector.watcher.WatcherType;
+import io.github.zero88.qwe.scheduler.model.job.EventbusJobModel;
+import io.github.zero88.qwe.scheduler.model.job.QWEJobModel;
+import io.github.zero88.qwe.scheduler.model.trigger.QWETriggerModel;
+import io.github.zero88.qwe.scheduler.service.SchedulerRegisterArgs;
+import io.github.zero88.qwe.scheduler.service.SchedulerRegisterResp;
+import io.reactivex.Single;
+import io.vertx.core.json.JsonObject;
+
+import lombok.NonNull;
+
+/**
+ * Represents for an {@code coordinator service} that watches a particular {@code internal subject} then notifying this
+ * events to external {@code subscribers}.
+ *
+ * @param Type of subject
+ * @see Subject
+ * @see Coordinator
+ * @see HasSchedulerClient
+ * @see HasPersistenceClient
+ */
+public interface InboundCoordinator
+ extends Coordinator, HasSchedulerClient, HasPersistenceClient {
+
+ @EventContractor(action = "CREATE_OR_UPDATE", returnType = Single.class)
+ default Single register(@NonNull RequestData requestData) {
+ return this.validateInCreation(parseCoordinatorInput(requestData)).flatMap(this::addWatcher);
+ }
+
+ @EventContractor(action = "REMOVE", returnType = Single.class)
+ default Single unregister(@NonNull RequestData requestData) {
+ return Single.just(parseCoordinatorInput(requestData))
+ .flatMap(input -> validateInQuery(input).flatMap(channel -> this.removeWatcher(input, channel)));
+ }
+
+ @EventContractor(action = "GET_ONE", returnType = Single.class)
+ default Single get(@NonNull RequestData requestData) {
+ return Single.just(parseCoordinatorInput(requestData))
+ .flatMap(this::validateInQuery)
+ .flatMap(this::getCurrentWatcherStatus);
+ }
+
+ @EventContractor(action = "MONITOR", returnType = boolean.class)
+ boolean superviseThenNotify(@Param("data") JsonObject data, @Param("error") ErrorMessage error);
+
+ default Single> validateInCreation(@NonNull CoordinatorInput input) {
+ return Single.just(input.validate());
+ }
+
+ Single validateInQuery(@NonNull CoordinatorInput input);
+
+ default Single addWatcher(CoordinatorInput input) {
+ return Single.just(input.getWatcherOption())
+ .filter(WatcherOption::isRealtime)
+ .flatMapSingleElement(ignore -> addRealtimeWatcher(input))
+ .switchIfEmpty(addPollingWatcher(input));
+ }
+
+ default Single getCurrentWatcherStatus(@NonNull CoordinatorChannel channel) {
+ return Single.just(channel.getWatcherType())
+ .filter(t -> t == WatcherType.POLLING)
+ .flatMapSingleElement(ignore -> getCurrentPollingWatcher(channel))
+ .switchIfEmpty(getCurrentRealtimeWatcher(channel));
+ }
+
+ default Single removeWatcher(@NonNull CoordinatorInput coordinatorInput,
+ @NonNull CoordinatorChannel coordinatorChannel) {
+ if (coordinatorChannel.getWatcherType() == WatcherType.REALTIME) {
+ return removeRealtimeWatcher(coordinatorInput, coordinatorChannel);
+ }
+ if (coordinatorChannel.getWatcherType() == WatcherType.POLLING) {
+ return removePollingWatcher(coordinatorInput, coordinatorChannel);
+ }
+ throw new IllegalArgumentException("Unknown watcher type " + coordinatorChannel.getWatcherType());
+ }
+
+ Single addRealtimeWatcher(@NonNull CoordinatorInput coordinatorInput);
+
+ Single getCurrentRealtimeWatcher(@NonNull CoordinatorChannel channel);
+
+ Single removeRealtimeWatcher(@NonNull CoordinatorInput coordinatorInput,
+ @NonNull CoordinatorChannel coordinatorChannel);
+
+ Single addPollingWatcher(@NonNull CoordinatorInput coordinatorInput);
+
+ default Single getCurrentPollingWatcher(@NonNull CoordinatorChannel channel) {
+ final SchedulerRegisterArgs args = SchedulerRegisterArgs.builder()
+ .jobKey(channel.getWatcherKey())
+ .triggerKey(channel.getWatcherKey())
+ .build();
+ return schedulerService().execute(EventAction.GET_ONE, args,
+ protocol().type() + ":" + function() + ":" + domain() + " polling watcher")
+ .map(resp -> channel.watcherOutput(resp.toJson()));
+ }
+
+ default Single removePollingWatcher(@NonNull CoordinatorInput coordinatorInput,
+ @NonNull CoordinatorChannel coordinatorChannel) {
+ return removeScheduler(coordinatorChannel.key()).map(resp -> coordinatorChannel.watcherOutput(resp.toJson()));
+ }
+
+ default Single addScheduler(@NonNull WatcherOption option, @NonNull String jobName,
+ @NonNull String triggerName,
+ @NonNull JsonObject processPayload) {
+ final QWEJobModel job = EventbusJobModel.builder()
+ .group(protocol().type())
+ .name(jobName)
+ .process(subjectInfo(processPayload))
+ .callback(coordinatorInfo())
+ .build();
+ final QWETriggerModel trigger = QWETriggerModel.from(protocol().type(), jobName, option.getTriggerOption());
+ final SchedulerRegisterArgs args = SchedulerRegisterArgs.builder().job(job).trigger(trigger).build();
+ return schedulerService().execute(EventAction.CREATE, args,
+ protocol().type() + ":" + function() + ":" + domain() + " polling watcher");
+ }
+
+ default Single removeScheduler(@NonNull String jobName) {
+ final String jobKey = SchedulerRegisterArgs.createKey(protocol().type(), jobName);
+ return schedulerService().execute(EventAction.REMOVE, SchedulerRegisterArgs.builder().jobKey(jobKey).build(),
+ protocol().type() + ":" + function() + ":" + domain() + " polling watcher");
+ }
+
+}
diff --git a/connector/src/main/java/io/github/zero88/qwe/iot/connector/coordinator/OutboundCoordinator.java b/connector/src/main/java/io/github/zero88/qwe/iot/connector/coordinator/OutboundCoordinator.java
new file mode 100644
index 0000000..9db9859
--- /dev/null
+++ b/connector/src/main/java/io/github/zero88/qwe/iot/connector/coordinator/OutboundCoordinator.java
@@ -0,0 +1,88 @@
+package io.github.zero88.qwe.iot.connector.coordinator;
+
+import java.util.Collection;
+
+import io.github.zero88.qwe.dto.msg.RequestData;
+import io.github.zero88.qwe.event.EventAction;
+import io.github.zero88.qwe.event.EventContractor;
+import io.github.zero88.qwe.event.EventListener;
+import io.github.zero88.qwe.iot.data.IoTEntity;
+import io.github.zero88.qwe.micro.http.ActionMethodMapping;
+import io.reactivex.Single;
+
+import lombok.NonNull;
+
+/**
+ * Represents for a {@code RpcClient service} that listens an {@code external event} from outside services then
+ * dispatching event to a corresponding inner service handler
+ *
+ * @param Type of entity object
+ * @see EventListener
+ */
+public interface OutboundCoordinator
extends EventListener {
+
+ @Override
+ default @NonNull Collection getAvailableEvents() {
+ return ActionMethodMapping.DML_MAP.get().keySet();
+ }
+
+ /**
+ * Defines itself address in eventbus network
+ *
+ * @return Eventbus address
+ */
+ default String address() {
+ return this.getClass().getName();
+ }
+
+ /**
+ * Defines whether listening global event in {@code declared entity} regardless if entity protocol isn't matched
+ * with declared protocol
+ *
+ * @return {@code true} if global
+ */
+ default boolean isGlobal() {
+ return false;
+ }
+
+ /**
+ * Defines listener for updating existing resource by primary key
+ *
+ * @param requestData Request data
+ * @return json object that includes status message
+ * @see EventAction#CREATE
+ */
+ @EventContractor(action = "CREATE", returnType = Single.class)
+ @NonNull Single create(@NonNull RequestData requestData);
+
+ /**
+ * Defines listener for updating existing resource by primary key
+ *
+ * @param requestData Request data
+ * @return json object that includes status message
+ * @see EventAction#UPDATE
+ */
+ @EventContractor(action = "UPDATE", returnType = Single.class)
+ @NonNull Single
update(@NonNull RequestData requestData);
+
+ /**
+ * Defines listener for patching existing resource by primary key
+ *
+ * @param requestData Request data
+ * @return json object that includes status message
+ * @see EventAction#PATCH
+ */
+ @EventContractor(action = "PATCH", returnType = Single.class)
+ @NonNull Single
patch(@NonNull RequestData requestData);
+
+ /**
+ * Defines listener for deleting existing resource by primary key
+ *
+ * @param requestData Request data
+ * @return json object that includes status message
+ * @see EventAction#REMOVE
+ */
+ @EventContractor(action = "REMOVE", returnType = Single.class)
+ @NonNull Single
delete(@NonNull RequestData requestData);
+
+}
diff --git a/connector/src/main/java/io/github/zero88/qwe/iot/connector/discovery/ExplorerService.java b/connector/src/main/java/io/github/zero88/qwe/iot/connector/discovery/ExplorerService.java
new file mode 100644
index 0000000..cfcef04
--- /dev/null
+++ b/connector/src/main/java/io/github/zero88/qwe/iot/connector/discovery/ExplorerService.java
@@ -0,0 +1,49 @@
+package io.github.zero88.qwe.iot.connector.discovery;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+import io.github.zero88.qwe.dto.msg.RequestData;
+import io.github.zero88.qwe.event.EventAction;
+import io.github.zero88.qwe.event.EventContractor;
+import io.github.zero88.qwe.iot.connector.ConnectorService;
+import io.github.zero88.qwe.iot.data.IoTEntities;
+import io.github.zero88.qwe.iot.data.IoTEntity;
+import io.github.zero88.qwe.protocol.Protocol;
+import io.reactivex.Single;
+
+import lombok.NonNull;
+
+/**
+ * Represents for an {@code explorer service} that is able to discover a particular {@code protocol} data object on
+ * demand.
+ *
+ * @param
Type of IoT entity
+ * @param Type of IoT entity key
+ * @param Type of IoT entities that wraps IoT entity
+ * @see Protocol
+ */
+public interface ExplorerService, X extends IoTEntities> extends ConnectorService {
+
+ /**
+ * Defines service function name that will be used to distinguish to other services.
+ *
+ * @return function name
+ * @apiNote It is used to generated HTTP path and Event address then it must be unique
+ */
+ default String domain() {
+ return "discover";
+ }
+
+ @EventContractor(action = "GET_ONE", returnType = Single.class)
+ Single discover(@NonNull RequestData reqData);
+
+ @EventContractor(action = "GET_LIST", returnType = Single.class)
+ Single discoverMany(@NonNull RequestData reqData);
+
+ @Override
+ default @NonNull Collection getAvailableEvents() {
+ return Arrays.asList(EventAction.GET_LIST, EventAction.GET_ONE);
+ }
+
+}
diff --git a/connector/src/main/java/io/github/zero88/qwe/iot/connector/discovery/ExplorerServiceApis.java b/connector/src/main/java/io/github/zero88/qwe/iot/connector/discovery/ExplorerServiceApis.java
new file mode 100644
index 0000000..245f2cc
--- /dev/null
+++ b/connector/src/main/java/io/github/zero88/qwe/iot/connector/discovery/ExplorerServiceApis.java
@@ -0,0 +1,31 @@
+package io.github.zero88.qwe.iot.connector.discovery;
+
+import io.github.zero88.qwe.iot.connector.ConnectorServiceApis;
+import io.github.zero88.qwe.iot.data.IoTEntities;
+import io.github.zero88.qwe.iot.data.IoTEntity;
+import io.github.zero88.qwe.micro.http.ActionMethodMapping;
+import io.github.zero88.qwe.micro.http.EventHttpService;
+
+import lombok.NonNull;
+
+/**
+ * Represents for {@code Discovery APIs} that expose as public endpoints
+ *
+ * @param Type of IoT entity
+ * @see ExplorerService
+ * @see EventHttpService
+ */
+public interface ExplorerServiceApis, X extends IoTEntities>
+ extends ExplorerService, ConnectorServiceApis {
+
+ /**
+ * Event action and HTTP method mapping
+ *
+ * @return event method map
+ * @see ActionMethodMapping
+ */
+ default @NonNull ActionMethodMapping eventMethodMap() {
+ return ActionMethodMapping.DQL_MAP;
+ }
+
+}
diff --git a/connector/src/main/java/io/github/zero88/qwe/iot/connector/rpc/RpcClient.java b/connector/src/main/java/io/github/zero88/qwe/iot/connector/rpc/RpcClient.java
new file mode 100644
index 0000000..0bd86e5
--- /dev/null
+++ b/connector/src/main/java/io/github/zero88/qwe/iot/connector/rpc/RpcClient.java
@@ -0,0 +1,23 @@
+package io.github.zero88.qwe.iot.connector.rpc;
+
+import io.github.zero88.qwe.protocol.HasProtocol;
+import io.github.zero88.qwe.rpc.GatewayServiceInvoker;
+
+import lombok.NonNull;
+
+public interface RpcClient extends HasProtocol, GatewayServiceInvoker {
+
+ @NonNull String function();
+
+ @Override
+ @NonNull
+ default String requester() {
+ return protocol().type() + "-" + function();
+ }
+
+ @Override
+ default String serviceLabel() {
+ return protocol().type() + function() + " RPC client";
+ }
+
+}
diff --git a/connector/src/main/java/io/github/zero88/qwe/iot/connector/rpc/persistence/HasPersistenceClient.java b/connector/src/main/java/io/github/zero88/qwe/iot/connector/rpc/persistence/HasPersistenceClient.java
new file mode 100644
index 0000000..ff22f0a
--- /dev/null
+++ b/connector/src/main/java/io/github/zero88/qwe/iot/connector/rpc/persistence/HasPersistenceClient.java
@@ -0,0 +1,9 @@
+package io.github.zero88.qwe.iot.connector.rpc.persistence;
+
+import lombok.NonNull;
+
+public interface HasPersistenceClient {
+
+ @NonNull T persistenceService();
+
+}
diff --git a/connector/src/main/java/io/github/zero88/qwe/iot/connector/rpc/persistence/PersistenceClient.java b/connector/src/main/java/io/github/zero88/qwe/iot/connector/rpc/persistence/PersistenceClient.java
new file mode 100644
index 0000000..dc6e056
--- /dev/null
+++ b/connector/src/main/java/io/github/zero88/qwe/iot/connector/rpc/persistence/PersistenceClient.java
@@ -0,0 +1,22 @@
+package io.github.zero88.qwe.iot.connector.rpc.persistence;
+
+import io.github.zero88.qwe.iot.connector.rpc.RpcClient;
+
+import lombok.NonNull;
+
+public interface PersistenceClient extends RpcClient {
+
+ @Override
+ default @NonNull String function() {
+ return "persistence";
+ }
+
+ /**
+ * A persistence service name
+ *
+ * @return persistence service name
+ */
+ @Override
+ @NonNull String destination();
+
+}
diff --git a/connector/src/main/java/io/github/zero88/qwe/iot/connector/rpc/scheduler/HasSchedulerClient.java b/connector/src/main/java/io/github/zero88/qwe/iot/connector/rpc/scheduler/HasSchedulerClient.java
new file mode 100644
index 0000000..1c5da65
--- /dev/null
+++ b/connector/src/main/java/io/github/zero88/qwe/iot/connector/rpc/scheduler/HasSchedulerClient.java
@@ -0,0 +1,9 @@
+package io.github.zero88.qwe.iot.connector.rpc.scheduler;
+
+import lombok.NonNull;
+
+public interface HasSchedulerClient {
+
+ @NonNull SchedulerClient schedulerService();
+
+}
diff --git a/connector/src/main/java/io/github/zero88/qwe/iot/connector/rpc/scheduler/SchedulerClient.java b/connector/src/main/java/io/github/zero88/qwe/iot/connector/rpc/scheduler/SchedulerClient.java
new file mode 100644
index 0000000..b8399fd
--- /dev/null
+++ b/connector/src/main/java/io/github/zero88/qwe/iot/connector/rpc/scheduler/SchedulerClient.java
@@ -0,0 +1,41 @@
+package io.github.zero88.qwe.iot.connector.rpc.scheduler;
+
+import io.github.zero88.qwe.dto.JsonData;
+import io.github.zero88.qwe.event.EventAction;
+import io.github.zero88.qwe.exceptions.ServiceException;
+import io.github.zero88.qwe.iot.connector.rpc.RpcClient;
+import io.github.zero88.qwe.scheduler.service.SchedulerRegisterArgs;
+import io.github.zero88.qwe.scheduler.service.SchedulerRegisterResp;
+import io.reactivex.Single;
+
+import lombok.NonNull;
+
+public interface SchedulerClient extends RpcClient {
+
+ @Override
+ default @NonNull String function() {
+ return "schedule";
+ }
+
+ /**
+ * A scheduler service name
+ *
+ * @return scheduler service name
+ */
+ @Override
+ @NonNull String destination();
+
+ @Override
+ default boolean throwIfResponseError() {
+ return true;
+ }
+
+ default Single execute(@NonNull EventAction action, @NonNull SchedulerRegisterArgs args,
+ @NonNull String schedulerIdentifier) {
+ return this.execute(action, args.toJson())
+ .map(resp -> JsonData.from(resp, SchedulerRegisterResp.class))
+ .onErrorResumeNext(
+ t -> Single.error(new ServiceException("Unable to " + action + " " + schedulerIdentifier, t)));
+ }
+
+}
diff --git a/connector/src/main/java/io/github/zero88/qwe/iot/connector/scanner/RpcScanner.java b/connector/src/main/java/io/github/zero88/qwe/iot/connector/scanner/RpcScanner.java
new file mode 100644
index 0000000..87ea900
--- /dev/null
+++ b/connector/src/main/java/io/github/zero88/qwe/iot/connector/scanner/RpcScanner.java
@@ -0,0 +1,28 @@
+package io.github.zero88.qwe.iot.connector.scanner;
+
+import io.github.zero88.qwe.iot.connector.ConnectorService;
+import io.github.zero88.qwe.iot.data.IoTEntity;
+
+import lombok.NonNull;
+
+/**
+ * Represents for a {@code RpcClient service} that is able to scan from a specific external source to extract an
+ * appropriate {@code protocol} data object then using them to initialize itself service in startup.
+ *
+ * @param Type of {@code IoTEntity}
+ * @see ScannerSource
+ * @since 1.0.0
+ */
+public interface RpcScanner
extends ConnectorService {
+
+ @NonNull ScannerSource source();
+
+ /**
+ * Declares context that represents for the protocol entity
+ *
+ * @return class of protocol entity
+ * @see IoTEntity
+ */
+ @NonNull Class
context();
+
+}
diff --git a/connector/src/main/java/io/github/zero88/qwe/iot/connector/scanner/ScannerSource.java b/connector/src/main/java/io/github/zero88/qwe/iot/connector/scanner/ScannerSource.java
new file mode 100644
index 0000000..298ab75
--- /dev/null
+++ b/connector/src/main/java/io/github/zero88/qwe/iot/connector/scanner/ScannerSource.java
@@ -0,0 +1,6 @@
+package io.github.zero88.qwe.iot.connector.scanner;
+
+/**
+ * Defines scanner source
+ */
+public interface ScannerSource {}
diff --git a/connector/src/main/java/io/github/zero88/qwe/iot/connector/subscriber/Subscriber.java b/connector/src/main/java/io/github/zero88/qwe/iot/connector/subscriber/Subscriber.java
new file mode 100644
index 0000000..736e040
--- /dev/null
+++ b/connector/src/main/java/io/github/zero88/qwe/iot/connector/subscriber/Subscriber.java
@@ -0,0 +1,25 @@
+package io.github.zero88.qwe.iot.connector.subscriber;
+
+import io.github.zero88.qwe.dto.JsonData;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.annotation.JsonUnwrapped;
+
+/**
+ * Represents for subscriber
+ *
+ * @see SubscriberType
+ */
+public interface Subscriber extends JsonData {
+
+ @JsonUnwrapped
+ SubscriberType getType();
+
+ String getCode();
+
+ @JsonProperty("key")
+ default String key() {
+ return getType().type() + "::" + getCode();
+ }
+
+}
diff --git a/connector/src/main/java/io/github/zero88/qwe/iot/connector/subscriber/SubscriberType.java b/connector/src/main/java/io/github/zero88/qwe/iot/connector/subscriber/SubscriberType.java
new file mode 100644
index 0000000..16c60b3
--- /dev/null
+++ b/connector/src/main/java/io/github/zero88/qwe/iot/connector/subscriber/SubscriberType.java
@@ -0,0 +1,19 @@
+package io.github.zero88.qwe.iot.connector.subscriber;
+
+import io.github.zero88.qwe.dto.EnumType;
+import io.github.zero88.qwe.dto.EnumType.AbstractEnumType;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+
+public final class SubscriberType extends AbstractEnumType {
+
+ private SubscriberType(String type) {
+ super(type);
+ }
+
+ @JsonCreator
+ public static SubscriberType factory(String name) {
+ return EnumType.factory(name, SubscriberType.class, true);
+ }
+
+}
diff --git a/connector/src/main/java/io/github/zero88/qwe/iot/connector/watcher/WatcherOption.java b/connector/src/main/java/io/github/zero88/qwe/iot/connector/watcher/WatcherOption.java
new file mode 100644
index 0000000..4d3cfae
--- /dev/null
+++ b/connector/src/main/java/io/github/zero88/qwe/iot/connector/watcher/WatcherOption.java
@@ -0,0 +1,70 @@
+package io.github.zero88.qwe.iot.connector.watcher;
+
+import io.github.zero88.qwe.dto.JsonData;
+import io.github.zero88.qwe.scheduler.model.trigger.TriggerOption;
+import io.github.zero88.qwe.scheduler.model.trigger.TriggerType;
+import io.vertx.core.json.JsonObject;
+
+import lombok.Builder;
+import lombok.Builder.Default;
+import lombok.Data;
+import lombok.NonNull;
+import lombok.extern.jackson.Jacksonized;
+
+/**
+ * Defines watcher mechanism in realtime or polling
+ *
+ * @see WatcherType
+ */
+@Data
+@Builder
+@Jacksonized
+public final class WatcherOption implements JsonData {
+
+ /**
+ * Enable realtime mode that run when any event is occurred in a watcher object
+ */
+ @Default
+ private final boolean realtime = true;
+ /**
+ * Defines a real-time watcher is maintained in how long
+ *
+ * @apiNote Default is {@code -1} mean no expired
+ */
+ @Default
+ private final int lifetimeInSeconds = -1;
+
+ /**
+ * Fallback to polling mechanism with default trigger option if {@code realtime} mechanism is not supported
+ *
+ * @see #triggerOption
+ */
+ @Default
+ private final boolean fallbackPolling = true;
+
+ /**
+ * Enable polling mode that run on a schedule or periodical, such as reading a sensor every five milliseconds
+ */
+ @Default
+ private final boolean polling = false;
+
+ /**
+ * Defines trigger option if enable trigger mode.
+ *
+ * @apiNote Default option is {@code periodic} with {@code interval = 5}
+ */
+ @Default
+ private final TriggerOption triggerOption = TriggerOption.builder()
+ .type(TriggerType.PERIODIC)
+ .intervalInSeconds(5)
+ .build();
+
+ public static WatcherOption parse(@NonNull JsonObject watcher) {
+ final WatcherOption opt = JsonData.from(watcher, WatcherOption.class);
+ if (!opt.isRealtime() && !opt.isPolling()) {
+ throw new IllegalArgumentException("Must enabled one of realtime or polling mechanism");
+ }
+ return opt;
+ }
+
+}
diff --git a/connector/src/main/java/io/github/zero88/qwe/iot/connector/watcher/WatcherType.java b/connector/src/main/java/io/github/zero88/qwe/iot/connector/watcher/WatcherType.java
new file mode 100644
index 0000000..7503692
--- /dev/null
+++ b/connector/src/main/java/io/github/zero88/qwe/iot/connector/watcher/WatcherType.java
@@ -0,0 +1,22 @@
+package io.github.zero88.qwe.iot.connector.watcher;
+
+import io.github.zero88.qwe.dto.EnumType;
+import io.github.zero88.qwe.dto.EnumType.AbstractEnumType;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+
+public final class WatcherType extends AbstractEnumType {
+
+ public static final WatcherType POLLING = new WatcherType("POLLING");
+ public static final WatcherType REALTIME = new WatcherType("REALTIME");
+
+ private WatcherType(String type) {
+ super(type);
+ }
+
+ @JsonCreator
+ public static WatcherType factory(String type) {
+ return EnumType.factory(type, WatcherType.class, true);
+ }
+
+}
diff --git a/connector/src/test/java/io/github/zero88/qwe/iot/connector/coordinator/CoordinatorChannelTest.java b/connector/src/test/java/io/github/zero88/qwe/iot/connector/coordinator/CoordinatorChannelTest.java
new file mode 100644
index 0000000..bf68dfa
--- /dev/null
+++ b/connector/src/test/java/io/github/zero88/qwe/iot/connector/coordinator/CoordinatorChannelTest.java
@@ -0,0 +1,52 @@
+package io.github.zero88.qwe.iot.connector.coordinator;
+
+import org.json.JSONException;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import io.github.zero88.qwe.JsonHelper;
+import io.github.zero88.qwe.dto.JsonData;
+import io.github.zero88.qwe.iot.connector.mock.MockSubject;
+import io.github.zero88.qwe.iot.connector.mock.MockSubscriber;
+import io.github.zero88.qwe.iot.connector.watcher.WatcherType;
+import io.vertx.core.json.JsonObject;
+
+public class CoordinatorChannelTest {
+
+ @Test
+ void test_serialize() throws JSONException {
+ final CoordinatorInput input = CoordinatorInput.builder().subject(
+ new MockSubject("m1")).subscriber(new MockSubscriber("s1")).build();
+ final CoordinatorChannel channel = CoordinatorChannel.from(input, WatcherType.REALTIME, "watcherKey",
+ new JsonObject().put("a", "b"));
+ final JsonObject expected = new JsonObject(
+ "{\"subject\":{\"key\":\"m1\"},\"watcherKey\":\"watcherKey\",\"watcherType\":\"REALTIME\"," +
+ "\"watcherOption\":{\"realtime\":true,\"lifetimeInSeconds\":-1,\"fallbackPolling\":true," +
+ "\"polling\":false,\"triggerOption\":{\"type\":\"PERIODIC\",\"intervalInSeconds\":5,\"repeat\":-1}}," +
+ "\"subscribers\":[{\"code\":\"s1\",\"type\":\"MOCK\",\"key\":\"MOCK::s1\"}],\"key\":\"m1\", " +
+ "\"watcherOutput\":{\"a\":\"b\"}}");
+ JsonHelper.assertJson(expected, channel.toJson());
+ }
+
+ @Test
+ void test_deserialize() {
+ JsonObject input = new JsonObject(
+ "{\"watcherOutput\":{\"jobKey\":\"BACnet.udp4-wlp4s0-47808_1110_device:1110\"," +
+ "\"triggerKey\":\"BACnet.udp4-wlp4s0-47808_1110_device:1110\"," +
+ "\"firstFireTime\":{\"local\":\"2021-02-04T17:25:26.072+07:00[Asia/Ho_Chi_Minh]\"," +
+ "\"utc\":\"2021-02-04T10:25:26.072Z\"}},\"subject\":{\"networkId\":\"udp4-wlp4s0-47808\"," +
+ "\"deviceInstance\":1110,\"objectCode\":\"device:1110\"," +
+ "\"key\":\"udp4-wlp4s0-47808_1110_device:1110\"},\"subscribers\":[{\"wsPath\":\"/cov\"," +
+ "\"publishAddress\":\"bacnet.websocket.cov\",\"action\":\"MONITOR\",\"code\":\"bacnet_cov\"," +
+ "\"type\":\"WEBSOCKET_SERVER\",\"key\":\"WEBSOCKET_SERVER::bacnet_cov\"}]," +
+ "\"watcherOption\":{\"realtime\":true,\"lifetimeInSeconds\":-1,\"fallbackPolling\":true," +
+ "\"polling\":false,\"triggerOption\":{\"type\":\"PERIODIC\",\"intervalInSeconds\":5," +
+ "\"repeat\":-1}},\"watcherType\":\"POLLING\"}");
+ final CoordinatorChannel from = JsonData.from(input, CoordinatorChannel.class);
+ Assertions.assertEquals("udp4-wlp4s0-47808_1110_device:1110", from.key());
+ Assertions.assertEquals(WatcherType.POLLING, from.getWatcherType());
+ Assertions.assertEquals(1, from.getSubscribers().size());
+ System.out.println(from.toJson());
+ }
+
+}
diff --git a/connector/src/test/java/io/github/zero88/qwe/iot/connector/coordinator/CoordinatorInputTest.java b/connector/src/test/java/io/github/zero88/qwe/iot/connector/coordinator/CoordinatorInputTest.java
new file mode 100644
index 0000000..5e452be
--- /dev/null
+++ b/connector/src/test/java/io/github/zero88/qwe/iot/connector/coordinator/CoordinatorInputTest.java
@@ -0,0 +1,25 @@
+package io.github.zero88.qwe.iot.connector.coordinator;
+
+import org.json.JSONException;
+import org.junit.jupiter.api.Test;
+
+import io.github.zero88.qwe.JsonHelper;
+import io.github.zero88.qwe.iot.connector.mock.MockSubject;
+import io.github.zero88.qwe.iot.connector.mock.MockSubscriber;
+import io.vertx.core.json.JsonObject;
+
+public class CoordinatorInputTest {
+
+ @Test
+ public void test_serialize() throws JSONException {
+ final CoordinatorInput option = CoordinatorInput.builder().subject(
+ new MockSubject("m1")).subscriber(new MockSubscriber("s1")).build();
+ final JsonObject expected = new JsonObject(
+ "{\"subject\":{\"key\":\"m1\"},\"watcherOption\":{\"realtime\":true,\"lifetimeInSeconds\":-1," +
+ "\"fallbackPolling\":true,\"polling\":false,\"triggerOption\":{\"type\":\"PERIODIC\"," +
+ "\"intervalInSeconds\":5,\"repeat\":-1}},\"subscribers\":[{\"code\":\"s1\",\"type\":\"MOCK\",\"key" +
+ "\":\"MOCK::s1\"}]}");
+ JsonHelper.assertJson(expected, option.toJson());
+ }
+
+}
diff --git a/connector/src/test/java/io/github/zero88/qwe/iot/connector/mock/MockSubject.java b/connector/src/test/java/io/github/zero88/qwe/iot/connector/mock/MockSubject.java
new file mode 100644
index 0000000..cf8084e
--- /dev/null
+++ b/connector/src/test/java/io/github/zero88/qwe/iot/connector/mock/MockSubject.java
@@ -0,0 +1,22 @@
+package io.github.zero88.qwe.iot.connector.mock;
+
+import io.github.zero88.qwe.iot.connector.Subject;
+
+import lombok.Builder;
+import lombok.Getter;
+import lombok.NonNull;
+import lombok.RequiredArgsConstructor;
+import lombok.experimental.Accessors;
+import lombok.extern.jackson.Jacksonized;
+
+@Getter
+@Builder
+@Jacksonized
+@Accessors(fluent = true)
+@RequiredArgsConstructor
+public class MockSubject implements Subject {
+
+ @NonNull
+ private final String key;
+
+}
diff --git a/connector/src/test/java/io/github/zero88/qwe/iot/connector/mock/MockSubscriber.java b/connector/src/test/java/io/github/zero88/qwe/iot/connector/mock/MockSubscriber.java
new file mode 100644
index 0000000..45ac1aa
--- /dev/null
+++ b/connector/src/test/java/io/github/zero88/qwe/iot/connector/mock/MockSubscriber.java
@@ -0,0 +1,20 @@
+package io.github.zero88.qwe.iot.connector.mock;
+
+import io.github.zero88.qwe.iot.connector.subscriber.Subscriber;
+import io.github.zero88.qwe.iot.connector.subscriber.SubscriberType;
+
+import lombok.Getter;
+import lombok.RequiredArgsConstructor;
+
+@Getter
+@RequiredArgsConstructor
+public class MockSubscriber implements Subscriber {
+
+ private final String code;
+
+ @Override
+ public SubscriberType getType() {
+ return SubscriberType.factory("mock");
+ }
+
+}
diff --git a/connector/src/test/java/io/github/zero88/qwe/iot/connector/watcher/WatcherOptionTest.java b/connector/src/test/java/io/github/zero88/qwe/iot/connector/watcher/WatcherOptionTest.java
new file mode 100644
index 0000000..e496237
--- /dev/null
+++ b/connector/src/test/java/io/github/zero88/qwe/iot/connector/watcher/WatcherOptionTest.java
@@ -0,0 +1,70 @@
+package io.github.zero88.qwe.iot.connector.watcher;
+
+import org.json.JSONException;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import io.github.zero88.qwe.JsonHelper;
+import io.github.zero88.qwe.dto.JsonData;
+import io.github.zero88.qwe.scheduler.model.trigger.TriggerOption;
+import io.github.zero88.qwe.scheduler.model.trigger.TriggerType;
+import io.vertx.core.json.JsonObject;
+
+public class WatcherOptionTest {
+
+ @Test
+ public void test_default() throws JSONException {
+ final WatcherOption option = WatcherOption.builder().build();
+ final JsonObject expected = new JsonObject(
+ "{\"realtime\":true,\"lifetimeInSeconds\":-1,\"fallbackPolling\":true,\"polling\":false," +
+ "\"triggerOption\":{\"type\":\"PERIODIC\",\"intervalInSeconds\":5,\"repeat\":-1}}");
+ JsonHelper.assertJson(expected, option.toJson());
+ final WatcherOption from = JsonData.from(expected, WatcherOption.class);
+ Assertions.assertTrue(from.isRealtime());
+ Assertions.assertFalse(from.isPolling());
+ Assertions.assertTrue(from.isFallbackPolling());
+ Assertions.assertEquals(TriggerType.PERIODIC, from.getTriggerOption().getType());
+ Assertions.assertEquals(5, from.getTriggerOption().getIntervalInSeconds());
+ Assertions.assertEquals(-1, from.getTriggerOption().getRepeat());
+ Assertions.assertNull(from.getTriggerOption().getExpression());
+ Assertions.assertNull(from.getTriggerOption().getTimezone());
+ }
+
+ @Test
+ public void test_serialize() throws JSONException {
+ final TriggerOption trigger = TriggerOption.builder()
+ .type(TriggerType.CRON)
+ .timezone("UTC+7")
+ .expression("* * * * * ? *")
+ .build();
+ final WatcherOption option = WatcherOption.builder()
+ .realtime(false)
+ .lifetimeInSeconds(10)
+ .fallbackPolling(false)
+ .polling(true)
+ .triggerOption(trigger)
+ .build();
+ final JsonObject expected = new JsonObject(
+ "{\"realtime\":false,\"lifetimeInSeconds\":10,\"fallbackPolling\":false,\"polling\":true," +
+ "\"triggerOption\":{\"type\":\"CRON\",\"expression\":\"* * * * * ? *\",\"timezone\":\"UTC+7\"," +
+ "\"repeat\":-1}}");
+ JsonHelper.assertJson(expected, option.toJson());
+ }
+
+ @Test
+ public void test_deserialize() {
+ final JsonObject json = new JsonObject(
+ "{\"realtime\":true,\"lifetimeInSeconds\":10,\"fallbackPolling\":true,\"polling\":true," +
+ "\"triggerOption\":{\"type\":\"CRON\",\"expression\":\"* 0 * ? * * *\",\"timezone\":\"UTC+10\"}}");
+ final WatcherOption from = JsonData.from(json, WatcherOption.class);
+ Assertions.assertTrue(from.isRealtime());
+ Assertions.assertTrue(from.isPolling());
+ Assertions.assertTrue(from.isFallbackPolling());
+ Assertions.assertEquals(10, from.getLifetimeInSeconds());
+ Assertions.assertEquals(TriggerType.CRON, from.getTriggerOption().getType());
+ Assertions.assertEquals(-1, from.getTriggerOption().getRepeat());
+ Assertions.assertEquals("* 0 * ? * * *", from.getTriggerOption().getExpression());
+ Assertions.assertEquals("UTC+10", from.getTriggerOption().getTimezone());
+ }
+
+}