Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor, to make dds notifications have TRoE and Notifications #1710

Merged
merged 1 commit into from
Nov 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion src/lib/orionld/dds/ddsConfigTopicToAttribute.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,11 +58,17 @@ char* ddsConfigTopicToAttribute(const char* topic, char** entityIdPP, char** ent

const char* path[4] = { "dds", "ngsild", "topics", NULL };
static KjNode* topicsP = kjNavigate(ddsConfigTree, path, NULL, NULL);
KjNode* topicP = kjLookup(topicsP, topic);

if (topicsP == NULL)
KT_RE(NULL, "the field dds/ngsild/topic not found in DDS config file (looking for topic '%s')", topic);

KjNode* topicP = kjLookup(topicsP, topic);

if (topicP == NULL)
KT_RE(NULL, "topic '%s' not found in DDS config file", topic);

KT_T(StDdsConfig, "Found topic '%s' in config file", topic);

KjNode* attributeP = kjLookup(topicP, "attribute");

if (attributeP == NULL)
Expand Down
15 changes: 12 additions & 3 deletions src/lib/orionld/dds/ddsInit.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ extern "C"
#include "kjson/KjNode.h" // KjNode
}

#include "logMsg/logMsg.h" // lmOut

#include "orionld/common/traceLevels.h" // kjTreeLog2
#include "orionld/common/orionldState.h" // configFile
#include "orionld/kjTree/kjNavigate.h" // kjNavigate
Expand Down Expand Up @@ -71,13 +73,20 @@ void ddsTypeNotification(const char* typeName, const char* topicName, const char
//
void ddsLog(const char* fileName, int lineNo, const char* funcName, int category, const char* msg)
{
int level = 0;
char severity = ddsCategoryToKlogSeverity(category, &level);

char* filename = (fileName != NULL)? (char*) fileName : (char*) "no-filename";
char* funcname = (funcName != NULL)? (char*) funcName : (char*) "no-funcname";

#if 1
int level = 0;
char severity = ddsCategoryToKlogSeverity(category, &level);

ktOut(filename, lineNo, funcname, severity, level, msg);
#else
int level = 'W';
char lmType = ddsCategoryToKlogSeverity(category, &level); // Think it will work also for LM

lmOut((char*) msg, lmType, filename, lineNo, funcname, level);
#endif
}


Expand Down
40 changes: 32 additions & 8 deletions src/lib/orionld/dds/ddsNotification.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,15 +34,37 @@ extern "C"
#include "orionld/common/orionldState.h" // orionldState, kjTreeLog
#include "orionld/common/traceLevels.h" // KT_T trace levels
#include "orionld/common/tenantList.h" // tenant0
#include "orionld/types/OrionLdRestService.h" // OrionLdRestService, OrionLdRestServiceVector, OrionldServiceRoutine
#include "orionld/service/orionldServiceInit.h" // orionldRestServiceV
#include "orionld/serviceRoutines/orionldPutAttribute.h" // orionldPutAttribute
#include "orionld/mongoc/mongocConnectionRelease.h" // mongocConnectionRelease
#include "orionld/context/orionldContextItemExpand.h" // orionldContextItemExpand
#include "orionld/notifications/orionldAlterationsTreat.h" // orionldAlterationsTreat
#include "orionld/serviceRoutines/orionldPutAttribute.h" // orionldPutAttribute
#include "orionld/dds/kjTreeLog.h" // kjTreeLog2
#include "orionld/dds/ddsConfigTopicToAttribute.h" // ddsConfigTopicToAttribute
#include "orionld/dds/ddsNotification.h" // Own interface



// -----------------------------------------------------------------------------
//
// serviceLookupByRoutine -
//
static OrionLdRestService* serviceLookupByRoutine(OrionldServiceRoutine serviceRoutine, Verb verb)
{
OrionLdRestServiceVector* serviceVectorP = &orionldRestServiceV[verb];

for (int ix = 0; ix < serviceVectorP->services; ix++)
{
if (serviceVectorP->serviceV[ix].serviceRoutine == serviceRoutine)
return &serviceVectorP->serviceV[ix];
}

return NULL;
}



// -----------------------------------------------------------------------------
//
// ddsNotification -
Expand Down Expand Up @@ -123,19 +145,21 @@ void ddsNotification(const char* typeName, const char* topicName, const char* js
orionldState.in.pathAttrExpanded = (char*) topicName;
orionldState.ddsSample = true;
orionldState.ddsPublishTime = publishTime;
orionldState.apiVersion = API_VERSION_NGSILD_V1;

kjChildAdd(attrNodeP, participantIdNodeP);

KjNode* publishedAt = kjInteger(orionldState.kjsonP, "publishedAt", publishTime);
kjChildAdd(attrNodeP, publishedAt);

//
// If the entity does not exist, it needs to be created
// Except of course, if it is registered and exists elsewhere
//
orionldState.serviceP = serviceLookupByRoutine(orionldPutAttribute, HTTP_PUT);

orionldPutAttribute();

// Do what's needed from the function requestCompleted
if (orionldState.alterations != NULL)
orionldAlterationsTreat(orionldState.alterations);
//
// Cleanup
//
void* con_cls;
extern void requestCompleted(void* cls, MHD_Connection* connection, void** con_cls, MHD_RequestTerminationCode toe);
requestCompleted(NULL, NULL, &con_cls, MHD_REQUEST_TERMINATED_COMPLETED_OK);
}
48 changes: 0 additions & 48 deletions src/lib/orionld/mhd/mhdConnectionTreat.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1369,54 +1369,6 @@ MHD_Result mhdConnectionTreat(void)
//
mhdReply(orionldState.responseTree); // orionldState.responsePayload freed and NULLed by mhdReply()


//
// FIXME: Delay until requestCompleted. The call to orionldStateRelease as well
//
// Call TRoE Routine (if there is one) to save the TRoE data.
// Only if the Service Routine was successful, of course
// AND if there is any request tree to process
//
if ((orionldState.httpStatusCode >= 200) && (orionldState.httpStatusCode <= 300) && (orionldState.noDbUpdate == false))
{
if ((orionldState.serviceP != NULL) && (orionldState.serviceP->troeRoutine != NULL))
{
//
// Also, if something went wrong during processing, the SR can flag this by setting the requestTree to NULL
//
if (orionldState.troeError == true)
LM_E(("Internal Error (something went wrong during TRoE processing)"));
else
{
//
// Special case - Entity creation with no attribute
// As both the entity id and the entity type have been removed from the payload body, the payload body is now empty.
// We still have to record the creation of the entity in the TRoE database!
//
// If the incoming request an empty array/object, then don't call the TRoE routine
// - EXCEPT if it's a POST /entities request (service routine is orionldPostEntities)
//
bool invokeTroe = false;

if (orionldState.verb == HTTP_DELETE) invokeTroe = true;
if (orionldState.serviceP->serviceRoutine == orionldPostEntities) invokeTroe = true;
if ((orionldState.requestTree != NULL) && (orionldState.requestTree->value.firstChildP != NULL)) invokeTroe = true;

if (invokeTroe == true)
{
PERFORMANCE(troeStart);
orionldState.serviceP->troeRoutine();
PERFORMANCE(troeEnd);
}
}
}
}

//
// Cleanup
//
orionldStateRelease();

PERFORMANCE(requestPartEnd);

return MHD_YES;
Expand Down
55 changes: 51 additions & 4 deletions src/lib/rest/rest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,21 +61,23 @@ extern "C"
#include "orionld/types/OrionldHeader.h" // orionldHeaderAdd
#include "orionld/types/OrionldMimeType.h" // mimeTypeFromString
#include "orionld/types/ApiVersion.h" // ApiVersion
#include "orionld/types/OrionLdRestService.h" // ORIONLD_URIPARAM_LIMIT, ...
#include "orionld/common/orionldState.h" // orionldState, multitenancy, ...
#include "orionld/common/performance.h" // REQUEST_PERFORMANCE
#include "orionld/common/orionldError.h" // orionldError
#include "orionld/common/orionldTenantGet.h" // orionldTenantGet
#include "orionld/common/tenantList.h" // tenant0
#include "orionld/common/stringStrip.h" // stringStrip
#include "orionld/http/verbGet.h" // verbGet
#include "orionld/mongoc/mongocConnectionRelease.h" // Own interface
#include "orionld/mongoc/mongocConnectionRelease.h" // mongocConnectionRelease
#include "orionld/notifications/orionldAlterationsTreat.h" // orionldAlterationsTreat
#include "orionld/mhd/mhdConnectionInit.h" // mhdConnectionInit
#include "orionld/mhd/mhdConnectionPayloadRead.h" // mhdConnectionPayloadRead
#include "orionld/mhd/mhdConnectionTreat.h" // mhdConnectionTreat
#include "orionld/distOp/distOpListRelease.h" // distOpListRelease
#include "orionld/service/orionldServiceNotFound.h" // orionldServiceNotFound
#include "orionld/payloadCheck/pCheckUri.h" // pCheckUri
#include "orionld/serviceRoutines/orionldPostEntities.h" // orionldPostEntities

#include "rest/HttpHeaders.h" // HTTP_* defines
#include "rest/Verb.h"
Expand Down Expand Up @@ -363,7 +365,7 @@ static MHD_Result httpHeaderGet(void* cbDataP, MHD_ValueKind kind, const char* k
*
* requestCompleted -
*/
static void requestCompleted
void requestCompleted
(
void* cls,
MHD_Connection* connection,
Expand All @@ -374,7 +376,6 @@ static void requestCompleted
PERFORMANCE(requestCompletedStart);

ConnectionInfo* ciP = (ConnectionInfo*) *con_cls;
const char* spath = ((orionldState.apiVersion != API_VERSION_NGSILD_V1) && (ciP->servicePathV.size() > 0))? ciP->servicePathV[0].c_str() : "";
struct timespec reqEndTime;

//
Expand All @@ -387,6 +388,46 @@ static void requestCompleted
PERFORMANCE(notifEnd);
}

//
// Call TRoE Routine (if there is one) to save the TRoE data.
// Only if the Service Routine was successful, of course
// AND if there is any request tree to process
//
if ((orionldState.httpStatusCode >= 200) && (orionldState.httpStatusCode <= 300) && (orionldState.noDbUpdate == false))
{
if ((orionldState.serviceP != NULL) && (orionldState.serviceP->troeRoutine != NULL))
{
//
// Also, if something went wrong during processing, the SR can flag this by setting the requestTree to NULL
//
if (orionldState.troeError == true)
LM_E(("Internal Error (something went wrong during TRoE processing)"));
else
{
//
// Special case - Entity creation with no attribute
// As both the entity id and the entity type have been removed from the payload body, the payload body is now empty.
// We still have to record the creation of the entity in the TRoE database!
//
// If the incoming request an empty array/object, then don't call the TRoE routine
// - EXCEPT if it's a POST /entities request (service routine is orionldPostEntities)
//
bool invokeTroe = false;

if (orionldState.verb == HTTP_DELETE) invokeTroe = true;
if (orionldState.serviceP->serviceRoutine == orionldPostEntities) invokeTroe = true;
if ((orionldState.requestTree != NULL) && (orionldState.requestTree->value.firstChildP != NULL)) invokeTroe = true;

if (invokeTroe == true)
{
PERFORMANCE(troeStart);
orionldState.serviceP->troeRoutine();
PERFORMANCE(troeEnd);
}
}
}
}

if ((orionldState.in.payload != NULL) && (orionldState.in.payload != orionldState.preallocReqBuf))
{
free(orionldState.in.payload);
Expand Down Expand Up @@ -503,6 +544,7 @@ static void requestCompleted
//
if ((orionldState.apiVersion != API_VERSION_NGSILD_V1) && (metricsMgr.isOn()))
{
const char* spath = (ciP->servicePathV.size() > 0)? ciP->servicePathV[0].c_str() : "";
metricsMgr.add(orionldState.tenantP->tenant, spath, METRIC_TRANS_IN, 1);

if (orionldState.httpStatusCode >= 400)
Expand Down Expand Up @@ -530,7 +572,7 @@ static void requestCompleted
extern void delayedReleaseExecute(void);
delayedReleaseExecute();

if (orionldState.apiVersion != API_VERSION_NGSILD_V1)
if ((orionldState.apiVersion != API_VERSION_NGSILD_V1) && (ciP != NULL))
delete(ciP);

kaBufferReset(&orionldState.kalloc, false); // 'false': it's reused, but in a different thread ...
Expand Down Expand Up @@ -582,6 +624,11 @@ static void requestCompleted
LM_T(LmtPerformance, ("TPUT: mongoConnect Accumulated: %f (%d calls)", performanceTimestamps.mongoConnectAccumulated, performanceTimestamps.getMongoConnectionCalls));
}
#endif

//
// Cleanup
//
orionldStateRelease();
}


Expand Down
Loading