forked from patrickwardle/knockknock
-
Notifications
You must be signed in to change notification settings - Fork 0
/
knockknock.py
executable file
·492 lines (338 loc) · 10.4 KB
/
knockknock.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
#!/usr/bin/python
#
# KnockKnock by Patrick Wardle is licensed under a Creative Commons Attribution-NonCommercial 4.0 International License.
#
import os
import sys
import json
import traceback
#project imports
import file
import utils
import output
import command
import whitelist
import virusTotal
#directory containing plugins
# ->appended to the KnockKnock directory when the plugin manager is initialized
PLUGIN_DIR = "plugins/"
#parsed command-line args
# ->set by initKK()
args = None
#yapsy's PluginManager class
# ->imported (as a global) by initKK(), after the 'libs' directory is added to sys.path
PluginManager = None
#global plugin manager instance
# ->created by initPluginManager()
pluginManagerObj = None
#main interface
def knocknock():
    """Main interface: initialize, scan, filter, and display startup items.

    Steps, in order:
      1. initKK() (args, logging, plugin manager)
      2. if -l was given, list the plugins and stop
      3. scan with all plugins (or just the one named via -p)
      4. unless -a / -w were given, drop Apple-signed / whitelisted files
      5. drop 'Unclassified Items' that are also reported by another plugin
      6. unless -d was given, add VirusTotal info
      7. format (text or JSON) and print the results

    Returns:
        True on success (including the list-plugins case), False if
        initialization or the scan failed, or if any exception was thrown.
    """

    try:

        #init
        # ->logging, plugin manager, etc
        if not initKK():

            #err msg
            utils.logMessage(utils.MODE_ERROR, 'initialization(s) failed')

            #bail
            return False

        #dbg msg
        utils.logMessage(utils.MODE_INFO, 'initialization complete')

        #list plugins and bail
        if args.list:

            #display plugins
            listPlugins()

            #bail
            return True

        #scan for thingz
        results = scan(args.plugin)

        #make sure scan succeeded
        # ->scan() returns None when the requested plugin wasn't found
        if results is None:

            #err msg
            utils.logMessage(utils.MODE_ERROR, 'scan failed')

            #bail
            return False

        #depending on args
        # ->filter out apple signed binaries, or whitelisted binaries, etc
        if not args.apple or not args.whitelist:

            #iterate over all results
            # ->one for each startup item type
            for result in results:

                #keep everything that shouldn't be ignored
                # ->filtering in place (instead of subtracting sets) preserves the order
                #   in which the plugin reported its items, and doesn't need hashable items
                result['items'] = [item for item in result['items'] if not _isIgnoredItem(item)]

        #filter out dups in unclassified plugin
        # ->needed since it just looks at the proc list
        removeUnclassDups(results)

        #get vt results
        if not args.disableVT:

            #dbg msg
            utils.logMessage(utils.MODE_INFO, 'querying VirusTotal - sit tight!')

            #process
            # ->will query VT and add VT info to all files
            virusTotal.processResults(results)

        #format output
        # ->normal output or JSON
        formattedResults = output.formatResults(results, args.json)

        #show em
        # ->non-ascii characters are emitted as XML character references
        print(formattedResults.encode('ascii', 'xmlcharrefreplace'))

    #top level exception handler
    except Exception as e:

        #err msg
        utils.logMessage(utils.MODE_ERROR, '\n EXCEPTION, %s() threw: %s' % (sys._getframe().f_code.co_name, e))

        #stack trace
        traceback.print_exc()

        #bail
        return False

    return True


#check if a startup item should be hidden from the results
# ->only files are ever hidden; depends on the -a / -w args
def _isIgnoredItem(startupObj):
    """Return True if startupObj is a file that the current args say to hide."""

    #only files can be filtered out
    if not isinstance(startupObj, file.File):

        #keep
        return False

    #by default, ignore signed by Apple
    if not args.apple and startupObj.signedByApple:

        #ignore
        return True

    #by default, ignore white listed items
    if not args.whitelist and startupObj.isWhitelisted:

        #ignore
        return True

    return False
#filter out dups in unclassified plugin
# ->needed, since it just looks at the proc list so grabs items that are likely detected/classified elsewhere
def removeUnclassDups(results):
    """Remove unclassified items that are also reported by another plugin.

    Looks for the result dictionary named 'Unclassified Items' and, in place,
    replaces its 'items' with only those whose hash occurs exactly once among
    the hashes returned by allHashes(results).

    Args:
        results: list of result dictionaries (each with 'name' and 'items').

    Returns:
        None; `results` is modified in place (no-op if there is no
        'Unclassified Items' entry).
    """

    #local import
    # ->Counter is available since python 2.7, which initKK() requires
    from collections import Counter

    #get unclassified results
    # ->just want the (first) dictionary
    unclassItems = next((result for result in results if result['name'] == 'Unclassified Items'), None)

    #bail if there aren't any
    if unclassItems is None:

        #none
        return

    #count how often each hash was reported
    # ->done once up front, instead of re-counting the full hash list for every item
    hashCounts = Counter(allHashes(results))

    #only keep otherwise unknown items
    # ->i.e. those whose hash shows up exactly once (their own entry)
    unclassItems['items'] = [item for item in unclassItems['items'] if 1 == hashCounts[item.hash]]

    return
