From 70ab70587ce75213789effaa404760e081a35a79 Mon Sep 17 00:00:00 2001
From: Pradyumna Nagendra
Date: Thu, 30 Jul 2020 12:31:31 +0530
Subject: [PATCH 1/3] Issue #TG-375 feat: ReplayIssueCertificate job

---
 .../main/scala/ReplayIssueCertificates.scala  | 38 +++++++++++++++++++
 1 file changed, 38 insertions(+)
 create mode 100644 adhoc-scripts/src/main/scala/ReplayIssueCertificates.scala

diff --git a/adhoc-scripts/src/main/scala/ReplayIssueCertificates.scala b/adhoc-scripts/src/main/scala/ReplayIssueCertificates.scala
new file mode 100644
index 000000000..b210b101b
--- /dev/null
+++ b/adhoc-scripts/src/main/scala/ReplayIssueCertificates.scala
@@ -0,0 +1,38 @@
+import java.util.UUID
+
+import com.datastax.spark.connector._
+import com.datastax.spark.connector.rdd.CassandraTableScanRDD
+import org.apache.spark.SparkContext
+import org.apache.spark.rdd.RDD
+import org.ekstep.analytics.framework.{Dispatcher, FrameworkContext, OutputDispatcher}
+import org.ekstep.analytics.framework.util.{CommonUtil, JSONUtils}
+
+object ReplayIssueCertificates {
+
+  case class Event(eid: String, ets: Long, mid: String, actor: Map[String, AnyRef], context: Map[String, AnyRef], `object`: Map[String, AnyRef], edata: Map[String, AnyRef])
+
+  val batchSize = 2
+  def prepareEvent(batchId: String, courseId: String, usersList: List[List[String]]): Seq[String] = {
+    usersList.map(users => {
+      val actor = Map("id" -> "Course Certificate Generator", "type" -> "System")
+      val context = Map("id" -> "org.sunbird.platform", "ver" -> "1.0")
+      val obj = Map("id" -> (batchId + "_" + courseId), "type" -> "CourseCertificateGeneration")
+      val edata = Map("batchId" -> batchId, "courseId" -> courseId, "userIds" -> users, "action" -> "issue-certificate", "iteration" -> 1.asInstanceOf[AnyRef])
+      val event = Event("BE_JOB_REQUEST", System.currentTimeMillis, ("LP." + System.currentTimeMillis + "." + UUID.randomUUID), actor, context, obj, edata)
+      JSONUtils.serialize(event)
+    }).toSeq
+  }
+
+  def main(sc: SparkContext, batchId: String, courseId: String, env: String, kafkaBrokerList: String): Unit = {
+    implicit val sparkContext = sc
+    implicit val fc = new FrameworkContext()
+    val data = sc.cassandraTable("sunbird_courses", "user_enrolments").select("userid", "batchid", "courseid", "status").where("courseid = ?", courseId).where("batchid = ?", batchId).cache()
+    val userIds = data.collect().map(row => row.getString("userid")).toList
+    val usersList = userIds.grouped(batchSize).toList
+    val event: RDD[String] = sc.parallelize[String](prepareEvent(batchId, courseId, usersList))
+    val config = Map("topic" -> (env + ".certificate.job.request"), "brokerList" -> kafkaBrokerList)
+    OutputDispatcher.dispatch(Dispatcher("kafka", config), event);
+    println("Number of events pushed are: " + usersList.size)
+  }
+
+}
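Note: patch 1 adds the job as an ad-hoc script, intended to be driven from a Spark shell with the analytics framework on the classpath. A minimal sketch of how an invocation might look (the batch ID, course ID, environment prefix, and broker list are placeholder values, not taken from the patch):

    // Hypothetical spark-shell session; all literals are placeholders.
    // Reads enrolments for one batch of one course from Cassandra and
    // pushes BE_JOB_REQUEST events to "<env>.certificate.job.request".
    ReplayIssueCertificates.main(
      sc,                         // SparkContext provided by the shell
      "0130000000000000000",      // batchId (placeholder)
      "do_1130000000000000000",   // courseId (placeholder)
      "dev",                      // env prefix for the Kafka topic
      "localhost:9092"            // kafkaBrokerList (placeholder)
    )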
From 50124bf9bcf17cecce41dca5b220779357cf8f91 Mon Sep 17 00:00:00 2001
From: Pradyumna Nagendra
Date: Thu, 30 Jul 2020 12:34:40 +0530
Subject: [PATCH 2/3] Issue #TG-375 feat: ReplayIssueCertificate job

---
 adhoc-scripts/src/main/scala/ReplayIssueCertificates.scala | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/adhoc-scripts/src/main/scala/ReplayIssueCertificates.scala b/adhoc-scripts/src/main/scala/ReplayIssueCertificates.scala
index b210b101b..3635d6b9d 100644
--- a/adhoc-scripts/src/main/scala/ReplayIssueCertificates.scala
+++ b/adhoc-scripts/src/main/scala/ReplayIssueCertificates.scala
@@ -11,7 +11,7 @@ object ReplayIssueCertificates {
 
   case class Event(eid: String, ets: Long, mid: String, actor: Map[String, AnyRef], context: Map[String, AnyRef], `object`: Map[String, AnyRef], edata: Map[String, AnyRef])
 
-  val batchSize = 2
+  val batchSize = 10
   def prepareEvent(batchId: String, courseId: String, usersList: List[List[String]]): Seq[String] = {
     usersList.map(users => {
       val actor = Map("id" -> "Course Certificate Generator", "type" -> "System")
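Note: patch 2 only raises batchSize from 2 to 10, which controls how many user IDs travel in a single event: main chunks the collected enrolments with userIds.grouped(batchSize). A standalone illustration of the effect (made-up user IDs):

    val userIds = (1 to 23).map(i => s"user-$i").toList
    val usersList = userIds.grouped(10).toList
    // usersList.size == 3 (10 + 10 + 3), so 3 Kafka events are emitted;
    // with the old batchSize of 2 the same list would produce 12 events.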
courseId, "userIds" -> users, "action" -> "issue-certificate", "iteration" -> 1.asInstanceOf[AnyRef], "reIssue"-> reIssue.asInstanceOf[AnyRef]) val event = Event("BE_JOB_REQUEST", System.currentTimeMillis, ("LP." + System.currentTimeMillis + "." + UUID.randomUUID), actor, context, obj, edata) JSONUtils.serialize(event) }).toSeq } - def main(sc: SparkContext, batchId: String, courseId: String, env: String, kafkaBrokerList: String): Unit = { + def main(sc: SparkContext, batchId: String, courseId: String, env: String, kafkaBrokerList: String, reIssue: Boolean = false): Unit = { implicit val sparkContext = sc implicit val fc = new FrameworkContext() val data = sc.cassandraTable("sunbird_courses", "user_enrolments").select("userid", "batchid", "courseid", "status").where("courseid = ?", courseId).where("batchid = ?", batchId).cache() val userIds = data.collect().map(row => row.getString("userid")).toList val usersList = userIds.grouped(batchSize).toList - val event: RDD[String] = sc.parallelize[String](prepareEvent(batchId, courseId, usersList)) - val config = Map("topic" -> (env +".certificate.job.request"), "brokerList" -> kafkaBrokerList) + val event: RDD[String] = sc.parallelize[String](prepareEvent(batchId, courseId, usersList, reIssue)) + val config = Map("topic" -> (env +".coursebatch.certificate.request"), "brokerList" -> kafkaBrokerList) OutputDispatcher.dispatch(Dispatcher("kafka", config), event); println("Number of events pushed are: " + usersList.size) }