
Commit

Simplifying parsing in Main, fixing manager test
and disabling integration tests in CI
mbentley-pillar authored and matyb committed Jun 27, 2016
1 parent 0a3a933 commit 37b9b12
Showing 6 changed files with 45 additions and 44 deletions.
4 changes: 3 additions & 1 deletion .travis.yml
@@ -1,2 +1,4 @@
language: java
script: mkdir /tmp/spark-employee && mvn clean test -Dtmp.folder=\tmp\spark-employee -Dintegration=true
script:
- mkdir -p /tmp/spark-employee
- mvn clean test
1 change: 1 addition & 0 deletions pom.xml
@@ -56,6 +56,7 @@
<version>1.0</version>
<configuration>
<reportsDirectory>${project.build.directory}/surefire-reports</reportsDirectory>
<argLine>-Dskip.integration.tests=true -Dtmp.folder=\\tmp\\spark-employee</argLine>
<junitxml>.</junitxml>
<filereports>WDF TestSuite.txt</filereports>
</configuration>
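Note: the <argLine> added above passes JVM system properties (skip.integration.tests and tmp.folder) to the forked ScalaTest run. As a rough sketch only — the helper object below is hypothetical and not part of this commit; only the two property names come from the diff — a spec could read them like this:

// Hypothetical helper, not part of this commit; it only shows how the
// properties set in <argLine> can be read from a test.
object TestProps {
  // true when the build asked for the integration tests to be skipped
  def integrationTestsSkipped: Boolean =
    "true".equals(System.getProperty("skip.integration.tests"))

  // scratch folder for file-based tests, with the same fallback FileUtilsSpec uses
  def tmpFolder: String =
    Option(System.getProperty("tmp.folder")).getOrElse("\\tmp\\spark-employee")
}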
49 changes: 24 additions & 25 deletions src/main/scala/org/mysql/employee/Main.scala
@@ -35,28 +35,30 @@ object Main {

logger.info(s"=> jobName $jobName ")
logger.info(s"=> pathToFiles $pathToFiles ")

val employeeDemographics = parse(sc.textFile(s"$pathToFiles/load_employees.dump"), EmployeeDemographic).cache()
val departments = parse(sc.textFile(s"$pathToFiles/load_departments.dump"), Department).cache()
val departmentEmployees = parse(sc.textFile(s"$pathToFiles/load_dept_emp.dump"), DepartmentEmployee).cache()
val departmentManagers = parse(sc.textFile(s"$pathToFiles/load_dept_manager.dump"), DepartmentManager).cache()
val employeeTitles = parse(sc.textFile(s"$pathToFiles/load_titles.dump"), EmployeeTitle).cache()
val employeeSalaries = parse(sc.textFile(s"$pathToFiles/load_salaries1.dump"), EmployeeSalary).union(
parse(sc.textFile(s"$pathToFiles/load_salaries2.dump"), EmployeeSalary).union(
parse(sc.textFile(s"$pathToFiles/load_salaries3.dump"), EmployeeSalary))).cache()

class Parser(sc: SparkContext, pathToFiles: String) {
def apply[T:ClassTag](fileName: String, converter: Converter[Array[String],T]) : RDD[T] = {
parse(sc.textFile(s"$pathToFiles/$fileName"), converter)
}
}

def parseFile = new Parser(sc, pathToFiles)

val employeeDemographics = parseFile("load_employees.dump", EmployeeDemographic)
val departments = parseFile("load_departments.dump", Department)
val departmentEmployees = parseFile("load_dept_emp.dump", DepartmentEmployee)
val departmentManagers = parseFile("load_dept_manager.dump", DepartmentManager)
val employeeTitles = parseFile("load_titles.dump", EmployeeTitle)
val employeeSalaries = parseFile("load_salaries1.dump", EmployeeSalary).union(
parseFile("load_salaries2.dump", EmployeeSalary).union(
parseFile("load_salaries3.dump", EmployeeSalary)))

val employees = join(departments, departmentEmployees, departmentManagers,
employeeDemographics, employeeTitles, employeeSalaries).cache()

employeeDemographics.saveAsTextFile(s"$outputPath/employee_demographics")
departments.saveAsTextFile(s"$outputPath/departments")
departmentEmployees.saveAsTextFile(s"$outputPath/department_employees")
departmentManagers.saveAsTextFile(s"$outputPath/department_managers")
employeeTitles.saveAsTextFile(s"$outputPath/employee_titles")
employeeSalaries.saveAsTextFile(s"$outputPath/employee_salaries")
employeeDemographics, employeeTitles, employeeSalaries).cache()

employees.saveAsTextFile(s"$outputPath/employees")
}

def validateArgs(logger: Logger, arg: Array[String]) = {
if (arg.length < 2) {
logger.error("=> wrong parameters number")
@@ -73,25 +75,22 @@ object Main {
def parse(lines: RDD[String]) = {
lines.map(_.trim.replaceAll("(INSERT INTO `.*` VALUES\\s*)|\\(|'|\\),|\\)|;", "")).filter(!_.isEmpty)
}

def parse[T: ClassTag](rdd: RDD[String], converter: Converter[Array[String], T]): RDD[T] = {
val convert = converter.convert(_)
parse(rdd).map { line => convert(line.split(",")) }
}

def join(departments: RDD[Department], departmentEmployees: RDD[DepartmentEmployee], departmentManagers: RDD[DepartmentManager], employeeDemographics: RDD[EmployeeDemographic], employeeTitles: RDD[EmployeeTitle], employeeSalaries: RDD[EmployeeSalary]) = {
val departmentsRdd = departments.map { row => (row.id, row) }
val departmentEmployeesDepRdd = departmentsRdd.join(departmentEmployees.map { row => (row.departmentId, row) })
val departmentEmployeesEmpRdd = departmentEmployeesDepRdd.map { row => (row._2._2.employeeId, row._2) }
val departmentEmployeesDepKeyRdd = departmentsRdd.join(departmentEmployees.map { row => (row.departmentId, row) })
val departmentEmployeesEmpKeyRdd = departmentEmployeesDepKeyRdd.map { row => (row._2._2.employeeId, row._2) }
val departmentManagerDepRdd = departmentsRdd.join(departmentManagers.map { row => (row.managedDepartmentId, row) })
.map{ row => (row._2._2.employeeId, (row._2._1, row._2._2)) }
val employeeDemographicsRdd = employeeDemographics.map { row => (row.employeeId, row )}
.leftOuterJoin(departmentManagerDepRdd)

println(s"departmentManagers:${departmentManagers.collect()}")
println(s"departmentManagerDepRdd:${departmentManagerDepRdd.collect()}")

val grouped = departmentEmployeesEmpRdd
val grouped = departmentEmployeesEmpKeyRdd
.join(employeeDemographicsRdd
.join(employeeSalaries.map { row => (row.employeeId, row) })
.join(employeeTitles.map { row => (row.employeeId, row) } )).groupBy { row => row._1 }
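The reworked join above operates on pair RDDs: each dataset is keyed by its id (departmentId first, then re-keyed by employeeId) and combined with join/leftOuterJoin before the final grouping. A minimal sketch of that shape, using toy case classes rather than the project's domain classes (all names below are illustrative only, not this commit's code):

import org.apache.spark.rdd.RDD

// Toy types; they only mirror the keyed-join shape of Main.join.
case class Dept(id: String, name: String)
case class DeptEmp(departmentId: String, employeeId: String)
case class Demographic(employeeId: String, firstName: String)
case class Manager(managedDepartmentId: String, employeeId: String)

def joinSketch(departments: RDD[Dept],
               deptEmps: RDD[DeptEmp],
               managers: RDD[Manager],
               demographics: RDD[Demographic]) = {
  val deptById  = departments.map(d => (d.id, d))
  val empByDept = deptEmps.map(de => (de.departmentId, de))

  // (departmentId, (Dept, DeptEmp)) re-keyed by employeeId, like departmentEmployeesEmpKeyRdd
  val deptEmpByEmployee = deptById.join(empByDept)
    .map { case (_, (dept, de)) => (de.employeeId, (dept, de)) }

  // managers keyed by the employee who manages the department, like departmentManagerDepRdd
  val managerByEmployee = deptById.join(managers.map(m => (m.managedDepartmentId, m)))
    .map { case (_, (dept, m)) => (m.employeeId, (dept, m)) }

  // leftOuterJoin keeps employees that do not manage any department
  val demographicsWithManager = demographics.map(d => (d.employeeId, d))
    .leftOuterJoin(managerByEmployee)

  // one grouped record per employeeId
  deptEmpByEmployee.join(demographicsWithManager).groupByKey()
}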
4 changes: 0 additions & 4 deletions src/main/scala/org/mysql/employee/domain/Employee.scala
@@ -6,7 +6,3 @@ case class Employee(id: String,
employeeDemographics: List[EmployeeDemographic],
employeeTitles: List[EmployeeTitle],
employeeSalaries: List[EmployeeSalary])

object Employee {

}
11 changes: 5 additions & 6 deletions src/test/scala/org/mysql/employee/MainSpec.scala
@@ -1,9 +1,7 @@
package org.mysql.employee

import java.text.SimpleDateFormat

import scala.reflect.ClassTag

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.mysql.employee.constants.DateConstants
@@ -19,8 +17,9 @@ import org.mysql.employee.enums.Gender
import org.mysql.employee.utils.Converter
import org.scalatest.FunSpec
import org.scalatest.Matchers

import com.holdenkarau.spark.testing.SharedSparkContext
import java.util.Properties
import java.util.Enumeration

class MainSpec extends FunSpec with SharedSparkContext with Matchers {

@@ -169,13 +168,13 @@ class MainSpec extends FunSpec with SharedSparkContext with Matchers {
val oneEmployeeSalary = Array(EmployeeSalary("10001", 99999999, sdf.parse("1953-09-02"), sdf.parse("1986-06-26")))
val oneTitle = Array(EmployeeTitle("10001","Title",sdf.parse("1900-02-02"), sdf.parse("1901-02-02")))

val employee = Main.join(sc.parallelize(oneDepartment), sc.parallelize(oneDepartmentEmployee),
val employee = Main.join(sc.parallelize(List(oneDepartment(0), twoDepartment(0))), sc.parallelize(oneDepartmentEmployee),
sc.parallelize(oneDepartmentManager), sc.parallelize(oneDemographic), sc.parallelize(oneTitle),
sc.parallelize(oneEmployeeSalary)).collect()
val expectedEmployee : Employee = Employee("10001", List((oneDepartmentEmployee(0),oneDepartment(0))),
List((twoDepartment(0),oneDepartmentManager(0))), List(oneDemographic(0)),
List(oneTitle(0)), List(oneEmployeeSalary(0)))
// TODO employee should equal (Array(expectedEmployee))
employee should equal (Array(expectedEmployee))
}

}
@@ -186,7 +185,7 @@ class MainSpec extends FunSpec with SharedSparkContext with Matchers {
text.split("\\r?\\n")
}

if (!"true".equals(System.getProperty("integration"))) {
if (!"true".equals(System.getProperty("skip.integration.tests"))) {
describe("Can construct RDDs from actual MySQL files") {

def loadEmployees(): (Array[String], RDD[EmployeeDemographic], List[EmployeeDemographic]) = {
20 changes: 12 additions & 8 deletions src/test/scala/org/mysql/employee/utils/FileUtilsSpec.scala
@@ -9,16 +9,20 @@ class FileUtilsSpec extends FlatSpec with BeforeAndAfterEach {
val tmpFolder = if (System.getProperty("tmp.folder") != null) System.getProperty("tmp.folder") else "\\tmp\\spark-employee"

override def beforeEach() {
if (tmpFolder == null) throw new IllegalStateException("property 'tmp.folder' is required")
new File(tmpFolder).delete()
assert(!new File(tmpFolder).exists())
if (!"true".equals(System.getProperty("skip.integration.tests"))) {
if (tmpFolder == null) throw new IllegalStateException("property 'tmp.folder' is required")
new File(tmpFolder).delete()
assert(!new File(tmpFolder).exists())
}
}

it should "remove folders" in {
new File(tmpFolder).createNewFile()
assert(new File(tmpFolder).exists())
FileUtils.rmFolder(tmpFolder)
assert(!new File(tmpFolder).exists())
if (!"true".equals(System.getProperty("skip.integration.tests"))) {
it should "remove folders" in {
new File(tmpFolder).createNewFile()
assert(new File(tmpFolder).exists())
FileUtils.rmFolder(tmpFolder)
assert(!new File(tmpFolder).exists())
}
}

}
