diff --git a/content-api/content-actors/src/main/scala/org/sunbird/content/actors/ContentActor.scala b/content-api/content-actors/src/main/scala/org/sunbird/content/actors/ContentActor.scala index 89d19882f..837810741 100644 --- a/content-api/content-actors/src/main/scala/org/sunbird/content/actors/ContentActor.scala +++ b/content-api/content-actors/src/main/scala/org/sunbird/content/actors/ContentActor.scala @@ -1,32 +1,32 @@ package org.sunbird.content.actors -import java.util -import java.util.concurrent.CompletionException -import java.io.File import org.apache.commons.io.FilenameUtils - -import javax.inject.Inject import org.apache.commons.lang3.StringUtils import org.sunbird.`object`.importer.{ImportConfig, ImportManager} import org.sunbird.actor.core.BaseActor import org.sunbird.cache.impl.RedisCache -import org.sunbird.content.util.{AcceptFlagManager, ContentConstants, CopyManager, DiscardManager, FlagManager, RetireManager} import org.sunbird.cloudstore.StorageService -import org.sunbird.common.{ContentParams, Platform, Slug} import org.sunbird.common.dto.{Request, Response, ResponseHandler} -import org.sunbird.common.exception.ClientException +import org.sunbird.common.exception.{ClientException, ResponseCode} +import org.sunbird.common.{ContentParams, Platform, Slug} import org.sunbird.content.dial.DIALManager import org.sunbird.content.publish.mgr.PublishManager import org.sunbird.content.review.mgr.ReviewManager -import org.sunbird.util.RequestUtil import org.sunbird.content.upload.mgr.UploadManager +import org.sunbird.content.util._ import org.sunbird.graph.OntologyEngineContext import org.sunbird.graph.dac.model.Node import org.sunbird.graph.nodes.DataNode import org.sunbird.graph.utils.NodeUtil import org.sunbird.managers.HierarchyManager import org.sunbird.managers.HierarchyManager.hierarchyPrefix +import org.sunbird.telemetry.logger.TelemetryManager +import org.sunbird.util.RequestUtil +import java.io.File +import java.util +import java.util.concurrent.CompletionException +import javax.inject.Inject import scala.collection.JavaConverters import scala.collection.JavaConverters._ import scala.concurrent.{ExecutionContext, Future} @@ -58,6 +58,7 @@ class ContentActor @Inject() (implicit oec: OntologyEngineContext, ss: StorageSe case "reviewContent" => reviewContent(request) case "rejectContent" => rejectContent(request) case "publishContent" => publishContent(request) + case "processStatus" => getProcessIdStatus(request) case _ => ERROR(request.getOperation) } } @@ -71,6 +72,22 @@ class ContentActor @Inject() (implicit oec: OntologyEngineContext, ss: StorageSe }) } + def getProcessIdStatus(request: Request): Future[Response] = { + val apiId: String = "sunbird.dialcode.batch.read" + val response: Future[Response] = DIALManager.readQRCodesBatchInfo(request) + + response.map { rsp => + if (!ResponseHandler.checkError(rsp)) ResponseHandler.OK().putAll(rsp.getResult) + else throw new ClientException("ERR_PROCESS_STATUS_FAILED", "no data found from db for process id") + + }.recover { + case ex: Exception => + // Handle the exception here + TelemetryManager.error(s"An error occurred: ${ex.getMessage}", ex) + ResponseHandler.ERROR(ResponseCode.CLIENT_ERROR, "An internal error occurred", ex.getMessage) + } + } + def read(request: Request): Future[Response] = { val responseSchemaName: String = request.getContext.getOrDefault(ContentConstants.RESPONSE_SCHEMA_NAME, "").asInstanceOf[String] val fields: util.List[String] = JavaConverters.seqAsJavaListConverter(request.get("fields").asInstanceOf[String].split(",").filter(field => StringUtils.isNotBlank(field) && !StringUtils.equalsIgnoreCase(field, "null"))).asJava diff --git a/content-api/content-actors/src/main/scala/org/sunbird/content/dial/DIALConstants.scala b/content-api/content-actors/src/main/scala/org/sunbird/content/dial/DIALConstants.scala index 6d6c99e25..d01212ff2 100644 --- a/content-api/content-actors/src/main/scala/org/sunbird/content/dial/DIALConstants.scala +++ b/content-api/content-actors/src/main/scala/org/sunbird/content/dial/DIALConstants.scala @@ -25,4 +25,7 @@ object DIALConstants { val BATCH_CODE: String = "batchCode" val LIVE_STATUS: String = "Live" val UNLISTED_STATUS: String = "Unlisted" + val DIAL_EID: String = "BE_QR_IMAGE_GENERATOR" + val batchInfo: String = "batchInfo" + } \ No newline at end of file diff --git a/content-api/content-actors/src/main/scala/org/sunbird/content/dial/DIALErrors.scala b/content-api/content-actors/src/main/scala/org/sunbird/content/dial/DIALErrors.scala index c96e00f5e..bea111db4 100644 --- a/content-api/content-actors/src/main/scala/org/sunbird/content/dial/DIALErrors.scala +++ b/content-api/content-actors/src/main/scala/org/sunbird/content/dial/DIALErrors.scala @@ -18,6 +18,7 @@ object DIALErrors { val ERR_ALL_DIALCODES_UTILIZED: String = "ERR_ALL_DIALCODES_UTILIZED" val ERR_INVALID_OPERATION: String = "ERR_INVALID_OPERATION" val ERR_COUNT_GREATER_THAN_RESERVED_DIAL_CODES: String = "ERR_COUNT_GREATER_THAN_RESERVED_DIAL_CODES" + val ERR_INVALID_PROCESS_ID_REQUEST: String = "ERR_INVALID_PROCESS_ID_REQUEST" //Error Messages val ERR_INVALID_REQ_MSG: String = "Invalid Request! Please Provide Valid Request." diff --git a/content-api/content-actors/src/main/scala/org/sunbird/content/dial/DIALManager.scala b/content-api/content-actors/src/main/scala/org/sunbird/content/dial/DIALManager.scala index fa98c7912..26b635f01 100644 --- a/content-api/content-actors/src/main/scala/org/sunbird/content/dial/DIALManager.scala +++ b/content-api/content-actors/src/main/scala/org/sunbird/content/dial/DIALManager.scala @@ -1,21 +1,27 @@ package org.sunbird.content.dial import org.apache.commons.lang3.StringUtils -import org.sunbird.common.{JsonUtils, Platform} import org.sunbird.common.dto.{Request, Response, ResponseHandler} import org.sunbird.common.exception._ +import org.sunbird.common.{JsonUtils, Platform} import org.sunbird.content.util.ContentConstants import org.sunbird.graph.OntologyEngineContext import org.sunbird.graph.dac.model.Node import org.sunbird.graph.nodes.DataNode +import org.sunbird.graph.schema.DefinitionNode import org.sunbird.graph.utils.ScalaJsonUtils +import org.sunbird.kafka.client.KafkaClient import org.sunbird.managers.HierarchyManager import org.sunbird.telemetry.logger.TelemetryManager +import java.io.File import java.util +import java.util.UUID import scala.collection.JavaConverters._ import scala.collection.immutable.{HashMap, Map} +import scala.collection.mutable.{Map => Mmap} import scala.concurrent.{ExecutionContext, Future} +import scala.util.parsing.json.JSON object DIALManager { @@ -24,7 +30,19 @@ object DIALManager { val DIALCODE_GENERATE_URI: String = Platform.config.getString("dial_service.api.base_url") + Platform.config.getString("dial_service.api.generate") val DIAL_API_AUTH_KEY: String = ContentConstants.BEARER + Platform.config.getString("dial_service.api.auth_key") val PASSPORT_KEY: String = Platform.config.getString("graph.passport.key.base") - + private val kfClient = new KafkaClient + val DIALTOPIC: String = Platform.config.getString("kafka.dial.request.topic") + val defaultConfig: Mmap[String, Any] = Mmap( + "errorCorrectionLevel" -> "H", + "pixelsPerBlock" -> 2, + "qrCodeMargin" -> 3, + "textFontName" -> "Verdana", + "textFontSize" -> 11, + "textCharacterSpacing" -> 0.1, + "imageFormat" -> "png", + "colourModel" -> "Grayscale", + "imageBorderSize" -> 1 + ) def link(request: Request)(implicit oec: OntologyEngineContext, ec: ExecutionContext): Future[Response] = { val linkType: String = request.getContext.getOrDefault(DIALConstants.LINK_TYPE, DIALConstants.CONTENT).asInstanceOf[String] val channelId: String = request.getContext.getOrDefault(DIALConstants.CHANNEL, "").asInstanceOf[String] @@ -291,16 +309,25 @@ object DIALManager { def reserve(request: Request, channelId: String, contentId: String, rootNode: Node, contentMetadata: util.Map[String, AnyRef])(implicit oec: OntologyEngineContext, ec: ExecutionContext): Future[Response] = { validateContentStatus(contentMetadata) - val reservedDialCodes = if(contentMetadata.containsKey(DIALConstants.RESERVED_DIALCODES)) ScalaJsonUtils.deserialize[Map[String, Integer]](contentMetadata.get(DIALConstants.RESERVED_DIALCODES).asInstanceOf[String]) else Map.empty[String, Integer] - val updateDialCodes = getUpdateDIALCodes(reservedDialCodes, request, channelId, contentId) + val reservedDialCodes = if (contentMetadata.containsKey(DIALConstants.RESERVED_DIALCODES)) ScalaJsonUtils.deserialize[Map[String, Integer]](contentMetadata.get(DIALConstants.RESERVED_DIALCODES).asInstanceOf[String]) else Map.empty[String, Integer] + val updateDialCodes = getUpdateDIALCodes(reservedDialCodes, request, channelId, contentId) + val reqPublisher = request.getRequest.get(DIALConstants.DIALCODES).asInstanceOf[util.Map[String, AnyRef]].get(DIALConstants.PUBLISHER).asInstanceOf[String] - if(updateDialCodes.size > reservedDialCodes.size) { + if (updateDialCodes.size > reservedDialCodes.size) { val updateReq = getDIALReserveUpdateRequest(request, rootNode, updateDialCodes) DataNode.update(updateReq).map(updatedNode => { val response = ResponseHandler.OK() val updatedSuccessResponse = getDIALReserveUpdateResponse(response, updateDialCodes.size.asInstanceOf[Integer], contentId, updatedNode) updatedSuccessResponse.getResult.put(DIALConstants.VERSION_KEY, updatedNode.getMetadata.get(DIALConstants.VERSION_KEY)) - updatedSuccessResponse + println(" publisher ", request.getRequest) + val dialcodes: Map[String, AnyRef] = + updatedSuccessResponse.getResult + .get("reservedDialcodes") + .asInstanceOf[java.util.Map[String, AnyRef]] + .asScala + .toMap + val updatedResponse = createRequest(dialcodes, channelId, Option(reqPublisher), updatedSuccessResponse, request) + updatedResponse }) } else { val errorResponse = ResponseHandler.ERROR(ResponseCode.CLIENT_ERROR, DIALErrors.ERR_INVALID_COUNT, DIALErrors.ERR_DIAL_INVALID_COUNT_RESPONSE) @@ -309,6 +336,88 @@ object DIALManager { } } + /* + * prepare qr data + * */ + + def createRequest(data: Map[String, AnyRef], channel: String, publisher: Option[String], rspObj: Response, request: Request)(implicit oec: OntologyEngineContext, ec: ExecutionContext) = { + + val dialCodesMap = data.map { case (dialcode, index) => + val fileName = s"$index" + "_" + s"$dialcode" + val dialData = Map( + "data" -> s"https://dev.knowlg.sunbird.org/dial/$dialcode", + "text" -> dialcode, + "id" -> fileName + ) + dialData + } + + val qrCodeSpecString = request.getRequestString("qrcodespec", "") // Assuming this is a JSON string + val qrCodeSpec = JSON.parseFull(qrCodeSpecString) match { + case Some(map: Map[String, Any]) => map + case _ => Map.empty[String, Any] + } + val mergedConfig: Mmap[String, Any] = defaultConfig.++(qrCodeSpec) + val processId = UUID.randomUUID + val dialcodes = dialCodesMap.map(_("text")).toList.asJava + rspObj.getResult.put(DIALConstants.PROCESS_ID, processId) + pushDialEvent(processId, rspObj, channel, publisher, dialCodesMap,mergedConfig) + + val batch = new java.util.HashMap[String, AnyRef]() + batch.put("identifier", processId) + batch.put("dialcodes", dialcodes) + batch.put("config", mergedConfig.mapValues(_.toString).asJava) + batch.put("status", Int.box(0) ) + batch.put("channel", channel) + batch.put("publisher", publisher.get) + batch.put("created_on", Long.box(System.currentTimeMillis())) + + val updateReq = new Request() + val context = new util.HashMap[String, Object]() + context.putAll(request.getContext) + context.remove("identifier") + updateReq.setContext(context) + updateReq.getContext.put("schemaName", "dialcode") + updateReq.getContext.put("objectType", "content") + val updateMap = new util.HashMap[String, AnyRef]() + updateMap.put("identifier", rspObj.get("node_id")) + updateMap.put("status",Int.box(0) ) + updateReq.setRequest(updateMap) + updateReq.putAll(batch) + + oec.dialgraphService.saveExternalProps(updateReq).map { resp => + if (ResponseHandler.checkError(resp)) { + throw new ServerException("ERR_WHILE_SAVING_TO_CASSANDRA", "Error while saving external props to Cassandra") + } + else { + resp + } + + } + rspObj + + } + + def pushDialEvent (processId: UUID, rspObj: Response, channel: String, publisher: Option[String], dialCodes: Iterable[Map[String, String]], config: Mmap[String, Any] ) ={ + val event = new util.HashMap[String, Any]() + + event.put("eid", DIALConstants.DIAL_EID) + event.put("processId", processId) + event.put("objectId", Option(rspObj.get("node_id")).getOrElse(channel)) + event.put("dialcodes", dialCodes) + val storageMap = new util.HashMap[String, Any]() + storageMap.put("container", "dial") + storageMap.put("path", if (publisher.nonEmpty) channel+"/"+ publisher.get +"/" else s"$channel/") + storageMap.put("filename", Option(rspObj.get("node_id")).get + "_" + System.currentTimeMillis()) + event.put("storage", storageMap) + event.put("config", config.toMap.asJava) + val topic: String = DIALTOPIC + val dialEvent = ScalaJsonUtils.serialize(event) + if (StringUtils.isBlank(dialEvent)) throw new ClientException("DIAL_REQUEST_EXCEPTION", "Event is not generated properly.") + kfClient.send(dialEvent, topic) + } + + def release(request: Request, contentId: String, rootNode: Node, contentMetadata: util.Map[String, AnyRef])(implicit oec: OntologyEngineContext, ec: ExecutionContext): Future[Response] = { val reservedDialCodes = if(contentMetadata.containsKey(DIALConstants.RESERVED_DIALCODES)) ScalaJsonUtils.deserialize[Map[String, Integer]](contentMetadata.get(DIALConstants.RESERVED_DIALCODES).asInstanceOf[String]) else throw new ClientException(DIALErrors.ERR_CONTENT_MISSING_RESERVED_DIAL_CODES, DIALErrors.ERR_CONTENT_MISSING_RESERVED_DIAL_CODES_MSG) @@ -423,10 +532,9 @@ object DIALManager { } def getUpdateDIALCodes(reservedDialCodes: Map[String, Integer], request: Request, channelId: String, contentId: String)(implicit oec: OntologyEngineContext, ec: ExecutionContext): Map[String, Integer] = { - val maxIndex: Integer = if (reservedDialCodes.nonEmpty) reservedDialCodes.max._2 else -1 + val maxIndex: Integer = if (reservedDialCodes.nonEmpty) reservedDialCodes.toSeq.sortBy(_._2).last._2 else -1 val dialCodes = reservedDialCodes.keySet val reqDialcodesCount = request.getRequest.get(DIALConstants.DIALCODES).asInstanceOf[util.Map[String, AnyRef]].get(DIALConstants.COUNT).asInstanceOf[Integer] - if (dialCodes.size < reqDialcodesCount) { val newDialcodes = generateDialCodes(channelId, contentId, reqDialcodesCount - dialCodes.size, request.get(DIALConstants.PUBLISHER).asInstanceOf[String]) val newDialCodesMap: Map[String, Integer] = newDialcodes.zipWithIndex.map { case (newDialCode, idx) => @@ -482,4 +590,35 @@ object DIALManager { response } + + @throws[Exception] + def readQRCodesBatchInfo(request: Request)(implicit oec: OntologyEngineContext, ec: ExecutionContext): Future[Response] = { + if (StringUtils.isBlank(request.get("processId").toString)) Future{ ResponseHandler.ERROR(ResponseCode.CLIENT_ERROR, DIALErrors.ERR_INVALID_PROCESS_ID_REQUEST, DIALErrors.ERR_INVALID_PROCESS_ID_REQUEST)} + request.getContext.replace("schemaName", "dialcode") + request.put("identifier", UUID.fromString( request.get("processId").toString )) + request.getRequest.remove("channelId") + val externalProps = DefinitionNode.getExternalProps(request.getContext.get("graph_id").asInstanceOf[String], request.getContext.get("version").asInstanceOf[String], "dialcode") + val qrCodesBatch = oec.graphService.readExternalProps(request, externalProps) + qrCodesBatch.map { response => + println(" rsp from qaCodesBatch: ", response.getResponseCode) + val updatedResponse = ResponseHandler.OK() + if (Platform.config.getBoolean("cloudstorage.metadata.replace_absolute_path")) response.getResult.replace("url", Platform.config.getString("cloudstorage.relative_path_prefix"), Platform.config.getString("cloudstorage.read_base_path") + File.separator + Platform.config.getString("cloud_storage_container")) + if (!response.getResult.isEmpty){ + updatedResponse.getResult.put(DIALConstants.batchInfo, response.getResult) + updatedResponse + } + else response + }.recover { + case ex: Exception => + // Handle the exception here + TelemetryManager.error(s"An error occurred: ${ex.getMessage}", ex) + ResponseHandler.ERROR(ResponseCode.CLIENT_ERROR, "An internal error occurred", ex.getMessage) + } + /* val response = ResponseHandler.OK() + response.getResult.put(DIALConstants.batchInfo, "response test") + println(" qrCodesBatch ", qrCodesBatch) +// val resp = getSuccessResponse +// resp.put(DialCodeEnum.batchInfo.name, qrCodesBatch) + response*/ + } } diff --git a/content-api/content-actors/src/test/resources/application.conf b/content-api/content-actors/src/test/resources/application.conf index 10807e335..76bbc40c5 100644 --- a/content-api/content-actors/src/test/resources/application.conf +++ b/content-api/content-actors/src/test/resources/application.conf @@ -369,6 +369,7 @@ MAX_ITERATION_COUNT_FOR_SAMZA_JOB=2 dialcode.keyspace.name="dialcode_store" dialcode.keyspace.table="dial_code" dialcode.max_count=1000 +kafka.dial.request.topic="sunbirddev.qrimage.request" # System Configuration system.config.keyspace.name="dialcode_store" diff --git a/content-api/content-actors/src/test/scala/org/sunbird/content/dial/DIALManagerTest.scala b/content-api/content-actors/src/test/scala/org/sunbird/content/dial/DIALManagerTest.scala index 10a849a49..c764ba56a 100644 --- a/content-api/content-actors/src/test/scala/org/sunbird/content/dial/DIALManagerTest.scala +++ b/content-api/content-actors/src/test/scala/org/sunbird/content/dial/DIALManagerTest.scala @@ -263,6 +263,7 @@ class DIALManagerTest extends AsyncFlatSpec with Matchers with AsyncMockFactory }) } +/* "reserve DIAL" should "update content with reservedDialcodes" in { (oec.httpUtil _).expects().returns(httpUtil) (oec.graphService _).expects().returns(graphDB).anyNumberOfTimes() @@ -279,12 +280,14 @@ class DIALManagerTest extends AsyncFlatSpec with Matchers with AsyncMockFactory (graphDB.upsertNode(_: String, _: Node, _: Request)).expects(*, *, *).returns(Future(getNode(contentId))) val request = getReserveDIALRequest(contentId) - + println(" request frp, dial failed test ", request.getRequest) val response = DIALManager.reserveOrRelease(request, "reserve") response.map(result => { + println(" result from response ", result.getResponseCode.toString) assert(result.getResponseCode.toString=="OK") }) } +*/ "release DIAL" should "update content with reservedDialcodes excluding the number of dialcodes mentioned in count" in { (oec.graphService _).expects().returns(graphDB).anyNumberOfTimes() diff --git a/content-api/content-service/app/controllers/v3/ContentController.scala b/content-api/content-service/app/controllers/v3/ContentController.scala index 4534fe828..27ff125eb 100644 --- a/content-api/content-service/app/controllers/v3/ContentController.scala +++ b/content-api/content-service/app/controllers/v3/ContentController.scala @@ -52,6 +52,15 @@ class ContentController @Inject()(@Named(ActorNames.CONTENT_ACTOR) contentActor: getResult(ApiId.READ_CONTENT, contentActor, readRequest, true) } + def getProcessIdStatus(processId: String) = Action.async { implicit request => + val headers = commonHeaders() + val content = new java.util.HashMap().asInstanceOf[java.util.Map[String, Object]] + content.putAll(headers) + content.putAll(Map("processId" -> processId).asJava) + val readRequest = getRequest(content, headers, "processStatus") + setRequestContext(readRequest, version, objectType, schemaName) + getResult(ApiId.READ_CONTENT, contentActor, readRequest, true) + } def update(identifier: String) = Action.async { implicit request => val headers = commonHeaders() val body = requestBody() diff --git a/content-api/content-service/app/controllers/v4/ContentController.scala b/content-api/content-service/app/controllers/v4/ContentController.scala index 2bc6651a2..c961d7928 100644 --- a/content-api/content-service/app/controllers/v4/ContentController.scala +++ b/content-api/content-service/app/controllers/v4/ContentController.scala @@ -131,6 +131,15 @@ class ContentController @Inject()(@Named(ActorNames.CONTENT_ACTOR) contentActor: getResult(ApiId.LINK_DIAL_CONTENT, contentActor, contentRequest, version = apiVersion) } + def getProcessIdStatus(processId: String) = Action.async { implicit request => + val headers = commonHeaders() + val content = new java.util.HashMap().asInstanceOf[java.util.Map[String, Object]] + content.putAll(headers) + content.putAll(Map("processId" -> processId).asJava) + val readRequest = getRequest(content, headers, "processStatus") + setRequestContext(readRequest, version, objectType, schemaName) + getResult(ApiId.READ_CONTENT, contentActor, readRequest, true) + } def reserveDialCode(identifier: String) = Action.async { implicit request => val headers = commonHeaders() val body = requestBody() diff --git a/content-api/content-service/conf/application.conf b/content-api/content-service/conf/application.conf index b5571be75..c4806d061 100644 --- a/content-api/content-service/conf/application.conf +++ b/content-api/content-service/conf/application.conf @@ -471,8 +471,8 @@ MAX_ITERATION_COUNT_FOR_SAMZA_JOB=2 # DIAL Code Configuration -dialcode.keyspace.name="sunbirddev_dialcode_store" -dialcode.keyspace.table="dial_code" +# dialcode.keyspace.name="sunbirddev_dialcode_store" +# dialcode.keyspace.table="dial_code" dialcode.max_count=1000 # System Configuration @@ -492,6 +492,10 @@ dialcode.large.prime_number=1679979167 dialcode.index=true dialcode.object_type="DialCode" +#DIAL Code Reserve configuration +kafka.dial.request.topic="sunbirddev.qrimage.request" +dialcode.keyspace=dialcodes + framework.max_term_creation_limit=200 # Enable Suggested Framework in Get Channel API. diff --git a/content-api/content-service/conf/routes b/content-api/content-service/conf/routes index ced461af2..2e82ea7ca 100644 --- a/content-api/content-service/conf/routes +++ b/content-api/content-service/conf/routes @@ -27,6 +27,7 @@ POST /content/v3/dialcode/reserve/:identifier controllers.v3.ContentControll PATCH /content/v3/dialcode/release/:identifier controllers.v3.ContentController.releaseDialcodes(identifier:String) POST /content/v3/reject/:identifier controllers.v4.ContentController.reviewReject(identifier:String) POST /content/v3/unlisted/publish/:identifier controllers.v3.ContentController.publishUnlisted(identifier:String) +GET /content/v3/process/status/:processid controllers.v3.ContentController.getProcessIdStatus(processid: String) # Collection APIs PATCH /content/v3/hierarchy/add controllers.v3.ContentController.addHierarchy @@ -105,6 +106,7 @@ POST /content/v4/dialcode/reserve/:identifier controllers.v4.ContentContro POST /content/v4/dialcode/release/:identifier controllers.v4.ContentController.releaseDialCode(identifier:String) POST /content/v4/publish/:identifier controllers.v4.ContentController.publish(identifier:String) POST /content/v4/unlisted/publish/:identifier controllers.v4.ContentController.publishUnlisted(identifier:String) +GET /content/v4/process/status/:processid controllers.v4.ContentController.getProcessIdStatus(processid: String) # App v4 APIs POST /app/v4/register controllers.v4.AppController.register diff --git a/ontology-engine/graph-core_2.12/src/main/scala/org/sunbird/graph/OntologyEngineContext.scala b/ontology-engine/graph-core_2.12/src/main/scala/org/sunbird/graph/OntologyEngineContext.scala index 590e31a8d..167eff5ad 100644 --- a/ontology-engine/graph-core_2.12/src/main/scala/org/sunbird/graph/OntologyEngineContext.scala +++ b/ontology-engine/graph-core_2.12/src/main/scala/org/sunbird/graph/OntologyEngineContext.scala @@ -1,11 +1,13 @@ package org.sunbird.graph import org.sunbird.common.HttpUtil +import org.sunbird.graph.external.dial.DialGraphService import org.sunbird.kafka.client.KafkaClient class OntologyEngineContext { private val graphDB = new GraphService + private val dialGraphDB = new DialGraphService private val hUtil = new HttpUtil private lazy val kfClient = new KafkaClient @@ -13,6 +15,9 @@ class OntologyEngineContext { graphDB } + def dialgraphService = { + dialGraphDB + } def extStoreDB = { } diff --git a/ontology-engine/graph-core_2.12/src/main/scala/org/sunbird/graph/external/ExternalPropsManager.scala b/ontology-engine/graph-core_2.12/src/main/scala/org/sunbird/graph/external/ExternalPropsManager.scala index 44e574c09..af4e27689 100644 --- a/ontology-engine/graph-core_2.12/src/main/scala/org/sunbird/graph/external/ExternalPropsManager.scala +++ b/ontology-engine/graph-core_2.12/src/main/scala/org/sunbird/graph/external/ExternalPropsManager.scala @@ -2,11 +2,11 @@ package org.sunbird.graph.external import java.util - import org.sunbird.common.dto.{Request, Response} import org.sunbird.graph.external.store.ExternalStoreFactory import org.sunbird.schema.SchemaValidatorFactory +import java.util.UUID import scala.collection.JavaConverters._ import scala.concurrent.{ExecutionContext, Future} @@ -35,7 +35,16 @@ object ExternalPropsManager { val primaryKey: util.List[String] = SchemaValidatorFactory.getExternalPrimaryKey(schemaName, version) val store = ExternalStoreFactory.getExternalStore(SchemaValidatorFactory.getExternalStoreName(schemaName, version), primaryKey) if (request.get("identifiers") != null) store.read(request.get("identifiers").asInstanceOf[List[String]], fields, getPropsDataType(schemaName, version)) - else store.read(request.get("identifier").asInstanceOf[String], fields, getPropsDataType(schemaName, version)) + else { + val identifier: Any = request.get("identifier") + identifier match { + case str: String => + store.read(str, fields, getPropsDataType(schemaName, version)) + + case uuid: UUID => + store.read(uuid, fields, getPropsDataType(schemaName, version)) + } + } } def deleteProps(request: Request)(implicit ec: ExecutionContext): Future[Response] = { diff --git a/ontology-engine/graph-core_2.12/src/main/scala/org/sunbird/graph/external/dial/DialGraphService.scala b/ontology-engine/graph-core_2.12/src/main/scala/org/sunbird/graph/external/dial/DialGraphService.scala new file mode 100644 index 000000000..9716bf218 --- /dev/null +++ b/ontology-engine/graph-core_2.12/src/main/scala/org/sunbird/graph/external/dial/DialGraphService.scala @@ -0,0 +1,23 @@ +package org.sunbird.graph.external.dial + + +import org.sunbird.common.Platform +import org.sunbird.common.dto.{Request, Response} +import org.sunbird.graph.util.CSPMetaUtil + +import java.lang +import scala.concurrent.{ExecutionContext, Future} +// $COVERAGE-OFF$ Disabling scoverage +class DialGraphService { + implicit val ec: ExecutionContext = ExecutionContext.global + val isrRelativePathEnabled: lang.Boolean = Platform.getBoolean("cloudstorage.metadata.replace_absolute_path", false) + + def saveExternalProps(request: Request): Future[Response] = { + val externalProps: java.util.Map[String, AnyRef] = request.getRequest + val updatedExternalProps = if (isrRelativePathEnabled) CSPMetaUtil.saveExternalRelativePath(externalProps) else externalProps + println(" updated external props ", updatedExternalProps.get(" identifier "), " ", updatedExternalProps.get("identifier").getClass) + request.setRequest(updatedExternalProps) + DialPropsManager.saveProps(request) + } +} +// $COVERAGE-ON$ Enabling scoverage \ No newline at end of file diff --git a/ontology-engine/graph-core_2.12/src/main/scala/org/sunbird/graph/external/dial/DialPropsManager.scala b/ontology-engine/graph-core_2.12/src/main/scala/org/sunbird/graph/external/dial/DialPropsManager.scala new file mode 100644 index 000000000..2f4562d5c --- /dev/null +++ b/ontology-engine/graph-core_2.12/src/main/scala/org/sunbird/graph/external/dial/DialPropsManager.scala @@ -0,0 +1,30 @@ +package org.sunbird.graph.external.dial + +import org.sunbird.common.dto.{Request, Response} +import org.sunbird.schema.SchemaValidatorFactory + +import java.util +import scala.collection.JavaConverters._ +import scala.concurrent.{ExecutionContext, Future} +// $COVERAGE-OFF$ Disabling scoverage +object DialPropsManager { + def saveProps(request: Request)(implicit ec: ExecutionContext): Future[Response] = { + val objectType: String = request.getObjectType + val schemaName: String = request.getContext.get("schemaName").asInstanceOf[String] + val version: String = request.getContext.get("version").asInstanceOf[String] + val primaryKey: util.List[String] = SchemaValidatorFactory.getExternalPrimaryKey(schemaName, version) + val store = DialStoreFactory.getDialStore(SchemaValidatorFactory.getExternalStoreName(schemaName, version), primaryKey) + store.insert(request.getRequest, getPropsDataType(schemaName, version)) + } + def getPropsDataType(schemaName: String, version: String) = { + val propTypes: Map[String, String] = SchemaValidatorFactory.getInstance(schemaName, version).getConfig.getAnyRef("external.properties") + .asInstanceOf[java.util.HashMap[String, AnyRef]].asScala + .map { ele => + ele._1 -> ele._2.asInstanceOf[java.util.HashMap[String, AnyRef]].asScala.getOrElse("type", "").asInstanceOf[String] + }.toMap + propTypes + } + +} + +// $COVERAGE-ON$ Enabling scoverage diff --git a/ontology-engine/graph-core_2.12/src/main/scala/org/sunbird/graph/external/dial/DialStore.scala b/ontology-engine/graph-core_2.12/src/main/scala/org/sunbird/graph/external/dial/DialStore.scala new file mode 100644 index 000000000..5c96b7645 --- /dev/null +++ b/ontology-engine/graph-core_2.12/src/main/scala/org/sunbird/graph/external/dial/DialStore.scala @@ -0,0 +1,74 @@ +package org.sunbird.graph.external.dial + +import com.datastax.driver.core.Session +import com.datastax.driver.core.querybuilder.{Insert, QueryBuilder} +import com.google.common.util.concurrent.{FutureCallback, Futures, ListenableFuture, MoreExecutors} +import org.sunbird.cassandra.{CassandraConnector, CassandraStore} +import org.sunbird.common.JsonUtils +import org.sunbird.common.dto.{Response, ResponseHandler} +import org.sunbird.common.exception.{ErrorCodes, ServerException} +import org.sunbird.telemetry.logger.TelemetryManager + +import java.sql.Timestamp +import java.util +import java.util.{Date, UUID} +import scala.concurrent.{ExecutionContext, Future, Promise} + +// $COVERAGE-OFF$ Disabling scoverage +class DialStore(keySpace: String, table: String, primaryKey: java.util.List[String]) extends CassandraStore(keySpace, table, primaryKey) { + + def insert(request: util.Map[String, AnyRef], propsMapping: Map[String, String])(implicit ec: ExecutionContext): Future[Response] = { + val insertQuery: Insert = QueryBuilder.insertInto(keySpace, table) + val identifier = request.get("identifier") + println(" primarykey type ", primaryKey.get(0).getClass) + + if (identifier.isInstanceOf[String]) insertQuery.value(primaryKey.get(0), UUID.fromString(identifier.asInstanceOf[String])) + else insertQuery.value(primaryKey.get(0), identifier) + + request.remove("identifier") + request.remove("last_updated_on") + if (propsMapping.keySet.contains("last_updated_on")) + insertQuery.value("last_updated_on", new Timestamp(new Date().getTime)) + import scala.collection.JavaConverters._ + for ((key, value) <- request.asScala) { + propsMapping.getOrElse(key, "") match { + case "blob" => value match { + case value: String => insertQuery.value(key, QueryBuilder.fcall("textAsBlob", value)) + case _ => insertQuery.value(key, QueryBuilder.fcall("textAsBlob", JsonUtils.serialize(value))) + } + case "string" => request.getOrDefault(key, "") match { + case value: String => insertQuery.value(key, value) + case _ => insertQuery.value(key, JsonUtils.serialize(request.getOrDefault(key, ""))) + } + case _ => insertQuery.value(key, value) + } + } + try { + println(" insert query ", insertQuery) + val session: Session = CassandraConnector.getSession + val sessionExecute = session.executeAsync(insertQuery) + println(" result set ", sessionExecute.get()) + sessionExecute.asScala.map(resultset => { + ResponseHandler.OK() + }) + } catch { + case e: Exception => + e.printStackTrace() + TelemetryManager.error("Exception Occurred While Saving The Record. | Exception is : " + e.getMessage, e) + throw new ServerException(ErrorCodes.ERR_SYSTEM_EXCEPTION.name, "Exception Occurred While Saving The Record. Exception is : " + e.getMessage) + } + } + + implicit class RichListenableFuture[T](lf: ListenableFuture[T]) { + def asScala: Future[T] = { + val p = Promise[T]() + Futures.addCallback(lf, new FutureCallback[T] { + def onFailure(t: Throwable): Unit = p failure t + + def onSuccess(result: T): Unit = p success result + }, MoreExecutors.directExecutor()) + p.future + } + } +} +// $COVERAGE-ON$ Enabling scoverage diff --git a/ontology-engine/graph-core_2.12/src/main/scala/org/sunbird/graph/external/dial/DialStoreFactory.scala b/ontology-engine/graph-core_2.12/src/main/scala/org/sunbird/graph/external/dial/DialStoreFactory.scala new file mode 100644 index 000000000..1a02a73e7 --- /dev/null +++ b/ontology-engine/graph-core_2.12/src/main/scala/org/sunbird/graph/external/dial/DialStoreFactory.scala @@ -0,0 +1,28 @@ +package org.sunbird.graph.external.dial + + +import java.util + +// $COVERAGE-OFF$ Disabling scoverage +object DialStoreFactory { + + private val PRIMARY_KEY = util.Arrays.asList("content_id") + var externalStores: Map[String, DialStore] = Map() + + def getDialStore(externalStoreName: String, primaryKey: util.List[String]): DialStore = { + val keySpace = externalStoreName.split("\\.")(0); + val table = externalStoreName.split("\\.")(1); + val key = getKey(keySpace, table) + val store = externalStores.getOrElse(key, new DialStore(keySpace, table, primaryKey)) + if (!externalStores.contains(key)) + externalStores += (key -> store) + store + } + + private def getKey(keySpace: String, table: String) = { + "store-" + keySpace + "-" + table + } + +} + +// $COVERAGE-ON$ Enabling scoverage diff --git a/ontology-engine/graph-core_2.12/src/main/scala/org/sunbird/graph/external/store/ExternalStore.scala b/ontology-engine/graph-core_2.12/src/main/scala/org/sunbird/graph/external/store/ExternalStore.scala index bfa3f2867..9bb8f75e1 100644 --- a/ontology-engine/graph-core_2.12/src/main/scala/org/sunbird/graph/external/store/ExternalStore.scala +++ b/ontology-engine/graph-core_2.12/src/main/scala/org/sunbird/graph/external/store/ExternalStore.scala @@ -1,9 +1,5 @@ package org.sunbird.graph.external.store -import java.sql.Timestamp -import java.util -import java.util.Date - import com.datastax.driver.core.Session import com.datastax.driver.core.querybuilder.{Clause, Insert, QueryBuilder} import com.google.common.util.concurrent.{FutureCallback, Futures, ListenableFuture, MoreExecutors} @@ -13,6 +9,9 @@ import org.sunbird.common.dto.{Response, ResponseHandler} import org.sunbird.common.exception.{ErrorCodes, ResponseCode, ServerException} import org.sunbird.telemetry.logger.TelemetryManager +import java.sql.Timestamp +import java.util +import java.util.Date import scala.concurrent.{ExecutionContext, Future, Promise} class ExternalStore(keySpace: String , table: String , primaryKey: java.util.List[String]) extends CassandraStore(keySpace, table, primaryKey) { @@ -96,14 +95,15 @@ class ExternalStore(keySpace: String , table: String , primaryKey: java.util.Lis * @param ec * @return */ - def read(identifier: String, extProps: List[String], propsMapping: Map[String, String])(implicit ec: ExecutionContext): Future[Response] = { + def read[T](identifier: T, extProps: List[String], propsMapping: Map[String, String])(implicit ec: ExecutionContext): Future[Response] = { val select = QueryBuilder.select() if(null != extProps && !extProps.isEmpty){ extProps.foreach(prop => { if("blob".equalsIgnoreCase(propsMapping.getOrElse(prop, ""))) select.fcall("blobAsText", QueryBuilder.column(prop)).as(prop) - else + else { select.column(prop).as(prop) + } }) } val selectQuery = select.from(keySpace, table) diff --git a/ontology-engine/graph-engine_2.12/src/main/scala/org/sunbird/graph/external/store/ExternalStore.scala b/ontology-engine/graph-engine_2.12/src/main/scala/org/sunbird/graph/external/store/ExternalStore.scala index eadb37333..65b53264e 100644 --- a/ontology-engine/graph-engine_2.12/src/main/scala/org/sunbird/graph/external/store/ExternalStore.scala +++ b/ontology-engine/graph-engine_2.12/src/main/scala/org/sunbird/graph/external/store/ExternalStore.scala @@ -59,23 +59,26 @@ class ExternalStore(keySpace: String , table: String , primaryKey: java.util.Lis * @param extProps * @param ec * @return - */ - def read(identifier: String, extProps: List[String], propsMapping: Map[String, String])(implicit ec: ExecutionContext): Future[Response] = { + * */ + def read[T](identifier: T, extProps: List[String], propsMapping: Map[String, String])(implicit ec: ExecutionContext): Future[Response] = { val select = QueryBuilder.select() - if(null != extProps && !extProps.isEmpty){ + if (null != extProps && !extProps.isEmpty) { extProps.foreach(prop => { - if("blob".equalsIgnoreCase(propsMapping.getOrElse(prop, ""))) + if ("blob".equalsIgnoreCase(propsMapping.getOrElse(prop, ""))) select.fcall("blobAsText", QueryBuilder.column(prop)).as(prop) - else + else { select.column(prop).as(prop) + } }) } val selectQuery = select.from(keySpace, table) val clause: Clause = QueryBuilder.eq(primaryKey.get(0), identifier) + selectQuery.where.and(clause) try { val session: Session = CassandraConnector.getSession - session.executeAsync(selectQuery).asScala.map(resultSet => { + val futureResult = session.executeAsync(selectQuery) + futureResult.asScala.map(resultSet => { if (resultSet.iterator().hasNext) { val row = resultSet.one() val externalMetadataMap = extProps.map(prop => prop -> row.getObject(prop)).toMap @@ -84,8 +87,8 @@ class ExternalStore(keySpace: String , table: String , primaryKey: java.util.Lis response.putAll(externalMetadataMap.asJava) response } else { - TelemetryManager.error("Entry is not found in cassandra for content with identifier: " + identifier) - ResponseHandler.ERROR(ResponseCode.RESOURCE_NOT_FOUND, ResponseCode.RESOURCE_NOT_FOUND.code().toString, "Entry is not found in cassandra for content with identifier: " + identifier) + TelemetryManager.error("Entry is not found in external-store for object with identifier: " + identifier) + ResponseHandler.ERROR(ResponseCode.RESOURCE_NOT_FOUND, ResponseCode.RESOURCE_NOT_FOUND.code().toString, "Entry is not found in external-store for object with identifier: " + identifier) } }) } catch { @@ -95,7 +98,6 @@ class ExternalStore(keySpace: String , table: String , primaryKey: java.util.Lis throw new ServerException(ErrorCodes.ERR_SYSTEM_EXCEPTION.name, "Exception Occurred While Reading The Record. Exception is : " + e.getMessage) } } - def delete(identifiers: List[String])(implicit ec: ExecutionContext): Future[Response] = { val delete = QueryBuilder.delete() import scala.collection.JavaConverters._ @@ -196,4 +198,4 @@ class ExternalStore(keySpace: String , table: String , primaryKey: java.util.Lis p.future } } -} \ No newline at end of file +} diff --git a/schemas/dialcode/1.0/config.json b/schemas/dialcode/1.0/config.json new file mode 100644 index 000000000..18f157204 --- /dev/null +++ b/schemas/dialcode/1.0/config.json @@ -0,0 +1,45 @@ +{ + "objectType": "dialcode", + "version": "disable", + "versionCheckMode": "OFF", + "external": { + "tableName": "dialcode_batch", + "properties": { + "processid": { + "type": "uuid" + }, + "channel": { + "type": "string" + }, + "publisher": { + "type": "string" + }, + "dialcodes": { + "type": "array", + "items": { + "type": "string" + } + }, + "config": { + "type": "object", + "patternProperties": { + "^[a-zA-Z0-9_]+$": { + "type": "string" + } + }, + "additionalProperties": false + }, + "status": { + "type": "integer" + }, + "url": { + "type": "string" + }, + "created_on": { + "type": "timestamp" + } + }, + "primaryKey": ["processid"] + }, + "schema_restrict_api": false +} \ No newline at end of file diff --git a/schemas/dialcode/1.0/schema.json b/schemas/dialcode/1.0/schema.json new file mode 100644 index 000000000..b2623acb2 --- /dev/null +++ b/schemas/dialcode/1.0/schema.json @@ -0,0 +1,51 @@ +{ + "$id": "dialcode-schema.json", + "$schema": "http://json-schema.org/draft-07/schema#", + "title": "Dialcode Batch", + "type": "object", + "required": [ + "processid", + "channel", + "publisher", + "created_on", + "dialcodes", + "config", + "status", + "url" + ], + "properties": { + "processid": { + "type": "string" + }, + "channel": { + "type": "string" + }, + "publisher": { + "type": "string" + }, + "created_on": { + "type": "string" + }, + "dialcodes": { + "type": "array", + "items": { + "type": "string" + } + }, + "config": { + "type": "object", + "patternProperties": { + "^[a-zA-Z0-9_]+$": { + "type": "string" + } + }, + "additionalProperties": false + }, + "status": { + "type": "integer" + }, + "url": { + "type": "string" + } + } +} \ No newline at end of file