-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathHow-to-write-results-into-HDFS.py
31 lines (21 loc) · 1.04 KB
/
How-to-write-results-into-HDFS.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
# Run with parameter --packages com.databricks:spark-avro_2.10:1.0.0
# E.g. spark-submit --packages com.databricks:spark-avro_2.10:1.0.0 How-to-write-results-into-HDFS.py
from pyspark import SparkContext, SparkConf
from pyspark.sql import SQLContext
# Aggregate the CPU performance metrics found in a directory of Avro records
# and write the result to HDFS in three formats: plain text, JSON and Avro.
conf = SparkConf().setAppName("pyspark fwjr JSONs 2 parquet")
sc = SparkContext(conf=conf)
# A standalone script has no predefined `sqlContext` (only the interactive
# shell creates one), so build it from the SparkContext created above.
sqlContext = SQLContext(sc)

# Load the Avro files through the spark-avro data source.
df = sqlContext.load("/cms/wmarchive/test/avro/2016/01/01/", "com.databricks.spark.avro")
df.printSchema()
# Print the record count; a bare `df.count()` runs the job but drops the result.
print("Number of records: %d" % df.count())

# Sum every CPU metric over all selected rows:
#   1. select the nested `steps.performance.cpu` column
#      (presumably an array of cpu structs per record -- confirm against
#      the schema printed above),
#   2. flatten the arrays into individual cpu rows; `or []` skips records
#      where the selected value is null instead of raising TypeError,
#   3. turn every cpu row into (metric name, value) pairs,
#   4. add up the values per metric name.
aggregation1 = df.select("steps.performance.cpu") \
    .rdd \
    .flatMap(lambda cpuArrayRows: cpuArrayRows[0] or []) \
    .map(lambda row: row.asDict()) \
    .flatMap(lambda rowDict: rowDict.items()) \
    .reduceByKey(lambda x, y: x + y)

# Store the (metric, total) pairs as a simple text file.
aggregation1.saveAsTextFile("wmarchive/test-plaintext-aggregation1")

# Collect the pairs on the driver and build a one-row DataFrame whose
# columns are the metric names.
aggregated1DF = sqlContext.createDataFrame([dict(aggregation1.collect())])

# Save in JSON format.
aggregated1DF.toJSON().saveAsTextFile("wmarchive/test-json-aggregation1")

# Save in Avro format through the spark-avro data source.
aggregated1DF.save("wmarchive/test-avro-aggregation1", "com.databricks.spark.avro")

# Release the cluster resources held by this application.
sc.stop()