-
Notifications
You must be signed in to change notification settings - Fork 0
/
test_ogc_processes.py
441 lines (428 loc) · 14.2 KB
/
test_ogc_processes.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
# Copyright 2024, by the California Institute of Technology. ALL RIGHTS RESERVED.
# United States Government Sponsorship acknowledged. Any commercial use must be
# negotiated with the Office of Technology Transfer at the California Institute of
# Technology. This software is subject to U.S. export control laws and regulations
# and has been classified as EAR99. By accepting this software, the user agrees to
# comply with all applicable U.S. export laws and regulations. User has the
# responsibility to obtain export licenses, or other export authority as may be
# required before exporting such information to foreign countries or providing
# access to foreign persons.
# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
import json
import os
import time
from unittest import TestCase
import requests
from dotenv import load_dotenv
from jsonschema.validators import validate
from ideas_api.lib.utils.TimeUtlis import TimeUtils
# Version of the example process that the tests in this module register and query.
# Plain string literal: the previous f-string had no placeholders.
version = '0.0.1123'
class TestOgcProcesses(TestCase):
    """Integration tests that call the process and job HTTP endpoints of a live service.

    Configuration is read from the environment (optionally populated from a
    ``.env`` file by ``load_dotenv``):

    * ``base_url``   -- root URL of the service; one trailing ``/`` is removed.
    * ``auth_token`` -- sent as ``Authorization: Bearer <token>``.

    Every request goes through one ``requests`` session with ``trust_env``
    disabled; the session is closed automatically when each test finishes.

    NOTE(review): ``test_get_process_01`` and ``test_start_job_01`` query the
    process/version that ``test_create_new_process_01`` uploads, so they
    presumably need that test (or an earlier run of it) to have succeeded.
    """

    # Identifier of the example process used by the create / get / execute tests.
    PROCESS_ID = 'UNIT-TEST:EXAMPLE-API'
    # Upper bound (seconds) for each HTTP call so a stalled server cannot hang the suite.
    REQUEST_TIMEOUT_SECONDS = 60
    # Job whose results test_result_job_01 fetches.
    # NOTE(review): hard-coded ID -- presumably this job must already exist on the target server.
    RESULT_JOB_ID = 'a6779174-cd35-4598-8792-a12ca074f8fe-2023-11-07T22:43:21.399767'

    def setUp(self) -> None:
        """Load configuration and open the HTTP session shared by the test."""
        super().setUp()
        load_dotenv()
        self.token = os.environ.get('auth_token')
        base_url = os.environ.get('base_url')
        if not base_url:
            # Fail with a clear message instead of an AttributeError on None.
            self.fail("environment variable 'base_url' must be set (directly or through a .env file)")
        self.base_url = base_url[:-1] if base_url.endswith('/') else base_url
        self.headers = {
            'Authorization': f'Bearer {self.token}',
            'Content-Type': 'application/json'
        }
        self.session = requests.Session()
        # Ignore proxy / netrc settings taken from the environment.
        self.session.trust_env = False
        self.addCleanup(self.session.close)

    def _assert_matches_schema(self, instance, schema, failure_prefix):
        """Fail the test with a readable message if ``instance`` violates ``schema``.

        :param instance: decoded JSON document to check.
        :param schema: JSON Schema (as a dict) the document must satisfy.
        :param failure_prefix: text placed at the start of the failure message.
        """
        # Imported locally so the module-level import block stays untouched.
        from jsonschema.exceptions import ValidationError
        try:
            validate(instance=instance, schema=schema)
        except ValidationError as error:
            self.fail(f'{failure_prefix}: {instance} vs. {error}')

    @classmethod
    def _example_process_description(cls, process_version):
        """Build the process description document uploaded by the create test.

        :param process_version: value stored under the ``version`` key.
        :return: a new dict on every call.
        """
        return {
            "id": cls.PROCESS_ID,
            "title": "Process Title",
            "description": "Process Description for LIS and RRR + Rapid. Ref: https://github.com/opengeospatial/ogcapi-processes/blob/master/core/examples/json/ProcessDescription.json",
            "version": process_version,
            "jobControlOptions": ["sync-execute"],
            "outputTransmission": ["value"],
            "additionalParameters": {
                "parameters": [
                    {"name": "stagesCount", "value": ["2"]},
                    {"name": "stage001Names", "value": ["LIS"]},
                    {"name": "stage002Names", "value": ["RRR", "RAPID"]},
                ]
            },
            "inputs": {
                "executingStageFlags": {
                    "title": "mandatory input to describe which stages need to be executed",
                    "description": "Boolean array to indicate which stages to be executed. each array index corresponds to each stage which where the details can be found in #/additionalParameters/parameters. If missing or mismatched, a default value of 'TRUE' is used.",
                    "schema": {"$ref": "#/definitions/executingStageFlagsSchema"},
                },
                "stagedResults": {
                    "title": "Results from other or input processes",
                    "description": "This can be power Process result or LIS process result if starting from RRR",
                    "schema": {"$ref": "#/definitions/stagedResultsSchema"},
                },
                "scenario": {
                    "title": "TODO:",
                    "description": "TODO:",
                    "schema": {"type": "string", "enum": ["1x", "2x", "3x"]},
                },
                "hydroModel": {
                    "title": "TODO:",
                    "description": "TODO:",
                    "schema": {"type": "string", "enum": ["VIC", "NOAH"]},
                },
                "precipitationType": {
                    "title": "TODO:",
                    "description": "TODO:",
                    "schema": {"type": "string", "enum": ["CHIRPS", "MERRA2", "IMERG"]},
                },
                "basinId": {
                    "title": "MERIT-Basin ID",
                    "description": "The numerical ID used to denote a MERIT-Basin. Currently only supporting basins 23 (western Europe) and 74 (Mississippi)",
                    "schema": {"type": "integer", "enum": [23, 74]},
                },
                "landSurfaceModel": {
                    "title": "Land Surface Model",
                    "description": "Land Surface Model used as input data for RAPID.",
                    "schema": {
                        "type": "string",
                        "enum": [
                            "VIC",
                            "NOAH",
                            "NOAH_MP_1x",
                            "NOAH_MP_2x",
                            "NOAH_MP_3x",
                            "CLSM",
                            "LIS",
                        ],
                    },
                },
                "spatialResolution": {
                    "title": "TODO:",
                    "description": "TODO:. Example: 1, 2, 3, ...",
                    "schema": {"type": "string"},
                },
                "startTime": {
                    "title": "start time for the process",
                    "description": "start time for the process in date-time string",
                    "schema": {"type": "string", "format": "date-time"},
                },
                "endTime": {
                    "title": "end time for the process",
                    "description": "end time for the process in date-time string",
                    "schema": {"type": "string", "format": "date-time"},
                },
                "spatialCoverage": {
                    "title": "spatial coverage to be used for all the processes",
                    "description": "Bounding box is the only supported spatial coverage type for this workflow",
                    "schema": {"$ref": "#/definitions/spatialCoverageSchema"},
                },
            },
            "outputs": {
                "processResults": {
                    "title": "Results from each stage in the workflow",
                    "description": "This can be power Process result or LIS process result if starting from RRR",
                    "schema": {
                        "type": "array",
                        "items": {"$ref": "#/definitions/processResultsSchema"},
                    },
                }
            },
            "definitions": {
                "executingStageFlagsSchema": {
                    "type": "array",
                    "description": "This is an array to describe whether to execute each stage of the workflow. As this workflow as 2 stages, the length is 2.",
                    "minItems": 2,
                    "maxItems": 2,
                    "items": {"type": "boolean"},
                },
                "stagedResultsSchema": {
                    "type": "array",
                    "description": "This is where previous and external stage results will be added so that they can be used in current workflow. ",
                    "items": {
                        "type": "object",
                        "required": ["name", "value"],
                        "properties": {
                            "name": {
                                "type": "string",
                                "description": "It is the name of the stage. a specific datatype can be appended if needed. example: LIS___data",
                            },
                            "value": {
                                "type": "string",
                                "description": "usualy a URL like S3 URL where the file is stored",
                            },
                        },
                    },
                },
                "spatialCoverageSchema": {
                    "type": "object",
                    "description": "This is the OGC definition of bbox. Pls follow this definition when providing bbox as an input",
                    "required": ["bbox"],
                    "properties": {
                        # NOTE(review): "bbox" and "crs" are nested one level too deep
                        # compared with the usual bbox schema (they would normally sit
                        # directly under "properties"), and the "oneOf" branches declare
                        # "type": "object" although the parent is an array. Left as-is
                        # because the server's expectations are not visible here --
                        # confirm whether this is intentional.
                        "bbox": {
                            "bbox": {
                                "type": "array",
                                "oneOf": [
                                    {"maxItems": 4, "minItems": 4, "type": "object"},
                                    {"maxItems": 6, "minItems": 6, "type": "object"},
                                ],
                                "items": {"type": "number"},
                            },
                            "crs": {
                                "type": "string",
                                "format": "uri",
                                "default": "http://www.opengis.net/def/crs/OGC/1.3/CRS84",
                                "enum": [
                                    "http://www.opengis.net/def/crs/OGC/1.3/CRS84",
                                    "http://www.opengis.net/def/crs/OGC/0/CRS84h",
                                ],
                            },
                        }
                    },
                },
                "processResultsSchema": {
                    "type": "object",
                    "description": "This is how outputs from each stage is stored",
                    "required": ["name", "value"],
                    "properties": {
                        "name": {
                            "type": "string",
                            "description": "It is the name of the stage. a specific datatype can be appended if needed. example: LIS___data",
                        },
                        "value": {
                            "type": "string",
                            "description": "usualy a URL like S3 URL where the file is stored",
                        },
                    },
                },
            },
            "links": [
                {
                    "href": "https://processing.example.org/oapi-p/processes/EchoProcess/execution",
                    "rel": "http://www.opengis.net/def/rel/ogc/1.0/execute",
                    "title": "Execute endpoint",
                }
            ],
        }

    def test_create_new_process_01(self):
        """PUT the example process description to ``/processes`` and expect HTTP 200."""
        # A per-run version was used previously:
        # version = f'0.0.{TimeUtils().get_datetime_unix(False)}'
        new_process = self._example_process_description(version)
        response = self.session.put(
            f'{self.base_url}/processes',
            data=json.dumps(new_process),
            headers=self.headers,
            timeout=self.REQUEST_TIMEOUT_SECONDS,
        )
        self.assertEqual(response.status_code, 200, response.text)

    def test_list_all_processes_01(self):
        """GET ``/processes``, expect HTTP 200 and pretty-print the JSON body."""
        response = self.session.get(
            f'{self.base_url}/processes',
            headers=self.headers,
            timeout=self.REQUEST_TIMEOUT_SECONDS,
        )
        self.assertEqual(response.status_code, 200, response.text)
        print(json.dumps(response.json(), indent=4))

    def test_get_process_01(self):
        """GET the example process at the module-level ``version`` and expect HTTP 200."""
        response = self.session.get(
            f'{self.base_url}/processes/{self.PROCESS_ID}?version_id={version}',
            headers=self.headers,
            timeout=self.REQUEST_TIMEOUT_SECONDS,
        )
        self.assertEqual(response.status_code, 200, response.text)
        print(json.dumps(response.json(), indent=2))

    def test_start_job_01(self):
        """POST an execution request, then fetch the job and validate both responses.

        The creation response must report status ``ACCEPTED``; the job document
        fetched afterwards may report any of the listed job states.
        """
        new_job = {
            "inputs": {
                "executingStageFlags": [
                    True,  # LIS
                    True,  # RRR, Rapid
                ],
                "stagedResults": [
                    # Example entry:
                    # {
                    #     "name": "LIS__DATA",
                    #     "value": "s3://aqacf-nexus-stage/LIS_gar_gdas_3x_3hr/"
                    # }
                ],
                "scenario": "3x",
                "hydroModel": "NOAH",
                "basinId": 23,  # TODO need to be in descriptive string form
                # NOTE(review): "GDAS" is not in the precipitationType enum declared by
                # the process description (CHIRPS, MERRA2, IMERG) -- confirm intent.
                "precipitationType": "GDAS",
                "landSurfaceModel": "LIS",
                "spatialResolution": "1",
                "startTime": "2021-01-01T00:00:00+00:00",
                "endTime": "2021-03-01T00:00:00+00:00",
                "spatialCoverage": {
                    "bbox": [-180.0, -90.0, 180.0, 90.0]
                },
            }
        }
        response = self.session.post(
            f'{self.base_url}/processes/{self.PROCESS_ID}/execution?version_id={version}',
            headers=self.headers,
            data=json.dumps(new_job),
            timeout=self.REQUEST_TIMEOUT_SECONDS,
        )
        self.assertEqual(response.status_code, 200, response.text)
        created_job = response.json()
        created_job_schema = {
            "type": "object",
            "required": ["processID", "type", "jobID", "status", "message", "progress"],
            "properties": {
                "processID": {"type": "string"},
                "type": {"type": "string", "enum": ["process"]},
                "jobID": {"type": "string"},
                "status": {"type": "string", "enum": ["ACCEPTED"]},
                "message": {"type": "string"},
                "progress": {"type": "integer"},
            }
        }
        self._assert_matches_schema(
            created_job, created_job_schema, 'failed to validate response against schema')

        # Pause before asking for the job document (presumably to let the server record it).
        time.sleep(5)
        # Use the shared session so this call also ignores environment proxy settings.
        response = self.session.get(
            f'{self.base_url}/jobs/{created_job["jobID"]}',
            headers=self.headers,
            timeout=self.REQUEST_TIMEOUT_SECONDS,
        )
        self.assertEqual(response.status_code, 200, response.text)
        job_status = response.json()
        status_job_schema = {
            "type": "object",
            "required": ["processID", "type", "jobID", "status", "message", "created", "progress"],
            "properties": {
                "processID": {"type": "string"},
                "type": {"type": "string", "enum": ["process"]},
                "jobID": {"type": "string"},
                "status": {"type": "string", "enum": ["ACCEPTED", 'RUNNING', 'SUCCESSFUL', 'FAILED', 'DISMISSED']},
                "message": {"type": "string"},
                "created": {"type": "string"},
                "started": {"type": "string"},
                "finished": {"type": "string"},
                "updated": {"type": "string"},
                "progress": {"type": "integer"},
            }
        }
        self._assert_matches_schema(
            job_status, status_job_schema, 'failed to validate status response against schema')

    def test_result_job_01(self):
        """GET the results of ``RESULT_JOB_ID`` and validate the document's shape."""
        response = self.session.get(
            f'{self.base_url}/jobs/{self.RESULT_JOB_ID}/results',
            headers=self.headers,
            timeout=self.REQUEST_TIMEOUT_SECONDS,
        )
        self.assertEqual(response.status_code, 200, response.text)
        print(response.text)
        job_results = response.json()
        result_job_schema = {
            "type": "object",
            "required": ["processResults"],
            "properties": {
                "processResults": {
                    "type": "array",
                    "items": {
                        "type": "object",
                        "required": ["name", "value"],
                        "properties": {
                            "name": {"type": "string"},
                            "value": {"type": "string"},
                        }
                    }
                },
            }
        }
        self._assert_matches_schema(
            job_results, result_job_schema, 'failed to validate results response against schema')