diff --git a/CHANGES_NEXT_RELEASE b/CHANGES_NEXT_RELEASE index b7f170a829..966088838b 100644 --- a/CHANGES_NEXT_RELEASE +++ b/CHANGES_NEXT_RELEASE @@ -17,7 +17,8 @@ * Support for attributes of type JsonProperty * Support for the new URL parameter "format" for output formats (normalized, concise, simplified) * New service: DELETE /ngsi-ld/v1/entities (URL param 'type' only - the rest are missing still) - + * Distributed Subscriptions + ## Notes * TRoE is still not prepared for attributes of type Vocab/Json/Language, so, attributes of those types are not stored in the historical database * Modified the @context hosting feature to be according to API spec diff --git a/src/app/orionld/orionld.cpp b/src/app/orionld/orionld.cpp index d5b50acfc7..6582e11cfc 100644 --- a/src/app/orionld/orionld.cpp +++ b/src/app/orionld/orionld.cpp @@ -75,6 +75,7 @@ extern "C" { #include "kbase/kInit.h" // kInit +#include "kbase/kStringSplit.h" // kStringSplit #include "kalloc/kaInit.h" // kaInit #include "kalloc/kaBufferInit.h" // kaBufferInit #include "kalloc/kaBufferReset.h" // kaBufferReset @@ -321,7 +322,7 @@ bool noArrayReduction = false; #define SOCKET_SERVICE_PORT_DESC "port to receive new socket service connections" #define DISTRIBUTED_DESC "turn on distributed operation" #define BROKER_ID_DESC "identity of this broker instance for registrations - for the Via header" -#define WIP_DESC "Enable concepts that are 'Work In Progress' (e.g. -wip entityMaps)" +#define WIP_DESC "Enable concepts that are 'Work In Progress' (e.g. -wip entityMaps,distSubs)" #define FORWARDING_DESC "turn on distributed operation (deprecated)" #define ID_INDEX_DESC "automatic mongo index on _id.id" #define NOSWAP_DESC "no swapping - for testing only!!!" @@ -1062,8 +1063,18 @@ int main(int argC, char* argV[]) if (wip[0] != 0) { - if (strcmp(wip, "entityMaps") == 0) - entityMapsEnabled = true; + char* wipV[3]; + int wips = kStringSplit(wip, ',', wipV, 3); + + for (int ix = 0; ix < wips; ix++) + { + if (strcmp(wipV[ix], "entityMaps") == 0) + entityMapsEnabled = true; + else if (strcmp(wipV[ix], "distSubs") == 0) + distSubsEnabled = true; + else + LM_X(1, ("Invalid value for -wip comma-separated list (allowed: 'entityMaps', 'distSubs')")); + } } #if 0 @@ -1348,27 +1359,29 @@ int main(int argC, char* argV[]) LM_K(("Initialization is Done")); LM_K((" Accepting REST requests on port %d (experimental API endpoints are %sabled)", port, (experimental == true)? "en" : "dis")); - LM_K((" TRoE: %s", (troe == true)? "Enabled" : "Disabled")); - LM_K((" Distributed Operation: %s", (distributed == true)? "Enabled" : "Disabled")); - LM_K((" Health Check: %s", (socketService == true)? "Enabled" : "Disabled")); + LM_K((" TRoE: %s", (troe == true)? "Enabled" : "Disabled")); + LM_K((" Distributed Operation: %s", (distributed == true)? "Enabled" : "Disabled")); + LM_K((" Health Check: %s", (socketService == true)? "Enabled" : "Disabled")); + LM_K((" Entity Maps: %s", (entityMapsEnabled == true)? "Enabled" : "Disabled")); + LM_K((" Distributed Subscriptions: %s", (distSubsEnabled == true)? "Enabled" : "Disabled")); if (troe) - LM_K((" Postgres Server Version: %s", postgresServerVersion)); + LM_K((" Postgres Server Version: %s", postgresServerVersion)); - LM_K((" Mongo Server Version: %s", mongocServerVersion)); + LM_K((" Mongo Server Version: %s", mongocServerVersion)); if (mongocOnly == true) { - LM_K((" Mongo Driver: mongoc driver- ONLY (MongoDB C++ Legacy Driver is DISABLED)")); - LM_K((" MongoC Driver Version: %s", MONGOC_VERSION_S)); + LM_K((" Mongo Driver: mongoc driver- ONLY (MongoDB C++ Legacy Driver is DISABLED)")); + LM_K((" MongoC Driver Version: %s", MONGOC_VERSION_S)); } else if (experimental == true) { - LM_K((" Mongo Driver: mongoc driver for NGSI-LD requests, Legacy Mongo C++ Driver for NGSIv1&2")); - LM_K((" MongoC Driver Version: %s", MONGOC_VERSION_S)); + LM_K((" Mongo Driver: mongoc driver for NGSI-LD requests, Legacy Mongo C++ Driver for NGSIv1&2")); + LM_K((" MongoC Driver Version: %s", MONGOC_VERSION_S)); } else - LM_K((" Mongo Driver: Legacy C++ Driver (deprecated by mongodb)")); + LM_K((" Mongo Driver: Legacy C++ Driver (deprecated by mongodb)")); // Startup is done - we can free up the allocated kalloc buffers - assuming socketService doesn't use kalloc ... kaBufferReset(&orionldState.kalloc, KFALSE); diff --git a/src/app/orionld/orionldRestServices.cpp b/src/app/orionld/orionldRestServices.cpp index 7e25c782ff..5b66ddea36 100644 --- a/src/app/orionld/orionldRestServices.cpp +++ b/src/app/orionld/orionldRestServices.cpp @@ -26,6 +26,7 @@ #include "orionld/serviceRoutines/orionldPostEntities.h" #include "orionld/serviceRoutines/orionldPostNotify.h" +#include "orionld/serviceRoutines/orionldPostNotification.h" #include "orionld/serviceRoutines/orionldPostEntity.h" #include "orionld/serviceRoutines/orionldPostSubscriptions.h" #include "orionld/serviceRoutines/orionldPostRegistrations.h" @@ -123,6 +124,7 @@ static OrionLdRestServiceSimplified postServiceV[] = { "/ngsi-ld/v1/entities/*/attrs", orionldPostEntity }, { "/ngsi-ld/v1/entities", orionldPostEntities }, { "/ngsi-ld/ex/v1/notify", orionldPostNotify }, + { "/ngsi-ld/ex/v1/notifications/*", orionldPostNotification }, { "/ngsi-ld/v1/entityOperations/create", orionldPostBatchCreate }, { "/ngsi-ld/v1/entityOperations/upsert", orionldPostBatchUpsert }, { "/ngsi-ld/v1/entityOperations/update", orionldPostBatchUpdate }, diff --git a/src/lib/cache/CachedSubscription.h b/src/lib/cache/CachedSubscription.h index 85a008dcda..8d73509889 100644 --- a/src/lib/cache/CachedSubscription.h +++ b/src/lib/cache/CachedSubscription.h @@ -38,6 +38,7 @@ #include "orionld/types/OrionldAlteration.h" // OrionldAlterationTypes #include "orionld/types/OrionldTenant.h" // OrionldTenant #include "orionld/types/OrionldContext.h" // OrionldContext +#include "orionld/types/SubordinateSubscription.h" // SubordinateSubscription @@ -132,6 +133,8 @@ struct CachedSubscription double createdAt; double modifiedAt; + SubordinateSubscription* subordinateP; // Linked list of subordinate subscriptions + struct CachedSubscription* next; bool inDB; // Used by mongocSubCachePopulateByTenant to find subs that have been removed }; diff --git a/src/lib/logMsg/traceLevels.h b/src/lib/logMsg/traceLevels.h index 91344f6eae..2c0980c4d9 100644 --- a/src/lib/logMsg/traceLevels.h +++ b/src/lib/logMsg/traceLevels.h @@ -85,6 +85,7 @@ typedef enum TraceLevels LmtDistOpResponseDetail, // Details on responses to distributed requests LmtDistOpResponseHeaders, // HTTP headers of responses to distributed requests LmtRegMatch, // Distributed Operations: registration matching + LmtDistOpSubMatch, // Matching subscriptions with registrations // // Distributed Operations - misc diff --git a/src/lib/orionld/common/orionldRequestSend.cpp b/src/lib/orionld/common/orionldRequestSend.cpp index b88a3f56d8..940ba0b51c 100644 --- a/src/lib/orionld/common/orionldRequestSend.cpp +++ b/src/lib/orionld/common/orionldRequestSend.cpp @@ -22,7 +22,6 @@ * * Author: Ken Zangelin */ -#include // bcopy #include // curl #include "logMsg/logMsg.h" // LM_* diff --git a/src/lib/orionld/common/orionldState.cpp b/src/lib/orionld/common/orionldState.cpp index 1aa16c5006..cbf5e0fc05 100644 --- a/src/lib/orionld/common/orionldState.cpp +++ b/src/lib/orionld/common/orionldState.cpp @@ -109,6 +109,7 @@ size_t hostHeaderLen; PernotSubCache pernotSubCache; EntityMap* entityMaps = NULL; // Used by GET /entities in the distributed case, for pagination bool entityMapsEnabled = false; +bool distSubsEnabled = false; diff --git a/src/lib/orionld/common/orionldState.h b/src/lib/orionld/common/orionldState.h index 9832634618..e57371f27b 100644 --- a/src/lib/orionld/common/orionldState.h +++ b/src/lib/orionld/common/orionldState.h @@ -612,7 +612,8 @@ extern bool noCache; // From orionld.cpp extern uint32_t cSubCounters; // Number of subscription counter updates before flush from sub-cache to DB extern PernotSubCache pernotSubCache; extern EntityMap* entityMaps; // Used by GET /entities in the distributed case, for pagination -extern bool entityMapsEnabled; +extern bool entityMapsEnabled; // Enable Entity Maps +extern bool distSubsEnabled; // Enable distributed subscriptions extern bool noArrayReduction; // Used by arrayReduce in pCheckAttribute.cpp extern char localIpAndPort[135]; // Local address for X-Forwarded-For (from orionld.cpp) diff --git a/src/lib/orionld/dbModel/dbModelToApiSubscription.cpp b/src/lib/orionld/dbModel/dbModelToApiSubscription.cpp index 4fa86cf462..2d0cb81676 100644 --- a/src/lib/orionld/dbModel/dbModelToApiSubscription.cpp +++ b/src/lib/orionld/dbModel/dbModelToApiSubscription.cpp @@ -33,6 +33,7 @@ extern "C" #include "orionld/types/QNode.h" // QNode #include "orionld/types/OrionldRenderFormat.h" // OrionldRenderFormat +#include "orionld/types/SubordinateSubscription.h" // SubordinateSubscription #include "orionld/common/orionldState.h" // orionldState #include "orionld/common/orionldError.h" // orionldError #include "orionld/common/numberToDate.h" // numberToDate @@ -168,7 +169,13 @@ static bool notificationStatus(KjNode* dbLastSuccessP, KjNode* dbLastFailureP) // } // }, // "expiresAt": "2028-12-31T10:00:00", => "expiresAt" => "expiration" -// "throttling": 5 => SAME +// "throttling": 5, => SAME +// "subordinate": [ +// { +// "registrationId": "urn:R1", +// "subscriptionId": "urn:ngsi-ld:Subscription:S1:1" +// } +// ] // } // KjNode* dbModelToApiSubscription @@ -215,6 +222,7 @@ KjNode* dbModelToApiSubscription KjNode* dbCreatedAtP = NULL; KjNode* dbModifiedAtP = NULL; KjNode* timeIntervalNodeP = kjLookup(dbSubP, "timeInterval"); + KjNode* subordinateNodeP = kjLookup(dbSubP, "subordinate"); if ((orionldState.uriParamOptions.sysAttrs == true) || (forSubCache == true)) { @@ -693,6 +701,21 @@ KjNode* dbModelToApiSubscription dbLdContextP->name = (char*) "jsonldContext"; } + // + // + if (subordinateNodeP != NULL) + { + kjChildRemove(dbSubP, subordinateNodeP); + kjChildAdd(apiSubP, subordinateNodeP); + + for (KjNode* subordinateP = subordinateNodeP->value.firstChildP; subordinateP != NULL; subordinateP = subordinateP->next) + { + KjNode* runNoP = kjLookup(subordinateP, "runNo"); + + if (runNoP != NULL) + kjChildRemove(subordinateP, runNoP); + } + } if (qNodePP != NULL) // FIXME: This is more than a bit weird ... *qNodePP = NULL; diff --git a/src/lib/orionld/http/CMakeLists.txt b/src/lib/orionld/http/CMakeLists.txt index 64dc089adf..8dcf33151e 100644 --- a/src/lib/orionld/http/CMakeLists.txt +++ b/src/lib/orionld/http/CMakeLists.txt @@ -23,6 +23,8 @@ CMAKE_MINIMUM_REQUIRED(VERSION 3.5) SET (SOURCES httpHeaderLocationAdd.cpp httpHeaderLinkAdd.cpp + httpRequest.cpp + httpRequestHeaderAdd.cpp ) # Include directories diff --git a/src/lib/orionld/http/httpRequest.cpp b/src/lib/orionld/http/httpRequest.cpp new file mode 100644 index 0000000000..da6c1bf972 --- /dev/null +++ b/src/lib/orionld/http/httpRequest.cpp @@ -0,0 +1,276 @@ +/* +* +* Copyright 2024 FIWARE Foundation e.V. +* +* This file is part of Orion-LD Context Broker. +* +* Orion-LD Context Broker is free software: you can redistribute it and/or +* modify it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* Orion-LD Context Broker is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero +* General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with Orion-LD Context Broker. If not, see http://www.gnu.org/licenses/. +* +* For those usages not covered by this license please contact with +* orionld at fiware dot org +* +* Author: Ken Zangelin +*/ +#include // curl + +extern "C" +{ +#include "kjson/KjNode.h" // KjNode +#include "kjson/kjParse.h" // kjParse +#include "kjson/kjRender.h" // kjFastRender +#include "kjson/kjRenderSize.h" // kjFastRenderSize +} + +#include "logMsg/logMsg.h" // LM_* +#include "logMsg/traceLevels.h" // Lmt* + +#include "orionld/common/orionldState.h" // orionldState +#include "orionld/types/OrionldProblemDetails.h" // OrionldProblemDetails +#include "orionld/types/OrionldResponseBuffer.h" // OrionldResponseBuffer +#include "orionld/http/httpRequest.h" // HttpKeyValue, Own interface + + + +// ----------------------------------------------------------------------------- +// +// curlDebug - from orionldRequestSend.cpp +// +extern int curlDebug(CURL* handle, curl_infotype type, char* data, size_t size, void* userptr); + + + +// ----------------------------------------------------------------------------- +// +// writeCallback - +// +static size_t writeCallback(void* contents, size_t size, size_t members, void* userP) +{ + size_t bytesToCopy = size * members; + OrionldResponseBuffer* rBufP = (OrionldResponseBuffer*) userP; + int xtraBytes = 512; + + LM_T(LmtCurl, ("CURL: got %d bytes of payload body: %s", bytesToCopy, contents)); + + if (bytesToCopy + rBufP->used >= rBufP->size) + { + if (rBufP->buf == rBufP->internalBuffer) + { + rBufP->buf = (char*) malloc(rBufP->size + bytesToCopy + xtraBytes); + + if (rBufP->buf == NULL) + LM_X(1, ("Runtime Error (out of memory)")); + + rBufP->size = rBufP->size + bytesToCopy + xtraBytes; + + if (rBufP->used > 0) // Copy contents from internal buffer that got too small + { + memcpy(rBufP->buf, rBufP->internalBuffer, rBufP->used); + } + } + else + { + rBufP->buf = (char*) realloc(rBufP->buf, rBufP->size + bytesToCopy + xtraBytes); + rBufP->size = rBufP->size + bytesToCopy + xtraBytes; + } + + if (rBufP->buf == NULL) + LM_X(1, ("Runtime Error (out of memory)")); + + // + // Save pointer to allocated buffer for later call to free() + // + orionldState.delayedFreePointer = rBufP->buf; + } + + memcpy(&rBufP->buf[rBufP->used], contents, bytesToCopy); + + rBufP->used += bytesToCopy; + rBufP->buf[rBufP->used] = 0; + + return bytesToCopy; +} + + + +// ----------------------------------------------------------------------------- +// +// httpRequest - +// +// RETURN VALUE +// -1 on fatal error (error info saved in pdP) +// +// PARAMETERS +// verb: +// ip: +// urlIn: with or without URL parameters (if 'uriParams' == NULL, it is used "as is") +// body: request payload body as KjNode tree +// uriParams: URI parameters, as an array of HttpKeyValue - NULL if "url" already contains the URL parameters +// headers: HTTP Headers, as an array of HttpKeyValue - both input and output +// tmo: Timeout of reading the response? in milliseconds +// response: response payload body, as a KjNode tree +// pdP: OrionldProblemDetails reference +// +int httpRequest +( + const char* ip, + const char* verb, + const char* urlIn, + KjNode* body, + HttpKeyValue* uriParams, + HttpKeyValue* headers, + int tmo, + KjNode** responseTree, + OrionldProblemDetails* pdP +) +{ + int httpStatus = 500; + char* url = (char*) urlIn; + + // FIXME: Use: + // LmtDistOpRequest (verb, path, and body of a distributed request) + // LmtDistOpRequestHeaders (HTTP headers of distributed requests) + // LmtDistOpRequestParams (URL parameters of distributed requests) + // LmtDistOpResponse (body and status code of the response to a distributed request) + // LmtDistOpResponseHeaders (HTTP headers of responses to distributed requests) + // + + *responseTree = NULL; + if (uriParams != NULL) + { + int len = 0; + + for (int ix = 0; uriParams[ix].key != NULL; ix++) + { + LM_T(LmtDistOpRequestParams, ("URI param: %s=%s", uriParams[ix].key, uriParams[ix].value)); + len += strlen(uriParams[ix].key) + 2 + strlen(uriParams[ix].value); // 2: =& + } + + len += 1 + strlen(urlIn) + 1; // 1:? 1:\0 + + url = kaAlloc(&orionldState.kalloc, len); + if (url == NULL) + { + pdP->title = (char*) "Out of memory"; + pdP->detail = (char*) "trying to allocate memory for full URL (including URL Params)"; + LM_RE(-1, ("%s: %s - total length: %d", pdP->title, pdP->detail, len)); + } + + snprintf(url, len, "%s?", urlIn); + + for (int ix = 0; uriParams[ix].key != NULL; ix++) + { + strcat(url, uriParams[ix].key); + strcat(url, "="); + strcat(url, uriParams[ix].value); + if (uriParams[ix+1].key != NULL) + strcat(url, "&"); + } + } + + struct curl_context cc; + struct curl_slist* curlHeaders = NULL; + CURLcode cCode; + + get_curl_context(ip, &cc); + if (cc.curl == NULL) + { + pdP->title = (char*) "Unable to obtain CURL context"; + pdP->detail = (char*) "FIXME: get the curl error string"; + pdP->status = 500; + + LM_RE(-1, ("Internal Error (Unable to obtain CURL context)")); + } + + // + // Is curl to be debugged? + // + if (debugCurl == true) + { + curl_easy_setopt(cc.curl, CURLOPT_VERBOSE, 1L); + curl_easy_setopt(cc.curl, CURLOPT_DEBUGFUNCTION, curlDebug); + } + + // + // Prepare the CURL handle + // + OrionldResponseBuffer rBuf; + + bzero(&rBuf, sizeof(rBuf)); + + curl_easy_setopt(cc.curl, CURLOPT_URL, url); // Set the URL Path + curl_easy_setopt(cc.curl, CURLOPT_CUSTOMREQUEST, verb); // Set the HTTP verb + curl_easy_setopt(cc.curl, CURLOPT_WRITEFUNCTION, writeCallback); // Callback function for writes + curl_easy_setopt(cc.curl, CURLOPT_WRITEDATA, &rBuf); // Custom data for response handling + curl_easy_setopt(cc.curl, CURLOPT_TIMEOUT_MS, tmo); // Timeout, in milliseconds + curl_easy_setopt(cc.curl, CURLOPT_FAILONERROR, true); // Fail On Error - to detect 404 etc. + curl_easy_setopt(cc.curl, CURLOPT_FOLLOWLOCATION, 1L); // Follow redirections + + if (body != NULL) + { + unsigned long bodySize = kjFastRenderSize(body) + 64; + char* serializedBody = kaAlloc(&orionldState.kalloc, bodySize); + + kjFastRender(body, serializedBody); + LM_T(LmtDistOpRequest, ("Body for HTTP request: '%s'", serializedBody)); + curl_easy_setopt(cc.curl, CURLOPT_POSTFIELDS, (u_int8_t*) serializedBody); + + int contentLen = strlen(serializedBody); + char ibuf[64]; + + snprintf(ibuf, sizeof(ibuf), "Content-Length:%d", contentLen); + curlHeaders = curl_slist_append(curlHeaders, ibuf); + LM_T(LmtDistOpRequestHeaders, ("Content-Length HTTP Header: %s", ibuf)); + } + + if (headers != NULL) + { + for (int ix = 0; headers[ix].key != NULL; ix++) + { + char buf[256]; + + LM_T(LmtDistOpRequestHeaders, ("HTTP Header: %s: %s", headers[ix].key, headers[ix].value)); + + snprintf(buf, sizeof(buf) - 1, "%s:%s", headers[ix].key, headers[ix].value); + curlHeaders = curl_slist_append(curlHeaders, buf); + } + + curl_easy_setopt(cc.curl, CURLOPT_HTTPHEADER, curlHeaders); + } + + LM_T(LmtDistOpRequest, ("Sending HTTP request: %s %s", verb, url)); + cCode = curl_easy_perform(cc.curl); + + // + // Cleanup + // + if (headers != NULL) + curl_slist_free_all(curlHeaders); + + release_curl_context(&cc); + + if (cCode != CURLE_OK) + { + pdP->detail = (char*) url; + pdP->title = (char*) "Internal CURL Error"; + + LM_RE(-1, ("Internal Error (curl_easy_perform returned error code %d)", cCode)); + } + else + httpStatus = 201; // FIXME: get it from the response instead!!! + + // The downloaded buffer is in rBuf.buf - let's parse it into a KjNode tree! + *responseTree = kjParse(orionldState.kjsonP, rBuf.buf); + + return httpStatus; +} diff --git a/src/lib/orionld/http/httpRequest.h b/src/lib/orionld/http/httpRequest.h new file mode 100644 index 0000000000..685aabbd9b --- /dev/null +++ b/src/lib/orionld/http/httpRequest.h @@ -0,0 +1,69 @@ +#ifndef SRC_LIB_ORIONLD_HTTP_HTTPREQUEST_H_ +#define SRC_LIB_ORIONLD_HTTP_HTTPREQUEST_H_ + +/* +* +* Copyright 2024 FIWARE Foundation e.V. +* +* This file is part of Orion-LD Context Broker. +* +* Orion-LD Context Broker is free software: you can redistribute it and/or +* modify it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* Orion-LD Context Broker is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero +* General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with Orion-LD Context Broker. If not, see http://www.gnu.org/licenses/. +* +* For those usages not covered by this license please contact with +* orionld at fiware dot org +* +* Author: Ken Zangelin +*/ +extern "C" +{ +#include "kjson/KjNode.h" // KjNode +} + +#include "orionld/types/OrionldProblemDetails.h" // OrionldProblemDetails +#include "orionld/types/HttpKeyValue.h" // HttpKeyValue + + + +// ----------------------------------------------------------------------------- +// +// httpRequest - +// +// RETURN VALUE +// -1 on fatal error (details saved in pd) +// +// PARAMETERS +// verb: +// ip: +// url: with or without URL parameters +// body: request payload body as KjNode tree +// uriParams: URI parameters, as an array of HttpKeyValue - NULL if "url" already contains the URL parameters +// headers: HTTP Headers, as an array of HttpKeyValue - both input and output +// tmo: Timeout of reading the response? in milliseconds +// response: response payload body, as a KjNode tree +// pdP: ProblemDetails reference +// +extern int httpRequest +( + const char* verb, + const char* ip, + const char* url, + KjNode* body, + HttpKeyValue* uriParams, + HttpKeyValue* headers, + int tmo, + KjNode** response, + OrionldProblemDetails* pdP +); + +#endif // SRC_LIB_ORIONLD_HTTP_HTTPREQUEST_H_ diff --git a/src/lib/orionld/http/httpRequestHeaderAdd.cpp b/src/lib/orionld/http/httpRequestHeaderAdd.cpp new file mode 100644 index 0000000000..bfc5a17c49 --- /dev/null +++ b/src/lib/orionld/http/httpRequestHeaderAdd.cpp @@ -0,0 +1,59 @@ +/* +* +* Copyright 2024 FIWARE Foundation e.V. +* +* This file is part of Orion-LD Context Broker. +* +* Orion-LD Context Broker is free software: you can redistribute it and/or +* modify it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* Orion-LD Context Broker is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero +* General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with Orion-LD Context Broker. If not, see http://www.gnu.org/licenses/. +* +* For those usages not covered by this license please contact with +* orionld at fiware dot org +* +* Author: Ken Zangelin +*/ +#include // NULL +#include // snprintf + +extern "C" +{ +#include "kalloc/kaAlloc.h" // kaAlloc +} + +#include "logMsg/logMsg.h" // LM_* + +#include "orionld/types/HttpKeyValue.h" // HttpKeyValue +#include "orionld/common/orionldState.h" // orionldState +#include "orionld/http/httpRequestHeaderAdd.h" // Own interface + + + +// ----------------------------------------------------------------------------- +// +// httpRequestHeaderAdd - +// +// NOTE: +// HTTP headers are in an array of HttpKeyValue. +// the headerP references an index in that array and the caller needs to increment the index +// +void httpRequestHeaderAdd(HttpKeyValue* headerP, const char* key, const char* value, int ivalue) +{ + char* val = (value == NULL)? kaAlloc(&orionldState.kalloc, 16) : (char*) value; + + if (value == NULL) + snprintf(val, 15, "%d", ivalue); + + LM_T(LmtSR, ("Adding HTTP header: %s=%s", key, val)); + headerP->key = (char*) key; + headerP->value = (char*) val; +} diff --git a/src/lib/orionld/http/httpRequestHeaderAdd.h b/src/lib/orionld/http/httpRequestHeaderAdd.h new file mode 100644 index 0000000000..e216b579f2 --- /dev/null +++ b/src/lib/orionld/http/httpRequestHeaderAdd.h @@ -0,0 +1,38 @@ +#ifndef SRC_LIB_ORIONLD_HTTP_HTTPREQUESTHEADERADD_H_ +#define SRC_LIB_ORIONLD_HTTP_HTTPREQUESTHEADERADD_H_ + +/* +* +* Copyright 2024 FIWARE Foundation e.V. +* +* This file is part of Orion-LD Context Broker. +* +* Orion-LD Context Broker is free software: you can redistribute it and/or +* modify it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* Orion-LD Context Broker is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero +* General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with Orion-LD Context Broker. If not, see http://www.gnu.org/licenses/. +* +* For those usages not covered by this license please contact with +* orionld at fiware dot org +* +* Author: Ken Zangelin +*/ +#include "orionld/types/HttpKeyValue.h" // HttpKeyValue + + + +// ----------------------------------------------------------------------------- +// +// httpRequestHeaderAdd - +// +extern void httpRequestHeaderAdd(HttpKeyValue* headerP, const char* key, const char* value, int ivalue); + +#endif // SRC_LIB_ORIONLD_HTTP_HTTPREQUESTHEADERADD_H_ diff --git a/src/lib/orionld/kjTree/kjTreeFromCachedSubscription.cpp b/src/lib/orionld/kjTree/kjTreeFromCachedSubscription.cpp index 0b343e52c5..50c91f9dac 100644 --- a/src/lib/orionld/kjTree/kjTreeFromCachedSubscription.cpp +++ b/src/lib/orionld/kjTree/kjTreeFromCachedSubscription.cpp @@ -37,6 +37,7 @@ extern "C" #include "cache/subCache.h" // CachedSubscription +#include "orionld/types/SubordinateSubscription.h" // SubordinateSubscription #include "orionld/common/orionldState.h" // orionldState #include "orionld/common/numberToDate.h" // numberToDate #include "orionld/common/eqForDot.h" // eqForDot @@ -503,6 +504,30 @@ KjNode* kjTreeFromCachedSubscription(CachedSubscription* cSubP, bool sysAttrs, b kjChildAdd(sP, nodeP); } + // + // Subordinate subscriptions + // + if ((distSubsEnabled == true) && (cSubP->subordinateP != NULL)) + { + SubordinateSubscription* subordinateP = cSubP->subordinateP; + KjNode* subSubArray = kjArray(orionldState.kjsonP, "subordinate"); + + while (subordinateP != NULL) + { + KjNode* subSub = kjObject(orionldState.kjsonP, NULL); + KjNode* remoteSubId = kjString(orionldState.kjsonP, "registrationId", subordinateP->registrationId); + KjNode* registrationId = kjString(orionldState.kjsonP, "subscriptionId", subordinateP->subscriptionId); + + kjChildAdd(subSub, remoteSubId); + kjChildAdd(subSub, registrationId); + kjChildAdd(subSubArray, subSub); + + subordinateP = subordinateP->next; + } + + kjChildAdd(sP, subSubArray); + } + // // origin // diff --git a/src/lib/orionld/regMatch/CMakeLists.txt b/src/lib/orionld/regMatch/CMakeLists.txt index 060c00856f..1527e984a1 100644 --- a/src/lib/orionld/regMatch/CMakeLists.txt +++ b/src/lib/orionld/regMatch/CMakeLists.txt @@ -37,6 +37,7 @@ SET (SOURCES regMatchAttributesForQuery.cpp regMatchEntityInfoForQuery.cpp regMatchForEntitiesQuery.cpp + regMatchSubscription.cpp ) # Include directories diff --git a/src/lib/orionld/regMatch/regMatchSubscription.cpp b/src/lib/orionld/regMatch/regMatchSubscription.cpp new file mode 100644 index 0000000000..46045f21dd --- /dev/null +++ b/src/lib/orionld/regMatch/regMatchSubscription.cpp @@ -0,0 +1,102 @@ +/* +* +* Copyright 2024 FIWARE Foundation e.V. +* +* This file is part of Orion-LD Context Broker. +* +* Orion-LD Context Broker is free software: you can redistribute it and/or +* modify it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* Orion-LD Context Broker is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero +* General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with Orion-LD Context Broker. If not, see http://www.gnu.org/licenses/. +* +* For those usages not covered by this license please contact with +* orionld at fiware dot org +* +* Author: Ken Zangelin +*/ +extern "C" +{ +#include "kjson/KjNode.h" // KjNode +#include "kjson/kjLookup.h" // kjLookup +} + +#include "logMsg/logMsg.h" // LM_* + +#include "cache/CachedSubscription.h" // CachedSubscription + +#include "orionld/common/orionldState.h" // orionldState +#include "orionld/types/RegCache.h" // RegCache +#include "orionld/types/RegCacheItem.h" // RegCacheItem +#include "orionld/regMatch/regMatchSubscription.h" // Own interface + + + +// ----------------------------------------------------------------------------- +// +// regMatchSubscription - +// +bool regMatchSubscription +( + RegCacheItem* rciP, + CachedSubscription* cSubP, + char** entityTypeP +) +{ + KjNode* regInfoP = kjLookup(rciP->regTree, "information"); + + if (regInfoP == NULL) + return false; + + for (unsigned long ix = 0; ix < cSubP->entityIdInfos.size(); ix++) + { + EntityInfo* eiP = cSubP->entityIdInfos[ix]; + + // For now, only match subs/regs with entity type only + LM_T(LmtSR, ("entityType : '%s', entityId: '%s'", eiP->entityType.c_str(), eiP->entityId.c_str())); + if ((eiP->entityType != "") && (eiP->entityId == ".*")) + { + const char* entityType = eiP->entityType.c_str(); + + // We have the entity type of the subscription, now match against the registration + for (RegCacheItem* rciP = orionldState.tenantP->regCache->regList; rciP != NULL; rciP = rciP->next) + { + KjNode* informationP = kjLookup(rciP->regTree, "information"); + if (informationP == NULL) + continue; + + for (KjNode* infoP = informationP->value.firstChildP; infoP != NULL; infoP = infoP->next) + { + KjNode* entitiesP = kjLookup(infoP, "entities"); + if (entitiesP == NULL) + continue; + + for (KjNode* entityInfoP = entitiesP->value.firstChildP; entityInfoP != NULL; entityInfoP = entityInfoP->next) + { + // Only supported if only an entity type is given + KjNode* typeP = entityInfoP->value.firstChildP; + + if (strcmp(typeP->name, "type") != 0) + continue; + + if (typeP->next != NULL) + continue; + + LM_T(LmtSR, ("Found a matching registration for entity type '%s': %s", entityType, rciP->regId)); + *entityTypeP = (char*) entityType; + return true; + } + } + } + } + } + + return false; +} diff --git a/src/lib/orionld/regMatch/regMatchSubscription.h b/src/lib/orionld/regMatch/regMatchSubscription.h new file mode 100644 index 0000000000..617a9876e5 --- /dev/null +++ b/src/lib/orionld/regMatch/regMatchSubscription.h @@ -0,0 +1,44 @@ +#ifndef SRC_LIB_ORIONLD_REGMATCH_REGMATCHSUBSCRIPTION_H_ +#define SRC_LIB_ORIONLD_REGMATCH_REGMATCHSUBSCRIPTION_H_ + +/* +* +* Copyright 2024 FIWARE Foundation e.V. +* +* This file is part of Orion-LD Context Broker. +* +* Orion-LD Context Broker is free software: you can redistribute it and/or +* modify it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* Orion-LD Context Broker is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero +* General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with Orion-LD Context Broker. If not, see http://www.gnu.org/licenses/. +* +* For those usages not covered by this license please contact with +* orionld at fiware dot org +* +* Author: Ken Zangelin +*/ +#include "cache/CachedSubscription.h" // CachedSubscription +#include "orionld/types/RegCacheItem.h" // RegCacheItem + + + +// ----------------------------------------------------------------------------- +// +// regMatchSubscription - +// +extern bool regMatchSubscription +( + RegCacheItem* regP, + CachedSubscription* cSubP, + char** entityTypeP +); + +#endif // SRC_LIB_ORIONLD_REGMATCH_REGMATCHSUBSCRIPTION_H_ diff --git a/src/lib/orionld/service/orionldServiceInit.cpp b/src/lib/orionld/service/orionldServiceInit.cpp index 8bf609c622..6c449e5cb9 100644 --- a/src/lib/orionld/service/orionldServiceInit.cpp +++ b/src/lib/orionld/service/orionldServiceInit.cpp @@ -80,6 +80,7 @@ #include "orionld/serviceRoutines/orionldPostContexts.h" // orionldPostContexts #include "orionld/serviceRoutines/orionldDeleteContext.h" // orionldDeleteContext #include "orionld/serviceRoutines/orionldPostNotify.h" // orionldPostNotify +#include "orionld/serviceRoutines/orionldPostNotification.h" // orionldPostNotification #include "orionld/serviceRoutines/orionldGetTemporalEntities.h" // orionldGetTemporalEntities #include "orionld/serviceRoutines/orionldGetTemporalEntity.h" // orionldGetTemporalEntity @@ -214,6 +215,10 @@ static void restServicePrepare(OrionLdRestService* serviceP, OrionLdRestServiceS { serviceP->uriParams |= ORIONLD_URIPARAM_SUBSCRIPTION_ID; } + else if (serviceP->serviceRoutine == orionldPostNotification) + { + serviceP->uriParams |= ORIONLD_URIPARAM_SUBSCRIPTION_ID; + } else if (serviceP->serviceRoutine == orionldGetEntities) { serviceP->uriParams |= ORIONLD_URIPARAM_OPTIONS; diff --git a/src/lib/orionld/serviceRoutines/CMakeLists.txt b/src/lib/orionld/serviceRoutines/CMakeLists.txt index 02f0319df1..6929239bf3 100644 --- a/src/lib/orionld/serviceRoutines/CMakeLists.txt +++ b/src/lib/orionld/serviceRoutines/CMakeLists.txt @@ -66,6 +66,7 @@ SET (SOURCES orionldPostContexts.cpp orionldDeleteContext.cpp orionldPostNotify.cpp + orionldPostNotification.cpp orionldGetPing.cpp orionldPatchEntity2.cpp orionldOptions.cpp diff --git a/src/lib/orionld/serviceRoutines/orionldPostNotification.cpp b/src/lib/orionld/serviceRoutines/orionldPostNotification.cpp new file mode 100644 index 0000000000..13631bf8f0 --- /dev/null +++ b/src/lib/orionld/serviceRoutines/orionldPostNotification.cpp @@ -0,0 +1,107 @@ +/* +* + Copyright 2024 FIWARE Foundation e.V. +* +* This file is part of Orion-LD Context Broker. +* +* Orion-LD Context Broker is free software: you can redistribute it and/or +* modify it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* Orion-LD Context Broker is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero +* General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with Orion-LD Context Broker. If not, see http://www.gnu.org/licenses/. +* +* For those usages not covered by this license please contact with +* orionld at fiware dot org +* +* Author: Ken Zangelin +*/ +extern "C" +{ +#include "kjson/KjNode.h" // KjNode +#include "kjson/kjLookup.h" // kjLookup +} + +#include "logMsg/logMsg.h" // LM_* + +#include "cache/CachedSubscription.h" // CachedSubscription +#include "cache/subCache.h" // subCacheItemLookup + +#include "orionld/types/HttpKeyValue.h" // HttpKeyValue +#include "orionld/types/OrionLdRestService.h" // OrionLdRestService +#include "orionld/common/orionldState.h" // orionldState +#include "orionld/common/orionldError.h" // orionldError +#include "orionld/http/httpRequest.h" // httpRequest +#include "orionld/http/httpRequestHeaderAdd.h" // httpRequestHeaderAdd +#include "orionld/serviceRoutines/orionldPostNotification.h" // Own interface + + + +// ---------------------------------------------------------------------------- +// +// orionldPostNotification - +// +bool orionldPostNotification(void) +{ + char* parentSubId = orionldState.wildcard[0]; + + if (distSubsEnabled == false) + { + LM_W(("Got a notification on remote subscription subordinate to '%s', but, distributed subscriptions are not enabled", parentSubId)); + + orionldError(OrionldOperationNotSupported, "Distributed Subscriptions Are Not Enabled", orionldState.serviceP->url, 501); + orionldState.noLinkHeader = true; // We don't want the Link header for non-implemented requests + + return true; + } + + LM_T(LmtSR, ("Got a notification on remote subscription subordinate to '%s'", parentSubId)); + + kjTreeLog(orionldState.requestTree, "notification", LmtSR); + + CachedSubscription* cSubP = subCacheItemLookup(orionldState.tenantP->tenant, parentSubId); + if (cSubP == NULL) + { + LM_W(("Got a notification from a remote subscription '%s' on IP:PORT, but, its local parent subscription was not found", parentSubId)); + return false; + } + + // Modify the payload body to fit the "new" notification triggered + KjNode* subIdNodeP = kjLookup(orionldState.requestTree, "subscriptionId"); + if (subIdNodeP != NULL) + subIdNodeP->value.s = parentSubId; + + // Send the notification + OrionldProblemDetails pd; + char url[256]; + KjNode* responseTree = NULL; + int httpStatus; + HttpKeyValue uriParams[2]; + HttpKeyValue headers[3]; + + bzero(&uriParams, sizeof(uriParams)); + bzero(&headers, sizeof(headers)); + + uriParams[0].key = (char*) "subscriptionId"; + uriParams[0].value = (char*) parentSubId; + + snprintf(url, sizeof(url), "%s://%s:%d/%s", cSubP->protocolString, cSubP->ip, cSubP->port, cSubP->rest); + LM_T(LmtSR, ("ip: '%s'", cSubP->ip)); + LM_T(LmtSR, ("url: '%s'", url)); + + httpRequestHeaderAdd(&headers[0], "Content-Type", "application/json", 0); + httpStatus = httpRequest(cSubP->ip, "POST", url, orionldState.requestTree, uriParams, headers, 5000, &responseTree, &pd); + if (httpStatus != 200) + { + LM_W(("httpRequest for a forwarded notification gave HTTP status %d", httpStatus)); + kjTreeLog(responseTree, "forwarded notification response body", LmtSR); + } + + return true; +} diff --git a/src/lib/orionld/serviceRoutines/orionldPostNotification.h b/src/lib/orionld/serviceRoutines/orionldPostNotification.h new file mode 100644 index 0000000000..fa0db153bb --- /dev/null +++ b/src/lib/orionld/serviceRoutines/orionldPostNotification.h @@ -0,0 +1,37 @@ +#ifndef SRC_LIB_ORIONLD_SERVICEROUTINES_ORIONLDPOSTNOTIFICATION_H_ +#define SRC_LIB_ORIONLD_SERVICEROUTINES_ORIONLDPOSTNOTIFICATION_H_ + +/* +* +* Copyright 2024 FIWARE Foundation e.V. +* +* This file is part of Orion-LD Context Broker. +* +* Orion-LD Context Broker is free software: you can redistribute it and/or +* modify it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* Orion-LD Context Broker is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero +* General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with Orion-LD Context Broker. If not, see http://www.gnu.org/licenses/. +* +* For those usages not covered by this license please contact with +* orionld at fiware dot org +* +* Author: Ken Zangelin +*/ + + + +// ---------------------------------------------------------------------------- +// +// orionldPostNotification - +// +extern bool orionldPostNotification(void); + +#endif // SRC_LIB_ORIONLD_SERVICEROUTINES_ORIONLDPOSTNOTIFICATION_H_ diff --git a/src/lib/orionld/serviceRoutines/orionldPostSubscriptions.cpp b/src/lib/orionld/serviceRoutines/orionldPostSubscriptions.cpp index 94ec76e343..5d2664aaab 100644 --- a/src/lib/orionld/serviceRoutines/orionldPostSubscriptions.cpp +++ b/src/lib/orionld/serviceRoutines/orionldPostSubscriptions.cpp @@ -39,11 +39,15 @@ extern "C" #include "orionld/types/QNode.h" // QNode #include "orionld/types/PernotSubscription.h" // PernotSubscription #include "orionld/types/PernotSubCache.h" // PernotSubCache +#include "orionld/types/RegCache.h" // RegCache +#include "orionld/types/RegCacheItem.h" // RegCacheItem +#include "orionld/types/SubordinateSubscription.h" // SubordinateSubscription #include "orionld/common/orionldState.h" // orionldState, coreContextUrl #include "orionld/common/orionldError.h" // orionldError #include "orionld/common/uuidGenerate.h" // uuidGenerate #include "orionld/common/subCacheApiSubscriptionInsert.h" // subCacheApiSubscriptionInsert #include "orionld/http/httpHeaderLocationAdd.h" // httpHeaderLocationAdd +#include "orionld/http/httpRequestHeaderAdd.h" // httpRequestHeaderAdd #include "orionld/legacyDriver/legacyPostSubscriptions.h" // legacyPostSubscriptions #include "orionld/kjTree/kjChildPrepend.h" // kjChildPrepend #include "orionld/dbModel/dbModelFromApiSubscription.h" // dbModelFromApiSubscription @@ -60,10 +64,124 @@ extern "C" #include "orionld/q/qAliasCompact.h" // qAliasCompact #include "orionld/q/qPresent.h" // qPresent #include "orionld/payloadCheck/pCheckSubscription.h" // pCheckSubscription +#include "orionld/http/httpRequest.h" // httpRequest +#include "orionld/common/tenantList.h" // tenant0 +#include "orionld/regMatch/regMatchSubscription.h" // regMatchSubscription #include "orionld/serviceRoutines/orionldPostSubscriptions.h" // Own Interface +// ----------------------------------------------------------------------------- +// +// subordinateCreate - create a subordinate subscription on another endpoint +// +// { +// "id": "cSubP->subId:000x", +// "type": "Subscription", +// "entities": [ +// { +// "type": "xxx" +// } +// ], +// "notification": { +// "endpoint": { +// "uri":
+// } +// } +// } +// +SubordinateSubscription* subordinateCreate(CachedSubscription* cSubP, RegCacheItem* rciP, const char* entityType) +{ + LM_T(LmtSR, ("Creating a subscription subordinate to '%s' on '%s'", cSubP->subscriptionId, rciP->regId)); + + int runNo = 1; + + for (SubordinateSubscription* subordinateP = cSubP->subordinateP; subordinateP != NULL; subordinateP = subordinateP->next) + { + runNo = MAX(runNo, subordinateP->runNo); + } + + char subSubId[128]; + snprintf(subSubId, sizeof(subSubId), "%s:%d", cSubP->subscriptionId, runNo); + + char notificationUrl[256]; + snprintf(notificationUrl, sizeof(notificationUrl), "http://%s/ngsi-ld/ex/v1/notifications/%s", localIpAndPort, cSubP->subscriptionId); + + KjNode* bodyP = kjObject(orionldState.kjsonP, NULL); + KjNode* idP = kjString(orionldState.kjsonP, "id", subSubId); + KjNode* typeP = kjString(orionldState.kjsonP, "type", "Subscription"); + KjNode* entitiesP = kjArray(orionldState.kjsonP, "entities"); + KjNode* entityP = kjObject(orionldState.kjsonP, NULL); + KjNode* entityTypeP = kjString(orionldState.kjsonP, "type", entityType); + KjNode* notificationP = kjObject(orionldState.kjsonP, "notification"); + KjNode* endpointP = kjObject(orionldState.kjsonP, "endpoint"); + KjNode* urlP = kjString(orionldState.kjsonP, "uri", notificationUrl); + + kjChildAdd(bodyP, idP); + kjChildAdd(bodyP, typeP); + kjChildAdd(bodyP, entitiesP); + kjChildAdd(bodyP, notificationP); + + kjChildAdd(entitiesP, entityP); + kjChildAdd(entityP, entityTypeP); + + kjChildAdd(notificationP, endpointP); + kjChildAdd(endpointP, urlP); + + kjTreeLog(bodyP, "Subordinate subscription", LmtSR); + + HttpKeyValue headers[3]; + int headerIx = 0; + + bzero(&headers, sizeof(headers)); + + if (orionldState.tenantP != &tenant0) + httpRequestHeaderAdd(&headers[headerIx++], "NGSILD-Tenant", orionldState.tenantP->tenant, 0); + + KjNode* responseBody = NULL; + int tmo = 5000; + OrionldProblemDetails pd; + char rciIp[128]; + char rciUrl[512]; + char* colon = strchr(rciP->ipAndPort, ':'); + + if (colon != NULL) + *colon = 0; + strncpy(rciIp, rciP->ipAndPort, sizeof(rciIp) - 1); + if (colon != NULL) + *colon = ':'; + + LM_T(LmtSR, ("IP of registration: '%s'", rciIp)); + snprintf(rciUrl, sizeof(rciUrl) - 1, "http://%s/ngsi-ld/v1/subscriptions", rciP->ipAndPort); + + httpRequestHeaderAdd(&headers[headerIx++], "Content-Type", "application/json", 0); + int httpStatus = httpRequest(rciIp, "POST", rciUrl, bodyP, NULL, headers, tmo, &responseBody, &pd); + if (httpStatus != 201) + { + LM_W(("Attempt to create subordinate subscription failed with a %d", httpStatus)); + return NULL; + } + + SubordinateSubscription* subordinateP = (SubordinateSubscription*) calloc(1, sizeof(SubordinateSubscription)); + if (subordinateP == NULL) + LM_X(1, ("Out of memory allocating a subordinate subscription")); + + subordinateP->subscriptionId = strdup(subSubId); + if (subordinateP->subscriptionId == NULL) + LM_X(1, ("Out of memory allocating the id of a subordinate subscription")); + + subordinateP->registrationId = rciP->regId; + subordinateP->runNo = runNo; + subordinateP->next = cSubP->subordinateP; + + cSubP->subordinateP = subordinateP; + LM_T(LmtSR, ("***************** Added subordinate subs to '%s' at %p", cSubP->subscriptionId, subordinateP)); + + return subordinateP; +} + + + // ---------------------------------------------------------------------------- // // orionldPostSubscriptions - @@ -301,6 +419,47 @@ bool orionldPostSubscriptions(void) // LM_T(LmtPernotLoop, ("pernotSubCache.newSubs == %d", pernotSubCache.newSubs)); } + + // + // Any subordinate subscriptions needed? + // + if ((distSubsEnabled == true) && (orionldState.uriParams.local == false)) + { + // + // Find matching regs + // Create a subordinate subscription in brokers behind matching regs, if "subCreate" is in "operations" + // + for (RegCacheItem* rciP = orionldState.tenantP->regCache->regList; rciP != NULL; rciP = rciP->next) + { + char* entityTypeP; + + if (regMatchSubscription(rciP, cSubP, &entityTypeP) == true) + { + SubordinateSubscription* subSubP = subordinateCreate(cSubP, rciP, entityTypeP); + + // Add the subordinate to subP + KjNode* subordinateP = kjLookup(subP, "subordinate"); + + if (subordinateP == NULL) + { + subordinateP = kjArray(orionldState.kjsonP, "subordinate"); + kjChildAdd(subP, subordinateP); + } + + KjNode* subSubNodeP = kjObject(orionldState.kjsonP, NULL); // No name - part of array + KjNode* subIdP = kjString(orionldState.kjsonP, "subscriptionId", subSubP->subscriptionId); + KjNode* regIdP = kjString(orionldState.kjsonP, "registrationId", subSubP->registrationId); + KjNode* runNoP = kjInteger(orionldState.kjsonP, "runNo", subSubP->runNo); + + kjChildAdd(subSubNodeP, subIdP); + kjChildAdd(subSubNodeP, regIdP); + kjChildAdd(subSubNodeP, runNoP); + + kjChildAdd(subordinateP, subSubNodeP); + } + } + } + // dbModel KjNode* dbSubscriptionP = subP; subIdP->name = (char*) "_id"; // 'id' needs to be '_id' - mongo stuff ... diff --git a/src/lib/orionld/types/HttpKeyValue.h b/src/lib/orionld/types/HttpKeyValue.h new file mode 100644 index 0000000000..2833088fdb --- /dev/null +++ b/src/lib/orionld/types/HttpKeyValue.h @@ -0,0 +1,41 @@ +#ifndef SRC_LIB_ORIONLD_TYPES_HTTPKEYVALUE_H_ +#define SRC_LIB_ORIONLD_TYPES_HTTPKEYVALUE_H_ + +/* +* +* Copyright 2024 FIWARE Foundation e.V. +* +* This file is part of Orion-LD Context Broker. +* +* Orion-LD Context Broker is free software: you can redistribute it and/or +* modify it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* Orion-LD Context Broker is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero +* General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with Orion-LD Context Broker. If not, see http://www.gnu.org/licenses/. +* +* For those usages not covered by this license please contact with +* orionld at fiware dot org +* +* Author: Ken Zangelin +*/ + + + +// ----------------------------------------------------------------------------- +// +// HttpKeyValue - +// +typedef struct HttpKeyValue +{ + char* key; + char* value; +} HttpKeyValue; + +#endif // SRC_LIB_ORIONLD_TYPES_HTTPKEYVALUE_H_ diff --git a/src/lib/orionld/types/SubCache.h b/src/lib/orionld/types/SubCache.h new file mode 100644 index 0000000000..97427baaec --- /dev/null +++ b/src/lib/orionld/types/SubCache.h @@ -0,0 +1,44 @@ +#ifndef SRC_LIB_ORIONLD_TYPES_SUBCACHE_H_ +#define SRC_LIB_ORIONLD_TYPES_SUBCACHE_H_ + +/* +* +* Copyright 2023 FIWARE Foundation e.V. +* +* This file is part of Orion-LD Context Broker. +* +* Orion-LD Context Broker is free software: you can redistribute it and/or +* modify it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* Orion-LD Context Broker is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero +* General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with Orion-LD Context Broker. If not, see http://www.gnu.org/licenses/. +* +* For those usages not covered by this license please contact with +* orionld at fiware dot org +* +* Author: Ken Zangelin +*/ +#include "orionld/types/OrionldTenant.h" // OrionldTenant +#include "orionld/types/SubCacheItem.h" // SubCacheItem + + + +// ----------------------------------------------------------------------------- +// +// SubCache - +// +typedef struct SubCache +{ + OrionldTenant* tenantP; + SubCacheItem* subList; + SubCacheItem* last; +} SubCache; + +#endif // SRC_LIB_ORIONLD_TYPES_SUBCACHE_H_ diff --git a/src/lib/orionld/types/SubCacheItem.h b/src/lib/orionld/types/SubCacheItem.h new file mode 100644 index 0000000000..795e4d1eaf --- /dev/null +++ b/src/lib/orionld/types/SubCacheItem.h @@ -0,0 +1,72 @@ +#ifndef SRC_LIB_ORIONLD_TYPES_SUBCACHEITEM_H_ +#define SRC_LIB_ORIONLD_TYPES_SUBCACHEITEM_H_ + +/* +* +* Copyright 2024 FIWARE Foundation e.V. +* +* This file is part of Orion-LD Context Broker. +* +* Orion-LD Context Broker is free software: you can redistribute it and/or +* modify it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* Orion-LD Context Broker is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero +* General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with Orion-LD Context Broker. If not, see http://www.gnu.org/licenses/. +* +* For those usages not covered by this license please contact with +* orionld at fiware dot org +* +* Author: Ken Zangelin +*/ +#include // types: uint32_t, ... + +extern "C" +{ +#include "kjson/KjNode.h" // KjNode +} + +#include "orionld/types/OrionldContext.h" // OrionldContext + + + +// ----------------------------------------------------------------------------- +// +// SubDeltas - +// +typedef struct SubDeltas +{ + uint32_t timesSent; + uint32_t timesFailed; + double lastSuccess; + double lastFailure; +} SubDeltas; + + + +// ----------------------------------------------------------------------------- +// +// SubCacheItem - +// +typedef struct SubCacheItem +{ + char* subId; // Set when creating subscription - points inside subTree + KjNode* subTree; + SubDeltas deltas; + bool dirty; // The subscription has been patched - not only counters differ from copy in DB + OrionldContext* contextP; // Set when creating/patching registration + char* hostAlias; // Broker identity - for the Via header + + // "Shortcuts" and transformations + double modifiedAt; // Copied from inside the subTree + + struct SubCacheItem* next; +} SubCacheItem; + +#endif // SRC_LIB_ORIONLD_TYPES_SUBCACHEITEM_H_ diff --git a/src/lib/orionld/types/SubordinateSubscription.h b/src/lib/orionld/types/SubordinateSubscription.h new file mode 100644 index 0000000000..7c2a3ad22c --- /dev/null +++ b/src/lib/orionld/types/SubordinateSubscription.h @@ -0,0 +1,48 @@ +#ifndef SRC_LIB_ORIONLD_TYPES_SUBORDINATESUBSCRIPTION_H_ +#define SRC_LIB_ORIONLD_TYPES_SUBORDINATESUBSCRIPTION_H_ + +/* +* +* Copyright 2024 FIWARE Foundation e.V. +* +* This file is part of Orion-LD Context Broker. +* +* Orion-LD Context Broker is free software: you can redistribute it and/or +* modify it under the terms of the GNU Affero General Public License as +* published by the Free Software Foundation, either version 3 of the +* License, or (at your option) any later version. +* +* Orion-LD Context Broker is distributed in the hope that it will be useful, +* but WITHOUT ANY WARRANTY; without even the implied warranty of +* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero +* General Public License for more details. +* +* You should have received a copy of the GNU Affero General Public License +* along with Orion-LD Context Broker. If not, see http://www.gnu.org/licenses/. +* +* For those usages not covered by this license please contact with +* orionld at fiware dot org +* +* Author: Ken Zangelin +*/ +#include // types: uint64_t, ... + +extern "C" +{ +#include "kjson/KjNode.h" // KjNode +} + + +// ----------------------------------------------------------------------------- +// +// SubordinateSubscription - +// +typedef struct SubordinateSubscription +{ + char* subscriptionId; // The id of the external subscription + int runNo; + char* registrationId; // The id of the registration that matched, made the external subscription being created + struct SubordinateSubscription* next; +} SubordinateSubscription; + +#endif // SRC_LIB_ORIONLD_TYPES_SUBORDINATESUBSCRIPTION_H_ diff --git a/test/functionalTest/cases/0000_ngsild/ngsild_new_distributed_subscription_creation.test b/test/functionalTest/cases/0000_ngsild/ngsild_new_distributed_subscription_creation.test new file mode 100644 index 0000000000..8132dfa63d --- /dev/null +++ b/test/functionalTest/cases/0000_ngsild/ngsild_new_distributed_subscription_creation.test @@ -0,0 +1,351 @@ +# Copyright 2024 FIWARE Foundation e.V. +# +# This file is part of Orion-LD Context Broker. +# +# Orion-LD Context Broker is free software: you can redistribute it and/or +# modify it under the terms of the GNU Affero General Public License as +# published by the Free Software Foundation, either version 3 of the +# License, or (at your option) any later version. +# +# Orion-LD Context Broker is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Affero +# General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with Orion-LD Context Broker. If not, see http://www.gnu.org/licenses/. +# +# For those usages not covered by this license please contact with +# orionld at fiware dot org + +# VALGRIND_READY - to mark the test ready for valgrindTestSuite.sh + +--NAME-- +Distributed Subscriptions - creation and deletion + +--SHELL-INIT-- +dbInit CB +dbInit CP1 +orionldStart CB -experimental -wip distSubs +orionldStart CP1 -experimental +accumulatorStart --pretty-print 127.0.0.1 ${LISTENER_PORT} + +--SHELL-- + +# +# 01. Create a registration R1 on CB, on entity type T, with endpoint CP1 +# 02. Create a subscription S1 on CB, on entity type T - forwarded to CP1 +# 03. Query CP1 for its subscriptions - see the CB::S1 subordinate subscription +# 04. Create an entity matching the subscription in CP1 - get notification in CB and then acc +# 05. Dump the accumulator to see the notification +# 06. GET the subscription, see the subordinate subscription +# 07. See the subscription in mongo +# 08. GET the subscription from DB, see the subordinate subscription +# + +echo "01. Create a registration R1 on CB, on entity type T, with endpoint CP1" +echo "=======================================================================" +payload='{ + "id": "urn:R1", + "type": "ContextSourceRegistration", + "information": [ + { + "entities": [ + { + "type": "T" + } + ] + } + ], + "endpoint": "localhost:'$CP1_PORT'", + "operations": [ "createEntity", "retrieveEntity", "mergeEntity", "createSubscription" ] +}' +orionCurl --url /ngsi-ld/v1/csourceRegistrations --payload "$payload" +echo +echo + + +echo "02. Create a subscription S1 on CB, on entity type T - forwarded to CP1" +echo "=======================================================================" +payload='{ + "id": "urn:ngsi-ld:Subscription:S1", + "type": "Subscription", + "entities": [ + { + "type": "T" + } + ], + "notification": { + "endpoint": { + "uri": "http://localhost:'$LISTENER_PORT'/notify" + } + } +}' +orionCurl --url /ngsi-ld/v1/subscriptions --payload "$payload" +echo +echo + + +echo "03. Query CP1 for its subscriptions - see the CB::S1 subordinate subscription" +echo "=============================================================================" +orionCurl --url /ngsi-ld/v1/subscriptions --port $CP1_PORT +echo +echo + + +echo "04. Create an entity matching the subscription in CP1 - get notification in CB and then acc" +echo "===========================================================================================" +payload='{ + "id": "urn:E1", + "type": "T", + "P1": 1 +}' +orionCurl --url /ngsi-ld/v1/entities --payload "$payload" --port $CP1_PORT +echo +echo + + +echo "05. Dump the accumulator to see the notification" +echo "================================================" +accumulatorDump +echo +echo + + +echo "06. GET the subscription, see the subordinate subscription" +echo "==========================================================" +orionCurl --url /ngsi-ld/v1/subscriptions/urn:ngsi-ld:Subscription:S1 +echo +echo + + +echo "07. See the subscription in mongo" +echo "=================================" +mongoCmd2 ftest "db.csubs.findOne()" +echo +echo + + +echo "08. GET the subscription from DB, see the subordinate subscription" +echo "==================================================================" +orionCurl --url /ngsi-ld/v1/subscriptions/urn:ngsi-ld:Subscription:S1?options=fromDb +echo +echo + + + +--REGEXPECT-- +01. Create a registration R1 on CB, on entity type T, with endpoint CP1 +======================================================================= +HTTP/1.1 201 Created +Content-Length: 0 +Date: REGEX(.*) +Location: /ngsi-ld/v1/csourceRegistrations/urn:R1 + + + +02. Create a subscription S1 on CB, on entity type T - forwarded to CP1 +======================================================================= +HTTP/1.1 201 Created +Content-Length: 0 +Date: REGEX(.*) +Location: /ngsi-ld/v1/subscriptions/urn:ngsi-ld:Subscription:S1 + + + +03. Query CP1 for its subscriptions - see the CB::S1 subordinate subscription +============================================================================= +HTTP/1.1 200 OK +Content-Length: REGEX(.*) +Content-Type: application/json +Date: REGEX(.*) +Link: ; rel="h "type": "T" } ], - "expiresAt": "2024-06-01T10:00:00.123Z", + "expiresAt": "2034-06-01T10:00:00.123Z", "geoQ": { "coordinates": "5,5", "geometry": "Point", @@ -438,7 +438,7 @@ MongoDB server version: REGEX(.*) "ldContext" : "https://uri.etsi.org/ngsi-ld/v1/ngsi-ld-core-context-v1.6.jsonld", "ldQ" : "https://uri=etsi=org/ngsi-ld/default-context/a.value>12", "name" : "New name", - "expiration" : 1717236000.123 + "expiration" : 2032768800.123 } bye @@ -464,7 +464,7 @@ Link: ; rel="h "type": "T" } ], - "expiresAt": "2024-06-01T10:00:00.123Z", + "expiresAt": "2034-06-01T10:00:00.123Z", "geoQ": { "coordinates": [ 5,