diff --git a/src/main/scala/com/vngrs/etl/extractors/JsonExtract.scala b/src/main/scala/com/vngrs/etl/extractors/JsonExtract.scala
new file mode 100644
index 0000000..74acc44
--- /dev/null
+++ b/src/main/scala/com/vngrs/etl/extractors/JsonExtract.scala
@@ -0,0 +1,29 @@
+package com.vngrs.etl.extractors
+
+import com.vngrs.etl.Extract
+import com.vngrs.etl.transformers.JsonToRowTransform
+import org.apache.spark.SparkContext
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.Row
+
+/**
+ * Extracts JSON file(s).
+ *
+ * @param path File path(s).
+ *             To supply multiple files, use wildcards or separate multiple paths with commas.
+ */
+final case class JsonExtract(path: String) extends Extract[Row] {
+
+  /**
+   * Extracts a JSON file (one object per line) and returns the result as an
+   * [[org.apache.spark.rdd.RDD]] of [[org.apache.spark.sql.Row]]s.
+   *
+   * @param sc [[org.apache.spark.SparkContext]]
+   * @return Extracted [[org.apache.spark.rdd.RDD]]
+   */
+  override def apply(sc: SparkContext): RDD[Row] = {
+    val data = FileExtract(path).apply(sc)
+
+    JsonToRowTransform().apply(data)
+  }
+}
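A minimal usage sketch for the new extractor, assuming an in-scope `SparkContext`; the input paths below are illustrative only:

```scala
import com.vngrs.etl.extractors.JsonExtract
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row

// `sc` is assumed to be an already-configured SparkContext.
def extractExample(sc: SparkContext): Unit = {
  // A single file, a comma-separated list, or a wildcard pattern all work,
  // since the path string is handed straight to FileExtract.
  val parents: RDD[Row] = JsonExtract("/data/parents.json").apply(sc)
  val family: RDD[Row] = JsonExtract("/data/parents.json,/data/childs.json").apply(sc)
  val all: RDD[Row] = JsonExtract("/data/*.json").apply(sc)

  // The schema is inferred from the JSON documents themselves
  // (see the "interpret JSON schema" case in JsonExtractSpec).
  println(all.first().schema.treeString)
}
```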
diff --git a/src/main/scala/com/vngrs/etl/loaders/JsonLoad.scala b/src/main/scala/com/vngrs/etl/loaders/JsonLoad.scala
new file mode 100644
index 0000000..d27c22b
--- /dev/null
+++ b/src/main/scala/com/vngrs/etl/loaders/JsonLoad.scala
@@ -0,0 +1,26 @@
+package com.vngrs.etl.loaders
+
+import com.vngrs.etl.Load
+import com.vngrs.etl.exceptions.NoSchemaException
+import com.vngrs.etl.transformers.RowToJsonTransform
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.Row
+
+/**
+ * Loads [[org.apache.spark.sql.Row]]s as JSON.
+ */
+final case class JsonLoad(path: String) extends Load[Row] {
+
+  /**
+   * Saves the content of the RDD in JSON format (one object per line).
+   *
+   * @param input [[org.apache.spark.rdd.RDD]] of [[org.apache.spark.sql.Row]]s.
+   * @throws com.vngrs.etl.exceptions.NoSchemaException if the given rows do not have a schema
+   */
+  @throws(classOf[NoSchemaException])
+  override def apply(input: RDD[Row]): Unit = {
+    val jsonRdd = RowToJsonTransform().apply(input)
+
+    FileLoad(path).apply(jsonRdd)
+  }
+}
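A matching sketch for the loader, under the same assumptions (an in-scope `SparkContext`, illustrative output path). Per the `@throws` doc above, rows without a schema make the loader fail with `NoSchemaException`:

```scala
import com.vngrs.etl.loaders.JsonLoad
import org.apache.spark.SparkContext
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
import org.apache.spark.sql.types.{DataTypes, StructField, StructType}

def loadExample(sc: SparkContext): Unit = {
  val schema = StructType(Seq(
    StructField("id", DataTypes.LongType),
    StructField("name", DataTypes.StringType)
  ))

  // Rows must carry a schema; plain schemaless Row instances
  // would make JsonLoad throw NoSchemaException.
  val rows = sc.parallelize(Seq[Row](
    new GenericRowWithSchema(Array[Any](1L, "John"), schema)
  ))

  // Writes one JSON object per line under the given (illustrative) path.
  JsonLoad("/tmp/json_load_example").apply(rows)
}
```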
diff --git a/src/test/resources/json_examples/childs.json b/src/test/resources/json_examples/childs.json
new file mode 100644
index 0000000..b67f31a
--- /dev/null
+++ b/src/test/resources/json_examples/childs.json
@@ -0,0 +1,2 @@
+{ "id": 121, "name": "Janie", "surname": "Doe", "age": 16 }
+{ "id": 122, "name": "Johnnie", "surname": "Doe", "age": 9 }
diff --git a/src/test/resources/json_examples/parents.json b/src/test/resources/json_examples/parents.json
new file mode 100644
index 0000000..674372b
--- /dev/null
+++ b/src/test/resources/json_examples/parents.json
@@ -0,0 +1,2 @@
+{ "id": 1, "name": "John", "surname": "Doe", "age": 35 }
+{ "id": 2, "name": "Jane", "surname": "Doe", "age": 30 }
diff --git a/src/test/scala/com/vngrs/etl/extractors/JsonExtractSpec.scala b/src/test/scala/com/vngrs/etl/extractors/JsonExtractSpec.scala
new file mode 100644
index 0000000..8d8bf94
--- /dev/null
+++ b/src/test/scala/com/vngrs/etl/extractors/JsonExtractSpec.scala
@@ -0,0 +1,66 @@
+package com.vngrs.etl.extractors
+
+import com.vngrs.etl.SparkSpec
+import org.apache.spark.sql.types.{DataTypes, StructField}
+
+/**
+ * JSON Extract Specs
+ */
+// Following wart does not work with scalatest's intercept functionality
+@SuppressWarnings(Array("org.wartremover.warts.NonUnitStatements"))
+class JsonExtractSpec extends SparkSpec {
+
+  "A json extractor" should "read a JSON file" in {
+    val path = s"$rootFolder/parents.json"
+
+    val extractor = JsonExtract(path)
+
+    extractor.testCollect.length should equal (2)
+  }
+
+  it should "read multiple JSON files" in {
+    val path = s"$rootFolder/parents.json,$rootFolder/childs.json"
+
+    val extractor = JsonExtract(path)
+
+    extractor.testCollect.length should equal (4)
+  }
+
+  it should "read multiple JSON files with wildcards" in {
+    val path = s"$rootFolder/*.json"
+
+    val extractor = JsonExtract(path)
+
+    extractor.testCollect.length should equal (4)
+  }
+
+  it should "interpret JSON schema" in {
+    val path = s"$rootFolder/parents.json"
+
+    val extractor = JsonExtract(path)
+
+    val interpretedSchema = extractor.testCollect.head.schema
+
+    val expectedFields = Set(
+      StructField("id", DataTypes.LongType),
+      StructField("name", DataTypes.StringType),
+      StructField("surname", DataTypes.StringType),
+      StructField("age", DataTypes.LongType)
+    )
+
+    interpretedSchema.fields.toSet should contain theSameElementsAs expectedFields
+  }
+
+  it should "throw an exception when the path does not exist" in {
+    val path = s"$rootFolder/a_non_existed_file"
+
+    val extractor = JsonExtract(path)
+
+    intercept[Exception] {
+      extractor.testCollect
+    }
+  }
+
+  /** Root folder for test cases */
+  private val rootFolder: String = getClass.getResource("/json_examples").getPath
+}
diff --git a/src/test/scala/com/vngrs/etl/loaders/JsonLoadSpec.scala b/src/test/scala/com/vngrs/etl/loaders/JsonLoadSpec.scala
new file mode 100644
index 0000000..8b98f30
--- /dev/null
+++ b/src/test/scala/com/vngrs/etl/loaders/JsonLoadSpec.scala
@@ -0,0 +1,73 @@
+package com.vngrs.etl.loaders
+
+import com.vngrs.etl.SparkSpec
+import com.vngrs.etl.utils.FileSystems
+import org.apache.spark.sql.{Row, SQLContext}
+import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
+import org.apache.spark.sql.types.{DataTypes, StructField, StructType}
+
+import scala.util.{Random, Try}
+
+/**
+ * JSON Loader Specs
+ */
+// Following wart does not work with scalatest's intercept functionality
+@SuppressWarnings(Array("org.wartremover.warts.NonUnitStatements"))
+class JsonLoadSpec extends SparkSpec {
+
+  "A json loader" should "load json data to files" in {
+    val path = getClass.getResource("/").getPath + randomString
+
+    val schema = StructType(Seq(
+      StructField("id", DataTypes.LongType),
+      StructField("name", DataTypes.StringType),
+      StructField("surname", DataTypes.StringType),
+      StructField("age", DataTypes.LongType)
+    ))
+
+    val data = Seq[Row](
+      new GenericRowWithSchema(Array[Any](1L, "John", "Doe", 35L), schema),
+      new GenericRowWithSchema(Array[Any](2L, "Jane", "Doe", 30L), schema)
+    )
+
+    val rdd = parallelize(data)
+
+    val loader = JsonLoad(path)
+
+    loader(rdd)
+
+    new SQLContext(sc).read.json(path).count() should equal (2)
+
+    // cleanup (failures are ignored)
+    Try { FileSystems.delete(sc, path) }
+  }
+
+  it should "load json data to files (read check)" in {
+    val path = getClass.getResource("/").getPath + randomString
+
+    val schema = StructType(Seq(
+      StructField("id", DataTypes.LongType),
+      StructField("name", DataTypes.StringType),
+      StructField("surname", DataTypes.StringType),
+      StructField("age", DataTypes.LongType)
+    ))
+
+    val data = Seq[Row](
+      new GenericRowWithSchema(Array[Any](1L, "John", "Doe", 35L), schema)
+    )
+
+    val rdd = parallelize(data).coalesce(1)
+
+    val loader = JsonLoad(path)
+
+    loader(rdd)
+
+    sc.textFile(path).collect().head should equal ("""{"id":1,"name":"John","surname":"Doe","age":35}""")
+
+    // cleanup (failures are ignored)
+    Try { FileSystems.delete(sc, path) }
+  }
+
+  /** Generates a random alphanumeric string of length 20. */
+  private def randomString: String = Random.alphanumeric.take(20).mkString("")
+}
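Taken together, the two new pieces compose into a straightforward JSON round trip; the sketch below makes the same assumptions as above (an in-scope `SparkContext`, illustrative paths):

```scala
import com.vngrs.etl.extractors.JsonExtract
import com.vngrs.etl.loaders.JsonLoad
import org.apache.spark.SparkContext

def roundTrip(sc: SparkContext): Unit = {
  // Extract every JSON file under /data and write the rows back out as
  // line-delimited JSON (both paths are illustrative only).
  val rows = JsonExtract("/data/*.json").apply(sc)
  JsonLoad("/tmp/json_roundtrip").apply(rows)
}
```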