Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: list sessions #64

Merged
merged 14 commits into from
Jun 9, 2023
12 changes: 9 additions & 3 deletions deepset_cloud_sdk/_api/upload_sessions.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import datetime
import enum
from dataclasses import dataclass
from typing import Dict, List
from typing import Dict, List, Optional
from uuid import UUID

import structlog
Expand Down Expand Up @@ -224,7 +224,9 @@ async def status(self, workspace_name: str, session_id: UUID) -> UploadSessionSt
wait=wait_fixed(1),
reraise=True,
)
async def list(self, workspace_name: str, limit: int = 10, page_number: int = 1) -> UploadSessionDetailList:
async def list(
self, workspace_name: str, is_expired: Optional[bool] = False, limit: int = 10, page_number: int = 1
) -> UploadSessionDetailList:
"""List upload sessions.

This method lists all upload sessions for a given workspace.
Expand All @@ -235,10 +237,14 @@ async def list(self, workspace_name: str, limit: int = 10, page_number: int = 1)
:raises FailedToSendUploadSessionRequest: If the list could not be fetched.
:return: UploadSessionDetailList object.
"""
params = {"limit": limit, "page_number": page_number}
if is_expired:
params["is_expired"] = is_expired

response = await self._deepset_cloud_api.get(
workspace_name=workspace_name,
endpoint="upload_sessions",
params={"limit": limit, "page_number": page_number},
params=params,
)
if response.status_code != codes.OK:
logger.error(
Expand Down
52 changes: 52 additions & 0 deletions deepset_cloud_sdk/_service/files_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@
from deepset_cloud_sdk._api.files import File, FilesAPI
from deepset_cloud_sdk._api.upload_sessions import (
UploadSession,
UploadSessionDetail,
UploadSessionsAPI,
UploadSessionStatus,
WriteMode,
)
from deepset_cloud_sdk._s3.upload import S3
Expand Down Expand Up @@ -388,3 +390,53 @@ async def list_all(
after_value = response.data[-1].created_at
after_file_id = response.data[-1].file_id
yield response.data

async def list_upload_sessions(
self,
workspace_name: str,
is_expired: Optional[bool] = False,
batch_size: int = 100,
timeout_s: int = 20,
) -> AsyncGenerator[List[UploadSessionDetail], None]: # noqa: F821
"""List all upload sessions files in a workspace.

Returns an async generator that yields lists of files. The generator is finished when all files are listed.
You can specify the batch size per number of returned files using `batch_size`.

:param workspace_name: Name of the workspace whose files you want to list.
:param is_expired: Whether to list expired upload sessions.
:param batch_size: Number of files to return per request.
:param timeout_s: Timeout in seconds for the listing.
:raises TimeoutError: If the listing takes longer than timeout_s.
"""
start = time.time()
has_more = True

page_number: int = 1
while has_more:
if time.time() - start > timeout_s:
ArzelaAscoIi marked this conversation as resolved.
Show resolved Hide resolved
raise TimeoutError(f"Listing all upload sessions files in workspace {workspace_name} timed out.")
response = await self._upload_sessions.list(
workspace_name,
is_expired=is_expired,
limit=batch_size,
page_number=page_number,
)
has_more = response.has_more
if not response.data:
return

page_number += 1
yield response.data

async def get_upload_session(self, workspace_name: str, session_id: UUID) -> UploadSessionStatus:
"""Get the status of an upload session.

:param workspace_name: Name of the workspace whose upload session you want to get.
:param session_id: ID of the upload session.
:return: UploadSessionStatus object.
"""
upload_session_status: UploadSessionStatus = await self._upload_sessions.status(
workspace_name=workspace_name, session_id=session_id
)
return upload_session_status
128 changes: 119 additions & 9 deletions deepset_cloud_sdk/cli.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,21 @@
"""The CLI for the deepset Cloud SDK."""
import json
import os
from typing import Optional
from typing import List, Optional
from uuid import UUID

import typer
from tabulate import tabulate

from deepset_cloud_sdk.__about__ import __version__
from deepset_cloud_sdk._api.config import DEFAULT_WORKSPACE_NAME, ENV_FILE_PATH
from deepset_cloud_sdk.workflows.sync_client.files import (
get_upload_session as sync_get_upload_session,
)
from deepset_cloud_sdk.workflows.sync_client.files import list_files as sync_list_files
from deepset_cloud_sdk.workflows.sync_client.files import (
list_upload_sessions as sync_list_upload_sessions,
)
from deepset_cloud_sdk.workflows.sync_client.files import upload

cli_app = typer.Typer(pretty_exceptions_show_locals=False)
Expand Down Expand Up @@ -70,14 +78,116 @@ def list_files(
:param odata_filter: odata_filter to apply to the file list.
:param batch_size: Batch size to use for the file list.
"""
headers = ["file_id", "url", "name", "size", "created_at", "meta"] # Assuming the first row contains the headers
for files in sync_list_files(api_key, api_url, workspace_name, name, content, odata_filter, batch_size, timeout_s):
table = tabulate(files, headers, tablefmt="grid") # type: ignore
typer.echo(table)
if len(files) > 0:
prompt_input = typer.prompt("Print more results ?", default="y")
if prompt_input != "y":
break
try:
headers = [
"file_id",
"url",
"name",
"size",
"created_at",
"meta",
] # Assuming the first row contains the headers
for files in sync_list_files(
api_key, api_url, workspace_name, name, content, odata_filter, batch_size, timeout_s
):
table = tabulate(files, headers, tablefmt="grid") # type: ignore
typer.echo(table)
if len(files) > 0:
prompt_input = typer.prompt("Print more results ?", default="y")
if prompt_input != "y":
break
except TimeoutError:
typer.echo("Command timed out.")


@cli_app.command()
def list_upload_sessions(
    api_key: Optional[str] = None,
    api_url: Optional[str] = None,
    is_expired: Optional[bool] = False,
    workspace_name: str = DEFAULT_WORKSPACE_NAME,
    batch_size: int = 10,
    timeout_s: int = 300,
) -> None:
    """List upload sessions in deepset Cloud.

    A CLI method to list the upload sessions that exist in a deepset Cloud
    workspace, printed page by page as a table with an interactive
    "print more" prompt between pages.

    :param api_key: deepset Cloud API key to use for authentication.
    :param api_url: API URL to use for authentication.
    :param is_expired: Whether to list expired upload sessions.
    :param workspace_name: Name of the workspace to list the upload sessions from.
    :param batch_size: Batch size to use for the upload session list.
    :param timeout_s: Timeout in seconds for the API requests.
    """
    try:
        for upload_sessions in sync_list_upload_sessions(
            api_key=api_key,
            api_url=api_url,
            workspace_name=workspace_name,
            is_expired=is_expired,
            batch_size=batch_size,
            timeout_s=timeout_s,
        ):
            # Flatten each session into printable strings; the dict keys double
            # as the column headers via tabulate's headers="keys" mode.
            table = tabulate(
                [
                    {
                        "session_id": str(el.session_id),
                        "created_by": f"{el.created_by.given_name} {el.created_by.family_name}",
                        "created_at": str(el.created_at),
                        "expires_at": str(el.expires_at),
                        "write_mode": el.write_mode,
                        "status": el.status,
                    }
                    for el in upload_sessions
                ],
                headers="keys",
                tablefmt="grid",
            )
            typer.echo(table)
            if len(upload_sessions) > 0:
                prompt_input = typer.prompt("Print more results ?", default="y")
                if prompt_input != "y":
                    break
    except TimeoutError:
        typer.echo("Command timed out. Please try again later.")


@cli_app.command()
def get_upload_session(
    session_id: UUID,
    api_key: Optional[str] = None,
    api_url: Optional[str] = None,
    workspace_name: str = DEFAULT_WORKSPACE_NAME,
) -> None:
    """Get an upload session from deepset Cloud.

    A CLI method to get an upload session from deepset Cloud. This method is useful to
    check the status of an upload session after uploading files to deepset Cloud.

    :param session_id: ID of the upload session to get the status for.
    :param api_key: deepset Cloud API key to use for authentication.
    :param api_url: API URL to use for authentication.
    :param workspace_name: Name of the workspace to upload the files to.
    """
    session = sync_get_upload_session(
        session_id=session_id, api_key=api_key, api_url=api_url, workspace_name=workspace_name
    )
    # Assemble a JSON-serializable view of the session status before printing it.
    ingestion = session.ingestion_status
    summary = {
        "session_id": str(session.session_id),
        "expires_at": str(session.expires_at),
        "documentation_url": str(session.documentation_url),
        "ingestion_status": {
            "failed_files": ingestion.failed_files,
            "finished_files": ingestion.finished_files,
        },
    }
    typer.echo(json.dumps(summary, indent=4))


def version_callback(value: bool) -> None:
Expand Down
60 changes: 59 additions & 1 deletion deepset_cloud_sdk/workflows/async_client/files.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
"""This module contains async functions for uploading files and folders to deepset Cloud."""
from pathlib import Path
from typing import AsyncGenerator, List, Optional
from uuid import UUID

from sniffio import AsyncLibraryNotFoundError

Expand All @@ -12,7 +13,11 @@
CommonConfig,
)
from deepset_cloud_sdk._api.files import File
from deepset_cloud_sdk._api.upload_sessions import WriteMode
from deepset_cloud_sdk._api.upload_sessions import (
UploadSessionDetail,
UploadSessionStatus,
WriteMode,
)
from deepset_cloud_sdk._service.files_service import DeepsetCloudFile, FilesService


Expand Down Expand Up @@ -54,6 +59,59 @@ async def list_files(
pass


async def list_upload_sessions(
    api_key: Optional[str] = None,
    api_url: Optional[str] = None,
    workspace_name: str = DEFAULT_WORKSPACE_NAME,
    is_expired: Optional[bool] = None,
    batch_size: int = 100,
    timeout_s: int = 300,
) -> AsyncGenerator[List[UploadSessionDetail], None]:
    """List upload sessions in a deepset Cloud workspace.

    Returns an async generator that yields batches of upload session details.

    :param api_key: deepset Cloud API key to use for authentication.
    :param api_url: API URL to use for authentication.
    :param workspace_name: Name of the workspace to list the upload sessions from.
    :param is_expired: Whether to list expired upload sessions.
    :param batch_size: Batch size for the listing.
    :param timeout_s: Timeout in seconds for the API requests.
    :return: Async generator yielding lists of UploadSessionDetail objects.
    """
    try:
        async with FilesService.factory(_get_config(api_key=api_key, api_url=api_url)) as file_service:
            async for upload_session_batch in file_service.list_upload_sessions(
                workspace_name=workspace_name,
                is_expired=is_expired,
                batch_size=batch_size,
                timeout_s=timeout_s,
            ):
                yield upload_session_batch
    except AsyncLibraryNotFoundError:
        # The sync wrapper drives this generator via asyncio.run(); when the event
        # loop is torn down, sniffio raises this error, which we deliberately ignore.
        pass


async def get_upload_session(
    session_id: UUID,
    api_key: Optional[str] = None,
    api_url: Optional[str] = None,
    workspace_name: str = DEFAULT_WORKSPACE_NAME,
) -> UploadSessionStatus:
    """Get the status of an upload session.

    :param session_id: ID of the upload session to get the status for.
    :param api_key: deepset Cloud API key to use for authentication.
    :param api_url: API URL to use for authentication.
    :param workspace_name: Name of the workspace the upload session belongs to.
    :return: UploadSessionStatus object describing the session.
    """
    async with FilesService.factory(_get_config(api_key=api_key, api_url=api_url)) as file_service:
        return await file_service.get_upload_session(
            workspace_name=workspace_name,
            session_id=session_id,
        )


async def upload_file_paths(
file_paths: List[Path],
api_key: Optional[str] = None,
Expand Down
Loading