forked from phantomcyber/playbooks
-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathec2_instance_isolation.py
330 lines (243 loc) · 13.8 KB
/
ec2_instance_isolation.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
"""
Isolate an EC2 instance by changing its security group in order to protect it from malicious traffic. This playbook can be started alone or used from another playbook after doing investigation and notification. The existing security group is removed from the instance and a new isolation security group is added.
"""
import phantom.rules as phantom
import json
from datetime import datetime, timedelta
def on_start(container):
phantom.debug('on_start() called')
# call 'filter_ec2_resource' block
filter_ec2_resource(container=container)
return
"""
Separate the EC2 resource from the other artifacts in the Finding.
"""
def filter_ec2_resource(action=None, success=None, container=None, results=None, handle=None, filtered_artifacts=None, filtered_results=None):
phantom.debug('filter_ec2_resource() called')
# collect filtered artifact ids for 'if' condition 1
matched_artifacts_1, matched_results_1 = phantom.condition(
container=container,
conditions=[
["artifact:*.name", "==", "AwsEc2Instance Resource Artifact"],
],
name="filter_ec2_resource:condition_1")
# call connected blocks if filtered artifacts or results
if matched_artifacts_1 or matched_results_1:
describe_instance_before(action=action, success=success, container=container, results=results, handle=handle, filtered_artifacts=matched_artifacts_1, filtered_results=matched_results_1)
return
"""
Gather EC2 instance metadata before making any changes.
"""
def describe_instance_before(action=None, success=None, container=None, results=None, handle=None, filtered_artifacts=None, filtered_results=None):
phantom.debug('describe_instance_before() called')
# collect data for 'describe_instance_before' call
filtered_artifacts_data_1 = phantom.collect2(container=container, datapath=['filtered-data:filter_ec2_resource:condition_1:artifact:*.cef.InstanceId', 'filtered-data:filter_ec2_resource:condition_1:artifact:*.id'])
parameters = []
# build parameters list for 'describe_instance_before' call
for filtered_artifacts_item_1 in filtered_artifacts_data_1:
parameters.append({
'limit': "",
'filters': "",
'instance_ids': filtered_artifacts_item_1[0],
'dry_run': "",
# context (artifact id) is added to associate results with the artifact
'context': {'artifact_id': filtered_artifacts_item_1[1]},
})
phantom.act("describe instance", parameters=parameters, assets=['aws_ec2'], callback=add_isolation_SG, name="describe_instance_before")
return
"""
Add the isolation security group to the EC2 instance.
"""
def add_isolation_SG(action=None, success=None, container=None, results=None, handle=None, filtered_artifacts=None, filtered_results=None):
phantom.debug('add_isolation_SG() called')
#phantom.debug('Action: {0} {1}'.format(action['name'], ('SUCCEEDED' if success else 'FAILED')))
# collect data for 'add_isolation_SG' call
results_data_1 = phantom.collect2(container=container, datapath=['describe_instance_before:action_result.parameter.instance_ids', 'describe_instance_before:action_result.parameter.context.artifact_id'], action_results=results)
parameters = []
# build parameters list for 'add_isolation_SG' call
for results_item_1 in results_data_1:
if results_item_1[0]:
parameters.append({
'instance_id': results_item_1[0],
'group_id': "sg-009121c0a21b3b2e0",
# context (artifact id) is added to associate results with the artifact
'context': {'artifact_id': results_item_1[1]},
})
phantom.act("assign instance", parameters=parameters, assets=['aws_ec2'], callback=remove_existing_SGs, name="add_isolation_SG", parent_action=action)
return
"""
Remove any pre-existing security groups that were part of the insecure configuration.
"""
def remove_existing_SGs(action=None, success=None, container=None, results=None, handle=None, filtered_artifacts=None, filtered_results=None):
phantom.debug('remove_existing_SGs() called')
#phantom.debug('Action: {0} {1}'.format(action['name'], ('SUCCEEDED' if success else 'FAILED')))
# collect data for 'remove_existing_SGs' call
results_data_1 = phantom.collect2(container=container, datapath=['add_isolation_SG:action_result.parameter.instance_id', 'add_isolation_SG:action_result.parameter.context.artifact_id'], action_results=results)
results_data_2 = phantom.collect2(container=container, datapath=['describe_instance_before:action_result.data.*.Reservations.*.Instances.*.SecurityGroups.*.GroupId', 'describe_instance_before:action_result.parameter.context.artifact_id'], action_results=results)
parameters = []
# build parameters list for 'remove_existing_SGs' call
for results_item_1 in results_data_1:
for results_item_2 in results_data_2:
if results_item_1[0] and results_item_2[0]:
parameters.append({
'instance_id': results_item_1[0],
'group_id': results_item_2[0],
# context (artifact id) is added to associate results with the artifact
'context': {'artifact_id': results_item_1[1]},
})
phantom.act("remove instance", parameters=parameters, assets=['aws_ec2'], callback=describe_instance_after, name="remove_existing_SGs", parent_action=action)
return
"""
Gather EC2 instance metadata after changing the security groups to verify the change.
"""
def describe_instance_after(action=None, success=None, container=None, results=None, handle=None, filtered_artifacts=None, filtered_results=None):
phantom.debug('describe_instance_after() called')
#phantom.debug('Action: {0} {1}'.format(action['name'], ('SUCCEEDED' if success else 'FAILED')))
# collect data for 'describe_instance_after' call
results_data_1 = phantom.collect2(container=container, datapath=['describe_instance_before:action_result.parameter.instance_ids', 'describe_instance_before:action_result.parameter.context.artifact_id'], action_results=results)
parameters = []
# build parameters list for 'describe_instance_after' call
for results_item_1 in results_data_1:
parameters.append({
'limit': "",
'filters': "",
'instance_ids': results_item_1[0],
'dry_run': "",
# context (artifact id) is added to associate results with the artifact
'context': {'artifact_id': results_item_1[1]},
})
phantom.act("describe instance", parameters=parameters, assets=['aws_ec2'], callback=describe_instance_after_callback, name="describe_instance_after", parent_action=action)
return
def describe_instance_after_callback(action=None, success=None, container=None, results=None, handle=None, filtered_artifacts=None, filtered_results=None):
phantom.debug('describe_instance_after_callback() called')
format_before(action=action, success=success, container=container, results=results, handle=handle)
format_note(action=action, success=success, container=container, results=results, handle=handle)
list_connections_1(action=action, success=success, container=container, results=results, handle=handle)
return
"""
Combine the before and after messages into a single comment.
"""
def format_comment(action=None, success=None, container=None, results=None, handle=None, filtered_artifacts=None, filtered_results=None):
phantom.debug('format_comment() called')
template = """Before this playbook run the instance {0} had the following security groups:
{1}
and after this playbook run the instance has the following security groups:
{2}"""
# parameter list for template variable replacement
parameters = [
"describe_instance_before:action_result.parameter.instance_ids",
"format_before:formatted_data",
"format_after:formatted_data",
]
phantom.format(container=container, template=template, parameters=parameters, name="format_comment")
add_comment_1(container=container)
return
"""
Format a message describing the security groups before the change.
"""
def format_before(action=None, success=None, container=None, results=None, handle=None, filtered_artifacts=None, filtered_results=None):
phantom.debug('format_before() called')
template = """%%
Security Group ID: {0}
Security Group Name: {1}
%%"""
# parameter list for template variable replacement
parameters = [
"describe_instance_before:action_result.data.*.Reservations.*.Instances.*.NetworkInterfaces.*.Groups.*.GroupId",
"describe_instance_before:action_result.data.*.Reservations.*.Instances.*.NetworkInterfaces.*.Groups.*.GroupName",
]
phantom.format(container=container, template=template, parameters=parameters, name="format_before")
format_after(container=container)
return
"""
Format a message describing the security groups after the change.
"""
def format_after(action=None, success=None, container=None, results=None, handle=None, filtered_artifacts=None, filtered_results=None):
phantom.debug('format_after() called')
template = """%%
Security Group ID: {0}
Security Group Name: {1}
%%"""
# parameter list for template variable replacement
parameters = [
"describe_instance_after:action_result.data.*.Reservations.*.Instances.*.NetworkInterfaces.*.Groups.*.GroupId",
"describe_instance_after:action_result.data.*.Reservations.*.Instances.*.NetworkInterfaces.*.Groups.*.GroupName",
]
phantom.format(container=container, template=template, parameters=parameters, name="format_after")
format_comment(container=container)
return
"""
Post a comment describing the security group assignment before and after the change.
"""
def add_comment_1(action=None, success=None, container=None, results=None, handle=None, filtered_artifacts=None, filtered_results=None):
phantom.debug('add_comment_1() called')
formatted_data_1 = phantom.get_format_data(name='format_comment')
phantom.comment(container=container, comment=formatted_data_1)
return
"""
Add a note to the Security Hub Finding to describe the change that was made.
"""
def add_note_1(action=None, success=None, container=None, results=None, handle=None, filtered_artifacts=None, filtered_results=None):
phantom.debug('add_note_1() called')
#phantom.debug('Action: {0} {1}'.format(action['name'], ('SUCCEEDED' if success else 'FAILED')))
source_data_identifier_value = container.get('source_data_identifier', None)
# collect data for 'add_note_1' call
formatted_data_1 = phantom.get_format_data(name='format_note')
parameters = []
# build parameters list for 'add_note_1' call
parameters.append({
'note': formatted_data_1,
'findings_id': source_data_identifier_value,
'overwrite': "",
})
phantom.act("add note", parameters=parameters, assets=['aws_security_hub'], name="add_note_1")
return
"""
Format a note to add to the Finding in Security Hub.
"""
def format_note(action=None, success=None, container=None, results=None, handle=None, filtered_artifacts=None, filtered_results=None):
phantom.debug('format_note() called')
template = """Phantom ran two playbooks investigating the EC2 instance {0} and isolating it from external networks by removing its previous security groups and assigning it to a quarantine security group. The event can be reviewed and further response can be taken using Mission Control in Phantom: {1}"""
# parameter list for template variable replacement
parameters = [
"describe_instance_before:action_result.parameter.instance_ids",
"container:url",
]
phantom.format(container=container, template=template, parameters=parameters, name="format_note")
add_note_1(container=container)
return
"""
List active TCP and UDP connections to show which traffic is still reaching the instance and to show that Phantom still has SSH access to the instance.
"""
def list_connections_1(action=None, success=None, container=None, results=None, handle=None, filtered_artifacts=None, filtered_results=None):
phantom.debug('list_connections_1() called')
#phantom.debug('Action: {0} {1}'.format(action['name'], ('SUCCEEDED' if success else 'FAILED')))
# collect data for 'list_connections_1' call
results_data_1 = phantom.collect2(container=container, datapath=['describe_instance_after:action_result.data.*.Reservations.*.Instances.*.PublicDnsName', 'describe_instance_after:action_result.parameter.context.artifact_id'], action_results=results)
parameters = []
# build parameters list for 'list_connections_1' call
for results_item_1 in results_data_1:
if results_item_1[0]:
parameters.append({
'local_addr': "",
'remote_port': "",
'remote_addr': "",
'ip_hostname': results_item_1[0],
'local_port': "",
# context (artifact id) is added to associate results with the artifact
'context': {'artifact_id': results_item_1[1]},
})
phantom.act("list connections", parameters=parameters, assets=['ssh'], name="list_connections_1", parent_action=action)
return
def on_finish(container, summary):
phantom.debug('on_finish() called')
# This function is called after all actions are completed.
# summary of all the action and/or all detals of actions
# can be collected here.
# summary_json = phantom.get_summary()
# if 'result' in summary_json:
# for action_result in summary_json['result']:
# if 'action_run_id' in action_result:
# action_results = phantom.get_action_results(action_run_id=action_result['action_run_id'], result_data=False, flatten=False)
# phantom.debug(action_results)
return