diff --git a/.gitignore b/.gitignore
index ae5ec9f..6b42b2d 100644
--- a/.gitignore
+++ b/.gitignore
@@ -14,3 +14,6 @@ server.pid
*.eml
/dist/
.cache
+**/target
+.idea
+*.iml
diff --git a/pom.xml b/pom.xml
new file mode 100644
index 0000000..f623696
--- /dev/null
+++ b/pom.xml
@@ -0,0 +1,66 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <artifactId>sunbird-viewer-service</artifactId>
+    <name>sunbird-viewer-service</name>
+    <modules>
+        <module>viewer-core</module>
+        <module>viewer-actors</module>
+        <module>viewer-service</module>
+    </modules>
+    <packaging>pom</packaging>
+    <groupId>org.sunbird</groupId>
+    <version>1.0</version>
+
+    <repositories>
+        <repository>
+            <id>central</id>
+            <name>Central Repository</name>
+            <url>https://repo.maven.apache.org/maven2</url>
+            <layout>default</layout>
+            <snapshots>
+                <enabled>false</enabled>
+            </snapshots>
+        </repository>
+    </repositories>
+
+    <properties>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
+        <scoverage.plugin.version>1.4.0</scoverage.plugin.version>
+        <scala.maj.version>2.12</scala.maj.version>
+        <scala.version>2.12.8</scala.version>
+        <java.version>11</java.version>
+        <!-- the names of the three Play/plugin version properties below are assumed -->
+        <play2.version>2.7.2</play2.version>
+        <play2.plugin.version>1.0.0-rc5</play2.plugin.version>
+        <sbt-compiler.plugin.version>1.0.0-beta3</sbt-compiler.plugin.version>
+        <akka.version>2.5.22</akka.version>
+    </properties>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <version>3.8.1</version>
+                <configuration>
+                    <source>11</source>
+                    <target>11</target>
+                </configuration>
+            </plugin>
+            <plugin>
+                <groupId>org.scoverage</groupId>
+                <artifactId>scoverage-maven-plugin</artifactId>
+                <version>${scoverage.plugin.version}</version>
+                <configuration>
+                    <scalaVersion>${scala.version}</scalaVersion>
+                    <aggregate>true</aggregate>
+                    <highlighting>true</highlighting>
+                    <excludedPackages>.*RoutesPrefix.*;.*Routes.*;.*ReverseRoutes.*</excludedPackages>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+</project>
diff --git a/viewer-actors/pom.xml b/viewer-actors/pom.xml
new file mode 100644
index 0000000..f06bad6
--- /dev/null
+++ b/viewer-actors/pom.xml
@@ -0,0 +1,134 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>sunbird-viewer-service</artifactId>
+        <groupId>org.sunbird</groupId>
+        <version>1.0</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+    <packaging>jar</packaging>
+    <artifactId>viewer-actors</artifactId>
+
+    <repositories>
+        <repository>
+            <id>jcenter-repo</id>
+            <name>Jcenter Repo</name>
+            <url>https://jcenter.bintray.com/</url>
+        </repository>
+    </repositories>
+
+    <properties>
+        <maven.compiler.source>11</maven.compiler.source>
+        <maven.compiler.target>11</maven.compiler.target>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.sunbird</groupId>
+            <artifactId>viewer-core</artifactId>
+            <version>1.0</version>
+        </dependency>
+        <dependency>
+            <groupId>com.microsoft.azure</groupId>
+            <artifactId>azure-storage</artifactId>
+            <version>3.0.0</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.httpcomponents</groupId>
+            <artifactId>httpclient</artifactId>
+            <version>4.5.1</version>
+        </dependency>
+        <dependency>
+            <groupId>com.typesafe.akka</groupId>
+            <artifactId>akka-actor_${scala.maj.version}</artifactId>
+            <version>${akka.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.scalameta</groupId>
+            <artifactId>scalameta_${scala.maj.version}</artifactId>
+            <version>4.3.0</version>
+        </dependency>
+
+        <!-- Test Dependencies -->
+        <dependency>
+            <groupId>com.typesafe.akka</groupId>
+            <artifactId>akka-testkit_${scala.maj.version}</artifactId>
+            <version>${akka.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.scalatest</groupId>
+            <artifactId>scalatest_${scala.maj.version}</artifactId>
+            <version>3.0.6</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.cassandraunit</groupId>
+            <artifactId>cassandra-unit</artifactId>
+            <version>3.11.2.0</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-core</artifactId>
+            <version>3.3.3</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <sourceDirectory>src/main/scala</sourceDirectory>
+        <testSourceDirectory>src/test/scala</testSourceDirectory>
+        <plugins>
+            <plugin>
+                <groupId>net.alchim31.maven</groupId>
+                <artifactId>scala-maven-plugin</artifactId>
+                <version>4.4.0</version>
+                <configuration>
+                    <source>11</source>
+                    <target>11</target>
+                    <!-- assumed parameter name -->
+                    <checkMultipleScalaVersions>false</checkMultipleScalaVersions>
+                </configuration>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>compile</goal>
+                            <goal>testCompile</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.scalatest</groupId>
+                <artifactId>scalatest-maven-plugin</artifactId>
+                <version>1.0</version>
+                <executions>
+                    <execution>
+                        <id>test</id>
+                        <phase>test</phase>
+                        <goals>
+                            <goal>test</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>org.scoverage</groupId>
+                <artifactId>scoverage-maven-plugin</artifactId>
+                <version>${scoverage.plugin.version}</version>
+                <configuration>
+                    <scalaVersion>${scala.version}</scalaVersion>
+                    <aggregate>true</aggregate>
+                    <highlighting>true</highlighting>
+                </configuration>
+            </plugin>
+        </plugins>
+    </build>
+</project>
\ No newline at end of file
diff --git a/viewer-actors/src/main/scala/org/sunbird/viewer/Models.scala b/viewer-actors/src/main/scala/org/sunbird/viewer/Models.scala
new file mode 100644
index 0000000..5be8888
--- /dev/null
+++ b/viewer-actors/src/main/scala/org/sunbird/viewer/Models.scala
@@ -0,0 +1,102 @@
+package org.sunbird.viewer
+
+import org.sunbird.viewer.core.APIResponse
+
+import java.util.UUID
+
+object Models {
+}
+
+object ResponseCode extends Enumeration {
+ type Code = Value
+ val OK, CLIENT_ERROR, SERVER_ERROR = Value
+}
+//type StatusCode = Value
+object StatusCode extends Enumeration {
+ type status = Value
+ val START, UPDATE = Value("1")
+ val END = Value("2")
+}
+object ProgressCode extends Enumeration {
+ type status = Value
+ val START = Value("1")
+ val END = Value("100")
+}
+
+
+object Constants{
+ val VIEW_START_REQUEST = "api.view.start"
+ val VIEW_END_REQUEST = "api.view.end"
+ val VIEW_UPDATE_REQUEST = "api.view.update"
+ val VIEW_READ_REQUEST = "api.view.read"
+ val VIEW_SUMMARY_LIST_API_ID = "api.summary.list"
+ val VIEW_SUMMARY_READ_API_ID = "api.summary.read"
+ val SUNBIRD_COURSES_KEYSPACE= "sunbird_courses"
+ val CONTENT_CONSUMPTION_TABLE="user_content_consumption_new"
+ val USER_ENROLMENTS_TABLE="user_enrolments"
+ val USER_ACTIVITY_TABLE="user_activity_agg"
+ val CONTENT_START_STATUS = 1
+ val CONTENT_END_ = 1
+}
+// Common Class
+
+case class Params(resmsgid: String, msgid: String, err: String, status: String, errmsg: Map[String,String], client_key: Option[String] = None)
+case class Response(id: String, ver: String, ts: String, params: Params, responseCode: String, result: Option[Map[String, AnyRef]]) extends APIResponse
+
+
+case class BaseRequest(`type`:String,request: String)
+
+trait BaseViewRequest {
+ def userId: String
+ def contentId : String
+ def collectionId: Option[String]
+ def contextId: Option[String]
+}
+
+case class ViewRequestBody(id: String, ver: String, ts: String, request: Map[String,AnyRef], params: Option[Params])
+
+
+case class StartRequest(userId: String, contentId:String,collectionId :Option[String],contextId:Option[String])
+ extends BaseViewRequest {
+ def validateRequest: Either[Map[String,AnyRef],StartRequest] ={
+ if(null == userId || userId.isEmpty)
+ Left(Map("request.userId" -> "cannot be empty"))
+ else if(null == contentId || contentId.isEmpty)
+ Left(Map("request.contentId" -> "cannot be empty"))
+ else
+ Right(StartRequest(userId,contentId,Some(collectionId.getOrElse(contentId)),
+ Some(contextId.getOrElse(collectionId.getOrElse(contentId)
+ ))))
+ }
+}
+
+case class ViewUpdateRequest(userId: String, contentId:String, batchId:String,collectionId :String,progress: Int)
+
+case class ViewEndRequest(userId: String, contentId:String, batchId:String,collectionId :String,
+ assessments: List[Map[String,AnyRef]])
+
+
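+// Emitted when a content is completed: a BE_JOB_REQUEST job request carrying the
+// batch, user and course identifiers along with the completed contents in its edata.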
+case class ContentEndEvent(eid: String = "BE_JOB_REQUEST", ets: Long = System.currentTimeMillis(),
+ mid: String = UUID.randomUUID.toString, actor: TypeId, context: Context, `object`: TypeId,
+ edata: EData, action: String, iteration: Int,
+ batchId: String,userId: String, courseId: String)
+case class TypeId(`type`: String, id: String)
+case class Context(pdata: PData)
+case class PData(ver: String , id : String)
+case class EData(contents: List[Content])
+case class Content(contentId: String, status:Int)
+
+case class ViewerSummaryRequest(userId: String, collectionId: Option[String], contextId: Option[String])
+
+case class Summary(userId: String, collectionId: String, contextId: String, enrolledDate: Long, active: Boolean,
+ contentStatus: Map[String, Integer], assessmentStatus: Map[String, Map[String, AnyRef]],
+ collection: Map[String, AnyRef], issuedCertificates: java.util.List[java.util.Map[String, String]],
+ completedOn: Long, progress: Int, status: Int)
+
+case class EnrolmentData(userId: String, collectionId: String, contextId: String, enrolledDate: Long, active: Boolean,
+ issuedCertificates: java.util.List[java.util.Map[String, String]], completedOn: Long, progress: Int, status: Int)
+
+case class UserActivityData(userId: String, collectionId: String, contextId: String, contentStatus: Map[String, Integer],
+ assessmentStatus: Map[String, Map[String, AnyRef]]) {
+ def this() = this("", "", "", Map(), Map())
+}
\ No newline at end of file
diff --git a/viewer-actors/src/main/scala/org/sunbird/viewer/actors/HealthCheckActor.scala b/viewer-actors/src/main/scala/org/sunbird/viewer/actors/HealthCheckActor.scala
new file mode 100644
index 0000000..6f8bd97
--- /dev/null
+++ b/viewer-actors/src/main/scala/org/sunbird/viewer/actors/HealthCheckActor.scala
@@ -0,0 +1,46 @@
+package org.sunbird.viewer.actors
+
+import akka.actor.Actor
+import org.sunbird.viewer.core.{CassandraUtil, JSONUtils, KafkaUtil, RedisUtil}
+
+import javax.inject.Inject
+
+case class ServiceHealth(name: String, healthy: Boolean, errMsg: Option[String] = None)
+class HealthCheckActor @Inject() extends Actor {
+
+ def receive: Receive = {
+ case "checkhealth" => sender() ! getHealthStatus
+ case "checkserviceshealth" => sender() ! getServiceHealthStatus(List("redis","cassandra","kafka"))
+ }
+
+
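+ /**
+  * Returns the static service health payload as a JSON string,
+  * e.g. {"name":"viewer.service.health.api","healthy":"true"}.
+  */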
+ def getHealthStatus(): String = {
+
+ val result = Map(
+ "name" -> "viewer.service.health.api",
+ "healthy" -> "true")
+ JSONUtils.serialize(result)
+ }
+
+
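+ /**
+  * Checks connectivity of the requested dependencies (redis, cassandra, kafka) and
+  * returns a serialized list of ServiceHealth entries; any connection failure is
+  * captured as an unhealthy entry carrying the exception message.
+  */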
+ def getServiceHealthStatus(services: List[String]): String = {
+ val status = services.map(service => {
+ try {
+ service match {
+ case "redis" => ServiceHealth(service, new RedisUtil().checkConnection)
+ case "cassandra" => ServiceHealth(service, new CassandraUtil().checkConnection())
+ case "kafka" => ServiceHealth(service, new KafkaUtil().checKConnection())
+ }
+ }
+ catch {
+ case ex: Exception =>
+ ServiceHealth(service, false, Some(ex.getMessage))
+ }
+ })
+ val result= Map(
+ "name" -> "viewer.service.health.api",
+ "services" -> status)
+ JSONUtils.serialize(result)
+ }
+}
+
diff --git a/viewer-actors/src/main/scala/org/sunbird/viewer/actors/StartUpdateActor.scala b/viewer-actors/src/main/scala/org/sunbird/viewer/actors/StartUpdateActor.scala
new file mode 100644
index 0000000..8082425
--- /dev/null
+++ b/viewer-actors/src/main/scala/org/sunbird/viewer/actors/StartUpdateActor.scala
@@ -0,0 +1,41 @@
+package org.sunbird.viewer.actors
+
+import akka.actor.Actor
+import org.sunbird.viewer._
+import org.sunbird.viewer.core.{CassandraUtil, JSONUtils}
+import org.sunbird.viewer.util.{QueryUtil, ResponseUtil}
+
+import javax.inject.Inject
+
+
+
+class StartUpdateActor @Inject()(cassandraUtil: CassandraUtil) extends Actor {
+ val convertInt = (text:String) => Integer.valueOf(text)
+
+ def receive: Receive = {
+ case BaseRequest("start", request) => sender() ! start(request)
+ case BaseRequest("update", request) => sender() ! update(request)
+ }
+
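+ /**
+  * Handles the "start" request: deserializes the body into a StartRequest, validates it
+  * and, on success, inserts a view-start row into the content consumption table with the
+  * START status and progress codes bound. An illustrative (not authoritative) request body:
+  * {"id":"api.view.start","ver":"1.0","ts":"...","request":{"userId":"user-001","contentId":"do_123","collectionId":"do_456","contextId":"batch-001"}}
+  */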
+ def start(request: String): Response = {
+ val requestBody = JSONUtils.deserialize[ViewRequestBody](request).request
+ val startRequest = JSONUtils.deserialize[StartRequest](JSONUtils.serialize(requestBody))
+ startRequest.validateRequest match {
+ case Left(error) => ResponseUtil.clientErrorResponse(Constants.VIEW_START_REQUEST, error)
+ case Right(request) =>
+ val statement = QueryUtil.getInsertViewStartStatement(Constants.CONTENT_CONSUMPTION_TABLE,request)
+ cassandraUtil.executeStatement(statement,List(convertInt(StatusCode.START.toString),
+ convertInt(ProgressCode.START.toString)))
+ val response = JSONUtils.deserialize[Map[String, AnyRef]](s"""{"${startRequest.contentId}":"Progress started"}""")
+ ResponseUtil.OK(Constants.VIEW_START_REQUEST, response)
+ }
+ }
+
+ def update(request: String): String = {
+ val result = Map(
+ "name" -> "viewer.service.health.api",
+ "services" -> request)
+ JSONUtils.serialize(result)
+ }
+}
+
diff --git a/viewer-actors/src/main/scala/org/sunbird/viewer/actors/ViewerSummaryActor.scala b/viewer-actors/src/main/scala/org/sunbird/viewer/actors/ViewerSummaryActor.scala
new file mode 100644
index 0000000..c3a7eb6
--- /dev/null
+++ b/viewer-actors/src/main/scala/org/sunbird/viewer/actors/ViewerSummaryActor.scala
@@ -0,0 +1,131 @@
+package org.sunbird.viewer.actors
+
+import akka.actor.Actor
+import com.datastax.driver.core.TypeTokens
+import org.sunbird.viewer.core.{AppConfig, CassandraUtil, HTTPResponse, HttpUtil, JSONUtils, ServerException}
+import org.sunbird.viewer.util.{QueryUtil, ResponseUtil}
+import org.sunbird.viewer._
+
+import java.text.SimpleDateFormat
+import javax.inject.Inject
+import scala.collection.JavaConverters._
+
+class ViewerSummaryActor @Inject()(cassandraUtil: CassandraUtil, httpUtil: HttpUtil) extends Actor{
+ val baseUrl = {if(AppConfig.conf.hasPath("service.search.basePath"))AppConfig.getString("service.search.basePath") else "https://dev.sunbirded.org"}
+ val searchUrl = {if(AppConfig.conf.hasPath("service.search.basePath"))AppConfig.getString("service.search.basePath") else "/api/content/v1/search"}
+
+ override def receive: Receive = {
+ case BaseRequest("summary-list", userId) => sender() ! summaryList(userId)
+ case BaseRequest("summary-read", request) => sender() ! summaryRead(request)
+ }
+
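+ /**
+  * Builds the summary list for a user: reads the enrolments, the matching
+  * user_activity_agg rows and the collection metadata from the search service,
+  * then merges them into Summary entries (or returns an empty summary list).
+  */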
+ def summaryList(userId: String): Response = {
+ // cache data
+
+ try {
+ // read all enrolments with status = 2
+ val enrolments: Map[String, EnrolmentData] = readEnrolments(userId)
+ if(!enrolments.isEmpty) {
+ // read all user-activity-agg
+ val activityData = readUserActivity(userId, enrolments.keySet.toList)
+ // get all content metadata from search
+ val collectionMetadata = searchMetadata(enrolments.keySet.toList)
+ // merge all and send the response
+ prepareResponse(Constants.VIEW_SUMMARY_LIST_API_ID, enrolments, activityData, collectionMetadata)
+ } else {
+ // return empty response
+ ResponseUtil.OK(Constants.VIEW_SUMMARY_LIST_API_ID, Map("summary" -> List()))
+ }
+
+
+ } catch {
+ case e: Exception => ResponseUtil
+ .serverErrorResponse(Constants.VIEW_SUMMARY_LIST_API_ID, Map("message" -> e.getMessage))
+ }
+ }
+
+ def summaryRead(request: String): Response = {
+ val req = JSONUtils.deserialize[ViewerSummaryRequest](request)
+ // cache data
+
+ try {
+ // read all enrolments with status = 2
+ val enrolments: Map[String, EnrolmentData] = readEnrolments(req.userId, req.collectionId.get, req.contextId.get)
+ if(!enrolments.isEmpty) {
+ // read all user-activity-agg
+ val activityData = readUserActivity(req.userId, enrolments.keySet.toList)
+ // get all content metadata from search
+ val collectionMetadata = searchMetadata(enrolments.keySet.toList)
+ // merge all and send the response
+ prepareResponse(Constants.VIEW_SUMMARY_READ_API_ID, enrolments, activityData, collectionMetadata)
+ } else {
+ // return empty response
+ ResponseUtil.OK(Constants.VIEW_SUMMARY_READ_API_ID, Map("summary" -> List()))
+ }
+ } catch {
+ case e: Exception => ResponseUtil
+ .serverErrorResponse(Constants.VIEW_SUMMARY_READ_API_ID, Map("message" -> e.getMessage))
+ }
+ }
+
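+ /**
+  * Reads user_enrolments rows for the user (optionally filtered by collection and
+  * batch) and maps them by courseid into EnrolmentData.
+  */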
+ def readEnrolments(userId: String, collectionId: String = null, batchId: String = null):Map[String, EnrolmentData] = {
+ val dateFormatter = new SimpleDateFormat("dd")
+ val query = QueryUtil.getEnrolments(ViewerSummaryRequest(userId, Option(collectionId), Option(batchId)))
+ val rows = cassandraUtil.find(query)
+ if(null != rows && !rows.asScala.isEmpty) {
+ rows.asScala.map(row => row.getString("courseid") -> EnrolmentData(row.getString("userid"), row.getString("courseid"),
+ row.getString("batchid"), row.getTimestamp("enrolled_date").getTime, row.getBool("active"),
+ row.getList("issued_certificates", TypeTokens.mapOf(classOf[String], classOf[String])),
+ row.getTimestamp("completedon").getTime, row.getInt("progress"), row.getInt("status"))).toMap
+ } else {
+ Map()
+ }
+ }
+
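+ /**
+  * Reads user_activity_agg rows for the given collections and maps them by activity_id
+  * into UserActivityData, stripping the "cb:" prefix from context_id.
+  */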
+ def readUserActivity(userId: String, collectionIds: List[String], contextIds: List[String] = List()) = {
+ val query = QueryUtil.getUserActivities(userId, collectionIds)
+ val rows = cassandraUtil.find(query)
+ //TODO: Need to verify if context filtering is required or not
+ //rows.asScala.filter(row => contextIds.contains(row.getString("context_id").replaceAll("cb:","")))
+ rows.asScala.map(row => {
+ val contentStatus = row.getMap[String, Integer]("content_status", classOf[String], classOf[Integer]).asScala.toMap
+ row.getString("activity_id") -> UserActivityData(row.getString("user_id"), row.getString("activity_id"),row.getString("context_id").replaceAll("cb:",""),
+ contentStatus , getAssessStatus(row.getMap[String, Integer]("agg", classOf[String], classOf[Integer]), contentStatus.keys.toList))
+ }).toMap
+ }
+
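+ /**
+  * Extracts per-content assessment results from the "agg" map: for every leaf node with a
+  * non-zero "score:<contentId>" entry, returns its score and the matching "max_score:<contentId>" value.
+  */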
+ def getAssessStatus(aggs: java.util.Map[String, Integer], leafNodes: List[String]): Map[String, Map[String, AnyRef]] = {
+ val filteredIds = leafNodes.filter(id => 0 != aggs.getOrDefault("score:" + id, 0))
+ if(!filteredIds.isEmpty) {
+ filteredIds.map(id => id -> Map("score" -> aggs.getOrDefault("score:" + id, 0).asInstanceOf[AnyRef], "maxScore" -> aggs.getOrDefault("max_score:" + id, 0).asInstanceOf[AnyRef]).toMap).toMap
+ } else Map()
+ }
+
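+ /**
+  * Fetches collection metadata for the enrolled identifiers via a composite search POST
+  * and returns the contents keyed by identifier; a non-200 response is raised as a ServerException.
+  */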
+ def searchMetadata(collectionIds: List[String]) = {
+ val request = s"""{"request":{"filters":{"identifier": ${collectionIds.mkString("[\"", "\",\"", "\"]")},"status":"Live","mimeType":"application/vnd.ekstep.content-collection","trackable.enabled":"Yes"},"fields":[]}}"""
+ val httpResponse: HTTPResponse = httpUtil.post(baseUrl + searchUrl, request, Map[String, String]("Content-Type" -> "application/json"))
+ if(200 == httpResponse.status) {
+ val response = JSONUtils.deserialize[Response](httpResponse.body)
+ response.result.getOrElse("result", Map()).asInstanceOf[Map[String, AnyRef]].getOrElse("content", List[java.util.Map[String, AnyRef]]()).asInstanceOf[List[Map[String, AnyRef]]].map(content => (content.getOrElse("identifier","").asInstanceOf[String] -> content)).toMap
+ } else {
+ throw ServerException(s"Error while searching collection metadata : ${httpResponse.status} : ${httpResponse.body}")
+ }
+ }
+
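+ /**
+  * Merges enrolment, activity-aggregate and collection metadata into Summary entries
+  * and wraps them in an OK response under the "summary" key.
+  */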
+ def prepareResponse(apiId:String, enrolments: Map[String, EnrolmentData], activityData: Map[String, UserActivityData],
+ collectionMetadata: Map[String, Map[String, AnyRef]]) = {
+ val summary = enrolments.map(enrolment => {
+ val enrolmentData = enrolment._2
+ val userActivityData = activityData.getOrElse(enrolment._1, classOf[UserActivityData].getDeclaredConstructor().newInstance())
+ val collectionData = collectionMetadata.getOrElse(enrolment._1, Map())
+
+ Summary(enrolmentData.userId, enrolmentData.collectionId, enrolmentData.contextId, enrolmentData.enrolledDate, enrolmentData.active,
+ userActivityData.contentStatus, userActivityData.assessmentStatus, collectionData, enrolmentData.issuedCertificates, enrolmentData.completedOn,
+ enrolmentData.progress, enrolmentData.status)
+ }).toList
+
+ ResponseUtil.OK(apiId, Map("summary" -> summary))
+ }
+
+
+}
diff --git a/viewer-actors/src/main/scala/org/sunbird/viewer/util/QueryUtil.scala b/viewer-actors/src/main/scala/org/sunbird/viewer/util/QueryUtil.scala
new file mode 100644
index 0000000..40ea515
--- /dev/null
+++ b/viewer-actors/src/main/scala/org/sunbird/viewer/util/QueryUtil.scala
@@ -0,0 +1,82 @@
+package org.sunbird.viewer.util
+
+import com.datastax.driver.core.querybuilder.QueryBuilder.bindMarker
+import com.datastax.driver.core.querybuilder.{Insert, QueryBuilder => QB}
+import org.sunbird.viewer.{BaseViewRequest, Constants, ViewerSummaryRequest}
+
+import java.util.Date
+import scala.collection.JavaConverters._
+
+/**
+ * STATEMENTS
+ * INSERT INTO sunbird_courses.user_content_consumption_new (userid,contentid,collectionid,batchid,status,progress) VALUES (?,?,?,?,?,?) IF NOT EXISTS;
+ * UPDATE sunbird_courses.user_content_consumption_new SET progress=? WHERE userid=? AND contentid=? and collectionid=? and contextid=?;
+ */
+
+object QueryUtil {
+
+ def getBaseInsert(table:String,request:BaseViewRequest): Insert = {
+ QB.insertInto(Constants.SUNBIRD_COURSES_KEYSPACE, table).ifNotExists()
+ .value("userid", request.userId).value("contentid", request.contentId)
+ .value("collectionId", request.collectionId.get).value("contextid", request.contextId.get)
+ }
+
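+ /**
+  * Builds the view-start insert; the generated CQL is roughly:
+  * INSERT INTO sunbird_courses.<table> (userid, contentid, collectionId, contextid, status, progress, last_updated_time)
+  * VALUES (...) IF NOT EXISTS, with status and progress left as bind markers.
+  */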
+ def getInsertViewStartStatement(table: String,request: BaseViewRequest): String = {
+ getBaseInsert(table,request)
+ .value("status", bindMarker()).value("progress", bindMarker())
+ .value("last_updated_time", new Date()).toString
+ }
+
+ def getUpdateViewUpdateStatement(table: String): String = {
+ QB.update(Constants.SUNBIRD_COURSES_KEYSPACE, table)
+ .`with`(QB.set("progress", bindMarker())).and(QB.set("last_updated_time", new Date()))
+ .where(QB.eq("userid", bindMarker()))
+ .and(QB.eq("contentid", bindMarker())).toString
+ }
+
+ def getUpdateViewEndStatement(table: String): String = {
+ QB.update(Constants.SUNBIRD_COURSES_KEYSPACE, table)
+ .`with`(QB.set("progress", bindMarker())).and(QB.set("status", bindMarker()))
+ .and(QB.set("last_updated_time", new Date()))
+ .where(QB.eq("userid", bindMarker()))
+ .and(QB.eq("contentid", bindMarker())).ifExists().toString
+ }
+
+ def getUpdateEndEnrolStatement(table: String): String = {
+ QB.update(Constants.SUNBIRD_COURSES_KEYSPACE, table)
+ .`with`(QB.appendAll("contentstatus", bindMarker()))
+ .where(QB.eq("userid", bindMarker()))
+ .and(QB.eq("courseid", bindMarker()))
+ .and(QB.eq("batchid", bindMarker())).toString
+ }
+
+ def getViewReadStatement(table:String) : String = {
+ QB.select.from(Constants.SUNBIRD_COURSES_KEYSPACE,table)
+ .where(QB.eq("userid",bindMarker()))
+ .and(QB.eq("courseid",bindMarker()))
+ .and(QB.eq("batchid",bindMarker())).toString
+ }
+
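+ /**
+  * Builds the enrolment lookup; the generated CQL is roughly:
+  * SELECT * FROM sunbird_courses.user_enrolments WHERE userid='<userId>'
+  * [AND courseid='<collectionId>'] [AND batchid='<contextId>']
+  */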
+ def getEnrolments(request: ViewerSummaryRequest): String = {
+ var select = QB.select.from(Constants.SUNBIRD_COURSES_KEYSPACE, Constants.USER_ENROLMENTS_TABLE)
+ .where(QB.eq("userid", request.userId))
+ select = request.collectionId.map(x => select.and(QB.eq("courseid", x))).getOrElse(select)
+ select = request.contextId.map(x => select.and(QB.eq("batchid", x))).getOrElse(select)
+ select.toString
+ }
+
+ /**
+ * Fetches the user activity aggregates for the given collections.
+ * context_id cannot be included in the IN clause, as Cassandra throws an error for it.
+ * @param userId the user identifier
+ * @param collectionIds the collection (course) identifiers
+ * @return the generated CQL select statement
+ */
+ def getUserActivities(userId: String, collectionIds: List[String]): String = {
+ val select = QB.select.from(Constants.SUNBIRD_COURSES_KEYSPACE, Constants.USER_ACTIVITY_TABLE)
+ .where(QB.eq("activity_type","Course")).and(QB.eq("user_id", userId))
+ .and(QB.in("activity_id", collectionIds.asJava))
+ select.toString
+ }
+}
diff --git a/viewer-actors/src/main/scala/org/sunbird/viewer/util/ResponseUtil.scala b/viewer-actors/src/main/scala/org/sunbird/viewer/util/ResponseUtil.scala
new file mode 100644
index 0000000..cb97c5a
--- /dev/null
+++ b/viewer-actors/src/main/scala/org/sunbird/viewer/util/ResponseUtil.scala
@@ -0,0 +1,25 @@
+package org.sunbird.viewer.util
+
+import org.joda.time.{DateTime, DateTimeZone}
+import org.sunbird.viewer.core.APIResponseUtil
+import org.sunbird.viewer.{Params, Response, ResponseCode}
+
+import java.util.UUID
+
+object ResponseUtil extends APIResponseUtil{
+ override def OK(apiId: String, result: Map[String, AnyRef]): Response = {
+ Response(apiId, "1.0", df.print(DateTime.now(DateTimeZone.UTC).getMillis), Params(UUID.randomUUID.toString, null, null, "successful", null), ResponseCode.OK.toString, Option(result))
+ }
+
+ override def clientErrorResponse(apiId: String, errResponse: Map[String, AnyRef]): Response = {
+ Response(apiId, "1.0", df.print(System.currentTimeMillis()),
+ Params(UUID.randomUUID().toString, null, ResponseCode.CLIENT_ERROR.toString, "failed", null),
+ ResponseCode.CLIENT_ERROR.toString, Some(errResponse))
+ }
+
+ override def serverErrorResponse(apiId: String, errResponse: Map[String, AnyRef]): Response = {
+ Response(apiId, "1.0", df.print(System.currentTimeMillis()),
+ Params(UUID.randomUUID().toString, null, ResponseCode.SERVER_ERROR.toString, "failed", null),
+ ResponseCode.SERVER_ERROR.toString, Some(errResponse))
+ }
+}
diff --git a/viewer-actors/src/test/resources/application.conf b/viewer-actors/src/test/resources/application.conf
new file mode 100644
index 0000000..1eb1193
--- /dev/null
+++ b/viewer-actors/src/test/resources/application.conf
@@ -0,0 +1,4 @@
+cassandra.connection.host = localhost
+cassandra.connection.port=9042
+kafka.broker.list="localhost:9092"
+kafka_topics_instruction=test.course
\ No newline at end of file
diff --git a/viewer-actors/src/test/resources/cassandra-unit.yaml b/viewer-actors/src/test/resources/cassandra-unit.yaml
new file mode 100755
index 0000000..a965a8f
--- /dev/null
+++ b/viewer-actors/src/test/resources/cassandra-unit.yaml
@@ -0,0 +1,590 @@
+# Cassandra storage config YAML
+
+# NOTE:
+# See http://wiki.apache.org/cassandra/StorageConfiguration for
+# full explanations of configuration directives
+# /NOTE
+
+# The name of the cluster. This is mainly used to prevent machines in
+# one logical cluster from joining another.
+cluster_name: 'Test Cluster'
+
+# You should always specify InitialToken when setting up a production
+# cluster for the first time, and often when adding capacity later.
+# The principle is that each node should be given an equal slice of
+# the token ring; see http://wiki.apache.org/cassandra/Operations
+# for more details.
+#
+# If blank, Cassandra will request a token bisecting the range of
+# the heaviest-loaded existing node. If there is no load information
+# available, such as is the case with a new cluster, it will pick
+# a random token, which will lead to hot spots.
+#initial_token:
+
+# See http://wiki.apache.org/cassandra/HintedHandoff
+hinted_handoff_enabled: true
+# this defines the maximum amount of time a dead host will have hints
+# generated. After it has been dead this long, new hints for it will not be
+# created until it has been seen alive and gone down again.
+max_hint_window_in_ms: 10800000 # 3 hours
+# Maximum throttle in KBs per second, per delivery thread. This will be
+# reduced proportionally to the number of nodes in the cluster. (If there
+# are two nodes in the cluster, each delivery thread will use the maximum
+# rate; if there are three, each will throttle to half of the maximum,
+# since we expect two nodes to be delivering hints simultaneously.)
+hinted_handoff_throttle_in_kb: 1024
+# Number of threads with which to deliver hints;
+# Consider increasing this number when you have multi-dc deployments, since
+# cross-dc handoff tends to be slower
+max_hints_delivery_threads: 2
+
+hints_directory: target/embeddedCassandra/hints
+
+# The following setting populates the page cache on memtable flush and compaction
+# WARNING: Enable this setting only when the whole node's data fits in memory.
+# Defaults to: false
+# populate_io_cache_on_flush: false
+
+# Authentication backend, implementing IAuthenticator; used to identify users
+# Out of the box, Cassandra provides org.apache.cassandra.auth.{AllowAllAuthenticator,
+# PasswordAuthenticator}.
+#
+# - AllowAllAuthenticator performs no checks - set it to disable authentication.
+# - PasswordAuthenticator relies on username/password pairs to authenticate
+# users. It keeps usernames and hashed passwords in system_auth.credentials table.
+# Please increase system_auth keyspace replication factor if you use this authenticator.
+authenticator: AllowAllAuthenticator
+
+# Authorization backend, implementing IAuthorizer; used to limit access/provide permissions
+# Out of the box, Cassandra provides org.apache.cassandra.auth.{AllowAllAuthorizer,
+# CassandraAuthorizer}.
+#
+# - AllowAllAuthorizer allows any action to any user - set it to disable authorization.
+# - CassandraAuthorizer stores permissions in system_auth.permissions table. Please
+# increase system_auth keyspace replication factor if you use this authorizer.
+authorizer: AllowAllAuthorizer
+
+# Validity period for permissions cache (fetching permissions can be an
+# expensive operation depending on the authorizer, CassandraAuthorizer is
+# one example). Defaults to 2000, set to 0 to disable.
+# Will be disabled automatically for AllowAllAuthorizer.
+permissions_validity_in_ms: 2000
+
+
+# The partitioner is responsible for distributing rows (by key) across
+# nodes in the cluster. Any IPartitioner may be used, including your
+# own as long as it is on the classpath. Out of the box, Cassandra
+# provides org.apache.cassandra.dht.{Murmur3Partitioner, RandomPartitioner
+# ByteOrderedPartitioner, OrderPreservingPartitioner (deprecated)}.
+#
+# - RandomPartitioner distributes rows across the cluster evenly by md5.
+# This is the default prior to 1.2 and is retained for compatibility.
+# - Murmur3Partitioner is similar to RandomPartioner but uses Murmur3_128
+# Hash Function instead of md5. When in doubt, this is the best option.
+# - ByteOrderedPartitioner orders rows lexically by key bytes. BOP allows
+# scanning rows in key order, but the ordering can generate hot spots
+# for sequential insertion workloads.
+# - OrderPreservingPartitioner is an obsolete form of BOP, that stores
+# - keys in a less-efficient format and only works with keys that are
+# UTF8-encoded Strings.
+# - CollatingOPP collates according to EN,US rules rather than lexical byte
+# ordering. Use this as an example if you need custom collation.
+#
+# See http://wiki.apache.org/cassandra/Operations for more on
+# partitioners and token selection.
+partitioner: org.apache.cassandra.dht.Murmur3Partitioner
+
+# directories where Cassandra should store data on disk.
+data_file_directories:
+ - target/embeddedCassandra/data
+
+# commit log
+commitlog_directory: target/embeddedCassandra/commitlog
+
+cdc_raw_directory: target/embeddedCassandra/cdc
+
+# policy for data disk failures:
+# stop: shut down gossip and Thrift, leaving the node effectively dead, but
+# can still be inspected via JMX.
+# best_effort: stop using the failed disk and respond to requests based on
+# remaining available sstables. This means you WILL see obsolete
+# data at CL.ONE!
+# ignore: ignore fatal errors and let requests fail, as in pre-1.2 Cassandra
+disk_failure_policy: stop
+
+
+# Maximum size of the key cache in memory.
+#
+# Each key cache hit saves 1 seek and each row cache hit saves 2 seeks at the
+# minimum, sometimes more. The key cache is fairly tiny for the amount of
+# time it saves, so it's worthwhile to use it at large numbers.
+# The row cache saves even more time, but must store the whole values of
+# its rows, so it is extremely space-intensive. It's best to only use the
+# row cache if you have hot rows or static rows.
+#
+# NOTE: if you reduce the size, you may not get you hottest keys loaded on startup.
+#
+# Default value is empty to make it "auto" (min(5% of Heap (in MB), 100MB)). Set to 0 to disable key cache.
+key_cache_size_in_mb:
+
+# Duration in seconds after which Cassandra should
+# safe the keys cache. Caches are saved to saved_caches_directory as
+# specified in this configuration file.
+#
+# Saved caches greatly improve cold-start speeds, and is relatively cheap in
+# terms of I/O for the key cache. Row cache saving is much more expensive and
+# has limited use.
+#
+# Default is 14400 or 4 hours.
+key_cache_save_period: 14400
+
+# Number of keys from the key cache to save
+# Disabled by default, meaning all keys are going to be saved
+# key_cache_keys_to_save: 100
+
+# Maximum size of the row cache in memory.
+# NOTE: if you reduce the size, you may not get you hottest keys loaded on startup.
+#
+# Default value is 0, to disable row caching.
+row_cache_size_in_mb: 0
+
+# Duration in seconds after which Cassandra should
+# safe the row cache. Caches are saved to saved_caches_directory as specified
+# in this configuration file.
+#
+# Saved caches greatly improve cold-start speeds, and is relatively cheap in
+# terms of I/O for the key cache. Row cache saving is much more expensive and
+# has limited use.
+#
+# Default is 0 to disable saving the row cache.
+row_cache_save_period: 0
+
+# Number of keys from the row cache to save
+# Disabled by default, meaning all keys are going to be saved
+# row_cache_keys_to_save: 100
+
+# saved caches
+saved_caches_directory: target/embeddedCassandra/saved_caches
+
+# commitlog_sync may be either "periodic" or "batch."
+# When in batch mode, Cassandra won't ack writes until the commit log
+# has been fsynced to disk. It will wait up to
+# commitlog_sync_batch_window_in_ms milliseconds for other writes, before
+# performing the sync.
+#
+# commitlog_sync: batch
+# commitlog_sync_batch_window_in_ms: 50
+#
+# the other option is "periodic" where writes may be acked immediately
+# and the CommitLog is simply synced every commitlog_sync_period_in_ms
+# milliseconds.
+commitlog_sync: periodic
+commitlog_sync_period_in_ms: 10000
+
+# The size of the individual commitlog file segments. A commitlog
+# segment may be archived, deleted, or recycled once all the data
+# in it (potentially from each columnfamily in the system) has been
+# flushed to sstables.
+#
+# The default size is 32, which is almost always fine, but if you are
+# archiving commitlog segments (see commitlog_archiving.properties),
+# then you probably want a finer granularity of archiving; 8 or 16 MB
+# is reasonable.
+commitlog_segment_size_in_mb: 32
+
+# any class that implements the SeedProvider interface and has a
+# constructor that takes a Map of parameters will do.
+seed_provider:
+ # Addresses of hosts that are deemed contact points.
+ # Cassandra nodes use this list of hosts to find each other and learn
+ # the topology of the ring. You must change this if you are running
+ # multiple nodes!
+ - class_name: org.apache.cassandra.locator.SimpleSeedProvider
+ parameters:
+ # seeds is actually a comma-delimited list of addresses.
+ # Ex: ",,"
+ - seeds: "127.0.0.1"
+
+
+# For workloads with more data than can fit in memory, Cassandra's
+# bottleneck will be reads that need to fetch data from
+# disk. "concurrent_reads" should be set to (16 * number_of_drives) in
+# order to allow the operations to enqueue low enough in the stack
+# that the OS and drives can reorder them.
+#
+# On the other hand, since writes are almost never IO bound, the ideal
+# number of "concurrent_writes" is dependent on the number of cores in
+# your system; (8 * number_of_cores) is a good rule of thumb.
+concurrent_reads: 32
+concurrent_writes: 32
+
+# Total memory to use for memtables. Cassandra will flush the largest
+# memtable when this much memory is used.
+# If omitted, Cassandra will set it to 1/3 of the heap.
+# memtable_total_space_in_mb: 2048
+
+# Total space to use for commitlogs.
+# If space gets above this value (it will round up to the next nearest
+# segment multiple), Cassandra will flush every dirty CF in the oldest
+# segment and remove it.
+# commitlog_total_space_in_mb: 4096
+
+# This sets the amount of memtable flush writer threads. These will
+# be blocked by disk io, and each one will hold a memtable in memory
+# while blocked. If you have a large heap and many data directories,
+# you can increase this value for better flush performance.
+# By default this will be set to the amount of data directories defined.
+#memtable_flush_writers: 1
+
+# the number of full memtables to allow pending flush, that is,
+# waiting for a writer thread. At a minimum, this should be set to
+# the maximum number of secondary indexes created on a single CF.
+#memtable_flush_queue_size: 4
+
+# Whether to, when doing sequential writing, fsync() at intervals in
+# order to force the operating system to flush the dirty
+# buffers. Enable this to avoid sudden dirty buffer flushing from
+# impacting read latencies. Almost always a good idea on SSD:s; not
+# necessarily on platters.
+trickle_fsync: false
+trickle_fsync_interval_in_kb: 10240
+
+# TCP port, for commands and data
+storage_port: 0
+
+# SSL port, for encrypted communication. Unused unless enabled in
+# encryption_options
+ssl_storage_port: 7011
+
+# Address to bind to and tell other Cassandra nodes to connect to. You
+# _must_ change this if you want multiple nodes to be able to
+# communicate!
+#
+# Leaving it blank leaves it up to InetAddress.getLocalHost(). This
+# will always do the Right Thing *if* the node is properly configured
+# (hostname, name resolution, etc), and the Right Thing is to use the
+# address associated with the hostname (it might not be).
+#
+# Setting this to 0.0.0.0 is always wrong.
+listen_address: 127.0.0.1
+
+start_native_transport: true
+# port for the CQL native transport to listen for clients on
+native_transport_port: 9042
+
+# Whether to start the thrift rpc server.
+start_rpc: true
+
+# Address to broadcast to other Cassandra nodes
+# Leaving this blank will set it to the same value as listen_address
+# broadcast_address: 1.2.3.4
+
+# The address to bind the Thrift RPC service to -- clients connect
+# here. Unlike ListenAddress above, you *can* specify 0.0.0.0 here if
+# you want Thrift to listen on all interfaces.
+#
+# Leaving this blank has the same effect it does for ListenAddress,
+# (i.e. it will be based on the configured hostname of the node).
+rpc_address: localhost
+# port for Thrift to listen for clients on
+rpc_port: 0
+
+# enable or disable keepalive on rpc connections
+rpc_keepalive: true
+
+# Cassandra provides three options for the RPC Server:
+#
+# sync -> One connection per thread in the rpc pool (see below).
+# For a very large number of clients, memory will be your limiting
+# factor; on a 64 bit JVM, 128KB is the minimum stack size per thread.
+# Connection pooling is very, very strongly recommended.
+#
+# async -> Nonblocking server implementation with one thread to serve
+# rpc connections. This is not recommended for high throughput use
+# cases. Async has been tested to be about 50% slower than sync
+# or hsha and is deprecated: it will be removed in the next major release.
+#
+# hsha -> Stands for "half synchronous, half asynchronous." The rpc thread pool
+# (see below) is used to manage requests, but the threads are multiplexed
+# across the different clients.
+#
+# The default is sync because on Windows hsha is about 30% slower. On Linux,
+# sync/hsha performance is about the same, with hsha of course using less memory.
+rpc_server_type: sync
+
+# Uncomment rpc_min|max|thread to set request pool size.
+# You would primarily set max for the sync server to safeguard against
+# misbehaved clients; if you do hit the max, Cassandra will block until one
+# disconnects before accepting more. The defaults for sync are min of 16 and max
+# unlimited.
+#
+# For the Hsha server, the min and max both default to quadruple the number of
+# CPU cores.
+#
+# This configuration is ignored by the async server.
+#
+# rpc_min_threads: 16
+# rpc_max_threads: 2048
+
+# uncomment to set socket buffer sizes on rpc connections
+# rpc_send_buff_size_in_bytes:
+# rpc_recv_buff_size_in_bytes:
+
+# Frame size for thrift (maximum field length).
+# 0 disables TFramedTransport in favor of TSocket. This option
+# is deprecated; we strongly recommend using Framed mode.
+thrift_framed_transport_size_in_mb: 15
+
+# The max length of a thrift message, including all fields and
+# internal thrift overhead.
+thrift_max_message_length_in_mb: 16
+
+# Set to true to have Cassandra create a hard link to each sstable
+# flushed or streamed locally in a backups/ subdirectory of the
+# Keyspace data. Removing these links is the operator's
+# responsibility.
+incremental_backups: false
+
+# Whether or not to take a snapshot before each compaction. Be
+# careful using this option, since Cassandra won't clean up the
+# snapshots for you. Mostly useful if you're paranoid when there
+# is a data format change.
+snapshot_before_compaction: false
+
+# Whether or not a snapshot is taken of the data before keyspace truncation
+# or dropping of column families. The STRONGLY advised default of true
+# should be used to provide data safety. If you set this flag to false, you will
+# lose data on truncation or drop.
+auto_snapshot: false
+
+# Add column indexes to a row after its contents reach this size.
+# Increase if your column values are large, or if you have a very large
+# number of columns. The competing causes are, Cassandra has to
+# deserialize this much of the row to read a single column, so you want
+# it to be small - at least if you do many partial-row reads - but all
+# the index data is read for each access, so you don't want to generate
+# that wastefully either.
+column_index_size_in_kb: 64
+
+# Size limit for rows being compacted in memory. Larger rows will spill
+# over to disk and use a slower two-pass compaction process. A message
+# will be logged specifying the row key.
+#in_memory_compaction_limit_in_mb: 64
+
+# Number of simultaneous compactions to allow, NOT including
+# validation "compactions" for anti-entropy repair. Simultaneous
+# compactions can help preserve read performance in a mixed read/write
+# workload, by mitigating the tendency of small sstables to accumulate
+# during a single long running compactions. The default is usually
+# fine and if you experience problems with compaction running too
+# slowly or too fast, you should look at
+# compaction_throughput_mb_per_sec first.
+#
+# This setting has no effect on LeveledCompactionStrategy.
+#
+# concurrent_compactors defaults to the number of cores.
+# Uncomment to make compaction mono-threaded, the pre-0.8 default.
+#concurrent_compactors: 1
+
+# Multi-threaded compaction. When enabled, each compaction will use
+# up to one thread per core, plus one thread per sstable being merged.
+# This is usually only useful for SSD-based hardware: otherwise,
+# your concern is usually to get compaction to do LESS i/o (see:
+# compaction_throughput_mb_per_sec), not more.
+#multithreaded_compaction: false
+
+# Throttles compaction to the given total throughput across the entire
+# system. The faster you insert data, the faster you need to compact in
+# order to keep the sstable count down, but in general, setting this to
+# 16 to 32 times the rate you are inserting data is more than sufficient.
+# Setting this to 0 disables throttling. Note that this account for all types
+# of compaction, including validation compaction.
+compaction_throughput_mb_per_sec: 16
+
+# Track cached row keys during compaction, and re-cache their new
+# positions in the compacted sstable. Disable if you use really large
+# key caches.
+#compaction_preheat_key_cache: true
+
+# Throttles all outbound streaming file transfers on this node to the
+# given total throughput in Mbps. This is necessary because Cassandra does
+# mostly sequential IO when streaming data during bootstrap or repair, which
+# can lead to saturating the network connection and degrading rpc performance.
+# When unset, the default is 200 Mbps or 25 MB/s.
+# stream_throughput_outbound_megabits_per_sec: 200
+
+# How long the coordinator should wait for read operations to complete
+read_request_timeout_in_ms: 5000
+# How long the coordinator should wait for seq or index scans to complete
+range_request_timeout_in_ms: 10000
+# How long the coordinator should wait for writes to complete
+write_request_timeout_in_ms: 2000
+# How long a coordinator should continue to retry a CAS operation
+# that contends with other proposals for the same row
+cas_contention_timeout_in_ms: 1000
+# How long the coordinator should wait for truncates to complete
+# (This can be much longer, because unless auto_snapshot is disabled
+# we need to flush first so we can snapshot before removing the data.)
+truncate_request_timeout_in_ms: 60000
+# The default timeout for other, miscellaneous operations
+request_timeout_in_ms: 10000
+
+# Enable operation timeout information exchange between nodes to accurately
+# measure request timeouts. If disabled, replicas will assume that requests
+# were forwarded to them instantly by the coordinator, which means that
+# under overload conditions we will waste that much extra time processing
+# already-timed-out requests.
+#
+# Warning: before enabling this property make sure to ntp is installed
+# and the times are synchronized between the nodes.
+cross_node_timeout: false
+
+# Enable socket timeout for streaming operation.
+# When a timeout occurs during streaming, streaming is retried from the start
+# of the current file. This _can_ involve re-streaming an important amount of
+# data, so you should avoid setting the value too low.
+# Default value is 0, which never timeout streams.
+# streaming_socket_timeout_in_ms: 0
+
+# phi value that must be reached for a host to be marked down.
+# most users should never need to adjust this.
+# phi_convict_threshold: 8
+
+# endpoint_snitch -- Set this to a class that implements
+# IEndpointSnitch. The snitch has two functions:
+# - it teaches Cassandra enough about your network topology to route
+# requests efficiently
+# - it allows Cassandra to spread replicas around your cluster to avoid
+# correlated failures. It does this by grouping machines into
+# "datacenters" and "racks." Cassandra will do its best not to have
+# more than one replica on the same "rack" (which may not actually
+# be a physical location)
+#
+# IF YOU CHANGE THE SNITCH AFTER DATA IS INSERTED INTO THE CLUSTER,
+# YOU MUST RUN A FULL REPAIR, SINCE THE SNITCH AFFECTS WHERE REPLICAS
+# ARE PLACED.
+#
+# Out of the box, Cassandra provides
+# - SimpleSnitch:
+# Treats Strategy order as proximity. This improves cache locality
+# when disabling read repair, which can further improve throughput.
+# Only appropriate for single-datacenter deployments.
+# - PropertyFileSnitch:
+# Proximity is determined by rack and data center, which are
+# explicitly configured in cassandra-topology.properties.
+# - RackInferringSnitch:
+# Proximity is determined by rack and data center, which are
+# assumed to correspond to the 3rd and 2nd octet of each node's
+# IP address, respectively. Unless this happens to match your
+# deployment conventions (as it did Facebook's), this is best used
+# as an example of writing a custom Snitch class.
+# - Ec2Snitch:
+# Appropriate for EC2 deployments in a single Region. Loads Region
+# and Availability Zone information from the EC2 API. The Region is
+# treated as the Datacenter, and the Availability Zone as the rack.
+# Only private IPs are used, so this will not work across multiple
+# Regions.
+# - Ec2MultiRegionSnitch:
+# Uses public IPs as broadcast_address to allow cross-region
+# connectivity. (Thus, you should set seed addresses to the public
+# IP as well.) You will need to open the storage_port or
+# ssl_storage_port on the public IP firewall. (For intra-Region
+# traffic, Cassandra will switch to the private IP after
+# establishing a connection.)
+#
+# You can use a custom Snitch by setting this to the full class name
+# of the snitch, which will be assumed to be on your classpath.
+endpoint_snitch: SimpleSnitch
+
+# controls how often to perform the more expensive part of host score
+# calculation
+dynamic_snitch_update_interval_in_ms: 100
+# controls how often to reset all host scores, allowing a bad host to
+# possibly recover
+dynamic_snitch_reset_interval_in_ms: 600000
+# if set greater than zero and read_repair_chance is < 1.0, this will allow
+# 'pinning' of replicas to hosts in order to increase cache capacity.
+# The badness threshold will control how much worse the pinned host has to be
+# before the dynamic snitch will prefer other replicas over it. This is
+# expressed as a double which represents a percentage. Thus, a value of
+# 0.2 means Cassandra would continue to prefer the static snitch values
+# until the pinned host was 20% worse than the fastest.
+dynamic_snitch_badness_threshold: 0.1
+
+# request_scheduler -- Set this to a class that implements
+# RequestScheduler, which will schedule incoming client requests
+# according to the specific policy. This is useful for multi-tenancy
+# with a single Cassandra cluster.
+# NOTE: This is specifically for requests from the client and does
+# not affect inter node communication.
+# org.apache.cassandra.scheduler.NoScheduler - No scheduling takes place
+# org.apache.cassandra.scheduler.RoundRobinScheduler - Round robin of
+# client requests to a node with a separate queue for each
+# request_scheduler_id. The scheduler is further customized by
+# request_scheduler_options as described below.
+request_scheduler: org.apache.cassandra.scheduler.NoScheduler
+
+# Scheduler Options vary based on the type of scheduler
+# NoScheduler - Has no options
+# RoundRobin
+# - throttle_limit -- The throttle_limit is the number of in-flight
+# requests per client. Requests beyond
+# that limit are queued up until
+# running requests can complete.
+# The value of 80 here is twice the number of
+# concurrent_reads + concurrent_writes.
+# - default_weight -- default_weight is optional and allows for
+# overriding the default which is 1.
+# - weights -- Weights are optional and will default to 1 or the
+# overridden default_weight. The weight translates into how
+# many requests are handled during each turn of the
+# RoundRobin, based on the scheduler id.
+#
+# request_scheduler_options:
+# throttle_limit: 80
+# default_weight: 5
+# weights:
+# Keyspace1: 1
+# Keyspace2: 5
+
+# request_scheduler_id -- An identifer based on which to perform
+# the request scheduling. Currently the only valid option is keyspace.
+# request_scheduler_id: keyspace
+
+# index_interval controls the sampling of entries from the primrary
+# row index in terms of space versus time. The larger the interval,
+# the smaller and less effective the sampling will be. In technicial
+# terms, the interval coresponds to the number of index entries that
+# are skipped between taking each sample. All the sampled entries
+# must fit in memory. Generally, a value between 128 and 512 here
+# coupled with a large key cache size on CFs results in the best trade
+# offs. This value is not often changed, however if you have many
+# very small rows (many to an OS page), then increasing this will
+# often lower memory usage without a impact on performance.
+index_interval: 128
+
+# Enable or disable inter-node encryption
+# Default settings are TLS v1, RSA 1024-bit keys (it is imperative that
+# users generate their own keys) TLS_RSA_WITH_AES_128_CBC_SHA as the cipher
+# suite for authentication, key exchange and encryption of the actual data transfers.
+# NOTE: No custom encryption options are enabled at the moment
+# The available internode options are : all, none, dc, rack
+#
+# If set to dc cassandra will encrypt the traffic between the DCs
+# If set to rack cassandra will encrypt the traffic between the racks
+#
+# The passwords used in these options must match the passwords used when generating
+# the keystore and truststore. For instructions on generating these files, see:
+# http://download.oracle.com/javase/6/docs/technotes/guides/security/jsse/JSSERefGuide.html#CreateKeystore
+#
+encryption_options:
+ internode_encryption: none
+ keystore: conf/.keystore
+ keystore_password: cassandra
+ truststore: conf/.truststore
+ truststore_password: cassandra
+ # More advanced defaults below:
+ # protocol: TLS
+ # algorithm: SunX509
+ # store_type: JKS
+ # cipher_suites: [TLS_RSA_WITH_AES_128_CBC_SHA,TLS_RSA_WITH_AES_256_CBC_SHA]
\ No newline at end of file
diff --git a/viewer-actors/src/test/resources/data.cql b/viewer-actors/src/test/resources/data.cql
new file mode 100644
index 0000000..d97297a
--- /dev/null
+++ b/viewer-actors/src/test/resources/data.cql
@@ -0,0 +1,103 @@
+CREATE KEYSPACE IF NOT EXISTS sunbird_courses WITH replication = {
+ 'class': 'SimpleStrategy',
+ 'replication_factor': '1'
+};
+
+
+CREATE TABLE sunbird_courses.user_content_consumption_new (
+ userid text,
+ contentid text,
+ collectionid text,
+ contextid text,
+ datetime timestamp,
+ last_access_time timestamp,
+ last_completed_time timestamp,
+ last_updated_time timestamp,
+ progress int,
+ status int,
+ PRIMARY KEY (userid, contentid, collectionid, contextid)
+) WITH CLUSTERING ORDER BY (contentid ASC,collectionid ASC, contextid ASC)
+ AND bloom_filter_fp_chance = 0.01
+ AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'}
+ AND comment = ''
+ AND compaction = {'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy', 'max_threshold': '32', 'min_threshold': '4'}
+ AND compression = {'chunk_length_in_kb': '64', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor'}
+ AND crc_check_chance = 1.0
+ AND dclocal_read_repair_chance = 0.1
+ AND default_time_to_live = 0
+ AND gc_grace_seconds = 864000
+ AND max_index_interval = 2048
+ AND memtable_flush_period_in_ms = 0
+ AND min_index_interval = 128
+ AND read_repair_chance = 0.0
+ AND speculative_retry = '99PERCENTILE';
+
+INSERT INTO sunbird_courses.user_content_consumption_new (userid,contentid,collectionid,contextid,progress,status) VALUES ('u2', 'c2','cc2','b2',20,1);
+
+
+CREATE TABLE sunbird_courses.user_enrolments (
+ userid text,
+ courseid text,
+ batchid text,
+ active boolean,
+ addedby text,
+ certificates list<frozen<map<text, text>>>,
+ certstatus int,
+ completedon timestamp,
+ completionpercentage int,
+ contentstatus map<text, int>,
+ datetime timestamp,
+ enrolled_date timestamp,
+ enrolleddate text,
+ issued_certificates list<frozen<map<text, text>>>,
+ lastreadcontentid text,
+ lastreadcontentstatus int,
+ progress int,
+ status int,
+ PRIMARY KEY (userid, courseid, batchid)
+) WITH CLUSTERING ORDER BY (courseid ASC, batchid ASC)
+ AND bloom_filter_fp_chance = 0.01
+ AND caching = {'keys': 'ALL', 'rows_per_partition': 'NONE'}
+ AND comment = ''
+ AND compaction = {'class': 'org.apache.cassandra.db.compaction.SizeTieredCompactionStrategy', 'max_threshold': '32', 'min_threshold': '4'}
+ AND compression = {'chunk_length_in_kb': '64', 'class': 'org.apache.cassandra.io.compress.LZ4Compressor'}
+ AND crc_check_chance = 1.0
+ AND dclocal_read_repair_chance = 0.1
+ AND default_time_to_live = 0
+ AND gc_grace_seconds = 864000
+ AND max_index_interval = 2048
+ AND memtable_flush_period_in_ms = 0
+ AND min_index_interval = 128
+ AND read_repair_chance = 0.0
+ AND speculative_retry = '99PERCENTILE';
+
+CREATE TABLE sunbird_courses.user_activity_agg (
+ activity_type text,
+ activity_id text,
+ user_id text,
+ context_id text,
+ content_status frozen<map<text, int>>,