From 4ae0f1219fbf177d9f3f36ccf31fe885f23889e7 Mon Sep 17 00:00:00 2001 From: maikia Date: Wed, 6 Jan 2021 11:00:48 +0100 Subject: [PATCH 1/8] init --- ramp-engine/ramp_engine/aws/api.py | 1 + ramp-engine/ramp_engine/dispatcher.py | 4 ++++ 2 files changed, 5 insertions(+) diff --git a/ramp-engine/ramp_engine/aws/api.py b/ramp-engine/ramp_engine/aws/api.py index 2bd5068b..8a881b4a 100644 --- a/ramp-engine/ramp_engine/aws/api.py +++ b/ramp-engine/ramp_engine/aws/api.py @@ -797,6 +797,7 @@ def _training_finished(config, instance_id, submission_name): """ Return True if a submission has finished training """ + # TODO: check if bagged score is saved return not _has_screen(config, instance_id, submission_name) diff --git a/ramp-engine/ramp_engine/dispatcher.py b/ramp-engine/ramp_engine/dispatcher.py index d000aacb..4f489e02 100644 --- a/ramp-engine/ramp_engine/dispatcher.py +++ b/ramp-engine/ramp_engine/dispatcher.py @@ -276,6 +276,10 @@ def update_database_results(self, session): # database. Since they require too much space, we stop to store # them in the database and instead, keep it onto the disk. # set_predictions(session, submission_id, path_predictions) + + # TODO: here the AWS dispatcher is failing at times because of: + # 1. missing bagged scores.csv + # 2. train_time not found set_time(session, submission_id, path_predictions) set_scores(session, submission_id, path_predictions) set_bagged_scores(session, submission_id, path_predictions) From 19be0504385c8a82d2d76aca9acefe7ef8873319 Mon Sep 17 00:00:00 2001 From: maikia Date: Thu, 7 Jan 2021 10:19:32 +0100 Subject: [PATCH 2/8] checks if the bagged_scores.csv is saved on the aws instance --- ramp-engine/ramp_engine/aws/api.py | 20 +++++++++++++++++--- 1 file changed, 17 insertions(+), 3 deletions(-) diff --git a/ramp-engine/ramp_engine/aws/api.py b/ramp-engine/ramp_engine/aws/api.py index 8a881b4a..016026bb 100644 --- a/ramp-engine/ramp_engine/aws/api.py +++ b/ramp-engine/ramp_engine/aws/api.py @@ -795,10 +795,13 @@ def _is_ready(config, instance_id): def _training_finished(config, instance_id, submission_name): """ - Return True if a submission has finished training + Return True if a submission has finished training (if the screen no longer + exists on the ec2 instance and if the bagged_scores.csv file is saved on + the instance) """ - # TODO: check if bagged score is saved - return not _has_screen(config, instance_id, submission_name) + has_screen = _has_screen(config, instance_id, submission_name) + has_score_file = _has_score_file(config, instance_id, submission_name) + return not has_screen and has_score_file def _training_successful(config, instance_id, submission_name, @@ -845,6 +848,17 @@ def _has_screen(config, instance_id, screen_name): return nb > 0 +def _has_score_file(config, instance_id, screen_name): + """ + Return True if a 'bagged_scores.csv' file exists on the ec2 instance + """ + cmd = "find {} -name bagged_scores.csv" + cmd = cmd.format(config['remote_ramp_kit_folder']) + score_path = _run(config, instance_id, cmd, return_output=True) + + return len(score_path) > 0 + + def _tag_instance_by_submission(config, instance_id, submission_name): """ Add tags to an instance with infos from the submission to know which From 2c28b6f18c4d01e266892533caaa110642982810 Mon Sep 17 00:00:00 2001 From: maikia Date: Thu, 7 Jan 2021 11:37:25 +0100 Subject: [PATCH 3/8] cleanup\ --- ramp-engine/ramp_engine/dispatcher.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/ramp-engine/ramp_engine/dispatcher.py b/ramp-engine/ramp_engine/dispatcher.py index 4f489e02..8486027d 100644 --- a/ramp-engine/ramp_engine/dispatcher.py +++ b/ramp-engine/ramp_engine/dispatcher.py @@ -277,9 +277,6 @@ def update_database_results(self, session): # them in the database and instead, keep it onto the disk. # set_predictions(session, submission_id, path_predictions) - # TODO: here the AWS dispatcher is failing at times because of: - # 1. missing bagged scores.csv - # 2. train_time not found set_time(session, submission_id, path_predictions) set_scores(session, submission_id, path_predictions) set_bagged_scores(session, submission_id, path_predictions) From bedaf73f932cb6f42799c66d51f1f2304f838343 Mon Sep 17 00:00:00 2001 From: maikia Date: Thu, 7 Jan 2021 13:51:45 +0100 Subject: [PATCH 4/8] rephrasing --- ramp-engine/ramp_engine/aws/api.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/ramp-engine/ramp_engine/aws/api.py b/ramp-engine/ramp_engine/aws/api.py index 016026bb..8b29a641 100644 --- a/ramp-engine/ramp_engine/aws/api.py +++ b/ramp-engine/ramp_engine/aws/api.py @@ -796,8 +796,7 @@ def _is_ready(config, instance_id): def _training_finished(config, instance_id, submission_name): """ Return True if a submission has finished training (if the screen no longer - exists on the ec2 instance and if the bagged_scores.csv file is saved on - the instance) + exists on the ec2 instance and if the bagged_scores.csv file was saved) """ has_screen = _has_screen(config, instance_id, submission_name) has_score_file = _has_score_file(config, instance_id, submission_name) From a71b66409781e70995b9dee42ff4ef56c32730dc Mon Sep 17 00:00:00 2001 From: maikia Date: Fri, 8 Jan 2021 18:20:16 +0100 Subject: [PATCH 5/8] updated the call to ec2 to check if the training has finished --- ramp-engine/ramp_engine/aws/api.py | 28 ++++++++++----- ramp-engine/ramp_engine/tests/test_aws.py | 43 +++++++++++++++++++++++ 2 files changed, 63 insertions(+), 8 deletions(-) diff --git a/ramp-engine/ramp_engine/aws/api.py b/ramp-engine/ramp_engine/aws/api.py index 8b29a641..a5611a10 100644 --- a/ramp-engine/ramp_engine/aws/api.py +++ b/ramp-engine/ramp_engine/aws/api.py @@ -799,7 +799,9 @@ def _training_finished(config, instance_id, submission_name): exists on the ec2 instance and if the bagged_scores.csv file was saved) """ has_screen = _has_screen(config, instance_id, submission_name) - has_score_file = _has_score_file(config, instance_id, submission_name) + # this can only work if the training was successful + has_score_file = _has_log_or_score_file( + config, instance_id, submission_name) return not has_screen and has_score_file @@ -847,15 +849,25 @@ def _has_screen(config, instance_id, screen_name): return nb > 0 -def _has_score_file(config, instance_id, screen_name): +def _has_log_or_score_file(config, instance_id, screen_name): """ - Return True if a 'bagged_scores.csv' file exists on the ec2 instance + Return True if a 'bagged_scores.csv' file exists (submission terminated + with a success) or if log file (submisssion terminated with error) exists + but the directory with fold_0 does not exist on the ec2 + instance """ - cmd = "find {} -name bagged_scores.csv" - cmd = cmd.format(config['remote_ramp_kit_folder']) - score_path = _run(config, instance_id, cmd, return_output=True) - - return len(score_path) > 0 + submission_path = os.path.join( + config['remote_ramp_kit_folder'], + 'submissions', + screen_name) + cmd = ("bash -c '" + f"if [ -f {submission_path}/training_output/bagged_scores.csv ] || " + f"[[ -f {submission_path}/log && " + f" ! -d {submission_path}/training_output/fold_0 ]]; " + "then echo 1; else echo 0; fi'") + is_log_or_score = _run(config, instance_id, cmd, return_output=True) + + return int(is_log_or_score) def _tag_instance_by_submission(config, instance_id, submission_name): diff --git a/ramp-engine/ramp_engine/tests/test_aws.py b/ramp-engine/ramp_engine/tests/test_aws.py index f18803bd..5b040759 100644 --- a/ramp-engine/ramp_engine/tests/test_aws.py +++ b/ramp-engine/ramp_engine/tests/test_aws.py @@ -333,6 +333,49 @@ class DummyInstance: assert 'Adding the submission back to the queue' in caplog.text +@mock.patch('ramp_engine.aws.api.is_spot_terminated') +@mock.patch('ramp_engine.aws.api.launch_train') +@mock.patch('ramp_engine.aws.api._has_screen') +@mock.patch('ramp_engine.aws.api._has_log_or_score_file') +def test_not_finished_until_bagged_or_log_saved(has_score_file, has_screen, + launch_train, + spot_terminated, + caplog): + """ checks if the submission is considered finished correctly only if the + submission has finished correctly and the bagged_scores.csv file is + saved or it had some errors which appear in the log file """ + class DummyInstance: + id = 1 + launch_train.return_value = 0 + has_screen.return_value = True + has_score_file.return_value = False + + # setup the AWS worker + event_config = read_config(ramp_aws_config_template())['worker'] + + worker = AWSWorker(event_config, submission='starting_kit_local') + worker.config = event_config + worker.submission = 'dummy submissions' + worker.instance = DummyInstance + + # set the submission did not yet finish training + spot_terminated.return_value = False + + worker.launch_submission() + assert worker.status == 'running' + assert caplog.text == '' + + # set screen is no longer there + has_screen.return_value = False + assert worker.status == 'running' + assert caplog.text == '' + + # set that the score file was saved + has_score_file.return_value = True + assert worker.status == 'finished' + assert caplog.text == '' + + def test_aws_worker(): if not os.path.isfile(os.path.join(HERE, 'config.yml')): pytest.skip("Only for local tests for now") From d233106b2fbdecc6716dd0ed912c345ccbd4648f Mon Sep 17 00:00:00 2001 From: maikia Date: Fri, 8 Jan 2021 18:23:17 +0100 Subject: [PATCH 6/8] cleanup --- ramp-engine/ramp_engine/dispatcher.py | 1 - 1 file changed, 1 deletion(-) diff --git a/ramp-engine/ramp_engine/dispatcher.py b/ramp-engine/ramp_engine/dispatcher.py index 8486027d..d000aacb 100644 --- a/ramp-engine/ramp_engine/dispatcher.py +++ b/ramp-engine/ramp_engine/dispatcher.py @@ -276,7 +276,6 @@ def update_database_results(self, session): # database. Since they require too much space, we stop to store # them in the database and instead, keep it onto the disk. # set_predictions(session, submission_id, path_predictions) - set_time(session, submission_id, path_predictions) set_scores(session, submission_id, path_predictions) set_bagged_scores(session, submission_id, path_predictions) From 3c4acf3d895c306e1c3641a41b4f740063da7ff2 Mon Sep 17 00:00:00 2001 From: maikia Date: Fri, 8 Jan 2021 20:47:55 +0100 Subject: [PATCH 7/8] changed from looking for log to error.txt file --- ramp-engine/ramp_engine/aws/api.py | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/ramp-engine/ramp_engine/aws/api.py b/ramp-engine/ramp_engine/aws/api.py index a5611a10..d684fe63 100644 --- a/ramp-engine/ramp_engine/aws/api.py +++ b/ramp-engine/ramp_engine/aws/api.py @@ -858,12 +858,10 @@ def _has_log_or_score_file(config, instance_id, screen_name): """ submission_path = os.path.join( config['remote_ramp_kit_folder'], - 'submissions', - screen_name) + 'submissions', screen_name, 'training_output') cmd = ("bash -c '" - f"if [ -f {submission_path}/training_output/bagged_scores.csv ] || " - f"[[ -f {submission_path}/log && " - f" ! -d {submission_path}/training_output/fold_0 ]]; " + f"if [ -f {submission_path}/bagged_scores.csv ] || " + f"[ -f {submission_path}/fold_*/error.txt ]; " "then echo 1; else echo 0; fi'") is_log_or_score = _run(config, instance_id, cmd, return_output=True) From fff3a5daa59aa23bf70d604f32ad615aadd7d6e7 Mon Sep 17 00:00:00 2001 From: maikia Date: Fri, 8 Jan 2021 21:09:27 +0100 Subject: [PATCH 8/8] improve the names --- ramp-engine/ramp_engine/aws/api.py | 4 ++-- ramp-engine/ramp_engine/tests/test_aws.py | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/ramp-engine/ramp_engine/aws/api.py b/ramp-engine/ramp_engine/aws/api.py index d684fe63..3ab6bde3 100644 --- a/ramp-engine/ramp_engine/aws/api.py +++ b/ramp-engine/ramp_engine/aws/api.py @@ -800,7 +800,7 @@ def _training_finished(config, instance_id, submission_name): """ has_screen = _has_screen(config, instance_id, submission_name) # this can only work if the training was successful - has_score_file = _has_log_or_score_file( + has_score_file = _has_error_or_score_file( config, instance_id, submission_name) return not has_screen and has_score_file @@ -849,7 +849,7 @@ def _has_screen(config, instance_id, screen_name): return nb > 0 -def _has_log_or_score_file(config, instance_id, screen_name): +def _has_error_or_score_file(config, instance_id, screen_name): """ Return True if a 'bagged_scores.csv' file exists (submission terminated with a success) or if log file (submisssion terminated with error) exists diff --git a/ramp-engine/ramp_engine/tests/test_aws.py b/ramp-engine/ramp_engine/tests/test_aws.py index 5b040759..3ed1dbd3 100644 --- a/ramp-engine/ramp_engine/tests/test_aws.py +++ b/ramp-engine/ramp_engine/tests/test_aws.py @@ -336,7 +336,7 @@ class DummyInstance: @mock.patch('ramp_engine.aws.api.is_spot_terminated') @mock.patch('ramp_engine.aws.api.launch_train') @mock.patch('ramp_engine.aws.api._has_screen') -@mock.patch('ramp_engine.aws.api._has_log_or_score_file') +@mock.patch('ramp_engine.aws.api._has_error_or_score_file') def test_not_finished_until_bagged_or_log_saved(has_score_file, has_screen, launch_train, spot_terminated,