# NOTE(review): the lines removed here were GitHub web-UI scrape artifacts
# (page chrome, the fork/notifications banner, and the line-number gutter),
# not source code. The actual script begins below.
'''
Fix a MySQL dump so that it can be imported into a Redshift database.
NOTE: take the dump using
mysqldump --compatible=postgresql --default-character-set=utf8 -r $databasename.mysql -u $user $databasename $tablename
'''
import re
import sys
import os
import time
import getopt
def file_len(filename):
    '''
    Return the number of lines in filename.

    BUG FIX: the original `i = 0; for i, l in enumerate(f): pass; return i + 1`
    idiom returned 1 for an empty file; an empty file has 0 lines.
    '''
    with open(filename) as f:
        return sum(1 for _ in f)
def parse(input_filename, output_filename, table_name, distribution_key, redshift_sort_keys, type_map):
    '''
    Translate a mysqldump file into Redshift-compatible SQL.

    input
        required
            input_filename: name of a PROPERLY taken mysqldump file
                            (see module docstring), or "-" to read stdin.
            output_filename: destination file, or "-" to write stdout.
        optional
            table_name: generated table name for the first table encountered,
                        else the same as input.
            distribution_key: distribution key column in redshift for the first
                              table encountered. If not specified, the first
                              primary key is set as the dist key.
            redshift_sort_keys: sort key(s) column(s) in redshift for the first
                                table encountered.
            type_map: override type conversion from mysql to redshift.
                ex: tinyint(1):smallint,char(35):varchar(70),bigint(20) unsigned:bigint

    returns nothing; the converted SQL is written to output_filename.
    '''
    if input_filename == "-":
        num_lines = -1  # unknown total: progress/ETA reporting is disabled
    else:
        num_lines = int(file_len(input_filename))
    tables = {}
    current_table = None
    creation_lines = []
    # NOTE(review): foreign-key ALTERs are collected but never written out,
    # matching the original behavior.
    foreign_key_lines = []
    num_inserts = 0
    started = time.time()
    table_primary_key_not_done = True
    # Open the output and decide where progress goes: stdout, unless stdout
    # *is* the output, in which case progress is discarded.
    if output_filename == "-":
        output = sys.stdout
        progress = open(os.devnull, "w")
    else:
        output = open(output_filename, "w")
        progress = sys.stdout
    if input_filename == "-":
        input_fh = sys.stdin
    else:
        input_fh = open(input_filename)
    output.write("START TRANSACTION;\n")
    for i, line in enumerate(input_fh):
        # BUG FIX: the original divided by num_lines == -1 for stdin input,
        # printing negative percentages and nonsense ETAs. Only report
        # progress when the total is known.
        if num_lines > 0:
            time_taken = time.time() - started
            percentage_done = (i + 1) / float(num_lines)
            secs_left = (time_taken / percentage_done) - time_taken
            progress.write("\rLine %i (of %s: %.2f%%) [%s tables] [%s inserts] [ETA: %i min %i sec]" %
                (
                    i + 1,
                    num_lines,
                    percentage_done * 100,
                    len(tables),
                    num_inserts,
                    secs_left // 60,
                    secs_left % 60,
                )
            )
            progress.flush()
        # BUG FIX: text-mode files yield str; the original unconditionally
        # called .decode("utf8"), which breaks on Python 3. Decode only when
        # handed raw bytes.
        if isinstance(line, bytes):
            line = line.decode("utf8")
        line = line.strip()
        # Ignore comments, locking statements, DROPs and blank lines
        if (line.startswith("--") or line.startswith("/*")
                or line.startswith("LOCK TABLES") or line.startswith("DROP TABLE")
                or line.startswith("UNLOCK TABLES") or not line):
            continue
        if current_table is None:
            # Outside of any CREATE TABLE statement
            if line.startswith("CREATE TABLE"):
                # The quoted identifier is the MySQL table name
                current_table = line.split('"')[1]
                if table_name and not table_name.isspace():
                    progress.write("\rLine %i [Encountered table %s ] [Generated table name %s]" %
                        (i + 1, current_table, table_name)
                    )
                    current_table = table_name
                    table_name = None  # rename applies to the first table only
                tables[current_table] = {"columns": []}
                creation_lines = []
                table_primary_key_not_done = True
            elif line.startswith("INSERT INTO"):
                # MySQL zero-dates are invalid in Redshift; load them as NULL.
                # BUG FIX: the original round-tripped through .encode("utf8"),
                # which breaks on Python 3 (bytes + str).
                output.write(line.replace("'0000-00-00 00:00:00'", "NULL") + "\n")
                num_inserts += 1
            else:
                print("\n ! Ignore. Unknown line in main body: %s" % line)
        else:
            # Inside a CREATE TABLE statement
            if line.startswith('"'):
                # A column definition: "name" TYPE extras,
                _, name, definition = line.strip(",").split('"', 2)
                try:
                    sql_type, extra = definition.strip().split(" ", 1)
                    # A ')' left in the remainder means the type itself
                    # contained a space (a tricky enum) and was split too early.
                    if ')' in extra:
                        sql_type, extra = definition.strip().split(")")
                except ValueError:
                    sql_type = definition.strip()
                    extra = ""
                # Remember (then strip) the unsigned qualifier: it widens the
                # Redshift type chosen below.
                unsigned = "unsigned" in extra.lower()
                extra = extra.replace("unsigned", "")
                extra = re.sub(r"CHARACTER SET [\w\d]+\s*", "", extra)
                extra = re.sub(r"COLLATE [\w\d]+\s*", "", extra)
                extra = extra.replace("AUTO_INCREMENT", "")
                extra = extra.replace("SERIAL", "")
                extra = extra.replace("ZEROFILL", "")
                extra = extra.replace("UNSIGNED", "")
                sql_type = sql_type.lower()
                # Map the MySQL type to a Redshift type; explicit type_map
                # entries win over the built-in rules.
                if type_map is not None and sql_type in type_map:
                    red_type = type_map[sql_type]
                elif type_map is not None and unsigned and sql_type + " unsigned" in type_map:
                    red_type = type_map[sql_type + " unsigned"]
                elif sql_type == "tinyint(1)":
                    red_type = "boolean"
                elif sql_type.startswith("tinyint("):
                    red_type = "smallint"
                elif sql_type.startswith("smallint("):
                    # unsigned needs the next-wider signed Redshift type
                    red_type = "integer" if unsigned else "smallint"
                elif sql_type.startswith("mediumint("):
                    red_type = "integer"
                elif sql_type.startswith("int("):
                    red_type = "bigint" if unsigned else "integer"
                elif sql_type.startswith("bigint("):
                    # no wider integer exists, so unsigned bigint becomes text
                    red_type = "varchar(80)" if unsigned else "bigint"
                elif sql_type.startswith("float"):
                    red_type = "real"
                elif sql_type.startswith("double"):
                    red_type = "double precision"
                elif sql_type.startswith("decimal"):
                    red_type = sql_type  # decimal carries over as-is
                elif sql_type.startswith("char(") or sql_type.startswith("varchar("):
                    # x4: Redshift lengths are bytes, MySQL's are utf8 chars
                    size = int(sql_type.split("(")[1].rstrip(")"))
                    red_type = "varchar(%s)" % (size * 4)
                elif sql_type in ("longtext", "mediumtext", "text"):
                    red_type = "varchar(max)"
                elif sql_type == "tinytext":
                    # BUG FIX: the original emitted "text(1020)", which is not
                    # a valid Redshift type.
                    red_type = "varchar(%s)" % (255 * 4)
                elif sql_type.startswith("enum(") or sql_type.startswith("set("):
                    red_type = "varchar(%s)" % (255 * 2)
                elif sql_type in ("blob", "mediumblob", "longblob"):
                    red_type = "varchar(max)"
                elif sql_type == "tinyblob":
                    red_type = "varchar(255)"
                elif sql_type.startswith("binary"):
                    red_type = "varchar(255)"
                elif sql_type == "date":
                    red_type = sql_type
                elif sql_type == "time":
                    red_type = "varchar(40)"
                elif sql_type == "datetime":
                    red_type = "timestamp"
                elif sql_type == "year":
                    red_type = "varchar(16)"
                elif sql_type == "timestamp":
                    red_type = sql_type
                else:
                    # all else, e.g. varchar binary
                    red_type = "varchar(max)"
                # Record it
                creation_lines.append('"%s" %s %s' % (name, red_type, extra))
                tables[current_table]['columns'].append((name, red_type, extra))
            elif line.startswith("PRIMARY KEY"):
                # AWS Data Pipeline RedshiftCopyActivity only supports one
                # primary key, so keep just the first column of a composite key.
                if table_primary_key_not_done:
                    first_pkey = line.rstrip(",").rstrip(")").lstrip("PRIMARY KEY").lstrip("(\"").split(",")[0].rstrip("\"")
                    pkey_line = 'PRIMARY KEY("' + first_pkey + '")'
                    creation_lines.append(pkey_line)
                    if not distribution_key:
                        # default the dist key to the first primary key
                        distribution_key = first_pkey
                    table_primary_key_not_done = False
            elif line.startswith("CONSTRAINT"):
                # Would be added in a separate transaction; currently only
                # collected, never emitted (see note above).
                foreign_key_lines.append("ALTER TABLE \"%s\" ADD CONSTRAINT %s DEFERRABLE INITIALLY DEFERRED" % (
                    current_table, line.split("CONSTRAINT")[1].strip().rstrip(",")))
            elif line.startswith("UNIQUE KEY"):
                creation_lines.append("UNIQUE (%s)" % line.split("(")[1].split(")")[0])
            elif line.startswith("FULLTEXT KEY"):
                pass  # Redshift does not support indexes
            elif line.startswith("KEY"):
                pass  # Redshift does not support indexes
            elif line == ");":
                # End of the CREATE TABLE statement: emit the Redshift version
                output.write("CREATE TABLE IF NOT EXISTS \"%s\" (\n" % current_table)
                for j, outline in enumerate(creation_lines):
                    output.write(" %s%s\n" % (outline, "," if j != (len(creation_lines) - 1) else ""))
                output.write(')\n')
                if distribution_key and not distribution_key.isspace():
                    output.write('distkey(%s)\n' % distribution_key)
                    distribution_key = None  # applies to the first table only
                if redshift_sort_keys and not redshift_sort_keys.isspace():
                    output.write('sortkey(%s)\n' % redshift_sort_keys)
                    redshift_sort_keys = None  # applies to the first table only
                output.write(';\n\n')
                current_table = None
            else:
                print("\n ! Ignore. Unknown line inside table creation: %s" % line)
    print("")
    # BUG FIX: close the handles this function opened. The original leaked
    # them, and an unflushed output file could lose the tail of the dump.
    if output is not sys.stdout:
        output.close()
    if input_fh is not sys.stdin:
        input_fh.close()
    if progress is not sys.stdout:
        progress.close()
