add auto efs cleanup attempt
CodyCBakerPhD committed Sep 29, 2024
1 parent b3bd249 commit 1ace016
Showing 2 changed files with 72 additions and 37 deletions.
69 changes: 67 additions & 2 deletions src/neuroconv/tools/aws/_deploy_neuroconv_batch_job.py
@@ -3,6 +3,7 @@
import os
import time
import uuid
import warnings
from typing import Optional

import boto3
@@ -11,6 +12,8 @@
from ._rclone_transfer_batch_job import rclone_transfer_batch_job
from ._submit_aws_batch_job import submit_aws_batch_job

_RETRY_STATES = ["RUNNABLE", "PENDING", "STARTING", "RUNNING"]


@validate_call
def deploy_neuroconv_batch_job(
@@ -104,6 +107,12 @@ def deploy_neuroconv_batch_job(
aws_access_key_id = os.environ.get("AWS_ACCESS_KEY_ID", None)
aws_secret_access_key = os.environ.get("AWS_SECRET_ACCESS_KEY", None)

batch_client = boto3.client(
service_name="batch",
region_name=region,
aws_access_key_id=aws_access_key_id,
aws_secret_access_key=aws_secret_access_key,
)
efs_client = boto3.client(
service_name="efs",
region_name=region,
@@ -166,11 +175,67 @@
region=region,
)

# TODO: spinup third dependent job to clean up EFS volume after neuroconv job is complete?

info = {
"rclone_job_submission_info": rclone_job_submission_info,
"neuroconv_job_submission_info": neuroconv_job_submission_info,
}

# TODO: would be better to spin up third dependent job to clean up EFS volume after neuroconv job completes
neuroconv_job_id = neuroconv_job_submission_info["jobId"]
job = None
max_retries = 60 * 12 # roughly 12 hours max runtime (aside from internet loss) for checking cleanup
sleep_time = 60 # 1 minute
retry = 0.0
time.sleep(sleep_time)
while retry < max_retries:
job_description_response = batch_client.describe_jobs(jobs=[neuroconv_job_id])
if job_description_response["ResponseMetadata"]["HTTPStatusCode"] == 200:
# sleep but only increment retry by a small amount
# (really should only apply if internet connection is temporarily lost)
retry += 0.1
time.sleep(sleep_time)

job = job_description_response["jobs"][0]
if job["status"] in _RETRY_STATES:
retry += 1.0
time.sleep(sleep_time)
elif job["status"] == "SUCCEEDED":
break

if retry >= max_retries:
message = (
"Maximum retries reached for checking job completion for automatic EFS cleanup! "
"Please delete the EFS volume manually."
)
warnings.warn(message=message, stacklevel=2)

return info

# Cleanup EFS after job is complete - must clear mount targets first, then wait before deleting the volume
efs_volumes = efs_client.describe_file_systems()
matching_efs_volumes = [
file_system
for file_system in efs_volumes["FileSystems"]
for tag in file_system["Tags"]
if tag["Key"] == "Name" and tag["Value"] == efs_volume_name
]
if len(matching_efs_volumes) != 1:
message = (
f"Expected to find exactly one EFS volume with name '{efs_volume_name}', "
f"but found {len(matching_efs_volumes)}\n\n{matching_efs_volumes=}\n\n!"
"You will have to delete these manually."
)
warnings.warn(message=message, stacklevel=2)

return info

efs_volume = matching_efs_volumes[0]
efs_id = efs_volume["FileSystemId"]
mount_targets = efs_client.describe_mount_targets(FileSystemId=efs_id)
for mount_target in mount_targets["MountTargets"]:
efs_client.delete_mount_target(MountTargetId=mount_target["MountTargetId"])

time.sleep(sleep_time)
efs_client.delete_file_system(FileSystemId=efs_id)

return info
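
For reference, a minimal usage sketch of the updated function is below. The keyword arguments shown (rclone_command, yaml_specification_file_path, job_name, efs_volume_name, rclone_config_file_path) are the ones exercised by the test in this commit; the import path and all example values are assumptions rather than content of this diff.

# Hypothetical usage sketch of deploy_neuroconv_batch_job with the new automatic EFS cleanup.
# Keyword names mirror the test below; the import path and example values are assumed.
from neuroconv.tools.aws import deploy_neuroconv_batch_job

rclone_command = (
    "rclone copy my_remote:my_source_folder /mnt/efs/source "
    "--verbose --progress --config ./rclone.conf"
)

all_info = deploy_neuroconv_batch_job(
    rclone_command=rclone_command,
    yaml_specification_file_path="/path/to/conversion_specification.yml",
    job_name="example_deployment",
    efs_volume_name="example_efs_volume",
    rclone_config_file_path="/path/to/rclone.conf",
)

# The call now polls the NeuroConv batch job (for up to roughly 12 hours); once the job
# reports SUCCEEDED, the EFS volume's mount targets and then the file system are deleted.
print(all_info["rclone_job_submission_info"]["jobId"])
print(all_info["neuroconv_job_submission_info"]["jobId"])

Because the cleanup polling happens inside the call, the function now blocks until the NeuroConv job finishes or the retry budget is exhausted, in which case a warning asks the caller to delete the EFS volume manually.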
@@ -36,7 +36,6 @@ class TestNeuroConvDeploymentBatchJob(unittest.TestCase):
aws_access_key_id = os.environ.get("AWS_ACCESS_KEY_ID", None)
aws_secret_access_key = os.environ.get("AWS_SECRET_ACCESS_KEY", None)
region = "us-east-2"
efs_id = None

def setUp(self):
self.test_folder.mkdir(exist_ok=True)
@@ -60,28 +59,6 @@ def setUp(self):
with open(file=self.test_config_file_path, mode="w") as io:
io.writelines(rclone_config_contents)

self.efs_client = boto3.client(
service_name="efs",
region_name=self.region,
aws_access_key_id=self.aws_access_key_id,
aws_secret_access_key=self.aws_secret_access_key,
)

# def tearDown(self):
# if self.efs_id is None:
# return None
#
# efs_client = self.efs_client
#
# # Cleanup EFS after testing is complete - must clear mount targets first, then wait before deleting the volume
# # TODO: cleanup job definitions? (since built daily)
# mount_targets = efs_client.describe_mount_targets(FileSystemId=self.efs_id)
# for mount_target in mount_targets["MountTargets"]:
# efs_client.delete_mount_target(MountTargetId=mount_target["MountTargetId"])
#
# time.sleep(60)
# efs_client.delete_file_system(FileSystemId=self.efs_id)

def test_deploy_neuroconv_batch_job(self):
region = "us-east-2"
aws_access_key_id = os.environ.get("AWS_ACCESS_KEY_ID", None)
@@ -105,6 +82,7 @@ def test_deploy_neuroconv_batch_job(self):
aws_access_key_id=aws_access_key_id,
aws_secret_access_key=aws_secret_access_key,
)
efs_volumes_before = efs_client.describe_file_systems()

rclone_command = (
"rclone copy test_google_drive_remote:testing_rclone_spikeglx_and_phy/ci_tests /mnt/efs/source "
@@ -132,7 +110,7 @@
rclone_config_file_path=rclone_config_file_path,
)

# Wait for AWS to process the job
# Wait additional time for AWS to clean up resources
time.sleep(120)

info = all_info["neuroconv_job_submission_info"]
@@ -155,17 +133,9 @@
else:
break

# Check EFS specific details
efs_volumes = efs_client.describe_file_systems()
matching_efs_volumes = [
file_system
for file_system in efs_volumes["FileSystems"]
for tag in file_system["Tags"]
if tag["Key"] == "Name" and tag["Value"] == efs_volume_name
]
assert len(matching_efs_volumes) == 1
efs_volume = matching_efs_volumes[0]
self.efs_id = efs_volume["FileSystemId"]
# Check EFS cleaned up automatically
efs_volumes_after = efs_client.describe_file_systems()
assert len(efs_volumes_after["FileSystems"]) == len(efs_volumes_before["FileSystems"])

# Check normal job completion
expected_job_name = f"{job_name}_neuroconv_deployment"
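
If the count comparison ever proves too coarse (for example, when other EFS volumes are created or deleted concurrently in the same account and region), a tag-filtered check similar to the removed assertion could be used instead. Below is a minimal sketch built from the same boto3 "efs" calls that appear in this commit; the helper name and its default region are illustrative, not part of the codebase.

# Hypothetical helper: assert that no EFS volume tagged with the given name remains.
# Uses the boto3 "efs" client calls seen in this commit; the function name is illustrative.
import boto3


def assert_efs_volume_cleaned_up(efs_volume_name: str, region: str = "us-east-2") -> None:
    efs_client = boto3.client(service_name="efs", region_name=region)

    efs_volumes = efs_client.describe_file_systems()
    matching = [
        file_system
        for file_system in efs_volumes["FileSystems"]
        for tag in file_system.get("Tags", [])
        if tag["Key"] == "Name" and tag["Value"] == efs_volume_name
    ]
    assert matching == [], f"EFS volume(s) still present after cleanup: {matching}"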
