sparkmllib-sentiment-anylisis.py
# encoding=utf-8
import jieba
from pyspark import SparkConf, SparkContext
from pyspark.mllib.feature import HashingTF, IDF
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.classification import NaiveBayes, NaiveBayesModel
APP_NAME = "My Spark Application"
if __name__ == "__main__":
    # Spark configuration
    conf = SparkConf().setAppName(APP_NAME)
    conf = conf.setMaster("local[*]")
    sc = SparkContext(conf=conf)
    # Load the input file; each line is "<rating>\t<comment>"
    # originData = sc.textFile('hdfs:///tmp/output.txt')
    originData = sc.textFile('/Users/aaron/Documents/work_ml/commentNLP/spide/output/疯狂动物城 Zootopia.txt')
    # print(originData.count())
    # Preprocessing: split on tab and drop malformed lines
    # originDistinctData = originData.distinct()
    rateDocument = originData.map(lambda line: line.split('\t')).filter(lambda line: len(line) >= 2)
    # print(rateDocument.count())
    # Tally comments by rating (1-5 stars)
    fiveRateDocument = rateDocument.filter(lambda line: int(line[0]) == 5)
    # print(fiveRateDocument.count())
    fourRateDocument = rateDocument.filter(lambda line: int(line[0]) == 4)
    # print(fourRateDocument.count())
    threeRateDocument = rateDocument.filter(lambda line: int(line[0]) == 3)
    # print(threeRateDocument.count())
    twoRateDocument = rateDocument.filter(lambda line: int(line[0]) == 2)
    # print(twoRateDocument.count())
    oneRateDocument = rateDocument.filter(lambda line: int(line[0]) == 1)
    # print(oneRateDocument.count())
    # Build a balanced training set: ratings 1-3 are treated as negative, 5 as
    # positive (4-star comments are discarded), taking as many positives as there
    # are negatives. Note: repartition() returns a new RDD, so it must be reassigned.
    negRateDocument = oneRateDocument.union(twoRateDocument).union(threeRateDocument)
    negRateDocument = negRateDocument.repartition(1)
    posRateDocument = sc.parallelize(fiveRateDocument.take(negRateDocument.count())).repartition(1)
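    # A possible alternative (a sketch, not part of the original script): take()
    # pulls the positive rows to the driver, while sample() keeps them distributed.
    # Assumes the 5-star comments outnumber the negatives, as take() above does.
    # fraction = float(negRateDocument.count()) / fiveRateDocument.count()
    # posRateDocument = fiveRateDocument.sample(False, fraction, seed=0)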
    allRateDocument = negRateDocument.union(posRateDocument)
    allRateDocument = allRateDocument.repartition(1)
    rate = allRateDocument.map(lambda s: s[0])
    document = allRateDocument.map(lambda s: s[1])
    # Tokenize each comment with jieba in search-engine mode; building the token
    # list directly avoids the fragile join-on-"/"-then-split round trip
    words = document.map(lambda w: list(jieba.cut_for_search(w)))
    # Term-frequency vectors via the hashing trick
    hashingTF = HashingTF()
    tf = hashingTF.transform(words)
    tf.cache()
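    # Sketch (not in the original script): HashingTF defaults to 2**20 buckets;
    # an explicit smaller dimension trades more hash collisions for less memory:
    # hashingTF = HashingTF(numFeatures=1 << 16)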
    # Compute the TF-IDF matrix
    idfModel = IDF().fit(tf)
    tfidf = idfModel.transform(tf)
    # Build training and test sets; LabeledPoint expects a float label
    zipped = rate.zip(tfidf)
    data = zipped.map(lambda line: LabeledPoint(float(line[0]), line[1]))
    training, test = data.randomSplit([0.6, 0.4], seed=0)
    # Train a Naive Bayes classifier (additive smoothing = 1.0)
    NBmodel = NaiveBayes.train(training, 1.0)
    # Evaluate: fraction of test points whose predicted label matches the truth
    predictionAndLabel = test.map(lambda p: (NBmodel.predict(p.features), p.label))
    accuracy = 1.0 * predictionAndLabel.filter(lambda x: x[0] == x[1]).count() / test.count()
    print(accuracy)
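    # Follow-up sketch (not part of the original script): persist the trained
    # model and score one new comment through the same featurization pipeline.
    # The save path is a placeholder; NaiveBayesModel.load() mirrors save().
    # NBmodel.save(sc, '/tmp/sentiment-nb-model')
    # sameModel = NaiveBayesModel.load(sc, '/tmp/sentiment-nb-model')
    # newTf = hashingTF.transform(list(jieba.cut_for_search(u'这部电影非常精彩')))
    # print(sameModel.predict(idfModel.transform(newTf)))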