#return a list of hashes of all startup items (files)
def allHashes(results):
    """Collect the hashes of every file found across all results.

    Args:
        results: list of result dictionaries, each with an 'items' list.

    Returns:
        A list with the `hash` of each item that is a file.File, in the order
        encountered (duplicates are kept).
    """

    #walk every result's items
    # ->only files have a hash worth saving
    return [
        item.hash
        for pluginResult in results
        for item in pluginResult['items']
        if isinstance(item, file.File)
    ]
#initialize knockknock
#TODO: test with python 2.6
def initKK():
    """Initialize KnockKnock.

    Checks the python version, imports argparse and yapsy's PluginManager
    (both published as module globals), parses the command-line args, then
    initializes logging, the Objc bindings, the whitelists, and the plugin
    manager.

    Returns:
        True if everything initialized, False otherwise. An unsupported OS X
        version or not running as root only logs a message; neither is fatal.
    """

    #global args
    global args

    #global import
    global PluginManager

    #global import
    global argparse

    #get python version
    pythonVersion = sys.version_info

    #check that python is at least 2.7
    if pythonVersion[0] == 2 and pythonVersion[1] < 7:

        #err msg
        # ->as logging isn't init'd yet, just print directly
        # ->the version is a tuple, so it has to be turned into a string first;
        #   handing the tuple itself to '%' makes the formatting throw a TypeError
        print('ERROR: KnockKnock requires python 2.7+ (found: %s)' % '.'.join(str(part) for part in pythonVersion[:3]))

        #bail
        return False

    #TODO: check for python 3.0?

    #try import argparse
    # ->should work now since just checked that python is 2.7+
    try:

        #import
        import argparse

    #handle exception
    # ->bail w/ error msg
    except ImportError:

        #err msg
        # ->as logging isn't init'd yet, just print directly
        print('ERROR: could not load required module (argparse)')

        #bail
        return False

    #add knock knock's lib path to system path
    # ->ensures 3rd-party libs will be imported OK
    sys.path.insert(0, os.path.join(utils.getKKDirectory(), 'libs'))

    #now can import 3rd party lib
    # ->yapsy
    from yapsy.PluginManager import PluginManager

    #parse options/args
    # ->will bail (with msg) if usage is incorrect
    args = parseArgs()

    #init output/logging
    if not utils.initLogging(args.verbosity):

        #bail
        return False

    #dbg msg
    utils.logMessage(utils.MODE_INFO, 'initialized logging')

    #check version (Mavericks/Yosemite for now)
    # ->this isn't a fatal error for now, so just log a warning for unsupported versions
    if not utils.isSupportedOS():

        #warning msg
        utils.logMessage(utils.MODE_WARN, '%s is not an officially supported OS X version (your mileage may vary)' % ('.'.join(utils.getOSVersion())))

    #supported
    else:

        #dbg msg
        utils.logMessage(utils.MODE_INFO, '%s is a supported OS X version' % ('.'.join(utils.getOSVersion())))

    #load python <-> Objc bindings
    # ->might fail if non-Apple version of python is being used
    if not utils.loadObjcBindings():

        #err msg
        utils.logMessage(utils.MODE_ERROR, 'python <-> Objc bindings/module not installed\n run via /usr/bin/python or install modules via \'pip install pyobjc\' to fix')

        #bail
        return False

    #load whitelists
    whitelist.loadWhitelists()

    #init plugin manager
    if not initPluginManager():

        #bail
        return False

    #dbg msg
    utils.logMessage(utils.MODE_INFO, 'initialized plugin manager')

    #giving warning about r00t
    if 0 != os.geteuid():

        #dbg msg
        utils.logMessage(utils.MODE_INFO, 'not running as r00t...some results may be missed (e.g. CronJobs)')

    return True
