-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcilclass.py
505 lines (407 loc) · 16 KB
/
cilclass.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
import cilparser
import os
import runtime_collector
from global_configs import *
userlevel_dict = {"cameraserver":1047,"mdnsr":1020,"_isolated":90000,\
"bluetooth":1002,"logd":1036,"radio":1001,"mediaex":1040,"keystore":1017,\
"media":1013,"system":1000,"webview_zygote":1053,"_app":10000,\
"audioserver":1041,"gps":1021,"statsd":1066,"shell":2000,"nobody":9999,\
"graphics":1003,"tombstoned":1058,"vpn":1016,"secure_element":1068,"incidentd":1067,\
"nfc":1027,"wifi":1010,"drm":1019,"shared_relro":1037,\
"mediacodec":1046,"root":0}
class dev(object):
"""docstring for dev"""
def __init__(self, dev_name,**kw):
super(dev, self).__init__()
self.dev_name = dev_name
self.finegrained_neal_list = [] #expanded neverallow
self.attri_dict = dict()
self.finegrained_allow_list = [] #expanded allow
self.domset = set()
self.typeset = set()
#set related cil file path first
sys_cil_filepath = os.path.join(work_path,dev_name,sys_cil_file)
#set vendor_sepolicy.cil/nonplat_sepolicy.cil
sysver_cil_filepath = os.path.join(work_path,dev_name,sysver_cil_file)
if os.path.exists(sysver_cil_filepath):#in Pixel
print "seversion >= 28.0"
ven_cil_filepath = os.path.join(work_path,dev_name,ven_cil_file)
sysver_cil_filepath = os.path.join(work_path,dev_name,sysver_cil_file)
else:#in OEM
print "seversion <= 27.0"
ven_cil_filepath = os.path.join(work_path,dev_name,np_cil_file)
#set plat_sepolicy_vers.txt
self.version_filepath = os.path.join(work_path,dev_name,version_file)
with open(self.version_filepath) as f:
self.seversion = f.read().strip() #string type: 26.0/27.0/28.0
map_cil_filepath = os.path.join(work_path,dev_name,mapping_dir,self.seversion+".cil")
print sys_cil_filepath,ven_cil_filepath,map_cil_filepath
sys_parsed_result = cilparser.syscil_parser(dev_name,sys_cil_filepath)
ven_parsed_result = cilparser.normalcil_parser(dev_name,ven_cil_filepath)
map_parsed_result = cilparser.normalcil_parser(dev_name,map_cil_filepath)
sysallow = sys_parsed_result[2]
sysneal = sys_parsed_result[1]
sysattr = sys_parsed_result[0]
venattr = ven_parsed_result[0]
venallow = ven_parsed_result[2]
venneal = ven_parsed_result[1]
mapattr = map_parsed_result[0]
sys_typetrans = sys_parsed_result[3]
ven_typetrans = ven_parsed_result[3]
#merge
#merge (sys,vendor,maps)'s attribute/allow/neverallow first
#attribute:dict
allattr_list = [sysattr,venattr,mapattr]
if os.path.exists(sysver_cil_filepath):
sysver_parsed_result = cilparser.normalcil_parser(dev_name,sysver_cil_filepath)
allattr_list.append(sysver_parsed_result[0])
merged_attr = cilparser.attr_merge(allattr_list)
self.merged_attr = merged_attr
#allow::a list of tuple [(d,t,c,[perms]),(),()]
merged_allow = sysallow+venallow
if os.path.exists(sysver_cil_filepath):
merged_allow+=sysver_parsed_result[2]
#neverallow:a dict---neal[filename]:[(d,t,c,perm)]
if not os.path.exists(sysver_cil_filepath):
merged_neal = dict(sysneal.items()+venneal.items())
else:
merged_neal = dict(sysneal.items()+venneal.items()+sysver_parsed_result[1].items())
#Need clear the duplicated A and A_27_0,and instance them
self.merged_neal = merged_neal
for attr in merged_attr:
#if not attr.startswith("base_typeattr_"):
self.attri_dict[attr] = attribute(attr,merged_attr[attr][0],merged_attr[attr][1])
#Futhermore, parse indirect neverallow from base_typeattr:
indirect_neal_list = []
count_i = 0
for allowrule in merged_allow:
if "base_typeattr_" in allowrule[0]:
#allow (subA - subB) X file [read,write,open]
for indirect_neal_sub in self.attri_dict[allowrule[0]].notset:
indirect_neal_list.append((indirect_neal_sub,allowrule[1],allowrule[2],allowrule[3]))
count_i += 1
if "base_typeattr_" in allowrule[1]:
#allow X (objA-objB) file [read,write,open]
for indirect_neal_obj in self.attri_dict[allowrule[1]].notset:
indirect_neal_list.append((allowrule[0],indirect_neal_obj,allowrule[2],allowrule[3]))
count_i += 1
finegrained_indirect_neal_list = self.expandrules(indirect_neal_list,merged_attr)
self.finegrained_neal_list = finegrained_indirect_neal_list
print "indirect_neverallow:",len(finegrained_indirect_neal_list)
self.finegrained_allow_list = self.expandrules(merged_allow,merged_attr)
for fr in self.finegrained_allow_list:
self.domset.add(fr.domain)
self.typeset.add(fr._type)
if "expanded_neal" in kw and kw['expanded_neal'] ==True:
#directly get from neverallow rules
merged_neal_list =[]
merged_neal_list += indirect_neal_list
for cilfile in merged_neal:
for r in merged_neal[cilfile]:
#type(r) = tuple
merged_neal_list.append(r)
#print len(merged_neal_list)
self.finegrained_neal_list += self.expandrules(merged_neal_list,merged_attr)
print "finegrained neverallow:%d"%(len(self.finegrained_neal_list))
#infer from base_typeattr declaration
'''
for i in self.finegrained_allow_list:
print "----"
i.show()
print "neal list:"
for i in self.finegrained_neal_list:
print "----"
i.show()
'''
##### Merge and expand typetransition
merged_typetrans = sys_typetrans+ven_typetrans
self.expanded_typetrans = self.expand_typetrans(merged_typetrans,merged_attr)
print "Typetransitions:", len(self.expanded_typetrans)
#Create Typetransition Graph (useing a dict):
self.typetrans_dict = dict()
for typetrans in self.expanded_typetrans:
if not self.typetrans_dict.has_key(typetrans[0]):
self.typetrans_dict[typetrans[0]] = [(typetrans[1],typetrans[3])]
else:
self.typetrans_dict[typetrans[0]].append((typetrans[1],typetrans[3]))
def expand_typetrans(self,typetrans_list,attr_dict):
#BUGS here, not exclude _28_0
ret_list = []
for entry in typetrans_list:
subset = cilparser.recursively_expand_attr(entry[0],attr_dict)
objset = cilparser.recursively_expand_attr(entry[1],attr_dict)
targetset = cilparser.recursively_expand_attr(entry[3],attr_dict)
for sub in subset:
for obj in objset:
for tar in targetset:
ret_list.append((sub,obj,entry[2],tar))
return ret_list
'''
def expand_typetrans(self,typetrans_list,attr_dict):
#BUGS here, not exclude _28_0
print typetrans_list[1]
print attr_dict["update_engine_28_0"]
exit()
ret_list = []
for entry in typetrans_list:
sublist = []
objlist = []
targetlist = []
if attr_dict.get(entry[0])!=None:
for i in (attr_dict[entry[0]][0]-attr_dict[entry[0]][1]):
sublist.append(i)
else:
sublist.append(entry[0])
if attr_dict.get(entry[1])!=None:
for j in (attr_dict[entry[1]][0]-attr_dict[entry[1]][1]):
objlist.append(j)
else:
objlist.append(entry[1])
if attr_dict.get(entry[3])!=None:
for k in (attr_dict[entry[3]][0]-attr_dict[entry[3]][1]):
targetlist.append(k)
else:
targetlist.append(entry[3])
for sub in sublist:
for obj in objlist:
for tar in targetlist:
ret_list.append((sub,obj,entry[2],tar))
return ret_list
'''
def expandrules(self,rule_list,attr_dict):
ret_list = []
for rule in rule_list:
#('zygote', 'overlay_prop', 'file', ['ioctl', 'read', 'getattr', 'lock', 'map', 'open'])")
for i in cilparser.expand_single_rule(rule,attr_dict):
ret_list.append(finegrained_rule(i))
return ret_list
class attribute(object):
"""docstring for attribute"""
def __init__(self, name, andset, notset):
super(attribute, self).__init__()
self.attr_name = name
self.andset = andset
self.notset = notset
self.contained_typeset = andset - notset
self.level = 0 #TODO
def __repr__(self):
return "[%s]---{%s-%s}"%(self.attr_name,str(self.andset),str(self.notset))
def description(self):
des = str(self.andset).strip("(set)")
des += " - "
if self.notset != set():
des += str(self.notset).strip("(set)")
print des
class rule(object):
"""docstring for rule"""
def __init__(self, rule_tuple):
super(rule, self).__init__()
self.domain = rule_tuple[0]
self._type = rule_tuple[1]
self.claz = rule_tuple[2]
self.perms = rule_tuple[3]
def __repr__(self):
return "(%s,%s,%s,%s)"%(repr(self.domain),repr(self._type),repr(self.claz),repr(self.perms))
def __str__(self):
return "(%s,%s,%s,%s)"%(repr(self.domain),repr(self._type),repr(self.claz),repr(self.perms))
def show(self,**kw):
if 'perm' in kw:
print " %s-[%s:(%s)]->%s "%(self.domain,self.claz,str(kw),self._type)
else:
print " %s-[%s:(%s)]->%s "%(self.domain,self.claz,str(self.perms),self._type)
def tuple_type(self):
return (self.domain,self.claz,str(self.perms),self._type)
class finegrained_rule(rule):
"""input a finegrained tuple:('shell', 'self', 'process', ['execmem'],
"('appdomain', 'self', 'process', ['execmem'])")"""
def __init__(self,fin_rule_tuple):
super(finegrained_rule,self).__init__(fin_rule_tuple)
if fin_rule_tuple[1] == "self":
self._type = fin_rule_tuple[0]
else:
self._type = fin_rule_tuple[1]
self.src_rule = rule(fin_rule_tuple[4])
'''
class domain(object):
"""docstring for domain"""
def __init__(self, domain_name):
super(domain, self).__init__()
self.name = domain_name
self.attr_list = get_attr(self,domain_name)
self.spawn_by_init =
self.defined_by_plat =
self.untrusted_code_ornot =
def get_attr(self,typename):
ret_list = []
for attr in self.attri_dict:
if typename in dev_instance.attri_dict[attr].contained_typeset:
ret_list.append(attr)
ret_list.sort()
return ret_list
'''
class sub_feature(object):
"""All features of a subject type. Type name is needed to initialize the feature class"""
def __init__(self, devins,arg):
super(sub_feature, self).__init__()
self.typename = arg
##attribute features:
#[0:domain,1:mlstrustedsubject,2:coredomain,3:appdomain,4:untrusted_app_all,5:netdomain,
#6:bluetoothdomain,7:binderservicedomain,8:halserverdomain,9:halclientdomain]
self.attribute_features = dict()
self.attribute_features = {"domain":False,"mlstrustedsubject":False,"coredomain":False,"appdomain":False,\
"untrusted_app_all":False,"netdomain":False,"bluetoothdomain":False,"binderservicedomain":False,"halserverdomain":False,\
"halclientdomain":False}
attrlist = get_attr(devins,arg)
for attr in attrlist:
if attr in self.attribute_features.keys():
self.attribute_features[attr] = True
#self.attribute_vec = self.attrfeatures2vec(self.attribute_features)
#untrusted features
if "untrusted_app" in self.typename or "isolated_app" == self.typename or "shell" == self.typename \
or "ephemeral_app" == self.typename or "untrusted_v2_app" == self.typename:
self.untrusted_domain = True #untrusted_app\isolated_app\shell
else:
self.untrusted_domain = False
#self.typetransition_path = [] #kernel,init,zygote,untrusted_app
path = typetransition_lookup(devins.typetrans_dict,"kernel",arg)
#print "typetransition lookup result:",path
if path!=[]:#Got from typetransition in sepolicy
if path[-1]!= arg:
path.append(arg)
self.typetransition_path = path
self.typetransition_distance = self.typetransition_path.index(arg)
exec_file = typetransition_file_lookup(devins.typetrans_dict,arg)
self.exec_file_feature = obj_feature(devins,exec_file)
else:
#1.vendor_init is forked by init label
#2.others are forked by zygote
if self.typename == "vendor_init":
self.typetransition_path = ["kernel","init","vendor_init"]
self.typetransition_distance = 1
exec_file = 'init_exec'
self.exec_file_feature = obj_feature(devins,exec_file)
else:
if ("_app") in self.typename or self.typename =="system_server" or \
self.typename in runtime_collector.seapp_info:
self.typetransition_path = ["kernel","init","zygote"]+[arg]
self.typetransition_distance = 4
exec_file = 'zygote_fork'
self.exec_file_feature = obj_feature(devins,exec_file)
else:
self.typetransition_path = ["unknown"]
self.typetransition_distance = -1
exec_file = 'unknown'
self.exec_file_feature = obj_feature(devins,exec_file)
#user who has this context when running. Read from an prepared dict file
self.runtime_user = runtime_collector.runtime_feature_collector(devins,self.typetransition_path)
if self.runtime_user != None:
uid = userlevel_dict.get(self.runtime_user)
if uid == 0:
self.userlevel = "root"
if 1000<=uid<2000:
self.userlevel = "system"
if 2000<=uid<2900:
self.userlevel = "shell"
if 3000<=uid<5000:
self.userlevel = "supplemental group"
if 9997<=uid<10000:
self.userlevel = "app_shared"
if 10000<=uid<20000:
self.userlevel = "app"
if 20000<=uid<30000:
self.userlevel = "app cache data group"
if 30000<=uid<40000:
self.userlevel = "app external data group"
if 40000<=uid<50000:
self.userlevel = "app external cache data group"
if 50000<=uid<60000:
self.userlevel = "app public data"
if uid == 65534:
self.userlevel = "nomapping user"
if 90000<=uid<100000:
self.userlevel = "isolated"
else :
self.userlevel = "vendor"
else:
#No related exec_file in fs, or not invoked by rc files
print "Unknown User (%s) for subject (%s)"%(self.runtime_user,arg)
self.userlevel = -1
#dictorize for sklearn DictVectorizer
self.feature_dict = self.attribute_features
self.feature_dict["untrusted_domain"] = self.untrusted_domain
self.feature_dict["typetransition_distance"] = self.typetransition_distance
self.feature_dict["userlevel"] = self.userlevel
self.feature_dict["user"] = self.runtime_user
#print self.typetransition_path,exec_file
def __repr__(self):
ret_expr = ''
ret_expr += "Type Name: %s \n" % self.typename
#ret_expr += "Attribute Features:%s \n"% self.attribute_vec
ret_expr += "Typetransition Path (length:%d):%s \n"%(self.typetransition_distance,self.typetransition_path)
ret_expr += "Exec File :%s \n" %self.exec_file_feature.typename
ret_expr += str(self.feature_dict)
return ret_expr
def attrfeatures2vec(self,attribute_features):
print "Vector:"
vec = [0]*10
feature_domain_lookuplist = ["domain","mlstrustedsubject","coredomain","appdomain","untrusted_app_all",\
"netdomain","bluetoothdomain","binderservicedomain","halserverdomain","halclientdomain"]
for attr in attribute_features:
if attribute_features[attr]==True:
idx = feature_domain_lookuplist.index(attr)
vec[idx] = 1
return vec
class obj_feature(object):
"""All features of an object type"""
def __init__(self, devins,arg):
super(obj_feature, self).__init__()
self.typename = arg
self.fileflag = 0 #etc. 755
self.owner = ""
self.untrusted_file = False #create/write by an untrusted domain
def filecontext_lookup(self,devins,arg):
pass
#----class def ends------#
def typetransition_file_lookup(typetrans_dict,typename):
for src_type in typetrans_dict:
for entry in typetrans_dict[src_type]:
if entry[1] == typename:
exec_file = entry[0]
return exec_file
return ""
def typetransition_lookup(typetrans_dict,start,end):
#find all paths from start("kernel") to end (target)
path = []
paths = []
#print "Trying to find path from %s to %s recuresivly" % (start,end)
if end == "kernel" or end == "su" or end =="crash_dump":
return [end]
count = 0
for src_type in typetrans_dict:
for entry in typetrans_dict[src_type]:
if entry[1] == end:
count = 1
#print src_type,typetrans_dict[src_type],type(typetrans_dict[src_type])
path += typetransition_lookup(typetrans_dict,start,src_type)
if not src_type in path:
path.append(src_type)
if count != 0 :
break
return path
def get_attr(dev_instance,typename):
ret_list = []
for attr in dev_instance.attri_dict:
if typename in dev_instance.attri_dict[attr].contained_typeset and not (typename+"_"+dev_instance.seversion.replace(".","_"))==attr:
ret_list.append(attr)
ret_list.sort()
return ret_list
if __name__ == '__main__':
#test
devins = dev("Pixel",expanded_neal=False)
for testcase in ["adbd","untrusted_app_25","system_server","shell","system_server","qtimeservice"]:
print "-------------"
print testcase
feat_ins = sub_feature(devins,testcase)
print feat_ins.feature_dict
print feat_ins.typetransition_path
print feat_ins.runtime_user