diff --git a/rest-api/src/main/scala/za/co/absa/enceladus/rest_api/controllers/SchemaController.scala b/rest-api/src/main/scala/za/co/absa/enceladus/rest_api/controllers/SchemaController.scala index 9438a3d01..e12b4c10e 100644 --- a/rest-api/src/main/scala/za/co/absa/enceladus/rest_api/controllers/SchemaController.scala +++ b/rest-api/src/main/scala/za/co/absa/enceladus/rest_api/controllers/SchemaController.scala @@ -146,9 +146,9 @@ class SchemaController @Autowired()( try { for { // the parsing of sparkStruct can fail, therefore we try to save it first before saving the attachment - update <- schemaService.schemaUpload(username, menasAttachment.refName, menasAttachment.refVersion - 1, sparkStruct) + (update, validation) <- schemaService.schemaUpload(username, menasAttachment.refName, menasAttachment.refVersion - 1, sparkStruct) _ <- attachmentService.uploadAttachment(menasAttachment) - } yield update.map(_._1) // v2 disregarding the validation + } yield Some(update) // v2 disregarding the validation; conforming to V2 Option[Entity] signature } catch { case e: SchemaParsingException => throw e.copy(schemaType = schemaType) //adding schema type } diff --git a/rest-api/src/main/scala/za/co/absa/enceladus/rest_api/controllers/v3/MappingTableControllerV3.scala b/rest-api/src/main/scala/za/co/absa/enceladus/rest_api/controllers/v3/MappingTableControllerV3.scala new file mode 100644 index 000000000..3e7a4f030 --- /dev/null +++ b/rest-api/src/main/scala/za/co/absa/enceladus/rest_api/controllers/v3/MappingTableControllerV3.scala @@ -0,0 +1,91 @@ +/* + * Copyright 2018 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.enceladus.rest_api.controllers.v3 + +import org.springframework.beans.factory.annotation.Autowired +import org.springframework.http.{HttpStatus, ResponseEntity} +import org.springframework.security.access.prepost.PreAuthorize +import org.springframework.security.core.annotation.AuthenticationPrincipal +import org.springframework.security.core.userdetails.UserDetails +import org.springframework.web.bind.annotation._ +import za.co.absa.enceladus.model._ +import za.co.absa.enceladus.rest_api.services.v3.MappingTableServiceV3 + +import java.util.concurrent.CompletableFuture +import javax.servlet.http.HttpServletRequest +import scala.concurrent.Future + +@RestController +@RequestMapping(Array("/api-v3/mapping-tables")) +class MappingTableControllerV3 @Autowired()(mappingTableService: MappingTableServiceV3) + extends VersionedModelControllerV3(mappingTableService) { + + import za.co.absa.enceladus.rest_api.utils.implicits._ + + import scala.concurrent.ExecutionContext.Implicits.global + + @GetMapping(path = Array("/{name}/{version}/defaults")) + @ResponseStatus(HttpStatus.OK) + def getDefaults(@PathVariable name: String, + @PathVariable version: String): CompletableFuture[Seq[DefaultValue]] = { + + forVersionExpression(name, version)(mappingTableService.getVersion).map { // "latest" version is accepted + case Some(entity) => entity.defaultMappingValue + case None => throw notFound() + } + } + + @PutMapping(path = Array("/{name}/{version}/defaults")) + @ResponseStatus(HttpStatus.CREATED) + def updateDefaults(@AuthenticationPrincipal user: UserDetails, + @PathVariable name: String, + @PathVariable version: String, + @RequestBody newDefaults: Array[DefaultValue], + request: HttpServletRequest + ): CompletableFuture[ResponseEntity[Validation]] = { + withMappingTableToResponse(name, version, user, request) { existingMt => + mappingTableService.updateDefaults(user.getUsername, name, existingMt.version, newDefaults.toList) + } + } + + @PostMapping(path = Array("/{name}/{version}/defaults")) + @ResponseStatus(HttpStatus.CREATED) + def addDefault(@AuthenticationPrincipal user: UserDetails, + @PathVariable name: String, + @PathVariable version: String, + @RequestBody newDefault: DefaultValue, + request: HttpServletRequest + ): CompletableFuture[ResponseEntity[Validation]] = { + withMappingTableToResponse(name, version, user, request) { existingMt => + mappingTableService.addDefault(user.getUsername, name, existingMt.version, newDefault) + } + } + + private def withMappingTableToResponse(name: String, version: String, user: UserDetails, request: HttpServletRequest, + stripLastSegments: Int = 3, suffix: String = s"/defaults") + (updateExistingMtFn: MappingTable => Future[Option[(MappingTable, Validation)]]): + Future[ResponseEntity[Validation]] = { + for { + existingMtOpt <- forVersionExpression(name, version)(mappingTableService.getVersion) + existingMt = existingMtOpt.getOrElse(throw notFound()) + updatedMtAndValidationOpt <- updateExistingMtFn(existingMt) + (updatedMt, validation) = updatedMtAndValidationOpt.getOrElse(throw notFound()) + response = createdWithNameVersionLocationBuilder(name, updatedMt.version, request, + stripLastSegments, suffix).body(validation) // for .../defaults: stripping /{name}/{version}/defaults + } yield response + } + +} diff --git a/rest-api/src/main/scala/za/co/absa/enceladus/rest_api/controllers/v3/PropertyDefinitionControllerV3.scala b/rest-api/src/main/scala/za/co/absa/enceladus/rest_api/controllers/v3/PropertyDefinitionControllerV3.scala new file mode 100644 index 000000000..e80c86778 --- /dev/null +++ b/rest-api/src/main/scala/za/co/absa/enceladus/rest_api/controllers/v3/PropertyDefinitionControllerV3.scala @@ -0,0 +1,84 @@ +/* + * Copyright 2018 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.enceladus.rest_api.controllers.v3 + +import com.mongodb.client.result.UpdateResult +import org.springframework.beans.factory.annotation.Autowired +import org.springframework.http.{HttpStatus, ResponseEntity} +import org.springframework.security.access.prepost.PreAuthorize +import org.springframework.security.core.annotation.AuthenticationPrincipal +import org.springframework.security.core.userdetails.UserDetails +import org.springframework.web.bind.annotation._ +import za.co.absa.enceladus.model.properties.PropertyDefinition +import za.co.absa.enceladus.model.{ExportableObject, Validation} +import za.co.absa.enceladus.rest_api.services.PropertyDefinitionService + +import java.util.Optional +import java.util.concurrent.CompletableFuture +import javax.servlet.http.HttpServletRequest + +@RestController +@RequestMapping(path = Array("/api-v3/property-definitions/datasets"), produces = Array("application/json")) +class PropertyDefinitionControllerV3 @Autowired()(propertyDefService: PropertyDefinitionService) + extends VersionedModelControllerV3(propertyDefService) { + + // super-class implementation is sufficient, but the following changing endpoints need admin-auth + + @PostMapping(Array("/{name}/import")) + @ResponseStatus(HttpStatus.CREATED) + @PreAuthorize("@authConstants.hasAdminRole(authentication)") + override def importSingleEntity(@AuthenticationPrincipal principal: UserDetails, + @PathVariable name: String, + @RequestBody importObject: ExportableObject[PropertyDefinition], + request: HttpServletRequest): CompletableFuture[ResponseEntity[Validation]] = { + super.importSingleEntity(principal, name, importObject, request) + } + + @PostMapping(Array("")) + @ResponseStatus(HttpStatus.CREATED) + @PreAuthorize("@authConstants.hasAdminRole(authentication)") + override def create(@AuthenticationPrincipal principal: UserDetails, + @RequestBody item: PropertyDefinition, + request: HttpServletRequest): CompletableFuture[ResponseEntity[Validation]] = { + + super.create(principal, item, request) + } + + @PutMapping(Array("/{name}/{version}")) + @ResponseStatus(HttpStatus.CREATED) + @PreAuthorize("@authConstants.hasAdminRole(authentication)") + override def edit(@AuthenticationPrincipal user: UserDetails, + @PathVariable name: String, + @PathVariable version: Int, + @RequestBody item: PropertyDefinition, + request: HttpServletRequest): CompletableFuture[ResponseEntity[Validation]] = { + + super.edit(user, name, version, item, request) + } + + @DeleteMapping(Array("/{name}", "/{name}/{version}")) + @ResponseStatus(HttpStatus.OK) + @PreAuthorize("@authConstants.hasAdminRole(authentication)") + override def disable(@PathVariable name: String, + @PathVariable version: Optional[String]): CompletableFuture[UpdateResult] = { + + super.disable(name, version) + } + + // todo add "enable" with preAuth check when available, too + +} + diff --git a/rest-api/src/main/scala/za/co/absa/enceladus/rest_api/controllers/v3/SchemaControllerV3.scala b/rest-api/src/main/scala/za/co/absa/enceladus/rest_api/controllers/v3/SchemaControllerV3.scala new file mode 100644 index 000000000..ed7c3e8df --- /dev/null +++ b/rest-api/src/main/scala/za/co/absa/enceladus/rest_api/controllers/v3/SchemaControllerV3.scala @@ -0,0 +1,199 @@ +/* + * Copyright 2018 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.enceladus.rest_api.controllers.v3 + +import org.apache.spark.sql.types.StructType +import org.springframework.beans.factory.annotation.Autowired +import org.springframework.http.{HttpStatus, ResponseEntity} +import org.springframework.security.access.prepost.PreAuthorize +import org.springframework.security.core.annotation.AuthenticationPrincipal +import org.springframework.security.core.userdetails.UserDetails +import org.springframework.web.bind.annotation._ +import org.springframework.web.multipart.MultipartFile +import za.co.absa.enceladus.model.{Schema, Validation} +import za.co.absa.enceladus.model.menas._ +import za.co.absa.enceladus.rest_api.controllers.SchemaController +import za.co.absa.enceladus.rest_api.exceptions.ValidationException +import za.co.absa.enceladus.rest_api.models.rest.exceptions.SchemaParsingException +import za.co.absa.enceladus.rest_api.repositories.RefCollection +import za.co.absa.enceladus.rest_api.services.v3.SchemaServiceV3 +import za.co.absa.enceladus.rest_api.services.{AttachmentService, SchemaRegistryService} +import za.co.absa.enceladus.rest_api.utils.SchemaType +import za.co.absa.enceladus.rest_api.utils.converters.SparkMenasSchemaConvertor +import za.co.absa.enceladus.rest_api.utils.parsers.SchemaParser + +import java.util.concurrent.CompletableFuture +import javax.servlet.http.{HttpServletRequest, HttpServletResponse} +import scala.concurrent.Future +import scala.util.{Failure, Success, Try} + + +@RestController +@RequestMapping(Array("/api-v3/schemas")) +class SchemaControllerV3 @Autowired()( + schemaService: SchemaServiceV3, + attachmentService: AttachmentService, + sparkMenasConvertor: SparkMenasSchemaConvertor, + schemaRegistryService: SchemaRegistryService + ) + extends VersionedModelControllerV3(schemaService) { + + import za.co.absa.enceladus.rest_api.utils.implicits._ + + import scala.concurrent.ExecutionContext.Implicits.global + + @GetMapping(path = Array("/{name}/{version}/json"), produces = Array("application/json")) + @ResponseStatus(HttpStatus.OK) + def getJson(@PathVariable name: String, + @PathVariable version: String, + @RequestParam(defaultValue = "false") pretty: Boolean): CompletableFuture[String] = { + forVersionExpression(name, version)(schemaService.getVersion).map { + case Some(schema) => + if (schema.fields.isEmpty) throw ValidationException( + Validation.empty.withError("schema-fields", s"Schema $name v$version exists, but has no fields!") + ) + val sparkStruct = StructType(sparkMenasConvertor.convertMenasToSparkFields(schema.fields)) + if (pretty) sparkStruct.prettyJson else sparkStruct.json + case None => + throw notFound() + } + + } + + @GetMapping(path = Array("/{name}/{version}/original")) + @ResponseStatus(HttpStatus.OK) + def exportOriginalSchemaFile(@AuthenticationPrincipal principal: UserDetails, + @PathVariable name: String, + @PathVariable version: String, + response: HttpServletResponse): CompletableFuture[Array[Byte]] = { + forVersionExpression(name, version)(attachmentService.getSchemaByNameAndVersion).map { attachment => + response.addHeader("mime-type", attachment.fileMIMEType) + attachment.fileContent + } + } + + @PostMapping(Array("/{name}/{version}/from-file")) + @ResponseStatus(HttpStatus.CREATED) + def handleFileUpload(@AuthenticationPrincipal principal: UserDetails, + @PathVariable name: String, + @PathVariable version: Int, + @RequestParam file: MultipartFile, + @RequestParam format: String, + request: HttpServletRequest): CompletableFuture[ResponseEntity[Validation]] = { + + val fileContent = new String(file.getBytes) + + val schemaType = SchemaType.fromSchemaName(format) + val sparkStruct = SchemaParser.getFactory(sparkMenasConvertor).getParser(schemaType).parse(fileContent) + + // for avro schema type, always force the same mime-type to be persisted + val mime = if (schemaType == SchemaType.Avro) { + SchemaController.avscContentType + } else { + file.getContentType + } + + val menasFile = MenasAttachment(refCollection = RefCollection.SCHEMA.name().toLowerCase, + refName = name, + refVersion = version + 1, // version is the current one, refVersion is the to-be-created one + attachmentType = MenasAttachment.ORIGINAL_SCHEMA_ATTACHMENT, + filename = file.getOriginalFilename, + fileContent = file.getBytes, + fileMIMEType = mime) + + uploadSchemaToMenas(principal.getUsername, menasFile, sparkStruct, schemaType).map { case (updatedSchema, validation) => + createdWithNameVersionLocationBuilder(name, updatedSchema.version, request, + stripLastSegments = 3).body(validation) // stripping: /{name}/{version}/from-file + } + } + + @PostMapping(Array("/{name}/{version}/from-remote-uri")) + @ResponseStatus(HttpStatus.CREATED) + def handleRemoteFile(@AuthenticationPrincipal principal: UserDetails, + @PathVariable name: String, + @PathVariable version: Int, + @RequestParam remoteUrl: String, + @RequestParam format: String, + request: HttpServletRequest): CompletableFuture[ResponseEntity[Validation]] = { + + val schemaType = SchemaType.fromSchemaName(format) + val schemaResponse = schemaRegistryService.loadSchemaByUrl(remoteUrl) + val sparkStruct = SchemaParser.getFactory(sparkMenasConvertor).getParser(schemaType).parse(schemaResponse.fileContent) + + val menasFile = MenasAttachment(refCollection = RefCollection.SCHEMA.name().toLowerCase, + refName = name, + refVersion = version + 1, // version is the current one, refVersion is the to-be-created one + attachmentType = MenasAttachment.ORIGINAL_SCHEMA_ATTACHMENT, + filename = schemaResponse.url.getFile, + fileContent = schemaResponse.fileContent.getBytes, + fileMIMEType = schemaResponse.mimeType) + + uploadSchemaToMenas(principal.getUsername, menasFile, sparkStruct, schemaType).map { case (updatedSchema, validation) => + createdWithNameVersionLocationBuilder(name, updatedSchema.version, request, + stripLastSegments = 3).body(validation) // stripping: /{name}/{version}/from-remote-uri + } + } + + @PostMapping(Array("/{name}/{version}/from-registry")) + @ResponseStatus(HttpStatus.CREATED) + def handleSubject(@AuthenticationPrincipal principal: UserDetails, + @PathVariable name: String, + @PathVariable version: Int, + @RequestParam subject: String, + @RequestParam format: String, + request: HttpServletRequest): CompletableFuture[ResponseEntity[Validation]] = { + + val schemaType = SchemaType.fromSchemaName(format) + val valueSchemaResponse = Try { + schemaRegistryService.loadSchemaBySubjectName(s"$subject") + } match { + case Success(schemaResponse) => schemaResponse + case Failure(_) => schemaRegistryService.loadSchemaBySubjectName(s"$subject-value") // fallback to -value + } + + val valueSparkStruct = SchemaParser.getFactory(sparkMenasConvertor).getParser(schemaType).parse(valueSchemaResponse.fileContent) + + val menasFile = MenasAttachment(refCollection = RefCollection.SCHEMA.name().toLowerCase, + refName = name, + refVersion = version + 1, // version is the current one, refVersion is the to-be-created one + attachmentType = MenasAttachment.ORIGINAL_SCHEMA_ATTACHMENT, + filename = valueSchemaResponse.url.getFile, // only the value file gets saved as an attachment + fileContent = valueSchemaResponse.fileContent.getBytes, + fileMIMEType = valueSchemaResponse.mimeType) + + uploadSchemaToMenas(principal.getUsername, menasFile, valueSparkStruct, schemaType).map { case (updatedSchema, validation) => + createdWithNameVersionLocationBuilder(name, updatedSchema.version, request, + stripLastSegments = 3).body(validation) // stripping: /{name}/{version}/from-registry + } + } + + private def uploadSchemaToMenas(username: String, menasAttachment: MenasAttachment, sparkStruct: StructType, + schemaType: SchemaType.Value): Future[(Schema, Validation)] = { + try { + for { + // the parsing of sparkStruct can fail, therefore we try to save it first before saving the attachment + (updated, validation) <- schemaService.schemaUpload(username, menasAttachment.refName, menasAttachment.refVersion - 1, sparkStruct) + _ <- attachmentService.uploadAttachment(menasAttachment) + } yield (updated, validation) + } catch { + case e: SchemaParsingException => throw e.copy(schemaType = schemaType) // adding schema type + } + } + + +} + + diff --git a/rest-api/src/main/scala/za/co/absa/enceladus/rest_api/controllers/v3/VersionedModelControllerV3.scala b/rest-api/src/main/scala/za/co/absa/enceladus/rest_api/controllers/v3/VersionedModelControllerV3.scala index 41aea585e..578a4a4f1 100644 --- a/rest-api/src/main/scala/za/co/absa/enceladus/rest_api/controllers/v3/VersionedModelControllerV3.scala +++ b/rest-api/src/main/scala/za/co/absa/enceladus/rest_api/controllers/v3/VersionedModelControllerV3.scala @@ -29,7 +29,6 @@ import za.co.absa.enceladus.rest_api.controllers.v3.VersionedModelControllerV3.L import za.co.absa.enceladus.rest_api.services.VersionedModelService import java.net.URI -import java.util import java.util.Optional import java.util.concurrent.CompletableFuture import javax.servlet.http.HttpServletRequest @@ -137,7 +136,7 @@ abstract class VersionedModelControllerV3[C <: VersionedModel with Product } @PutMapping(Array("/{name}/{version}")) - @ResponseStatus(HttpStatus.NO_CONTENT) + @ResponseStatus(HttpStatus.CREATED) def edit(@AuthenticationPrincipal user: UserDetails, @PathVariable name: String, @PathVariable version: Int, diff --git a/rest-api/src/main/scala/za/co/absa/enceladus/rest_api/services/SchemaService.scala b/rest-api/src/main/scala/za/co/absa/enceladus/rest_api/services/SchemaService.scala index 684f8907b..a4f5eb763 100644 --- a/rest-api/src/main/scala/za/co/absa/enceladus/rest_api/services/SchemaService.scala +++ b/rest-api/src/main/scala/za/co/absa/enceladus/rest_api/services/SchemaService.scala @@ -15,14 +15,14 @@ package za.co.absa.enceladus.rest_api.services +import org.apache.spark.sql.types.StructType import org.springframework.beans.factory.annotation.Autowired import org.springframework.stereotype.Service import za.co.absa.enceladus.model.{Schema, UsedIn, Validation} import za.co.absa.enceladus.rest_api.repositories.{DatasetMongoRepository, MappingTableMongoRepository, SchemaMongoRepository} +import za.co.absa.enceladus.rest_api.utils.converters.SparkMenasSchemaConvertor import scala.concurrent.Future -import org.apache.spark.sql.types.StructType -import za.co.absa.enceladus.rest_api.utils.converters.SparkMenasSchemaConvertor @Service class SchemaService @Autowired() (schemaMongoRepository: SchemaMongoRepository, @@ -39,10 +39,10 @@ class SchemaService @Autowired() (schemaMongoRepository: SchemaMongoRepository, } yield UsedIn(Some(usedInD), Some(usedInM)) } - def schemaUpload(username: String, schemaName: String, schemaVersion: Int, fields: StructType): Future[Option[(Schema, Validation)]] = { + def schemaUpload(username: String, schemaName: String, schemaVersion: Int, fields: StructType): Future[(Schema, Validation)] = { super.update(username, schemaName, schemaVersion)({ oldSchema => oldSchema.copy(fields = sparkMenasConvertor.convertSparkToMenasFields(fields.fields).toList) - }) + }).map(_.getOrElse(throw new IllegalArgumentException("Failed to derive new schema from file!"))) } override def recreate(username: String, schema: Schema): Future[Option[(Schema, Validation)]] = { @@ -56,15 +56,29 @@ class SchemaService @Autowired() (schemaMongoRepository: SchemaMongoRepository, } yield update } - override def update(username: String, schema: Schema): Future[Option[(Schema, Validation)]] = { - super.update(username, schema.name, schema.version) { latest => - latest.setDescription(schema.description).asInstanceOf[Schema] + /** + * This method applies only certain fields from `updateSchema` to the subject of this method. Here, for V2 API, + * only description field is applied, all other fields are disregarded - internally called at create/update + * @param current existing latest schema prior to changes + * @param update schema with create/update fields information + * @return + */ + protected def updateFields(current: Schema, update: Schema) : Schema = { + current.setDescription(update.description).asInstanceOf[Schema] + } + + /** final - override `updateFields` if needed */ + final override def update(username: String, update: Schema): Future[Option[(Schema, Validation)]] = { + super.update(username, update.name, update.version) { latest => + updateFields(latest, update) } } - override def create(newSchema: Schema, username: String): Future[Option[(Schema, Validation)]] = { - val schema = Schema(name = newSchema.name, - description = newSchema.description) + /** final - override `updateFields` if needed */ + final override def create(newSchema: Schema, username: String): Future[Option[(Schema, Validation)]] = { + val initSchema = Schema(name = newSchema.name, description = newSchema.description) + + val schema = updateFields(initSchema, newSchema) super.create(schema, username) } diff --git a/rest-api/src/main/scala/za/co/absa/enceladus/rest_api/services/VersionedModelService.scala b/rest-api/src/main/scala/za/co/absa/enceladus/rest_api/services/VersionedModelService.scala index eb6ef28cc..a1b1c5236 100644 --- a/rest-api/src/main/scala/za/co/absa/enceladus/rest_api/services/VersionedModelService.scala +++ b/rest-api/src/main/scala/za/co/absa/enceladus/rest_api/services/VersionedModelService.scala @@ -315,6 +315,7 @@ abstract class VersionedModelService[C <: VersionedModel with Product with Audit validateName(item.name) } + /** does not include za.co.absa.enceladus.rest_api.services.VersionedModelService#validate(java.lang.Object)*/ def validateForCreation(item: C): Future[Validation] = { isUniqueName(item.name).map { isUnique => if (isUnique) { diff --git a/rest-api/src/main/scala/za/co/absa/enceladus/rest_api/services/v3/MappingTableServiceV3.scala b/rest-api/src/main/scala/za/co/absa/enceladus/rest_api/services/v3/MappingTableServiceV3.scala new file mode 100644 index 000000000..a97e9e11a --- /dev/null +++ b/rest-api/src/main/scala/za/co/absa/enceladus/rest_api/services/v3/MappingTableServiceV3.scala @@ -0,0 +1,41 @@ +/* + * Copyright 2018 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.enceladus.rest_api.services.v3 + +import org.springframework.beans.factory.annotation.Autowired +import org.springframework.stereotype.Service +import za.co.absa.enceladus.model._ +import za.co.absa.enceladus.rest_api.repositories.{DatasetMongoRepository, MappingTableMongoRepository} +import za.co.absa.enceladus.rest_api.services.{MappingTableService, SchemaService, VersionedModelService} + +import scala.concurrent.Future + +@Service +class MappingTableServiceV3 @Autowired()(mappingTableMongoRepository: MappingTableMongoRepository, + datasetMongoRepository: DatasetMongoRepository, + val schemaService: SchemaService) + extends MappingTableService(mappingTableMongoRepository, datasetMongoRepository) with HavingSchemaService { + + import scala.concurrent.ExecutionContext.Implicits.global + + override def validate(item: MappingTable): Future[Validation] = { + for { + originalValidation <- super.validate(item) + mtSchemaValidation <- validateSchemaExists(item.schemaName, item.schemaVersion) + } yield originalValidation.merge(mtSchemaValidation) + + } +} diff --git a/rest-api/src/main/scala/za/co/absa/enceladus/rest_api/services/v3/SchemaServiceV3.scala b/rest-api/src/main/scala/za/co/absa/enceladus/rest_api/services/v3/SchemaServiceV3.scala new file mode 100644 index 000000000..1622a7b6b --- /dev/null +++ b/rest-api/src/main/scala/za/co/absa/enceladus/rest_api/services/v3/SchemaServiceV3.scala @@ -0,0 +1,50 @@ +/* + * Copyright 2018 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.enceladus.rest_api.services.v3 + +import org.apache.spark.sql.types.StructType +import org.springframework.beans.factory.annotation.Autowired +import org.springframework.stereotype.Service +import za.co.absa.enceladus.model.{Schema, UsedIn, Validation} +import za.co.absa.enceladus.rest_api.repositories.{DatasetMongoRepository, MappingTableMongoRepository, SchemaMongoRepository} +import za.co.absa.enceladus.rest_api.services.{SchemaService, VersionedModelService} +import za.co.absa.enceladus.rest_api.utils.converters.SparkMenasSchemaConvertor +import scala.concurrent.ExecutionContext.Implicits.global + + +import scala.concurrent.Future + +@Service +class SchemaServiceV3 @Autowired()(schemaMongoRepository: SchemaMongoRepository, + mappingTableMongoRepository: MappingTableMongoRepository, + datasetMongoRepository: DatasetMongoRepository, + sparkMenasConvertor: SparkMenasSchemaConvertor) + extends SchemaService(schemaMongoRepository, mappingTableMongoRepository, datasetMongoRepository, sparkMenasConvertor) { + + override def validate(item: Schema): Future[Validation] = { + if (item.fields.isEmpty) { + // V3 disallows empty schema fields - V2 allowed it at first that to get updated by an attachment upload/remote-load + Future.successful(Validation.empty.withError("schema-fields","No fields found! There must be fields defined for actual usage.")) + } else { + Future.successful(Validation.empty) + } + } + + // V3 applies fields on create/update from the payload, too (V2 did not allow fields payload here, only via 'upload' + override protected def updateFields(current: Schema, update: Schema) : Schema = { + current.setDescription(update.description).asInstanceOf[Schema].copy(fields = update.fields) + } +} diff --git a/rest-api/src/test/scala/za/co/absa/enceladus/rest_api/integration/controllers/BaseRestApiTest.scala b/rest-api/src/test/scala/za/co/absa/enceladus/rest_api/integration/controllers/BaseRestApiTest.scala index ab2b8aa1e..1670e6ce7 100644 --- a/rest-api/src/test/scala/za/co/absa/enceladus/rest_api/integration/controllers/BaseRestApiTest.scala +++ b/rest-api/src/test/scala/za/co/absa/enceladus/rest_api/integration/controllers/BaseRestApiTest.scala @@ -117,6 +117,15 @@ abstract class BaseRestApiTest(loginPath: String, apiPath: String) extends BaseR upload(urlPath, headers, fileParamName, fileName, parameters) } + def sendPostUploadFileByAdmin[T](urlPath: String, + fileName: String, + parameters: Map[String, Any], + fileParamName: String = "file", + headers: HttpHeaders = new HttpHeaders()) + (implicit ct: ClassTag[T]): ResponseEntity[T] = { + upload(urlPath, headers, fileParamName, fileName, parameters, byAdmin = true) + } + def sendPostRemoteFile[T](urlPath: String, parameters: Map[String, Any], headers: HttpHeaders = new HttpHeaders()) @@ -126,6 +135,15 @@ abstract class BaseRestApiTest(loginPath: String, apiPath: String) extends BaseR fromRemote(urlPath, headers, parameters) } + def sendPostRemoteFileByAdmin[T](urlPath: String, + parameters: Map[String, Any], + headers: HttpHeaders = new HttpHeaders()) + (implicit ct: ClassTag[T]): ResponseEntity[T] = { + require(parameters.keySet.contains("remoteUrl"), s"parameters map must contain the 'remoteUrl' entry, but only $parameters was found") + + fromRemote(urlPath, headers, parameters, byAdmin = true) + } + def sendPostSubject[T](urlPath: String, parameters: Map[String, Any], headers: HttpHeaders = new HttpHeaders()) @@ -136,16 +154,31 @@ abstract class BaseRestApiTest(loginPath: String, apiPath: String) extends BaseR fromRemote(urlPath, headers, parameters) } + def sendPostSubjectByAdmin[T](urlPath: String, + parameters: Map[String, Any], + headers: HttpHeaders = new HttpHeaders()) + (implicit ct: ClassTag[T]): ResponseEntity[T] = { + require(parameters.keySet.contains("subject"), + s"parameters map must contain the 'subject', but only $parameters was found") + + fromRemote(urlPath, headers, parameters, byAdmin = true) + } + def sendPostAsync[B, T](urlPath: String, headers: HttpHeaders = new HttpHeaders(), bodyOpt: Option[B] = None)(implicit ct: ClassTag[T]): Future[ResponseEntity[T]] = { sendAsync(HttpMethod.POST, urlPath, headers, bodyOpt) } def sendPut[B, T](urlPath: String, headers: HttpHeaders = new HttpHeaders(), - bodyOpt: Option[B] = None)(implicit ct: ClassTag[T]): ResponseEntity[T] = { + bodyOpt: Option[B] = None)(implicit ct: ClassTag[T]): ResponseEntity[T] = { send(HttpMethod.PUT, urlPath, headers, bodyOpt) } + def sendPutByAdmin[B, T](urlPath: String, headers: HttpHeaders = new HttpHeaders(), + bodyOpt: Option[B] = None)(implicit ct: ClassTag[T]): ResponseEntity[T] = { + sendByAdmin(HttpMethod.PUT, urlPath, headers, bodyOpt) + } + def sendPutAsync[B, T](urlPath: String, headers: HttpHeaders = new HttpHeaders(), bodyOpt: Option[B] = None)(implicit ct: ClassTag[T]): Future[ResponseEntity[T]] = { sendAsync(HttpMethod.PUT, urlPath, headers, bodyOpt) @@ -199,7 +232,8 @@ abstract class BaseRestApiTest(loginPath: String, apiPath: String) extends BaseR headers: HttpHeaders = HttpHeaders.EMPTY, fileParamName: String, fileName: String, - additionalParams: Map[String, Any]) + additionalParams: Map[String, Any], + byAdmin: Boolean = false) (implicit ct: ClassTag[T]): ResponseEntity[T] = { val parameters = new LinkedMultiValueMap[String, Any] @@ -209,7 +243,11 @@ abstract class BaseRestApiTest(loginPath: String, apiPath: String) extends BaseR } val url = s"$baseUrl/$urlPath" - headers.addAll(authHeaders) + if (byAdmin) { + headers.addAll(authHeadersAdmin) + } else { + headers.addAll(authHeaders) + } headers.setContentType(MediaType.MULTIPART_FORM_DATA) val clazz = ct.runtimeClass.asInstanceOf[Class[T]] @@ -220,7 +258,8 @@ abstract class BaseRestApiTest(loginPath: String, apiPath: String) extends BaseR def fromRemote[T](urlPath: String, headers: HttpHeaders = HttpHeaders.EMPTY, - params: Map[String, Any]) + params: Map[String, Any], + byAdmin: Boolean = false) (implicit ct: ClassTag[T]): ResponseEntity[T] = { val parameters: MultiValueMap[String, String] = new LinkedMultiValueMap() @@ -229,7 +268,11 @@ abstract class BaseRestApiTest(loginPath: String, apiPath: String) extends BaseR } val url = s"$baseUrl/$urlPath" - headers.addAll(authHeaders) + if (byAdmin) { + headers.addAll(authHeadersAdmin) + } else { + headers.addAll(authHeaders) + } headers.setContentType(MediaType.APPLICATION_FORM_URLENCODED) val clazz = ct.runtimeClass.asInstanceOf[Class[T]] diff --git a/rest-api/src/test/scala/za/co/absa/enceladus/rest_api/integration/controllers/v3/DatasetControllerV3IntegrationSuite.scala b/rest-api/src/test/scala/za/co/absa/enceladus/rest_api/integration/controllers/v3/DatasetControllerV3IntegrationSuite.scala index e0f523000..5105fff5a 100644 --- a/rest-api/src/test/scala/za/co/absa/enceladus/rest_api/integration/controllers/v3/DatasetControllerV3IntegrationSuite.scala +++ b/rest-api/src/test/scala/za/co/absa/enceladus/rest_api/integration/controllers/v3/DatasetControllerV3IntegrationSuite.scala @@ -399,18 +399,15 @@ class DatasetControllerV3IntegrationSuite extends BaseRestApiTestV3 with BeforeA response.getBody should include("name mismatch: 'datasetABC' != 'datasetXYZ'") } } - } - - "return 400" when { - "imported Dataset fails validation" in { - schemaFixture.add(SchemaFactory.getDummySchema("dummySchema")) - propertyDefinitionFixture.add(PropertyDefinitionFactory.getDummyPropertyDefinition("key1")) // key2 propdef is missing + "imported Dataset fails validation" in { + schemaFixture.add(SchemaFactory.getDummySchema("dummySchema")) + propertyDefinitionFixture.add(PropertyDefinitionFactory.getDummyPropertyDefinition("key1")) // key2 propdef is missing - val response = sendPost[String, Validation](s"$apiUrl/datasetXYZ/import", bodyOpt = Some(importableDs)) + val response = sendPost[String, Validation](s"$apiUrl/datasetXYZ/import", bodyOpt = Some(importableDs)) - response.getStatusCode shouldBe HttpStatus.BAD_REQUEST - response.getBody shouldBe Validation.empty.withError("key2", "There is no property definition for key 'key2'.") - } + response.getStatusCode shouldBe HttpStatus.BAD_REQUEST + response.getBody shouldBe Validation.empty.withError("key2", "There is no property definition for key 'key2'.") + } } "return 201" when { @@ -613,7 +610,6 @@ class DatasetControllerV3IntegrationSuite extends BaseRestApiTestV3 with BeforeA } } - s"PUT $apiUrl/{name}/{version}/properties" should { "return 404" when { "when the name+version does not exist" in { @@ -700,7 +696,6 @@ class DatasetControllerV3IntegrationSuite extends BaseRestApiTestV3 with BeforeA val headers1 = response1.getHeaders assert(headers1.getFirst("Location").endsWith("/api-v3/datasets/datasetA/2/properties")) - val response2 = sendGet[Map[String, String]](s"$apiUrl/datasetA/2/properties") assertOk(response2) val responseBody = response2.getBody @@ -851,9 +846,6 @@ class DatasetControllerV3IntegrationSuite extends BaseRestApiTestV3 with BeforeA response.getBody should include("Rule with order 0 cannot be added, another rule with this order already exists.") } - } - - "return 400" when { "when rule is not valid (missing MT)" in { schemaFixture.add(SchemaFactory.getDummySchema("dummySchema")) val datasetV1 = DatasetFactory.getDummyDataset(name = "datasetA") diff --git a/rest-api/src/test/scala/za/co/absa/enceladus/rest_api/integration/controllers/v3/MappingTableControllerV3IntegrationSuite.scala b/rest-api/src/test/scala/za/co/absa/enceladus/rest_api/integration/controllers/v3/MappingTableControllerV3IntegrationSuite.scala new file mode 100644 index 000000000..7dece37b2 --- /dev/null +++ b/rest-api/src/test/scala/za/co/absa/enceladus/rest_api/integration/controllers/v3/MappingTableControllerV3IntegrationSuite.scala @@ -0,0 +1,233 @@ +/* + * Copyright 2018 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.enceladus.rest_api.integration.controllers.v3 + +import org.junit.runner.RunWith +import org.scalatest.BeforeAndAfterAll +import org.scalatest.matchers.should.Matchers +import org.springframework.beans.factory.annotation.Autowired +import org.springframework.boot.test.context.SpringBootTest +import org.springframework.http.HttpStatus +import org.springframework.test.context.ActiveProfiles +import org.springframework.test.context.junit4.SpringRunner +import za.co.absa.enceladus.model.properties.PropertyDefinition +import za.co.absa.enceladus.model.{DefaultValue, MappingTable, Validation} +import za.co.absa.enceladus.model.test.factories.{MappingTableFactory, PropertyDefinitionFactory, SchemaFactory} +import za.co.absa.enceladus.rest_api.integration.controllers.BaseRestApiTestV3 +import za.co.absa.enceladus.rest_api.integration.fixtures._ +import za.co.absa.enceladus.rest_api.integration.controllers.toExpected + +@RunWith(classOf[SpringRunner]) +@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT) +@ActiveProfiles(Array("withEmbeddedMongo")) +class MappingTableControllerV3IntegrationSuite extends BaseRestApiTestV3 with BeforeAndAfterAll with Matchers { + + @Autowired + private val mappingTableFixture: MappingTableFixtureService = null + + @Autowired + private val schemaFixture: SchemaFixtureService = null + + private val apiUrl = "/mapping-tables" + + // fixtures are cleared after each test + override def fixtures: List[FixtureService[_]] = List(mappingTableFixture, schemaFixture) + + s"POST $apiUrl" should { + "return 400" when { + "referenced schema does not exits" in { + val mtA = MappingTableFactory.getDummyMappingTable("mtA", schemaName = "mtSchemaA", schemaVersion = 1) + + val response = sendPost[MappingTable, Validation](apiUrl, bodyOpt = Some(mtA)) + + assertBadRequest(response) + val responseBody = response.getBody + responseBody shouldBe Validation(Map("schema" -> List("Schema mtSchemaA v1 not found!"))) + } + } + + "return 201" when { + "a MappingTables is created" in { + val mtA = MappingTableFactory.getDummyMappingTable("mtA", schemaName = "mtSchema1", schemaVersion = 1) + schemaFixture.add(SchemaFactory.getDummySchema("mtSchema1")) + + schemaFixture.add(SchemaFactory.getDummySchema("dummySchema")) // Schema referenced by MT must exist + + val response = sendPost[MappingTable, Validation](apiUrl, bodyOpt = Some(mtA)) + assertCreated(response) + val locationHeader = response.getHeaders.getFirst("location") + locationHeader should endWith("/api-v3/mapping-tables/mtA/1") + + val relativeLocation = stripBaseUrl(locationHeader) // because locationHeader contains domain, port, etc. + val response2 = sendGet[MappingTable](stripBaseUrl(relativeLocation)) + assertOk(response2) + + val actual = response2.getBody + val expected = toExpected(mtA, actual) + + assert(actual == expected) + } + + } + } + + // only MT-specific endpoints are covered further on: + s"GET $apiUrl/{name}/{version}/defaults" should { + "return 404" when { + "when the name/version does not exist" in { + mappingTableFixture.add(MappingTableFactory.getDummyMappingTable("mtA")) + + assertNotFound(sendGet[String](s"$apiUrl/notFoundMt/456/defaults")) + assertNotFound(sendGet[String](s"$apiUrl/mtA/456/defaults")) + assertNotFound(sendGet[String](s"$apiUrl/notFoundMt/latest/defaults")) + } + } + + "return 200" when { + "when there are no defaults" in { + mappingTableFixture.add(MappingTableFactory.getDummyMappingTable("mtA")) + + val response = sendGet[Array[DefaultValue]](s"$apiUrl/mtA/1/defaults") + + assertOk(response) + response.getBody shouldBe Seq() + } + + "when there are some defaults rules (version \"latest\")" in { + mappingTableFixture.add( + MappingTableFactory.getDummyMappingTable("mtA"), + MappingTableFactory.getDummyMappingTable("mtA", version = 2).copy(defaultMappingValue = List( + DefaultValue("columnX", "defaultXvalue"), + DefaultValue("columnY", "defaultYvalue") + )) + ) + + + val response = sendGet[Array[DefaultValue]](s"$apiUrl/mtA/latest/defaults") + assertOk(response) + response.getBody shouldBe Array(DefaultValue("columnX", "defaultXvalue"), DefaultValue("columnY", "defaultYvalue")) + } + } + } + + s"PUT $apiUrl/{name}/{version}/defaults" should { + "return 404" when { + "when the name/version does not exist" in { + mappingTableFixture.add(MappingTableFactory.getDummyMappingTable("mtA")) + + assertNotFound(sendPut[Array[DefaultValue], String](s"$apiUrl/notFoundMt/456/defaults", bodyOpt = Some(Array()))) + assertNotFound(sendPut[Array[DefaultValue], String](s"$apiUrl/mtA/456/defaults", bodyOpt = Some(Array()))) + assertNotFound(sendPut[Array[DefaultValue], String](s"$apiUrl/notFoundMt/latest/defaults", bodyOpt = Some(Array()))) + } + } + + "return 400" when { + "when version is not the latest (only last version can be updated)" in { + val mtAv1 = MappingTableFactory.getDummyMappingTable("mtA", version = 1) + val mtAv2 = MappingTableFactory.getDummyMappingTable("mtA", version = 2) + val mtAv3 = MappingTableFactory.getDummyMappingTable("mtA", version = 3) + + mappingTableFixture.add(mtAv1, mtAv2, mtAv3) + + val response = sendPut[Array[DefaultValue], Validation](s"$apiUrl/mtA/2/defaults", bodyOpt = Some(Array())) + + assertBadRequest(response) + val responseBody = response.getBody + responseBody shouldBe Validation(Map("version" -> + List("Version 2 of mtA is not the latest version, therefore cannot be edited") + )) + } + } + + "201 Created with location" when { + Seq( + ("empty defaults", Array.empty[DefaultValue]), + ("non-empty defaults", Array(DefaultValue("colA", "defaultA"))) + ).foreach { case (testCaseName, bothPayloadAndExpectedResult: Array[DefaultValue]) => + s"defaults are replaced with a new version ($testCaseName)" in { + val mtAv1 = MappingTableFactory.getDummyMappingTable("mtA", version = 1).copy(defaultMappingValue = List(DefaultValue("anOldDefault", "itsValue"))) + mappingTableFixture.add(mtAv1) + + schemaFixture.add(SchemaFactory.getDummySchema("dummySchema")) // Schema referenced by MT must exist + + val response1 = sendPut[Array[DefaultValue], Validation](s"$apiUrl/mtA/1/defaults", bodyOpt = Some(bothPayloadAndExpectedResult)) + assertCreated(response1) + response1.getBody shouldBe Validation.empty + val headers1 = response1.getHeaders + assert(headers1.getFirst("Location").endsWith("/api-v3/mapping-tables/mtA/2/defaults")) + + val response2 = sendGet[Array[DefaultValue]](s"$apiUrl/mtA/2/defaults") + assertOk(response2) + val responseBody = response2.getBody + responseBody shouldBe bothPayloadAndExpectedResult // PUT is udd = 'anOldDefault' gets replaced, no trace of it + } + } + } + } + + s"POST $apiUrl/{name}/{version}/defaults" should { + "return 404" when { + "when the name/version does not exist" in { + mappingTableFixture.add(MappingTableFactory.getDummyMappingTable("mtA")) + + val aDefaultValue = DefaultValue("colA", "defaultA") + assertNotFound(sendPost[DefaultValue, String](s"$apiUrl/notFoundMt/456/defaults", bodyOpt = Some(aDefaultValue))) + assertNotFound(sendPost[DefaultValue, String](s"$apiUrl/mtA/456/defaults", bodyOpt = Some(aDefaultValue))) + assertNotFound(sendPost[DefaultValue, String](s"$apiUrl/notFoundMt/latest/defaults", bodyOpt = Some(aDefaultValue))) + } + } + + "return 400" when { + "when version is not the latest (only last version can be updated)" in { + val mtAv1 = MappingTableFactory.getDummyMappingTable("mtA", version = 1) + val mtAv2 = MappingTableFactory.getDummyMappingTable("mtA", version = 2) + val mtAv3 = MappingTableFactory.getDummyMappingTable("mtA", version = 3) + + mappingTableFixture.add(mtAv1, mtAv2, mtAv3) + + val response = sendPost[DefaultValue, Validation](s"$apiUrl/mtA/2/defaults", bodyOpt = Some(DefaultValue("colA", "defaultA"))) + + assertBadRequest(response) + val responseBody = response.getBody + responseBody shouldBe Validation(Map("version" -> + List("Version 2 of mtA is not the latest version, therefore cannot be edited") + )) + } + } + + "201 Created with location" when { + s"defaults are replaced with a new version" in { + val mtAv1 = MappingTableFactory.getDummyMappingTable("mtA", version = 1).copy(defaultMappingValue = List(DefaultValue("anOldDefault", "itsValue"))) + mappingTableFixture.add(mtAv1) + + schemaFixture.add(SchemaFactory.getDummySchema("dummySchema")) // Schema referenced by MT must exist + + val response1 = sendPost[DefaultValue, Validation](s"$apiUrl/mtA/1/defaults", bodyOpt = Some(DefaultValue("colA", "defaultA"))) + assertCreated(response1) + response1.getBody shouldBe Validation.empty + val headers1 = response1.getHeaders + assert(headers1.getFirst("Location").endsWith("/api-v3/mapping-tables/mtA/2/defaults")) + + val response2 = sendGet[Array[DefaultValue]](s"$apiUrl/mtA/2/defaults") + assertOk(response2) + val responseBody = response2.getBody + val expectedDefaults = Array(DefaultValue("anOldDefault", "itsValue"), DefaultValue("colA", "defaultA")) // POST = adding, 'anOldDefault' is kept + responseBody shouldBe expectedDefaults + } + } + } + +} diff --git a/rest-api/src/test/scala/za/co/absa/enceladus/rest_api/integration/controllers/v3/PropertyDefinitionControllerV3IntegrationSuite.scala b/rest-api/src/test/scala/za/co/absa/enceladus/rest_api/integration/controllers/v3/PropertyDefinitionControllerV3IntegrationSuite.scala new file mode 100644 index 000000000..633666d76 --- /dev/null +++ b/rest-api/src/test/scala/za/co/absa/enceladus/rest_api/integration/controllers/v3/PropertyDefinitionControllerV3IntegrationSuite.scala @@ -0,0 +1,374 @@ +/* + * Copyright 2018 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.enceladus.rest_api.integration.controllers.v3 + +import org.junit.runner.RunWith +import org.scalatest.BeforeAndAfterAll +import org.scalatest.matchers.should.Matchers +import org.springframework.beans.factory.annotation.Autowired +import org.springframework.boot.test.context.SpringBootTest +import org.springframework.http.HttpStatus +import org.springframework.test.context.ActiveProfiles +import org.springframework.test.context.junit4.SpringRunner +import za.co.absa.enceladus.model.Validation +import za.co.absa.enceladus.model.properties.PropertyDefinition +import za.co.absa.enceladus.model.properties.propertyType.{EnumPropertyType, StringPropertyType} +import za.co.absa.enceladus.model.test.factories.PropertyDefinitionFactory +import za.co.absa.enceladus.model.versionedModel.NamedLatestVersion +import za.co.absa.enceladus.rest_api.integration.controllers.{BaseRestApiTestV3, toExpected} +import za.co.absa.enceladus.rest_api.integration.fixtures._ + +@RunWith(classOf[SpringRunner]) +@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT) +@ActiveProfiles(Array("withEmbeddedMongo")) +class PropertyDefinitionControllerV3IntegrationSuite extends BaseRestApiTestV3 with BeforeAndAfterAll with Matchers { + + @Autowired + private val propertyDefinitionFixture: PropertyDefinitionFixtureService = null + + private val apiUrl = "/property-definitions/datasets" + + // fixtures are cleared after each test + override def fixtures: List[FixtureService[_]] = List(propertyDefinitionFixture) + + + private def minimalPdCreatePayload(name: String, suggestedValue: Option[String]) = { + val suggestedValuePart = suggestedValue match { + case Some(actualSuggestedValue) => s""","suggestedValue": "$actualSuggestedValue"""" + case _ => "" + } + + s"""{"name": "$name","propertyType": {"_t": "StringPropertyType"$suggestedValuePart}}""" + } + + private def invalidPayload(name: String) = + s"""{ + |"name": "$name", + |"propertyType": { + | "_t": "EnumPropertyType", + | "allowedValues": ["a", "b"], + | "suggestedValue": "invalidOptionC" + |} + |}""".stripMargin + + s"POST $apiUrl" can { + "return 201" when { + "PropertyDefinition is created" in { + val propertyDefinition = PropertyDefinitionFactory.getDummyPropertyDefinition() + val response = sendPostByAdmin[PropertyDefinition, Validation](apiUrl, bodyOpt = Some(propertyDefinition)) + assertCreated(response) + + val locationHeader = response.getHeaders.getFirst("location") + locationHeader should endWith("/api-v3/property-definitions/datasets/dummyName/1") + + val response2 = sendGet[PropertyDefinition]("/property-definitions/datasets/dummyName/1") + assertOk(response2) + + val actual = response2.getBody + val expected = toExpected(propertyDefinition, actual) + assert(actual == expected) + } + Seq(Some("default1"), None).foreach { suggestedValue => + s"a PropertyDefinition is created with most of default values (suggestedValue=$suggestedValue)" in { + val propertyDefinition = minimalPdCreatePayload("smallPd", suggestedValue) + val response = sendPostByAdmin[String, Validation](apiUrl, bodyOpt = Some(propertyDefinition)) + assertCreated(response) + + val locationHeader = response.getHeaders.getFirst("location") + locationHeader should endWith("/api-v3/property-definitions/datasets/smallPd/1") + + val response2 = sendGet[PropertyDefinition]("/property-definitions/datasets/smallPd/1") + assertOk(response2) + + val actual = response2.getBody + val expected = toExpected(PropertyDefinition("smallPd", propertyType = StringPropertyType(suggestedValue)), actual) + assert(actual == expected) + } + } + } + + "return 400" when { + "an enabled PropertyDefinition with that name already exists" in { + val propertyDefinition = PropertyDefinitionFactory.getDummyPropertyDefinition() + propertyDefinitionFixture.add(propertyDefinition) + + val response = sendPostByAdmin[PropertyDefinition, Validation](apiUrl, bodyOpt = Some(propertyDefinition)) + assertBadRequest(response) + + val actual = response.getBody + val expected = Validation().withError("name", "entity with name already exists: 'dummyName'") + assert(actual == expected) + } + "an invalid PD payload is sent" in { + val response = sendPostByAdmin[String, String](apiUrl, bodyOpt = Some(invalidPayload("somePd1"))) + assertBadRequest(response) + + response.getBody shouldBe "The suggested value invalidOptionC cannot be used: Value 'invalidOptionC' is not one of the allowed values (a, b)." + } + } + + "return 403" when { + s"admin auth is not used for POST $apiUrl" in { + val propertyDefinition = PropertyDefinitionFactory.getDummyPropertyDefinition() + val response = sendPost[PropertyDefinition, String](apiUrl, bodyOpt = Some(propertyDefinition)) + response.getStatusCode shouldBe HttpStatus.FORBIDDEN + } + } + } + + s"GET $apiUrl/{name}" should { + "return 200" when { + "a propDef with the given name exists - so it gives versions" in { + val pdV1 = PropertyDefinitionFactory.getDummyPropertyDefinition(name = "pdA", version = 1) + val pdV2 = PropertyDefinitionFactory.getDummyPropertyDefinition(name = "pdA", + version = 2, parent = Some(PropertyDefinitionFactory.toParent(pdV1))) + propertyDefinitionFixture.add(pdV1, pdV2) + + val response = sendGet[NamedLatestVersion](s"$apiUrl/pdA") + assertOk(response) + assert(response.getBody == NamedLatestVersion("pdA", 2)) + } + } + + "return 404" when { + "a propDef with the given name does not exist" in { + val pd = PropertyDefinitionFactory.getDummyPropertyDefinition(name = "pdA", version = 1) + propertyDefinitionFixture.add(pd) + + val response = sendGet[String](s"$apiUrl/anotherDatasetName") + assertNotFound(response) + } + } + } + + s"GET $apiUrl/{name}/{version}" should { + "return 200" when { + "a PropertyDefinition with the given name and version exists - gives specified version of entity" in { + val pdV1 = PropertyDefinitionFactory.getDummyPropertyDefinition(name = "pdA", version = 1) + val pdV2 = PropertyDefinitionFactory.getDummyPropertyDefinition(name = "pdA", version = 2, description = Some("second")) + val pdV3 = PropertyDefinitionFactory.getDummyPropertyDefinition(name = "pdA", version = 3, description = Some("third")) + propertyDefinitionFixture.add(pdV1, pdV2, pdV3) + + val response = sendGet[PropertyDefinition](s"$apiUrl/pdA/2") + assertOk(response) + + val actual = response.getBody + val expected = toExpected(pdV2, actual) + + assert(actual == expected) + } + } + + "return 404" when { + "a PropertyDefinition with the given name/version does not exist" in { + val pd = PropertyDefinitionFactory.getDummyPropertyDefinition(name = "pdA", version = 1) + propertyDefinitionFixture.add(pd) + + val response = sendGet[String](s"$apiUrl/anotherPropertyDefinitionName/1") + assertNotFound(response) + + val response2 = sendGet[String](s"$apiUrl/pdA/7") + assertNotFound(response2) + } + } + } + + s"PUT $apiUrl/{name}/{version}" can { + "return 200" when { + "a PropertyDefinition with the given name and version is the latest that exists" should { + "update the propertyDefinition" in { + val propertyDefinitionA1 = PropertyDefinitionFactory.getDummyPropertyDefinition("propertyDefinitionA") + val propertyDefinitionA2 = PropertyDefinitionFactory.getDummyPropertyDefinition("propertyDefinitionA", + description = Some("second version"), version = 2) + propertyDefinitionFixture.add(propertyDefinitionA1, propertyDefinitionA2) + + val propertyDefinitionA3 = PropertyDefinitionFactory.getDummyPropertyDefinition("propertyDefinitionA", + description = Some("updated"), + propertyType = EnumPropertyType("a", "b"), + version = 2 // update references the last version + ) + + val response = sendPutByAdmin[PropertyDefinition, Validation](s"$apiUrl/propertyDefinitionA/2", bodyOpt = Some(propertyDefinitionA3)) + assertCreated(response) + response.getBody shouldBe Validation.empty + val locationHeader = response.getHeaders.getFirst("location") + locationHeader should endWith("/api-v3/property-definitions/datasets/propertyDefinitionA/3") + + val response2 = sendGet[PropertyDefinition](s"$apiUrl/propertyDefinitionA/3") + assertOk(response2) + + val actual = response2.getBody + val expected = toExpected(propertyDefinitionA3.copy(version = 3, parent = Some(PropertyDefinitionFactory.toParent(propertyDefinitionA2))), actual) + assert(actual == expected) + } + } + } + + "return 400" when { + "a PropertyDefinition with the given name and version" should { + "fail when version/name in the URL and payload is mismatched" in { + val propertyDefinitionA1 = PropertyDefinitionFactory.getDummyPropertyDefinition("propertyDefinitionA", description = Some("init version")) + propertyDefinitionFixture.add(propertyDefinitionA1) + + val response = sendPutByAdmin[PropertyDefinition, String](s"$apiUrl/propertyDefinitionA/7", + bodyOpt = Some(PropertyDefinitionFactory.getDummyPropertyDefinition("propertyDefinitionA", version = 5))) + response.getStatusCode shouldBe HttpStatus.BAD_REQUEST + response.getBody should include("version mismatch: 7 != 5") + + val response2 = sendPutByAdmin[PropertyDefinition, String](s"$apiUrl/propertyDefinitionABC/4", + bodyOpt = Some(PropertyDefinitionFactory.getDummyPropertyDefinition("propertyDefinitionXYZ", version = 4))) + response2.getStatusCode shouldBe HttpStatus.BAD_REQUEST + response2.getBody should include("name mismatch: 'propertyDefinitionABC' != 'propertyDefinitionXYZ'") + } + } + } + + "return 403" when { + s"admin auth is not used" in { + val propertyDefinitionA1 = PropertyDefinitionFactory.getDummyPropertyDefinition("propertyDefinitionA") + propertyDefinitionFixture.add(propertyDefinitionA1) + + val propertyDefinitionA2 = PropertyDefinitionFactory.getDummyPropertyDefinition("propertyDefinitionA", + description = Some("updated"), + propertyType = EnumPropertyType("a", "b"), + version = 1 // update references the last version + ) + + val response = sendPut[PropertyDefinition, String](s"$apiUrl/propertyDefinitionA/1", bodyOpt = Some(propertyDefinitionA2)) + response.getStatusCode shouldBe HttpStatus.FORBIDDEN + } + } + + "return 404" when { + "a propDef with the given name does not exist" in { + val propertyDefinitionA2 = PropertyDefinitionFactory.getDummyPropertyDefinition("propertyDefinitionA") + + val response = sendPutByAdmin[PropertyDefinition, String](s"$apiUrl/propertyDefinitionA/1", bodyOpt = Some(propertyDefinitionA2)) + assertNotFound(response) + } + } + + } + + s"POST $apiUrl/{name}/import" should { + val importablePd = + """{"todo":{"exportVersion":1},"item":{ + |"name":"propertyDefinitionXYZ", + |"description":"Hi, I am the import", + |"propertyType":{"_t":"StringPropertyType"}, + |"putIntoInfoFile":false, + |"essentiality":{"_t":"Optional"} + |}}""".stripMargin.replaceAll("[\\r\\n]", "") + + "return 400" when { + "a PropertyDefinition with the given name" should { + "fail when name in the URL and payload is mismatched" in { + val response = sendPostByAdmin[String, String](s"$apiUrl/propertyDefinitionABC/import", + bodyOpt = Some(importablePd)) + response.getStatusCode shouldBe HttpStatus.BAD_REQUEST + response.getBody should include("name mismatch: 'propertyDefinitionABC' != 'propertyDefinitionXYZ'") + } + } + } + + "return 403" when { + s"admin auth is not used" in { + val response = sendPost[String, Validation](s"$apiUrl/propertyDefinitionXYZ/import", bodyOpt = Some(importablePd)) + response.getStatusCode shouldBe HttpStatus.FORBIDDEN + } + } + + "return 201" when { + "there is a existing PropertyDefinition" should { + "a +1 version of propertyDefinition is added" in { + val propertyDefinition1 = PropertyDefinitionFactory.getDummyPropertyDefinition(name = "propertyDefinitionXYZ", description = Some("init version")) + propertyDefinitionFixture.add(propertyDefinition1) + + val response = sendPostByAdmin[String, Validation](s"$apiUrl/propertyDefinitionXYZ/import", bodyOpt = Some(importablePd)) + assertCreated(response) + val locationHeader = response.getHeaders.getFirst("location") + locationHeader should endWith("/api-v3/property-definitions/datasets/propertyDefinitionXYZ/2") + response.getBody shouldBe Validation.empty + + val response2 = sendGet[PropertyDefinition](s"$apiUrl/propertyDefinitionXYZ/2") + assertOk(response2) + + val actual = response2.getBody + val expectedPdBase = PropertyDefinitionFactory.getDummyPropertyDefinition(name = "propertyDefinitionXYZ", version = 2, + description = Some("Hi, I am the import"), + parent = Some(PropertyDefinitionFactory.toParent(propertyDefinition1)) + ) + val expected = toExpected(expectedPdBase, actual) + + assert(actual == expected) + } + } + + "there is no such PropertyDefinition, yet" should { + "a the version of propertyDefinition created" in { + val response = sendPostByAdmin[String, String](s"$apiUrl/propertyDefinitionXYZ/import", bodyOpt = Some(importablePd)) + assertCreated(response) + val locationHeader = response.getHeaders.getFirst("location") + locationHeader should endWith("/api-v3/property-definitions/datasets/propertyDefinitionXYZ/1") // this is the first version + + val response2 = sendGet[PropertyDefinition](s"$apiUrl/propertyDefinitionXYZ/1") + assertOk(response2) + + val actual = response2.getBody + val expectedDsBase = PropertyDefinitionFactory.getDummyPropertyDefinition(name = "propertyDefinitionXYZ", + description = Some("Hi, I am the import")) + val expected = toExpected(expectedDsBase, actual) + + assert(actual == expected) + } + } + } + } + + s"GET $apiUrl/{name}/{version}/export" should { + "return 404" when { + "when the name+version does not exist" in { + val response = sendGet[String](s"$apiUrl/notFoundPropertyDefinition/2/export") + assertNotFound(response) + } + } + + "return 200" when { + "there is a correct PropertyDefinition version" should { + "return the exported PropertyDefinition representation" in { + val propertyDefinition2 = PropertyDefinitionFactory.getDummyPropertyDefinition(name = "propertyDefinition", version = 2, description = Some("v2 here")) + val propertyDefinition3 = PropertyDefinitionFactory.getDummyPropertyDefinition(name = "propertyDefinition", version = 3, description = Some("showing non-latest export")) + propertyDefinitionFixture.add(propertyDefinition2, propertyDefinition3) + val response = sendGet[String](s"$apiUrl/propertyDefinition/2/export") + + assertOk(response) + + val body = response.getBody + assert(body == + """{"metadata":{"exportVersion":1},"item":{ + |"name":"propertyDefinition", + |"description":"v2 here", + |"propertyType":{"_t":"StringPropertyType","suggestedValue":null}, + |"putIntoInfoFile":false, + |"essentiality":{"_t":"Optional"} + |}}""".stripMargin.replaceAll("[\\r\\n]", "")) + } + } + } + } + + // todo delete +} diff --git a/rest-api/src/test/scala/za/co/absa/enceladus/rest_api/integration/controllers/v3/SchemaControllerV3IntegrationSuite.scala b/rest-api/src/test/scala/za/co/absa/enceladus/rest_api/integration/controllers/v3/SchemaControllerV3IntegrationSuite.scala new file mode 100644 index 000000000..e5f948571 --- /dev/null +++ b/rest-api/src/test/scala/za/co/absa/enceladus/rest_api/integration/controllers/v3/SchemaControllerV3IntegrationSuite.scala @@ -0,0 +1,734 @@ +/* + * Copyright 2018 ABSA Group Limited + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package za.co.absa.enceladus.rest_api.integration.controllers.v3 + +import com.github.tomakehurst.wiremock.WireMockServer +import com.github.tomakehurst.wiremock.client.ResponseDefinitionBuilder +import com.github.tomakehurst.wiremock.core.WireMockConfiguration +import org.apache.commons.io.IOUtils +import org.junit.runner.RunWith +import org.scalatest.BeforeAndAfterAll +import org.scalatest.matchers.should.Matchers +import org.springframework.beans.factory.annotation.Autowired +import org.springframework.boot.test.context.SpringBootTest +import org.springframework.http.{HttpStatus, MediaType} +import org.springframework.test.context.ActiveProfiles +import org.springframework.test.context.junit4.SpringRunner +import za.co.absa.enceladus.model.test.factories.{AttachmentFactory, SchemaFactory} +import za.co.absa.enceladus.model.{Schema, SchemaField, Validation} +import za.co.absa.enceladus.rest_api.integration.controllers.{BaseRestApiTestV3, toExpected} +import za.co.absa.enceladus.rest_api.integration.fixtures._ +import za.co.absa.enceladus.rest_api.models.rest.RestResponse +import za.co.absa.enceladus.rest_api.models.rest.errors.{SchemaFormatError, SchemaParsingError} +import za.co.absa.enceladus.rest_api.repositories.RefCollection +import za.co.absa.enceladus.rest_api.utils.SchemaType +import za.co.absa.enceladus.rest_api.utils.converters.SparkMenasSchemaConvertor +import za.co.absa.enceladus.restapi.TestResourcePath + +import java.io.File +import java.nio.file.{Files, Path} +import scala.collection.immutable.HashMap + +@RunWith(classOf[SpringRunner]) +@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT) +@ActiveProfiles(Array("withEmbeddedMongo")) +class SchemaControllerV3IntegrationSuite extends BaseRestApiTestV3 with BeforeAndAfterAll with Matchers { + + private val port = 8877 // same port as in test/resources/application.conf in the `menas.schemaRegistry.baseUrl` key + private val wireMockServer = new WireMockServer(WireMockConfiguration.wireMockConfig().port(port)) + + override def beforeAll(): Unit = { + super.beforeAll() + wireMockServer.start() + } + + override def afterAll(): Unit = { + super.afterAll() + wireMockServer.stop() + } + + @Autowired + private val schemaFixture: SchemaFixtureService = null + + @Autowired + private val datasetFixture: DatasetFixtureService = null + + @Autowired + private val mappingTableFixture: MappingTableFixtureService = null + + @Autowired + private val attachmentFixture: AttachmentFixtureService = null + + @Autowired + private val convertor: SparkMenasSchemaConvertor = null + + private val apiUrl = "/schemas" + private val schemaRefCollection = RefCollection.SCHEMA.name().toLowerCase() + + override def fixtures: List[FixtureService[_]] = List(schemaFixture, attachmentFixture, datasetFixture, mappingTableFixture) + + s"POST $apiUrl" can { + "return 201" when { + "a Schema is created (v1-payload has defined fields already)" in { + val schema = SchemaFactory.getDummySchema("schemaA", fields = List( + SchemaField("field1", "string", "", nullable = true, metadata = Map.empty, children = Seq.empty) + )) + + val response = sendPost[Schema, Validation](apiUrl, bodyOpt = Some(schema)) + + assertCreated(response) + val locationHeader = response.getHeaders.getFirst("location") + locationHeader should endWith("/api-v3/schemas/schemaA/1") + + response.getBody shouldBe Validation.empty + + val response2 = sendGet[Schema]("/schemas/schemaA/1") + assertOk(response2) + + val actual = response2.getBody + val expected = toExpected(schema, actual) + assert(actual == expected) + } + } + + "return 400" when { + "a Schema is created (empty fields = warning)" in { + val schema = SchemaFactory.getDummySchema("schemaA") + val response = sendPost[Schema, Validation](apiUrl, bodyOpt = Some(schema)) + + assertBadRequest(response) + response.getBody shouldBe Validation.empty + .withError("schema-fields", "No fields found! There must be fields defined for actual usage.") + } + "an enabled Schema with that name already exists" in { + val schema = SchemaFactory.getDummySchema(fields = List( + SchemaField("field1", "string", "", nullable = true, metadata = Map.empty, children = Seq.empty))) + schemaFixture.add(schema) + + val response = sendPost[Schema, Validation](apiUrl, bodyOpt = Some(schema)) + + assertBadRequest(response) + + val actual = response.getBody + val expected = Validation.empty + .withError("name", "entity with name already exists: 'dummyName'") + + assert(actual == expected) + } + } + } + + s"PUT $apiUrl/{name}/{version}" can { + "return 201" when { + "a Schema is updated (v1-payload has defined fields already)" in { + val schema1 = SchemaFactory.getDummySchema("schemaA", fields = List( + SchemaField("field1", "string", "", nullable = true, metadata = Map.empty, children = Seq.empty) + )) + schemaFixture.add(schema1) + + val schema2 = SchemaFactory.getDummySchema("schemaA", fields = List( + SchemaField("anotherField", "string", "", nullable = true, metadata = Map.empty, children = Seq.empty) + )) + val response = sendPut[Schema, Validation](s"$apiUrl/schemaA/1", bodyOpt = Some(schema2)) + + assertCreated(response) + val locationHeader = response.getHeaders.getFirst("location") + locationHeader should endWith("/api-v3/schemas/schemaA/2") + + response.getBody shouldBe Validation.empty + + val response2 = sendGet[Schema]("/schemas/schemaA/2") + assertOk(response2) + + val actual = response2.getBody + val expected = toExpected(schema2.copy(version = 2, parent = Some(SchemaFactory.toParent(schema1))), actual) + assert(actual == expected) + } + } + + "return 400" when { + "a Schema fails to update due to empty fields" in { + val schema1 = SchemaFactory.getDummySchema("schemaA") + schemaFixture.add(schema1) + + val schema2 = SchemaFactory.getDummySchema("schemaA") + val response = sendPut[Schema, Validation](s"$apiUrl/schemaA/1", bodyOpt = Some(schema2)) + + assertBadRequest(response) + response.getBody shouldBe Validation.empty + .withError("schema-fields", "No fields found! There must be fields defined for actual usage.") + } + } + } + + // todo disable dataset - all versions/one version/ check the usage to prevent from disabling + // todo used-in implementation checks + + s"GET $apiUrl/{name}/{version}/json" should { + "return 404" when { + "no schema exists for the specified name" in { + val schema = SchemaFactory.getDummySchema(name = "schema1", version = 1) + schemaFixture.add(schema) + + val response = sendGet[String](s"$apiUrl/otherSchemaName/1/json") + + assertNotFound(response) + } + "no schema exists for the specified version" in { + val schema = SchemaFactory.getDummySchema(name = "schema1", version = 1) + schemaFixture.add(schema) + + val response = sendGet[String](s"$apiUrl/schema1/2/json") + + assertNotFound(response) + } + "the schema has no fields" in { // todo 404 or 400 failed valiadation??? + val schema = SchemaFactory.getDummySchema(name = "schemaA", version = 1) + schemaFixture.add(schema) + + val response = sendGet[Validation](s"$apiUrl/schemaA/1/json") + assertBadRequest(response) + response.getBody shouldBe + Validation.empty.withError("schema-fields", "Schema schemaA v1 exists, but has no fields!") + + } + "a non-boolean value is provided for the `pretty` query parameter" in { + val schema = SchemaFactory.getDummySchema(name = "schema1", version = 1, fields = List(SchemaFactory.getDummySchemaField())) + schemaFixture.add(schema) + + val response = sendGet[String](s"$apiUrl/schema1/1/json?pretty=tru") + + assertNotFound(response) + } + } + + "return 200" when { + "there is a Schema with the specified name and version" should { + "return the Spark Struct representation of a schema as a JSON (pretty=false by default)" in { + val schema = SchemaFactory.getDummySchema(name = "schema1", version = 1, fields = List(SchemaFactory.getDummySchemaField())) + schemaFixture.add(schema) + + val response = sendGet[String](s"$apiUrl/schema1/1/json") + + assertOk(response) + + val body = response.getBody + val expected = """{"type":"struct","fields":[{"name":"dummyFieldName","type":"string","nullable":true,"metadata":{}}]}""" + assert(body == expected) + } + "return the Spark Struct representation of a schema as a JSON (pretty=false explict)" in { + val schema = SchemaFactory.getDummySchema(name = "schema1", version = 1, fields = List(SchemaFactory.getDummySchemaField())) + schemaFixture.add(schema) + + val response = sendGet[String](s"$apiUrl/schema1/1/json?pretty=false") + + assertOk(response) + + val body = response.getBody + val expected = """{"type":"struct","fields":[{"name":"dummyFieldName","type":"string","nullable":true,"metadata":{}}]}""" + assert(body == expected) + } + "return the Spark Struct representation of a schema as a pretty JSON" in { + val schema = SchemaFactory.getDummySchema(name = "schema1", version = 1, fields = List(SchemaFactory.getDummySchemaField())) + schemaFixture.add(schema) + + val response = sendGet[String](s"$apiUrl/schema1/1/json?pretty=true") + + assertOk(response) + + val body = response.getBody.replace("\r\n", "\n") // this will make it work on Windows (CRLF->LF), too. + val expected = + """|{ + | "type" : "struct", + | "fields" : [ { + | "name" : "dummyFieldName", + | "type" : "string", + | "nullable" : true, + | "metadata" : { } + | } ] + |}""".stripMargin + assert(body == expected) + } + } + } + } + + s"GET $apiUrl/{name}/{version}/original" should { + "return 404" when { + "no Attachment exists for the specified name" in { + val attachment = AttachmentFactory.getDummyAttachment(refName = "schemaName", refVersion = 2, refCollection = schemaRefCollection) + attachmentFixture.add(attachment) + + val response = sendGet[Array[Byte]](s"$apiUrl/otherSchemaName/2/original") + + assertNotFound(response) + assert(!response.getHeaders.containsKey("mime-type")) + } + "no Attachment exists with a version up to the specified version" in { + val attachment = AttachmentFactory.getDummyAttachment(refName = "schemaName", refVersion = 2, refCollection = schemaRefCollection) + attachmentFixture.add(attachment) + + val response = sendGet[Array[Byte]](s"$apiUrl/schemaName/1/original") + + assertNotFound(response) + assert(!response.getHeaders.containsKey("mime-type")) + } + } + + "return 200" when { + val attachment1 = AttachmentFactory.getDummyAttachment( + refName = "schemaName", + refVersion = 1, + refCollection = schemaRefCollection, + fileContent = Array(1, 2, 3)) + val attachment2 = AttachmentFactory.getDummyAttachment( + refName = "schemaName", + refVersion = 2, + refCollection = schemaRefCollection, + fileContent = Array(2, 3, 4), + fileMIMEType = MediaType.APPLICATION_OCTET_STREAM_VALUE) + val attachment4 = AttachmentFactory.getDummyAttachment( + refName = "schemaName", + refVersion = 4, + refCollection = schemaRefCollection, + fileContent = Array(4, 5, 6), + fileMIMEType = MediaType.APPLICATION_JSON_VALUE) + val attachment5 = AttachmentFactory.getDummyAttachment( + refName = "schemaName", + refVersion = 5, + refCollection = schemaRefCollection, + fileContent = Array(5, 6, 7)) + "there are Attachments with previous and subsequent versions" should { + "return the byte array of the uploaded file with the nearest previous version" in { + attachmentFixture.add(attachment1, attachment2, attachment4, attachment5) + + val response = sendGet[Array[Byte]](s"$apiUrl/schemaName/3/original") + + assertOk(response) + assert(response.getHeaders.containsKey("mime-type")) + assert(response.getHeaders.get("mime-type").get(0) == MediaType.APPLICATION_OCTET_STREAM_VALUE) + + val body = response.getBody + assert(body.sameElements(attachment2.fileContent)) + } + } + "there is an Attachment with the exact version" should { + "return the byte array of the uploaded file with the exact version" in { + attachmentFixture.add(attachment1, attachment2, attachment4, attachment5) + + val response = sendGet[Array[Byte]](s"$apiUrl/schemaName/4/original") + + assertOk(response) + assert(response.getHeaders.containsKey("mime-type")) + assert(response.getHeaders.get("mime-type").get(0) == "application/json") + + val body = response.getBody + assert(body.sameElements(attachment4.fileContent)) + } + } + } + } + + s"POST $apiUrl/{name}/{version}/from-file" should { + "return 201" when { + "a copybook has no errors" should { + "return a new version of the schema" in { + val schema = SchemaFactory.getDummySchema("schemaA", version = 1) + schemaFixture.add(schema) + + val schemaParams = HashMap[String, String]("format" -> "copybook") + val responseUploaded = sendPostUploadFile[Validation]( + s"$apiUrl/schemaA/1/from-file", TestResourcePath.Copybook.ok, schemaParams) + assertCreated(responseUploaded) + val locationHeader = responseUploaded.getHeaders.getFirst("location") + locationHeader should endWith("/api-v3/schemas/schemaA/2") // +1 version + + val response2 = sendGet[Schema]("/schemas/schemaA/2") + assertOk(response2) + + val actual = response2.getBody + assert(actual.name == schema.name) + assert(actual.version == schema.version + 1) + assert(actual.fields.length == 3) + } + } + + "a JSON struct type schema has no errors" should { + "return a new version of the schema" in { + val schema = SchemaFactory.getDummySchema("schemaA") + schemaFixture.add(schema) + + val schemaParams = HashMap[String, Any]("format" -> "struct") + val responseUploaded = sendPostUploadFile[Validation]( + s"$apiUrl/schemaA/1/from-file", TestResourcePath.Json.ok, schemaParams) + assertCreated(responseUploaded) + val locationHeader = responseUploaded.getHeaders.getFirst("location") + locationHeader should endWith("/api-v3/schemas/schemaA/2") // +1 version + + val response2 = sendGet[Schema]("/schemas/schemaA/2") + assertOk(response2) + + val actual = response2.getBody + assert(actual.name == schema.name) + assert(actual.version == schema.version + 1) + assert(actual.fields.length == 2) + } + } + + "an avro schema has no errors" should { + "return a new version of the schema" in { + val schema = SchemaFactory.getDummySchema("schemaA") + schemaFixture.add(schema) + + val schemaParams = HashMap[String, Any]("format" -> "avro") + val responseUploaded = sendPostUploadFile[Schema]( + s"$apiUrl/schemaA/1/from-file", TestResourcePath.Avro.ok, schemaParams) + assertCreated(responseUploaded) + val locationHeader = responseUploaded.getHeaders.getFirst("location") + locationHeader should endWith("/api-v3/schemas/schemaA/2") // +1 version + + val response2 = sendGet[Schema]("/schemas/schemaA/2") + assertOk(response2) + + val actual = response2.getBody + assert(actual.name == schema.name) + assert(actual.version == schema.version + 1) + assert(actual.fields.length == 7) + } + } + } + + "return 400" when { + "no upload format type is specified" in { + val schema = SchemaFactory.getDummySchema("schemaA") + schemaFixture.add(schema) + + val schemaParams = HashMap.empty[String, Any] // v2 fallbacked on this, v3 forbids it + val response = sendPostUploadFile[String]( + s"$apiUrl/schemaA/1/from-file", TestResourcePath.Json.ok, schemaParams) + assertBadRequest(response) + response.getBody should include("Required String parameter 'format' is not present") + } + + "an empty upload format type is specified" in { + val schema = SchemaFactory.getDummySchema("schemaA") + schemaFixture.add(schema) + + val schemaParams = HashMap[String, Any]("format" -> "") // v2 fallbacked on this, v3 forbids it + val response = sendPostUploadFile[String]( + s"$apiUrl/schemaA/1/from-file", TestResourcePath.Json.ok, schemaParams) + assertBadRequest(response) + response.getBody should include("not a recognized schema format. Menas currently supports: struct, copybook, avro.") + } + + "a copybook with a syntax error" should { + "return a response containing a schema parsing error with syntax error specific fields" in { + val schemaParams = HashMap[String, Any]("format" -> "copybook") + val response = sendPostUploadFile[RestResponse]( + s"$apiUrl/schemaA/1/from-file", TestResourcePath.Copybook.bogus, schemaParams) + val body = response.getBody + + assertBadRequest(response) + body.error match { + case Some(e: SchemaParsingError) => + assert(e.errorType == "schema_parsing") + assert(e.schemaType == SchemaType.Copybook) + assert(e.line.contains(22)) + assert(e.field.contains("")) + assert(body.message.contains("Syntax error in the copybook")) + case e => fail(s"Expected an instance of SchemaParsingError, got $e.") + } + } + } + + "a JSON struct type schema with a syntax error" should { + "return a response containing a schema parsing error returned by the StructType parser" in { + val schemaParams = HashMap[String, Any]("format" -> "struct") + val response = sendPostUploadFile[RestResponse]( + s"$apiUrl/schemaA/1/from-file", TestResourcePath.Json.bogus, schemaParams) + val body = response.getBody + + assertBadRequest(response) + body.error match { + case Some(e: SchemaParsingError) => + assert(e.errorType == "schema_parsing") + assert(e.schemaType == SchemaType.Struct) + assert(body.message.contains("StructType serializer: Failed to convert the JSON string")) + case e => fail(s"Expected an instance of SchemaParsingError, got $e.") + } + } + } + + "an avro-schema with a syntax error" should { + "return a response containing a schema parsing error encountered during avro schema parsing" in { + val schemaParams = HashMap[String, Any]("format" -> "avro") + val response = sendPostUploadFile[RestResponse]( + s"$apiUrl/schemaA/1/from-file", TestResourcePath.Avro.bogus, schemaParams) + val body = response.getBody + + assertBadRequest(response) + body.error match { + case Some(e: SchemaParsingError) => + assert(e.errorType == "schema_parsing") + assert(e.schemaType == SchemaType.Avro) + assert(body.message.contains("Record has no fields")) + case e => fail(s"Expected an instance of SchemaParsingError, got $e.") + } + } + } + + "a wrong format has been specified" should { + "return a response containing a schema format error" in { + val schemaParams = HashMap[String, Any]("format" -> "foo") + val response = sendPostUploadFile[RestResponse]( + s"$apiUrl/schemaA/1/from-file", TestResourcePath.Json.bogus, schemaParams) + val body = response.getBody + + assertBadRequest(response) + body.error match { + case Some(e: SchemaFormatError) => + assert(e.errorType == "schema_format") + assert(e.schemaType == "foo") + assert(body.message.contains("'foo' is not a recognized schema format.")) + case e => fail(s"Expected an instance of SchemaFormatError, got $e.") + } + } + } + } + + "return 404" when { + "a schema file is uploaded, but no schema exists for the specified name and version" in { + val schemaParams = HashMap[String, Any]("format" -> "copybook") + val responseUploaded = sendPostUploadFile[Schema]( + s"$apiUrl/schemaA/1/from-file", TestResourcePath.Copybook.ok, schemaParams) + assertNotFound(responseUploaded) + } + } + } + + import com.github.tomakehurst.wiremock.client.WireMock._ + + private def readTestResourceAsString(path: String): String = IOUtils.toString(getClass.getResourceAsStream(path)) + + /** + * will prepare the a response from file with correct `ContentType` + */ + private def readTestResourceAsResponseWithContentType(path: String): ResponseDefinitionBuilder = { + // this is crazy, but it works better than hardcoding mime-types + val filePath: Path = new File(getClass.getResource(path).toURI()).toPath + val mime = Option(Files.probeContentType(filePath)).getOrElse(MediaType.APPLICATION_OCTET_STREAM_VALUE) // default for e.g. cob + + val content = readTestResourceAsString(path) + import com.github.tomakehurst.wiremock.client.WireMock._ + okForContentType(mime, content) + } + + s"POST $apiUrl/{name}/{version}/from-remote-uri" should { + + val remoteFilePath = "/remote-test/someRemoteFile.ext" + val remoteUrl = s"http://localhost:$port$remoteFilePath" + + "return 201" when { + "a copybook has no errors" should { + "return a new version of the schema" in { + val schema = SchemaFactory.getDummySchema("schemaA") + schemaFixture.add(schema) + + wireMockServer.stubFor(get(urlPathEqualTo(remoteFilePath)) + .willReturn(readTestResourceAsResponseWithContentType(TestResourcePath.Copybook.ok))) + + val params = HashMap[String, Any]("format" -> "copybook", "remoteUrl" -> remoteUrl) + val responseRemoteLoaded = sendPostRemoteFile[Schema](s"$apiUrl/schemaA/1/from-remote-uri", params) + assertCreated(responseRemoteLoaded) + val locationHeader = responseRemoteLoaded.getHeaders.getFirst("location") + locationHeader should endWith("/api-v3/schemas/schemaA/2") // +1 version + + val response2 = sendGet[Schema]("/schemas/schemaA/2") + assertOk(response2) + + val actual = response2.getBody + assert(actual.name == schema.name) + assert(actual.version == schema.version + 1) + assert(actual.fields.length == 3) + } + } + + s"struct schema has no errors" should { + "return a new version of the schema" in { + val schema = SchemaFactory.getDummySchema("schemaA") + schemaFixture.add(schema) + + wireMockServer.stubFor(get(urlPathEqualTo(remoteFilePath)) + .willReturn(readTestResourceAsResponseWithContentType(TestResourcePath.Json.ok))) + + val params = HashMap("remoteUrl" -> remoteUrl, "format" -> "struct") + val responseRemoteLoaded = sendPostRemoteFile[Schema](s"$apiUrl/schemaA/1/from-remote-uri", params) + assertCreated(responseRemoteLoaded) + val locationHeader = responseRemoteLoaded.getHeaders.getFirst("location") + locationHeader should endWith("/api-v3/schemas/schemaA/2") // +1 version + + val response2 = sendGet[Schema]("/schemas/schemaA/2") + assertOk(response2) + + val actual = response2.getBody + assert(actual.name == schema.name) + assert(actual.version == schema.version + 1) + assert(actual.fields.length == 2) + } + } + + "an avro schema has no errors" should { + "return a new version of the schema" in { + val schema = SchemaFactory.getDummySchema("schemaA") + schemaFixture.add(schema) + + wireMockServer.stubFor(get(urlPathEqualTo(remoteFilePath)) + .willReturn(readTestResourceAsResponseWithContentType(TestResourcePath.Avro.ok))) + + val params = HashMap[String, Any]("format" -> "avro", "remoteUrl" -> remoteUrl) + val responseRemoteLoaded = sendPostRemoteFile[Schema](s"$apiUrl/schemaA/1/from-remote-uri", params) + assertCreated(responseRemoteLoaded) + val locationHeader = responseRemoteLoaded.getHeaders.getFirst("location") + locationHeader should endWith("/api-v3/schemas/schemaA/2") // +1 version + + val response2 = sendGet[Schema]("/schemas/schemaA/2") + assertOk(response2) + + val actual = response2.getBody + assert(actual.name == schema.name) + assert(actual.version == schema.version + 1) + assert(actual.fields.length == 7) + } + } + } + + "return 400" when { + Seq( + (SchemaType.Copybook, TestResourcePath.Copybook.bogus, "Syntax error in the copybook"), + (SchemaType.Struct, TestResourcePath.Json.bogus, "StructType serializer: Failed to convert the JSON string"), + (SchemaType.Avro, TestResourcePath.Avro.bogus, "Record has no fields") + ).foreach { case (schemaType, testResourcePath, expectedErrorMessage) => + + s"a $schemaType with a syntax error" should { + "return a response containing a schema parsing error with syntax error specific fields" in { + wireMockServer.stubFor(get(urlPathEqualTo(remoteFilePath)) + .willReturn(readTestResourceAsResponseWithContentType(testResourcePath))) + + val params = HashMap("format" -> schemaType.toString, "remoteUrl" -> remoteUrl) + val response = sendPostRemoteFile[RestResponse](s"$apiUrl/schemaA/1/from-remote-uri", params) + val body = response.getBody + + assertBadRequest(response) + body.error match { + case Some(e: SchemaParsingError) => + assert(e.errorType == "schema_parsing") + assert(e.schemaType == schemaType) + assert(body.message.contains(expectedErrorMessage)) + case e => fail(s"Expected an instance of SchemaParsingError, got $e.") + } + } + } + } + + "a wrong format has been specified" should { + "return a response containing a schema format error" in { + wireMockServer.stubFor(get(urlPathEqualTo(remoteFilePath)) + .willReturn(readTestResourceAsResponseWithContentType(TestResourcePath.Json.ok))) + + val params = HashMap[String, Any]("format" -> "foo", "remoteUrl" -> remoteUrl) + val response = sendPostRemoteFile[RestResponse](s"$apiUrl/schemaA/1/from-remote-uri", params) + val body = response.getBody + + assertBadRequest(response) + body.error match { + case Some(e: SchemaFormatError) => + assert(e.errorType == "schema_format") + assert(e.schemaType == "foo") + assert(body.message.contains("'foo' is not a recognized schema format.")) + case e => fail(s"Expected an instance of SchemaFormatError, got $e.") + } + } + } + } + + "return 404" when { + "a schema file is loaded from remote url, but no schema exists for the specified name and version" in { + wireMockServer.stubFor(get(urlPathEqualTo(remoteFilePath)) + .willReturn(readTestResourceAsResponseWithContentType(TestResourcePath.Copybook.ok))) + + val params = HashMap[String, Any]("format" -> "copybook", "remoteUrl" -> remoteUrl) + val response = sendPostRemoteFile[Schema](s"$apiUrl/schemaA/1/from-remote-uri", params) + assertNotFound(response) + } + } + } + + s"POST $apiUrl/{name}/{version}/from-registry" should { + def subjectPath(subjectName: String) = s"/subjects/$subjectName/versions/latest/schema" + + "return 201" when { + "an avro schema has no errors" should { + "load schema by subject name as-is" in { + val schema = SchemaFactory.getDummySchema("schemaA") + schemaFixture.add(schema) + + wireMockServer.stubFor(get(urlPathEqualTo(subjectPath("myTopic1-value"))) + .willReturn(readTestResourceAsResponseWithContentType(TestResourcePath.Avro.ok))) + + val params = HashMap[String, Any]("format" -> "avro", "subject" -> "myTopic1-value") + val responseRemoteLoaded = sendPostSubject[Schema](s"$apiUrl/schemaA/1/from-registry", params) + assertCreated(responseRemoteLoaded) + val locationHeader = responseRemoteLoaded.getHeaders.getFirst("location") + locationHeader should endWith("/api-v3/schemas/schemaA/2") // +1 version + + val response2 = sendGet[Schema]("/schemas/schemaA/2") + assertOk(response2) + + val actual = response2.getBody + assert(actual.name == schema.name) + assert(actual.version == schema.version + 1) + assert(actual.fields.length == 7) + } + + "load schema by subject name -value fallback" in { + val schema = SchemaFactory.getDummySchema("schemaA") + schemaFixture.add(schema) + + wireMockServer.stubFor(get(urlPathEqualTo(subjectPath("myTopic2"))) // will fail + .willReturn(notFound())) + + wireMockServer.stubFor(get(urlPathEqualTo(subjectPath("myTopic2-value"))) // fallback will kick in + .willReturn(readTestResourceAsResponseWithContentType(TestResourcePath.Avro.ok))) + + val params = HashMap[String, Any]("format" -> "avro", "subject" -> "myTopic2") + val responseRemoteLoaded = sendPostSubject[Schema](s"$apiUrl/schemaA/1/from-registry", params) + assertCreated(responseRemoteLoaded) + val locationHeader = responseRemoteLoaded.getHeaders.getFirst("location") + locationHeader should endWith("/api-v3/schemas/schemaA/2") // +1 version + + val response2 = sendGet[Schema]("/schemas/schemaA/2") + assertOk(response2) + + val actual = response2.getBody + assert(actual.name == schema.name) + assert(actual.version == schema.version + 1) + assert(actual.fields.length == 7) + } + } + } + } + +}