Topics_spark.py
import sys
sys.path.append("..")
import json

from pyspark import SparkContext, SparkConf

import DataProcess.getCfg as cfg

# Load the data/output path mapping from the shared config file.
path_mp = cfg.get_path_conf('../path.cfg')
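# Keys taken from their uses below; the concrete values depend on path.cfg
# and are assumptions here:
#   path_mp['DataPath']       -- root directory of the corpus
#   path_mp['WashingtonPost'] -- Washington Post corpus file under DataPath
#   cfg.OUTPUT                -- root directory for job output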
def topics_index_single(line):
    """Parse one JSON document line and return (topic_name, {doc_id}),
    or None when the document has no 'kicker' (topic) entry."""
    obj = json.loads(line)
    doc_id = obj['id']
    contents = obj['contents']
    topic_name = ''
    for li in contents:
        # The topic label is stored in a content block of type 'kicker'.
        if isinstance(li, dict) and li.get('type') == 'kicker':
            topic_name = li['content'].strip()
            break
    if topic_name == '':
        return None
    return (topic_name, {doc_id})
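# A hypothetical input line, matching only the fields read above (the full
# WashingtonPost record layout is an assumption):
#   {"id": "d1", "contents": [{"type": "kicker", "content": "Politics"}]}
# topics_index_single(line) -> ("Politics", {"d1"})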
# Create an inverted list for topics.
# No args.
# Output: one line per topic: "topic_name doc_id doc_id ..."
def topics_index(args=None):
    # Restart with a fresh local context in case one is already running.
    SparkContext.getOrCreate().stop()
    conf = SparkConf().setMaster("local[*]").setAppName("topics_index")
    sc = SparkContext(conf=conf)
    WashingtonPost = sc.textFile(path_mp['DataPath'] + path_mp['WashingtonPost'])
    WashingtonPost.map(topics_index_single) \
        .filter(lambda x: x is not None) \
        .reduceByKey(lambda a, b: a | b) \
        .map(lambda t: t[0] + ' ' + ' '.join(t[1])) \
        .saveAsTextFile(cfg.OUTPUT + 'topics_index')
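# A minimal sketch (not part of the original pipeline) for reading the saved
# index back; saveAsTextFile writes a directory of part-* files. Note the
# space-delimited format is ambiguous if a kicker itself contains spaces.
def load_topics_index(out_dir):
    import glob
    import os
    index = {}
    for part in glob.glob(os.path.join(out_dir, 'part-*')):
        with open(part) as f:
            for line in f:
                fields = line.split()
                if fields:
                    index.setdefault(fields[0], set()).update(fields[1:])
    return index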
if __name__ == "__main__":
    # Dispatch: the first CLI argument names the function in this module
    # to run; the remaining arguments are passed through as a list.
    getattr(__import__('Topics_spark'), sys.argv[1])(sys.argv[2:])
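# Example invocation (assumes path.cfg exists and PYTHONPATH resolves the
# DataProcess package):
#   python Topics_spark.py topics_index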