diff --git a/.travis.yml b/.travis.yml
index 8fcebe5..f740a39 100644
--- a/.travis.yml
+++ b/.travis.yml
@@ -1,2 +1,4 @@
language: java
-script: mkdir /tmp/spark-employee && mvn clean test -Dtmp.folder=\tmp\spark-employee -Dintegration=true
\ No newline at end of file
+script:
+ - mkdir -p /tmp/spark-employee
+ - mvn clean test
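+  # skip.integration.tests and tmp.folder default via the scalatest-maven-plugin argLine in pom.xml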
diff --git a/pom.xml b/pom.xml
index 1d53ed7..b0a1eff 100644
--- a/pom.xml
+++ b/pom.xml
@@ -56,6 +56,7 @@
        <version>1.0</version>
        <configuration>
          <reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
+         <argLine>-Dskip.integration.tests=true -Dtmp.folder=\\tmp\\spark-employee</argLine>
          <junitxml>.</junitxml>
          <filereports>WDF TestSuite.txt</filereports>
diff --git a/src/main/scala/org/mysql/employee/Main.scala b/src/main/scala/org/mysql/employee/Main.scala
index 2f26bd7..d7c70ea 100644
--- a/src/main/scala/org/mysql/employee/Main.scala
+++ b/src/main/scala/org/mysql/employee/Main.scala
@@ -35,28 +35,30 @@ object Main {
logger.info(s"=> jobName $jobName ")
logger.info(s"=> pathToFiles $pathToFiles ")
-
- val employeeDemographics = parse(sc.textFile(s"$pathToFiles/load_employees.dump"), EmployeeDemographic).cache()
- val departments = parse(sc.textFile(s"$pathToFiles/load_departments.dump"), Department).cache()
- val departmentEmployees = parse(sc.textFile(s"$pathToFiles/load_dept_emp.dump"), DepartmentEmployee).cache()
- val departmentManagers = parse(sc.textFile(s"$pathToFiles/load_dept_manager.dump"), DepartmentManager).cache()
- val employeeTitles = parse(sc.textFile(s"$pathToFiles/load_titles.dump"), EmployeeTitle).cache()
- val employeeSalaries = parse(sc.textFile(s"$pathToFiles/load_salaries1.dump"), EmployeeSalary).union(
- parse(sc.textFile(s"$pathToFiles/load_salaries2.dump"), EmployeeSalary).union(
- parse(sc.textFile(s"$pathToFiles/load_salaries3.dump"), EmployeeSalary))).cache()
+
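+  // Small wrapper: reads $pathToFiles/$fileName and converts each row with the given Converter.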
+  class Parser(sc: SparkContext, pathToFiles: String) {
+    def apply[T: ClassTag](fileName: String, converter: Converter[Array[String], T]): RDD[T] = {
+      parse(sc.textFile(s"$pathToFiles/$fileName"), converter)
+    }
+  }
+
+  val parseFile = new Parser(sc, pathToFiles)
+
+ val employeeDemographics = parseFile("load_employees.dump", EmployeeDemographic)
+ val departments = parseFile("load_departments.dump", Department)
+ val departmentEmployees = parseFile("load_dept_emp.dump", DepartmentEmployee)
+ val departmentManagers = parseFile("load_dept_manager.dump", DepartmentManager)
+ val employeeTitles = parseFile("load_titles.dump", EmployeeTitle)
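+  // Salary data is split across three dump files, so union them into a single RDD.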
+ val employeeSalaries = parseFile("load_salaries1.dump", EmployeeSalary).union(
+ parseFile("load_salaries2.dump", EmployeeSalary).union(
+ parseFile("load_salaries3.dump", EmployeeSalary)))
val employees = join(departments, departmentEmployees, departmentManagers,
- employeeDemographics, employeeTitles, employeeSalaries).cache()
-
- employeeDemographics.saveAsTextFile(s"$outputPath/employee_demographics")
- departments.saveAsTextFile(s"$outputPath/departments")
- departmentEmployees.saveAsTextFile(s"$outputPath/department_employees")
- departmentManagers.saveAsTextFile(s"$outputPath/department_managers")
- employeeTitles.saveAsTextFile(s"$outputPath/employee_titles")
- employeeSalaries.saveAsTextFile(s"$outputPath/employee_salaries")
+ employeeDemographics, employeeTitles, employeeSalaries).cache()
+
employees.saveAsTextFile(s"$outputPath/employees")
}
-
+
def validateArgs(logger: Logger, arg: Array[String]) = {
if (arg.length < 2) {
logger.error("=> wrong parameters number")
@@ -73,7 +75,7 @@ object Main {
def parse(lines: RDD[String]) = {
lines.map(_.trim.replaceAll("(INSERT INTO `.*` VALUES\\s*)|\\(|'|\\),|\\)|;", "")).filter(!_.isEmpty)
}
-
+
def parse[T: ClassTag](rdd: RDD[String], converter: Converter[Array[String], T]): RDD[T] = {
val convert = converter.convert(_)
parse(rdd).map { line => convert(line.split(",")) }
@@ -81,17 +83,14 @@ object Main {
def join(departments: RDD[Department], departmentEmployees: RDD[DepartmentEmployee], departmentManagers: RDD[DepartmentManager], employeeDemographics: RDD[EmployeeDemographic], employeeTitles: RDD[EmployeeTitle], employeeSalaries: RDD[EmployeeSalary]) = {
val departmentsRdd = departments.map { row => (row.id, row) }
- val departmentEmployeesDepRdd = departmentsRdd.join(departmentEmployees.map { row => (row.departmentId, row) })
- val departmentEmployeesEmpRdd = departmentEmployeesDepRdd.map { row => (row._2._2.employeeId, row._2) }
+ val departmentEmployeesDepKeyRdd = departmentsRdd.join(departmentEmployees.map { row => (row.departmentId, row) })
+ val departmentEmployeesEmpKeyRdd = departmentEmployeesDepKeyRdd.map { row => (row._2._2.employeeId, row._2) }
val departmentManagerDepRdd = departmentsRdd.join(departmentManagers.map { row => (row.managedDepartmentId, row) })
.map{ row => (row._2._2.employeeId, (row._2._1, row._2._2)) }
val employeeDemographicsRdd = employeeDemographics.map { row => (row.employeeId, row )}
.leftOuterJoin(departmentManagerDepRdd)
- println(s"departmentManagers:${departmentManagers.collect()}")
- println(s"departmentManagerDepRdd:${departmentManagerDepRdd.collect()}")
-
- val grouped = departmentEmployeesEmpRdd
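+    // Everything below is keyed by employeeId; join the RDDs, then group to assemble one Employee per id.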
+ val grouped = departmentEmployeesEmpKeyRdd
.join(employeeDemographicsRdd
.join(employeeSalaries.map { row => (row.employeeId, row) })
.join(employeeTitles.map { row => (row.employeeId, row) } )).groupBy { row => row._1 }
diff --git a/src/main/scala/org/mysql/employee/domain/Employee.scala b/src/main/scala/org/mysql/employee/domain/Employee.scala
index ead3cc5..fece5ad 100644
--- a/src/main/scala/org/mysql/employee/domain/Employee.scala
+++ b/src/main/scala/org/mysql/employee/domain/Employee.scala
@@ -6,7 +6,3 @@ case class Employee(id: String,
employeeDemographics: List[EmployeeDemographic],
employeeTitles: List[EmployeeTitle],
employeeSalaries: List[EmployeeSalary])
-
-object Employee {
-
-}
\ No newline at end of file
diff --git a/src/test/scala/org/mysql/employee/MainSpec.scala b/src/test/scala/org/mysql/employee/MainSpec.scala
index a16b800..aa14b37 100644
--- a/src/test/scala/org/mysql/employee/MainSpec.scala
+++ b/src/test/scala/org/mysql/employee/MainSpec.scala
@@ -1,9 +1,7 @@
package org.mysql.employee
import java.text.SimpleDateFormat
-
import scala.reflect.ClassTag
-
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.mysql.employee.constants.DateConstants
@@ -19,8 +17,9 @@ import org.mysql.employee.enums.Gender
import org.mysql.employee.utils.Converter
import org.scalatest.FunSpec
import org.scalatest.Matchers
-
import com.holdenkarau.spark.testing.SharedSparkContext
+import java.util.Properties
+import java.util.Enumeration
class MainSpec extends FunSpec with SharedSparkContext with Matchers {
@@ -169,13 +168,13 @@ class MainSpec extends FunSpec with SharedSparkContext with Matchers {
val oneEmployeeSalary = Array(EmployeeSalary("10001", 99999999, sdf.parse("1953-09-02"), sdf.parse("1986-06-26")))
val oneTitle = Array(EmployeeTitle("10001","Title",sdf.parse("1900-02-02"), sdf.parse("1901-02-02")))
- val employee = Main.join(sc.parallelize(oneDepartment), sc.parallelize(oneDepartmentEmployee),
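+    // The manager record is paired with the second department, so both departments must be in the input RDD.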
+ val employee = Main.join(sc.parallelize(List(oneDepartment(0), twoDepartment(0))), sc.parallelize(oneDepartmentEmployee),
sc.parallelize(oneDepartmentManager), sc.parallelize(oneDemographic), sc.parallelize(oneTitle),
sc.parallelize(oneEmployeeSalary)).collect()
val expectedEmployee : Employee = Employee("10001", List((oneDepartmentEmployee(0),oneDepartment(0))),
List((twoDepartment(0),oneDepartmentManager(0))), List(oneDemographic(0)),
List(oneTitle(0)), List(oneEmployeeSalary(0)))
- // TODO employee should equal (Array(expectedEmployee))
+ employee should equal (Array(expectedEmployee))
}
}
@@ -186,7 +185,7 @@ class MainSpec extends FunSpec with SharedSparkContext with Matchers {
text.split("\\r?\\n")
}
- if (!"true".equals(System.getProperty("integration"))) {
+ if (!"true".equals(System.getProperty("skip.integration.tests"))) {
describe("Can construct RDDs from actual MySQL files") {
def loadEmployees(): (Array[String], RDD[EmployeeDemographic], List[EmployeeDemographic]) = {
diff --git a/src/test/scala/org/mysql/employee/utils/FileUtilsSpec.scala b/src/test/scala/org/mysql/employee/utils/FileUtilsSpec.scala
index 4a05dfc..e87ba1d 100644
--- a/src/test/scala/org/mysql/employee/utils/FileUtilsSpec.scala
+++ b/src/test/scala/org/mysql/employee/utils/FileUtilsSpec.scala
@@ -9,16 +9,20 @@ class FileUtilsSpec extends FlatSpec with BeforeAndAfterEach {
val tmpFolder = if (System.getProperty("tmp.folder") != null) System.getProperty("tmp.folder") else "\\tmp\\spark-employee"
override def beforeEach() {
- if (tmpFolder == null) throw new IllegalStateException("property 'tmp.folder' is required")
- new File(tmpFolder).delete()
- assert(!new File(tmpFolder).exists())
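+    // Only clean up the tmp folder when integration tests are enabled.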
+ if (!"true".equals(System.getProperty("skip.integration.tests"))) {
+ if (tmpFolder == null) throw new IllegalStateException("property 'tmp.folder' is required")
+ new File(tmpFolder).delete()
+ assert(!new File(tmpFolder).exists())
+ }
}
- it should "remove folders" in {
- new File(tmpFolder).createNewFile()
- assert(new File(tmpFolder).exists())
- FileUtils.rmFolder(tmpFolder)
- assert(!new File(tmpFolder).exists())
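+  // This test creates and deletes a real file, so it is guarded like the setup above.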
+ if (!"true".equals(System.getProperty("skip.integration.tests"))) {
+ it should "remove folders" in {
+ new File(tmpFolder).createNewFile()
+ assert(new File(tmpFolder).exists())
+ FileUtils.rmFolder(tmpFolder)
+ assert(!new File(tmpFolder).exists())
+ }
}
}