# # Spark-SQL from PySpark
#
# This example shows how to send SQL queries to Spark.
#
# NOTE: In CDP, find the HMS warehouse directory and the external table
# directory by browsing to:
#   Environment -> <env name> -> Data Lake Cluster -> Cloud Storage
# then copy and paste the external location into the
# spark.yarn.access.hadoopFileSystems config setting below.

from __future__ import print_function
import os
import sys
from pyspark.sql import SparkSession
from pyspark.sql.types import Row, StructField, StructType, StringType, IntegerType

# Temporary workaround for MLX-975: in utils/hive-site.xml, edit
# hive.metastore.warehouse.dir and hive.metastore.warehouse.external.dir
# based on the settings in CDP Data Lake -> Cloud Storage.
if not os.path.exists('/etc/hadoop/conf/hive-site.xml'):
    !cp /home/cdsw/utils/hive-site.xml /etc/hadoop/conf/
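# A pure-Python alternative to the !cp shell escape above (a sketch, for
# sessions where IPython-style shell commands are not available):
#import shutil
#if not os.path.exists('/etc/hadoop/conf/hive-site.xml'):
#    shutil.copy('/home/cdsw/utils/hive-site.xml', '/etc/hadoop/conf/hive-site.xml')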
# Data taken from http://stat-computing.org/dataexpo/2009/the-data.html
# One-time download/upload loop for the source CSVs (run from a shell):
#!for i in `seq 1987 2008`; do wget http://stat-computing.org/dataexpo/2009/$i.csv.bz2; bunzip2 $i.csv.bz2; sed -i '1d' $i.csv; aws s3 cp $i.csv s3://ml-field/demo/flight-analysis/data/flights_csv/; rm $i.csv; done
spark = SparkSession\
    .builder\
    .appName("PythonSQL")\
    .config("spark.executor.memory", "8g")\
    .config("spark.executor.instances", 5)\
    .config("spark.yarn.access.hadoopFileSystems", "s3a://ml-field/demo/flight-analysis/data/")\
    .config("spark.driver.maxResultSize", "8g")\
    .getOrCreate()
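# Sanity check: list the databases and tables visible through the metastore
# before creating anything.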
spark.sql("SHOW databases").show()
spark.sql("USE default")
spark.sql("SHOW tables").show()
#spark.sql("DROP TABLE flights").show()
statement = '''
CREATE EXTERNAL TABLE IF NOT EXISTS `default`.`flights` (
  `Year` int,
  `Month` int,
  `DayofMonth` int,
  `DayOfWeek` int,
  `DepTime` int,
  `CRSDepTime` int,
  `ArrTime` int,
  `CRSArrTime` int,
  `UniqueCarrier` string,
  `FlightNum` int,
  `TailNum` string,
  `ActualElapsedTime` int,
  `CRSElapsedTime` int,
  `AirTime` int,
  `ArrDelay` int,
  `DepDelay` int,
  `Origin` string,
  `Dest` string,
  `Distance` int,
  `TaxiIn` int,
  `TaxiOut` int,
  `Cancelled` int,
  `CancellationCode` string,
  `Diverted` string,
  `CarrierDelay` int,
  `WeatherDelay` int,
  `NASDelay` int,
  `SecurityDelay` int,
  `LateAircraftDelay` int)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS TEXTFILE
LOCATION 's3a://ml-field/demo/flight-analysis/data/flights_csv/'
'''
# Uncomment to create the table; IF NOT EXISTS makes it safe to rerun.
#spark.sql(statement)
spark.sql("SELECT COUNT(*) FROM `default`.`flights`").show()
spark.sql("SELECT * FROM `default`.`flights` LIMIT 10").take(5)
#spark.sql("DROP TABLE airports").show()
statement = '''
CREATE EXTERNAL TABLE IF NOT EXISTS `default`.`airports` (
  `iata` string,
  `airport` string,
  `city` string,
  `state` string,
  `country` string,
  `lat` double,
  `long` double)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS TEXTFILE
LOCATION 's3a://ml-field/demo/flight-analysis/data/airports_csv/'
'''
#spark.sql(statement)
spark.sql("SELECT COUNT(*) FROM `default`.`airports`").show()
spark.sql("SELECT * FROM `default`.`airports` LIMIT 10").show()
# airports-extended source CSV header:
# ident,type,name,elevation_ft,continent,iso_country,iso_region,municipality,gps_code,iata_code,local_code,coordinates
#spark.sql("DROP TABLE airports_extended").show()
statement = '''
CREATE EXTERNAL TABLE IF NOT EXISTS `default`.`airports_extended` (
  `ident` string,
  `type` string,
  `name` string,
  `elevation_ft` string,
  `continent` string,
  `iso_country` string,
  `iso_region` string,
  `municipality` string,
  `gps_code` string,
  `iata_code` string,
  `local_code` string,
  `coordinates` string)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS TEXTFILE
LOCATION 's3a://ml-field/demo/flight-analysis/data/airports-extended/'
'''
#spark.sql(statement)
spark.sql("SELECT COUNT(*) FROM `default`.`airports_extended`").show()
spark.sql("SELECT * FROM `default`.`airports_extended` LIMIT 10").take(5)
spark.sql("SELECT coordinates FROM `default`.`airports_extended`").show()
# Uncomment to stop the session and release executors when finished.
#spark.stop()