#parse args
def parseArgs():
    """Build the command-line parser and parse the program's arguments.

    Relies on the `argparse` module global (imported by initKK()).

    Returns:
        The namespace produced by argparse, with attributes: plugin,
        verbosity, apple, whitelist, list, json, and disableVT.
    """

    #init parser
    cli = argparse.ArgumentParser()

    #plugin name
    # ->optional, and the only option that takes a value
    cli.add_argument('-p', '--plugin', help='name of plugin')

    #everything else is an optional on/off switch
    # ->(short flag, long flag, help text)
    # ->a '-s/--signed' switch (exclude all signed binaries) is not currently enabled
    switches = (
        ('-v', '--verbosity', 'enable verbose output'),
        ('-a', '--apple', 'include Apple-signed binaries'),
        ('-w', '--whitelist', 'include white-listed binaries'),
        ('-l', '--list', 'list all plugins'),
        ('-j', '--json', 'produce output in JSON format'),
        ('-d', '--disableVT', 'disable VirusTotal integration'),
    )

    #register each switch
    for shortFlag, longFlag, helpText in switches:

        #add it
        cli.add_argument(shortFlag, longFlag, help=helpText, action='store_true')

    #parse args
    return cli.parse_args()
#init plugin manager
def initPluginManager():
    """Create the global plugin manager and have it collect all plugins.

    Stores the manager in the `pluginManagerObj` module global, and points it
    at the PLUGIN_DIR directory inside the KnockKnock directory.

    Returns:
        True on success, False if the plugin manager could not be created.
    """

    #global
    global pluginManagerObj

    #create plugin manager
    pluginManagerObj = PluginManager()
    if not pluginManagerObj:

        #err msg
        utils.logMessage(utils.MODE_ERROR, 'failed to create plugin manager')

        #bail
        return False

    #set plugin path
    # ->joined (not concatenated) so it works whether or not the KnockKnock
    #   directory ends with a path separator
    pluginManagerObj.setPluginPlaces([os.path.join(utils.getKKDirectory(), PLUGIN_DIR)])

    #get all plugins
    pluginManagerObj.collectPlugins()

    return True
#list plugins
def listPlugins():
    """Print every plugin known to the plugin manager, sorted by plugin name.

    Each line has the form '<last component of the plugin's path> -> <plugin name>'.
    """

    #dbg msg
    utils.logMessage(utils.MODE_INFO, 'listing plugins')

    #order plugins by their name
    orderedPlugins = sorted(pluginManagerObj.getAllPlugins(), key=lambda info: info.name)

    #display each one
    for info in orderedPlugins:

        #last component of the plugin's path
        fileName = os.path.split(info.path)[1]

        #always use print, since -v might not have been used
        print('%s -> %s' % (fileName, info.name))

    return
#scanz!
def scan(pluginName):
    """Run all plugins, or only the one named `pluginName`, and gather results.

    Args:
        pluginName: name of a single plugin to run (compared, ignoring case,
            against the last component of each plugin's path); falsy to run
            every plugin.

    Returns:
        A list of result dictionaries (a plugin may contribute one dictionary
        or a list of them; empty results are skipped), or None if a plugin
        name was given but no such plugin was found.
    """

    #everything the plugins found
    collected = []

    #set once the requested plugin was found
    # ->only relevant when a plugin name is specified
    matched = False

    #dbg msg
    if pluginName:
        utils.logMessage(utils.MODE_INFO, 'beginning scan using %s plugin' % pluginName)
    else:
        utils.logMessage(utils.MODE_INFO, 'beginning full scan')

    #iterate over all plugins
    for candidate in pluginManagerObj.getAllPlugins():

        #specific plugin requested
        if pluginName:

            #use the last component of the plugin's path as its name
            candidateName = os.path.split(candidate.path)[1]

            #skip anything that isn't the requested one
            if pluginName.lower() != candidateName.lower():
                continue

            #found it
            matched = True

            #dbg msg
            utils.logMessage(utils.MODE_INFO, 'executing requested plugin: %s' % pluginName)

        #no plugin name means run 'em all
        else:

            #dbg msg
            utils.logMessage(utils.MODE_INFO, 'executing plugin: %s' % candidate.name)

        #execute plugin
        found = candidate.plugin_object.scan()

        #save plugin output
        # ->ignoring empty results
        if found:

            #plugins normally return a single dictionary of results
            if isinstance(found, dict):
                collected.append(found)

            #some plugins though can return a list of dictionaries
            # ->e.g. the launch daemon/agent plugin (one dictionary for each type)
            elif isinstance(found, list):
                collected.extend(found)

        #the requested plugin ran
        # ->no need to look at the rest
        if matched:
            break

    #sanity check
    # ->make sure if a specific plugin was specified, it was found/exec'd
    if pluginName and not matched:

        #err msg
        utils.logMessage(utils.MODE_ERROR, 'did not find requested plugin')

        #no results
        return None

    return collected
#invoke main interface
if __name__ == '__main__':

    #main interface
    # ->knocknock() returns True/False; turn that into the process exit status
    #   so callers (shell scripts, etc) can tell a failed run from a good one
    sys.exit(0 if knocknock() else 1)