From 37b9b12704ee14676ae7f85803cf57413bceba05 Mon Sep 17 00:00:00 2001
From: maty
Date: Sat, 25 Jun 2016 12:31:57 -0400
Subject: [PATCH] Simplifying parsing in Main, fixing manager test and
 disabling integration tests in CI

---
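Note (placed below the "---" so `git am` drops it from the commit
message): the Parser helper introduced in Main.scala replaces six
near-identical `parse(sc.textFile(s"$pathToFiles/..."), converter)`
calls with one call per dump file. One detail worth knowing when
reading the diff: because `parseFile` is declared with `def`, a use
such as `parseFile("load_departments.dump", Department)` desugars to
`parseFile.apply(...)` and constructs a fresh Parser on every call. A
minimal sketch of the once-only binding (same names as the patch,
untested here):

    // Bind the helper once; apply() still parses one dump file per call.
    val parseFile = new Parser(sc, pathToFiles)
    val departments = parseFile("load_departments.dump", Department)

Behaviour is identical either way, apart from the repeated allocation.
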
 .travis.yml                                  |  4 +-
 pom.xml                                      |  1 +
 src/main/scala/org/mysql/employee/Main.scala | 49 +++++++++----------
 .../org/mysql/employee/domain/Employee.scala |  4 --
 .../scala/org/mysql/employee/MainSpec.scala  | 11 ++---
 .../mysql/employee/utils/FileUtilsSpec.scala | 20 +++++---
 6 files changed, 45 insertions(+), 44 deletions(-)

diff --git a/.travis.yml b/.travis.yml
index 8fcebe5..f740a39 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -1,2 +1,4 @@
 language: java
-script: mkdir /tmp/spark-employee && mvn clean test -Dtmp.folder=\tmp\spark-employee -Dintegration=true
\ No newline at end of file
+script:
+  - mkdir -p /tmp/spark-employee
+  - mvn clean test
diff --git a/pom.xml b/pom.xml
index 1d53ed7..b0a1eff 100644
--- a/pom.xml
+++ b/pom.xml
@@ -56,6 +56,7 @@
         <version>1.0</version>
         <configuration>
           <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
+          <argLine>-Dskip.integration.tests=true -Dtmp.folder=\\tmp\\spark-employee</argLine>
           <junitxml>.</junitxml>
           <filereports>WDF TestSuite.txt</filereports>
         </configuration>
diff --git a/src/main/scala/org/mysql/employee/Main.scala b/src/main/scala/org/mysql/employee/Main.scala
index 2f26bd7..d7c70ea 100644
--- a/src/main/scala/org/mysql/employee/Main.scala
+++ b/src/main/scala/org/mysql/employee/Main.scala
@@ -35,28 +35,30 @@ object Main {
 
     logger.info(s"=> jobName $jobName ")
     logger.info(s"=> pathToFiles $pathToFiles ")
-
-    val employeeDemographics = parse(sc.textFile(s"$pathToFiles/load_employees.dump"), EmployeeDemographic).cache()
-    val departments = parse(sc.textFile(s"$pathToFiles/load_departments.dump"), Department).cache()
-    val departmentEmployees = parse(sc.textFile(s"$pathToFiles/load_dept_emp.dump"), DepartmentEmployee).cache()
-    val departmentManagers = parse(sc.textFile(s"$pathToFiles/load_dept_manager.dump"), DepartmentManager).cache()
-    val employeeTitles = parse(sc.textFile(s"$pathToFiles/load_titles.dump"), EmployeeTitle).cache()
-    val employeeSalaries = parse(sc.textFile(s"$pathToFiles/load_salaries1.dump"), EmployeeSalary).union(
-                           parse(sc.textFile(s"$pathToFiles/load_salaries2.dump"), EmployeeSalary).union(
-                           parse(sc.textFile(s"$pathToFiles/load_salaries3.dump"), EmployeeSalary))).cache()
+
+    class Parser(sc: SparkContext, pathToFiles: String) {
+      def apply[T:ClassTag](fileName: String, converter: Converter[Array[String],T]) : RDD[T] = {
+        parse(sc.textFile(s"$pathToFiles/$fileName"), converter)
+      }
+    }
+
+    def parseFile = new Parser(sc, pathToFiles)
+
+    val employeeDemographics = parseFile("load_employees.dump", EmployeeDemographic)
+    val departments = parseFile("load_departments.dump", Department)
+    val departmentEmployees = parseFile("load_dept_emp.dump", DepartmentEmployee)
+    val departmentManagers = parseFile("load_dept_manager.dump", DepartmentManager)
+    val employeeTitles = parseFile("load_titles.dump", EmployeeTitle)
+    val employeeSalaries = parseFile("load_salaries1.dump", EmployeeSalary).union(
+                           parseFile("load_salaries2.dump", EmployeeSalary).union(
+                           parseFile("load_salaries3.dump", EmployeeSalary)))
 
     val employees = join(departments, departmentEmployees, departmentManagers,
-                         employeeDemographics, employeeTitles, employeeSalaries).cache()
-
-    employeeDemographics.saveAsTextFile(s"$outputPath/employee_demographics")
-    departments.saveAsTextFile(s"$outputPath/departments")
-    departmentEmployees.saveAsTextFile(s"$outputPath/department_employees")
-    departmentManagers.saveAsTextFile(s"$outputPath/department_managers")
-    employeeTitles.saveAsTextFile(s"$outputPath/employee_titles")
-    employeeSalaries.saveAsTextFile(s"$outputPath/employee_salaries")
+                         employeeDemographics, employeeTitles, employeeSalaries).cache()
+
     employees.saveAsTextFile(s"$outputPath/employees")
   }
-
+
   def validateArgs(logger: Logger, arg: Array[String]) = {
     if (arg.length < 2) {
       logger.error("=> wrong parameters number")
@@ -73,7 +75,7 @@ object Main {
   def parse(lines: RDD[String]) = {
     lines.map(_.trim.replaceAll("(INSERT INTO `.*` VALUES\\s*)|\\(|'|\\),|\\)|;", "")).filter(!_.isEmpty)
   }
-
+
   def parse[T: ClassTag](rdd: RDD[String], converter: Converter[Array[String], T]): RDD[T] = {
     val convert = converter.convert(_)
     parse(rdd).map { line => convert(line.split(",")) }
@@ -81,17 +83,14 @@ object Main {
   def join(departments: RDD[Department], departmentEmployees: RDD[DepartmentEmployee], departmentManagers: RDD[DepartmentManager],
            employeeDemographics: RDD[EmployeeDemographic], employeeTitles: RDD[EmployeeTitle], employeeSalaries: RDD[EmployeeSalary]) = {
     val departmentsRdd = departments.map { row => (row.id, row) }
-    val departmentEmployeesDepRdd = departmentsRdd.join(departmentEmployees.map { row => (row.departmentId, row) })
-    val departmentEmployeesEmpRdd = departmentEmployeesDepRdd.map { row => (row._2._2.employeeId, row._2) }
+    val departmentEmployeesDepKeyRdd = departmentsRdd.join(departmentEmployees.map { row => (row.departmentId, row) })
+    val departmentEmployeesEmpKeyRdd = departmentEmployeesDepKeyRdd.map { row => (row._2._2.employeeId, row._2) }
     val departmentManagerDepRdd = departmentsRdd.join(departmentManagers.map { row => (row.managedDepartmentId, row) })
                                                 .map{ row => (row._2._2.employeeId, (row._2._1, row._2._2)) }
     val employeeDemographicsRdd = employeeDemographics.map { row => (row.employeeId, row )}
                                                       .leftOuterJoin(departmentManagerDepRdd)
-    println(s"departmentManagers:${departmentManagers.collect()}")
-    println(s"departmentManagerDepRdd:${departmentManagerDepRdd.collect()}")
-
-    val grouped = departmentEmployeesEmpRdd
+    val grouped = departmentEmployeesEmpKeyRdd
       .join(employeeDemographicsRdd
       .join(employeeSalaries.map { row => (row.employeeId, row) })
       .join(employeeTitles.map { row => (row.employeeId, row) } )).groupBy { row => row._1 }
 
diff --git a/src/main/scala/org/mysql/employee/domain/Employee.scala b/src/main/scala/org/mysql/employee/domain/Employee.scala
index ead3cc5..fece5ad 100644
--- a/src/main/scala/org/mysql/employee/domain/Employee.scala
+++ b/src/main/scala/org/mysql/employee/domain/Employee.scala
@@ -6,7 +6,3 @@ case class Employee(id: String,
                     employeeDemographics: List[EmployeeDemographic],
                     employeeTitles: List[EmployeeTitle],
                     employeeSalaries: List[EmployeeSalary])
-
-object Employee {
-
-}
\ No newline at end of file
diff --git a/src/test/scala/org/mysql/employee/MainSpec.scala b/src/test/scala/org/mysql/employee/MainSpec.scala
index a16b800..aa14b37 100644
--- a/src/test/scala/org/mysql/employee/MainSpec.scala
+++ b/src/test/scala/org/mysql/employee/MainSpec.scala
@@ -1,9 +1,7 @@
 package org.mysql.employee
 
 import java.text.SimpleDateFormat
-
 import scala.reflect.ClassTag
-
 import org.apache.spark.SparkContext
 import org.apache.spark.rdd.RDD
 import org.mysql.employee.constants.DateConstants
@@ -19,8 +17,9 @@ import org.mysql.employee.enums.Gender
 import org.mysql.employee.utils.Converter
 import org.scalatest.FunSpec
 import org.scalatest.Matchers
-
 import com.holdenkarau.spark.testing.SharedSparkContext
+import java.util.Properties
+import java.util.Enumeration
 
 class MainSpec extends FunSpec with SharedSparkContext with Matchers {
 
@@ -169,13 +168,13 @@ class MainSpec extends FunSpec with SharedSparkContext with Matchers {
       val oneEmployeeSalary = Array(EmployeeSalary("10001", 99999999, sdf.parse("1953-09-02"), sdf.parse("1986-06-26")))
       val oneTitle = Array(EmployeeTitle("10001","Title",sdf.parse("1900-02-02"), sdf.parse("1901-02-02")))
 
-      val employee = Main.join(sc.parallelize(oneDepartment), sc.parallelize(oneDepartmentEmployee),
+      val employee = Main.join(sc.parallelize(List(oneDepartment(0), twoDepartment(0))), sc.parallelize(oneDepartmentEmployee),
                                sc.parallelize(oneDepartmentManager), sc.parallelize(oneDemographic),
                                sc.parallelize(oneTitle), sc.parallelize(oneEmployeeSalary)).collect()
 
       val expectedEmployee : Employee = Employee("10001", List((oneDepartmentEmployee(0),oneDepartment(0))), List((twoDepartment(0),oneDepartmentManager(0))),
                                                  List(oneDemographic(0)), List(oneTitle(0)), List(oneEmployeeSalary(0)))
 
-      // TODO employee should equal (Array(expectedEmployee))
+      employee should equal (Array(expectedEmployee))
     }
   }
@@ -186,7 +185,7 @@ class MainSpec extends FunSpec with SharedSparkContext with Matchers {
     text.split("\\r?\\n")
   }
 
-  if (!"true".equals(System.getProperty("integration"))) {
+  if (!"true".equals(System.getProperty("skip.integration.tests"))) {
     describe("Can construct RDDs from actual MySQL files") {
 
       def loadEmployees(): (Array[String], RDD[EmployeeDemographic], List[EmployeeDemographic]) = {
diff --git a/src/test/scala/org/mysql/employee/utils/FileUtilsSpec.scala b/src/test/scala/org/mysql/employee/utils/FileUtilsSpec.scala
index 4a05dfc..e87ba1d 100644
--- a/src/test/scala/org/mysql/employee/utils/FileUtilsSpec.scala
+++ b/src/test/scala/org/mysql/employee/utils/FileUtilsSpec.scala
@@ -9,16 +9,20 @@ class FileUtilsSpec extends FlatSpec with BeforeAndAfterEach {
 
   val tmpFolder = if (System.getProperty("tmp.folder") != null) System.getProperty("tmp.folder") else "\\tmp\\spark-employee"
 
   override def beforeEach() {
-    if (tmpFolder == null) throw new IllegalStateException("property 'tmp.folder' is required")
-    new File(tmpFolder).delete()
-    assert(!new File(tmpFolder).exists())
+    if (!"true".equals(System.getProperty("skip.integration.tests"))) {
+      if (tmpFolder == null) throw new IllegalStateException("property 'tmp.folder' is required")
+      new File(tmpFolder).delete()
+      assert(!new File(tmpFolder).exists())
+    }
   }
 
-  it should "remove folders" in {
-    new File(tmpFolder).createNewFile()
-    assert(new File(tmpFolder).exists())
-    FileUtils.rmFolder(tmpFolder)
-    assert(!new File(tmpFolder).exists())
+  if (!"true".equals(System.getProperty("skip.integration.tests"))) {
+    it should "remove folders" in {
+      new File(tmpFolder).createNewFile()
+      assert(new File(tmpFolder).exists())
+      FileUtils.rmFolder(tmpFolder)
+      assert(!new File(tmpFolder).exists())
+    }
   }
 }
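
Note (outside the hunks, so `git apply` skips it): MainSpec and
FileUtilsSpec now repeat the same
`!"true".equals(System.getProperty("skip.integration.tests"))` guard.
If that duplication grows, it could move into a small shared helper; a
sketch under a hypothetical name, assuming the flag keeps arriving via
the scalatest-maven-plugin argLine added in pom.xml:

    object IntegrationTests {
      // True when the test JVM was launched with -Dskip.integration.tests=true.
      def skipped: Boolean = "true".equals(System.getProperty("skip.integration.tests"))
    }

Each spec would then read `if (!IntegrationTests.skipped) { ... }`.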