-
Notifications
You must be signed in to change notification settings - Fork 14
/
s3scan.py
executable file
·253 lines (191 loc) · 6.63 KB
/
s3scan.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
import re
import sys, os
import httplib2
import boto3, botocore
from BeautifulSoup import BeautifulSoup, SoupStrainer
from optparse import OptionParser
from botocore.client import Config
# Saving myself from passing around these variables
# Sorry!
globalBaseUrl = ""
globalLinkList = []
s3 = None
# https://stackoverflow.com/questions/287871/print-in-terminal-with-colors-using-python
class bcolors:
HEADER = '\033[95m'
OKBLUE = '\033[94m'
OKGREEN = '\033[92m'
WARNING = '\033[93m'
FAIL = '\033[91m'
ENDC = '\033[0m'
BOLD = '\033[1m'
UNDERLINE = '\033[4m'
# helper functions for coloring
def printGreen(text):
return bcolors.OKGREEN + text + bcolors.ENDC
def printBlue(text):
return bcolors.OKBLUE + text + bcolors.ENDC
def printWarning(text):
return bcolors.WARNING + text + bcolors.ENDC
def printFail(text):
return bcolors.FAIL + text + bcolors.ENDC
def printScreen(text, color):
return {
'green': printGreen(text),
'blue': printBlue(text),
'warn': printWarning(text),
'fail': printFail(text)
}.get(color, 'blue')
def retrieve_links(url):
"""Given a url, fetch all hyperlinks on that page and return them as a list"""
page_source = get_source(url)
linkList = [] # pun!
for link in BeautifulSoup(page_source, parseOnlyThese=SoupStrainer('a')):
if link.has_key('href'):
linkList.append(link['href'])
return linkList
def scanner(url):
"""Look for S3 bucket urls
Check their permissions and print their security setting
"""
if globalBaseUrl in url:
if url not in globalLinkList:
# add the current url in the global links pool
globalLinkList.append(url)
global s3
sys.stdout.write(printScreen("[>]Current webpage: " + url + "\n", "blue"))
page_source = get_source(url)
reg = re.compile('((?:https*)(?::\\/{2}[\\.\\w-]+\\.amazonaws.com)(?:[\\/|\\.]?)(?:[^\\s"]*))')
# return if empty page
if page_source == None:
return
for bucket in re.findall(reg, page_source):
sys.stdout.write(printScreen("[*]Found " + bucket + "\n", "blue"))
# we don't need the complete URL. Just the root of the bucket
bucketUrl = bucket.split('com/')[0] + 'com/' # TODO this should be a regex
# grab the username
# https://abhn.s3.amazonaws.com/randomstring ==> abhn
if "https" in bucketUrl:
bucketName = bucketUrl.split('.s3')[0].split('https://')[1]
else:
bucketName = bucketUrl.split('.s3')[0].split('http://')[1]
bucket = s3.Bucket(bucketName)
sys.stdout.write(printScreen("[>]Testing " + bucketName + "\t", "blue"))
# flags
readFlag = 0
writeFlag = 0
fullControlFlag = 0
secureFlag = 0
# READ :- Any authenticated AWS user can read
# WRITE :- Any authenticated AWS user ca
# FULL CONTROL :- Any authenticated AWS user can read/write/delete
# ClientError :- AccessDenied (Bucket is secure)
try:
acl = bucket.Acl()
for grant in acl.grants:
if grant['Grantee']['Type'] == "Group" and grant['Permission'] == "READ":
readFlag = 1
elif grant['Grantee']['Type'] == "Group" and grant['Permission'] == "WRITE":
writeFlag = 1
elif grant['Grantee']['Type'] == "Group" and grant['Permission'] == "FULL_CONTROL":
fullControlFlag = 1
else:
pass
# evaluate
if readFlag and not writeFlag and not fullControlFlag:
sys.stdout.write(printScreen("[Insecure - Read]", "fail"))
elif readFlag and writeFlag and not fullControlFlag:
sys.stdout.write(printScreen("[Insecure - Read+Write]", "fail"))
elif fullControlFlag:
sys.stdout.write(printScreen("[Insecure - Full Control]", "fail"))
else:
sys.stdout.write(printScreen("[Not Public]", "green"))
sys.stdout.write('\n')
except botocore.exceptions.ClientError as e:
if e.response["Error"]["Code"] == "NoSuchBucket":
sys.stdout.write(printScreen("[No Such Bucket. Takeover?]\n", "fail"))
else:
sys.stdout.write(printScreen("[" + e.response["Error"]["Code"] + "]\n", "green"))
def get_source(url):
"""Return the source of the supplied url argument"""
http = httplib2.Http()
try:
status, response = http.request(url,
headers={'User-Agent':' Mozilla/5.0 (Windows NT 6.1; WOW64; rv:12.0) Gecko/20100101 Firefox/12.0'})
if status.status == 200:
return response
else:
return None
except httplib2.HttpLib2Error as e:
return None
def driver(url):
"""Scan the current url, retrieve all hyperlinks and then scan those pages recursively"""
# maintain a list of links from the current page
currList = []
page_source = get_source(url)
if page_source != None:
links = retrieve_links(url)
for link in links:
if len(link) > 0:
# we hit a relative link
if globalBaseUrl not in link and link[0] == '/':
link = globalBaseUrl + link
scanner(link)
currList.append(link)
else:
scanner(link)
currList.append(link)
else:
continue
for link in currList:
driver(link)
else:
sys.stdout.write(printScreen("[x]Empty response. Skipping\n", "warn"))
def initiator(globalBaseUrl):
"""take a url and set up s3 auth. Then call the driver"""
global s3
# alternate way to authenticate in else.
# use what you prefer
if True:
access_key = os.environ.get('AWS_ACCESS_KEY_ID')
secret_key = os.environ.get('AWS_SECRET_ACCESS_KEY')
if access_key is None or secret_key is None:
print printWarning("""No access credentials available.
Please export your AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY.
Details: http://docs.aws.amazon.com/general/latest/gr/managing-aws-access-keys.html
""")
sys.exit(0)
s3 = boto3.resource('s3', config=Config(signature_version='s3v4'))
else:
# If you prefer to supply the credentials here,
# make sure you flip the if condition to False
# and subsitiute the necessary data :)
s3 = boto3.resource('s3',
aws_access_key_id=ACCESS_ID,
aws_secret_access_key=ACCESS_KEY,
config=Config(signature_version='s3v4')
)
print printScreen("[>]Initiating...", "blue")
print printScreen("[>]Press Ctrl+C to terminate script", "blue")
scanner(globalBaseUrl)
driver(globalBaseUrl)
def main():
parser = OptionParser(usage="$ python ./%prog [-u] url", version="%prog 1.0")
parser.add_option("-u", "--url", dest="url",
help="url to scan")
parser.add_option("-d", action="store_true", dest="debug",
help="turn on debug messages")
(options, args) = parser.parse_args()
if options.url == None:
parser.print_help()
exit(0)
# debug switch
if not options.debug:
# show no traceback. Only exception
sys.tracebacklimit = 0
global globalBaseUrl
globalBaseUrl = options.url
# initiate
initiator(globalBaseUrl)
if __name__ == '__main__':
main()