forked from Fraunhofer-AISEC/archie
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcalculate_trigger.py
215 lines (198 loc) · 7.95 KB
/
calculate_trigger.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
from faultclass import build_filters
import logging
import pandas
logger = logging.getLogger(__name__)
def find_tb_info_row(tb_id, goldenrun_tb_info):
idx = goldenrun_tb_info.index[goldenrun_tb_info["id"] == tb_id]
return idx[0]
def allign_fault_to_instruction(address, tbinfo_size, tbinfo_assembler, tbinfo_id):
asm_addresses = []
"Start searching for instruction addresses"
split = tbinfo_assembler.split("[ ")
for sp in split[1:]:
"Find end of address"
s = sp.split("]")
"Convert and append to list"
asm_addresses.append(int("0x" + s[0].strip(), 0))
asm_addresses.append(tbinfo_id + tbinfo_size)
for i in range(0, len(asm_addresses) - 1, 1):
if address >= asm_addresses[i] and address < asm_addresses[i + 1]:
return asm_addresses[i]
def calculate_lifespan_from_start(
filter_lists,
fault_address,
goldenrun_tb_exec,
goldenrun_tb_info,
trigger_occurrences,
):
[idx, instruction_address] = find_fault(
fault_address, goldenrun_tb_exec, goldenrun_tb_info, trigger_occurrences
)
tb_id = goldenrun_tb_exec.at[idx, "tb"]
lifespan = 0
for filt in filter_lists:
if filt[0] != tb_id:
continue
for ins in filt:
lifespan += 1
if ins == instruction_address:
break
break
for itter in range(0, idx):
idtbinfo = find_tb_info_row(
goldenrun_tb_exec.at[itter, "tb"], goldenrun_tb_info
)
lifespan += goldenrun_tb_info.at[idtbinfo, "ins_count"]
return lifespan
def find_fault(
fault_address, goldenrun_tb_exec, goldenrun_tb_info, trigger_occurrences
):
idx = pandas.Index([])
for index, tb in goldenrun_tb_info.iterrows():
if fault_address < tb["id"] or fault_address >= (tb["id"] + tb["size"]):
continue
tmp = goldenrun_tb_exec.index[goldenrun_tb_exec["tb"] == tb["id"]]
idx = idx.union(tmp)
"""Identify desired occurrence"""
if trigger_occurrences > len(idx):
return [-1, 0]
idx = idx[trigger_occurrences - 1]
idtbinfo = find_tb_info_row(goldenrun_tb_exec.at[idx, "tb"], goldenrun_tb_info)
ins = allign_fault_to_instruction(
fault_address,
goldenrun_tb_info.at[idtbinfo, "size"],
goldenrun_tb_info.at[idtbinfo, "assembler"],
goldenrun_tb_info.at[idtbinfo, "id"],
)
return [idx, ins]
def search_for_fault_location(
filter_lists,
trigger_position,
fault_address,
trigger_occurrences,
fault_lifespan,
goldenrun_tb_exec,
goldenrun_tb_info,
):
logger.info(f"Search trigger to fault INSN at 0x{fault_address:08x}")
[idx, ins] = find_fault(
fault_address, goldenrun_tb_exec, goldenrun_tb_info, trigger_occurrences
)
if idx < 0:
return [-1, trigger_occurrences, fault_lifespan]
trigger_not_in_same_tb = 0
lifespan_diff = trigger_position + fault_lifespan
trigger_position = trigger_position * (-1)
while trigger_position != 0:
if idx < 0:
if fault_lifespan > 0:
fault_lifespan = calculate_lifespan_from_start(
filter_lists,
fault_address,
goldenrun_tb_exec,
goldenrun_tb_info,
trigger_occurrences,
)
fault_lifespan += lifespan_diff
return [fault_address, 0, fault_lifespan]
idtbinfo = find_tb_info_row(goldenrun_tb_exec.at[idx, "tb"], goldenrun_tb_info)
if trigger_not_in_same_tb == 1:
"""Is current tb to short for trigger position"""
if trigger_position > goldenrun_tb_info.at[idtbinfo, "ins_count"]:
idx = idx - 1
trigger_position = (
trigger_position - goldenrun_tb_info.at[idtbinfo, "ins_count"]
)
else:
for filt in filter_lists:
if filt[0] == goldenrun_tb_info.at[idtbinfo, "id"]:
ins = filt[len(filt) - trigger_position]
trigger_position = 0
break
else:
tb_id = goldenrun_tb_exec.at[idx, "tb"]
for filt in filter_lists:
"""found matching filter"""
if filt[0] == tb_id:
for i in range(0, len(filt), 1):
if filt[i] == ins:
"""Case ins is in the current tb"""
if i >= trigger_position:
i -= trigger_position
ins = filt[i]
trigger_position = 0
else:
"""Case ins is not in the current tb"""
trigger_not_in_same_tb = 1
trigger_position -= i
idx -= 1
break
# Got trigger address, now calculate the adjusted hitcounter. If the trigger
# address is in a different TB to the fault address, it may be executed more
# often than the TB with the fault address itself. To still trigger the
# correct fault, the hitcounter has to be adjusted.
if trigger_not_in_same_tb == 1:
trigger_tb = goldenrun_tb_exec.at[idx, "tb"]
trigger_hitcounter = goldenrun_tb_exec.iloc[0 : idx + 1].tb.value_counts()[
trigger_tb
]
else:
trigger_hitcounter = trigger_occurrences
logger.info(
"Found trigger for faulting instruction address {} at {} with "
"hitcounter {}".format(fault_address, ins, trigger_hitcounter)
)
return [ins, trigger_hitcounter, fault_lifespan]
def calculate_trigger_addresses(fault_list, goldenrun_tb_exec, goldenrun_tb_info):
""""""
"check every fault list"
cachelist = []
lists = build_filters(goldenrun_tb_info)
for list in lists:
list = list.reverse()
for faults in fault_list:
for fault in faults["faultlist"]:
if fault.trigger.address >= 0 or fault.trigger.hitcounter == 0:
continue
found = False
for tdict in cachelist:
if (
tdict["faultaddress"] == fault.address
and tdict["faultlifespan"] == fault.lifespan
and tdict["triggerhitcounter"] == fault.trigger.hitcounter
and tdict["triggeraddress"] == fault.trigger.address
):
fault.trigger.address = tdict["answer"][0]
fault.trigger.hitcounter = tdict["answer"][1]
fault.lifespan = tdict["answer"][2]
found = True
break
if found is True:
continue
if fault.lifespan != 0 and fault.trigger.address + fault.lifespan < 0:
logger.warning(
f"Lifespan is too short to take effect for 0x{fault.address:0x}"
f" with hitcounter {fault.trigger.hitcounter}, trigger address"
f" {fault.trigger.address} and lifespan {fault.lifespan}"
)
tbs = [-1, fault.trigger.hitcounter, fault.lifespan]
else:
tbs = search_for_fault_location(
lists,
fault.trigger.address,
fault.address,
fault.trigger.hitcounter,
fault.lifespan,
goldenrun_tb_exec,
goldenrun_tb_info,
)
d = dict()
d["faultaddress"] = fault.address
d["triggerhitcounter"] = fault.trigger.hitcounter
d["triggeraddress"] = fault.trigger.address
d["faultlifespan"] = fault.lifespan
d["answer"] = tbs
cachelist.insert(0, d)
fault.trigger.address = tbs[0]
fault.trigger.hitcounter = tbs[1]
fault.lifespan = tbs[2]