Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue #KN-903 feat: Object and Schema APIs #976 #987

Open
wants to merge 8 commits into
base: release-5.7.0
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,20 @@ import org.apache.commons.lang3.StringUtils
import org.sunbird.actor.core.BaseActor
import org.sunbird.cloudstore.StorageService
import org.sunbird.common.dto.{Request, Response, ResponseHandler}
import org.sunbird.common.exception.ResponseCode
import org.sunbird.common.exception.{ClientException, ResponseCode, ServerException}
import org.sunbird.content.util.ContentConstants
import org.sunbird.graph.OntologyEngineContext
import org.sunbird.graph.dac.model.Node
import org.sunbird.graph.nodes.DataNode
import org.sunbird.graph.schema.DefinitionNode
import org.sunbird.graph.utils.NodeUtil
import org.sunbird.util.RequestUtil

import java.util
import javax.inject.Inject
import scala.collection.JavaConverters
import scala.collection.JavaConverters._
import scala.collection.convert.ImplicitConversions.`map AsScala`
import scala.concurrent.{ExecutionContext, Future}

class ObjectActor @Inject() (implicit oec: OntologyEngineContext, ss: StorageService) extends BaseActor {
Expand All @@ -20,18 +27,107 @@ class ObjectActor @Inject() (implicit oec: OntologyEngineContext, ss: StorageSer
override def onReceive(request: Request): Future[Response] = {
request.getOperation match {
case "readObject" => read(request)
case "createObject" => create(request)
case "updateObject" => update(request)
case "retireObject" => retire(request)
case _ => handleDefault(request) //ERROR(request.getOperation)
}
}

@throws[Exception]
private def read(request: Request): Future[Response] = {
val fields: util.List[String] = JavaConverters.seqAsJavaListConverter(request.get("fields").asInstanceOf[String].split(",").filter(field => StringUtils.isNotBlank(field) && !StringUtils.equalsIgnoreCase(field, "null"))).asJava
request.getRequest.put("fields", fields)
val schemaName = request.getContext.getOrDefault("schemaName", "object").asInstanceOf[String]
DataNode.read(request).map(node => {
if (NodeUtil.isRetired(node)) ResponseHandler.ERROR(ResponseCode.RESOURCE_NOT_FOUND, ResponseCode.RESOURCE_NOT_FOUND.name, "Object not found with identifier: " + node.getIdentifier)
val metadata: util.Map[String, AnyRef] = NodeUtil.serialize(node, fields,null, request.getContext.get("version").asInstanceOf[String])
ResponseHandler.OK.put("object", metadata)
ResponseHandler.OK.put(schemaName, metadata)
})
}
@throws[Exception]
private def create(request: Request): Future[Response] = {
try {
RequestUtil.restrictProperties(request)
} catch {
case e: Exception => throw new ClientException("INVALID_OBJECT", "The schema does not exist for the provided object.")
}
DataNode.create(request).map(node => {
ResponseHandler.OK.put("identifier", node.getIdentifier).put("versionKey", node.getMetadata.get("versionKey"))
})
}

@throws[Exception]
private def update(request: Request): Future[Response] = {
if (StringUtils.isBlank(request.getRequest.getOrDefault("versionKey", "").asInstanceOf[String])) throw new ClientException("ERR_INVALID_REQUEST", "Please Provide Version Key!")
try {
RequestUtil.restrictProperties(request)
DataNode.update(request).map(node => {
val identifier: String = node.getIdentifier.replace(".img", "")
ResponseHandler.OK.put("identifier", identifier).put("versionKey", node.getMetadata.get("versionKey"))
})
} catch {
case e: Exception => throw new ClientException("INVALID_OBJECT", "The schema does not exist for the provided object.")
}
}

@throws[Exception]
private def retire(request: Request): Future[Response] = {
request.getRequest.put("status", "Retired")
try {
DataNode.update(request).map(node => {
ResponseHandler.OK.put("identifier", node.getIdentifier)
})
} catch {
case e: Exception => throw new ClientException("INVALID_OBJECT", "The schema does not exist for the provided object.")
}
}

private def handleDefault(request: Request): Future[Response] = {
val req = new Request(request)
val graph_id = req.getContext.getOrDefault("graph_id", "domain").asInstanceOf[String]
val schemaName = req.getContext.getOrDefault("schemaName", "").asInstanceOf[String]
val schemaVersion = req.getContext.getOrDefault("schemaVersion", "1.0").asInstanceOf[String]
try {
val transitionProps = DefinitionNode.getTransitionProps(graph_id, schemaVersion, schemaName)
val operation = request.getOperation
if(transitionProps.contains(operation)){
val transitionData = transitionProps.asJava.get(operation).asInstanceOf[util.Map[String, AnyRef]]
val fromStatus = transitionData.get("from").asInstanceOf[util.List[String]]
val identifier = request.getContext.get("identifier").asInstanceOf[String]
val readReq = new Request();
readReq.setContext(request.getContext)
readReq.put("identifier", identifier)
DataNode.read(readReq).map(node => {
if (!fromStatus.contains(node.getMetadata.get("status").toString)) {
throw new ClientException(ContentConstants.ERR_CONTENT_NOT_DRAFT, "Transition not allowed! "+ schemaName.capitalize +" Object status should be one of :" + fromStatus.toString())
}
val toStatus = transitionData.get("to").asInstanceOf[String]
val metadata: util.Map[String, AnyRef] = NodeUtil.serialize(node, null, node.getObjectType.toLowerCase.replace("image", ""), request.getContext.get("version").asInstanceOf[String])
val completeMetaData = metadata ++ request.getRequest
val requiredProps = transitionData.getOrDefault("required", new util.ArrayList[String]()).asInstanceOf[util.List[String]]
if(!completeMetaData.isEmpty && !requiredProps.isEmpty){
val errors: util.List[String] = new util.ArrayList[String]
requiredProps.forEach(prop => {
if(!completeMetaData.contains(prop))
errors.add(s"Required Metadata $prop not set")
})
if (!errors.isEmpty)
throw new ClientException("CLIENT_ERROR", "Validation Errors.", errors)
}
request.getRequest.put("status", toStatus)
DataNode.update(request).map(updateNode => {
ResponseHandler.OK.put("transition", s"Transition of the object is successful!")
})
}).flatMap(f => f)
} else {
ERROR(request.getOperation)
}
} catch {
case e: Exception => {
throw new ClientException("INVALID_OBJECT", "The schema does not exist for the provided object.")
}
}
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package org.sunbird.content.actors

import org.apache.commons.lang3.StringUtils
import org.sunbird.actor.core.BaseActor
import org.sunbird.common.{Platform, Slug}
import org.sunbird.common.dto.{Request, Response, ResponseHandler}
import org.sunbird.common.exception.ClientException
import org.sunbird.graph.OntologyEngineContext
import org.sunbird.graph.common.Identifier
import org.sunbird.graph.nodes.DataNode
import org.sunbird.graph.utils.NodeUtil
import org.sunbird.content.util.SchemaConstants
import org.sunbird.util.RequestUtil

import java.util
import javax.inject.Inject
import scala.collection.JavaConverters
import scala.concurrent.{ExecutionContext, Future}
import org.sunbird.content.upload.mgr.SchemaUploadManager

class SchemaActor @Inject()(implicit oec: OntologyEngineContext) extends BaseActor {
implicit val ec: ExecutionContext = getContext().dispatcher
private final val SCHEMA_SLUG_LIMIT: Int = if (Platform.config.hasPath("schema.slug_limit")) Platform.config.getInt("schema.slug_limit") else 3
override def onReceive(request: Request): Future[Response] = {
request.getOperation match {
case "createSchema" => create(request)
case "readSchema" => read(request)
case "uploadSchema" => upload(request)
case "publishSchema" => publish(request)
case _ => ERROR(request.getOperation)
}
}
@throws[Exception]
private def create(request: Request): Future[Response] = {
val name = request.getRequest.getOrDefault(SchemaConstants.NAME, "").asInstanceOf[String]
if (!request.getRequest.containsKey("name")) throw new ClientException("ERR_SCHEMA_NAME_REQUIRED", "Unique name is mandatory for Schema creation")
val slug = Slug.makeSlug(name)
request.getRequest.put("slug", slug)
RequestUtil.restrictProperties(request)
request.getRequest.put(SchemaConstants.IDENTIFIER, Identifier.getIdentifier(slug, Identifier.getUniqueIdFromTimestamp, SCHEMA_SLUG_LIMIT))
DataNode.create(request).map(node => {
ResponseHandler.OK.put(SchemaConstants.IDENTIFIER, node.getIdentifier)
})
}

private def read(request: Request): Future[Response] = {
val fields: util.List[String] = JavaConverters.seqAsJavaListConverter(request.get("fields").asInstanceOf[String].split(",").filter(field => StringUtils.isNotBlank(field) && !StringUtils.equalsIgnoreCase(field, "null"))).asJava
request.getRequest.put("fields", fields)
val schemaId = request.get("identifier").asInstanceOf[String]
DataNode.read(request).map(node => {
if (null != node && StringUtils.equalsAnyIgnoreCase(node.getIdentifier, schemaId)) {
val metadata: util.Map[String, AnyRef] = NodeUtil.serialize(node, fields, request.getContext.get("schemaName").asInstanceOf[String], request.getContext.get("version").asInstanceOf[String])
ResponseHandler.OK.put("schema", metadata)
} else throw new ClientException("ERR_INVALID_REQUEST", "Invalid Request. Please Provide Required Properties!")
})
}

def upload(request: Request): Future[Response] = {
val identifier: String = request.getContext.getOrDefault(SchemaConstants.IDENTIFIER, "").asInstanceOf[String]
val readReq = new Request(request)
readReq.put(SchemaConstants.IDENTIFIER, identifier)
readReq.put("fields", new util.ArrayList[String])
DataNode.read(readReq).map(node => {
if (null != node & StringUtils.isNotBlank(node.getObjectType))
request.getContext.put(SchemaConstants.SCHEMA, node.getObjectType.toLowerCase())
SchemaUploadManager.upload(request, node)
}).flatMap(f => f)
}

@throws[Exception]
private def publish(request: Request): Future[Response] = {
request.getRequest.put("status", "Live")
DataNode.update(request).map(node => {
ResponseHandler.OK.put("publishStatus", s"Schema '${node.getIdentifier}' published Successfully!")
})
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
package org.sunbird.content.upload.mgr

import org.apache.commons.lang3.StringUtils
import org.sunbird.cloudstore.StorageService
import org.sunbird.common.{Platform, Slug}
import org.sunbird.common.dto.{Request, Response, ResponseHandler}
import org.sunbird.common.exception.{ClientException, ResponseCode, ServerException}
import org.sunbird.graph.OntologyEngineContext
import org.sunbird.graph.dac.model.Node
import org.sunbird.graph.nodes.DataNode
import org.sunbird.telemetry.logger.TelemetryManager

import java.io.File
import scala.collection.JavaConverters.mapAsJavaMap
import scala.concurrent.{ExecutionContext, Future}
import scala.collection.JavaConverters._
import org.sunbird.cloud.storage.factory.{StorageConfig, StorageServiceFactory}
import org.sunbird.telemetry.util.LogTelemetryEventUtil
import java.util

object SchemaUploadManager {
implicit val ss: StorageService = new StorageService

private val SCHEMA_FOLDER = "schemas/local"
private val SCHEMA_VERSION_FOLDER = "1.0"

private val storageType: String = if (Platform.config.hasPath("cloud_storage_type")) Platform.config.getString("cloud_storage_type") else ""
private val storageKey = Platform.config.getString("cloud_storage_key")
private val storageSecret = Platform.config.getString("cloud_storage_secret")
// TODO: endPoint defined to support "cephs3". Make code changes after cloud-store-sdk 2.11 support it.
val endPoint = if (Platform.config.hasPath("cloud_storage_endpoint")) Option(Platform.config.getString("cloud_storage_endpoint")) else None
val storageContainer = Platform.config.getString("cloud_storage_container")

def getContainerName: String = {
if (Platform.config.hasPath("cloud_storage_container"))
Platform.config.getString("cloud_storage_container")
else
throw new ServerException("ERR_INVALID_CLOUD_STORAGE", "Cloud Storage Container name not configured.")
}

def upload(request: Request, node: Node)(implicit oec: OntologyEngineContext, ec: ExecutionContext): Future[Response] = {
val identifier: String = node.getIdentifier
val slug: String = node.getMetadata.get("slug").asInstanceOf[String]
val file = request.getRequest.get("file").asInstanceOf[File]
val reqFilePath: String = request.getRequest.getOrDefault("filePath", "").asInstanceOf[String].replaceAll("^/+|/+$", "")
val filePath = if (StringUtils.isBlank(reqFilePath)) None else Option(reqFilePath)
val uploadFuture: Future[Map[String, AnyRef]] = uploadFile(identifier, node, file, filePath, slug)
uploadFuture.map(result => {
updateNode(request, node.getIdentifier, node.getObjectType, result)
}).flatMap(f => f)
}


def uploadFile(objectId: String, node: Node, uploadFile: File, filePath: Option[String], slug: String)(implicit ec: ExecutionContext): Future[Map[String, AnyRef]] = {
println("uploadFile objectId " + objectId + " node " + node + " uploadFile " + uploadFile + " filePath "+ filePath)
validateUploadRequest(objectId, node, uploadFile)
val result: Array[String] = uploadArtifactToCloud(uploadFile, slug, filePath)
Future {
Map("identifier" -> objectId, "artifactUrl" -> result(1), "downloadUrl" -> result(1), "cloudStorageKey" -> result(0), "s3Key" -> result(0), "size" -> getCloudStoredFileSize(result(0)).asInstanceOf[AnyRef])
}
}

def uploadArtifactToCloud(uploadedFile: File, identifier: String, filePath: Option[String] = None, slug: Option[Boolean] = Option(true)): Array[String] = {
var urlArray = new Array[String](2)
try {
val folder = if (filePath.isDefined) filePath.get + File.separator + Platform.getString(SCHEMA_FOLDER, "schemas/local") + File.separator + Slug.makeSlug(identifier, true) + File.separator + Platform.getString(SCHEMA_VERSION_FOLDER, "1.0") else Platform.getString(SCHEMA_FOLDER, "schemas/local") + File.separator + Slug.makeSlug(identifier, true) + File.separator + Platform.getString(SCHEMA_VERSION_FOLDER, "1.0")
val cloudService = StorageServiceFactory.getStorageService(StorageConfig(storageType, storageKey, storageSecret))
val slugFile = Slug.createSlugFile(uploadedFile)
val objectKey = folder + "/" + slugFile.getName

val url = cloudService.upload(getContainerName, slugFile.getAbsolutePath, objectKey, Option(false), Option(1), Option(2), None)
urlArray = Array[String](objectKey, url)
} catch {
case e: Exception =>
TelemetryManager.error("Error while uploading the file.", e)
throw new ServerException("ERR_CONTENT_UPLOAD_FILE", "Error while uploading the File.", e)
}
urlArray
}

def updateNode(request: Request, identifier: String, objectType: String, result: Map[String, AnyRef])(implicit oec: OntologyEngineContext, ec: ExecutionContext): Future[Response] = {
val updatedResult = result - "identifier"
val artifactUrl = updatedResult.getOrElse("artifactUrl", "").asInstanceOf[String]
if (StringUtils.isNotBlank(artifactUrl)) {
val updateReq = new Request(request)
updateReq.getContext().put("identifier", identifier)
updateReq.getRequest.putAll(mapAsJavaMap(updatedResult))
println("updateReq "+ updateReq)
DataNode.update(updateReq).map(node => {
getUploadResponse(node)
})
} else {
Future {
ResponseHandler.ERROR(ResponseCode.SERVER_ERROR, "ERR_UPLOAD_FILE", "Something Went Wrong While Processing Your Request.")
}
}
}

def getUploadResponse(node: Node)(implicit ec: ExecutionContext): Response = {
val id = node.getIdentifier.replace(".img", "")
val url = node.getMetadata.get("artifactUrl").asInstanceOf[String]
ResponseHandler.OK.put("identifier", id).put("artifactUrl", url).put("versionKey", node.getMetadata.get("versionKey"))
}

private def validateUploadRequest(objectId: String, node: Node, data: AnyRef)(implicit ec: ExecutionContext): Unit = {
if (StringUtils.isBlank(objectId))
throw new ClientException("ERR_INVALID_ID", "Please Provide Valid Identifier!")
if (null == node)
throw new ClientException("ERR_INVALID_NODE", "Please Provide Valid Node!")
if (null == data)
throw new ClientException("ERR_INVALID_DATA", "Please Provide Valid File Or File Url!")
data match {
case file: File => validateFile(file)
case _ =>
}
}

protected def getCloudStoredFileSize(key: String)(implicit ss: StorageService): Double = {
val size = 0
if (StringUtils.isNotBlank(key)) try return ss.getObjectSize(key)
catch {
case e: Exception =>
TelemetryManager.error("Error While getting the file size from Cloud Storage: " + key, e)
}
size
}

private def validateFile(file: File): Unit = {
if (null == file || !file.exists())
throw new ClientException("ERR_INVALID_FILE", "Please Provide Valid File!")
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package org.sunbird.content.util

object SchemaConstants {

val SCHEMA: String = "schema"
val CREATE_SCHEMA: String = "createSchema"
val READ_SCHEMA: String = "readSchema"
val UPDATE_SCHEMA: String = "updateSchema"
val UPLOAD_SCHEMA: String = "uploadSchema"
val PUBLISH_SCHEMA: String = "publishSchema"
val SCHEMA_VERSION = "1.0"
val IDENTIFIER: String = "identifier"
val FIELDS: String = "fields"
val NAME: String = "name"
}
Loading