From c97a44d4bb3f0b44f4b3956e104741fdf14323ec Mon Sep 17 00:00:00 2001 From: "g.dimitropoulos" Date: Wed, 9 Aug 2023 09:09:09 +0200 Subject: [PATCH] [#19] Add functionality for: - handle commands and configs - use pubsub as internal message - updating db for device configs Signed-off-by: Georgios Dimitropoulos --- device-communication/README.md | 126 +++++++- device-communication/pom.xml | 68 +++- .../src/main/docker/Dockerfile.legacy-jar | 1 + .../api/config/DeviceCommandConstants.java | 1 - .../api/config/PubSubConstants.java | 48 +++ .../api/handler/DeviceCommandHandler.java | 18 +- .../api/mapper/DeviceConfigMapper.java | 3 + .../repository/DeviceConfigRepository.java | 89 ++++++ .../DeviceConfigRepositoryImpl.java | 302 ++++++++++++++++++ .../repository/DeviceConfigsRepository.java | 52 --- .../DeviceConfigsRepositoryImpl.java | 204 ------------ .../api/repository/DeviceRepository.java | 45 +++ .../api/repository/DeviceRepositoryImpl.java | 97 ++++++ .../api/service/DeviceCommandServiceImpl.java | 40 --- .../api/service/DeviceServiceAbstract.java | 48 +++ .../service/command/DeviceCommandService.java | 38 +++ .../command/DeviceCommandServiceImpl.java | 103 ++++++ .../InternalMessagePublisher.java | 35 ++ .../InternalMessageSubscriber.java | 34 ++ .../communication/InternalMessaging.java | 27 ++ .../service/communication/PubSubService.java | 170 ++++++++++ .../{ => database}/DatabaseSchemaCreator.java | 2 +- .../{ => database}/DatabaseService.java | 2 +- .../{ => database}/DatabaseServiceImpl.java | 2 +- .../core/app/InternalMessagingConfig.java | 111 +++++++ .../core/app/InternalMessagingConstants.java | 33 ++ .../core/http/AbstractVertxHttpServer.java | 2 +- .../core/utils/ResponseUtils.java | 34 +- .../src/main/resources/application.yaml | 32 +- .../handler/DeviceCommandsHandlerTest.java | 68 +++- .../command/DeviceCommandServiceImplTest.java | 155 +++++++++ .../communication/PubSubServiceTest.java | 197 ++++++++++++ 32 files changed, 1818 insertions(+), 369 deletions(-) create mode 100644 device-communication/src/main/java/org/eclipse/hono/communication/api/config/PubSubConstants.java create mode 100644 device-communication/src/main/java/org/eclipse/hono/communication/api/repository/DeviceConfigRepository.java create mode 100644 device-communication/src/main/java/org/eclipse/hono/communication/api/repository/DeviceConfigRepositoryImpl.java delete mode 100644 device-communication/src/main/java/org/eclipse/hono/communication/api/repository/DeviceConfigsRepository.java delete mode 100644 device-communication/src/main/java/org/eclipse/hono/communication/api/repository/DeviceConfigsRepositoryImpl.java create mode 100644 device-communication/src/main/java/org/eclipse/hono/communication/api/repository/DeviceRepository.java create mode 100644 device-communication/src/main/java/org/eclipse/hono/communication/api/repository/DeviceRepositoryImpl.java delete mode 100644 device-communication/src/main/java/org/eclipse/hono/communication/api/service/DeviceCommandServiceImpl.java create mode 100644 device-communication/src/main/java/org/eclipse/hono/communication/api/service/DeviceServiceAbstract.java create mode 100644 device-communication/src/main/java/org/eclipse/hono/communication/api/service/command/DeviceCommandService.java create mode 100644 device-communication/src/main/java/org/eclipse/hono/communication/api/service/command/DeviceCommandServiceImpl.java create mode 100644 device-communication/src/main/java/org/eclipse/hono/communication/api/service/communication/InternalMessagePublisher.java create mode 100644 device-communication/src/main/java/org/eclipse/hono/communication/api/service/communication/InternalMessageSubscriber.java create mode 100644 device-communication/src/main/java/org/eclipse/hono/communication/api/service/communication/InternalMessaging.java create mode 100644 device-communication/src/main/java/org/eclipse/hono/communication/api/service/communication/PubSubService.java rename device-communication/src/main/java/org/eclipse/hono/communication/api/service/{ => database}/DatabaseSchemaCreator.java (92%) rename device-communication/src/main/java/org/eclipse/hono/communication/api/service/{ => database}/DatabaseService.java (93%) rename device-communication/src/main/java/org/eclipse/hono/communication/api/service/{ => database}/DatabaseServiceImpl.java (96%) create mode 100644 device-communication/src/main/java/org/eclipse/hono/communication/core/app/InternalMessagingConfig.java create mode 100644 device-communication/src/main/java/org/eclipse/hono/communication/core/app/InternalMessagingConstants.java create mode 100644 device-communication/src/test/java/org/eclipse/hono/communication/api/service/command/DeviceCommandServiceImplTest.java create mode 100644 device-communication/src/test/java/org/eclipse/hono/communication/api/service/communication/PubSubServiceTest.java diff --git a/device-communication/README.md b/device-communication/README.md index bbed3af9..c2f9538f 100644 --- a/device-communication/README.md +++ b/device-communication/README.md @@ -1,6 +1,6 @@ # Device Communication API -Device communication API enables users and applications to send configurations and commands to devices via HTTP +Device communication API enables users and applications to send configurations and commands to devices via HTTP(S) endpoints. ![img.png](img.png) @@ -18,10 +18,14 @@ router. #### commands/{tenantId}/{deviceId} -- POST : post a command for a specific device (NOT IMPLEMENTED YET) +- POST : post a command for a specific device

+#### states/{tenantId}/{deviceId}?numStates=(int 0 - 10) + +- GET : list of device states + #### configs/{tenantId}/{deviceId}?numVersion=(int 0 - 10) - GET : list of device config versions @@ -30,24 +34,98 @@ router. For more information please see resources/api/openApi file. +## Pub/Sub - Internal Messaging + +API communicates with hono components via the internal messaging interface (implemented from Google's PubSub). +All the settings for the InternalMessaging component are in the application.yaml file. By publish/subscribe to a topic +application sends or expects some message attributes. + +### Events + +API will subscribe to all tenants' event topic at startup. + +Expected message Attributes: + +- deviceId +- tenantId +- content-type + +Application will proceed only empty Notifications events (content-type is +application/vnd.eclipse-hono-empty-notification). + +### States + +API will subscribe to all tenants' state topic at startup. + +Expected message Attributes: + +- deviceId +- tenantId + +States are read only. + +### Configs + +Application will publish the latest device configuration when: + +- an empty Notifications event was received +- a new device config was created + +Message will be published with the following attributes: + +- deviceId +- tenantId +- config-version + +The Body will be a JSON object with the device config object. + +After publishing device configs, application subscribes to config_response topic and waits for the device to ack the +configs. + +### Config ACK + +Expected message attributes: + +- deviceId +- tenantId +- configVersion (the config version received from device) + +If configVersion is not set, application will ack always the latest config. + +### Commands + +A command will be published from API to the command topic. + +Attributes: + +- deviceId +- tenantId +- subject (always set to "command") + +Body: + +The command as string. + ## Database -Application uses PostgreSQL database. All the database configurations can be found in application.yaml file. +Application uses PostgresSQL database. All the database configurations can be found in application.yaml file. ### Tables -- DeviceConfig
+- device_configs
Is used for saving device config versions -- DeviceRegistration
+- device_registrations
Is used for validating if a device exist +- device_status
+ Is used for saving device states ### Migrations -When Application starts, tables will be created by the DatabaseSchemaCreator service. +When Applications starts tables will be created by the DatabaseSchemaCreator service. -### Running PostgreSQL container locally +### Running postgresSQL container local -For running the PostgreSQL Database locally with docker, run: +For running the PostgresSQL Database local with docker run: `````` @@ -58,14 +136,14 @@ docker run -p 5432:5432 --name some-postgres -e POSTGRES_PASSWORD=mysecretpasswo After the container is running, log in to the container and with psql create the database. Then we have to set the application settings. -Default PostgreSQL values: +Default postgresSQl values: - userName = postgres - password = mysecretpassword ## Build and Push API Docker Image -Mavens auto build and push functionality can be enabled from application.yaml settings: +Mavens auto build and push functionality ca be enabled from application.yaml settings: ```` @@ -74,17 +152,19 @@ quarkus: builder: docker build: true push: true - image: "//hono-device-communication" + image: "gcr.io/sotec-iot-core-dev/hono-device-communication" ```` -By running maven package, install or deploy, this will automatically build the docker image and if push is enabled it will -push the image to the given registry. +By running maven package, install or deploy will automatically build the docker image and if push is enabled it will +push the image +to the given registry. ## OpenApi Contract-first For creating the endpoints, Vertx takes the openApi definition file and maps every endpoint operation-ID with a specific -Handler function. +Handler +function. ## Handlers @@ -99,4 +179,22 @@ Adding new Endpoint steps: 2. Use an existing const Class or create a new one under /config and set the operation id name 3. Implement an HttpEndpointHandler and set the Routes +## PubSub Events + +Application subscribes and uses to the following topics: + +1. TENANT_ID.command +2. TENANT_ID.command_response +3. TENANT_ID.event +4. TENANT_ID.event.state +5. registry-tenant.notification + +## Automatically create PubSub topics and subscriptions + +Application creates all tenants topics and subscriptions when: + +1. Application starts if are not exist +2. New tenant is created + + diff --git a/device-communication/pom.xml b/device-communication/pom.xml index 16a757ab..bf122f8f 100644 --- a/device-communication/pom.xml +++ b/device-communication/pom.xml @@ -7,7 +7,7 @@ device-communication 1.0-SNAPSHOT - 3.10.1 + 3.10.0 17 UTF-8 UTF-8 @@ -15,9 +15,16 @@ io.quarkus.platform 2.15.3.Final true - 3.0.0-M7 + 3.0.0-M9 1.5.3.Final 2.1.0 + 4.3.7 + 2.2.3 + 2.1 + 2.16.0.Final + 1.3.0 + 3.0.0-M9 + 31.1-jre @@ -50,10 +57,6 @@ io.quarkus quarkus-vertx - - io.vertx - vertx-core - io.vertx vertx-web @@ -87,17 +90,17 @@ io.vertx vertx-pg-client - 4.3.7 + ${vertx-pg-client.version} jakarta.persistence jakarta.persistence-api - 2.2.3 + ${jakarta.persistence-api.version} com.ongres.scram client - 2.1 + ${com.ongres.scram.client.version} @@ -115,13 +118,58 @@ io.quarkus quarkus-container-image-docker - 2.16.0.Final + ${quarkus-container-image-docker.version} io.vertx vertx-health-check + + + io.quarkiverse.googlecloudservices + quarkus-google-cloud-pubsub + ${quarkus-google-cloud-pubsub.version} + + + + org.apache.maven.plugins + maven-surefire-plugin + ${maven-surefire-plugin.version} + maven-plugin + + + + com.google.guava + guava + ${com.google.guava.version} + + + io.vertx + vertx-core + 4.3.7 + + + org.eclipse.hono + hono-core + 2.4.0-SNAPSHOT + + + org.eclipse.hono + hono-client-pubsub-common + 2.4.0-SNAPSHOT + + + org.eclipse.hono + hono-client-notification + 2.4.0-SNAPSHOT + + + commons-codec + commons-codec + 1.15 + + diff --git a/device-communication/src/main/docker/Dockerfile.legacy-jar b/device-communication/src/main/docker/Dockerfile.legacy-jar index 108988bd..3835932a 100644 --- a/device-communication/src/main/docker/Dockerfile.legacy-jar +++ b/device-communication/src/main/docker/Dockerfile.legacy-jar @@ -81,6 +81,7 @@ ENV LANGUAGE='en_US:en' RUN mkdir api/ RUN mkdir db/ +RUN mkdir creds/ COPY target/classes/api/* /api/ COPY target/classes/db/* /db/ diff --git a/device-communication/src/main/java/org/eclipse/hono/communication/api/config/DeviceCommandConstants.java b/device-communication/src/main/java/org/eclipse/hono/communication/api/config/DeviceCommandConstants.java index ec3778f2..bc95f743 100644 --- a/device-communication/src/main/java/org/eclipse/hono/communication/api/config/DeviceCommandConstants.java +++ b/device-communication/src/main/java/org/eclipse/hono/communication/api/config/DeviceCommandConstants.java @@ -27,6 +27,5 @@ public final class DeviceCommandConstants { public static final String POST_DEVICE_COMMAND_OP_ID = "postCommand"; private DeviceCommandConstants() { - // avoid instantiation } } diff --git a/device-communication/src/main/java/org/eclipse/hono/communication/api/config/PubSubConstants.java b/device-communication/src/main/java/org/eclipse/hono/communication/api/config/PubSubConstants.java new file mode 100644 index 00000000..c6adb524 --- /dev/null +++ b/device-communication/src/main/java/org/eclipse/hono/communication/api/config/PubSubConstants.java @@ -0,0 +1,48 @@ +/* + * *********************************************************** + * Copyright (c) 2023 Contributors to the Eclipse Foundation + *

+ * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + *

+ * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0 + *

+ * SPDX-License-Identifier: EPL-2.0 + * ********************************************************** + * + */ + +package org.eclipse.hono.communication.api.config; + +import java.util.List; + +/** + * Constant values for PubSub. + */ +public final class PubSubConstants { + + public static final String TENANT_NOTIFICATIONS = "registry-tenant.notification"; + public static final String TELEMETRY_ENDPOINT = "telemetry"; + public static final String EVENT_ENDPOINT = "event"; + public static final String EVENT_STATES_SUBTOPIC_ENDPOINT = "event.state"; + public static final String COMMAND_ENDPOINT = "command"; + public static final String COMMAND_RESPONSE_ENDPOINT = "command_response"; + + private PubSubConstants() { + } + + /** + * Gets the list of all topics need to be created per tenant. + * + * @return List of all topics. + */ + public static List getTenantTopics() { + return List.of(EVENT_ENDPOINT, + COMMAND_ENDPOINT, + COMMAND_RESPONSE_ENDPOINT, + EVENT_STATES_SUBTOPIC_ENDPOINT, + TELEMETRY_ENDPOINT); + } +} diff --git a/device-communication/src/main/java/org/eclipse/hono/communication/api/handler/DeviceCommandHandler.java b/device-communication/src/main/java/org/eclipse/hono/communication/api/handler/DeviceCommandHandler.java index e95fc0aa..1297bb37 100644 --- a/device-communication/src/main/java/org/eclipse/hono/communication/api/handler/DeviceCommandHandler.java +++ b/device-communication/src/main/java/org/eclipse/hono/communication/api/handler/DeviceCommandHandler.java @@ -18,13 +18,17 @@ import javax.enterprise.context.ApplicationScoped; +import org.eclipse.hono.communication.api.config.ApiCommonConstants; import org.eclipse.hono.communication.api.config.DeviceCommandConstants; -import org.eclipse.hono.communication.api.service.DeviceCommandService; +import org.eclipse.hono.communication.api.data.DeviceCommandRequest; +import org.eclipse.hono.communication.api.service.command.DeviceCommandService; import org.eclipse.hono.communication.core.http.HttpEndpointHandler; +import org.eclipse.hono.communication.core.utils.ResponseUtils; import io.vertx.ext.web.RoutingContext; import io.vertx.ext.web.openapi.RouterBuilder; + /** * Handler for device command endpoints. */ @@ -48,12 +52,20 @@ public void addRoutes(final RouterBuilder routerBuilder) { .handler(this::handlePostCommand); } + /** - * Handle post device commands. + * Handle Post device command. * * @param routingContext The RoutingContext */ public void handlePostCommand(final RoutingContext routingContext) { - commandService.postCommand(routingContext); + final var deviceConfig = routingContext.body() + .asJsonObject() + .mapTo(DeviceCommandRequest.class); + final var tenantId = routingContext.pathParam(ApiCommonConstants.TENANT_PATH_PARAMS); + final var deviceId = routingContext.pathParam(ApiCommonConstants.DEVICE_PATH_PARAMS); + commandService.postCommand(deviceConfig, tenantId, deviceId) + .onSuccess(res -> routingContext.response().setStatusCode(200).end()) + .onFailure(err -> ResponseUtils.errorResponse(routingContext, err)); } } diff --git a/device-communication/src/main/java/org/eclipse/hono/communication/api/mapper/DeviceConfigMapper.java b/device-communication/src/main/java/org/eclipse/hono/communication/api/mapper/DeviceConfigMapper.java index 5a7783f5..97852b48 100644 --- a/device-communication/src/main/java/org/eclipse/hono/communication/api/mapper/DeviceConfigMapper.java +++ b/device-communication/src/main/java/org/eclipse/hono/communication/api/mapper/DeviceConfigMapper.java @@ -49,6 +49,9 @@ public interface DeviceConfigMapper { */ @Mapping(target = "version", source = "request.versionToUpdate") @Mapping(target = "cloudUpdateTime", expression = "java(getDateTime())") + @Mapping(target = "tenantId", ignore = true) + @Mapping(target = "deviceId", ignore = true) + @Mapping(target = "deviceAckTime", ignore = true) DeviceConfigEntity configRequestToDeviceConfigEntity(DeviceConfigRequest request); default String getDateTime() { diff --git a/device-communication/src/main/java/org/eclipse/hono/communication/api/repository/DeviceConfigRepository.java b/device-communication/src/main/java/org/eclipse/hono/communication/api/repository/DeviceConfigRepository.java new file mode 100644 index 00000000..1f97bf14 --- /dev/null +++ b/device-communication/src/main/java/org/eclipse/hono/communication/api/repository/DeviceConfigRepository.java @@ -0,0 +1,89 @@ +/* + * *********************************************************** + * Copyright (c) 2023 Contributors to the Eclipse Foundation + *

+ * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + *

+ * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0 + *

+ * SPDX-License-Identifier: EPL-2.0 + * ********************************************************** + * + */ + +package org.eclipse.hono.communication.api.repository; + +import java.util.List; + +import org.eclipse.hono.communication.api.data.DeviceConfig; +import org.eclipse.hono.communication.api.data.DeviceConfigEntity; +import org.eclipse.hono.communication.api.data.DeviceConfigInternalResponse; + +import io.vertx.core.Future; + +/** + * Device config repository interface. + */ +public interface DeviceConfigRepository { + + /** + * Lists all config versions for a specific device. Result is order by version desc + * + * @param deviceId The device id + * @param tenantId The tenant id + * @param limit The number of config to show + * @return A Future with a List of DeviceConfigs + */ + Future> listAll(String deviceId, String tenantId, int limit); + + + /** + * Creates a new config version and deletes the oldest version if the total num of versions in DB is bigger than the MAX_LIMIT. + * + * @param entity The instance to insert + * @return A Future of the created DeviceConfigEntity + */ + Future createNew(DeviceConfigEntity entity); + + /** + * Update the deviceAckTime field. + * + * @param config The device config + * @param deviceAckTime The ack Time + * @return Future of Void + */ + + Future updateDeviceAckTime(DeviceConfigEntity config, String deviceAckTime); + + /** + * Update the error field. + * + * @param configErrorResponse The error response object + * @return Future of Void + */ + + Future updateDeviceConfigError(DeviceConfigInternalResponse configErrorResponse); + + + /** + * Get device latest config max(version). + * + * @param tenantId The tenant id + * @param deviceId The device id + * @return Future of DeviceConfigEntity + */ + Future getDeviceLatestConfig(String tenantId, String deviceId); + + /** + * Get a specific device config. + * + * @param tenantId The tenant id + * @param deviceId The device id + * @param version The config version + * @return Future of DeviceConfigEntity + */ + Future getDeviceConfig(String tenantId, String deviceId, int version); +} diff --git a/device-communication/src/main/java/org/eclipse/hono/communication/api/repository/DeviceConfigRepositoryImpl.java b/device-communication/src/main/java/org/eclipse/hono/communication/api/repository/DeviceConfigRepositoryImpl.java new file mode 100644 index 00000000..a71d3f55 --- /dev/null +++ b/device-communication/src/main/java/org/eclipse/hono/communication/api/repository/DeviceConfigRepositoryImpl.java @@ -0,0 +1,302 @@ +/* + * *********************************************************** + * Copyright (c) 2023 Contributors to the Eclipse Foundation + *

+ * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + *

+ * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0 + *

+ * SPDX-License-Identifier: EPL-2.0 + * ********************************************************** + * + */ + +package org.eclipse.hono.communication.api.repository; + + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.NoSuchElementException; + +import javax.enterprise.context.ApplicationScoped; + + +import org.eclipse.hono.communication.api.data.DeviceConfig; +import org.eclipse.hono.communication.api.data.DeviceConfigEntity; +import org.eclipse.hono.communication.api.data.DeviceConfigInternalResponse; +import org.eclipse.hono.communication.api.exception.DeviceNotFoundException; +import org.eclipse.hono.communication.api.service.database.DatabaseService; +import org.graalvm.collections.Pair; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.vertx.core.Future; +import io.vertx.sqlclient.RowIterator; +import io.vertx.sqlclient.SqlConnection; +import io.vertx.sqlclient.templates.RowMapper; +import io.vertx.sqlclient.templates.SqlTemplate; + + +/** + * Repository class for making CRUD operations for device config entities. + */ +@ApplicationScoped +public class DeviceConfigRepositoryImpl implements DeviceConfigRepository { + + private static final String SQL_LIST = "SELECT version, cloud_update_time, device_ack_time, binary_data " + + "FROM device_configs WHERE device_id = #{deviceId} and tenant_id = #{tenantId} ORDER BY version DESC LIMIT #{limit}"; + private static final String SQL_DELETE_MIN_VERSION = "DELETE FROM device_configs WHERE device_id = #{deviceId} and tenant_id = #{tenantId} " + + "and version = (SELECT MIN(version) from device_configs WHERE device_id = #{deviceId} and tenant_id = #{tenantId}) RETURNING version"; + private static final String SQL_FIND_TOTAL_AND_MAX_VERSION = "SELECT COALESCE(COUNT(*), 0) as total, COALESCE(MAX(version), 0) as max_version from device_configs " + + "WHERE device_id = #{deviceId} and tenant_id = #{tenantId}"; + private static final String SQL_UPDATE_DEVICE_ACK_TIME = "UPDATE device_configs SET device_ack_time = #{deviceAckTime} " + + "WHERE tenant_id = #{tenantId} AND device_id = #{deviceId} AND version = #{version}"; + private static final String SQL_UPDATE_ERROR = "UPDATE device_configs SET device_ack_error = #{device_ack_error} " + + "WHERE tenant_id = #{tenantId} AND device_id = #{deviceId} AND version = #{version}"; + private static final String SQL_FIND_DEVICE_CONFIG = "SELECT * FROM device_configs " + + "WHERE tenant_id = #{tenantId} AND device_id = #{deviceId} AND version = #{version}"; + private static final String SQL_INSERT = "INSERT INTO device_configs (version, tenant_id, device_id, cloud_update_time, device_ack_time, binary_data, device_ack_error) " + + "VALUES (#{version}, #{tenantId}, #{deviceId}, #{cloudUpdateTime}, #{deviceAckTime}, #{binaryData}, #{device_ack_error}) RETURNING version"; + private static final String DEVICE_ACK_TIME_CAPTION = "deviceAckTime"; + private static final String ERROR_CAPTION = "device_ack_error"; + private static final String DEVICE_ID_CAPTION = "deviceId"; + private static final String TENANT_ID_CAPTION = "tenantId"; + private static final String VERSION_CAPTION = "version"; + private static final int MAX_LIMIT = 10; + private final Logger log = LoggerFactory.getLogger(DeviceConfigRepositoryImpl.class); + + private final DatabaseService db; + + private final DeviceRepository deviceRepository; + + /** + * Creates a new DeviceConfigRepositoryImpl. + * + * @param db The database connection + * @param deviceRepository The device repository interface + */ + public DeviceConfigRepositoryImpl(final DatabaseService db, + final DeviceRepository deviceRepository) { + + this.db = db; + this.deviceRepository = deviceRepository; + } + + + private Future> findMaxVersionAndTotalEntries(final SqlConnection sqlConnection, final String deviceId, final String tenantId) { + final RowMapper> rowMapper = row -> + Pair.create(row.getInteger("total"), row.getInteger("max_version")); + return SqlTemplate + .forQuery(sqlConnection, SQL_FIND_TOTAL_AND_MAX_VERSION) + .mapTo(rowMapper) + .execute(Map.of(DEVICE_ID_CAPTION, deviceId, TENANT_ID_CAPTION, tenantId)).map(rowSet -> { + final RowIterator> iterator = rowSet.iterator(); + return iterator.next(); + }); + + } + + + @Override + public Future> listAll(final String deviceId, final String tenantId, final int limit) { + final int queryLimit = limit == 0 ? MAX_LIMIT : limit; + return db.getDbClient().withConnection( + sqlConnection -> deviceRepository.searchForDevice(deviceId, tenantId) + .compose( + counter -> { + if (counter < 1) { + throw new DeviceNotFoundException(String.format("Device with id %s and tenant id %s doesn't exist", + deviceId, + tenantId)); + } + return SqlTemplate + .forQuery(sqlConnection, SQL_LIST) + .mapTo(DeviceConfig.class) + .execute(Map.of(DEVICE_ID_CAPTION, deviceId, TENANT_ID_CAPTION, tenantId, "limit", queryLimit)) + .map(rowSet -> { + final List configs = new ArrayList<>(); + rowSet.forEach(configs::add); + return configs; + }) + .onSuccess(success -> log.info( + String.format("Listing all configs for device %s and tenant %s", + deviceId, tenantId))) + .onFailure(throwable -> log.error("Error: {}", throwable.getMessage())); + })); + } + + + /** + * Inserts a new entity in to the db. + * + * @param sqlConnection The sql connection instance + * @param entity The instance to insert + * @return A Future of the created DeviceConfigEntity + */ + private Future insert(final SqlConnection sqlConnection, final DeviceConfigEntity entity) { + return SqlTemplate + .forUpdate(sqlConnection, SQL_INSERT) + .mapFrom(DeviceConfigEntity.class) + .mapTo(DeviceConfigEntity.class) + .execute(entity) + .map(rowSet -> { + if (rowSet.rowCount() == 0) { + throw new IllegalStateException(String.format("Can't create device config: %s", entity)); + } + + final RowIterator iterator = rowSet.iterator(); + entity.setVersion(iterator.next().getVersion()); + return entity; + + }) + .onSuccess(success -> log.info("Device config created successfully: {}", success)) + .onFailure(throwable -> log.error(throwable.getMessage())); + + } + + /** + * Delete the smallest config version. + * + * @param sqlConnection The sql connection instance + * @param entity The device config for searching and deleting the smallest version + * @return A Future of the deleted version + */ + + private Future deleteMinVersion(final SqlConnection sqlConnection, final DeviceConfigEntity entity) { + final RowMapper rowMapper = row -> row.getInteger(VERSION_CAPTION); + return SqlTemplate + .forQuery(sqlConnection, SQL_DELETE_MIN_VERSION) + .mapFrom(DeviceConfigEntity.class) + .mapTo(rowMapper) + .execute(entity) + .map(rowSet -> { + final RowIterator iterator = rowSet.iterator(); + return iterator.next(); + }) + .onSuccess(deletedVersion -> log.info("Device config version {} was deleted", deletedVersion)); + } + + + @Override + public Future createNew(final DeviceConfigEntity entity) { + return db.getDbClient().withTransaction( + sqlConnection -> deviceRepository.searchForDevice(entity.getDeviceId(), entity.getTenantId()) + .compose( + counter -> { + if (counter < 1) { + throw new DeviceNotFoundException(String.format("Device with id %s and tenant id %s doesn't exist", + entity.getDeviceId(), + entity.getTenantId())); + } + return findMaxVersionAndTotalEntries(sqlConnection, entity.getDeviceId(), entity.getTenantId()) + .compose( + values -> { + final int total = values.getLeft(); + final int maxVersion = values.getRight(); + + entity.setVersion(maxVersion + 1); + + if (total > MAX_LIMIT - 1) { + return deleteMinVersion(sqlConnection, entity).compose( + ok -> insert(sqlConnection, entity) + + ); + } + return insert(sqlConnection, entity); + } + ); + }).onFailure(error -> log.error(error.getMessage()))); + } + + + @Override + public Future updateDeviceAckTime(final DeviceConfigEntity config, final String deviceAckTime) { + final Map parameters = new HashMap<>(); + parameters.put(DEVICE_ID_CAPTION, config.getDeviceId()); + parameters.put(TENANT_ID_CAPTION, config.getTenantId()); + parameters.put(DEVICE_ACK_TIME_CAPTION, deviceAckTime); + parameters.put(VERSION_CAPTION, config.getVersion()); + + return updateConfigField(parameters, SQL_UPDATE_DEVICE_ACK_TIME, config.toString()); + } + + @Override + public Future updateDeviceConfigError(final DeviceConfigInternalResponse configErrorResponse) { + final Map parameters = new HashMap<>(); + parameters.put(DEVICE_ID_CAPTION, configErrorResponse.getDeviceId()); + parameters.put(TENANT_ID_CAPTION, configErrorResponse.getTenantId()); + parameters.put(VERSION_CAPTION, Integer.parseInt(configErrorResponse.getVersion())); + parameters.put(ERROR_CAPTION, configErrorResponse.getDeviceAckError()); + + return updateConfigField(parameters, SQL_UPDATE_ERROR, configErrorResponse.toString()); + } + + private Future updateConfigField(final Map parameters, final String sqlUpdateStatement, final String configString) { + return db.getDbClient().withTransaction( + sqlConnection -> SqlTemplate + .forQuery(sqlConnection, sqlUpdateStatement) + .execute(parameters) + .flatMap(rowSet -> { + if (rowSet.rowCount() > 0) { + return Future.succeededFuture(); + } else { + final var msg = "Entity doesn't exist: %s".formatted(configString); + log.error(msg); + throw new NoSuchElementException(msg); + } + })); + } + + @Override + public Future getDeviceLatestConfig(final String tenantId, final String deviceId) { + + return db.getDbClient().withConnection( + sqlConnection -> findMaxVersionAndTotalEntries(sqlConnection, deviceId, tenantId) + .compose( + values -> { + final int total = values.getLeft(); + final int maxVersion = values.getRight(); + + if (total == 0) { + return Future.failedFuture(new NoSuchElementException("No configs are found for device %s and tenant %s".formatted(deviceId, tenantId))); + } + return findDeviceConfig(sqlConnection, tenantId, deviceId, maxVersion); + + } + + )); + } + + @Override + public Future getDeviceConfig(final String tenantId, final String deviceId, final int version) { + + return db.getDbClient().withConnection( + sqlConnection -> findDeviceConfig(sqlConnection, tenantId, deviceId, version) + ); + } + + private Future findDeviceConfig(final SqlConnection sqlConnection, final String tenantId, final String deviceId, final int version) { + final Map parameters = new HashMap<>(); + parameters.put(DEVICE_ID_CAPTION, deviceId); + parameters.put(TENANT_ID_CAPTION, tenantId); + parameters.put(VERSION_CAPTION, version); + + return SqlTemplate + .forQuery(sqlConnection, SQL_FIND_DEVICE_CONFIG) + .mapTo(DeviceConfigEntity.class) + .execute(parameters).map(rowSet -> { + final RowIterator iterator = rowSet.iterator(); + return iterator.next(); + + }); + + } +} + + + diff --git a/device-communication/src/main/java/org/eclipse/hono/communication/api/repository/DeviceConfigsRepository.java b/device-communication/src/main/java/org/eclipse/hono/communication/api/repository/DeviceConfigsRepository.java deleted file mode 100644 index 4e08c1b5..00000000 --- a/device-communication/src/main/java/org/eclipse/hono/communication/api/repository/DeviceConfigsRepository.java +++ /dev/null @@ -1,52 +0,0 @@ -/* - * *********************************************************** - * Copyright (c) 2023 Contributors to the Eclipse Foundation - *

- * See the NOTICE file(s) distributed with this work for additional - * information regarding copyright ownership. - *

- * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License 2.0 which is available at - * http://www.eclipse.org/legal/epl-2.0 - *

- * SPDX-License-Identifier: EPL-2.0 - * ********************************************************** - * - */ - -package org.eclipse.hono.communication.api.repository; - -import java.util.List; - -import org.eclipse.hono.communication.api.data.DeviceConfig; -import org.eclipse.hono.communication.api.data.DeviceConfigEntity; - -import io.vertx.core.Future; -import io.vertx.sqlclient.SqlConnection; - -/** - * Device config repository interface. - */ -public interface DeviceConfigsRepository { - - /** - * Lists all config versions for a specific device. Result is order by version desc - * - * @param sqlConnection The sql connection instance - * @param deviceId The device id - * @param tenantId The tenant id - * @param limit The number of config to show - * @return A Future with a List of DeviceConfigs - */ - Future> listAll(SqlConnection sqlConnection, String deviceId, String tenantId, int limit); - - - /** - * Creates a new config version and deletes the oldest version if the total num of versions in DB is bigger than the MAX_LIMIT. - * - * @param sqlConnection The sql connection instance - * @param entity The instance to insert - * @return A Future of the created DeviceConfigEntity - */ - Future createNew(SqlConnection sqlConnection, DeviceConfigEntity entity); -} diff --git a/device-communication/src/main/java/org/eclipse/hono/communication/api/repository/DeviceConfigsRepositoryImpl.java b/device-communication/src/main/java/org/eclipse/hono/communication/api/repository/DeviceConfigsRepositoryImpl.java deleted file mode 100644 index f2ace5b8..00000000 --- a/device-communication/src/main/java/org/eclipse/hono/communication/api/repository/DeviceConfigsRepositoryImpl.java +++ /dev/null @@ -1,204 +0,0 @@ -/* - * *********************************************************** - * Copyright (c) 2023 Contributors to the Eclipse Foundation - *

- * See the NOTICE file(s) distributed with this work for additional - * information regarding copyright ownership. - *

- * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License 2.0 which is available at - * http://www.eclipse.org/legal/epl-2.0 - *

- * SPDX-License-Identifier: EPL-2.0 - * ********************************************************** - * - */ - -package org.eclipse.hono.communication.api.repository; - -import java.util.ArrayList; -import java.util.List; -import java.util.Map; - -import javax.enterprise.context.ApplicationScoped; - -import org.eclipse.hono.communication.api.data.DeviceConfig; -import org.eclipse.hono.communication.api.data.DeviceConfigEntity; -import org.eclipse.hono.communication.api.exception.DeviceNotFoundException; -import org.eclipse.hono.communication.core.app.DatabaseConfig; -import org.graalvm.collections.Pair; - -import io.vertx.core.Future; -import io.vertx.core.impl.logging.Logger; -import io.vertx.core.impl.logging.LoggerFactory; -import io.vertx.sqlclient.RowIterator; -import io.vertx.sqlclient.SqlConnection; -import io.vertx.sqlclient.templates.RowMapper; -import io.vertx.sqlclient.templates.SqlTemplate; - -/** - * Repository class for making CRUD operations for device config entities. - */ -@ApplicationScoped -public class DeviceConfigsRepositoryImpl implements DeviceConfigsRepository { - private final String SQL_INSERT = "INSERT INTO device_configs (version, tenant_id, device_id, cloud_update_time, device_ack_time, binary_data) " + - "VALUES (#{version}, #{tenantId}, #{deviceId}, #{cloudUpdateTime}, #{deviceAckTime}, #{binaryData}) RETURNING version"; - private final String SQL_LIST = "SELECT version, cloud_update_time, device_ack_time, binary_data " + - "FROM device_configs WHERE device_id = #{deviceId} and tenant_id = #{tenantId} ORDER BY version DESC LIMIT #{limit}"; - private final String SQL_DELETE_MIN_VERSION = "DELETE FROM device_configs WHERE device_id = #{deviceId} and tenant_id = #{tenantId} " + - "and version = (SELECT MIN(version) from device_configs WHERE device_id = #{deviceId} and tenant_id = #{tenantId}) RETURNING version"; - private final String SQL_FIND_TOTAL_AND_MAX_VERSION = "SELECT COALESCE(COUNT(*), 0) as total, COALESCE(MAX(version), 0) as max_version from device_configs " + - "WHERE device_id = #{deviceId} and tenant_id = #{tenantId}"; - - private final int MAX_LIMIT = 10; - private final Logger log = LoggerFactory.getLogger(DeviceConfigsRepositoryImpl.class); - private String SQL_COUNT_DEVICES_WITH_PK_FILTER = "SELECT COUNT(*) as total FROM public.%s where %s = #{tenantId} and %s = #{deviceId}"; - - - /** - * Creates a new DeviceConfigsRepositoryImpl. - * - * @param databaseConfig The database configs - */ - public DeviceConfigsRepositoryImpl(final DatabaseConfig databaseConfig) { - - SQL_COUNT_DEVICES_WITH_PK_FILTER = String.format(SQL_COUNT_DEVICES_WITH_PK_FILTER, - databaseConfig.getDeviceRegistrationTableName(), - databaseConfig.getDeviceRegistrationTenantIdColumn(), - databaseConfig.getDeviceRegistrationDeviceIdColumn()); - } - - - private Future searchForDevice(final SqlConnection sqlConnection, final String deviceId, final String tenantId) { - final RowMapper ROW_MAPPER = row -> row.getInteger("total"); - return SqlTemplate - .forQuery(sqlConnection, SQL_COUNT_DEVICES_WITH_PK_FILTER) - .mapTo(ROW_MAPPER) - .execute(Map.of("deviceId", deviceId, "tenantId", tenantId)).map(rowSet -> { - final RowIterator iterator = rowSet.iterator(); - return iterator.next(); - }); - - } - - private Future> findMaxVersionAndTotalEntries(final SqlConnection sqlConnection, final String deviceId, final String tenantId) { - final RowMapper> ROW_MAPPER = row -> - Pair.create(row.getInteger("total"), row.getInteger("max_version")); - return SqlTemplate - .forQuery(sqlConnection, SQL_FIND_TOTAL_AND_MAX_VERSION) - .mapTo(ROW_MAPPER) - .execute(Map.of("deviceId", deviceId, "tenantId", tenantId)).map(rowSet -> { - final RowIterator> iterator = rowSet.iterator(); - return iterator.next(); - }); - - } - - @Override - public Future> listAll(final SqlConnection sqlConnection, final String deviceId, final String tenantId, final int limit) { - final int queryLimit = limit == 0 ? MAX_LIMIT : limit; - return searchForDevice(sqlConnection, deviceId, tenantId) - .compose( - counter -> { - if (counter < 1) { - throw new DeviceNotFoundException(String.format("Device with id %s and tenant id %s doesn't exist", - deviceId, - tenantId)); - } - return SqlTemplate - .forQuery(sqlConnection, SQL_LIST) - .mapTo(DeviceConfig.class) - .execute(Map.of("deviceId", deviceId, "tenantId", tenantId, "limit", queryLimit)) - .map(rowSet -> { - final List configs = new ArrayList<>(); - rowSet.forEach(configs::add); - return configs; - }) - .onSuccess(success -> log.info( - String.format("Listing all configs for device %s and tenant %s", - deviceId, tenantId))) - .onFailure(throwable -> log.error("Error: {}", throwable)); - }); - } - - - /** - * Inserts a new entity in to the db. - * - * @param sqlConnection The sql connection instance - * @param entity The instance to insert - * @return A Future of the created DeviceConfigEntity - */ - private Future insert(final SqlConnection sqlConnection, final DeviceConfigEntity entity) { - return SqlTemplate - .forUpdate(sqlConnection, SQL_INSERT) - .mapFrom(DeviceConfigEntity.class) - .mapTo(DeviceConfigEntity.class) - .execute(entity) - .map(rowSet -> { - final RowIterator iterator = rowSet.iterator(); - if (iterator.hasNext()) { - entity.setVersion(iterator.next().getVersion()); - return entity; - } else { - throw new IllegalStateException(String.format("Can't create device config: %s", entity)); - } - }) - .onSuccess(success -> log.info(String.format("Device config created successfully: %s", success.toString()))) - .onFailure(throwable -> log.error(throwable.getMessage())); - - } - - /** - * Delete the smallest config version. - * - * @param sqlConnection The sql connection instance - * @param entity The device config for searching and deleting the smallest version - * @return A Future of the deleted version - */ - - private Future deleteMinVersion(final SqlConnection sqlConnection, final DeviceConfigEntity entity) { - final RowMapper ROW_MAPPER = row -> row.getInteger("version"); - return SqlTemplate - .forQuery(sqlConnection, SQL_DELETE_MIN_VERSION) - .mapFrom(DeviceConfigEntity.class) - .mapTo(ROW_MAPPER) - .execute(entity) - .map(rowSet -> { - final RowIterator iterator = rowSet.iterator(); - return iterator.next(); - }) - .onSuccess(deletedVersion -> log.info(String.format("Device config version %s was deleted", deletedVersion))); - } - - - @Override - public Future createNew(final SqlConnection sqlConnection, final DeviceConfigEntity entity) { - return searchForDevice(sqlConnection, entity.getDeviceId(), entity.getTenantId()) - .compose( - counter -> { - if (counter < 1) { - throw new DeviceNotFoundException(String.format("Device with id %s and tenant id %s doesn't exist", - entity.getDeviceId(), - entity.getTenantId())); - } - return findMaxVersionAndTotalEntries(sqlConnection, entity.getDeviceId(), entity.getTenantId()) - .compose( - values -> { - final int total = values.getLeft(); - final int maxVersion = values.getRight(); - - entity.setVersion(maxVersion + 1); - - if (total > MAX_LIMIT - 1) { - return deleteMinVersion(sqlConnection, entity).compose( - ok -> insert(sqlConnection, entity) - - ); - } - return insert(sqlConnection, entity); - } - ); - }).onFailure(error -> log.error(error.getMessage())); - } -} diff --git a/device-communication/src/main/java/org/eclipse/hono/communication/api/repository/DeviceRepository.java b/device-communication/src/main/java/org/eclipse/hono/communication/api/repository/DeviceRepository.java new file mode 100644 index 00000000..4ab86489 --- /dev/null +++ b/device-communication/src/main/java/org/eclipse/hono/communication/api/repository/DeviceRepository.java @@ -0,0 +1,45 @@ +/* + * *********************************************************** + * Copyright (c) 2023 Contributors to the Eclipse Foundation + *

+ * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + *

+ * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0 + *

+ * SPDX-License-Identifier: EPL-2.0 + * ********************************************************** + * + */ + +package org.eclipse.hono.communication.api.repository; + +import java.util.List; + +import io.vertx.core.Future; + + +/** + * Device repository interface. + */ +public interface DeviceRepository { + + /** + * Check if device exist. + * + * @param deviceId The device id + * @param tenantId The tenant id + * @return Future of integer + */ + Future searchForDevice(String deviceId, String tenantId); + + /** + * Lists all unique tenants. + * + * @return Future of list with all tenants. + */ + Future> listDistinctTenants(); + +} diff --git a/device-communication/src/main/java/org/eclipse/hono/communication/api/repository/DeviceRepositoryImpl.java b/device-communication/src/main/java/org/eclipse/hono/communication/api/repository/DeviceRepositoryImpl.java new file mode 100644 index 00000000..d719e464 --- /dev/null +++ b/device-communication/src/main/java/org/eclipse/hono/communication/api/repository/DeviceRepositoryImpl.java @@ -0,0 +1,97 @@ +/* + * *********************************************************** + * Copyright (c) 2023 Contributors to the Eclipse Foundation + *

+ * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + *

+ * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0 + *

+ * SPDX-License-Identifier: EPL-2.0 + * ********************************************************** + * + */ + +package org.eclipse.hono.communication.api.repository; + + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; + +import javax.enterprise.context.ApplicationScoped; + +import org.eclipse.hono.communication.api.service.database.DatabaseService; +import org.eclipse.hono.communication.core.app.DatabaseConfig; + +import io.vertx.core.Future; +import io.vertx.sqlclient.RowIterator; +import io.vertx.sqlclient.templates.RowMapper; +import io.vertx.sqlclient.templates.SqlTemplate; + + +/** + * Device registrations repository. + */ +@ApplicationScoped +public class DeviceRepositoryImpl implements DeviceRepository { + + private static final String SQL_LIST_TENANTS = "SELECT %s FROM %s"; + private final DatabaseConfig databaseConfig; + private final DatabaseService db; + private final String SQL_COUNT_DEVICES_WITH_PK_FILTER; + + /** + * Creates a new DeviceRepositoryImpl. + * + * @param databaseConfig The database configs + * @param databaseService The database service + */ + public DeviceRepositoryImpl(final DatabaseConfig databaseConfig, final DatabaseService databaseService) { + + this.databaseConfig = databaseConfig; + this.db = databaseService; + + SQL_COUNT_DEVICES_WITH_PK_FILTER = String.format("SELECT COUNT(*) as total FROM public.%s where %s = #{tenantId} and %s = #{deviceId}", + databaseConfig.getDeviceRegistrationTableName(), + databaseConfig.getDeviceRegistrationTenantIdColumn(), + databaseConfig.getDeviceRegistrationDeviceIdColumn()); + } + + + @Override + public Future searchForDevice(final String deviceId, final String tenantId) { + final RowMapper rowMapper = row -> row.getInteger("total"); + return db.getDbClient().withConnection( + sqlConnection -> SqlTemplate + .forQuery(sqlConnection, SQL_COUNT_DEVICES_WITH_PK_FILTER) + .mapTo(rowMapper) + .execute(Map.of("deviceId", deviceId, "tenantId", tenantId)).map(rowSet -> { + final RowIterator iterator = rowSet.iterator(); + return iterator.next(); + })); + } + + + @Override + public Future> listDistinctTenants() { + + final var sqlCommand = SQL_LIST_TENANTS.formatted( + databaseConfig.getTenantTableIdColumn(), + databaseConfig.getTenantTableName()); + + return db.getDbClient().withConnection( + sqlConnection -> SqlTemplate + .forQuery(sqlConnection, sqlCommand) + .execute(Collections.emptyMap()) + .map(rowSet -> { + final List tenants = new ArrayList<>(); + rowSet.forEach(tenant -> tenants.add(tenant.getString(databaseConfig.getTenantTableIdColumn()))); + return tenants; + })); + + } +} diff --git a/device-communication/src/main/java/org/eclipse/hono/communication/api/service/DeviceCommandServiceImpl.java b/device-communication/src/main/java/org/eclipse/hono/communication/api/service/DeviceCommandServiceImpl.java deleted file mode 100644 index a8d2709c..00000000 --- a/device-communication/src/main/java/org/eclipse/hono/communication/api/service/DeviceCommandServiceImpl.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * *********************************************************** - * Copyright (c) 2023 Contributors to the Eclipse Foundation - *

- * See the NOTICE file(s) distributed with this work for additional - * information regarding copyright ownership. - *

- * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License 2.0 which is available at - * http://www.eclipse.org/legal/epl-2.0 - *

- * SPDX-License-Identifier: EPL-2.0 - * ********************************************************** - * - */ - -package org.eclipse.hono.communication.api.service; - -import javax.enterprise.context.ApplicationScoped; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import io.vertx.ext.web.RoutingContext; - -/** - * Service for device commands. - */ -@ApplicationScoped -public class DeviceCommandServiceImpl implements DeviceCommandService { - private final Logger log = LoggerFactory.getLogger(DeviceCommandServiceImpl.class); - - - @Override - public void postCommand(final RoutingContext routingContext) { - // TODO publish command and send response - log.info("postCommand received"); - routingContext.response().setStatusCode(501).end(); - } -} diff --git a/device-communication/src/main/java/org/eclipse/hono/communication/api/service/DeviceServiceAbstract.java b/device-communication/src/main/java/org/eclipse/hono/communication/api/service/DeviceServiceAbstract.java new file mode 100644 index 00000000..9db3bc3e --- /dev/null +++ b/device-communication/src/main/java/org/eclipse/hono/communication/api/service/DeviceServiceAbstract.java @@ -0,0 +1,48 @@ +/* + * *********************************************************** + * Copyright (c) 2023 Contributors to the Eclipse Foundation + *

+ * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + *

+ * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0 + *

+ * SPDX-License-Identifier: EPL-2.0 + * ********************************************************** + * + */ + +package org.eclipse.hono.communication.api.service; + +import org.eclipse.hono.communication.api.service.communication.InternalMessaging; +import org.eclipse.hono.communication.core.app.InternalMessagingConfig; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.ObjectReader; + + +/** + * Abstract device service class. + */ +public abstract class DeviceServiceAbstract { + + protected final ObjectReader or = new ObjectMapper().reader(); + + protected final InternalMessagingConfig messagingConfig; + protected final InternalMessaging internalMessaging; + + /** + * Creates a new DeviceServiceAbstract. + * + * @param messagingConfig The internal messaging configs + * @param internalMessaging The internal messaging interface + */ + protected DeviceServiceAbstract(final InternalMessagingConfig messagingConfig, + final InternalMessaging internalMessaging) { + + this.messagingConfig = messagingConfig; + this.internalMessaging = internalMessaging; + } +} diff --git a/device-communication/src/main/java/org/eclipse/hono/communication/api/service/command/DeviceCommandService.java b/device-communication/src/main/java/org/eclipse/hono/communication/api/service/command/DeviceCommandService.java new file mode 100644 index 00000000..614f38fa --- /dev/null +++ b/device-communication/src/main/java/org/eclipse/hono/communication/api/service/command/DeviceCommandService.java @@ -0,0 +1,38 @@ +/* + * *********************************************************** + * Copyright (c) 2023 Contributors to the Eclipse Foundation + *

+ * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + *

+ * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0 + *

+ * SPDX-License-Identifier: EPL-2.0 + * ********************************************************** + * + */ + +package org.eclipse.hono.communication.api.service.command; + +import org.eclipse.hono.communication.api.data.DeviceCommandRequest; + +import io.vertx.core.Future; + +/** + * Device commands interface. + */ +public interface DeviceCommandService { + + /** + * proceed Post device command. + * + * @param commandRequest The commandRequest + * @param tenantId Tenant id + * @param deviceId Device Id + * @return Future of Void + */ + Future postCommand(DeviceCommandRequest commandRequest, String tenantId, String deviceId); + +} diff --git a/device-communication/src/main/java/org/eclipse/hono/communication/api/service/command/DeviceCommandServiceImpl.java b/device-communication/src/main/java/org/eclipse/hono/communication/api/service/command/DeviceCommandServiceImpl.java new file mode 100644 index 00000000..34319b57 --- /dev/null +++ b/device-communication/src/main/java/org/eclipse/hono/communication/api/service/command/DeviceCommandServiceImpl.java @@ -0,0 +1,103 @@ +/* + * *********************************************************** + * Copyright (c) 2023 Contributors to the Eclipse Foundation + *

+ * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + *

+ * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0 + *

+ * SPDX-License-Identifier: EPL-2.0 + * ********************************************************** + * + */ + +package org.eclipse.hono.communication.api.service.command; + +import java.util.Map; + +import javax.inject.Singleton; + +import org.apache.commons.codec.binary.Base64; +import org.eclipse.hono.communication.api.data.DeviceCommandRequest; +import org.eclipse.hono.communication.api.exception.DeviceNotFoundException; +import org.eclipse.hono.communication.api.repository.DeviceRepository; +import org.eclipse.hono.communication.api.service.DeviceServiceAbstract; +import org.eclipse.hono.communication.api.service.communication.InternalMessaging; +import org.eclipse.hono.communication.core.app.InternalMessagingConfig; +import org.eclipse.hono.communication.core.utils.StringValidateUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.base.Strings; + +import io.vertx.core.Future; + + +/** + * Service for device commands. + */ +@Singleton +public class DeviceCommandServiceImpl extends DeviceServiceAbstract implements DeviceCommandService { + + public static final String DEVICE_ID = "device_id"; + public static final String TENANT_ID = "tenant_id"; + public static final String SUBJECT = "subject"; + private final Logger log = LoggerFactory.getLogger(DeviceCommandServiceImpl.class); + private final DeviceRepository deviceRepository; + + /** + * Creates a new DeviceCommandServiceImpl. + * + * @param deviceRepository The device repository interface + * @param internalMessaging The internal messaging interface + * @param messagingConfig The internal messaging configs + */ + public DeviceCommandServiceImpl(final DeviceRepository deviceRepository, + final InternalMessaging internalMessaging, + final InternalMessagingConfig messagingConfig) { + + super(messagingConfig, internalMessaging); + this.deviceRepository = deviceRepository; + } + + @Override + public Future postCommand(final DeviceCommandRequest commandRequest, final String tenantId, final String deviceId) { + + if (!StringValidateUtils.isBase64(commandRequest.getBinaryData())) { + return Future.failedFuture(new IllegalStateException("Field binaryData type should be String base64 encoded.")); + } + + + return deviceRepository.searchForDevice(deviceId, tenantId) + .compose( + counter -> { + + if (counter < 1) { + throw new DeviceNotFoundException(String.format("Device with id %s and tenant id %s doesn't exist", + deviceId, + tenantId)); + } + final String subject = Strings.isNullOrEmpty(commandRequest.getSubfolder()) ? "command" : commandRequest.getSubfolder(); + final var topic = String.format(messagingConfig.getCommandTopicFormat(), tenantId); + final Map attributes = Map.of(DEVICE_ID, deviceId, TENANT_ID, tenantId, SUBJECT, subject); + try { + final var command = Base64.decodeBase64(commandRequest.getBinaryData().getBytes()); + + internalMessaging.publish(topic, command, attributes); + log.info("Command {} was published successfully to topic {}", command, topic); + } catch (Exception ex) { + log.error("Command can't be published: {}", ex.getMessage()); + return Future.failedFuture(ex); + + } + return Future.succeededFuture(); + + } + + ); + + } +} diff --git a/device-communication/src/main/java/org/eclipse/hono/communication/api/service/communication/InternalMessagePublisher.java b/device-communication/src/main/java/org/eclipse/hono/communication/api/service/communication/InternalMessagePublisher.java new file mode 100644 index 00000000..ff8b83c3 --- /dev/null +++ b/device-communication/src/main/java/org/eclipse/hono/communication/api/service/communication/InternalMessagePublisher.java @@ -0,0 +1,35 @@ +/* + * *********************************************************** + * Copyright (c) 2023 Contributors to the Eclipse Foundation + *

+ * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + *

+ * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0 + *

+ * SPDX-License-Identifier: EPL-2.0 + * ********************************************************** + * + */ + +package org.eclipse.hono.communication.api.service.communication; + +import java.util.Map; + +/** + * Interface for internal communication topic publisher. + */ +public interface InternalMessagePublisher { + + /** + * Publish a message to a topic. + * + * @param topic The topic to publish the message + * @param message The message to publish + * @param attributes The message attributes + * @throws Exception Throws Exception if subscription can't be created + */ + void publish(String topic, byte[] message, Map attributes) throws Exception; +} diff --git a/device-communication/src/main/java/org/eclipse/hono/communication/api/service/communication/InternalMessageSubscriber.java b/device-communication/src/main/java/org/eclipse/hono/communication/api/service/communication/InternalMessageSubscriber.java new file mode 100644 index 00000000..7f09daa3 --- /dev/null +++ b/device-communication/src/main/java/org/eclipse/hono/communication/api/service/communication/InternalMessageSubscriber.java @@ -0,0 +1,34 @@ +/* + * *********************************************************** + * Copyright (c) 2023 Contributors to the Eclipse Foundation + *

+ * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + *

+ * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0 + *

+ * SPDX-License-Identifier: EPL-2.0 + * ********************************************************** + * + */ + +package org.eclipse.hono.communication.api.service.communication; + +import com.google.cloud.pubsub.v1.MessageReceiver; + +/** + * Interface for internal communication topic subscriber. + */ +public interface InternalMessageSubscriber { + + /** + * Subscribe to a topic. + * + * @param topic The topic to subscribe + * @param callbackHandler The function to be called when a message is received + */ + void subscribe(String topic, MessageReceiver callbackHandler); +} + diff --git a/device-communication/src/main/java/org/eclipse/hono/communication/api/service/communication/InternalMessaging.java b/device-communication/src/main/java/org/eclipse/hono/communication/api/service/communication/InternalMessaging.java new file mode 100644 index 00000000..21d5a1d4 --- /dev/null +++ b/device-communication/src/main/java/org/eclipse/hono/communication/api/service/communication/InternalMessaging.java @@ -0,0 +1,27 @@ +/* + * *********************************************************** + * Copyright (c) 2023 Contributors to the Eclipse Foundation + *

+ * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + *

+ * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0 + *

+ * SPDX-License-Identifier: EPL-2.0 + * ********************************************************** + * + */ + +package org.eclipse.hono.communication.api.service.communication; + +/** + * Internal messaging interface. + */ +public interface InternalMessaging extends InternalMessageSubscriber, InternalMessagePublisher { + + +} + + diff --git a/device-communication/src/main/java/org/eclipse/hono/communication/api/service/communication/PubSubService.java b/device-communication/src/main/java/org/eclipse/hono/communication/api/service/communication/PubSubService.java new file mode 100644 index 00000000..be2ec6b2 --- /dev/null +++ b/device-communication/src/main/java/org/eclipse/hono/communication/api/service/communication/PubSubService.java @@ -0,0 +1,170 @@ +/* + * *********************************************************** + * Copyright (c) 2023 Contributors to the Eclipse Foundation + *

+ * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + *

+ * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0 + *

+ * SPDX-License-Identifier: EPL-2.0 + * ********************************************************** + * + */ + +package org.eclipse.hono.communication.api.service.communication; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import java.util.concurrent.TimeUnit; +import java.util.stream.StreamSupport; + +import javax.annotation.PreDestroy; +import javax.enterprise.context.ApplicationScoped; + +import org.eclipse.hono.communication.core.app.InternalMessagingConfig; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.api.core.ApiFuture; +import com.google.api.core.ApiFutureCallback; +import com.google.api.core.ApiFutures; +import com.google.cloud.pubsub.v1.MessageReceiver; +import com.google.cloud.pubsub.v1.Publisher; +import com.google.cloud.pubsub.v1.Subscriber; +import com.google.cloud.pubsub.v1.SubscriptionAdminClient; +import com.google.cloud.pubsub.v1.SubscriptionAdminSettings; +import com.google.common.util.concurrent.MoreExecutors; +import com.google.protobuf.ByteString; +import com.google.pubsub.v1.ProjectName; +import com.google.pubsub.v1.ProjectSubscriptionName; +import com.google.pubsub.v1.PubsubMessage; +import com.google.pubsub.v1.PushConfig; +import com.google.pubsub.v1.Subscription; +import com.google.pubsub.v1.TopicName; + +/** + * Internal messaging interface implementation. + */ +@ApplicationScoped +public class PubSubService implements InternalMessaging { + + public static final String COMMUNICATION_API_SUBSCRIPTION_NAME = "%s-communication-api"; + private final Logger log = LoggerFactory.getLogger(PubSubService.class); + private final Map activeSubscriptions = new HashMap<>(); + + private final String projectId; + private TopicName topicName; + + + /** + * Creates a new PubSubService. + * + * @param configs The internal messaging configs + */ + public PubSubService(final InternalMessagingConfig configs) { + this.projectId = configs.getProjectId(); + } + + /** + * Stops every subscription at destroy time. + */ + @PreDestroy + void destroy() { + + activeSubscriptions.forEach((topic, subscriber) -> { + if (subscriber != null) { + subscriber.stopAsync(); + } + }); + + activeSubscriptions.clear(); + } + + @Override + public void publish(final String topic, final byte[] message, final Map attributes) throws Exception { + final Publisher publisher = Publisher.newBuilder(TopicName.of(projectId, topic)) + .build(); + try { + final var data = ByteString.copyFrom(message); + final var pubsubMessage = PubsubMessage + .newBuilder() + .setData(data) + .putAllAttributes(attributes) + .build(); + final ApiFuture messageIdFuture = publisher.publish(pubsubMessage); + ApiFutures.addCallback(messageIdFuture, new ApiFutureCallback<>() { + public void onSuccess(final String messageId) { + log.debug("Message was published with id {}", messageId); + } + + public void onFailure(final Throwable t) { + log.error("failed to publish: {}", t.getMessage()); + } + }, MoreExecutors.directExecutor()); + } catch (Exception ex) { + log.error(String.format("Error publish to topic %s: %s", topic, ex.getMessage())); + } finally { + publisher.shutdown(); + publisher.awaitTermination(1, TimeUnit.MINUTES); + } + } + + + @Override + public void subscribe(final String topic, final MessageReceiver callbackHandler) { + if (activeSubscriptions.containsKey(topic)) { + return; + } + topicName = TopicName.of(projectId, topic); + final ProjectSubscriptionName subscriptionName; + try { + subscriptionName = initSubscription(topic); + final Subscriber subscriber = Subscriber.newBuilder(subscriptionName, callbackHandler).build(); + subscriber.startAsync().awaitRunning(); + activeSubscriptions.put(topic, subscriber); + log.info("Successfully subscribe to topic: {}", topicName.getTopic()); + } catch (Exception ex) { + log.error("Error subscribe to topic {}: {}", topic, ex.getMessage()); + } + } + + /** + * If the subscription doesn't exist creates a new one. + * + * @param topic Topic name, it will be used for creating the subscription topic_name-sub + * @return The ProjectSubscriptionName object + * @throws IOException if subscription can't be created + */ + ProjectSubscriptionName initSubscription(final String topic) throws IOException { + final var subscriptionName = ProjectSubscriptionName.of( + projectId, + String.format(COMMUNICATION_API_SUBSCRIPTION_NAME, topic) + ); + final var subscriptionAdminSettings = SubscriptionAdminSettings.newBuilder() + .build(); + try (SubscriptionAdminClient subscriptionAdminClient = SubscriptionAdminClient.create(subscriptionAdminSettings)) { + final var subscriptions = subscriptionAdminClient.listSubscriptions(ProjectName.of(projectId)) + .iterateAll(); + final Optional existing = StreamSupport + .stream(subscriptions.spliterator(), false) + .filter(sub -> sub.getName().equals(subscriptionName.toString())) + .findFirst(); + + if (existing.isEmpty()) { + + subscriptionAdminClient.createSubscription( + subscriptionName.toString(), + topicName, + PushConfig.getDefaultInstance(), + 50 + ); + } + } + return subscriptionName; + } +} diff --git a/device-communication/src/main/java/org/eclipse/hono/communication/api/service/DatabaseSchemaCreator.java b/device-communication/src/main/java/org/eclipse/hono/communication/api/service/database/DatabaseSchemaCreator.java similarity index 92% rename from device-communication/src/main/java/org/eclipse/hono/communication/api/service/DatabaseSchemaCreator.java rename to device-communication/src/main/java/org/eclipse/hono/communication/api/service/database/DatabaseSchemaCreator.java index 86702dd9..fa8180e8 100644 --- a/device-communication/src/main/java/org/eclipse/hono/communication/api/service/DatabaseSchemaCreator.java +++ b/device-communication/src/main/java/org/eclipse/hono/communication/api/service/database/DatabaseSchemaCreator.java @@ -14,7 +14,7 @@ * */ -package org.eclipse.hono.communication.api.service; +package org.eclipse.hono.communication.api.service.database; /** * Interface for creating Database Tables at application startup. diff --git a/device-communication/src/main/java/org/eclipse/hono/communication/api/service/DatabaseService.java b/device-communication/src/main/java/org/eclipse/hono/communication/api/service/database/DatabaseService.java similarity index 93% rename from device-communication/src/main/java/org/eclipse/hono/communication/api/service/DatabaseService.java rename to device-communication/src/main/java/org/eclipse/hono/communication/api/service/database/DatabaseService.java index e8e5bd87..f4c422eb 100644 --- a/device-communication/src/main/java/org/eclipse/hono/communication/api/service/DatabaseService.java +++ b/device-communication/src/main/java/org/eclipse/hono/communication/api/service/database/DatabaseService.java @@ -14,7 +14,7 @@ * */ -package org.eclipse.hono.communication.api.service; +package org.eclipse.hono.communication.api.service.database; import io.vertx.pgclient.PgPool; diff --git a/device-communication/src/main/java/org/eclipse/hono/communication/api/service/DatabaseServiceImpl.java b/device-communication/src/main/java/org/eclipse/hono/communication/api/service/database/DatabaseServiceImpl.java similarity index 96% rename from device-communication/src/main/java/org/eclipse/hono/communication/api/service/DatabaseServiceImpl.java rename to device-communication/src/main/java/org/eclipse/hono/communication/api/service/database/DatabaseServiceImpl.java index ab815ba2..9b4c9d85 100644 --- a/device-communication/src/main/java/org/eclipse/hono/communication/api/service/DatabaseServiceImpl.java +++ b/device-communication/src/main/java/org/eclipse/hono/communication/api/service/database/DatabaseServiceImpl.java @@ -14,7 +14,7 @@ * */ -package org.eclipse.hono.communication.api.service; +package org.eclipse.hono.communication.api.service.database; import javax.enterprise.context.ApplicationScoped; diff --git a/device-communication/src/main/java/org/eclipse/hono/communication/core/app/InternalMessagingConfig.java b/device-communication/src/main/java/org/eclipse/hono/communication/core/app/InternalMessagingConfig.java new file mode 100644 index 00000000..df3abf31 --- /dev/null +++ b/device-communication/src/main/java/org/eclipse/hono/communication/core/app/InternalMessagingConfig.java @@ -0,0 +1,111 @@ +/* + * *********************************************************** + * Copyright (c) 2023 Contributors to the Eclipse Foundation + *

+ * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + *

+ * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0 + *

+ * SPDX-License-Identifier: EPL-2.0 + * ********************************************************** + * + */ + +package org.eclipse.hono.communication.core.app; + +import javax.inject.Singleton; + +import org.eclipse.hono.util.CommandConstants; +import org.eclipse.hono.util.EventConstants; +import org.eclipse.microprofile.config.inject.ConfigProperty; + +/** + * Configs for internal communication service. + */ +@Singleton +public class InternalMessagingConfig { + + @ConfigProperty(name = "app.projectId") + String projectId; + + // Message Attributes + + @ConfigProperty(name = "app.internalMessaging.message.attributeKeys.contentTypeKey") + String contentTypeKey; + @ConfigProperty(name = "app.internalMessaging.message.attributeKeys.origAdapterKey") + String origAdapterKey; + @ConfigProperty(name = "app.internalMessaging.message.attributeKeys.origAddressKey") + String origAddressKey; + @ConfigProperty(name = "app.internalMessaging.message.attributeKeys.ttdKey") + String ttdKey; + + + //Event + @ConfigProperty(name = "app.internalMessaging.event.topicFormat") + String eventTopicFormat; + + + // State + @ConfigProperty(name = "app.internalMessaging.state.topicFormat") + String stateTopicFormat; + + // Config + @ConfigProperty(name = "app.internalMessaging.command.ackTopic") + String commandAckTopicFormat; + @ConfigProperty(name = "app.internalMessaging.command.configAckDelay") + String configAckDelay; + + // Command + @ConfigProperty(name = "app.internalMessaging.command.topicFormat") + String commandTopicFormat; + + public String getCommandTopicFormat() { + return commandTopicFormat; + } + + public String getCommandAckTopicFormat() { + return commandAckTopicFormat; + } + public long getConfigAckDelay() { + return Long.parseLong(configAckDelay); + } + + public String getProjectId() { + return projectId; + } + + public String getContentTypeKey() { + return contentTypeKey; + } + + public String getOrigAdapterKey() { + return origAdapterKey; + } + + public String getOrigAddressKey() { + return origAddressKey; + } + + public String getTtdKey() { + return ttdKey; + } + + public String getEventTopicFormat() { + return eventTopicFormat; + } + + public String getEmptyNotificationEventContentType() { + return EventConstants.CONTENT_TYPE_EMPTY_NOTIFICATION; + } + + public String getDeliveryFailureNotificationContentType() { + return CommandConstants.CONTENT_TYPE_DELIVERY_FAILURE_NOTIFICATION; + } + + public String getStateTopicFormat() { + return stateTopicFormat; + } +} diff --git a/device-communication/src/main/java/org/eclipse/hono/communication/core/app/InternalMessagingConstants.java b/device-communication/src/main/java/org/eclipse/hono/communication/core/app/InternalMessagingConstants.java new file mode 100644 index 00000000..d248a0a8 --- /dev/null +++ b/device-communication/src/main/java/org/eclipse/hono/communication/core/app/InternalMessagingConstants.java @@ -0,0 +1,33 @@ +/* + * *********************************************************** + * Copyright (c) 2023 Contributors to the Eclipse Foundation + *

+ * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + *

+ * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0 + *

+ * SPDX-License-Identifier: EPL-2.0 + * ********************************************************** + * + */ + +package org.eclipse.hono.communication.core.app; + +/** + * Internal Messaging Constant values. + */ +public class InternalMessagingConstants { + + public static final String DEVICE_ID = "device_id"; + public static final String TENANT_ID = "tenant_id"; + public static final String SUBJECT = "subject"; + public static final String CORRELATION_ID = "correlation-id"; + public static final String RESPONSE_REQUIRED = "response-required"; + public static final String STATUS = "status"; + + private InternalMessagingConstants() { + } +} diff --git a/device-communication/src/main/java/org/eclipse/hono/communication/core/http/AbstractVertxHttpServer.java b/device-communication/src/main/java/org/eclipse/hono/communication/core/http/AbstractVertxHttpServer.java index 493f22af..c3069e35 100644 --- a/device-communication/src/main/java/org/eclipse/hono/communication/core/http/AbstractVertxHttpServer.java +++ b/device-communication/src/main/java/org/eclipse/hono/communication/core/http/AbstractVertxHttpServer.java @@ -36,7 +36,7 @@ public abstract class AbstractVertxHttpServer { * @param appConfigs The application configs * @param vertx The quarkus Vertx instance */ - public AbstractVertxHttpServer(final ApplicationConfig appConfigs, final Vertx vertx) { + protected AbstractVertxHttpServer(final ApplicationConfig appConfigs, final Vertx vertx) { this.appConfigs = appConfigs; this.vertx = vertx; } diff --git a/device-communication/src/main/java/org/eclipse/hono/communication/core/utils/ResponseUtils.java b/device-communication/src/main/java/org/eclipse/hono/communication/core/utils/ResponseUtils.java index f53db74a..3e9c136c 100644 --- a/device-communication/src/main/java/org/eclipse/hono/communication/core/utils/ResponseUtils.java +++ b/device-communication/src/main/java/org/eclipse/hono/communication/core/utils/ResponseUtils.java @@ -18,6 +18,7 @@ import org.eclipse.hono.communication.api.exception.DeviceNotFoundException; +import io.netty.handler.codec.http.HttpResponseStatus; import io.vertx.core.json.Json; import io.vertx.core.json.JsonObject; import io.vertx.ext.web.RoutingContext; @@ -32,7 +33,6 @@ public abstract class ResponseUtils { private static final String APPLICATION_JSON_TYPE = "application/json"; private ResponseUtils() { - // avoid instantiation } /** @@ -44,35 +44,11 @@ private ResponseUtils() { public static void successResponse(final RoutingContext rc, final Object response) { rc.response() - .setStatusCode(200) + .setStatusCode(HttpResponseStatus.OK.code()) .putHeader(CONTENT_TYPE_HEADER, APPLICATION_JSON_TYPE) .end(Json.encodePrettily(response)); } - /** - * Build success response using 201 Created as its status code and response object as body. - * - * @param rc Routing context - * @param response Response body - */ - public static void createdResponse(final RoutingContext rc, - final Object response) { - rc.response() - .setStatusCode(201) - .putHeader(CONTENT_TYPE_HEADER, APPLICATION_JSON_TYPE) - .end(Json.encodePrettily(response)); - } - - /** - * Build success response using 204 No Content as its status code and no response body. - * - * @param rc Routing context - */ - public static void noContentResponse(final RoutingContext rc) { - rc.response() - .setStatusCode(204) - .end(); - } /** * Build error response using 400 Bad Request, 404 Not Found or 500 Internal Server Error @@ -92,15 +68,15 @@ public static void errorResponse(final RoutingContext rc, final Throwable error) || error instanceof BadRequestException) { // Bad Request - status = 400; + status = HttpResponseStatus.BAD_REQUEST.code(); message = error.getMessage(); } else if (error instanceof DeviceNotFoundException) { // Not Found - status = 404; + status = HttpResponseStatus.NOT_FOUND.code(); message = error.getMessage(); } else { // Internal Server Error - status = 500; + status = HttpResponseStatus.INTERNAL_SERVER_ERROR.code(); if (error != null) { message = String.format("Internal Server Error: %s", error.getMessage()); } else { diff --git a/device-communication/src/main/resources/application.yaml b/device-communication/src/main/resources/application.yaml index 93ed970c..570c62d5 100644 --- a/device-communication/src/main/resources/application.yaml +++ b/device-communication/src/main/resources/application.yaml @@ -1,6 +1,29 @@ app: name: "Device Communication" version: ${COM_APP_VERSION:"v1"} + projectId: ${COM_PROJECT_ID:project-id} + + internalMessaging: + message: + attributeKeys: + deviceIdKey: ${COM_EVENT_ATTRIBUTE_DEVICE_ID:device_id} + tenantIdKey: ${COM_EVENT_ATTRIBUTE_TENANT_ID:tenant_id} + configVersionIdKey: ${COM_EVENT_ATTRIBUTE_CONFIG_VERSION:config-version} + contentTypeKey: ${COM_EVENT_ATTRIBUTE_CONTENT_TYPE:content-type} + origAdapterKey: ${COM_EVENT_ATTRIBUTE_ORIG_ADAPTER:orig_adapter} + origAddressKey: ${COM_EVENT_ATTRIBUTE_ORIG_ADDRESS:orig_address} + ttdKey: ${COM_EVENT_ATTRIBUTE_TTD:ttd} + event: + topicFormat: ${COM_EVENT_ON_CONNECT_TOPIC_FORMAT:%s.event} # TENANT_NAME.event + emptyNotificationEventContentType: ${COM_EVENT_EMPTY_NOTIFICATION_CONTENT_TYPE:application/vnd.eclipse-hono-empty-notification} + + state: + topicFormat: ${COM_STATE_TOPIC_FORMAT:%s.event.state} # TENANT_NAME.event.state + command: + topicFormat: ${COM_COMMAND_TOPIC_FORMAT:%s.command} # TENANT_NAME.command + ackTopic: ${COM_CONFIG_ACK_TOPIC:%s.command_response} # TENANT_NAME.command_response + configAckDelay: ${COM_CONFIG_ACK_DELAY:10000} + vertx: openapi: file: ${COM_OPENAPI_FILE_PATH:api/hono-device-communication-v1.yaml} @@ -9,19 +32,24 @@ vertx: url: ${COM_SERVER_HOST:0.0.0.0} port: ${COM_SERVER_PORT:8080} paths: - base: ${COM_SERVER_BASE_PATH:/api/v1/} # base path should always end with "/" + base: ${COM_SERVER_BASE_PATH:/v1/} # base path should always end with "/" liveness: ${COM_SERVER_LIVENESS_PATH:/alive} readiness: ${COM_SERVER_READINESS_PATH:/ready} database: pool-max-size: ${COM_POOL_MAX_SIZE:5} name: ${COM_DB_NAME:hono} - host: ${COM_DB_HOST:localhost} + host: ${COM_DB_HOST:host.docker.internal} port: ${COM_DB_PORT:5432} username: ${COM_DB_USERNAME:postgres} password: ${COM_DB_PASSWORD:mysecretpassword} db-kind: "postgresql" + # Tenant table configs. Used for listing tenants + tenant: + table: ${COM_DB_TENANT_TABLE:tenants} + tenant-id-column: ${COM_DB_TENANT_COL_NAME:tenant_id} + # Device registration table configs. Used for validating devices device-registration: table: ${COM_DB_DEVICE_REG_TABLE:device_registrations} diff --git a/device-communication/src/test/java/org/eclipse/hono/communication/api/handler/DeviceCommandsHandlerTest.java b/device-communication/src/test/java/org/eclipse/hono/communication/api/handler/DeviceCommandsHandlerTest.java index 533259d2..2bd60909 100644 --- a/device-communication/src/test/java/org/eclipse/hono/communication/api/handler/DeviceCommandsHandlerTest.java +++ b/device-communication/src/test/java/org/eclipse/hono/communication/api/handler/DeviceCommandsHandlerTest.java @@ -17,20 +17,30 @@ package org.eclipse.hono.communication.api.handler; import static org.mockito.ArgumentMatchers.anyString; -import static org.mockito.Mockito.*; - +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; + +import org.eclipse.hono.communication.api.config.ApiCommonConstants; import org.eclipse.hono.communication.api.config.DeviceCommandConstants; -import org.eclipse.hono.communication.api.service.DeviceCommandService; -import org.eclipse.hono.communication.api.service.DeviceCommandServiceImpl; +import org.eclipse.hono.communication.api.data.DeviceCommandRequest; +import org.eclipse.hono.communication.api.service.command.DeviceCommandService; +import org.eclipse.hono.communication.api.service.command.DeviceCommandServiceImpl; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import io.vertx.core.Future; +import io.vertx.core.http.HttpServerResponse; +import io.vertx.core.json.JsonObject; +import io.vertx.ext.web.RequestBody; import io.vertx.ext.web.RoutingContext; import io.vertx.ext.web.openapi.Operation; import io.vertx.ext.web.openapi.RouterBuilder; - class DeviceCommandsHandlerTest { private final DeviceCommandService commandServiceMock; @@ -39,12 +49,17 @@ class DeviceCommandsHandlerTest { private final Operation operationMock; private final DeviceCommandHandler deviceCommandsHandler; + private final RequestBody requestBodyMock; + private final HttpServerResponse responseMock; + DeviceCommandsHandlerTest() { operationMock = mock(Operation.class); commandServiceMock = mock(DeviceCommandServiceImpl.class); routerBuilderMock = mock(RouterBuilder.class); routingContextMock = mock(RoutingContext.class); deviceCommandsHandler = new DeviceCommandHandler(commandServiceMock); + requestBodyMock = mock(RequestBody.class); + responseMock = mock(HttpServerResponse.class); } @AfterEach @@ -53,7 +68,9 @@ void tearDown() { commandServiceMock, routerBuilderMock, routingContextMock, - operationMock); + operationMock, + requestBodyMock, + responseMock); } @BeforeEach @@ -79,12 +96,43 @@ void addRoutes() { } @Test - void handlePostCommand() { - doNothing().when(commandServiceMock).postCommand(routingContextMock); - + public void testHandlePostCommand() { + // Arrange + final String tenantId = "tenant1"; + final String deviceId = "device1"; + + final DeviceCommandRequest deviceCommandRequest = new DeviceCommandRequest(); + deviceCommandRequest.setBinaryData("value1"); + + // Set up mock behavior for RoutingContext + when(routingContextMock.body()).thenReturn(requestBodyMock); + when(requestBodyMock.asJsonObject()).thenReturn(new JsonObject("{}")); + when(routingContextMock.pathParam(ApiCommonConstants.TENANT_PATH_PARAMS)).thenReturn(tenantId); + when(routingContextMock.pathParam(ApiCommonConstants.DEVICE_PATH_PARAMS)).thenReturn(deviceId); + when(routingContextMock.response()).thenReturn(responseMock); + when(responseMock.setStatusCode(200)).thenReturn(responseMock); + + // Set up mock behavior for CommandService + when(commandServiceMock.postCommand(any(), any(), any())).thenReturn(Future.succeededFuture()); + + // Act deviceCommandsHandler.handlePostCommand(routingContextMock); - verify(commandServiceMock, times(1)).postCommand(routingContextMock); + // Assert + verify(routingContextMock, times(1)).body(); + verify(requestBodyMock).asJsonObject(); + verify(routingContextMock).body(); + verify(routingContextMock).response(); + + verify(commandServiceMock).postCommand(any(), anyString(), anyString()); + verify(routingContextMock, times(1)).pathParam(ApiCommonConstants.TENANT_PATH_PARAMS); + verify(routingContextMock, times(1)).pathParam(ApiCommonConstants.DEVICE_PATH_PARAMS); + + + verify(responseMock, times(1)).setStatusCode(200); + verify(responseMock, times(1)).end(); + + } } diff --git a/device-communication/src/test/java/org/eclipse/hono/communication/api/service/command/DeviceCommandServiceImplTest.java b/device-communication/src/test/java/org/eclipse/hono/communication/api/service/command/DeviceCommandServiceImplTest.java new file mode 100644 index 00000000..0279e535 --- /dev/null +++ b/device-communication/src/test/java/org/eclipse/hono/communication/api/service/command/DeviceCommandServiceImplTest.java @@ -0,0 +1,155 @@ +/* + * *********************************************************** + * Copyright (c) 2023 Contributors to the Eclipse Foundation + *

+ * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + *

+ * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0 + *

+ * SPDX-License-Identifier: EPL-2.0 + * ********************************************************** + * + */ + +package org.eclipse.hono.communication.api.service.command; + +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.anyString; +import static org.mockito.Mockito.doNothing; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoMoreInteractions; +import static org.mockito.Mockito.when; + +import java.io.IOException; + +import org.eclipse.hono.communication.api.data.DeviceCommandRequest; +import org.eclipse.hono.communication.api.exception.DeviceNotFoundException; +import org.eclipse.hono.communication.api.repository.DeviceRepository; +import org.eclipse.hono.communication.api.service.communication.InternalMessaging; +import org.eclipse.hono.communication.api.service.database.DatabaseService; +import org.eclipse.hono.communication.api.service.database.DatabaseServiceImpl; +import org.eclipse.hono.communication.core.app.DatabaseConfig; +import org.eclipse.hono.communication.core.app.InternalMessagingConfig; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +import io.vertx.core.Future; + +class DeviceCommandServiceImplTest { + + private final DeviceRepository repositoryMock; + private final DatabaseService dbMock; + private final DeviceCommandServiceImpl deviceCommandService; + private final InternalMessagingConfig communicationConfig; + private final InternalMessaging internalCommunication; + private final DatabaseConfig databaseConfig; + + + DeviceCommandServiceImplTest() { + this.repositoryMock = mock(DeviceRepository.class); + this.dbMock = mock(DatabaseServiceImpl.class); + this.communicationConfig = mock(InternalMessagingConfig.class); + this.internalCommunication = mock(InternalMessaging.class); + this.databaseConfig = mock(DatabaseConfig.class); + this.deviceCommandService = new DeviceCommandServiceImpl( + + repositoryMock, + internalCommunication, + communicationConfig); + } + + @BeforeEach + void setUp() { + } + + @AfterEach + void tearDown() { + verifyNoMoreInteractions(dbMock, + databaseConfig, + repositoryMock, + internalCommunication, + communicationConfig); + } + + + @Test + public void postCommand_whenDeviceExists_shouldSucceed() throws Exception { + final String deviceId = "device123"; + final String tenantId = "tenant123"; + final DeviceCommandRequest commandRequest = new DeviceCommandRequest(); + commandRequest.setBinaryData("dGVzdCBjb25maWcgMjIyMjIy"); + + when(repositoryMock.searchForDevice(deviceId, tenantId)).thenReturn(Future.succeededFuture(1)); + when(communicationConfig.getCommandTopicFormat()).thenReturn("%s.command"); + doNothing().when(internalCommunication).publish(anyString(), any(), any()); + + final Future result = deviceCommandService.postCommand(commandRequest, tenantId, deviceId); + + verify(repositoryMock).searchForDevice(deviceId, tenantId); + verify(communicationConfig).getCommandTopicFormat(); + verify(internalCommunication).publish(anyString(), any(), any()); + Assertions.assertTrue(result.succeeded()); + } + + @Test + public void postCommand_whenDeviceDoesNotExist_shouldThrowDeviceNotFoundException() { + final String deviceId = "device123"; + final String tenantId = "tenant123"; + final DeviceCommandRequest commandRequest = new DeviceCommandRequest(); + commandRequest.setBinaryData("dGVzdCBjb25maWcgMjIyMjIy"); + + when(repositoryMock.searchForDevice(deviceId, tenantId)).thenReturn(Future.succeededFuture(0)); + + final Future result = deviceCommandService.postCommand(commandRequest, tenantId, deviceId); + verify(repositoryMock).searchForDevice(deviceId, tenantId); + Assertions.assertTrue(result.failed()); + Assertions.assertSame(result.cause().getClass(), DeviceNotFoundException.class); + } + + @Test + public void postCommand_publish_error_shouldFailed() throws Exception { + final String deviceId = "device123"; + final String tenantId = "tenant123"; + final DeviceCommandRequest commandRequest = new DeviceCommandRequest(); + commandRequest.setBinaryData("dGVzdCBjb25maWcgMjIyMjIy"); + + when(repositoryMock.searchForDevice(deviceId, tenantId)).thenReturn(Future.succeededFuture(1)); + when(communicationConfig.getCommandTopicFormat()).thenReturn("%s.command"); + doThrow(new IOException()).when(internalCommunication).publish(anyString(), any(), any()); + + final Future result = deviceCommandService.postCommand(commandRequest, tenantId, deviceId); + + verify(repositoryMock).searchForDevice(deviceId, tenantId); + verify(communicationConfig).getCommandTopicFormat(); + verify(internalCommunication).publish(anyString(), any(), any()); + Assertions.assertTrue(result.failed()); + Assertions.assertSame(result.cause().getClass(), IOException.class); + } + + @Test + public void postCommand_noBase64_shouldFail() throws Exception { + final String deviceId = "device123"; + final String tenantId = "tenant123"; + final DeviceCommandRequest commandRequest = new DeviceCommandRequest(); + commandRequest.setBinaryData("test 2"); + + when(repositoryMock.searchForDevice(deviceId, tenantId)).thenReturn(Future.succeededFuture(1)); + when(communicationConfig.getCommandTopicFormat()).thenReturn("%s.command"); + doThrow(new IOException()).when(internalCommunication).publish(anyString(), any(), any()); + + final Future result = deviceCommandService.postCommand(commandRequest, tenantId, deviceId); + + + Assertions.assertTrue(result.failed()); + Assertions.assertSame(result.cause().getClass(), IllegalStateException.class); + } + + +} diff --git a/device-communication/src/test/java/org/eclipse/hono/communication/api/service/communication/PubSubServiceTest.java b/device-communication/src/test/java/org/eclipse/hono/communication/api/service/communication/PubSubServiceTest.java new file mode 100644 index 00000000..53dda994 --- /dev/null +++ b/device-communication/src/test/java/org/eclipse/hono/communication/api/service/communication/PubSubServiceTest.java @@ -0,0 +1,197 @@ +/* + * *********************************************************** + * Copyright (c) 2023 Contributors to the Eclipse Foundation + *

+ * See the NOTICE file(s) distributed with this work for additional + * information regarding copyright ownership. + *

+ * This program and the accompanying materials are made available under the + * terms of the Eclipse Public License 2.0 which is available at + * http://www.eclipse.org/legal/epl-2.0 + *

+ * SPDX-License-Identifier: EPL-2.0 + * ********************************************************** + * + */ + +package org.eclipse.hono.communication.api.service.communication; + +import static org.mockito.Mockito.*; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.TimeUnit; + + +import org.eclipse.hono.communication.core.app.InternalMessagingConfig; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.ArgumentMatchers; +import org.mockito.MockedStatic; + + +import com.google.api.core.ApiFuture; +import com.google.cloud.pubsub.v1.MessageReceiver; +import com.google.cloud.pubsub.v1.Publisher; +import com.google.cloud.pubsub.v1.Subscriber; +import com.google.pubsub.v1.ProjectSubscriptionName; +import com.google.pubsub.v1.PubsubMessage; +import com.google.pubsub.v1.TopicName; + + +class PubSubServiceTest { + + private static final String PROJECT_ID = "your-project-id"; + private final String topic = "my-topic"; + private final byte[] message = "Hello world!".getBytes(); + private final Map attributes = new HashMap<>(); + private InternalMessagingConfig configMock; + private Publisher.Builder publisherBuilderMock; + private Publisher publisherMock; + private Subscriber.Builder subscriberBuilderMock; + private Subscriber subscriberMock; + private MessageReceiver messageReceiverMock; + + @BeforeEach + void setUp() { + configMock = mock(InternalMessagingConfig.class); + publisherBuilderMock = mock(Publisher.Builder.class); + publisherMock = mock(Publisher.class); + subscriberBuilderMock = mock(Subscriber.Builder.class); + subscriberMock = mock(Subscriber.class); + messageReceiverMock = mock(MessageReceiver.class); + } + + @AfterEach + void tearDown() { + verifyNoMoreInteractions(configMock, publisherBuilderMock, publisherMock, subscriberBuilderMock, subscriberMock, messageReceiverMock); + } + + @Test + void testPublishSuccessful() throws Exception { + + attributes.put("key1", "value1"); + + try (MockedStatic publisherMockedStatic = mockStatic(Publisher.class)) { + + final Publisher.Builder builderMock = mock(Publisher.Builder.class); + + final ApiFuture mockedApiFuture = mock(ApiFuture.class); + publisherMockedStatic.when(() -> Publisher.newBuilder(ArgumentMatchers.any(TopicName.class))).thenReturn(builderMock); + + when(builderMock.build()).thenReturn(publisherMock); + when(publisherMock.publish(ArgumentMatchers.any(PubsubMessage.class))).thenReturn(mockedApiFuture); + when(mockedApiFuture.get()).thenReturn("message-id"); + when(configMock.getProjectId()).thenReturn(PROJECT_ID); + + final PubSubService pubSubService = new PubSubService(configMock); + + // When + pubSubService.publish(topic, message, attributes); + + // Then + verify(configMock).getProjectId(); + verify(publisherMock).shutdown(); + verify(publisherMock).publish(ArgumentMatchers.any(PubsubMessage.class)); + verify(publisherMock).awaitTermination(1, TimeUnit.MINUTES); + publisherMockedStatic.verify(() -> Publisher.newBuilder(ArgumentMatchers.any(TopicName.class))); + publisherMockedStatic.verifyNoMoreInteractions(); + } + + } + + @Test + void testPublish_failed_null_pubSUb_message() throws Exception { + attributes.put("key1", "value1"); + + try (MockedStatic publisherMockedStatic = mockStatic(Publisher.class)) { + try (MockedStatic pubsubMessageMockedStatic = mockStatic(PubsubMessage.class)) { + + + final Publisher.Builder builderMock = mock(Publisher.Builder.class); + + final ApiFuture mockedApiFuture = mock(ApiFuture.class); + publisherMockedStatic.when(() -> Publisher.newBuilder(ArgumentMatchers.any(TopicName.class))).thenReturn(builderMock); + + when(builderMock.build()).thenReturn(publisherMock); + when(publisherMock.publish(ArgumentMatchers.any(PubsubMessage.class))).thenReturn(mockedApiFuture); + when(mockedApiFuture.get()).thenReturn("message-id"); + when(configMock.getProjectId()).thenReturn(PROJECT_ID); + + final PubSubService pubSubService = new PubSubService(configMock); + + pubSubService.publish(topic, message, attributes); + + verify(configMock).getProjectId(); + verify(publisherMock).shutdown(); + verify(publisherMock).awaitTermination(1, TimeUnit.MINUTES); + publisherMockedStatic.verify(() -> Publisher.newBuilder(ArgumentMatchers.any(TopicName.class))); + pubsubMessageMockedStatic.verify(PubsubMessage::newBuilder); + publisherMockedStatic.verifyNoMoreInteractions(); + pubsubMessageMockedStatic.verifyNoMoreInteractions(); + } + } + + } + + + @Test + public void testSubscribe_success() throws Exception { + final String subscriptionName = "my-sub"; + final ProjectSubscriptionName projectSubscriptionName = ProjectSubscriptionName.of(PROJECT_ID, subscriptionName); + try (MockedStatic subscriberMockedStatic = mockStatic(Subscriber.class)) { + subscriberMockedStatic.when(() -> Subscriber.newBuilder(projectSubscriptionName, messageReceiverMock)).thenReturn(subscriberBuilderMock); + when(configMock.getProjectId()).thenReturn(PROJECT_ID); + when(subscriberBuilderMock.build()).thenReturn(subscriberMock); + final PubSubService pubSubServiceSpyClient = spy(new PubSubService(configMock)); + doReturn(projectSubscriptionName).when(pubSubServiceSpyClient).initSubscription(topic); + + + pubSubServiceSpyClient.subscribe(topic, messageReceiverMock); + + verify(configMock).getProjectId(); + verify(subscriberBuilderMock).build(); + verify(pubSubServiceSpyClient, times(1)).initSubscription(topic); + subscriberMockedStatic.verify(() -> Subscriber.newBuilder(projectSubscriptionName, messageReceiverMock)); + verify(subscriberMock, times(1)).startAsync(); + verify(pubSubServiceSpyClient).subscribe(topic, messageReceiverMock); + verifyNoMoreInteractions(pubSubServiceSpyClient, subscriberMock); + + + subscriberMockedStatic.verifyNoMoreInteractions(); + + + } + + } + + @Test + public void testSubscribe_failed() throws Exception { + final String subscriptionName = "my-sub"; + final ProjectSubscriptionName projectSubscriptionName = ProjectSubscriptionName.of(PROJECT_ID, subscriptionName); + try (MockedStatic subscriberMockedStatic = mockStatic(Subscriber.class)) { + subscriberMockedStatic.when(() -> Subscriber.newBuilder(projectSubscriptionName, messageReceiverMock)).thenReturn(subscriberBuilderMock); + when(configMock.getProjectId()).thenReturn(PROJECT_ID); + when(subscriberBuilderMock.build()).thenReturn(subscriberMock); + final PubSubService pubSubServiceSpyClient = spy(new PubSubService(configMock)); + doThrow(new IOException()).when(pubSubServiceSpyClient).initSubscription(topic); + + + pubSubServiceSpyClient.subscribe(topic, messageReceiverMock); + + + verify(configMock, times(1)).getProjectId(); + verify(pubSubServiceSpyClient, times(1)).initSubscription(topic); + verify(pubSubServiceSpyClient, times(1)).subscribe(topic, messageReceiverMock); + verifyNoMoreInteractions(pubSubServiceSpyClient, subscriberMock); + + + subscriberMockedStatic.verifyNoMoreInteractions(); + } + + } + + +}