[SETL-5] Implement Json Extract and Load #11

Status: Open. Wants to merge 1 commit into base: develop.
29 changes: 29 additions & 0 deletions src/main/scala/com/vngrs/etl/extractors/JsonExtract.scala
@@ -0,0 +1,29 @@
package com.vngrs.etl.extractors

import com.vngrs.etl.Extract
import com.vngrs.etl.transformers.JsonToRowTransform
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row

/**
* Extracts one or more JSON files.
*
* @param path File path(s).
* To supply multiple files, use wildcards or separate multiple paths with commas.
*/
final case class JsonExtract(path: String) extends Extract[Row] {
Review comment:
Those primitive operations must be composable without creating new classes. This class is not only meaningless but also an antipattern that we, as library authors, must avoid.
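For illustration, a minimal sketch of the composition the reviewer seems to have in mind, assuming the Extract and Transform primitives can be applied directly as functions (the PR code does exactly that internally); the jsonRows name is hypothetical:

import com.vngrs.etl.extractors.FileExtract
import com.vngrs.etl.transformers.JsonToRowTransform
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row

// Compose the existing primitives directly instead of wrapping them in a new class.
def jsonRows(path: String)(sc: SparkContext): RDD[Row] =
  JsonToRowTransform().apply(FileExtract(path).apply(sc))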


/**
* Extracts a JSON file (one object per line) and returns the result as a
* [[org.apache.spark.rdd.RDD]] of [[org.apache.spark.sql.Row]].
*
* @param sc [[org.apache.spark.SparkContext]]
* @return Extracted [[org.apache.spark.rdd.RDD]]
*/
override def apply(sc: SparkContext): RDD[Row] = {
val data = FileExtract(path).apply(sc)

JsonToRowTransform().apply(data)
}
}
26 changes: 26 additions & 0 deletions src/main/scala/com/vngrs/etl/loaders/JsonLoad.scala
@@ -0,0 +1,26 @@
package com.vngrs.etl.loaders

import com.vngrs.etl.Load
import com.vngrs.etl.exceptions.NoSchemaException
import com.vngrs.etl.transformers.RowToJsonTransform
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row

/**
* Loads [[org.apache.spark.sql.Row]]s as JSON
*/
final case class JsonLoad(path: String) extends Load[Row] {
Review comment:
The same antipattern applies here, too.
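And the mirror-image sketch for the load side, under the same assumptions (the jsonLoad name is hypothetical):

import com.vngrs.etl.loaders.FileLoad
import com.vngrs.etl.transformers.RowToJsonTransform
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row

// Transform the rows to JSON strings, then reuse the existing FileLoad primitive.
def jsonLoad(path: String)(input: RDD[Row]): Unit =
  FileLoad(path).apply(RowToJsonTransform().apply(input))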


/**
* Saves the content of the RDD in JSON format (one object per line).
*
* @param input [[org.apache.spark.rdd.RDD]] of [[org.apache.spark.sql.Row]]s.
* @throws com.vngrs.etl.exceptions.NoSchemaException if the given rows do not have a schema
*/
@throws(classOf[NoSchemaException])
override def apply(input: RDD[Row]): Unit = {
val jsonRdd = RowToJsonTransform().apply(input)

FileLoad(path).apply(jsonRdd)
}
}
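To make the schema requirement above concrete, a small sketch of the assumed behavior (not taken from the PR): rows built with GenericRowWithSchema carry a schema that RowToJsonTransform can serialize, while plain Row values do not.

import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
import org.apache.spark.sql.types.{DataTypes, StructField, StructType}

val schema = StructType(Seq(
  StructField("id", DataTypes.LongType),
  StructField("name", DataTypes.StringType)
))

// Carries a schema, so it can be serialized to JSON.
val withSchema: Row = new GenericRowWithSchema(Array[Any](1L, "John"), schema)

// No schema attached; loading this is assumed to raise NoSchemaException.
val withoutSchema: Row = Row(1L, "John")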
2 changes: 2 additions & 0 deletions src/test/resources/json_examples/childs.json
@@ -0,0 +1,2 @@
{ "id": 121, "name": "Janie", "surname": "Doe", "age": 16 }
{ "id": 122, "name": "Johnnie", "surname": "Doe", "age": 9 }
2 changes: 2 additions & 0 deletions src/test/resources/json_examples/parents.json
@@ -0,0 +1,2 @@
{ "id": 1, "name": "John", "surname": "Doe", "age": 35 }
{ "id": 2, "name": "Jane", "surname": "Doe", "age": 30 }
66 changes: 66 additions & 0 deletions src/test/scala/com/vngrs/etl/extractors/JsonExtractSpec.scala
@@ -0,0 +1,66 @@
package com.vngrs.etl.extractors

import com.vngrs.etl.SparkSpec
import org.apache.spark.sql.types.{DataTypes, StructField}

/**
* JSON Extract Specs
*/
// The following wart check does not work with ScalaTest's intercept functionality
@SuppressWarnings(Array("org.wartremover.warts.NonUnitStatements"))
class JsonExtractSpec extends SparkSpec {

"A json extractor" should "read a JSON file" in {
val path = s"$rootFolder/parents.json"

val extractor = JsonExtract(path)

extractor.testCollect.length should equal (2)
}

it should "read multiple JSON files" in {
val path = s"$rootFolder/parents.json,$rootFolder/childs.json"

val extractor = JsonExtract(path)

extractor.testCollect.length should equal (4)
}

it should "read multiple JSON files with wild cards" in {
val path = s"$rootFolder/*.json"

val extractor = JsonExtract(path)

extractor.testCollect.length should equal (4)
}

it should "interpret JSON schema" in {
val path = s"$rootFolder/parents.json"

val extractor = JsonExtract(path)

val interpretedSchema = extractor.testCollect.head.schema

val expectedFields = Set(
StructField("id", DataTypes.LongType),
StructField("name", DataTypes.StringType),
StructField("surname", DataTypes.StringType),
StructField("age", DataTypes.LongType)
)

interpretedSchema.fields.toSet should contain theSameElementsAs expectedFields
}

it should "throw an exception when path does not exists" in {
val path = s"$rootFolder/a_non_existed_file"

val extractor = JsonExtract(path)

intercept[Exception] {
extractor.testCollect
}
}

/** Root folder for test cases */
private val rootFolder: String = getClass.getResource("/json_examples").getPath
}
73 changes: 73 additions & 0 deletions src/test/scala/com/vngrs/etl/loaders/JsonLoadSpec.scala
@@ -0,0 +1,73 @@
package com.vngrs.etl.loaders

import com.vngrs.etl.SparkSpec
import com.vngrs.etl.utils.FileSystems
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
import org.apache.spark.sql.types.{DataTypes, StructField, StructType}

import scala.util.Random

/**
* JSON Loader Specs
*/
// The following wart check does not work with ScalaTest's intercept functionality
@SuppressWarnings(Array("org.wartremover.warts.NonUnitStatements"))
class JsonLoadSpec extends SparkSpec {

"A json loader" should "load json data to files" in {
val path = getClass.getResource("/").getPath + randomString
Review comment:
Instead of writing randomly named files into the resources folder, we can create temporary files. File.createTempFile creates a temporary file with a system call, so the uniqueness of the file is ensured by the OS.
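One hedged way to apply that suggestion, adjusted for the fact that Spark's file output expects a path that does not exist yet (so a unique temporary directory plus a subpath, rather than File.createTempFile itself, which would pre-create the file); the json-load-spec prefix and output name are hypothetical:

import java.nio.file.Files

// The OS guarantees the directory name is unique; Spark then creates its
// output under a subpath that does not exist yet.
val tempDir = Files.createTempDirectory("json-load-spec")
val path = tempDir.resolve("output").toString

val loader = JsonLoad(path)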


val schema = StructType(Seq(
StructField("id", DataTypes.LongType),
StructField("name", DataTypes.StringType),
StructField("surname", DataTypes.StringType),
StructField("age", DataTypes.LongType)
))

val data = Seq[Row](
new GenericRowWithSchema(Array[Any](1L, "John", "Doe", 35L), schema),
new GenericRowWithSchema(Array[Any](2L, "Jane", "Doe", 30L), schema)
)

val rdd = parallelize(data)

val loader = JsonLoad(path)

loader(rdd)

try {
new SQLContext(sc).read.json(path).count() should equal (2)
} finally {
// cleanup: runs even if the assertion above fails
FileSystems.delete(sc, path)
}
}

it should "load json data to files (read check)" in {
val path = getClass.getResource("/").getPath + randomString

val schema = StructType(Seq(
StructField("id", DataTypes.LongType),
StructField("name", DataTypes.StringType),
StructField("surname", DataTypes.StringType),
StructField("age", DataTypes.LongType)
))

val data = Seq[Row](
new GenericRowWithSchema(Array[Any](1L, "John", "Doe", 35L), schema)
)

val rdd = parallelize(data).coalesce(1)

val loader = JsonLoad(path)

loader(rdd)

try {
sc.textFile(path).collect().head should equal ("""{"id":1,"name":"John","surname":"Doe","age":35}""")
} finally {
// cleanup: runs even if the assertion above fails
FileSystems.delete(sc, path)
}
}

/** Generates a random alphanumeric string of length 20. */
private def randomString: String = Random.alphanumeric.take(20).mkString("")
}