Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

(WIP) Issue #40: add delegator for dynamically detected cube processes #118

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions openeo/rest/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ def __init__(self, url, auth: AuthBase = None, session: requests.Session = None)
"""
super().__init__(root_url=url, auth=auth, session=session)
self._cached_capabilities = None
self._process_registry = None

# Initial API version check.
if self._api_version.below(self._MINIMUM_API_VERSION):
Expand Down Expand Up @@ -267,6 +268,15 @@ def capabilities(self) -> 'Capabilities':

return self._cached_capabilities

def process_registry(self) -> 'ProcessRegistry':
"""
Load all processes supported by the backend (lazy/cached)
:return: ProcessRegistry
"""
if self._process_registry is None:
self._process_registry = ProcessRegistry.from_connection(connection=self)
return self._process_registry

@deprecated("Use 'list_output_formats' instead")
def list_file_types(self) -> dict:
return self.list_output_formats()
Expand Down Expand Up @@ -554,3 +564,23 @@ def session(userid=None, endpoint: str = "https://openeo.org/openeo") -> Connect
"""
return connect(url=endpoint)


class ProcessRegistry:
"""
Registry of process specs (e.g. the processes supported by a backend)
"""
def __init__(self, processes: dict):
self._reg = processes

@classmethod
def from_connection(cls, connection=Connection):
"""Factory to load process registry from given backend connection."""
# Get as list from API
processes = connection.get('/processes').json()['processes']
# Make it a dictionary for more efficient retrieval
processes = {p['id']: p for p in processes}
return cls(processes=processes)

def get_parameters(self, process_id: str) -> List[dict]:
"""Get parameters for given process_id."""
return self._reg[process_id]["parameters"]
57 changes: 57 additions & 0 deletions openeo/rest/imagecollectionclient.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ def __init__(self, node_id: str, builder: GraphBuilder, session: 'Connection', m
self.session = session
self.graph = builder.processes
self.metadata = metadata
self.dynamic = DynamicCubeMethodDelegator(cube=self)

def __str__(self):
return "ImageCollection: %s" % self.node_id
Expand Down Expand Up @@ -1070,3 +1071,59 @@ def to_graphviz(self):
# TODO: add subgraph for "callback" arguments?

return graph


class DynamicProcessException(Exception):
pass


class _DynamicCubeMethod:
"""
A dynamically detected process bound to a raster cube.
The process should have a single "raster-cube" parameter.
"""

def __init__(self, cube: ImageCollectionClient, process_id: str, parameters: List[dict]):
self.cube = cube
self.process_id = process_id
self.parameters = parameters

# Find raster-cube parameter.
expected_schema = {"type": "object", "subtype": "raster-cube"}
names = [p["name"] for p in self.parameters if p["schema"] == expected_schema]
if len(names) != 1:
raise DynamicProcessException("Need one raster-cube parameter but found {c}".format(c=len(names)))
self.cube_parameter = names[0]

def __call__(self, *args, **kwargs):
"""Call the "cube method": pass cube and other arguments to the process."""
arguments = {
self.cube_parameter: {"from_node": self.cube.node_id}
}
# TODO: more advanced parameter checking (required vs optional), normalization based on type, ...
for i, arg in enumerate(args):
arguments[self.parameters[i]["name"]] = arg
for key, value in kwargs.items():
assert any(p["name"] == key for p in self.parameters)
assert key not in arguments
arguments[key] = value

return self.cube.graph_add_process(
process_id=self.process_id,
args=arguments,
)


class DynamicCubeMethodDelegator:
"""
Wrapper for a DataCube to group and delegate to dynamically detected processes
(depending on a particular backend or API spec)
"""

def __init__(self, cube: ImageCollectionClient):
self.cube = cube

def __getattr__(self, process_id):
self.process_registry = self.cube.session.process_registry()
parameters = self.process_registry.get_parameters(process_id)
return _DynamicCubeMethod(self.cube, process_id=process_id, parameters=parameters)
28 changes: 28 additions & 0 deletions tests/rest/test_imagecollectionclient.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
def session040(requests_mock):
requests_mock.get(API_URL + "/", json={"api_version": "0.4.0"})
session = openeo.connect(API_URL)
# Reset graph builder
GraphBuilder.id_counter = {}
return session


Expand Down Expand Up @@ -82,3 +84,29 @@ def result_callback(request, context):
path = tmpdir.join("tmp.tiff")
session040.load_collection("SENTINEL2").download(str(path), format="GTIFF")
assert path.read() == "tiffdata"


def test_dynamic_cube_method(session040, requests_mock):
processes = [
{
"id": "make_larger",
"description": "multiply a raster cube with a factor",
"parameters": [
{"name": "data", "schema": {"type": "object", "subtype": "raster-cube"}},
{"name": "factor", "schema": {"type": "float"}},
]}
]
requests_mock.get(API_URL + '/processes', json={"processes": processes})
requests_mock.get(API_URL + "/collections/SENTINEL2", json={"foo": "bar"})

cube = session040.load_collection("SENTINEL2")
cube = cube.dynamic.make_larger(factor=42)
assert set(cube.graph.keys()) == {"loadcollection1", "makelarger1"}
assert cube.graph["makelarger1"] == {
"process_id": "make_larger",
"arguments": {
"data": {"from_node": "loadcollection1"},
"factor": 42,
},
"result": False
}