Skip to content

Commit

Permalink
Fix: Manifest request in prod failed due to a service execution timeo…
Browse files Browse the repository at this point in the history
…ut (#5528)
  • Loading branch information
achave11-ucsc committed Oct 5, 2023
1 parent 7674aed commit a15abaa
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 11 deletions.
17 changes: 17 additions & 0 deletions src/azul/azulclient.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
PrettyPrinter,
)
from typing import (
Optional,
Union,
cast,
)
Expand Down Expand Up @@ -481,6 +482,22 @@ def wait_for_indexer(self):
"""
self.queues.wait_to_stabilize()

def indexing_eta(self) -> Optional[int]:
    """
    Estimate how long an ongoing reindexing will take to complete.

    The estimate is computed from the total number of messages reported
    for the work queues named by ``config.work_queue_names``.

    :return: a positive integer estimate (presumably in seconds, TODO
             confirm against the callers) while the work queues hold
             messages, or None when they are empty.
    """
    # Largest queue length for which the polynomial in eta() is still
    # increasing: its vertex, 1.27 / (2 * 4.9e-8), rounded down. Past that
    # point the polynomial shrinks and eventually becomes negative, and a
    # negative float raised to a fractional power is a complex number,
    # which int() rejects with a TypeError. Clamping keeps the estimate
    # monotonic and real-valued for arbitrarily long queues.
    max_q_length = 12_959_183

    def eta(q_length: int) -> int:
        q_length = min(q_length, max_q_length)
        return int((-4.9 * 10 ** (-8) * q_length ** 2 + 0.27 * q_length + q_length) ** .883)

    work_queues = self.queues.get_queues(config.work_queue_names)
    # Only the total across all work queues matters here; the per-queue
    # breakdown is discarded.
    total_length, _ = self.queues.get_queue_lengths(work_queues)
    if total_length:
        return eta(total_length)
    else:
        return None


class AzulClientError(RuntimeError):
    """
    A RuntimeError subclass defined alongside the Azul client. It adds no
    behavior of its own; it only provides a distinct exception type.
    """
Expand Down
8 changes: 4 additions & 4 deletions src/azul/queues.py
Original file line number Diff line number Diff line change
Expand Up @@ -190,9 +190,9 @@ def get_queues(self, queue_names: Iterable[str]) -> Mapping[str, Queue]:
for queue_name in queue_names
}

def _get_queue_lengths(self,
queues: Mapping[str, Queue]
) -> tuple[int, Mapping[str, int]]:
def get_queue_lengths(self,
queues: Mapping[str, Queue]
) -> tuple[int, Mapping[str, int]]:
"""
Count the number of messages in the given queues.
Expand Down Expand Up @@ -238,7 +238,7 @@ def wait_to_stabilize(self) -> int:

while True:
# Determine queue lengths
total_length, queue_lengths = self._get_queue_lengths(queues)
total_length, queue_lengths = self.get_queue_lengths(queues)
total_lengths.append(total_length)
logger.info('Counting %i messages in %i queues.',
total_length, len(queue_lengths))
Expand Down
32 changes: 25 additions & 7 deletions src/azul/service/manifest_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@
from azul.auth import (
Authentication,
)
from azul.azulclient import (
AzulClient,
)
from azul.chalice import (
GoneError,
)
Expand Down Expand Up @@ -61,6 +64,10 @@ class ManifestController(SourceController):
step_function_lambda_name: str
manifest_url_func: ManifestUrlFunc

@cached_property
def azul(self) -> AzulClient:
    """
    An AzulClient constructed with ``num_workers=1``. Being a cached
    property, the client is created on first access and the same object
    is returned on subsequent accesses from this instance.
    """
    client = AzulClient(num_workers=1)
    return client

@cached_property
def async_service(self) -> AsyncManifestService:
name = config.state_machine_name(self.step_function_lambda_name)
Expand Down Expand Up @@ -96,13 +103,24 @@ def get_manifest(self, state: JSON) -> JSON:
else:
assert False, type(result)

def get_manifest_async(self,
*,
self_url: furl,
catalog: CatalogName,
query_params: Mapping[str, str],
fetch: bool,
authentication: Optional[Authentication]):
def get_manifest_async(self, **kwargs):
    """
    Handle a manifest request unless the client reports an indexing ETA.

    If ``self.azul.indexing_eta()`` returns a truthy value, respond with
    a 503 whose ``Retry-After`` header carries that value. Otherwise pass
    all keyword arguments through to ``_get_manifest_async`` and return
    its result.
    """
    retry_after = self.azul.indexing_eta()
    if not retry_after:
        # No ETA (None or zero): handle the request normally.
        return self._get_manifest_async(**kwargs)
    return Response(body=f'A reindex is in progress, try again in {retry_after}s',
                    status_code=503,
                    headers={'Retry-After': str(retry_after)})

def _get_manifest_async(self,
*,
self_url: furl,
catalog: CatalogName,
query_params: Mapping[str, str],
fetch: bool,
authentication: Optional[Authentication]):

token = query_params.get('token')
if token is None:
Expand Down

0 comments on commit a15abaa

Please sign in to comment.