#!/usr/bin/env python3
#
# Returns information about the user's GCP entitlements
#
# Usage: cred_helper.py [get|test]
#
# get prints the GCP auth token
# test prints the user's GCP entitlements
#
# Calling the script without arguments prints usage information and then exits with a non-zero code (per spec)
#
import json
import re
import subprocess
import sys
import urllib.parse

import requests
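def get_gcloud_auth_token(test):
    """Return a gcloud access token for the account named in ``.user-bazelrc``.

    The account is taken from a ``# user: <username>`` line of the
    ``.user-bazelrc`` file in the current working directory and passed as the
    ACCOUNT argument of ``gcloud auth print-access-token``.

    Args:
        test: When true, echo the gcloud command line to stdout before running
            it. The echoed line contains the account name only, never the token.

    Returns:
        The access token printed by gcloud, surrounding whitespace removed.
        It is a live bearer credential; callers must not log it.

    Raises:
        SystemExit: if ``.user-bazelrc`` is missing or holds no usable
            ``# user: <username>`` line.
        subprocess.CalledProcessError: if gcloud exits non-zero (``check=True``).
        FileNotFoundError: if the ``gcloud`` binary is not on PATH.
    """
    try:
        with open(".user-bazelrc", encoding="utf-8") as f:
            rc_text = f.read()
    except FileNotFoundError:
        sys.exit("Did not find .user-bazelrc in the current directory")
    match = re.search(r"# user: (.*)", rc_text)
    # strip() drops trailing whitespace / CR so it is not handed to gcloud as
    # part of the account name; an empty name is treated the same as no match.
    username = match.group(1).strip() if match else ""
    if not username:
        sys.exit('Did not find username in .user-bazelrc file as "# user: <username>"')
    # Argument list, not a shell string: the file-supplied username is never
    # shell-interpreted.
    cmd = ["gcloud", "auth", "print-access-token", username]
    if test:
        print("Running: " + subprocess.list2cmdline(cmd))
    result = subprocess.run(cmd, capture_output=True, text=True, check=True)
    return result.stdout.strip()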
def generate_credentials(test):
    """Build the credential object in the shape Bazel's credential helper expects.

    Bazel wants ``{"headers": {"<name>": [<value>, ...]}}`` — each header maps
    to a *list* of values — so the bearer token is wrapped in a one-element list.

    Args:
        test: Forwarded to ``get_gcloud_auth_token``; echoes the gcloud command
            when true.

    Returns:
        ``{"headers": {"Authorization": ["Bearer <token>"]}}`` where ``<token>``
        is the live access token (a secret).
    """
    token = get_gcloud_auth_token(test)
    authorization_values = [f"Bearer {token}"]
    return {"headers": {"Authorization": authorization_values}}
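def test_permissions(credentials, bucket_name, timeout=30):
    """Ask GCS which of a fixed set of permissions the caller holds on a bucket.

    Calls the bucket ``iam/testPermissions`` endpoint with the bearer token
    taken from *credentials*. That endpoint needs a plain string Authorization
    header, whereas Bazel's format carries a list, so the first list element is
    used. The *credentials* dict passed in is left unmodified.

    Args:
        credentials: Bazel-style ``{"headers": {"Authorization": [...]}}`` dict;
            a plain string Authorization value is accepted as well.
        bucket_name: GCS bucket to test. It is URL-quoted before being placed
            in the request path so it cannot alter the path structure.
        timeout: Seconds to wait for the connection and response; without a
            timeout a stalled network call would block the helper indefinitely.

    Returns:
        The decoded JSON response body (the API reports the granted subset of
        the requested permissions).

    Raises:
        requests.HTTPError: for a non-2xx response (``raise_for_status``).
        requests.RequestException: for connection errors or a timeout.
    """
    authorization = credentials["headers"]["Authorization"]
    if isinstance(authorization, (list, tuple)):
        authorization = authorization[0]
    # Build a fresh header dict so the caller's Bazel-format dict is not
    # rewritten as a side effect.
    headers = dict(credentials["headers"], Authorization=authorization)
    safe_bucket = urllib.parse.quote(bucket_name, safe="")
    url = f"https://storage.googleapis.com/storage/v1/b/{safe_bucket}/iam/testPermissions"
    permissions = {"permissions": ["storage.buckets.get", "storage.objects.create"]}
    response = requests.get(url, params=permissions, headers=headers, timeout=timeout)
    response.raise_for_status()
    return response.json()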
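def main():
    """Command-line entry point: ``cred_helper.py get`` or ``cred_helper.py test``.

    ``get`` prints the Bazel credential object as JSON on stdout; it embeds a
    live bearer token, so that output is a secret and should not be captured
    in logs. ``test`` echoes the gcloud command and prints the permissions the
    account holds on the artifacts bucket.

    Raises:
        SystemExit: with a usage message (non-zero status) when the mode
            argument is missing, unrecognised, or more than one is given.
    """
    # Exactly one argument, and it must name one of the two modes.
    if len(sys.argv) != 2 or sys.argv[1] not in ("get", "test"):
        sys.exit("Usage: python cred_helper.py [get|test]")
    test = sys.argv[1] == "test"
    credentials = generate_credentials(test)
    if not test:
        print(json.dumps(credentials, indent=2))
        return
    # Bucket whose IAM permissions are checked in `test` mode.
    bucket_name = "megaboom-bazel-artifacts"
    permissions = test_permissions(credentials, bucket_name)
    print(json.dumps(permissions, indent=2))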
# Run the helper only when executed as a script, not when imported.
if __name__ == "__main__":
    main()