-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathcfn_transform.py
321 lines (259 loc) · 12.1 KB
/
cfn_transform.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
"""Library for creating transforms to CloudFormation templates.
Copyright 2018 iRobot Corporation
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
# Version string of this module.
__version__ = '0.9.0'
import collections
import argparse
import sys
import importlib
import inspect
import six
import yaml
class CloudFormationTemplateTransform(object):
    """A class for creating transforms to CloudFormation templates.

    A transform is instantiated with a template, and when applied changes that
    template in-place.

    Subclasses generally implement transforms through two mechanisms.

    A method named after a template section, that is, Metadata, Parameters,
    Mappings, Conditions, Resources, or Outputs, can return a dict of entries
    to merge into that section. These methods are also a good place to modify
    those sections in arbitrary ways.

    If a method is defined named process_resource, it will be applied to the
    resources in the template. A resource type spec (a string or regex object,
    or a list of those) can be defined in the field PROCESS_RESOURCE_TYPE_SPEC
    to filter the resources processed.

    If a more complex transform needs to be done that doesn't fit into those
    options, an _apply method can be defined that will replace the built-in
    transformation steps.
    """

    @classmethod
    def resource_type_matches(cls, resource_type, resource_type_spec):
        """Check whether a resource type matches the type spec.

        The type spec can be:
        - None (matches everything)
        - a string (compared for equality)
        - a regex (uses search, not match)
        - a callable returning a bool
        - an iterable of type specs (matches if any element matches)

        Returns a bool. Raises TypeError for any other kind of spec.
        """
        if resource_type_spec is None:
            return True
        if isinstance(resource_type_spec, six.string_types):
            return resource_type == resource_type_spec
        if hasattr(resource_type_spec, 'search'):
            # search() returns a match object or None; normalize to bool.
            return bool(resource_type_spec.search(resource_type))
        if callable(resource_type_spec):
            return bool(resource_type_spec(resource_type))

        # The ABC aliases in `collections` were removed in Python 3.10; they
        # live in collections.abc on Python 3 and in collections on Python 2.
        try:
            from collections.abc import Iterable
        except ImportError:
            from collections import Iterable

        if isinstance(resource_type_spec, Iterable):
            return any(cls.resource_type_matches(resource_type, spec)
                       for spec in resource_type_spec)
        raise TypeError("Unknown resource_type_spec {}".format(resource_type_spec))

    @classmethod
    def _map(cls, resources, func, resource_type_spec):
        """Apply func to every resource whose Type matches resource_type_spec.

        func is called as func(logical_id, resource). It may return a dict of
        new resources, which is merged into `resources` after the pass. A
        resource that func leaves empty is removed.

        Returns `resources`, which is modified in place.
        """
        remove = []
        add = {}
        # Iterate over a snapshot so that func touching `resources` directly
        # cannot invalidate the iteration.
        for logical_id, resource in list(resources.items()):
            if cls.resource_type_matches(resource['Type'], resource_type_spec):
                new_resources = func(logical_id, resource)
                add.update(new_resources or {})
                if not resource:
                    remove.append(logical_id)
        for logical_id in remove:
            # pop() tolerates func having already deleted the entry itself.
            resources.pop(logical_id, None)
        _merge_dicts(resources, add)
        return resources

    def __init__(self, template, options=None):
        """Store the template and options for a later apply().

        template: the parsed CloudFormation template (a dict), modified
            in place by apply().
        options: optional dict of options; the 'remaining_args' entry, if
            present, is kept in self._remaining_args.
        """
        # A fresh dict per instance; a `{}` default would be shared by all.
        if options is None:
            options = {}
        self.template = template
        self.options = options
        self._remaining_args = options.get('remaining_args', [])
        self.applied = False

    def subtransformers(self):
        """Return transforms (classes or instances) to apply before this one."""
        return []

    def Description(self):
        """Return a new template Description, or None to leave it unchanged."""
        return None

    def Metadata(self):
        """Return entries to merge into the Metadata section."""
        return {}

    def Parameters(self):
        """Return entries to merge into the Parameters section."""
        return {}

    def Mappings(self):
        """Return entries to merge into the Mappings section."""
        return {}

    def Conditions(self):
        """Return entries to merge into the Conditions section."""
        return {}

    def Transform(self):
        """Return a transform name, or a list of them, to add to Transform."""
        return None

    def Resources(self):
        """Return entries to merge into the Resources section."""
        return {}

    def Outputs(self):
        """Return entries to merge into the Outputs section."""
        return {}

    def apply(self):
        """Apply the transform to the template. In general, should only be called once.

        The transform is applied as follows:
        - subtransformers
        - sections: All section methods (Resources, etc.) are called and their
          outputs are merged into the template
        - process_resource: If the class has a process_resource method, it is
          applied to each resource in the template.
          - If a field PROCESS_RESOURCE_TYPE_SPEC is defined, it is used to
            filter the resources.
          - The process_resource method receives the resource logical id and
            the resource definition. It can return a dict of new resources to
            add. Emptying the contents of the input resource will cause it to
            be removed.

        Hooks can be provided for the above steps by defining methods named
        update_before_X or update_after_X, and additionally update_at_start
        and update_at_end. These are called with no arguments.

        Instead of overriding this method to perform more complex transforms,
        override the _apply method, which will preserve the housekeeping that
        this class performs.

        Returns the transformed template. Raises RuntimeError if the transform
        has already been applied.
        """
        if self.applied:
            raise RuntimeError("Transform applied more than once")
        self._apply()
        self.applied = True
        return self.template

    def _run_hook(self, *args):
        """Call each update_<name> method that exists, in the order given."""
        for name in args:
            method_name = 'update_{}'.format(name)
            if hasattr(self, method_name):
                getattr(self, method_name)()

    def _apply(self):
        """Run the built-in transformation steps described in apply()."""
        self._run_hook('at_start', 'before_subtransformers')

        for subtransformer in self.subtransformers():
            if inspect.isclass(subtransformer):
                subtransformer = subtransformer(self.template)
            self.template = subtransformer.apply()

        self._run_hook('after_subtransformers', 'before_sections')

        desc = self.Description()
        if desc is not None:
            self.template['Description'] = desc

        transforms = self.Transform()
        if transforms:
            # CloudFormation accepts Transform as a single string or a list;
            # normalize whatever the template already has to a list before
            # adding to it.
            existing = self.template.get('Transform') or []
            if isinstance(existing, six.string_types):
                existing = [existing]
            elif not isinstance(existing, list):
                existing = list(existing)
            if isinstance(transforms, six.string_types):
                existing.append(transforms)
            else:
                existing.extend(transforms)
            self.template['Transform'] = existing

        dict_fields = ['Metadata', 'Parameters', 'Mappings', 'Conditions', 'Resources', 'Outputs']
        for field in dict_fields:
            # Covers both a missing section and one that parsed as None.
            if self.template.get(field) is None:
                self.template[field] = {}
            value = getattr(self, field)()
            _merge_dicts(self.template[field], value or {})

        # Drop sections that ended up empty.
        for field in dict_fields + ['Description', 'Transform']:
            if field in self.template and not self.template[field]:
                del self.template[field]

        self._run_hook('after_sections', 'before_process_resource')

        if hasattr(self, 'process_resource'):
            # An empty Resources section was deleted above, so it may be absent.
            resources = self.template.get('Resources')
            if resources:
                self._map(
                    resources,
                    self.process_resource,
                    resource_type_spec=getattr(self, 'PROCESS_RESOURCE_TYPE_SPEC', None))

        # NOTE(review): the original indentation was lost; this hook is assumed
        # to run unconditionally, mirroring before_process_resource -- confirm.
        self._run_hook('after_process_resource')

        self._run_hook('at_end')

    @classmethod
    def main(cls, **kwargs):
        """Run the given CloudFormationTemplateTransform class
        against commandline inputs, supporting both files and stdin/out.

        Keyword args are passed to file_transformer.main()
        """
        try:
            import file_transformer
        except Exception as e:
            sys.exit("{}\nSee https://github.com/benkehoe/file-transformer".format(e))

        def loader(input_stream, args):
            # yaml.load() without a Loader is unsafe on untrusted input and is
            # a TypeError on PyYAML >= 6. safe_load matches the Lambda handler.
            # NOTE(review): custom tags (e.g. short-form intrinsics) would need
            # constructors registered on yaml.SafeLoader -- confirm if used.
            return yaml.safe_load(input_stream)

        def processor(input, args):
            transform = cls(input, vars(args))
            transform.apply()
            return transform.template

        def dumper(output, output_stream, args):
            yaml.dump(output, output_stream)

        return file_transformer.main(processor, loader, dumper, **kwargs)

    @classmethod
    def _subclass_main(cls, args=None):
        """Resolve a transform class named on the command line and run its main().

        The first positional argument is either 'module:ClassName' or just
        'module'. In the latter case the module must contain exactly one
        subclass of this class. Remaining arguments are passed on to the
        resolved class's main(). Any failure prints a message to stderr and
        exits with status 1.
        """
        parser = argparse.ArgumentParser()
        parser.add_argument('transform_class')
        args, remaining_args = parser.parse_known_args(args=args)

        def fail(message):
            # ArgumentParser.exit takes (status, message); the message is
            # written to stderr verbatim, so add the newline here.
            parser.exit(1, message + "\n")

        xform = args.transform_class.split(':')
        try:
            if len(xform) == 2:
                pkg_name, cls_name = xform
                module = importlib.import_module(pkg_name)
                subcls = getattr(module, cls_name)
            elif len(xform) == 1:
                pkg_name = xform[0]
                module = importlib.import_module(pkg_name)
                subcls = inspect.getmembers(module, lambda o: (
                    inspect.isclass(o)
                    and issubclass(o, cls)
                    and o is not cls))
                if len(subcls) == 0:
                    fail("No {} subclass found in {}".format(cls.__name__, pkg_name))
                elif len(subcls) > 1:
                    names = [name for name, _ in subcls]
                    fail("Multiple transforms found in {}, please choose from: {}".format(
                        pkg_name, ' '.join(names)))
                else:
                    subcls = subcls[0][1]
            else:
                fail("Improperly formatted transform specifier")
        except Exception as e:
            # SystemExit raised by fail() is not an Exception subclass, so it
            # propagates past this handler.
            fail("Exception importing transform class: {}".format(e))

        return subcls.main(args=remaining_args)

    @classmethod
    def get_lambda_handler(cls):
        """Return an AWS Lambda handler that applies this transform.

        The handler reads the template from the event:
        - 'TemplateBody': a YAML string, or an already-parsed template
        - 'TemplateURL': not implemented (raises NotImplementedError)
        - 'TemplateLocation': a dict with 'Bucket' and 'Key', fetched from S3

        If the event has 'OutputLocation' (a dict with 'Bucket' and 'Key'),
        the transformed template is written there as YAML and the handler
        returns None. Otherwise the transformed template is returned.
        """
        def handler(event, context):
            import boto3

            def resolve_location(location):
                if isinstance(location, dict):
                    bucket = location['Bucket']
                    key = location['Key']
                else:
                    raise ValueError("Unknown location {}".format(location))
                return bucket, key

            if 'TemplateBody' in event:
                template_body = event['TemplateBody']
                if isinstance(template_body, six.string_types):
                    # A string body is YAML text that still needs parsing.
                    template = yaml.safe_load(template_body)
                else:
                    # Anything else is taken to be an already-parsed template.
                    template = template_body
            elif 'TemplateURL' in event:
                raise NotImplementedError("TemplateURL is not supported")
            elif 'TemplateLocation' in event:
                bucket, key = resolve_location(event['TemplateLocation'])
                client = boto3.client('s3')
                response = client.get_object(Bucket=bucket, Key=key)
                template = yaml.safe_load(response['Body'])
            else:
                raise ValueError(
                    "Event must contain TemplateBody, TemplateURL or TemplateLocation")

            transform = cls(template, options={'Context': context})
            transformed = transform.apply()

            if 'OutputLocation' not in event:
                return transformed

            transformed_str = yaml.dump(transformed)
            bucket, key = resolve_location(event['OutputLocation'])
            client = boto3.client('s3')
            client.put_object(Bucket=bucket, Key=key, Body=transformed_str)

        return handler
def module_main():
    """Run the command-line dispatcher of the base transform class.

    Returns whatever CloudFormationTemplateTransform._subclass_main() returns.
    """
    dispatch = CloudFormationTemplateTransform._subclass_main
    return dispatch()
def _merge_dicts(dict1, dict2, path=None):
"""Recursively merge dict2 into dict1 (in place)"""
if path is None:
path = []
for key, value2 in six.iteritems(dict2):
if key not in dict1:
dict1[key] = value2
continue
value1 = dict1[key]
if value1 == value2:
continue
elif isinstance(value1, dict) and isinstance(value2, dict):
_merge_dicts(dict1[key], dict2[key], path=path+[key])
elif isinstance(value1, (set, frozenset)) and isinstance(value2, (set, frozenset, list)):
dict1[key] = value1 | frozenset(value2)
else:
raise TypeError("Cannot merge {} with {} at {}".format(type(value1), type(value2), '/'.join(path)))
return dict1