Skip to content

Commit

Permalink
[RSDK-3640] Implement More APP API's (#379)
Browse files Browse the repository at this point in the history
  • Loading branch information
Bashar authored Aug 17, 2023
1 parent 13a8d88 commit 69487f4
Show file tree
Hide file tree
Showing 12 changed files with 1,100 additions and 173 deletions.
2 changes: 1 addition & 1 deletion docs/examples/example.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -1117,7 +1117,7 @@
"source": [
"my_organizations = await app_client.list_organizations()\n",
"MY_ORG_ID = my_organizations[0].id\n",
"my_locations = await app_client.list_locations(organization_id=MY_ORG_ID)\n",
"my_locations = await app_client.list_locations()\n",
"robots = []\n",
"\n",
"for location in my_locations:\n",
Expand Down
43 changes: 43 additions & 0 deletions src/viam/app/_logs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import sys

if sys.version_info >= (3, 9):
from collections.abc import AsyncIterator
else:
from typing import AsyncIterator

from typing import Protocol, TypeVar

LogsType = TypeVar("LogsType", covariant=True)


class _LogsStream(Protocol[LogsType]):
async def next(self) -> LogsType:
...

async def close(self):
...

def __aiter__(self):
return self

async def __anext__(self) -> LogsType:
return await self.next()


class _LogsStreamWithIterator(_LogsStream[LogsType]):
_stream: AsyncIterator[LogsType]

def __init__(self, stream: AsyncIterator[LogsType]):
self._stream = stream

async def next(self) -> LogsType:
return await self._stream.__anext__()

def __aiter__(self):
return self._stream

async def __anext__(self) -> LogsType:
return await self._stream.__anext__()

async def close(self):
return await super().close()
Loading

0 comments on commit 69487f4

Please sign in to comment.