From 4ec42e8fd5342aab07997c492c8be9272ff58c83 Mon Sep 17 00:00:00 2001 From: "Joseph K. Bradley" Date: Thu, 13 Nov 2014 13:38:28 -0800 Subject: [PATCH] Added examples for MLlib book chapter, plus fake spam,ham datasets. --- data/mllib/ham.txt | 7 ++ data/mllib/spam.txt | 5 ++ .../spark/examples/mllib/JavaBookExample.java | 81 ++++++++++++++++++ .../src/main/python/mllib/book_example.py | 57 +++++++++++++ .../examples/ml/BookExamplePipeline.scala | 83 +++++++++++++++++++ .../spark/examples/mllib/BookExample.scala | 65 +++++++++++++++ 6 files changed, 298 insertions(+) create mode 100644 data/mllib/ham.txt create mode 100644 data/mllib/spam.txt create mode 100644 examples/src/main/java/org/apache/spark/examples/mllib/JavaBookExample.java create mode 100755 examples/src/main/python/mllib/book_example.py create mode 100644 examples/src/main/scala/org/apache/spark/examples/ml/BookExamplePipeline.scala create mode 100644 examples/src/main/scala/org/apache/spark/examples/mllib/BookExample.scala diff --git a/data/mllib/ham.txt b/data/mllib/ham.txt new file mode 100644 index 0000000000000..a002eb4a544bf --- /dev/null +++ b/data/mllib/ham.txt @@ -0,0 +1,7 @@ +Dear Spark Learner, Thanks so much for attending the Spark Summit 2014! Check out videos of talks from the summit at ... +Hi Mom, Apologies for being late about emailing and forgetting to send you the package. I hope you and bro have been ... +Wow, hey Fred, just heard about the Spark petabyte sort. I think we need to take time to try it out immediately ... +Hi Spark user list, This is my first question to this list, so thanks in advance for your help! I tried running ... +Thanks Tom for your email. I need to refer you to Alice for this one. I haven't yet figured out that part either ... +Good job yesterday! I was attending your talk, and really enjoyed it. I want to try out GraphX ... +Summit demo got whoops from audience! Had to let you know. 
--Joe diff --git a/data/mllib/spam.txt b/data/mllib/spam.txt new file mode 100644 index 0000000000000..55641dd9f0ed6 --- /dev/null +++ b/data/mllib/spam.txt @@ -0,0 +1,5 @@ +Dear sir, I am a Prince in a far kingdom you have not heard of. I want to send you money via wire transfer so please ... +Get Viagra real cheap! Send money right away to ... +Oh my gosh you can be really strong too with these drugs found in the rainforest. Get them cheap right now ... +YOUR COMPUTER HAS BEEN INFECTED! YOU MUST RESET YOUR PASSWORD. Reply to this email with your password and SSN ... +THIS IS NOT A SCAM! Send money and get access to awesome stuff really cheap and never have to ... diff --git a/examples/src/main/java/org/apache/spark/examples/mllib/JavaBookExample.java b/examples/src/main/java/org/apache/spark/examples/mllib/JavaBookExample.java new file mode 100644 index 0000000000000..3a062294adad2 --- /dev/null +++ b/examples/src/main/java/org/apache/spark/examples/mllib/JavaBookExample.java @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */
+
+package org.apache.spark.examples.mllib;
+
+import java.util.Arrays;
+
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.Function;
+
+import org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS;
+import org.apache.spark.mllib.classification.LogisticRegressionModel;
+import org.apache.spark.mllib.feature.HashingTF;
+import org.apache.spark.mllib.linalg.Vector;
+import org.apache.spark.mllib.regression.LabeledPoint;
+
+/** Trains a logistic regression model to separate spam from ham using HashingTF features. */
+public final class JavaBookExample {
+  public static void main(String[] args) {
+    SparkConf sparkConf = new SparkConf().setAppName("JavaBookExample");
+    JavaSparkContext sc = new JavaSparkContext(sparkConf);
+
+    // Load 2 types of emails from text files: spam and ham (non-spam).
+    // Each line has text from one email.
+    JavaRDD<String> spam = sc.textFile("data/mllib/spam.txt");
+    JavaRDD<String> ham = sc.textFile("data/mllib/ham.txt");
+
+    // Create a HashingTF instance to map email text to vectors of 100 features.
+    final HashingTF tf = new HashingTF(100);
+
+    // Each email is split into words, and each word is mapped to one feature.
+    // Create LabeledPoint datasets for positive (spam) and negative (ham) examples.
+    JavaRDD<LabeledPoint> positiveExamples = spam.map(new Function<String, LabeledPoint>() {
+      @Override public LabeledPoint call(String email) {
+        return new LabeledPoint(1, tf.transform(Arrays.asList(email.split(" "))));
+      }
+    });
+    JavaRDD<LabeledPoint> negativeExamples = ham.map(new Function<String, LabeledPoint>() {
+      @Override public LabeledPoint call(String email) {
+        return new LabeledPoint(0, tf.transform(Arrays.asList(email.split(" "))));
+      }
+    });
+    JavaRDD<LabeledPoint> trainingData = positiveExamples.union(negativeExamples);
+    trainingData.cache(); // Cache data since Logistic Regression is an iterative algorithm.
+
+    // Create a Logistic Regression learner which uses the LBFGS optimizer.
+    LogisticRegressionWithLBFGS lrLearner = new LogisticRegressionWithLBFGS();
+    // Set model regularization. A well-chosen regParam can make models more robust.
+    lrLearner.optimizer().setRegParam(1.0);
+    // Run the actual learning algorithm on the training data.
+    LogisticRegressionModel model = lrLearner.run(trainingData.rdd());
+
+    // Test on a positive example (spam) and a negative one (ham).
+    // First apply the same HashingTF feature transformation used on the training data.
+    Vector posTestExample =
+      tf.transform(Arrays.asList("O M G GET cheap stuff by sending money to ...".split(" ")));
+    Vector negTestExample =
+      tf.transform(Arrays.asList("Hi Dad, I started studying Spark the other ...".split(" ")));
+    // Now use the learned model to predict spam/ham for new emails.
+    System.out.println("Prediction for positive test example: " + model.predict(posTestExample));
+    System.out.println("Prediction for negative test example: " + model.predict(negTestExample));
+
+    sc.stop();
+  }
+}
diff --git a/examples/src/main/python/mllib/book_example.py b/examples/src/main/python/mllib/book_example.py
new file mode 100755
index 0000000000000..5851f8cfabc1b
--- /dev/null
+++ b/examples/src/main/python/mllib/book_example.py
@@ -0,0 +1,57 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""Trains a logistic regression spam classifier on HashingTF features of example emails."""
+
+from pyspark import SparkContext
+from pyspark.mllib.regression import LabeledPoint
+from pyspark.mllib.classification import LogisticRegressionWithSGD
+from pyspark.mllib.feature import HashingTF
+
+if __name__ == "__main__":
+    sc = SparkContext(appName="PythonBookExample")
+
+    # Load 2 types of emails from text files: spam and ham (non-spam).
+    # Each line has text from one email.
+    spam = sc.textFile("data/mllib/spam.txt")
+    ham = sc.textFile("data/mllib/ham.txt")
+
+    # Create a HashingTF instance to map email text to vectors of 100 features.
+    tf = HashingTF(numFeatures=100)
+    # Each email is split into words, and each word is mapped to one feature.
+    spamFeatures = spam.map(lambda email: tf.transform(email.split(" ")))
+    hamFeatures = ham.map(lambda email: tf.transform(email.split(" ")))
+
+    # Create LabeledPoint datasets for positive (spam) and negative (ham) examples.
+    positiveExamples = spamFeatures.map(lambda features: LabeledPoint(1, features))
+    negativeExamples = hamFeatures.map(lambda features: LabeledPoint(0, features))
+    training_data = positiveExamples.union(negativeExamples)
+    training_data.cache()  # Cache data since Logistic Regression is an iterative algorithm.
+
+    # Run Logistic Regression using the SGD optimizer.
+    # regParam is model regularization, which can make models more robust.
+    model = LogisticRegressionWithSGD.train(training_data, regParam=1.0)
+
+    # Test on a positive example (spam) and a negative one (ham).
+    # First apply the same HashingTF feature transformation used on the training data.
+    posTestExample = tf.transform("O M G GET cheap stuff by sending money to ...".split(" "))
+    negTestExample = tf.transform("Hi Dad, I started studying Spark the other ...".split(" "))
+    # Now use the learned model to predict spam/ham for new emails.
+    print("Prediction for positive test example: %g" % model.predict(posTestExample))
+    print("Prediction for negative test example: %g" % model.predict(negTestExample))
+
+    sc.stop()
diff --git a/examples/src/main/scala/org/apache/spark/examples/ml/BookExamplePipeline.scala b/examples/src/main/scala/org/apache/spark/examples/ml/BookExamplePipeline.scala
new file mode 100644
index 0000000000000..49d07b976155e
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/examples/ml/BookExamplePipeline.scala
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples.ml
+
+import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.sql.{Row, SQLContext}
+import org.apache.spark.ml.Pipeline
+import org.apache.spark.ml.classification.LogisticRegression
+import org.apache.spark.ml.feature.{HashingTF, Tokenizer}
+
+/** Fits a Tokenizer -> HashingTF -> LogisticRegression pipeline to separate spam from ham. */
+object BookExamplePipeline {
+
+  case class Document(id: Long, text: String)
+  case class LabeledDocument(id: Long, text: String, label: Double)
+
+  def main(args: Array[String]): Unit = {
+    val conf = new SparkConf().setAppName("BookExamplePipeline")
+    val sc = new SparkContext(conf)
+    val sqlContext = new SQLContext(sc)
+    import sqlContext._
+
+    // Load 2 types of emails from text files: spam and ham (non-spam).
+    // Each line has text from one email.
+    val spam = sc.textFile("data/mllib/spam.txt")
+    val ham = sc.textFile("data/mllib/ham.txt")
+
+    // Create LabeledDocument datasets for positive (spam) and negative (ham) examples.
+    // Ids are assigned after the union so that they are unique across the training set.
+    val labeledEmails = spam.map(email => (email, 1.0)) ++ ham.map(email => (email, 0.0))
+    val trainingData = labeledEmails.zipWithIndex().map { case ((email, label), index) =>
+      LabeledDocument(index, email, label)
+    }
+
+    // Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.
+    // Each stage outputs a column in a SchemaRDD and feeds it to the next stage's input column.
+    val tokenizer = new Tokenizer() // Splits each email into words
+      .setInputCol("text")
+      .setOutputCol("words")
+    val hashingTF = new HashingTF() // Maps email words to vectors of 100 features.
+      .setNumFeatures(100)
+      .setInputCol(tokenizer.getOutputCol)
+      .setOutputCol("features")
+    val lr = new LogisticRegression() // LogisticRegression uses inputCol "features" by default.
+      .setRegParam(1.0) // model regularization
+    val pipeline = new Pipeline()
+      .setStages(Array(tokenizer, hashingTF, lr))
+
+    // Fit the pipeline to training documents.
+    // RDDs of case classes work well with Pipelines since Spark SQL can infer a schema from
+    // case classes and convert the data into a SchemaRDD.
+    val model = pipeline.fit(trainingData)
+
+    // Make predictions on test documents.
+    // The fitted model automatically transforms features using Tokenizer and HashingTF.
+    val testData = sc.parallelize(Seq(
+      Document(0, "O M G GET cheap stuff by sending money to ..."), // positive example (spam)
+      Document(1, "Hi Dad, I started studying Spark the other ...") // negative example (ham)
+    ))
+    val predictions = model.transform(testData)
+      .select('id, 'prediction).collect()
+      .map { case Row(id, prediction) => (id, prediction) }.toMap
+    println(s"Prediction for positive test example: ${predictions(0)}")
+    println(s"Prediction for negative test example: ${predictions(1)}")
+
+    sc.stop()
+  }
+}
diff --git a/examples/src/main/scala/org/apache/spark/examples/mllib/BookExample.scala b/examples/src/main/scala/org/apache/spark/examples/mllib/BookExample.scala
new file mode 100644
index 0000000000000..0c036357fc00f
--- /dev/null
+++ b/examples/src/main/scala/org/apache/spark/examples/mllib/BookExample.scala
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.examples.mllib
+
+import org.apache.spark.{SparkConf, SparkContext}
+import org.apache.spark.mllib.classification.LogisticRegressionWithLBFGS
+import org.apache.spark.mllib.feature.HashingTF
+import org.apache.spark.mllib.regression.LabeledPoint
+
+/** Trains a logistic regression model to separate spam from ham using HashingTF features. */
+object BookExample {
+
+  def main(args: Array[String]): Unit = {
+    val sparkConf = new SparkConf().setAppName("Book example: Scala")
+    val sc = new SparkContext(sparkConf)
+
+    // Read the two corpora, spam and ham (non-spam); every line holds the text of one email.
+    val spamEmails = sc.textFile("data/mllib/spam.txt")
+    val hamEmails = sc.textFile("data/mllib/ham.txt")
+
+    // A HashingTF instance maps the words of an email to a vector of 100 features.
+    val hashingTF = new HashingTF(numFeatures = 100)
+    // Emails are split on single spaces; each resulting word is mapped to one feature.
+    val featurize = (email: String) => hashingTF.transform(email.split(" "))
+
+    // Label spam as positive (1) and ham as negative (0), then combine both into one dataset.
+    val spamPoints = spamEmails.map(email => LabeledPoint(1, featurize(email)))
+    val hamPoints = hamEmails.map(email => LabeledPoint(0, featurize(email)))
+    val trainingSet = spamPoints ++ hamPoints
+    trainingSet.cache() // Logistic Regression is iterative, so keep the data cached.
+
+    // Build a Logistic Regression learner backed by the LBFGS optimizer.
+    val learner = new LogisticRegressionWithLBFGS()
+    // Regularize the model; a well-chosen regParam can make models more robust.
+    learner.optimizer.setRegParam(1.0)
+    // Fit the model to the training data.
+    val model = learner.run(trainingSet)
+
+    // Score one unseen spam-like email and one unseen ham-like email, featurized exactly
+    // like the training data.
+    val spamProbe = featurize("O M G GET cheap stuff by sending money to ...")
+    val hamProbe = featurize("Hi Dad, I started studying Spark the other ...")
+    val spamPrediction = model.predict(spamProbe)
+    val hamPrediction = model.predict(hamProbe)
+    println(s"Prediction for positive test example: $spamPrediction")
+    println(s"Prediction for negative test example: $hamPrediction")
+
+    sc.stop()
+  }
+}