def usage():
    # Emit a one-line synopsis of the command-line flags this script accepts.
    message = (
        "Usage: %s -i input -o output [-t table_name] [-d DistKey] [-s SortKey1,SortKey2...] "
        "[-m MySQL1:RedType1,MySQL2:RedType2...]" % sys.argv[0]
    )
    print(message)
if __name__ == "__main__":
    # Command-line entry point: parse flags, build the optional type-override
    # map, then run the conversion.
    input_file = ''
    output_file = ''
    gen_table_name = None
    dist_key = None
    sort_keys = None
    map_types = None
    insert_mode = None
    try:
        # BUG FIX: the original had the argument tuple on a separate statement
        # ("opts, args = getopt.getopt" followed by "(...)"), which tried to
        # unpack the function object itself and raised TypeError. The call
        # parentheses must be attached to getopt.getopt.
        opts, args = getopt.getopt(
            sys.argv[1:], "i:o:t:d:s:m:n:",
            ["input_file=", "output_file=", "table_name=", "dist_key=", "sort_keys=", "map_types=", "insert_mode="]
        )
    except getopt.GetoptError as e:
        print(str(e))
        usage()
        sys.exit(2)
    for opt, arg in opts:
        if opt in ("-i", "--input_file"):
            input_file = arg
        elif opt in ("-o", "--output_file"):
            output_file = arg
        elif opt in ("-t", "--table_name"):
            gen_table_name = arg
        elif opt in ("-d", "--dist_key"):
            dist_key = arg
        elif opt in ("-s", "--sort_keys"):
            sort_keys = arg
        elif opt in ("-m", "--map_types"):
            map_types = arg
        elif opt in ("-n", "--insert_mode"):
            insert_mode = arg  # accepted but not consumed by parse() currently
    map_types_dict = None
    if map_types and not map_types.isspace():
        # "a:b,c:d" -> {"a": "b", "c": "d"}; lower-cased, keys whitespace-stripped
        pairs = (item.split(':') for item in map_types.lower().lstrip().split(','))
        map_types_dict = {key.strip(): value for key, value in pairs}
    parse(input_file, output_file, gen_table_name, dist_key, sort_keys, map_types_dict)
    sys.exit(0)