forked from patrickwardle/knockknock
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathvirusTotal.py
208 lines (139 loc) · 3.79 KB
/
virusTotal.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
#!/usr/bin/python
#
# KnockKnock by Patrick Wardle is licensed under a Creative Commons Attribution-NonCommercial 4.0 International License.
#
import os
import sys
import json
import urllib
import urllib2
#project imports
import file
import utils
#global dictionary of VT results
vtResults = {}
#query URL
VT_URL = 'https://www.virustotal.com/partners/sysinternals/file-reports?apikey='
#API key
VT_API_KEY = 'bef728a398d7b666c5fbdc6f64671161284ef49c23e270ac540ada64893b433b'
def processResults(results):
#item (files) to query
# ->grab up to 25 before making a query!
items = []
#queried startup items
queriedItems = set()
#process items, 25 at a time
for result in results:
#iterate over each plugin's results
for startupObj in result['items']:
#data for item (file)
itemData = {}
#only process files
# ->note, plugins don't be mixed item items, so can bail here
if not isinstance(startupObj, file.File):
#stop processing this item group
break
#skip items that don't have hashes
if not startupObj.hash:
#skip
continue
#skip values that already have been queried
if startupObj.hash in queriedItems:
#skip
continue
#auto start location
itemData['autostart_location'] = result['name']
#set item name
itemData['autostart_entry'] = startupObj.name
#set item path
itemData['image_path'] = startupObj.path
#set hash
itemData['hash'] = startupObj.hash
#set creation times
itemData['creation_datetime'] = os.path.getctime(startupObj.path)
#add item info to list
items.append(itemData)
#save in set of queried items
queriedItems.add(startupObj.hash)
#when we've got 25
# ->query VT
if 25 == len(items):
#query
# ->results stored in global vtResults dictionary
queryVT(items)
#reset
items = []
#query any remaining items
if len(items):
#query
# ->results stored in global vtResults dictionary
queryVT(items)
#(re)iterate over all detected items (results)
# ->any that were queried add the VT results
for result in results:
#iterate over each plugin's results
for startupObj in result['items']:
#skip non-item files, or items that weren't queried
if not isinstance(startupObj, file.File) or \
startupObj.hash not in queriedItems:
#skip
continue
#skip items that didn't get a response
if startupObj.hash not in vtResults:
#skip
continue
#add VT results to item
startupObj.vtRatio = vtResults[startupObj.hash]
return vtResults
#query
# ->results stored in global vtResults dictionary
def queryVT(items):
#headers
requestHeaders = {}
#set content type
requestHeaders['Content-Type'] = 'application/json'
#set user agent
requestHeaders['User-Agent'] = 'VirusTotal'
#wrap
try:
#build request
request = urllib2.Request(VT_URL+VT_API_KEY, json.dumps(items), headers=requestHeaders)
#make request
response = urllib2.urlopen(request)
#convert response to JSON
vtResponse = json.loads(response.read())
#process response
# ->should be a list of items, within the 'data' key
if 'data' in vtResponse:
#process/parse all
for item in vtResponse['data']:
#process
parseResult(item)
#exceptions
# ->ignore (likely network related)
except Exception, e:
#dbg msg
utils.logMessage(utils.MODE_ERROR, '\n EXCEPTION, %s() threw: %s' % (sys._getframe().f_code.co_name, e))
#ignore
pass
#bail
return
#process a single result
# ->save parse/save info
def parseResult(item):
#global
global vtResults
#extract found flag
found = item['found']
#extract hash
hash = item['hash']
#when item is found
# ->save detection ratio
if found:
#save detection ratio
vtResults[hash] = item['detection_ratio']
#otherwise indicate it wasn't found
else:
#not found
vtResults[hash] = 'not found'
return