Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/distributed create entity #1321

Open
wants to merge 19 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
e7f41d3
wip: removeattribute after csr call
thomasBousselin Jan 16, 2025
c695c58
wip: getFilteredAndRemoved on ExpandedEntity
thomasBousselin Jan 16, 2025
73fe8af
feat: functionning call for distributeCreateEntity
thomasBousselin Jan 17, 2025
f1225d5
feat: createEntity with error gestion
thomasBousselin Jan 20, 2025
8f3699d
fix: minor formating for pr
thomasBousselin Jan 20, 2025
c491bc8
fix: build
thomasBousselin Jan 20, 2025
487d0a9
fix: existing tests
thomasBousselin Jan 20, 2025
4ac2e4e
refactor: BatchEntityError.error as ProblemDetail
thomasBousselin Jan 21, 2025
aa19ccf
feat: first easy fixes
thomasBousselin Jan 21, 2025
a7699c4
feat: case when the entity is entirely merged + test for create entit…
thomasBousselin Jan 22, 2025
3d96a98
fix: verify entity before distribution + accept non Problem Detail error
thomasBousselin Jan 22, 2025
3d1a307
fix: case receive 207
thomasBousselin Jan 22, 2025
a1d7d14
refactor: rename InternalCsrFilter to RegistrationInfoFilter
thomasBousselin Jan 23, 2025
55b1ced
refactor: put entity filter from the csr info in CSR class
thomasBousselin Jan 23, 2025
90eabea
fix: test on getAssociatedAttributes
thomasBousselin Jan 23, 2025
65ea75e
feat: test for postDistributedInformation
thomasBousselin Jan 23, 2025
f317520
feat: test for distributeCreateEntityForContextSources
thomasBousselin Jan 23, 2025
8265759
feat: test for distributeCreateEntity
thomasBousselin Jan 24, 2025
9533ddf
fix: only call with content-type json-ld (as @context always is on th…
thomasBousselin Jan 27, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package com.egm.stellio.search.csr.model
import com.egm.stellio.shared.model.EntityTypeSelection
import java.net.URI

data class CSRFilters( // we should use a combination of EntitiesQuery TemporalQuery (when we implement all operations)
open class CSRFilters( // we should use a combination of EntitiesQuery TemporalQuery (when we implement all operations)
val ids: Set<URI> = emptySet(),
val typeSelection: EntityTypeSelection? = null,
val idPattern: String? = null,
Expand All @@ -13,12 +13,24 @@ data class CSRFilters( // we should use a combination of EntitiesQuery TemporalQ
ids: Set<URI> = emptySet(),
typeSelection: EntityTypeSelection? = null,
idPattern: String? = null,
operations: List<Operation>
operations: List<Operation>?
) :
this(
ids = ids,
typeSelection = typeSelection,
idPattern = idPattern,
csf = operations.joinToString("|") { "${ContextSourceRegistration::operations.name}==${it.key}" }
csf = operations?.joinToString("|") { "${ContextSourceRegistration::operations.name}==${it.key}" }
)
}

class InternalCSRFilters(
thomasBousselin marked this conversation as resolved.
Show resolved Hide resolved
ids: Set<URI> = emptySet(),
val types: Set<String>? = null,
idPattern: String? = null,
operations: List<Operation>? = null
) : CSRFilters(
ids = ids,
typeSelection = types?.joinToString("|"),
idPattern = idPattern,
operations = operations
)
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import arrow.core.raise.either
import arrow.core.right
import com.egm.stellio.shared.model.APIException
import com.egm.stellio.shared.model.BadRequestDataException
import com.egm.stellio.shared.model.ExpandedTerm
import com.egm.stellio.shared.model.toAPIException
import com.egm.stellio.shared.util.DataTypes
import com.egm.stellio.shared.util.JSON_LD_MEDIA_TYPE
Expand Down Expand Up @@ -154,6 +155,29 @@ data class ContextSourceRegistration(
if (!id.isAbsolute)
BadRequestDataException(invalidUriMessage("$id")).left()
else Unit.right()

fun getMatchingPropertiesAndRelationships(
csrFilters: InternalCSRFilters
): Pair<Set<ExpandedTerm>?, Set<ExpandedTerm>?> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

without reading the code, the returned structure is a mystery (do you even need to separate them?)

Copy link
Contributor Author

@thomasBousselin thomasBousselin Jan 21, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hum, i think so because the contextSource distinguish the "propertyNames" from the "relationshipNames" (5.2.10).

It could make sense for example if you want to have a CSR who have all the properties but only one relationship.

(I considered that if "propertyNames"(or "relationshipNames") where not provided that meant all the properties (or relationships) were distributed to the context source)

val matchingInformation = getMatchingInformation(csrFilters)
val properties = if (matchingInformation.any { it.propertyNames == null }) null
else matchingInformation.flatMap { it.propertyNames!! }.toSet()
val relationships = if (matchingInformation.any { it.relationshipNames == null }) null
else matchingInformation.flatMap { it.relationshipNames!! }.toSet()
return properties to relationships
thomasBousselin marked this conversation as resolved.
Show resolved Hide resolved
}

private fun getMatchingInformation(csrFilters: InternalCSRFilters): List<RegistrationInfo> =
information.filter { info ->
info.entities?.any { entityInfo ->
entityInfo.id?.let { csrFilters.ids.contains(it) } ?: true &&
entityInfo.types.let { types -> types.any { csrFilters.types?.contains(it) ?: true } } &&
entityInfo.idPattern?.let { pattern ->
csrFilters.ids.any { pattern.toRegex().matches(it.toString()) }
} ?: true
} ?: true
}

companion object {

fun deserialize(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
package com.egm.stellio.search.csr.service

import arrow.core.Either
import arrow.core.getOrNone
import arrow.core.left
import arrow.core.raise.either
import arrow.core.right
import com.egm.stellio.search.csr.model.ContextSourceRegistration
import com.egm.stellio.search.csr.model.InternalCSRFilters
import com.egm.stellio.search.csr.model.Mode
import com.egm.stellio.shared.model.APIException
import com.egm.stellio.shared.model.BadGatewayException
import com.egm.stellio.shared.model.CompactedEntity
import com.egm.stellio.shared.model.ContextSourceException
import com.egm.stellio.shared.model.ExpandedEntity
import com.egm.stellio.shared.model.ExpandedTerm
import com.egm.stellio.shared.model.GatewayTimeoutException
import com.egm.stellio.shared.util.JsonLdUtils.compactEntity
import com.egm.stellio.shared.util.JsonLdUtils.logger
import com.egm.stellio.shared.util.toUri
import org.json.XMLTokener.entity
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import org.springframework.http.HttpHeaders
import org.springframework.http.HttpMethod
import org.springframework.stereotype.Service
import org.springframework.web.reactive.function.client.WebClient
import org.springframework.web.reactive.function.client.awaitBodyOrNull
import org.springframework.web.reactive.function.client.awaitExchange
import java.net.URI

typealias DistributionStatus = Either<Pair<APIException, ContextSourceRegistration?>, Unit>
thomasBousselin marked this conversation as resolved.
Show resolved Hide resolved

@Service
class DistributedEntityProvisionService(
private val contextSourceRegistrationService: ContextSourceRegistrationService,
) {
val createPath = "/ngsi-ld/v1/entities"

val logger: Logger = LoggerFactory.getLogger(javaClass)

suspend fun distributeCreateEntity(
httpHeaders: HttpHeaders,
entity: ExpandedEntity,
contexts: List<String>,
): Pair<List<DistributionStatus>, ExpandedEntity> {
val csrFilters =
InternalCSRFilters(
ids = setOf(entity.id.toUri()),
types = entity.types.toSet()
)

val matchingCSR = contextSourceRegistrationService.getContextSourceRegistrations(
filters = csrFilters,
).groupBy { it.mode }

val (exclusiveErrors, entityAfterExclusive) = distributeCreateEntityForContextSources(
matchingCSR[Mode.EXCLUSIVE], // could be only one
csrFilters,
entity,
httpHeaders,
contexts
)
val (redirectErrors, entityAfterRedirect) = distributeCreateEntityForContextSources(
matchingCSR[Mode.REDIRECT],
csrFilters,
entityAfterExclusive,
httpHeaders,
contexts
)
val (inclusiveError, _) = distributeCreateEntityForContextSources(
matchingCSR[Mode.INCLUSIVE],
csrFilters,
entityAfterRedirect,
httpHeaders,
contexts
)
return exclusiveErrors.toMutableList() +
redirectErrors.toMutableList() +
inclusiveError.toMutableList() to entityAfterRedirect
}

private suspend fun distributeCreateEntityForContextSources(
csrs: List<ContextSourceRegistration>?,
csrFilters: InternalCSRFilters,
entity: ExpandedEntity,
headers: HttpHeaders,
contexts: List<String>
): Pair<List<DistributionStatus>, ExpandedEntity> {
val allCreatedAttrs = mutableSetOf<ExpandedTerm>()
val responses: List<DistributionStatus> = csrs?.mapNotNull { csr ->
csr.getMatchingPropertiesAndRelationships(csrFilters)
.let { (properties, relationships) -> entity.getAssociatedAttributes(properties, relationships) }
.let { attrs ->
if (attrs.isEmpty()) {
null
} else {
postDistributedInformation(
headers,
compactEntity(entity.filterAttributes(attrs, emptySet()), contexts),
csr,
createPath
).fold(
{ (it to csr).left() },
{
allCreatedAttrs.addAll(attrs)
Unit.right()
}
)
}
}
} ?: emptyList()
return responses to entity.omitAttributes(allCreatedAttrs)
}

private suspend fun postDistributedInformation(
httpHeaders: HttpHeaders,
entity: CompactedEntity,
csr: ContextSourceRegistration,
path: String,
): Either<APIException, Unit> = either {
val uri = URI("${csr.endpoint}$path")

val request = WebClient.create()
.method(HttpMethod.POST)
.uri { uriBuilder ->
uriBuilder.scheme(uri.scheme)
.host(uri.host)
.port(uri.port)
.path(uri.path)
.build()
}.headers { newHeaders ->
httpHeaders.getOrNone(HttpHeaders.LINK).onSome { link -> newHeaders[HttpHeaders.LINK] = link }
}.bodyValue(entity)

return runCatching {
val (statusCode, response, _) = request.awaitExchange { response ->
Triple(response.statusCode(), response.awaitBodyOrNull<String>(), response.headers())
}

if (statusCode.is2xxSuccessful) {
logger.info("Successfully post data to CSR ${csr.id} at $uri")
Unit.right()
thomasBousselin marked this conversation as resolved.
Show resolved Hide resolved
} else if (response == null) {
val message = "No error message received from CSR ${csr.id} at $uri"
logger.warn(message)
GatewayTimeoutException(message).left()
} else {
logger.warn("Error creating an entity for CSR at $uri: $response")
ContextSourceException(response).left()
}
}.fold(
onSuccess = { it },
onFailure = { e ->
logger.warn("Error contacting CSR at $uri: ${e.message}")
logger.warn(e.stackTraceToString())
BadGatewayException(
"Error connecting to CSR at $uri: \"${e.cause}:${e.message}\""
).left()
}
)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,13 @@ data class BatchEntitySuccess(
val updateResult: UpdateResult? = null
)

/**
* BatchEntityError type as defined in 5.2.17
*/
data class BatchEntityError(
val entityId: URI,
val error: MutableList<String>
val error: MutableList<String>,
val registrationId: URI? = null
)

typealias JsonLdNgsiLdEntity = Pair<ExpandedEntity, NgsiLdEntity>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import arrow.core.right
import com.egm.stellio.search.csr.model.addWarnings
import com.egm.stellio.search.csr.service.ContextSourceUtils
import com.egm.stellio.search.csr.service.DistributedEntityConsumptionService
import com.egm.stellio.search.csr.service.DistributedEntityProvisionService
import com.egm.stellio.search.csr.service.DistributionStatus
import com.egm.stellio.search.entity.service.EntityQueryService
import com.egm.stellio.search.entity.service.EntityService
import com.egm.stellio.search.entity.service.LinkedEntityService
Expand Down Expand Up @@ -71,6 +73,7 @@ class EntityHandler(
private val entityService: EntityService,
private val entityQueryService: EntityQueryService,
private val distributedEntityConsumptionService: DistributedEntityConsumptionService,
private val distributedEntityProvisionService: DistributedEntityProvisionService,
private val linkedEntityService: LinkedEntityService
) : BaseHandler() {

Expand All @@ -87,14 +90,46 @@ class EntityHandler(
val sub = getSubFromSecurityContext()
val (body, contexts) =
extractPayloadAndContexts(requestBody, httpHeaders, applicationProperties.contexts.core).bind()

val expandedEntity = expandJsonLdEntity(body, contexts)
val ngsiLdEntity = expandedEntity.toNgsiLdEntity().bind()

entityService.createEntity(ngsiLdEntity, expandedEntity, sub.getOrNull()).bind()
val (distributionStatuses, remainingEntity) = distributedEntityProvisionService
.distributeCreateEntity(httpHeaders, expandedEntity, contexts)

val allStatuses = if (remainingEntity.members.isNotEmpty()) {
val localStatus: DistributionStatus = either {
val ngsiLdEntity = remainingEntity.toNgsiLdEntity().bind()
entityService.createEntity(ngsiLdEntity, remainingEntity, sub.getOrNull()).bind()
}.fold({ (it to null).left() }, { it.right() })
distributionStatuses + listOf(localStatus)
} else distributionStatuses

when {
allStatuses.size == 1 && allStatuses.first().isLeft() ->
allStatuses.first().leftOrNull()!!.let { (onlyException, _) -> onlyException.toErrorResponse() }

allStatuses.any { it.isLeft() } -> {
val result = BatchOperationResult(
errors = allStatuses.mapNotNull {
it.fold(
{ (exception, csr) ->
BatchEntityError(
expandedEntity.id.toUri(),
mutableListOf(exception.type.toString(), exception.message, exception.detail),
csr?.id
)
},
{ null }
)
}.toMutableList()
)
ResponseEntity.status(HttpStatus.MULTI_STATUS).body(result)
}

ResponseEntity.status(HttpStatus.CREATED)
.location(URI("/ngsi-ld/v1/entities/${ngsiLdEntity.id}"))
.build<String>()
else -> ResponseEntity.status(HttpStatus.CREATED)
.location(URI("/ngsi-ld/v1/entities/${expandedEntity.id}"))
.build<String>()
}
}.fold(
{ it.toErrorResponse() },
{ it }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package com.egm.stellio.search.authorization.web
import com.egm.stellio.search.authorization.service.AuthorizationService
import com.egm.stellio.search.common.config.SearchProperties
import com.egm.stellio.search.csr.service.DistributedEntityConsumptionService
import com.egm.stellio.search.csr.service.DistributedEntityProvisionService
import com.egm.stellio.search.entity.service.EntityEventService
import com.egm.stellio.search.entity.service.EntityQueryService
import com.egm.stellio.search.entity.service.EntityService
Expand Down Expand Up @@ -44,6 +45,9 @@ class AnonymousUserHandlerTests {
@MockkBean
private lateinit var distributedEntityConsumptionService: DistributedEntityConsumptionService

@MockkBean
private lateinit var distributedEntityProvisionService: DistributedEntityProvisionService

@MockkBean(relaxed = true)
private lateinit var linkedEntityService: LinkedEntityService

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import com.egm.stellio.search.csr.CsrUtils.gimmeRawCSR
import com.egm.stellio.search.csr.model.MiscellaneousWarning
import com.egm.stellio.search.csr.model.NGSILDWarning
import com.egm.stellio.search.csr.service.DistributedEntityConsumptionService
import com.egm.stellio.search.csr.service.DistributedEntityProvisionService
import com.egm.stellio.search.csr.service.DistributionStatus
import com.egm.stellio.search.entity.model.EntitiesQueryFromGet
import com.egm.stellio.search.entity.model.NotUpdatedDetails
import com.egm.stellio.search.entity.model.UpdateResult
Expand Down Expand Up @@ -110,6 +112,9 @@ class EntityHandlerTests {
@MockkBean
private lateinit var distributedEntityConsumptionService: DistributedEntityConsumptionService

@MockkBean
private lateinit var distributedEntityProvisionService: DistributedEntityProvisionService

@MockkBean(relaxed = true)
private lateinit var linkedEntityService: LinkedEntityService

Expand All @@ -126,7 +131,7 @@ class EntityHandlerTests {
}

@BeforeEach
fun mockCSR() {
fun mockNoCSR() {
coEvery {
distributedEntityConsumptionService
.distributeRetrieveEntityOperation(any(), any(), any())
Expand All @@ -135,6 +140,11 @@ class EntityHandlerTests {
distributedEntityConsumptionService
.distributeQueryEntitiesOperation(any(), any(), any())
} returns Triple(emptyList(), emptyList(), emptyList())
val capturedExpandedEntity = slot<ExpandedEntity>()
coEvery {
distributedEntityProvisionService
.distributeCreateEntity(any(), capture(capturedExpandedEntity), any())
} answers { emptyList<DistributionStatus>() to capturedExpandedEntity.captured }
}

private val beehiveId = "urn:ngsi-ld:BeeHive:TESTC".toUri()
Expand Down
Loading
Loading