Merge pull request #455 from renaissance-benchmarks/topic/movie-lens-io
Use Spark to load CSV files and avoid extra IO
lbulej authored Nov 18, 2024
2 parents 5358b4e + ebae568 commit 5c786b9
Showing 3 changed files with 136 additions and 125 deletions.
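
The gist of the change: instead of writing bundled CSV resources to the scratch directory and re-reading them with sparkContext.textFile(), the benchmark now hands the already-loaded lines directly to Spark's DataFrame-based CSV reader. The standalone sketch below illustrates that technique under assumptions not taken from this commit (a local SparkSession named "csv-from-memory" and two hard-coded sample lines); the schema and the repartition call mirror the ones added in the diff.

import org.apache.spark.sql.{DataFrame, Encoder, Encoders, SparkSession}
import org.apache.spark.sql.types._

object CsvFromMemorySketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("csv-from-memory").getOrCreate()

    // CSV lines that are already in memory (e.g. read from a classpath resource).
    val lines = Seq("1,31,2.5,1260759144", "1,1029,3.0,1260759179")

    // Explicit schema, as in the benchmark, so Spark does not have to infer types.
    val schema = StructType(
      Seq(
        StructField("userId", IntegerType, nullable = false),
        StructField("movieId", IntegerType, nullable = false),
        StructField("rating", DoubleType, nullable = false),
        StructField("timestamp", LongType, nullable = false)
      )
    )

    // Wrap the lines in a Dataset[String] and let the CSV reader parse them,
    // then repartition to mimic the default used by textFile().
    implicit val encoder: Encoder[String] = Encoders.STRING
    val ds = spark.createDataset(lines)
    val df: DataFrame = spark.read
      .option("header", value = false)
      .schema(schema)
      .csv(ds)
      .repartition(spark.sparkContext.defaultMinPartitions)

    df.show()
    spark.stop()
  }
}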
@@ -3,7 +3,11 @@ package org.renaissance.apache.spark
import org.apache.spark.mllib.recommendation.ALS
import org.apache.spark.mllib.recommendation.MatrixFactorizationModel
import org.apache.spark.mllib.recommendation.Rating
import org.apache.spark.rdd._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.types._
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.Encoder
import org.apache.spark.sql.Encoders
import org.apache.spark.storage.StorageLevel
import org.renaissance.Benchmark
import org.renaissance.Benchmark._
@@ -12,12 +16,13 @@ import org.renaissance.BenchmarkResult
import org.renaissance.BenchmarkResult.Assert
import org.renaissance.BenchmarkResult.ValidationException
import org.renaissance.License
import org.renaissance.apache.spark.ResourceUtil.linesFromUrl
import org.renaissance.apache.spark.ResourceUtil.writeResourceToFile
import org.renaissance.apache.spark.ResourceUtil.linesFromSource
import org.renaissance.apache.spark.ResourceUtil.sourceFromResource

import java.net.URL
import java.nio.file.Path
import scala.collection.Map
import scala.io.Source

@Name("movie-lens")
@Group("apache-spark")
@@ -79,8 +84,6 @@ final class MovieLens extends Benchmark with SparkUtil {

private val randomSeed = 31

private var inputFileParam: String = _

private var alsConfigurations: Iterable[AlsConfig] = _

private val personalRatingsInputFile = "/ratings-personal.csv"
@@ -100,98 +103,138 @@ final class MovieLens extends Benchmark with SparkUtil {

class MovieLensHelper {
var movies: Map[Int, String] = _
var ratings: RDD[(Long, Rating)] = _
var personalRatings: Seq[Rating] = _
var personalRatingsRDD: RDD[Rating] = _
var personalRatingsUserId: Int = _
var training: RDD[Rating] = _
var validation: RDD[Rating] = _
var test: RDD[Rating] = _
var numTraining: Long = 0
var numValidation: Long = 0
var numTest: Long = 0
var bestModel: Option[MatrixFactorizationModel] = _
var bestConfig: AlsConfig = _
var bestValidationRmse = Double.MaxValue
var numRatings: Long = 0
var numUsers: Long = 0
var numMovies: Long = 0

private def parseRatingsCsvLines(lines: RDD[String]) = {
createRddFromCsv(
lines,
hasHeader = false,
delimiter = ",",
parts => {
val (userId, movieId, rating, timestamp) = (parts(0), parts(1), parts(2), parts(3))
val stratum = timestamp.toLong % 10
(stratum, Rating(userId.toInt, movieId.toInt, rating.toDouble))
}

private def dataFrameFromCsvLines(
lines: Seq[String],
schema: StructType,
hasHeader: Boolean
): DataFrame = {
implicit val encoder: Encoder[String] = Encoders.STRING
val ds = sparkSession.createDataset(lines)
val reader = sparkSession.read.option("header", value = hasHeader).schema(schema)

// Repartition the dataset to mimic the default used by textFile().
reader.csv(ds).repartition(sparkContext.defaultMinPartitions)
}

private def ratingsRddFromCsvLines(lines: Seq[String]) = {
val schema = StructType(
Seq(
StructField("userId", IntegerType, false),
StructField("movieId", IntegerType, false),
StructField("rating", DoubleType, false),
StructField("timestamp", LongType, false)
)
)

dataFrameFromCsvLines(lines, schema, hasHeader = false).rdd.map { row =>
val stratum = row.getAs[Long]("timestamp") % 10
val rating = Rating(row.getAs("userId"), row.getAs("movieId"), row.getAs("rating"))
(stratum, rating)
}
}

private def moviesRddFromCsvLines(lines: Seq[String]) = {
val schema = StructType(
Seq(
StructField("movieId", IntegerType, false),
StructField("title", StringType, false),
StructField("genres", StringType, false)
)
)

dataFrameFromCsvLines(lines, schema, hasHeader = true).rdd.map { row =>
(row.getAs[Int]("movieId"), row.getAs[String]("title"))
}
}

private def initMovies(source: Source): Unit = {
movies = moviesRddFromCsvLines(linesFromSource(source)).collect().toMap
}

def loadData(
moviesSource: Source,
personalRatingsSource: Source,
ratingsSource: Source,
trainingThreshold: Int,
validationThreshold: Int
): Unit = {
initMovies(moviesSource)

val personalRatingsRdd = initPersonalRatings(loadPersonalRatings(personalRatingsSource))
val ratingsRdd = describeRatings(ratingsRddFromCsvLines(linesFromSource(ratingsSource)))
val parts = splitRatings(ratingsRdd, trainingThreshold, validationThreshold)

// Merge personal ratings into training data set.
initDatasets(parts._1.union(personalRatingsRdd), parts._2, parts._3)
}

def loadPersonalRatings(url: URL) = {
private def loadPersonalRatings(source: Source) = {
// Get only entries with positive rating.
val lines = sparkContext.parallelize(linesFromUrl(url))
val ratings = parseRatingsCsvLines(lines).values.filter { _.rating > 0.0 }
ratingsRddFromCsvLines(linesFromSource(source)).values.filter { _.rating > 0.0 }
}

private def initPersonalRatings(ratings: RDD[Rating]) = {
assume(!ratings.isEmpty(), "collection of personal ratings is not empty!")

val positiveRatings = ratings.collect().toSeq
val userIds = positiveRatings.map(_.user).distinct
val collectedRatings = ratings.collect().toSeq
val userIds = collectedRatings.map(_.user).distinct
assume(userIds.length == 1, "personal ratings come from a single user!")

personalRatings = positiveRatings
personalRatingsRDD = ensureCached(ratings)
personalRatings = collectedRatings
personalRatingsUserId = userIds.head
}

def loadRatings(file: Path) = {
val lines = sparkContext.textFile(file.toString)
ratings = ensureCached(parseRatingsCsvLines(lines))

numRatings = ratings.count()
numUsers = ratings.map(_._2.user).distinct().count()
numMovies = ratings.map(_._2.product).distinct().count()
ratings
}

def loadMovies(file: Path) = {
movies = createRddFromCsv(
sparkContext.textFile(file.toString),
hasHeader = true,
delimiter = ",",
parts => {
val (movieId, movieName) = (parts(0), parts(1))
(movieId.toInt, movieName)
}
).collectAsMap()
private def describeRatings(ratings: RDD[(Long, Rating)]) = {
val numRatings = ratings.count()
val numUsers = ratings.map(_._2.user).distinct().count()
val numMovies = ratings.map(_._2.product).distinct().count()
println(s"Got $numRatings ratings from $numUsers users on $numMovies movies.")
ratings
}

def splitRatings(trainingThreshold: Int, validationThreshold: Int) = {
// Merge personal ratings into training data set and cache them.
training = ensureCached(
ratings
.filter(x => x._1 < trainingThreshold)
.values
.union(personalRatingsRDD)
private def splitRatings(
ratings: RDD[(Long, Rating)],
trainingThreshold: Int,
validationThreshold: Int
) = {
(
ratings.filter(x => x._1 < trainingThreshold).values,
ratings.filter(x => x._1 >= trainingThreshold && x._1 < validationThreshold).values,
ratings.filter(x => x._1 >= validationThreshold).values
)
numTraining = training.count()
}

validation = ensureCached(
ratings
.filter(x => x._1 >= trainingThreshold && x._1 < validationThreshold)
.values
)
private def initDatasets(
trainingSet: RDD[Rating],
validationSet: RDD[Rating],
testSet: RDD[Rating]
): Unit = {
// Cache data sets and print info to trigger evaluation.
training = ensureCached(trainingSet)

validation = ensureCached(validationSet)
numValidation = validation.count()

test = ensureCached(
ratings.filter(x => x._1 >= validationThreshold).values
)
test = ensureCached(testSet)
numTest = test.count()

println(s"Training: $numTraining, validation: $numValidation, test: $numTest")
println(s"Training: ${training.count()}, validation: $numValidation, test: $numTest")
}

def trainModels(configs: Iterable[AlsConfig]) = {
def trainModels(configs: Iterable[AlsConfig]): Unit = {
// Train models and evaluate them on the validation set.
for (config <- configs) {
val model = trainModel(training, config)
@@ -221,7 +264,7 @@ final class MovieLens extends Benchmark with SparkUtil {
.run(ratings)
}

def recommendMovies() = {
def recommendMovies(): Array[Rating] = {
val testRmse = computeRmse(bestModel.get, test, numTest)

println(
@@ -261,7 +304,7 @@ final class MovieLens extends Benchmark with SparkUtil {
}

override def setUpBeforeAll(bc: BenchmarkContext): Unit = {
import scala.jdk.CollectionConverters._
import scala.jdk.CollectionConverters.ListHasAsScala

// Validation parameters.
topRecommendedMovieCount = bc.parameter("top_recommended_movie_count").toInteger
@@ -276,8 +319,6 @@ final class MovieLens extends Benchmark with SparkUtil {
//
setUpSparkContext(bc, useCheckpointDir = true)

inputFileParam = bc.parameter("input_file").value

alsConfigurations = bc
.parameter("als_configs")
.toCsvRows(m =>
@@ -289,21 +330,17 @@ final class MovieLens extends Benchmark with SparkUtil {
)
.asScala

loadData(bc.scratchDirectory())

// Split ratings into training (~60%), validation (~20%), and test (~20%)
// data sets based on the last digit of a rating's timestamp.
helper.splitRatings(6, 8)
}

def loadData(scratchDir: Path) = {
helper.loadPersonalRatings(getClass.getResource(personalRatingsInputFile))
helper.loadRatings(writeResourceToFile(inputFileParam, scratchDir.resolve("ratings.csv")))
helper.loadMovies(writeResourceToFile(moviesInputFile, scratchDir.resolve("movies.csv")))

println(
"Got " + helper.numRatings + " ratings from "
+ helper.numUsers + " users on " + helper.numMovies + " movies."
// Load movies and ratings and split the ratings into training (~60%),
// validation (~20%), and test (~20%) sets based on the last digit of a
// rating's timestamp.
val ratingsInputFileParam = bc.parameter("input_file").value

helper.loadData(
sourceFromResource(moviesInputFile),
sourceFromResource(personalRatingsInputFile),
sourceFromResource(ratingsInputFileParam),
6,
8
)
}

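For context, the split performed above keys every rating by the last digit of its timestamp and turns the thresholds 6 and 8 into roughly 60/20/20 training/validation/test sets. A minimal self-contained sketch of that stratified split, assuming a local SparkSession and a few hypothetical sample ratings (this is an illustration, not the benchmark's code path):

import org.apache.spark.mllib.recommendation.Rating
import org.apache.spark.sql.SparkSession

object StratifiedSplitSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("split-sketch").getOrCreate()
    val sc = spark.sparkContext

    // (stratum, rating) pairs; the stratum is the last digit of the rating's timestamp.
    val sample = Seq(
      (1260759144L % 10, Rating(1, 31, 2.5)),
      (1260759179L % 10, Rating(1, 1029, 3.0)),
      (1260759182L % 10, Rating(2, 150, 4.0))
    )
    val ratings = sc.parallelize(sample)

    val (trainingThreshold, validationThreshold) = (6, 8)
    val training = ratings.filter(_._1 < trainingThreshold).values
    val validation = ratings.filter(x => x._1 >= trainingThreshold && x._1 < validationThreshold).values
    val test = ratings.filter(_._1 >= validationThreshold).values

    println(s"Training: ${training.count()}, validation: ${validation.count()}, test: ${test.count()}")
    spark.stop()
  }
}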
@@ -6,34 +6,13 @@ import java.net.URL
import java.nio.charset.StandardCharsets
import java.nio.file.Files
import java.nio.file.Path
import java.nio.file.{StandardCopyOption => CopyOption}
import java.nio.file.{StandardOpenOption => OpenOption}
import java.util.zip.ZipInputStream
import scala.io.BufferedSource
import scala.io.Source

private object ResourceUtil {

/**
* Writes the resource associated with the [[ResourceUtil]] class
* to a file, replacing an existing file.
*
* @param resourceName path to the resource
* @param file path the output file
* @return [[Path]] to the output file
*/
def writeResourceToFile(resourceName: String, file: Path) = {
val resourceStream = getResourceStream(resourceName)
try {
Files.copy(resourceStream, file, CopyOption.REPLACE_EXISTING)
} finally {
// This may mask a try-block exception, but at least it will fail anyway.
resourceStream.close()
}

file
}

/**
* Writes the resource associated with the [[ResourceUtil]] class
* to a file, replacing an existing file.
@@ -73,7 +52,7 @@ private object ResourceUtil {
def duplicateLinesFromUrl(url: URL, copyCount: Int, outputFile: Path): Path = {
import scala.jdk.CollectionConverters._

val lines = linesFromUrl(url).asJava
val lines = linesFromSource(Source.fromURL(url)).asJava

for (_ <- 0 until copyCount) {
Files.write(outputFile, lines, OpenOption.CREATE, OpenOption.APPEND)
@@ -83,20 +62,30 @@
}

/**
* Loads a file from the given [[URL]] as a sequence of lines.
* Loads the contents of a [[Source]] as a sequence of lines and closes the source.
*
* @param url input file [[URL]]
* @param source input [[Source]]
* @return a sequence of lines
*/
def linesFromUrl(url: URL): Seq[String] = {
val source = Source.fromURL(url)
def linesFromSource(source: Source): Seq[String] = {
try {
source.getLines().toSeq
} finally {
source.close()
}
}

/**
* Creates a [[Source]] from a resource associated with the [[ResourceUtil]] class.
*
* @param resourceName path to the resource
* @return a [[Source]] for the given resource
*/
def sourceFromResource(resourceName: String): BufferedSource = {
// Use an explicit codec to avoid influence of the environment.
Source.fromURL(getClass.getResource(resourceName))(scala.io.Codec.UTF8)
}

/**
* Creates a [[Source]] from a file within a ZIP resource
* associated with the [[ResourceUtil]] class.
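A hypothetical caller showing how the two new helpers compose. The object name is made up for illustration, and it has to live in the org.renaissance.apache.spark package because ResourceUtil is private to it; the "/ratings-personal.csv" path is one of the resources the benchmark already ships.

package org.renaissance.apache.spark

import org.renaissance.apache.spark.ResourceUtil.linesFromSource
import org.renaissance.apache.spark.ResourceUtil.sourceFromResource

object ResourceUtilUsageSketch {
  def main(args: Array[String]): Unit = {
    // sourceFromResource opens the classpath resource with an explicit UTF-8 codec;
    // linesFromSource drains it into a Seq[String] and closes the source.
    val lines: Seq[String] = linesFromSource(sourceFromResource("/ratings-personal.csv"))
    println(s"Read ${lines.size} lines of personal ratings.")
  }
}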
@@ -90,21 +90,6 @@ trait SparkUtil {
)
}

def createRddFromCsv[T: ClassTag](
lines: RDD[String],
hasHeader: Boolean,
delimiter: String,
mapper: Array[String] => T
) = {
val linesWithoutHeader = if (hasHeader) dropFirstLine(lines) else lines
linesWithoutHeader.map(_.split(delimiter)).map(mapper).filter(_ != null)
}

private def dropFirstLine(lines: RDD[String]): RDD[String] = {
val first = lines.first()
lines.filter { line => line != first }
}

def tearDownSparkContext(): Unit = {
sparkSession.close()
}
