Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

added callback sender #39

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft

added callback sender #39

wants to merge 1 commit into from

Conversation

sgalpha01
Copy link
Member

Updated taskmaster to take callback_url and send notifications to the callback receiver on status change.

@@ -17,6 +18,10 @@ def __init__(self, body, name='task-job', namespace='default'):
self.timeout = 240
self.body = body
self.body['metadata']['name'] = self.name
self.callback = None
if callback_url:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Better to check explicitly with if callback_url is not None:

@@ -34,9 +39,15 @@ def run_to_completion(self, poll_interval, check_cancelled, pod_timeout):
raise ApiException(ex.status, ex.reason)
is_all_pods_running = False
status, is_all_pods_running = self.get_status(is_all_pods_running)
# notify the callback receiver that the job is running
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the code is obvious enough that it doesn't need a comment. I think the same goes for all other places where you call callback.send(), perhaps with the exception of this comment in the taskmaster module:

            # send "SYSTEM_ERROR" to callback receiver if taskmaster completes
            # but the output filer fails

@@ -17,6 +18,10 @@ def __init__(self, body, name='task-job', namespace='default'):
self.timeout = 240
self.body = body
self.body['metadata']['name'] = self.name
self.callback = None
if callback_url:
task_name = '-'.join(name.split('-')[:2])
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not quite sure why you process the task name. Is the processed version the one that the user knows (i.e., the one that is returned when POSTing a new task)?

@@ -34,9 +39,15 @@ def run_to_completion(self, poll_interval, check_cancelled, pod_timeout):
raise ApiException(ex.status, ex.reason)
is_all_pods_running = False
status, is_all_pods_running = self.get_status(is_all_pods_running)
# notify the callback receiver that the job is running
if self.callback and status == 'Running':
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Again, better to compare explicity against None, i.e., if self.callback is not None [...]. Same below.


created_jobs = []
poll_interval = 5
task_volume_basename = 'task-volume'
args = None
logger = None
callback = CallbackSender()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't quite understand this code. At this stage, CallbackSender misses required information, the URL and the task name. Now, it appears that this is filled in when newParser() is called. But when does that happen? And who/what sets the environment variable CALLBACK_URL and when? And can you be sure that data['executors'][0]['metadata']['labels']['taskmaster-name'] is always set and correct when newParser() is called? And can you be sure that callback.url and callback.task_id are always set (and correctly) whenever callback.send is called?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also, setting callback as a global and changing it in newParser() is quite a side effect. Wouldn't it be better to add a setter method to CallbackSender and call that somewhere (ideally not in newParser() itself, but in the context where newParser() is called?


if not self.url:
return None
sent = False
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess instead of a while loop you could use a for loop with range(number_of_retries). That way you could get rid of retries and manual incrementing. You could also use break instead of sent = True and get rid of sent.

"""

if not self.url:
return None
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might be good to log something here.

None: if the callback receiver is not set or some error occurs.
"""

if not self.url:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the task ID is not set?

@@ -31,14 +36,17 @@ def run_executor(executor, namespace, pvc=None):
volumes.extend([{'name': task_volume_basename, 'persistentVolumeClaim': {
'readonly': False, 'claimName': pvc.name}}])
logger.debug('Created job: ' + jobname)
job = Job(executor, jobname, namespace)
job = Job(executor, jobname, namespace, callback.url)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At this point you already have a callback instance. Why not use that?

@uniqueg
Copy link
Member

uniqueg commented Sep 11, 2022

Thanks @sgalpha01. Added some comments. Most are minor style/best practice things, in others I'm just asking for clarification because I don't know the TESK codebase well

@sgalpha01 sgalpha01 marked this pull request as draft September 12, 2022 03:13
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants