This repository has been archived by the owner on Mar 25, 2021. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 13
/
pc-compliance-copy.py
305 lines (254 loc) · 14.3 KB
/
pc-compliance-copy.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
from __future__ import print_function
try:
input = raw_input
except NameError:
pass
import pc_lib_api
import pc_lib_general
import requests
import time
# --Helper Functions (Local)-- #
def search_list_value(list_to_search, field_to_search, field_to_return, search_value):
item_to_return = None
for source_item in list_to_search:
if field_to_search in source_item:
if source_item[field_to_search] == search_value:
item_to_return = source_item[field_to_return]
break
return item_to_return
def search_list_value_lower(list_to_search, field_to_search, field_to_return, search_value):
item_to_return = None
search_value = search_value.lower()
for source_item in list_to_search:
if field_to_search in source_item:
if source_item[field_to_search].lower() == search_value:
item_to_return = source_item[field_to_return]
break
return item_to_return
def search_list_object(list_to_search, field_to_search, search_value):
object_to_return = None
for source_item in list_to_search:
if field_to_search in source_item:
if source_item[field_to_search] == search_value:
object_to_return = source_item
break
return object_to_return
def search_list_object_lower(list_to_search, field_to_search, search_value):
object_to_return = None
search_value = search_value.lower()
for source_item in list_to_search:
if field_to_search in source_item:
if source_item[field_to_search].lower() == search_value:
object_to_return = source_item
break
return object_to_return
def search_list_list(list_to_search, field_to_search, search_value):
object_list_to_return = []
for source_item in list_to_search:
if field_to_search in source_item:
if source_item[field_to_search] == search_value:
object_list_to_return.append(source_item)
break
return object_list_to_return
def search_list_list_lower(list_to_search, field_to_search, search_value):
object_list_to_return = []
search_value = search_value.lower()
for source_item in list_to_search:
if field_to_search in source_item:
if source_item[field_to_search].lower() == search_value:
object_list_to_return.append(source_item)
break
return object_list_to_return
# --Execution Block-- #
# --Parse command line arguments-- #
parser = pc_lib_general.pc_arg_parser_defaults()
parser.add_argument(
'-policy',
'--policy',
action='store_true',
help='(Optional) - If you want to try update the policies with your new compliance standard, add this switch to the command. Any policies not able to be updated will be listed out during the process.')
parser.add_argument(
'-label',
'--label',
action='store_true',
help='(Optional) - Add a label to any policy updated with the new compliance standard. This only works if you have also specified the -policy switch.')
parser.add_argument(
'source_compliance_standard_name',
type=str,
help='Name of the compliance standard to copy from. Please enter it exactly as listed in the Prisma Cloud UI')
parser.add_argument(
'destination_compliance_standard_name',
type=str,
help='Name of the new compliance standard to create.')
args = parser.parse_args()
# --End parse command line arguments-- #
# --Main-- #
# Get login details worked out
pc_settings = pc_lib_general.pc_login_get(args.username, args.password, args.uiurl, args.config_file)
# Verification (override with -y)
if not args.yes:
print()
print('Ready to execute commands against your Prisma Cloud tenant.')
verification_response = str(input('Would you like to continue (y or yes to continue)?'))
continue_response = {'yes', 'y'}
print()
if verification_response not in continue_response:
pc_lib_general.pc_exit_error(400, 'Verification failed due to user response. Exiting...')
# Sort out API Login
print('API - Getting authentication token...', end='')
pc_settings = pc_lib_api.pc_jwt_get(pc_settings)
print('Done.')
## Compliance Copy ##
wait_timer = 5
# Check the compliance standard and get the JSON information
print('API - Getting the Compliance Standards list...', end='')
pc_settings, response_package = pc_lib_api.api_compliance_standard_list_get(pc_settings)
compliance_standard_list_temp = response_package['data']
compliance_standard_original = search_list_object_lower(compliance_standard_list_temp, 'name', args.source_compliance_standard_name)
if compliance_standard_original is None:
pc_lib_general.pc_exit_error(400, 'Compliance Standard not found. Please check the Compliance Standard name and try again.')
compliance_standard_new_temp = search_list_object_lower(compliance_standard_list_temp, 'name', args.destination_compliance_standard_name)
if compliance_standard_new_temp is not None:
pc_lib_general.pc_exit_error(400, 'New Compliance Standard appears to already exist. Please check the new Compliance Standard name and try again.')
print('Done.')
# Create the new Standard
print('API - Creating the new Compliance Standard...')
compliance_standard_new_temp = {}
compliance_standard_new_temp['name'] = args.destination_compliance_standard_name
if 'description' in compliance_standard_original:
compliance_standard_new_temp['description'] = compliance_standard_original['description']
print('Adding ' + compliance_standard_new_temp['name'])
pc_settings, response_package = pc_lib_api.api_compliance_standard_add(pc_settings, compliance_standard_new_temp)
compliance_standard_new_response = response_package['data']
# Find the new Standard object with wait state
time.sleep(wait_timer)
pc_settings, response_package = pc_lib_api.api_compliance_standard_list_get(pc_settings)
compliance_standard_list_temp = response_package['data']
compliance_standard_new = search_list_object(compliance_standard_list_temp, 'name', compliance_standard_new_temp['name'])
if compliance_standard_new is None:
pc_lib_general.pc_exit_error(500, 'New Compliance Standard was not found! Try it again or increase the wait timer.')
print()
# Get the list of requirements that need to be created
print('API - Getting Compliance Standard Requirements...', end='')
pc_settings, response_package = pc_lib_api.api_compliance_standard_requirement_list_get(pc_settings, compliance_standard_original['id'])
compliance_requirement_list_original = response_package['data']
print('Done.')
# Create the new requirements
print('API - Creating the Requirements and adding them to the new Standard...')
for compliance_requirement_original_temp in compliance_requirement_list_original:
compliance_requirement_new_temp = {}
compliance_requirement_new_temp['name'] = compliance_requirement_original_temp['name']
compliance_requirement_new_temp['requirementId'] = compliance_requirement_original_temp['requirementId']
if 'description' in compliance_requirement_original_temp:
compliance_requirement_new_temp['description'] = compliance_requirement_original_temp['description']
print('Adding ' + compliance_requirement_new_temp['name'])
pc_settings, response_package = pc_lib_api.api_compliance_standard_requirement_add(pc_settings, compliance_standard_new['id'], compliance_requirement_new_temp)
print()
# Get new list of requirements with wait timer
print('API - Getting the new list of requirements...', end='')
time.sleep(wait_timer)
pc_settings, response_package = pc_lib_api.api_compliance_standard_requirement_list_get(pc_settings, compliance_standard_new['id'])
compliance_requirement_list_new = response_package['data']
print('Done.')
# Get list of sections and create for each requirement section
print('API - Get list of sections, create them, and associate them to the new requirements (might take a while)...')
# Create mapping list source for policy updates later
map_section_list = []
for compliance_requirement_original_temp in compliance_requirement_list_original:
# Get sections for requirement
pc_settings, response_package = pc_lib_api.api_compliance_standard_requirement_section_list_get(pc_settings, compliance_requirement_original_temp['id'])
compliance_section_list_original_temp = response_package['data']
# Find new ID for requirement
compliance_requirement_new_temp = search_list_object(compliance_requirement_list_new, 'name', compliance_requirement_original_temp['name'])
# Create new sections under new ID
for compliance_section_original_temp in compliance_section_list_original_temp:
compliance_section_new_temp = {}
compliance_section_new_temp['sectionId'] = compliance_section_original_temp['sectionId']
if 'description' in compliance_section_original_temp:
compliance_section_new_temp['description'] = compliance_section_original_temp['description']
print('Adding ' + compliance_section_new_temp['sectionId'])
pc_settings, response_package = pc_lib_api.api_compliance_standard_requirement_section_add(pc_settings, compliance_requirement_new_temp['id'], compliance_section_new_temp)
# Add entry for mapping table for Policy updates later
compliance_section_new_temp['requirementGUIDOriginal'] = compliance_requirement_original_temp['id']
compliance_section_new_temp['requirementGUIDNew'] = compliance_requirement_new_temp['id']
compliance_section_new_temp['sectionGUIDOriginal'] = compliance_section_original_temp['id']
compliance_section_new_temp['sectionGUIDNew'] = None
map_section_list.append(compliance_section_new_temp)
print()
if args.policy:
print('Compliance framework copy complete. Policy switch detected. Starting policy mapping for new compliance framework.')
else:
print('Compliance framework copy complete.')
print()
## Policy Updates ##
# Check to see if the user wants to try to update the policies
if not args.policy:
print('Policy switch not specified. Skipping policy update/attach. Done.')
else:
# Need to add the new GUID from the new sections to the mapping tables
print('API - Getting the new section IDs for the policy mapping and creating a map table...', end='')
# Timer to make sure everything is posted
time.sleep(wait_timer)
for compliance_requirement_new_temp in compliance_requirement_list_new:
# Get new sections for requirement
pc_settings, response_package = pc_lib_api.api_compliance_standard_requirement_section_list_get(pc_settings, compliance_requirement_new_temp['id'])
compliance_section_list_new_temp = response_package['data']
# Get new GUID and update mapping table
for compliance_section_new_temp in compliance_section_list_new_temp:
success_test = False
for map_section_temp in map_section_list:
if map_section_temp['requirementGUIDNew'] == compliance_requirement_new_temp['id'] and map_section_temp['sectionId'] == compliance_section_new_temp['sectionId']:
map_section_temp['sectionGUIDNew'] = compliance_section_new_temp['id']
success_test = True
break
if not success_test:
pc_lib_general.pc_exit_error(500, 'New Section cannot find related map for Policy updates! Sync error?.')
print('Done.')
# Get the policy list that will need to be updated (filtered to the original standard)
print('API - Getting the compliance standard policy list to update...', end='')
pc_settings, response_package = pc_lib_api.api_compliance_standard_policy_list_get(pc_settings, compliance_standard_original['name'])
policy_list_original = response_package['data']
print('Done.')
# Work though the list of policies to build the update package
print('API - Individual policy retrieval and update (might take a while)...')
policy_update_error = False
policy_update_error_list = []
for policy_original_temp in policy_list_original:
# Get the individual policy JSON object
pc_settings, response_package = pc_lib_api.api_policy_get(pc_settings, policy_original_temp['policyId'])
policy_specific_temp = response_package['data']
# Add new compliance section(s)
complianceMetadata_section_list_new_temp_2 = []
for complianceMetadata_section_temp in policy_specific_temp['complianceMetadata']:
complianceMetadata_section_new_temp = {}
for map_section_temp in map_section_list:
if map_section_temp['sectionGUIDOriginal'] == complianceMetadata_section_temp['complianceId']:
complianceMetadata_section_new_temp['customAssigned'] = True
complianceMetadata_section_new_temp['systemDefault'] = False
complianceMetadata_section_new_temp['complianceId'] = map_section_temp['sectionGUIDNew']
complianceMetadata_section_list_new_temp_2.append(complianceMetadata_section_new_temp)
break
if len(complianceMetadata_section_list_new_temp_2) == 0:
pc_lib_general.pc_exit_error(500, 'Cannot find any compliance section matches in a policy - this should not be possible?')
# Merge the existing and new lists
policy_specific_temp['complianceMetadata'].extend(complianceMetadata_section_list_new_temp_2)
# Add a label (optional) for the new compliance report name
if args.label:
policy_specific_temp['labels'].append(args.destination_compliance_standard_name)
# Post the updated policy to the API
try:
print('Updating ' + policy_specific_temp['name'])
pc_settings, response_package = pc_lib_api.api_policy_update(pc_settings, policy_specific_temp['policyId'], policy_specific_temp)
except requests.exceptions.HTTPError as e:
policy_update_error = True
print('Error updating ' + policy_specific_temp['name'])
policy_update_error_list.append(policy_specific_temp['name'])
if policy_update_error:
print()
print('An error was encountered when trying to update one or more policies. Below is a list of the policy name(s) that could not be updated. '
'Please manually attach these policies to your new compliance standard, if desired.')
print()
for policy_update_error_item in policy_update_error_list:
print(policy_update_error_item)
print()
print('**Compliance copy and policy update complete**')