diff --git a/docs/asyncio.md b/docs/asyncio.md new file mode 100644 index 00000000..b058cab0 --- /dev/null +++ b/docs/asyncio.md @@ -0,0 +1,38 @@ +# Asyncio + +`kr8s` uses `asyncio` under the hood when interacting with the Kubernetes API. However, it exposes a standard synchronous API by default. + +```python +import kr8s + +api = kr8s.api() +pods = api.get("pods") +``` + +For users that want it the `asyncio` API is also available via `kr8s.asyncio`. + +```python +import kr8s.asyncio + +api = kr8s.asyncio.api() +pods = await api.get("pods") +``` + +Submodules including `kr8s.objects` and `kr8s.portforward` also have `asyncio` equivalents at `kr8s.asyncio.objects` and `kr8s.asyncio.portforward`. + +```python +from kr8s.asyncio.object import Pod + +pod = Pod({ + "apiVersion": "v1", + "kind": "Pod", + "metadata": { + "name": "my-pod", + }, + "spec": { + "containers": [{"name": "pause", "image": "gcr.io/google_containers/pause",}] + }, + }) + +await pod.create() +``` diff --git a/docs/client.md b/docs/client.md index eb245a94..a2c503f4 100644 --- a/docs/client.md +++ b/docs/client.md @@ -7,7 +7,7 @@ import kr8s api = kr8s.api() -version = await api.version() +version = api.version() print(version) ``` @@ -21,7 +21,7 @@ The client API is inspired by `kubectl` rather than the Kubernetes API directly import kr8s api = kr8s.api() -pods = await api.get("pods", namespace=kr8s.ALL) +pods = api.get("pods", namespace=kr8s.ALL) for pod in pods: print(pod.name) @@ -33,12 +33,16 @@ For situations where there may not be an appropriate method to call or you want To make API requests for resources more convenience `call_api` allows building the url via various kwargs. +```{note} +Note that `call_api` is only available via the [asyncio API](asyncio). +``` + For example to get all pods you could make the following low-level call. ```python -import kr8s +import kr8s.asyncio -api = kr8s.api() +api = kr8s.asyncio.api() async with api.call_api("GET", url="pods", namespace="") as r: pods_response = await r.json() @@ -87,7 +91,7 @@ api2 = kr8s.api() ```python from kr8s.objects import Pod -pod = await Pod.get("some-pod") +pod = Pod.get("some-pod") # pod.api is a pointer to api despite not being passed a reference due to caching ``` @@ -107,7 +111,7 @@ api2 = kr8s.Api(bypass_factory=True) ```python from kr8s.objects import Pod -pod = await Pod.get("some-pod", api=api2) +pod = Pod.get("some-pod", api=api2) # be sure to pass a reference around as caching will no longer work ``` diff --git a/docs/conf.py b/docs/conf.py index 18efbaeb..0c1129d6 100644 --- a/docs/conf.py +++ b/docs/conf.py @@ -50,7 +50,7 @@ "show-module-summary", "imported-members", ] -autoapi_ignore = ["*tests*", "*conftest*"] +autoapi_ignore = ["*tests*", "*conftest*", "*asyncio*"] # autoapi_python_use_implicit_namespaces = True autoapi_keep_files = True # autoapi_generate_api_docs = False diff --git a/docs/index.md b/docs/index.md index 26f54366..f31a8211 100644 --- a/docs/index.md +++ b/docs/index.md @@ -25,7 +25,7 @@ $ pip install kr8s import kr8s api = kr8s.api() -pods = await api.get("pods") +pods = api.get("pods") ``` ### Object API @@ -44,7 +44,7 @@ pod = Pod({ }, }) -await pod.create() +pod.create() ``` @@ -62,6 +62,7 @@ installation authentication client object +asyncio ``` ```{toctree} diff --git a/docs/object.md b/docs/object.md index 79e96e96..421e5c21 100644 --- a/docs/object.md +++ b/docs/object.md @@ -6,7 +6,7 @@ Responses from the Client API are usually objects from [](#kr8s.objects) which r import kr8s api = kr8s.api() -pods = await api.get("pods", namespace=kr8s.ALL) +pods = api.get("pods", namespace=kr8s.ALL) pod = pods[0] print(type(pod)) # @@ -50,27 +50,27 @@ Objects also have helper methods for interacting with Kubernetes resources. ```python # Patch the Pod -await pod.patch({"metadata": {"labels": {"foo": "bar"}}}) +pod.patch({"metadata": {"labels": {"foo": "bar"}}}) # Check the Pod exists -await pod.exists() +pod.exists() # True # Update the object with the latest state from the API -await pod.refresh() +pod.refresh() # Delete the Pod -await pod.delete() +pod.delete() ``` Some objects also have additional methods that are unique to them. ```python # Get Pod logs -logs = await pod.logs() +logs = pod.logs() # Check if Pod containers are ready -await pod.ready() +pod.ready() # True ``` @@ -95,7 +95,7 @@ pod = Pod({ }, }) -await pod.create() +pod.create() ``` Get a Pod reference by name. @@ -103,7 +103,7 @@ Get a Pod reference by name. ```python from kr8s.object import Pod -pod = await Pod.get("my-pod") +pod = Pod.get("my-pod") ``` When creating new objects they will not have a client reference because they are created directly. In this case the object will call the [](#kr8s.api) factory function which will either create a new client if none exists or will grab the first client from the cache if one was created somewhere else in your code. @@ -192,7 +192,7 @@ class CustomObject(APIObject): api = kr8s.api() -cos = await api.get("customobjects") # Will return a list of CustomObject instances +cos = api.get("customobjects") # Will return a list of CustomObject instances ``` ```{note} diff --git a/kr8s/__init__.py b/kr8s/__init__.py index 64af81fc..1c6960bb 100644 --- a/kr8s/__init__.py +++ b/kr8s/__init__.py @@ -1,6 +1,19 @@ # SPDX-FileCopyrightText: Copyright (c) 2023, Dask Developers, Yuvi Panda, Anaconda Inc, NVIDIA # SPDX-License-Identifier: BSD 3-Clause License -from ._api import Api, ALL, api # noqa +from functools import partial + +from ._api import ALL # noqa +from ._api import Api as _AsyncApi +from ._asyncio import sync as _sync from ._exceptions import NotFoundError # noqa +from .asyncio import api as _api # noqa __version__ = "0.0.0" + + +@_sync +class Api(_AsyncApi): + __doc__ = _AsyncApi.__doc__ + + +api = partial(_api, _asyncio=False) diff --git a/kr8s/_api.py b/kr8s/_api.py index 64c2686b..c77ce114 100644 --- a/kr8s/_api.py +++ b/kr8s/_api.py @@ -25,6 +25,7 @@ class Api(object): """ + _asyncio = True _instances = weakref.WeakValueDictionary() def __init__(self, **kwargs) -> None: @@ -141,7 +142,7 @@ async def _get_kind( watch: bool = False, ) -> dict: """Get a Kubernetes resource.""" - from .objects import get_class + from ._objects import get_class if not namespace: namespace = self.auth.namespace @@ -175,6 +176,22 @@ async def get( field_selector: str = None, ) -> List[object]: """Get a Kubernetes resource.""" + return await self._get( + kind, + *names, + namespace=namespace, + label_selector=label_selector, + field_selector=field_selector, + ) + + async def _get( + self, + kind: str, + *names: List[str], + namespace: str = None, + label_selector: str = None, + field_selector: str = None, + ) -> List[object]: async with self._get_kind( kind, namespace=namespace, @@ -197,6 +214,24 @@ async def watch( label_selector: str = None, field_selector: str = None, since: str = None, + ): + """Watch a Kubernetes resource.""" + async for t, object in self._watch( + kind, + namespace=namespace, + label_selector=label_selector, + field_selector=field_selector, + since=since, + ): + yield t, object + + async def _watch( + self, + kind: str, + namespace: str = None, + label_selector: str = None, + field_selector: str = None, + since: str = None, ): """Watch a Kubernetes resource.""" async with self._get_kind( @@ -251,25 +286,3 @@ def __version__(self): from . import __version__ return f"kr8s/{__version__}" - - -def api(url=None, kubeconfig=None, serviceaccount=None, namespace=None) -> Api: - """Create a :class:`kr8s.Api` object for interacting with the Kubernetes API. - - If a kr8s object already exists with the same arguments, it will be returned. - """ - - def _f(**kwargs): - key = frozenset(kwargs.items()) - if key in Api._instances: - return Api._instances[key] - if all(k is None for k in kwargs.values()) and list(Api._instances.values()): - return list(Api._instances.values())[0] - return Api(**kwargs, bypass_factory=True) - - return _f( - url=url, - kubeconfig=kubeconfig, - serviceaccount=serviceaccount, - namespace=namespace, - ) diff --git a/kr8s/_asyncio.py b/kr8s/_asyncio.py new file mode 100644 index 00000000..57634f63 --- /dev/null +++ b/kr8s/_asyncio.py @@ -0,0 +1,115 @@ +# SPDX-FileCopyrightText: Copyright (c) 2023, MrNaif2018, Dask Developers, NVIDIA +# SPDX-License-Identifier: MIT License +# SPDX-License-URL: https://github.com/bitcartcc/universalasync/blob/d3979113316431a24f0260804442d29a38e414a2/LICENSE +# Forked from https://github.com/bitcartcc/universalasync/tree/d3979113316431a24f0260804442d29a38e414a2 +import asyncio +import functools +import inspect +from typing import Any, AsyncGenerator, Callable, Generator, Tuple + + +def _get_event_loop() -> asyncio.AbstractEventLoop: + try: + return asyncio.get_event_loop_policy().get_event_loop() + except RuntimeError: + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + return loop + + +def get_event_loop() -> asyncio.AbstractEventLoop: + """Useful utility for getting event loop. Acts like get_event_loop(), but also creates new event loop if needed + + This will return a working event loop in 100% of cases. + + Returns: + asyncio.AbstractEventLoop: event loop + """ + loop = _get_event_loop() + if loop.is_closed(): + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + return loop + + +def iter_over_async(agen: AsyncGenerator, run_func: Callable) -> Generator: + ait = agen.__aiter__() + + async def get_next() -> Tuple[bool, Any]: + try: + obj = await ait.__anext__() + return False, obj + except StopAsyncIteration: + return True, None + + while True: + done, obj = run_func(get_next()) + if done: + break + yield obj + + +def run_sync_ctx(coroutine: Any, loop: asyncio.AbstractEventLoop) -> Any: + if inspect.isawaitable(coroutine): + return asyncio.run(coroutine) + + if inspect.isasyncgen(coroutine): + return iter_over_async(coroutine, lambda coro: asyncio.run(coro)) + + +def async_to_sync_wraps(function: Callable) -> Callable: + """Wrap an async method/property to universal method. + + This allows to run wrapped methods in both async and sync contexts transparently without any additional code + + When run from another thread, it runs coroutines in new thread's event loop + + See :ref:`Example ` for full example + + Args: + function (Callable): function/property to wrap + + Returns: + Callable: modified function + """ + + @functools.wraps(function) + def async_to_sync_wrap(*args: Any, **kwargs: Any) -> Any: + loop = get_event_loop() + coroutine = function(*args, **kwargs) + + return run_sync_ctx(coroutine, loop) + + result = async_to_sync_wrap + return result + + +def sync(source: object) -> object: + """Convert all public async methods/properties of an object to universal methods. + + See :func:`async_to_sync_wraps` for more info + + Args: + source (object): object to convert + + Returns: + object: converted object. Note that parameter passed is being modified anyway + """ + setattr(source, "_asyncio", False) + for name in dir(source): + method = getattr(source, name) + + if not name.startswith("_"): + if inspect.iscoroutinefunction(method) or inspect.isasyncgenfunction( + method + ): + function = getattr(source, name) + setattr(source, name, async_to_sync_wraps(function)) + + elif name == "__aenter__" and not hasattr(source, "__enter__"): + setattr(source, "__enter__", async_to_sync_wraps(method)) + + elif name == "__aexit__" and not hasattr(source, "__exit__"): + setattr(source, "__exit__", async_to_sync_wraps(method)) + + return source diff --git a/kr8s/_objects.py b/kr8s/_objects.py new file mode 100644 index 00000000..4d437983 --- /dev/null +++ b/kr8s/_objects.py @@ -0,0 +1,856 @@ +# SPDX-FileCopyrightText: Copyright (c) 2023, Dask Developers, Yuvi Panda, Anaconda Inc, NVIDIA +# SPDX-License-Identifier: BSD 3-Clause License +import asyncio +import json +from typing import Any, List, Optional + +import aiohttp +from aiohttp import ClientResponse + +import kr8s +import kr8s.asyncio +from kr8s._api import Api +from kr8s._data_utils import list_dict_unpack +from kr8s._exceptions import NotFoundError +from kr8s.asyncio.portforward import PortForward as AsyncPortForward +from kr8s.portforward import PortForward as SyncPortForward + + +class APIObject: + """Base class for Kubernetes objects.""" + + namespaced = False + scalable = False + scalable_spec = "replicas" + _asyncio = True + + def __init__(self, resource: dict, api: Api = None) -> None: + """Initialize an APIObject.""" + # TODO support passing pykube or kubernetes objects in addition to dicts + self._raw = resource + self.api = api or (kr8s.asyncio.api() if self._asyncio else kr8s.api()) + + def __repr__(self): + """Return a string representation of the Kubernetes resource.""" + return f"<{self.kind} {self.name}>" + + def __str__(self): + """Return a string representation of the Kubernetes resource.""" + return self.name + + @property + def raw(self) -> str: + """Raw object returned from the Kubernetes API.""" + return self._raw + + @raw.setter + def raw(self, value): + self._raw = value + + @property + def name(self) -> str: + """Name of the Kubernetes resource.""" + return self.raw["metadata"]["name"] + + @property + def namespace(self) -> str: + """Namespace of the Kubernetes resource.""" + if self.namespaced: + return self.raw.get("metadata", {}).get( + "namespace", self.api.auth.namespace + ) + return None + + @property + def metadata(self) -> dict: + """Metadata of the Kubernetes resource.""" + return self.raw["metadata"] + + @property + def spec(self) -> dict: + """Spec of the Kubernetes resource.""" + return self.raw["spec"] + + @property + def status(self) -> dict: + """Status of the Kubernetes resource.""" + return self.raw["status"] + + @property + def labels(self) -> dict: + """Labels of the Kubernetes resource.""" + try: + return self.raw["metadata"]["labels"] + except KeyError: + return {} + + @property + def annotations(self) -> dict: + """Annotations of the Kubernetes resource.""" + try: + return self.raw["metadata"]["annotations"] + except KeyError: + return {} + + @property + def replicas(self) -> int: + """Replicas of the Kubernetes resource.""" + if self.scalable: + return self.raw["spec"][self.scalable_spec] + raise NotImplementedError(f"{self.kind} is not scalable") + + @classmethod + async def get( + cls, name: str, namespace: str = None, api: Api = None, **kwargs + ) -> "APIObject": + """Get a Kubernetes resource by name.""" + + api = api or (kr8s.asyncio.api() if cls._asyncio else kr8s.api()) + try: + resources = await api._get( + cls.endpoint, name, namespace=namespace, **kwargs + ) + [resource] = resources + except ValueError: + raise ValueError( + f"Expected exactly one {cls.kind} object. Use selectors to narrow down the search." + ) + + return resource + + async def exists(self, ensure=False) -> bool: + """Check if this object exists in Kubernetes.""" + return await self._exists(ensure=ensure) + + async def _exists(self, ensure=False) -> bool: + """Check if this object exists in Kubernetes.""" + async with self.api.call_api( + "GET", + version=self.version, + url=f"{self.endpoint}/{self.name}", + namespace=self.namespace, + raise_for_status=False, + ) as resp: + status = resp.status + if status == 200: + return True + if ensure: + raise NotFoundError(f"Object {self.name} does not exist") + return False + + async def create(self) -> None: + """Create this object in Kubernetes.""" + async with self.api.call_api( + "POST", + version=self.version, + url=self.endpoint, + namespace=self.namespace, + data=json.dumps(self.raw), + ) as resp: + self.raw = await resp.json() + + async def delete(self, propagation_policy: str = None) -> None: + """Delete this object from Kubernetes.""" + data = {} + if propagation_policy: + data["propagationPolicy"] = propagation_policy + try: + async with self.api.call_api( + "DELETE", + version=self.version, + url=f"{self.endpoint}/{self.name}", + namespace=self.namespace, + data=json.dumps(data), + ) as resp: + self.raw = await resp.json() + except aiohttp.ClientResponseError as e: + raise NotFoundError(f"Object {self.name} does not exist") from e + + async def refresh(self) -> None: + """Refresh this object from Kubernetes.""" + await self._refresh() + + async def _refresh(self) -> None: + """Refresh this object from Kubernetes.""" + async with self.api.call_api( + "GET", + version=self.version, + url=f"{self.endpoint}/{self.name}", + namespace=self.namespace, + ) as resp: + self.raw = await resp.json() + + async def patch(self, patch, *, subresource=None) -> None: + """Patch this object in Kubernetes.""" + return await self._patch(patch, subresource=subresource) + + async def _patch(self, patch, *, subresource=None) -> None: + """Patch this object in Kubernetes.""" + url = f"{self.endpoint}/{self.name}" + if subresource: + url = f"{url}/{subresource}" + async with self.api.call_api( + "PATCH", + version=self.version, + url=url, + namespace=self.namespace, + data=json.dumps(patch), + headers={"Content-Type": "application/merge-patch+json"}, + ) as resp: + self.raw = await resp.json() + + async def scale(self, replicas=None): + """Scale this object in Kubernetes.""" + if not self.scalable: + raise NotImplementedError(f"{self.kind} is not scalable") + await self._exists(ensure=True) + # TODO support dot notation in self.scalable_spec to support nested fields + await self._patch({"spec": {self.scalable_spec: replicas}}) + while self.replicas != replicas: + await self._refresh() + await asyncio.sleep(0.1) + + async def watch(self): + """Watch this object in Kubernetes.""" + since = self.metadata.get("resourceVersion") + async for event, obj in self.api._watch( + self.endpoint, + namespace=self.namespace, + field_selector=f"metadata.name={self.name}", + since=since, + ): + self.raw = obj.raw + yield event, self + + +## v1 objects + + +class Binding(APIObject): + """A Kubernetes Binding.""" + + version = "v1" + endpoint = "bindings" + kind = "Binding" + plural = "bindings" + singular = "binding" + namespaced = True + + +class ComponentStatus(APIObject): + """A Kubernetes ComponentStatus.""" + + version = "v1" + endpoint = "componentstatuses" + kind = "ComponentStatus" + plural = "componentstatuses" + singular = "componentstatus" + namespaced = False + + +class ConfigMap(APIObject): + """A Kubernetes ConfigMap.""" + + version = "v1" + endpoint = "configmaps" + kind = "ConfigMap" + plural = "configmaps" + singular = "configmap" + namespaced = True + + +class Endpoints(APIObject): + """A Kubernetes Endpoints.""" + + version = "v1" + endpoint = "endpoints" + kind = "Endpoints" + plural = "endpoints" + singular = "endpoint" + namespaced = True + + +class Event(APIObject): + """A Kubernetes Event.""" + + version = "v1" + endpoint = "events" + kind = "Event" + plural = "events" + singular = "event" + namespaced = True + + +class LimitRange(APIObject): + """A Kubernetes LimitRange.""" + + version = "v1" + endpoint = "limitranges" + kind = "LimitRange" + plural = "limitranges" + singular = "limitrange" + namespaced = True + + +class Namespace(APIObject): + """A Kubernetes Namespace.""" + + version = "v1" + endpoint = "namespaces" + kind = "Namespace" + plural = "namespaces" + singular = "namespace" + namespaced = False + + +class Node(APIObject): + """A Kubernetes Node.""" + + version = "v1" + endpoint = "nodes" + kind = "Node" + plural = "nodes" + singular = "node" + namespaced = False + + @property + def unschedulable(self): + if "unschedulable" in self.raw["spec"]: + return self.raw["spec"]["unschedulable"] + return False + + async def cordon(self): + await self._patch({"spec": {"unschedulable": True}}) + + async def uncordon(self): + await self._patch({"spec": {"unschedulable": False}}) + + +class PersistentVolumeClaim(APIObject): + """A Kubernetes PersistentVolumeClaim.""" + + version = "v1" + endpoint = "persistentvolumeclaims" + kind = "PersistentVolumeClaim" + plural = "persistentvolumeclaims" + singular = "persistentvolumeclaim" + namespaced = True + + +class PersistentVolume(APIObject): + """A Kubernetes PersistentVolume.""" + + version = "v1" + endpoint = "persistentvolumes" + kind = "PersistentVolume" + plural = "persistentvolumes" + singular = "persistentvolume" + namespaced = False + + +class Pod(APIObject): + """A Kubernetes Pod.""" + + version = "v1" + endpoint = "pods" + kind = "Pod" + plural = "pods" + singular = "pod" + namespaced = True + + async def ready(self): + """Check if the pod is ready.""" + await self._refresh() + conditions = list_dict_unpack( + self.status.get("conditions", []), + key="type", + value="status", + ) + return ( + "Ready" in conditions + and "ContainersReady" in conditions + and conditions.get("Ready", "False") == "True" + and conditions.get("ContainersReady", "False") == "True" + ) + + async def logs( + self, + container=None, + pretty=None, + previous=False, + since_seconds=None, + since_time=None, + timestamps=False, + tail_lines=None, + limit_bytes=None, + ): + params = {} + if container is not None: + params["container"] = container + if pretty is not None: + params["pretty"] = pretty + if previous: + params["previous"] = "true" + if since_seconds is not None and since_time is None: + params["sinceSeconds"] = int(since_seconds) + elif since_time is not None and since_seconds is None: + params["sinceTime"] = since_time + if timestamps: + params["timestamps"] = "true" + if tail_lines is not None: + params["tailLines"] = int(tail_lines) + if limit_bytes is not None: + params["limitBytes"] = int(limit_bytes) + + async with self.api.call_api( + "GET", + version=self.version, + url=f"{self.endpoint}/{self.name}/log", + namespace=self.namespace, + params=params, + ) as resp: + return await resp.text() + + def portforward(self, remote_port: int, local_port: int = None) -> int: + """Port forward a pod. + + Returns an instance of :class:`kr8s.portforward.PortForward` for this Pod. + + Example: + This can be used as a an async context manager or with explicit start/stop methods. + + Context manager: + + >>> async with pod.portforward(8888) as port: + ... print(f"Forwarding to port {port}") + ... # Do something with port 8888 + + + Explict start/stop: + + >>> pf = pod.portforward(8888) + >>> await pf.start() + >>> print(f"Forwarding to port {pf.local_port}") + >>> # Do something with port 8888 + >>> await pf.stop() + """ + if self._asyncio: + return AsyncPortForward(self, remote_port, local_port) + return SyncPortForward(self, remote_port, local_port) + + +class PodTemplate(APIObject): + """A Kubernetes PodTemplate.""" + + version = "v1" + endpoint = "podtemplates" + kind = "PodTemplate" + plural = "podtemplates" + singular = "podtemplate" + namespaced = True + + +class ReplicationController(APIObject): + """A Kubernetes ReplicationController.""" + + version = "v1" + endpoint = "replicationcontrollers" + kind = "ReplicationController" + plural = "replicationcontrollers" + singular = "replicationcontroller" + namespaced = True + scalable = True + + async def ready(self): + """Check if the deployment is ready.""" + await self._refresh() + return ( + self.raw["status"].get("observedGeneration", 0) + >= self.raw["metadata"]["generation"] + and self.raw["status"].get("readyReplicas", 0) == self.replicas + ) + + +class ResourceQuota(APIObject): + """A Kubernetes ResourceQuota.""" + + version = "v1" + endpoint = "resourcequotas" + kind = "ResourceQuota" + plural = "resourcequotas" + singular = "resourcequota" + namespaced = True + + +class Secret(APIObject): + """A Kubernetes Secret.""" + + version = "v1" + endpoint = "secrets" + kind = "Secret" + plural = "secrets" + singular = "secret" + namespaced = True + + +class ServiceAccount(APIObject): + """A Kubernetes ServiceAccount.""" + + version = "v1" + endpoint = "serviceaccounts" + kind = "ServiceAccount" + plural = "serviceaccounts" + singular = "serviceaccount" + namespaced = True + + +class Service(APIObject): + """A Kubernetes Service.""" + + version = "v1" + endpoint = "services" + kind = "Service" + plural = "services" + singular = "service" + namespaced = True + + async def proxy_http_request( + self, method: str, path: str, port: Optional[int] = None, **kwargs: Any + ) -> ClientResponse: + """Issue a HTTP request with specific HTTP method to proxy of a Service. + + Args: + method: HTTP method to use. + path: Path to proxy. + port: Port to proxy to. If not specified, the first port in the + Service's spec will be used. + **kwargs: Additional keyword arguments to pass to the API call. + """ + return await self._proxy_http_request(method, path, port, **kwargs) + + async def _proxy_http_request( + self, method: str, path: str, port: Optional[int] = None, **kwargs: Any + ) -> ClientResponse: + if port is None: + port = self.raw["spec"]["ports"][0]["port"] + async with self.api.call_api( + method, + version=self.version, + url=f"{self.endpoint}/{self.name}:{port}/proxy/{path}", + namespace=self.namespace, + **kwargs, + ) as response: + return response + + async def proxy_http_get( + self, path: str, port: Optional[int] = None, **kwargs + ) -> None: + return await self._proxy_http_request("GET", path, port, **kwargs) + + async def proxy_http_post( + self, path: str, port: Optional[int] = None, **kwargs + ) -> None: + return await self._proxy_http_request("POST", path, port, **kwargs) + + async def proxy_http_put( + self, path: str, port: Optional[int] = None, **kwargs + ) -> None: + return await self._proxy_http_request("PUT", path, port, **kwargs) + + async def proxy_http_delete( + self, path: str, port: Optional[int] = None, **kwargs + ) -> None: + return await self._proxy_http_request("DELETE", path, port, **kwargs) + + async def ready_pods(self) -> List[Pod]: + """Return a list of ready pods for this service.""" + return await self._ready_pods() + + async def _ready_pods(self) -> List[Pod]: + """Return a list of ready pods for this service.""" + pod_selector = ",".join([f"{k}={v}" for k, v in self.labels.items()]) + pods = await self.api._get("pods", label_selector=pod_selector) + return [pod for pod in pods if await pod.ready()] + + async def ready(self) -> bool: + """Check if the service is ready.""" + pods = await self._ready_pods() + return len(pods) > 0 + + def portforward(self, remote_port: int, local_port: int = None) -> int: + """Port forward a service. + + Returns an instance of :class:`kr8s.portforward.PortForward` for this Service. + + Example: + This can be used as a an async context manager or with explicit start/stop methods. + + Context manager: + + >>> async with service.portforward(8888) as port: + ... print(f"Forwarding to port {port}") + ... # Do something with port 8888 + + + Explict start/stop: + + >>> pf = service.portforward(8888) + >>> await pf.start() + >>> print(f"Forwarding to port {pf.local_port}") + >>> # Do something with port 8888 + >>> await pf.stop() + + """ + if self._asyncio: + return AsyncPortForward(self, remote_port, local_port) + return SyncPortForward(self, remote_port, local_port) + + +## apps/v1 objects + + +class ControllerRevision(APIObject): + """A Kubernetes ControllerRevision.""" + + version = "apps/v1" + endpoint = "controllerrevisions" + kind = "ControllerRevision" + plural = "controllerrevisions" + singular = "controllerrevision" + namespaced = True + + +class DaemonSet(APIObject): + """A Kubernetes DaemonSet.""" + + version = "apps/v1" + endpoint = "daemonsets" + kind = "DaemonSet" + plural = "daemonsets" + singular = "daemonset" + namespaced = True + + +class Deployment(APIObject): + """A Kubernetes Deployment.""" + + version = "apps/v1" + endpoint = "deployments" + kind = "Deployment" + plural = "deployments" + singular = "deployment" + namespaced = True + scalable = True + + async def ready(self): + """Check if the deployment is ready.""" + await self._refresh() + return ( + self.raw["status"].get("observedGeneration", 0) + >= self.raw["metadata"]["generation"] + and self.raw["status"].get("readyReplicas", 0) == self.replicas + ) + + +class ReplicaSet(APIObject): + """A Kubernetes ReplicaSet.""" + + version = "apps/v1" + endpoint = "replicasets" + kind = "ReplicaSet" + plural = "replicasets" + singular = "replicaset" + namespaced = True + scalable = True + + +class StatefulSet(APIObject): + """A Kubernetes StatefulSet.""" + + version = "apps/v1" + endpoint = "statefulsets" + kind = "StatefulSet" + plural = "statefulsets" + singular = "statefulset" + namespaced = True + scalable = True + + +## autoscaling/v1 objects + + +class HorizontalPodAutoscaler(APIObject): + """A Kubernetes HorizontalPodAutoscaler.""" + + version = "autoscaling/v2" + endpoint = "horizontalpodautoscalers" + kind = "HorizontalPodAutoscaler" + plural = "horizontalpodautoscalers" + singular = "horizontalpodautoscaler" + namespaced = True + + +## batch/v1 objects + + +class CronJob(APIObject): + """A Kubernetes CronJob.""" + + version = "batch/v1" + endpoint = "cronjobs" + kind = "CronJob" + plural = "cronjobs" + singular = "cronjob" + namespaced = True + + +class Job(APIObject): + """A Kubernetes Job.""" + + version = "batch/v1" + endpoint = "jobs" + kind = "Job" + plural = "jobs" + singular = "job" + namespaced = True + scalable = True + scalable_spec = "parallelism" + + +## networking.k8s.io/v1 objects + + +class IngressClass(APIObject): + """A Kubernetes IngressClass.""" + + version = "networking.k8s.io/v1" + endpoint = "ingressclasses" + kind = "IngressClass" + plural = "ingressclasses" + singular = "ingressclass" + namespaced = False + + +class Ingress(APIObject): + """A Kubernetes Ingress.""" + + version = "networking.k8s.io/v1" + endpoint = "ingresses" + kind = "Ingress" + plural = "ingresses" + singular = "ingress" + namespaced = True + + +class NetworkPolicy(APIObject): + """A Kubernetes NetworkPolicy.""" + + version = "networking.k8s.io/v1" + endpoint = "networkpolicies" + kind = "NetworkPolicy" + plural = "networkpolicies" + singular = "networkpolicy" + namespaced = True + + +## policy/v1 objects + + +class PodDisruptionBudget(APIObject): + """A Kubernetes PodDisruptionBudget.""" + + version = "policy/v1" + endpoint = "poddisruptionbudgets" + kind = "PodDisruptionBudget" + plural = "poddisruptionbudgets" + singular = "poddisruptionbudget" + namespaced = True + + +## rbac.authorization.k8s.io/v1 objects + + +class ClusterRoleBinding(APIObject): + """A Kubernetes ClusterRoleBinding.""" + + version = "rbac.authorization.k8s.io/v1" + endpoint = "clusterrolebindings" + kind = "ClusterRoleBinding" + plural = "clusterrolebindings" + singular = "clusterrolebinding" + namespaced = False + + +class ClusterRole(APIObject): + """A Kubernetes ClusterRole.""" + + version = "rbac.authorization.k8s.io/v1" + endpoint = "clusterroles" + kind = "ClusterRole" + plural = "clusterroles" + singular = "clusterrole" + namespaced = False + + +class RoleBinding(APIObject): + """A Kubernetes RoleBinding.""" + + version = "rbac.authorization.k8s.io/v1" + endpoint = "rolebindings" + kind = "RoleBinding" + plural = "rolebindings" + singular = "rolebinding" + namespaced = True + + +class Role(APIObject): + """A Kubernetes Role.""" + + version = "rbac.authorization.k8s.io/v1" + endpoint = "roles" + kind = "Role" + plural = "roles" + singular = "role" + namespaced = True + + +## apiextensions.k8s.io/v1 objects + + +class CustomResourceDefinition(APIObject): + """A Kubernetes CustomResourceDefinition.""" + + version = "apiextensions.k8s.io/v1" + endpoint = "customresourcedefinitions" + kind = "CustomResourceDefinition" + plural = "customresourcedefinitions" + singular = "customresourcedefinition" + namespaced = False + + +def get_class(kind, version=None, _asyncio=True): + for cls in APIObject.__subclasses__(): + if ( + hasattr(cls, "kind") + and (cls.kind == kind or cls.singular == kind or cls.plural == kind) + and (version is None or cls.version == version) + and cls._asyncio == _asyncio + ): + return cls + raise KeyError(f"No object registered for {version}/{kind}") + + +def object_from_spec(spec: dict, api: Api = None) -> APIObject: + """Create an APIObject from a Kubernetes resource spec. + + Args: + spec: A Kubernetes resource spec. + + Returns: + A corresponding APIObject subclass instance. + + Raises: + ValueError: If the resource kind or API version is not supported. + """ + cls = get_class(spec["kind"], spec["apiVersion"]) + return cls(spec, api=api) diff --git a/kr8s/_portforward.py b/kr8s/_portforward.py new file mode 100644 index 00000000..8f4302cf --- /dev/null +++ b/kr8s/_portforward.py @@ -0,0 +1,200 @@ +# SPDX-FileCopyrightText: Copyright (c) 2023, Dask Developers, NVIDIA +# SPDX-License-Identifier: BSD 3-Clause License +import asyncio +import random +import socket +from contextlib import asynccontextmanager + +import aiohttp + +from ._exceptions import ConnectionClosedError + + +class PortForward: + """Start a tcp server and forward all connections to a Pod port. + + You can either pass a :class:`kr8s.objects.Pod` or any resource with a ``ready_pods`` method + such as a :class:`kr8s.objects.Service`. + + .. note:: + The ``ready_pods`` method should return a list of Pods that are ready to accept connections. + + Args: + ``resource`` (Pod or Resource): The Pod or Resource to forward to. + + ``remote_port`` (int): The port on the Pod to forward to. + + ``local_port`` (int, optional): The local port to listen on. Defaults to 0, which will choose a random port. + + Example: + This class can be used as a an async context manager or with explicit start/stop methods. + + Context manager: + + >>> async with PortForward(pod, 8888) as port: + ... print(f"Forwarding to port {port}") + ... # Do something with port 8888 on the Pod + + + Explict start/stop: + + >>> pf = PortForward(pod, 8888) + >>> await pf.start() + >>> print(f"Forwarding to port {pf.local_port}") + >>> # Do something with port 8888 on the Pod + >>> await pf.stop() + + + """ + + def __init__(self, resource, remote_port, local_port=None) -> None: + self.running = True + self.server = None + self.websocket = None + self.remote_port = remote_port + self.local_port = local_port if local_port is not None else 0 + self._resource = resource + from ._objects import Pod + + self.pod = None + if isinstance(resource, Pod): + self.pod = resource + else: + if not hasattr(resource, "ready_pods"): + raise ValueError( + "resource must be a Pod or a resource with a ready_pods method" + ) + self.connection_attempts = 0 + self._loop = asyncio.get_event_loop() + self._tasks = [] + self._run_task = None + self._bg_future = None + self._bg_task = None + + async def __aenter__(self, *args, **kwargs): + self._run_task = self._run() + return await self._run_task.__aenter__(*args, **kwargs) + + async def __aexit__(self, *args, **kwargs): + return await self._run_task.__aexit__(*args, **kwargs) + + async def start(self): + """Start a background task with the port forward running.""" + if self._bg_task is not None: + return + + async def f(): + self._bg_future = self._loop.create_future() + async with self as port: + self.local_port = port + await self._bg_future + + self._bg_task = asyncio.create_task(f()) + while self.local_port == 0: + await asyncio.sleep(0.1) + return self.local_port + + async def stop(self): + """Stop the background task.""" + self._bg_future.set_result(None) + self._bg_task = None + + @asynccontextmanager + async def _run(self): + """Start the port forward and yield the local port.""" + if not self.pod: + try: + self.pod = random.choice(await self._resource.ready_pods()) + except IndexError: + raise RuntimeError("No ready pods found") + self.server = await asyncio.start_server( + self._sync_sockets, port=self.local_port, host="0.0.0.0" + ) + async with self.server: + await self.server.start_serving() + for sock in self.server.sockets: + if sock.family == socket.AF_INET: + yield sock.getsockname()[1] + self.server.close() + await self.server.wait_closed() + + async def _connect_websocket(self): + while self.running: + self.connection_attempts += 1 + try: + async with self.pod.api.call_api( + version=self.pod.version, + url=f"{self.pod.endpoint}/{self.pod.name}/portforward", + namespace=self.pod.namespace, + websocket=True, + params={ + "name": self.pod.name, + "namespace": self.pod.namespace, + "ports": f"{self.remote_port}", + "_preload_content": "false", + }, + ) as websocket: + self.websocket = websocket + while not self.websocket.closed: + await asyncio.sleep(0.1) + except (aiohttp.WSServerHandshakeError, aiohttp.ServerDisconnectedError): + await asyncio.sleep(0.1) + + async def _sync_sockets(self, reader, writer): + """Start two tasks to copy bytes from tcp=>websocket and websocket=>tcp.""" + try: + self.tasks = [ + asyncio.create_task(self._connect_websocket()), + asyncio.create_task(self._tcp_to_ws(reader)), + asyncio.create_task(self._ws_to_tcp(writer)), + ] + await asyncio.gather(*self.tasks) + except ConnectionClosedError as e: + self.running = False + for task in self.tasks: + task.cancel() + raise e + finally: + writer.close() + + async def _tcp_to_ws(self, reader): + while True: + if self.websocket and not self.websocket.closed: + data = await reader.read(1024 * 1024) + if not data: + raise ConnectionClosedError("TCP socket closed") + else: + # Send data to channel 0 of the websocket. + # TODO Support multiple channels for multiple ports. + while not self.websocket or self.websocket.closed: + await asyncio.sleep(0.1) + await self.websocket.send_bytes(b"\x00" + data) + else: + await asyncio.sleep(0.1) + + async def _ws_to_tcp(self, writer): + channels = [] + while True: + if ( + self.websocket + and not self.websocket.closed + and self.websocket._waiting is None + ): + message = await self.websocket.receive() + if message.type == aiohttp.WSMsgType.CLOSED: + await asyncio.sleep(0.1) + continue + elif message.type == aiohttp.WSMsgType.BINARY: + # Kubernetes portforward protocol prefixes all frames with a byte to represent + # the channel. Channel 0 is rw for data and channel 1 is ro for errors. + if message.data[0] not in channels: + # Keep track of our channels. Could be useful later for listening to multiple ports. + channels.append(message.data[0]) + else: + if message.data[0] % 2 == 1: # pragma: no cover + # Odd channels are for errors. + raise ConnectionClosedError(message.data[1:].decode()) + writer.write(message.data[1:]) + await writer.drain() + else: + await asyncio.sleep(0.1) diff --git a/kr8s/asyncio/__init__.py b/kr8s/asyncio/__init__.py new file mode 100644 index 00000000..dfea7dc3 --- /dev/null +++ b/kr8s/asyncio/__init__.py @@ -0,0 +1,28 @@ +def api(url=None, kubeconfig=None, serviceaccount=None, namespace=None, _asyncio=True): + """Create a :class:`kr8s.Api` object for interacting with the Kubernetes API. + + If a kr8s object already exists with the same arguments, it will be returned. + """ + + from kr8s import Api as _SyncApi + from kr8s._api import Api as _AsyncApi + + if _asyncio: + _cls = _AsyncApi + else: + _cls = _SyncApi + + def _f(**kwargs): + key = frozenset(kwargs.items()) + if key in _cls._instances: + return _cls._instances[key] + if all(k is None for k in kwargs.values()) and list(_cls._instances.values()): + return list(_cls._instances.values())[0] + return _cls(**kwargs, bypass_factory=True) + + return _f( + url=url, + kubeconfig=kubeconfig, + serviceaccount=serviceaccount, + namespace=namespace, + ) diff --git a/kr8s/asyncio/objects.py b/kr8s/asyncio/objects.py new file mode 100644 index 00000000..31033355 --- /dev/null +++ b/kr8s/asyncio/objects.py @@ -0,0 +1,37 @@ +from kr8s._objects import ( # noqa + APIObject, + Binding, + ClusterRole, + ClusterRoleBinding, + ComponentStatus, + ConfigMap, + ControllerRevision, + CronJob, + CustomResourceDefinition, + DaemonSet, + Deployment, + Endpoints, + Event, + HorizontalPodAutoscaler, + Ingress, + IngressClass, + Job, + LimitRange, + Namespace, + NetworkPolicy, + Node, + PersistentVolume, + PersistentVolumeClaim, + Pod, + PodDisruptionBudget, + PodTemplate, + ReplicaSet, + ReplicationController, + ResourceQuota, + Role, + RoleBinding, + Secret, + Service, + ServiceAccount, + StatefulSet, +) diff --git a/kr8s/asyncio/portforward.py b/kr8s/asyncio/portforward.py new file mode 100644 index 00000000..ba438eaa --- /dev/null +++ b/kr8s/asyncio/portforward.py @@ -0,0 +1 @@ +from kr8s._portforward import PortForward # noqa diff --git a/kr8s/conftest.py b/kr8s/conftest.py index c6f450af..25ebbe78 100644 --- a/kr8s/conftest.py +++ b/kr8s/conftest.py @@ -15,6 +15,7 @@ import yaml from pytest_kind.cluster import KindCluster +from kr8s._api import Api from kr8s._testutils import set_env HERE = Path(__file__).parent.resolve() @@ -180,3 +181,9 @@ def serviceaccount(k8s_cluster): k8s_cluster.kubectl( "delete", "-f", str(HERE / "tests" / "resources" / "serviceaccount.yaml") ) + + +@pytest.fixture(autouse=True) +def ensure_new_api_between_tests(): + yield + Api._instances.clear() diff --git a/kr8s/objects.py b/kr8s/objects.py index 70de2405..227e23bf 100644 --- a/kr8s/objects.py +++ b/kr8s/objects.py @@ -1,823 +1,319 @@ -# SPDX-FileCopyrightText: Copyright (c) 2023, Dask Developers, Yuvi Panda, Anaconda Inc, NVIDIA -# SPDX-License-Identifier: BSD 3-Clause License -import asyncio -import json -from typing import Any, List, Optional - -import aiohttp -from aiohttp import ClientResponse - -import kr8s -from kr8s._api import Api -from kr8s._data_utils import list_dict_unpack -from kr8s._exceptions import NotFoundError -from kr8s.portforward import PortForward - - -class APIObject: - """Base class for Kubernetes objects.""" - - namespaced = False - scalable = False - scalable_spec = "replicas" - - def __init__(self, resource: dict, api: Api = None) -> None: - """Initialize an APIObject.""" - # TODO support passing pykube or kubernetes objects in addition to dicts - self._raw = resource - self.api = api or kr8s.api() - - def __repr__(self): - """Return a string representation of the Kubernetes resource.""" - return f"<{self.kind} {self.name}>" - - def __str__(self): - """Return a string representation of the Kubernetes resource.""" - return self.name - - @property - def raw(self) -> str: - """Raw object returned from the Kubernetes API.""" - return self._raw - - @raw.setter - def raw(self, value): - self._raw = value - - @property - def name(self) -> str: - """Name of the Kubernetes resource.""" - return self.raw["metadata"]["name"] - - @property - def namespace(self) -> str: - """Namespace of the Kubernetes resource.""" - if self.namespaced: - return self.raw.get("metadata", {}).get( - "namespace", self.api.auth.namespace - ) - return None - - @property - def metadata(self) -> dict: - """Metadata of the Kubernetes resource.""" - return self.raw["metadata"] - - @property - def spec(self) -> dict: - """Spec of the Kubernetes resource.""" - return self.raw["spec"] - - @property - def status(self) -> dict: - """Status of the Kubernetes resource.""" - return self.raw["status"] - - @property - def labels(self) -> dict: - """Labels of the Kubernetes resource.""" - try: - return self.raw["metadata"]["labels"] - except KeyError: - return {} - - @property - def annotations(self) -> dict: - """Annotations of the Kubernetes resource.""" - try: - return self.raw["metadata"]["annotations"] - except KeyError: - return {} - - @property - def replicas(self) -> int: - """Replicas of the Kubernetes resource.""" - if self.scalable: - return self.raw["spec"][self.scalable_spec] - raise NotImplementedError(f"{self.kind} is not scalable") - - @classmethod - async def get( - cls, name: str, namespace: str = None, api: Api = None, **kwargs - ) -> "APIObject": - """Get a Kubernetes resource by name.""" - - api = api or kr8s.api() - try: - resources = await api.get(cls.endpoint, name, namespace=namespace, **kwargs) - [resource] = resources - except ValueError: - raise ValueError( - f"Expected exactly one {cls.kind} object. Use selectors to narrow down the search." - ) - - return resource - - async def exists(self, ensure=False) -> bool: - """Check if this object exists in Kubernetes.""" - async with self.api.call_api( - "GET", - version=self.version, - url=f"{self.endpoint}/{self.name}", - namespace=self.namespace, - raise_for_status=False, - ) as resp: - status = resp.status - if status == 200: - return True - if ensure: - raise NotFoundError(f"Object {self.name} does not exist") - return False - - async def create(self) -> None: - """Create this object in Kubernetes.""" - async with self.api.call_api( - "POST", - version=self.version, - url=self.endpoint, - namespace=self.namespace, - data=json.dumps(self.raw), - ) as resp: - self.raw = await resp.json() - - async def delete(self, propagation_policy: str = None) -> None: - """Delete this object from Kubernetes.""" - data = {} - if propagation_policy: - data["propagationPolicy"] = propagation_policy - try: - async with self.api.call_api( - "DELETE", - version=self.version, - url=f"{self.endpoint}/{self.name}", - namespace=self.namespace, - data=json.dumps(data), - ) as resp: - self.raw = await resp.json() - except aiohttp.ClientResponseError as e: - raise NotFoundError(f"Object {self.name} does not exist") from e - - async def refresh(self) -> None: - """Refresh this object from Kubernetes.""" - async with self.api.call_api( - "GET", - version=self.version, - url=f"{self.endpoint}/{self.name}", - namespace=self.namespace, - ) as resp: - self.raw = await resp.json() - - async def patch(self, patch, *, subresource=None) -> None: - """Patch this object in Kubernetes.""" - url = f"{self.endpoint}/{self.name}" - if subresource: - url = f"{url}/{subresource}" - async with self.api.call_api( - "PATCH", - version=self.version, - url=url, - namespace=self.namespace, - data=json.dumps(patch), - headers={"Content-Type": "application/merge-patch+json"}, - ) as resp: - self.raw = await resp.json() - - async def scale(self, replicas=None): - """Scale this object in Kubernetes.""" - if not self.scalable: - raise NotImplementedError(f"{self.kind} is not scalable") - await self.exists(ensure=True) - # TODO support dot notation in self.scalable_spec to support nested fields - await self.patch({"spec": {self.scalable_spec: replicas}}) - while self.replicas != replicas: - await self.refresh() - await asyncio.sleep(0.1) - - async def watch(self): - """Watch this object in Kubernetes.""" - since = self.metadata.get("resourceVersion") - async for event, obj in self.api.watch( - self.endpoint, - namespace=self.namespace, - field_selector=f"metadata.name={self.name}", - since=since, - ): - self.raw = obj.raw - yield event, self - - -## v1 objects - - -class Binding(APIObject): - """A Kubernetes Binding.""" - - version = "v1" - endpoint = "bindings" - kind = "Binding" - plural = "bindings" - singular = "binding" - namespaced = True - - -class ComponentStatus(APIObject): - """A Kubernetes ComponentStatus.""" - - version = "v1" - endpoint = "componentstatuses" - kind = "ComponentStatus" - plural = "componentstatuses" - singular = "componentstatus" - namespaced = False - - -class ConfigMap(APIObject): - """A Kubernetes ConfigMap.""" - - version = "v1" - endpoint = "configmaps" - kind = "ConfigMap" - plural = "configmaps" - singular = "configmap" - namespaced = True - - -class Endpoints(APIObject): - """A Kubernetes Endpoints.""" - - version = "v1" - endpoint = "endpoints" - kind = "Endpoints" - plural = "endpoints" - singular = "endpoint" - namespaced = True - - -class Event(APIObject): - """A Kubernetes Event.""" - - version = "v1" - endpoint = "events" - kind = "Event" - plural = "events" - singular = "event" - namespaced = True - - -class LimitRange(APIObject): - """A Kubernetes LimitRange.""" - - version = "v1" - endpoint = "limitranges" - kind = "LimitRange" - plural = "limitranges" - singular = "limitrange" - namespaced = True - - -class Namespace(APIObject): - """A Kubernetes Namespace.""" - - version = "v1" - endpoint = "namespaces" - kind = "Namespace" - plural = "namespaces" - singular = "namespace" - namespaced = False - - -class Node(APIObject): - """A Kubernetes Node.""" - - version = "v1" - endpoint = "nodes" - kind = "Node" - plural = "nodes" - singular = "node" - namespaced = False - - @property - def unschedulable(self): - if "unschedulable" in self.raw["spec"]: - return self.raw["spec"]["unschedulable"] - return False - - async def cordon(self): - await self.patch({"spec": {"unschedulable": True}}) - - async def uncordon(self): - await self.patch({"spec": {"unschedulable": False}}) - - -class PersistentVolumeClaim(APIObject): - """A Kubernetes PersistentVolumeClaim.""" - - version = "v1" - endpoint = "persistentvolumeclaims" - kind = "PersistentVolumeClaim" - plural = "persistentvolumeclaims" - singular = "persistentvolumeclaim" - namespaced = True - - -class PersistentVolume(APIObject): - """A Kubernetes PersistentVolume.""" - - version = "v1" - endpoint = "persistentvolumes" - kind = "PersistentVolume" - plural = "persistentvolumes" - singular = "persistentvolume" - namespaced = False - - -class Pod(APIObject): - """A Kubernetes Pod.""" - - version = "v1" - endpoint = "pods" - kind = "Pod" - plural = "pods" - singular = "pod" - namespaced = True - - async def ready(self): - """Check if the pod is ready.""" - await self.refresh() - conditions = list_dict_unpack( - self.status.get("conditions", []), - key="type", - value="status", - ) - return ( - "Ready" in conditions - and "ContainersReady" in conditions - and conditions.get("Ready", "False") == "True" - and conditions.get("ContainersReady", "False") == "True" - ) - - async def logs( - self, - container=None, - pretty=None, - previous=False, - since_seconds=None, - since_time=None, - timestamps=False, - tail_lines=None, - limit_bytes=None, - ): - params = {} - if container is not None: - params["container"] = container - if pretty is not None: - params["pretty"] = pretty - if previous: - params["previous"] = "true" - if since_seconds is not None and since_time is None: - params["sinceSeconds"] = int(since_seconds) - elif since_time is not None and since_seconds is None: - params["sinceTime"] = since_time - if timestamps: - params["timestamps"] = "true" - if tail_lines is not None: - params["tailLines"] = int(tail_lines) - if limit_bytes is not None: - params["limitBytes"] = int(limit_bytes) - - async with self.api.call_api( - "GET", - version=self.version, - url=f"{self.endpoint}/{self.name}/log", - namespace=self.namespace, - params=params, - ) as resp: - return await resp.text() - - def portforward(self, remote_port: int, local_port: int = None) -> int: - """Port forward a pod. - - Returns an instance of :class:`kr8s.portforward.PortForward` for this Pod. - - Example: - This can be used as a an async context manager or with explicit start/stop methods. - - Context manager: - - >>> async with pod.portforward(8888) as port: - ... print(f"Forwarding to port {port}") - ... # Do something with port 8888 - - - Explict start/stop: - - >>> pf = pod.portforward(8888) - >>> await pf.start() - >>> print(f"Forwarding to port {pf.local_port}") - >>> # Do something with port 8888 - >>> await pf.stop() - """ - return PortForward(self, remote_port, local_port) - - -class PodTemplate(APIObject): - """A Kubernetes PodTemplate.""" - - version = "v1" - endpoint = "podtemplates" - kind = "PodTemplate" - plural = "podtemplates" - singular = "podtemplate" - namespaced = True - - -class ReplicationController(APIObject): - """A Kubernetes ReplicationController.""" - - version = "v1" - endpoint = "replicationcontrollers" - kind = "ReplicationController" - plural = "replicationcontrollers" - singular = "replicationcontroller" - namespaced = True - scalable = True - - async def ready(self): - """Check if the deployment is ready.""" - await self.refresh() - return ( - self.raw["status"].get("observedGeneration", 0) - >= self.raw["metadata"]["generation"] - and self.raw["status"].get("readyReplicas", 0) == self.replicas - ) - - -class ResourceQuota(APIObject): - """A Kubernetes ResourceQuota.""" - - version = "v1" - endpoint = "resourcequotas" - kind = "ResourceQuota" - plural = "resourcequotas" - singular = "resourcequota" - namespaced = True - - -class Secret(APIObject): - """A Kubernetes Secret.""" - - version = "v1" - endpoint = "secrets" - kind = "Secret" - plural = "secrets" - singular = "secret" - namespaced = True - - -class ServiceAccount(APIObject): - """A Kubernetes ServiceAccount.""" - - version = "v1" - endpoint = "serviceaccounts" - kind = "ServiceAccount" - plural = "serviceaccounts" - singular = "serviceaccount" - namespaced = True +from ._asyncio import sync +from ._objects import ( + APIObject as _APIObject, +) +from ._objects import ( + Binding as _Binding, +) +from ._objects import ( + ClusterRole as _ClusterRole, +) +from ._objects import ( + ClusterRoleBinding as _ClusterRoleBinding, +) +from ._objects import ( + ComponentStatus as _ComponentStatus, +) +from ._objects import ( + ConfigMap as _ConfigMap, +) +from ._objects import ( + ControllerRevision as _ControllerRevision, +) +from ._objects import ( + CronJob as _CronJob, +) +from ._objects import ( + CustomResourceDefinition as _CustomResourceDefinition, +) +from ._objects import ( + DaemonSet as _DaemonSet, +) +from ._objects import ( + Deployment as _Deployment, +) +from ._objects import ( + Endpoints as _Endpoints, +) +from ._objects import ( + Event as _Event, +) +from ._objects import ( + HorizontalPodAutoscaler as _HorizontalPodAutoscaler, +) +from ._objects import ( + Ingress as _Ingress, +) +from ._objects import ( + IngressClass as _IngressClass, +) +from ._objects import ( + Job as _Job, +) +from ._objects import ( + LimitRange as _LimitRange, +) +from ._objects import ( + Namespace as _Namespace, +) +from ._objects import ( + NetworkPolicy as _NetworkPolicy, +) +from ._objects import ( + Node as _Node, +) +from ._objects import ( + PersistentVolume as _PersistentVolume, +) +from ._objects import ( + PersistentVolumeClaim as _PersistentVolumeClaim, +) +from ._objects import ( + Pod as _Pod, +) +from ._objects import ( + PodDisruptionBudget as _PodDisruptionBudget, +) +from ._objects import ( + PodTemplate as _PodTemplate, +) +from ._objects import ( + ReplicaSet as _ReplicaSet, +) +from ._objects import ( + ReplicationController as _ReplicationController, +) +from ._objects import ( + ResourceQuota as _ResourceQuota, +) +from ._objects import ( + Role as _Role, +) +from ._objects import ( + RoleBinding as _RoleBinding, +) +from ._objects import ( + Secret as _Secret, +) +from ._objects import ( + Service as _Service, +) +from ._objects import ( + ServiceAccount as _ServiceAccount, +) +from ._objects import ( + StatefulSet as _StatefulSet, +) +from ._objects import ( + object_from_spec, # noqa +) + + +@sync +class APIObject(_APIObject): + __doc__ = _APIObject.__doc__ + _asyncio = False + + +@sync +class Binding(_Binding): + __doc__ = _Binding.__doc__ + _asyncio = False + + +@sync +class ComponentStatus(_ComponentStatus): + __doc__ = _ComponentStatus.__doc__ + _asyncio = False + + +@sync +class ConfigMap(_ConfigMap): + __doc__ = _ConfigMap.__doc__ + _asyncio = False + + +@sync +class Endpoints(_Endpoints): + __doc__ = _Endpoints.__doc__ + _asyncio = False + + +@sync +class Event(_Event): + __doc__ = _Event.__doc__ + _asyncio = False + + +@sync +class LimitRange(_LimitRange): + __doc__ = _LimitRange.__doc__ + _asyncio = False + + +@sync +class Namespace(_Namespace): + __doc__ = _Namespace.__doc__ + _asyncio = False -class Service(APIObject): - """A Kubernetes Service.""" - - version = "v1" - endpoint = "services" - kind = "Service" - plural = "services" - singular = "service" - namespaced = True +@sync +class Node(_Node): + __doc__ = _Node.__doc__ + _asyncio = False - async def proxy_http_request( - self, method: str, path: str, port: Optional[int] = None, **kwargs: Any - ) -> ClientResponse: - """Issue a HTTP request with specific HTTP method to proxy of a Service. - Args: - method: HTTP method to use. - path: Path to proxy. - port: Port to proxy to. If not specified, the first port in the - Service's spec will be used. - **kwargs: Additional keyword arguments to pass to the API call. - """ - if port is None: - port = self.raw["spec"]["ports"][0]["port"] - async with self.api.call_api( - method, - version=self.version, - url=f"{self.endpoint}/{self.name}:{port}/proxy/{path}", - namespace=self.namespace, - **kwargs, - ) as response: - return response - - async def proxy_http_get( - self, path: str, port: Optional[int] = None, **kwargs - ) -> None: - return await self.proxy_http_request("GET", path, port, **kwargs) - - async def proxy_http_post( - self, path: str, port: Optional[int] = None, **kwargs - ) -> None: - return await self.proxy_http_request("POST", path, port, **kwargs) - - async def proxy_http_put( - self, path: str, port: Optional[int] = None, **kwargs - ) -> None: - return await self.proxy_http_request("PUT", path, port, **kwargs) - - async def proxy_http_delete( - self, path: str, port: Optional[int] = None, **kwargs - ) -> None: - return await self.proxy_http_request("DELETE", path, port, **kwargs) - - async def ready_pods(self) -> List[Pod]: - """Return a list of ready pods for this service.""" - pod_selector = ",".join([f"{k}={v}" for k, v in self.labels.items()]) - pods = await self.api.get("pods", label_selector=pod_selector) - return [pod for pod in pods if await pod.ready()] - - async def ready(self) -> bool: - """Check if the service is ready.""" - pods = await self.ready_pods() - return len(pods) > 0 +@sync +class PersistentVolume(_PersistentVolume): + __doc__ = _PersistentVolume.__doc__ + _asyncio = False - def portforward(self, remote_port: int, local_port: int = None) -> int: - """Port forward a service. - Returns an instance of :class:`kr8s.portforward.PortForward` for this Service. +@sync +class PersistentVolumeClaim(_PersistentVolumeClaim): + __doc__ = _PersistentVolumeClaim.__doc__ + _asyncio = False - Example: - This can be used as a an async context manager or with explicit start/stop methods. - - Context manager: - >>> async with service.portforward(8888) as port: - ... print(f"Forwarding to port {port}") - ... # Do something with port 8888 +@sync +class Pod(_Pod): + __doc__ = _Pod.__doc__ + _asyncio = False - Explict start/stop: +@sync +class PodTemplate(_PodTemplate): + __doc__ = _PodTemplate.__doc__ + _asyncio = False - >>> pf = service.portforward(8888) - >>> await pf.start() - >>> print(f"Forwarding to port {pf.local_port}") - >>> # Do something with port 8888 - >>> await pf.stop() - """ - return PortForward(self, remote_port, local_port) +@sync +class ReplicationController(_ReplicationController): + __doc__ = _ReplicationController.__doc__ + _asyncio = False -## apps/v1 objects +@sync +class ResourceQuota(_ResourceQuota): + __doc__ = _ResourceQuota.__doc__ + _asyncio = False -class ControllerRevision(APIObject): - """A Kubernetes ControllerRevision.""" +@sync +class Secret(_Secret): + __doc__ = _Secret.__doc__ + _asyncio = False - version = "apps/v1" - endpoint = "controllerrevisions" - kind = "ControllerRevision" - plural = "controllerrevisions" - singular = "controllerrevision" - namespaced = True +@sync +class Service(_Service): + __doc__ = _Service.__doc__ + _asyncio = False -class DaemonSet(APIObject): - """A Kubernetes DaemonSet.""" - version = "apps/v1" - endpoint = "daemonsets" - kind = "DaemonSet" - plural = "daemonsets" - singular = "daemonset" - namespaced = True +@sync +class ServiceAccount(_ServiceAccount): + __doc__ = _ServiceAccount.__doc__ + _asyncio = False -class Deployment(APIObject): - """A Kubernetes Deployment.""" +@sync +class ControllerRevision(_ControllerRevision): + __doc__ = _ControllerRevision.__doc__ + _asyncio = False - version = "apps/v1" - endpoint = "deployments" - kind = "Deployment" - plural = "deployments" - singular = "deployment" - namespaced = True - scalable = True - async def ready(self): - """Check if the deployment is ready.""" - await self.refresh() - return ( - self.raw["status"].get("observedGeneration", 0) - >= self.raw["metadata"]["generation"] - and self.raw["status"].get("readyReplicas", 0) == self.replicas - ) +@sync +class DaemonSet(_DaemonSet): + __doc__ = _DaemonSet.__doc__ + _asyncio = False -class ReplicaSet(APIObject): - """A Kubernetes ReplicaSet.""" +@sync +class Deployment(_Deployment): + __doc__ = _Deployment.__doc__ + _asyncio = False - version = "apps/v1" - endpoint = "replicasets" - kind = "ReplicaSet" - plural = "replicasets" - singular = "replicaset" - namespaced = True - scalable = True +@sync +class ReplicaSet(_ReplicaSet): + __doc__ = _ReplicaSet.__doc__ + _asyncio = False -class StatefulSet(APIObject): - """A Kubernetes StatefulSet.""" - version = "apps/v1" - endpoint = "statefulsets" - kind = "StatefulSet" - plural = "statefulsets" - singular = "statefulset" - namespaced = True - scalable = True +@sync +class StatefulSet(_StatefulSet): + __doc__ = _StatefulSet.__doc__ + _asyncio = False -## autoscaling/v1 objects +@sync +class HorizontalPodAutoscaler(_HorizontalPodAutoscaler): + __doc__ = _HorizontalPodAutoscaler.__doc__ + _asyncio = False -class HorizontalPodAutoscaler(APIObject): - """A Kubernetes HorizontalPodAutoscaler.""" +@sync +class CronJob(_CronJob): + __doc__ = _CronJob.__doc__ + _asyncio = False - version = "autoscaling/v2" - endpoint = "horizontalpodautoscalers" - kind = "HorizontalPodAutoscaler" - plural = "horizontalpodautoscalers" - singular = "horizontalpodautoscaler" - namespaced = True +@sync +class Job(_Job): + __doc__ = _Job.__doc__ + _asyncio = False -## batch/v1 objects +@sync +class Ingress(_Ingress): + __doc__ = _Ingress.__doc__ + _asyncio = False -class CronJob(APIObject): - """A Kubernetes CronJob.""" - version = "batch/v1" - endpoint = "cronjobs" - kind = "CronJob" - plural = "cronjobs" - singular = "cronjob" - namespaced = True +@sync +class IngressClass(_IngressClass): + __doc__ = _IngressClass.__doc__ + _asyncio = False -class Job(APIObject): - """A Kubernetes Job.""" +@sync +class NetworkPolicy(_NetworkPolicy): + __doc__ = _NetworkPolicy.__doc__ + _asyncio = False - version = "batch/v1" - endpoint = "jobs" - kind = "Job" - plural = "jobs" - singular = "job" - namespaced = True - scalable = True - scalable_spec = "parallelism" +@sync +class PodDisruptionBudget(_PodDisruptionBudget): + __doc__ = _PodDisruptionBudget.__doc__ + _asyncio = False -## networking.k8s.io/v1 objects +@sync +class ClusterRoleBinding(_ClusterRoleBinding): + __doc__ = _ClusterRoleBinding.__doc__ + _asyncio = False -class IngressClass(APIObject): - """A Kubernetes IngressClass.""" - version = "networking.k8s.io/v1" - endpoint = "ingressclasses" - kind = "IngressClass" - plural = "ingressclasses" - singular = "ingressclass" - namespaced = False +@sync +class ClusterRole(_ClusterRole): + __doc__ = _ClusterRole.__doc__ + _asyncio = False -class Ingress(APIObject): - """A Kubernetes Ingress.""" +@sync +class RoleBinding(_RoleBinding): + __doc__ = _RoleBinding.__doc__ + _asyncio = False - version = "networking.k8s.io/v1" - endpoint = "ingresses" - kind = "Ingress" - plural = "ingresses" - singular = "ingress" - namespaced = True +@sync +class Role(_Role): + __doc__ = _Role.__doc__ + _asyncio = False -class NetworkPolicy(APIObject): - """A Kubernetes NetworkPolicy.""" - version = "networking.k8s.io/v1" - endpoint = "networkpolicies" - kind = "NetworkPolicy" - plural = "networkpolicies" - singular = "networkpolicy" - namespaced = True - - -## policy/v1 objects - - -class PodDisruptionBudget(APIObject): - """A Kubernetes PodDisruptionBudget.""" - - version = "policy/v1" - endpoint = "poddisruptionbudgets" - kind = "PodDisruptionBudget" - plural = "poddisruptionbudgets" - singular = "poddisruptionbudget" - namespaced = True - - -## rbac.authorization.k8s.io/v1 objects - - -class ClusterRoleBinding(APIObject): - """A Kubernetes ClusterRoleBinding.""" - - version = "rbac.authorization.k8s.io/v1" - endpoint = "clusterrolebindings" - kind = "ClusterRoleBinding" - plural = "clusterrolebindings" - singular = "clusterrolebinding" - namespaced = False - - -class ClusterRole(APIObject): - """A Kubernetes ClusterRole.""" - - version = "rbac.authorization.k8s.io/v1" - endpoint = "clusterroles" - kind = "ClusterRole" - plural = "clusterroles" - singular = "clusterrole" - namespaced = False - - -class RoleBinding(APIObject): - """A Kubernetes RoleBinding.""" - - version = "rbac.authorization.k8s.io/v1" - endpoint = "rolebindings" - kind = "RoleBinding" - plural = "rolebindings" - singular = "rolebinding" - namespaced = True - - -class Role(APIObject): - """A Kubernetes Role.""" - - version = "rbac.authorization.k8s.io/v1" - endpoint = "roles" - kind = "Role" - plural = "roles" - singular = "role" - namespaced = True - - -## apiextensions.k8s.io/v1 objects - - -class CustomResourceDefinition(APIObject): - """A Kubernetes CustomResourceDefinition.""" - - version = "apiextensions.k8s.io/v1" - endpoint = "customresourcedefinitions" - kind = "CustomResourceDefinition" - plural = "customresourcedefinitions" - singular = "customresourcedefinition" - namespaced = False - - -def get_class(kind, version=None): - for cls in APIObject.__subclasses__(): - if (cls.kind == kind or cls.singular == kind or cls.plural == kind) and ( - version is None or cls.version == version - ): - return cls - raise KeyError(f"No object registered for {version}/{kind}") - - -def object_from_spec(spec: dict, api: Api = None) -> APIObject: - """Create an APIObject from a Kubernetes resource spec. - - Args: - spec: A Kubernetes resource spec. - - Returns: - A corresponding APIObject subclass instance. - - Raises: - ValueError: If the resource kind or API version is not supported. - """ - cls = get_class(spec["kind"], spec["apiVersion"]) - return cls(spec, api=api) +@sync +class CustomResourceDefinition(_CustomResourceDefinition): + __doc__ = _CustomResourceDefinition.__doc__ + _asyncio = False diff --git a/kr8s/portforward.py b/kr8s/portforward.py index d1b867f9..52359786 100644 --- a/kr8s/portforward.py +++ b/kr8s/portforward.py @@ -1,200 +1,7 @@ -# SPDX-FileCopyrightText: Copyright (c) 2023, Dask Developers, NVIDIA -# SPDX-License-Identifier: BSD 3-Clause License -import asyncio -import random -import socket -from contextlib import asynccontextmanager +from ._asyncio import sync +from ._portforward import PortForward as _PortForward -import aiohttp -from ._exceptions import ConnectionClosedError - - -class PortForward: - """Start a tcp server and forward all connections to a Pod port. - - You can either pass a :class:`kr8s.objects.Pod` or any resource with a ``ready_pods`` method - such as a :class:`kr8s.objects.Service`. - - .. note:: - The ``ready_pods`` method should return a list of Pods that are ready to accept connections. - - Args: - ``resource`` (Pod or Resource): The Pod or Resource to forward to. - - ``remote_port`` (int): The port on the Pod to forward to. - - ``local_port`` (int, optional): The local port to listen on. Defaults to 0, which will choose a random port. - - Example: - This class can be used as a an async context manager or with explicit start/stop methods. - - Context manager: - - >>> async with PortForward(pod, 8888) as port: - ... print(f"Forwarding to port {port}") - ... # Do something with port 8888 on the Pod - - - Explict start/stop: - - >>> pf = PortForward(pod, 8888) - >>> await pf.start() - >>> print(f"Forwarding to port {pf.local_port}") - >>> # Do something with port 8888 on the Pod - >>> await pf.stop() - - - """ - - def __init__(self, resource, remote_port, local_port=None) -> None: - self.running = True - self.server = None - self.websocket = None - self.remote_port = remote_port - self.local_port = local_port if local_port is not None else 0 - self._resource = resource - from .objects import Pod - - self.pod = None - if isinstance(resource, Pod): - self.pod = resource - else: - if not hasattr(resource, "ready_pods"): - raise ValueError( - "resource must be a Pod or a resource with a ready_pods method" - ) - self.connection_attempts = 0 - self._loop = asyncio.get_event_loop() - self._tasks = [] - self._run_task = None - self._bg_future = None - self._bg_task = None - - async def __aenter__(self, *args, **kwargs): - self._run_task = self._run() - return await self._run_task.__aenter__(*args, **kwargs) - - async def __aexit__(self, *args, **kwargs): - return await self._run_task.__aexit__(*args, **kwargs) - - async def start(self): - """Start a background task with the port forward running.""" - if self._bg_task is not None: - return - - async def f(): - self._bg_future = self._loop.create_future() - async with self as port: - self.local_port = port - await self._bg_future - - self._bg_task = asyncio.create_task(f()) - while self.local_port == 0: - await asyncio.sleep(0.1) - return self.local_port - - async def stop(self): - """Stop the background task.""" - self._bg_future.set_result(None) - self._bg_task = None - - @asynccontextmanager - async def _run(self): - """Start the port forward and yield the local port.""" - if not self.pod: - try: - self.pod = random.choice(await self._resource.ready_pods()) - except IndexError: - raise RuntimeError("No ready pods found") - self.server = await asyncio.start_server( - self._sync_sockets, port=self.local_port, host="0.0.0.0" - ) - async with self.server: - await self.server.start_serving() - for sock in self.server.sockets: - if sock.family == socket.AF_INET: - yield sock.getsockname()[1] - self.server.close() - await self.server.wait_closed() - - async def _connect_websocket(self): - while self.running: - self.connection_attempts += 1 - try: - async with self.pod.api.call_api( - version=self.pod.version, - url=f"{self.pod.endpoint}/{self.pod.name}/portforward", - namespace=self.pod.namespace, - websocket=True, - params={ - "name": self.pod.name, - "namespace": self.pod.namespace, - "ports": f"{self.remote_port}", - "_preload_content": "false", - }, - ) as websocket: - self.websocket = websocket - while not self.websocket.closed: - await asyncio.sleep(0.1) - except (aiohttp.WSServerHandshakeError, aiohttp.ServerDisconnectedError): - await asyncio.sleep(0.1) - - async def _sync_sockets(self, reader, writer): - """Start two tasks to copy bytes from tcp=>websocket and websocket=>tcp.""" - try: - self.tasks = [ - asyncio.create_task(self._connect_websocket()), - asyncio.create_task(self._tcp_to_ws(reader)), - asyncio.create_task(self._ws_to_tcp(writer)), - ] - await asyncio.gather(*self.tasks) - except ConnectionClosedError as e: - self.running = False - for task in self.tasks: - task.cancel() - raise e - finally: - writer.close() - - async def _tcp_to_ws(self, reader): - while True: - if self.websocket and not self.websocket.closed: - data = await reader.read(1024 * 1024) - if not data: - raise ConnectionClosedError("TCP socket closed") - else: - # Send data to channel 0 of the websocket. - # TODO Support multiple channels for multiple ports. - while not self.websocket or self.websocket.closed: - await asyncio.sleep(0.1) - await self.websocket.send_bytes(b"\x00" + data) - else: - await asyncio.sleep(0.1) - - async def _ws_to_tcp(self, writer): - channels = [] - while True: - if ( - self.websocket - and not self.websocket.closed - and self.websocket._waiting is None - ): - message = await self.websocket.receive() - if message.type == aiohttp.WSMsgType.CLOSED: - await asyncio.sleep(0.1) - continue - elif message.type == aiohttp.WSMsgType.BINARY: - # Kubernetes portforward protocol prefixes all frames with a byte to represent - # the channel. Channel 0 is rw for data and channel 1 is ro for errors. - if message.data[0] not in channels: - # Keep track of our channels. Could be useful later for listening to multiple ports. - channels.append(message.data[0]) - else: - if message.data[0] % 2 == 1: # pragma: no cover - # Odd channels are for errors. - raise ConnectionClosedError(message.data[1:].decode()) - writer.write(message.data[1:]) - await writer.drain() - else: - await asyncio.sleep(0.1) +@sync +class PortForward(_PortForward): + __doc__ = _PortForward.__doc__ diff --git a/kr8s/tests/test_api.py b/kr8s/tests/test_api.py index 0bd633c6..c32dab90 100644 --- a/kr8s/tests/test_api.py +++ b/kr8s/tests/test_api.py @@ -5,7 +5,8 @@ import pytest import kr8s -from kr8s.objects import Pod +import kr8s.asyncio +from kr8s.asyncio.objects import Pod async def test_factory_bypass(): @@ -50,14 +51,27 @@ async def test_api_factory_with_kubeconfig(k8s_cluster, serviceaccount): assert p3.api is not k2 -async def test_version(): +def test_version_sync(): kubernetes = kr8s.api() + version = kubernetes.version() + assert "major" in version + + +@pytest.mark.xfail(reason="Cannot run nested event loops", raises=RuntimeError) +async def test_version_sync_in_async(): + kubernetes = kr8s.api() + version = kubernetes.version() + assert "major" in version + + +async def test_version(): + kubernetes = kr8s.asyncio.api() version = await kubernetes.version() assert "major" in version async def test_bad_api_version(): - kubernetes = kr8s.api() + kubernetes = kr8s.asyncio.api() with pytest.raises(ValueError): async with kubernetes.call_api("GET", version="foo"): pass # pragma: no cover @@ -65,7 +79,7 @@ async def test_bad_api_version(): @pytest.mark.parametrize("namespace", [kr8s.ALL, "kube-system"]) async def test_get_pods(namespace): - kubernetes = kr8s.api() + kubernetes = kr8s.asyncio.api() pods = await kubernetes.get("pods", namespace=namespace) assert isinstance(pods, list) assert len(pods) > 0 @@ -73,7 +87,7 @@ async def test_get_pods(namespace): async def test_watch_pods(example_pod_spec): - kubernetes = kr8s.api() + kubernetes = kr8s.asyncio.api() pod = Pod(example_pod_spec) await pod.create() while not await pod.ready(): @@ -93,13 +107,13 @@ async def test_watch_pods(example_pod_spec): async def test_get_deployments(): - kubernetes = kr8s.api() + kubernetes = kr8s.asyncio.api() deployments = await kubernetes.get("deployments") assert isinstance(deployments, list) async def test_api_resources(): - kubernetes = kr8s.api() + kubernetes = kr8s.asyncio.api() resources = await kubernetes.api_resources() names = [r["name"] for r in resources] diff --git a/kr8s/tests/test_auth.py b/kr8s/tests/test_auth.py index c8d3658e..fd2052ec 100644 --- a/kr8s/tests/test_auth.py +++ b/kr8s/tests/test_auth.py @@ -39,28 +39,37 @@ async def kubeconfig_with_exec(k8s_cluster): async def test_kubeconfig(k8s_cluster): - kubernetes = kr8s.api(kubeconfig=k8s_cluster.kubeconfig_path) + kubernetes = kr8s.asyncio.api(kubeconfig=k8s_cluster.kubeconfig_path) version = await kubernetes.version() assert "major" in version async def test_reauthenticate(k8s_cluster): - kubernetes = kr8s.api(kubeconfig=k8s_cluster.kubeconfig_path) + kubernetes = kr8s.asyncio.api(kubeconfig=k8s_cluster.kubeconfig_path) kubernetes.auth.reauthenticate() version = await kubernetes.version() assert "major" in version +def test_reauthenticate_sync(k8s_cluster): + kubernetes = kr8s.api(kubeconfig=k8s_cluster.kubeconfig_path) + kubernetes.auth.reauthenticate() + version = kubernetes.version() + assert "major" in version + + async def test_bad_auth(serviceaccount): (Path(serviceaccount) / "token").write_text("abc123") - kubernetes = kr8s.api(serviceaccount=serviceaccount, kubeconfig="/no/file/here") + kubernetes = kr8s.asyncio.api( + serviceaccount=serviceaccount, kubeconfig="/no/file/here" + ) serviceaccount = Path(serviceaccount) with pytest.raises(aiohttp.ClientResponseError): await kubernetes.version() async def test_url(kubectl_proxy): - kubernetes = kr8s.api(url=kubectl_proxy) + kubernetes = kr8s.asyncio.api(url=kubectl_proxy) version = await kubernetes.version() assert "major" in version @@ -71,7 +80,9 @@ async def test_no_config(): async def test_service_account(serviceaccount): - kubernetes = kr8s.api(serviceaccount=serviceaccount, kubeconfig="/no/file/here") + kubernetes = kr8s.asyncio.api( + serviceaccount=serviceaccount, kubeconfig="/no/file/here" + ) await kubernetes.version() serviceaccount = Path(serviceaccount) @@ -83,6 +94,6 @@ async def test_service_account(serviceaccount): async def test_exec(kubeconfig_with_exec): - kubernetes = kr8s.api(kubeconfig=kubeconfig_with_exec) + kubernetes = kr8s.asyncio.api(kubeconfig=kubeconfig_with_exec) version = await kubernetes.version() assert "major" in version diff --git a/kr8s/tests/test_objects.py b/kr8s/tests/test_objects.py index 341bdec8..2cb7a12b 100644 --- a/kr8s/tests/test_objects.py +++ b/kr8s/tests/test_objects.py @@ -1,21 +1,22 @@ # SPDX-FileCopyrightText: Copyright (c) 2023, Dask Developers, Yuvi Panda, Anaconda Inc, NVIDIA # SPDX-License-Identifier: BSD 3-Clause License import asyncio +import time import aiohttp import pytest import kr8s -from kr8s.objects import ( +from kr8s._objects import get_class, object_from_spec +from kr8s.asyncio.objects import ( APIObject, Deployment, PersistentVolume, Pod, Service, - get_class, - object_from_spec, ) -from kr8s.portforward import PortForward +from kr8s.asyncio.portforward import PortForward +from kr8s.objects import Pod as SyncPod DEFAULT_TIMEOUT = aiohttp.ClientTimeout(30) @@ -80,8 +81,22 @@ async def test_pod_create_and_delete(example_pod_spec): assert not await pod.exists() +def test_pod_create_and_delete_sync(example_pod_spec): + pod = SyncPod(example_pod_spec) + pod.create() + with pytest.raises(NotImplementedError): + pod.replicas + assert pod.exists() + while not pod.ready(): + time.sleep(0.1) + pod.delete() + while pod.exists(): + time.sleep(0.1) + assert not pod.exists() + + async def test_list_and_ensure(): - kubernetes = kr8s.api() + kubernetes = kr8s.asyncio.api() pods = await kubernetes.get("pods", namespace=kr8s.ALL) assert len(pods) > 0 for pod in pods: @@ -146,7 +161,7 @@ async def test_selectors(example_pod_spec): pod = Pod(example_pod_spec) await pod.create() - kubernetes = kr8s.api() + kubernetes = kr8s.asyncio.api() pods = await kubernetes.get("pods", namespace=kr8s.ALL, label_selector="abc=123def") assert len(pods) == 1 @@ -173,6 +188,16 @@ async def test_pod_watch(example_pod_spec): await pod.delete() +def test_pod_watch_sync(example_pod_spec): + pod = SyncPod(example_pod_spec) + pod.create() + for event, obj in pod.watch(): + assert event in ("ADDED", "MODIFIED", "DELETED") + assert obj.name == pod.name + break + pod.delete() + + async def test_patch_pod(example_pod_spec): pod = Pod(example_pod_spec) await pod.create() @@ -183,7 +208,7 @@ async def test_patch_pod(example_pod_spec): async def test_all_v1_objects_represented(): - kubernetes = kr8s.api() + kubernetes = kr8s.asyncio.api() objects = await kubernetes.api_resources() supported_apis = ( "v1", @@ -243,7 +268,7 @@ async def test_deployment_scale(example_deployment_spec): async def test_node(): - kubernetes = kr8s.api() + kubernetes = kr8s.asyncio.api() nodes = await kubernetes.get("nodes") assert len(nodes) > 0 for node in nodes: @@ -254,7 +279,7 @@ async def test_node(): async def test_service_proxy(): - kubernetes = kr8s.api() + kubernetes = kr8s.asyncio.api() [service] = await kubernetes.get("services", "kubernetes") assert service.name == "kubernetes" data = await service.proxy_http_get("/version", raise_for_status=False) @@ -284,6 +309,17 @@ async def test_pod_port_forward_context_manager(nginx_service): await resp.read() +@pytest.mark.skip(reason="For manual testing only") +async def test_pod_port_forward_context_manager_manual(nginx_service): + [nginx_pod, *_] = await nginx_service.ready_pods() + pf = nginx_pod.portforward(80, 8184) + async with pf: + done = False + while not done: + # Put a breakpoint here and set done = True when you're finished. + await asyncio.sleep(1) + + async def test_pod_port_forward_start_stop(nginx_service): [nginx_pod, *_] = await nginx_service.ready_pods() pf = nginx_pod.portforward(80) diff --git a/pyproject.toml b/pyproject.toml index e807a60f..2d39160f 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -61,6 +61,7 @@ sphinx-autoapi = "^2.1.0" [tool.pytest.ini_options] addopts = "-v --keep-cluster --durations=10 --cov=kr8s --cov-report term-missing --cov-report xml:coverage.xml" timeout = 300 +xfail_strict = true reruns = 5 reruns_delay = 1 asyncio_mode = "auto"