[Cloud Deployment IVc] Deploy NeuroConv in AWS with EFS #1086

Open
wants to merge 32 commits into base: rclone_aws

Commits
380b57a
setup
CodyCBakerPhD Sep 16, 2024
a083737
Merge branch 'rclone_aws' into neuroconv_aws
CodyCBakerPhD Sep 16, 2024
a4cd38e
Merge branch 'rclone_aws' into neuroconv_aws
CodyCBakerPhD Sep 16, 2024
6c59c89
Merge branch 'rclone_aws' into neuroconv_aws
CodyCBakerPhD Sep 17, 2024
ea992fb
Merge branch 'rclone_aws' into neuroconv_aws
CodyCBakerPhD Sep 27, 2024
77b252a
Merge branch 'rclone_aws' into neuroconv_aws
CodyCBakerPhD Sep 27, 2024
c97f67b
Merge branch 'rclone_aws' into neuroconv_aws
CodyCBakerPhD Sep 27, 2024
ca48752
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Sep 27, 2024
03e7a98
restore
CodyCBakerPhD Sep 27, 2024
500f15b
Merge branch 'rclone_aws' into neuroconv_aws
CodyCBakerPhD Sep 28, 2024
6bac79f
split workflows and tests; initial push trigger
CodyCBakerPhD Sep 28, 2024
867cac4
fix workflow; suppress normal tests
CodyCBakerPhD Sep 28, 2024
e40f8ef
debug
CodyCBakerPhD Sep 28, 2024
afeec8d
debug
CodyCBakerPhD Sep 28, 2024
f688e99
debug
CodyCBakerPhD Sep 28, 2024
7b8500e
fix job dependencies syntax
CodyCBakerPhD Sep 28, 2024
4054b4c
fix rclone command
CodyCBakerPhD Sep 28, 2024
9faf04a
add sleep to avoid race condition
CodyCBakerPhD Sep 28, 2024
4463a7a
add proper wait check
CodyCBakerPhD Sep 28, 2024
77c25e4
fix
CodyCBakerPhD Sep 28, 2024
b119181
try to pin down source
CodyCBakerPhD Sep 28, 2024
6bd2999
fix dockerfile publishing
CodyCBakerPhD Sep 28, 2024
359c593
restore commented paths; try again
CodyCBakerPhD Sep 28, 2024
fe9bca7
try different CLI call pattern
CodyCBakerPhD Sep 28, 2024
b5e2ef4
disable efs cleanup to tap in manually
CodyCBakerPhD Sep 28, 2024
6909323
silly bug fix
CodyCBakerPhD Sep 28, 2024
236be55
final fixes
CodyCBakerPhD Sep 29, 2024
b3bd249
Merge branch 'rclone_aws' into neuroconv_aws
CodyCBakerPhD Sep 29, 2024
1ace016
add auto efs cleanup attempt
CodyCBakerPhD Sep 29, 2024
6a8d552
fix info structure
CodyCBakerPhD Sep 29, 2024
7e30330
add comment
CodyCBakerPhD Sep 29, 2024
bac3510
restore test triggers
CodyCBakerPhD Sep 29, 2024
46 changes: 46 additions & 0 deletions .github/workflows/neuroconv_deployment_aws_tests.yml
@@ -0,0 +1,46 @@
name: NeuroConv Deployment AWS Tests
on:
  schedule:
    - cron: "0 16 * * 3"  # Weekly at noon on Wednesday
  workflow_dispatch:

concurrency:  # Cancel previous workflows on the same pull request
  group: ${{ github.workflow }}-${{ github.ref }}
  cancel-in-progress: true

env:
  AWS_ACCESS_KEY_ID: ${{ secrets.AWS_ACCESS_KEY_ID }}
  AWS_SECRET_ACCESS_KEY: ${{ secrets.AWS_SECRET_ACCESS_KEY }}
  RCLONE_DRIVE_ACCESS_TOKEN: ${{ secrets.RCLONE_DRIVE_ACCESS_TOKEN }}
  RCLONE_DRIVE_REFRESH_TOKEN: ${{ secrets.RCLONE_DRIVE_REFRESH_TOKEN }}
  RCLONE_EXPIRY_TOKEN: ${{ secrets.RCLONE_EXPIRY_TOKEN }}
  DANDI_API_KEY: ${{ secrets.DANDI_API_KEY }}

jobs:
  run:
    name: ${{ matrix.os }} Python ${{ matrix.python-version }}
    runs-on: ${{ matrix.os }}
    strategy:
      fail-fast: false
      matrix:
        python-version: ["3.12"]
        os: [ubuntu-latest]
    steps:
      - uses: actions/checkout@v4
      - run: git fetch --prune --unshallow --tags
      - name: Setup Python ${{ matrix.python-version }}
        uses: actions/setup-python@v5
        with:
          python-version: ${{ matrix.python-version }}

      - name: Global Setup
        run: |
          python -m pip install -U pip  # Official recommended way
          git config --global user.email "[email protected]"
          git config --global user.name "CI Almighty"

      - name: Install AWS requirements
        run: pip install .[aws,test]

      - name: Run NeuroConv Deployment on AWS tests
        run: pytest -rsx -n auto tests/test_on_data/test_yaml/neuroconv_deployment_aws_tools_tests.py
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -2,6 +2,7 @@

## Features
* Added the `rclone_transfer_batch_job` helper function for executing Rclone data transfers in AWS Batch jobs. [PR #1085](https://github.com/catalystneuro/neuroconv/pull/1085)
* Added the `deploy_neuroconv_batch_job` helper function for deploying NeuroConv AWS Batch jobs. [PR #1086](https://github.com/catalystneuro/neuroconv/pull/1086)



7 changes: 6 additions & 1 deletion src/neuroconv/tools/aws/__init__.py
@@ -1,4 +1,9 @@
from ._submit_aws_batch_job import submit_aws_batch_job
from ._rclone_transfer_batch_job import rclone_transfer_batch_job
from ._deploy_neuroconv_batch_job import deploy_neuroconv_batch_job

__all__ = ["submit_aws_batch_job", "rclone_transfer_batch_job"]
__all__ = [
"submit_aws_batch_job",
"rclone_transfer_batch_job",
"deploy_neuroconv_batch_job",
]
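For orientation, below is a minimal usage sketch of the newly exported helper (not part of the diff). The Rclone remote name 'my_drive' follows the docstring example further down; the YAML path, job name, and EFS volume name are hypothetical placeholders.

from neuroconv.tools.aws import deploy_neuroconv_batch_job

info = deploy_neuroconv_batch_job(
    rclone_command="rclone copy my_drive:testing_rclone /mnt/efs/source",  # must reference /mnt/efs/source
    yaml_specification_file_path="/path/to/conversion_specification.yml",  # hypothetical path
    job_name="example_conversion",  # hypothetical name
    efs_volume_name="example_conversion_efs",  # hypothetical name
)
# info["rclone_job_submission_info"] and info["neuroconv_job_submission_info"] hold the return
# values of the underlying rclone_transfer_batch_job and submit_aws_batch_job calls.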
241 changes: 241 additions & 0 deletions src/neuroconv/tools/aws/_deploy_neuroconv_batch_job.py
@@ -0,0 +1,241 @@
"""Collection of helper functions for deploying NeuroConv in EC2 Batch jobs on AWS."""

import os
import time
import uuid
import warnings
from typing import Optional

import boto3
from pydantic import FilePath, validate_call

from ._rclone_transfer_batch_job import rclone_transfer_batch_job
from ._submit_aws_batch_job import submit_aws_batch_job

_RETRY_STATES = ["RUNNABLE", "PENDING", "STARTING", "RUNNING"]


@validate_call
def deploy_neuroconv_batch_job(
*,
rclone_command: str,
yaml_specification_file_path: FilePath,
job_name: str,
efs_volume_name: str,
rclone_config_file_path: Optional[FilePath] = None,
status_tracker_table_name: str = "neuroconv_batch_status_tracker",
compute_environment_name: str = "neuroconv_batch_environment",
job_queue_name: str = "neuroconv_batch_queue",
job_definition_name: Optional[str] = None,
minimum_worker_ram_in_gib: int = 16, # Higher than previous recommendations for safer buffering room
minimum_worker_cpus: int = 4,
region: Optional[str] = None,
) -> dict[str, str]:
"""
Submit a job to AWS Batch for processing.

Requires AWS credentials saved to files in the `~/.aws/` folder or set as environment variables.

Parameters
----------
rclone_command : str
The command to pass directly to Rclone running on the EC2 instance.
E.g.: "rclone copy my_drive:testing_rclone /mnt/efs/source"
Must move data from or to '/mnt/efs/source'.
yaml_specification_file_path : FilePath
The path to the YAML file containing the NeuroConv specification.
job_name : str
The name of the job to submit.
efs_volume_name : str
The name of an EFS volume to be created and attached to the job.
The path exposed to the container will always be `/mnt/efs`.
rclone_config_file_path : FilePath, optional
The path to the Rclone configuration file to use for the job.
If unspecified, the method will attempt to find the file in `~/.rclone` and will raise an error if it cannot.
status_tracker_table_name : str, default: "neuroconv_batch_status_tracker"
The name of the DynamoDB table to use for tracking job status.
compute_environment_name : str, default: "neuroconv_batch_environment"
The name of the compute environment to use for the job.
job_queue_name : str, default: "neuroconv_batch_queue"
The name of the job queue to use for the job.
job_definition_name : str, optional
The name of the job definition to use for the job.
If unspecified, a name starting with 'neuroconv_batch_' will be generated.
minimum_worker_ram_in_gib : int, default: 16
The minimum amount of base worker memory required to run this job.
Determines the EC2 instance type selected by the automatic 'best fit' selector.
Recommended to be several GiB to allow comfortable buffer space for data chunk iterators.
minimum_worker_cpus : int, default: 4
The minimum number of CPUs required to run this job.
A minimum of 4 is required, even if only one will be used in the actual process.
region : str, optional
The AWS region to use for the job.
If not provided, we will attempt to load the region from your local AWS configuration.
If that file is not found on your system, we will default to "us-east-2", the location of the DANDI Archive.

Returns
-------
info : dict
A dictionary containing information about this AWS Batch job.

info["rclone_job_submission_info"] is the return value of `neuroconv.tools.aws.rclone_transfer_batch_job`.
info["neuroconv_job_submission_info"] is the return value of `neuroconv.tools.aws.submit_aws_batch_job`.
"""
efs_volume_name = efs_volume_name or f"neuroconv_batch_efs_volume_{uuid.uuid4().hex[:4]}"
region = region or "us-east-2"

if "/mnt/efs/source" not in rclone_command:
message = (
f"The Rclone command '{rclone_command}' does not contain a reference to '/mnt/efs/source'. "
"Without utilizing the EFS mount, the instance is unlikely to have enough local disk space. "
"The subfolder 'source' is also required to eliminate ambiguity in the transfer process."
)
raise ValueError(message)

rclone_job_name = f"{job_name}_rclone_transfer"
rclone_job_submission_info = rclone_transfer_batch_job(
rclone_command=rclone_command,
job_name=rclone_job_name,
efs_volume_name=efs_volume_name,
rclone_config_file_path=rclone_config_file_path,
region=region,
)
rclone_job_id = rclone_job_submission_info["job_submission_info"]["jobId"]

# Give the EFS and other aspects time to spin up before submitting next dependent job
# (Otherwise, good chance that duplicate EFS will be created)
aws_access_key_id = os.environ.get("AWS_ACCESS_KEY_ID", None)
aws_secret_access_key = os.environ.get("AWS_SECRET_ACCESS_KEY", None)

batch_client = boto3.client(
service_name="batch",
region_name=region,
aws_access_key_id=aws_access_key_id,
aws_secret_access_key=aws_secret_access_key,
)
efs_client = boto3.client(
service_name="efs",
region_name=region,
aws_access_key_id=aws_access_key_id,
aws_secret_access_key=aws_secret_access_key,
)

available_efs_volumes = efs_client.describe_file_systems()
matching_efs_volumes = [
file_system
for file_system in available_efs_volumes["FileSystems"]
for tag in file_system["Tags"]
if tag["Key"] == "Name" and tag["Value"] == efs_volume_name
]
max_iterations = 10
Comment on lines +123 to +130
CodyCBakerPhD (Member Author) commented on Sep 29, 2024:
This middle-man EFS management is rather annoying, but without it there were bugs with duplicate volumes. A better approach in the future might be to accept a direct EFS ID that overrides this name-pattern-based search and creation.
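As a rough illustration of that suggestion (hypothetical; no such efs_volume_id parameter exists in this PR), the override could bypass the name-based search along these lines:

# Hypothetical sketch only: a directly supplied EFS ID overrides the name-pattern search
if efs_volume_id is not None:
    matching_efs_volumes = efs_client.describe_file_systems(FileSystemId=efs_volume_id)["FileSystems"]
else:
    available_efs_volumes = efs_client.describe_file_systems()
    matching_efs_volumes = [
        file_system
        for file_system in available_efs_volumes["FileSystems"]
        for tag in file_system.get("Tags", [])
        if tag["Key"] == "Name" and tag["Value"] == efs_volume_name
    ]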

iteration = 0
while len(matching_efs_volumes) == 0 and iteration < max_iterations:
iteration += 1
time.sleep(30)

# Re-query the file systems so a newly created EFS volume is actually detected
available_efs_volumes = efs_client.describe_file_systems()
matching_efs_volumes = [
file_system
for file_system in available_efs_volumes["FileSystems"]
for tag in file_system["Tags"]
if tag["Key"] == "Name" and tag["Value"] == efs_volume_name
]

if len(matching_efs_volumes) == 0:
message = f"Unable to create EFS volume '{efs_volume_name}' after {max_iterations} attempts!"
raise ValueError(message)

docker_image = "ghcr.io/catalystneuro/neuroconv_yaml_variable:latest"

with open(file=yaml_specification_file_path, mode="r") as io:
yaml_specification_file_stream = io.read()

neuroconv_job_name = f"{job_name}_neuroconv_deployment"
job_dependencies = [{"jobId": rclone_job_id, "type": "SEQUENTIAL"}]
neuroconv_job_submission_info = submit_aws_batch_job(
job_name=neuroconv_job_name,
docker_image=docker_image,
environment_variables={
"NEUROCONV_YAML": yaml_specification_file_stream,
"NEUROCONV_DATA_PATH": "/mnt/efs/source",
# TODO: would prefer this to use subfolders for source and output, but need logic for YAML
# related code to create them if missing (hard to send EFS this command directly)
# (the code was included in this PR, but a release cycle needs to complete for the docker images before
# it can be used here)
# "NEUROCONV_OUTPUT_PATH": "/mnt/efs/output",
"NEUROCONV_OUTPUT_PATH": "/mnt/efs",
},
efs_volume_name=efs_volume_name,
job_dependencies=job_dependencies,
status_tracker_table_name=status_tracker_table_name,
compute_environment_name=compute_environment_name,
job_queue_name=job_queue_name,
job_definition_name=job_definition_name,
minimum_worker_ram_in_gib=minimum_worker_ram_in_gib,
minimum_worker_cpus=minimum_worker_cpus,
region=region,
)

info = {
"rclone_job_submission_info": rclone_job_submission_info,
"neuroconv_job_submission_info": neuroconv_job_submission_info,
}

# TODO: would be better to spin up third dependent job to clean up EFS volume after neuroconv job completes
neuroconv_job_id = neuroconv_job_submission_info["job_submission_info"]["jobId"]
job = None
max_retries = 60 * 12 # roughly 12 hours max runtime (aside from internet loss) for checking cleanup
sleep_time = 60 # 1 minute
retry = 0.0
time.sleep(sleep_time)
while retry < max_retries:
job_description_response = batch_client.describe_jobs(jobs=[neuroconv_job_id])
if job_description_response["ResponseMetadata"]["HTTPStatusCode"] == 200:
# sleep but only increment retry by a small amount
# (really should only apply if internet connection is temporarily lost)
retry += 0.1
time.sleep(sleep_time)

job = job_description_response["jobs"][0]
if job["status"] in _RETRY_STATES:
retry += 1.0
time.sleep(sleep_time)
elif job["status"] == "SUCCEEDED":
break

if retry >= max_retries:
message = (
"Maximum retries reached for checking job completion for automatic EFS cleanup! "
"Please delete the EFS volume manually."
)
warnings.warn(message=message, stacklevel=2)

return info

# Cleanup EFS after job is complete - must clear mount targets first, then wait before deleting the volume
efs_volumes = efs_client.describe_file_systems()
matching_efs_volumes = [
file_system
for file_system in efs_volumes["FileSystems"]
Comment on lines +214 to +218
CodyCBakerPhD (Member Author) commented:
@h-mayorquin Also, in a follow-up, you may wish to add logic that detects whether the EFS volume already exists before this method is called; in that case we might not want to clean it up automatically (it might be shared across jobs or something).

And/or the behavior could always be controlled with an extra cleanup: bool argument (see the sketch after this file's diff). Given the billing consequences, I tend to err on the side of always performing cleanup, hence the current logic.

for tag in file_system["Tags"]
if tag["Key"] == "Name" and tag["Value"] == efs_volume_name
]
if len(matching_efs_volumes) != 1:
message = (
f"Expected to find exactly one EFS volume with name '{efs_volume_name}', "
f"but found {len(matching_efs_volumes)}!\n\n{matching_efs_volumes=}\n\n"
"You will have to delete these manually."
)
warnings.warn(message=message, stacklevel=2)

return info

efs_volume = matching_efs_volumes[0]
efs_id = efs_volume["FileSystemId"]
mount_targets = efs_client.describe_mount_targets(FileSystemId=efs_id)
for mount_target in mount_targets["MountTargets"]:
efs_client.delete_mount_target(MountTargetId=mount_target["MountTargetId"])

time.sleep(sleep_time)
efs_client.delete_file_system(FileSystemId=efs_id)

return info
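Relating to the review comment above about a cleanup flag, a minimal sketch of how such a guard might look (hypothetical; not part of this PR):

# Hypothetical extra keyword argument on deploy_neuroconv_batch_job: cleanup: bool = True
if not cleanup:
    # Leave the EFS volume in place, e.g. when it is shared across multiple jobs
    return info
# ...otherwise proceed with the existing teardown: delete mount targets, wait, then delete the file system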
2 changes: 1 addition & 1 deletion src/neuroconv/tools/aws/_rclone_transfer_batch_job.py
@@ -1,4 +1,4 @@
"""Collection of helper functions for assessing and performing automated data transfers related to AWS."""
"""Collection of helper functions for performing Rclone data transfers in EC2 Batch jobs on AWS."""

import warnings
from typing import Optional
14 changes: 6 additions & 8 deletions src/neuroconv/tools/aws/_submit_aws_batch_job.py
@@ -464,11 +464,14 @@ def _create_or_get_efs_id(
if tag["Key"] == "Name" and tag["Value"] == efs_volume_name
]

if len(matching_efs_volumes) > 1:
if len(matching_efs_volumes) == 1:
efs_volume = matching_efs_volumes[0]
efs_id = efs_volume["FileSystemId"]

return efs_id
elif len(matching_efs_volumes) > 1:
message = f"Multiple EFS volumes with the name '{efs_volume_name}' were found!\n\n{matching_efs_volumes=}\n"
raise ValueError(message)

# Existing volume not found - must create a fresh one and set mount targets on it
efs_volume = efs_client.create_file_system(
@@ -530,12 +533,9 @@ def _generate_job_definition_name(
The minimum number of CPUs required to run this job.
A minimum of 4 is required, even if only one will be used in the actual process.
"""
docker_tags = docker_image.split(":")[1:]
docker_tag = docker_tags[0] if len(docker_tags) > 1 else None

# AWS Batch does not allow colons, slashes, or periods in job definition names
parsed_docker_image_name = str(docker_image)
for disallowed_character in [":", r"/", "."]:
for disallowed_character in [":", "/", "."]:
parsed_docker_image_name = parsed_docker_image_name.replace(disallowed_character, "-")

job_definition_name = f"neuroconv_batch"
@@ -544,8 +544,6 @@
job_definition_name += f"_{minimum_worker_cpus}-CPU"
if efs_id is not None:
job_definition_name += f"_{efs_id}"
if docker_tag is None or docker_tag == "latest":
date = datetime.now().strftime("%Y-%m-%d")
Comment on lines -547 to -548
CodyCBakerPhD (Member Author) commented:
Minor cleanups here, caused by the previous removal of the creation date tag.


return job_definition_name

@@ -644,7 +642,7 @@ def _ensure_job_definition_exists_and_get_arn(
},
},
]
mountPoints = [{"containerPath": "/mnt/efs/", "readOnly": False, "sourceVolume": "neuroconv_batch_efs_mounted"}]
mountPoints = [{"containerPath": "/mnt/efs", "readOnly": False, "sourceVolume": "neuroconv_batch_efs_mounted"}]

# batch_client.register_job_definition is not synchronous and so we need to wait a bit afterwards
batch_client.register_job_definition(
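The comment above notes that register_job_definition is not synchronous, so the code waits afterwards. As an alternative sketch (illustrative only; not part of this PR), one could poll until the job definition reports ACTIVE:

import time

def _wait_for_job_definition_to_be_active(batch_client, job_definition_name: str, timeout_in_seconds: int = 60) -> None:
    """Poll AWS Batch until the registered job definition is ACTIVE. Illustrative sketch only."""
    deadline = time.time() + timeout_in_seconds
    while time.time() < deadline:
        response = batch_client.describe_job_definitions(jobDefinitionName=job_definition_name)
        if any(definition["status"] == "ACTIVE" for definition in response.get("jobDefinitions", [])):
            return
        time.sleep(2)
    raise TimeoutError(f"Job definition '{job_definition_name}' was not ACTIVE within {timeout_in_seconds} seconds.")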
@@ -73,10 +73,16 @@ def run_conversion_from_yaml(

if data_folder_path is None:
data_folder_path = Path(specification_file_path).parent
else:
data_folder_path = Path(data_folder_path)
data_folder_path.mkdir(exist_ok=True)

if output_folder_path is None:
output_folder_path = Path(specification_file_path).parent
output_folder_path = specification_file_path.parent
else:
output_folder_path = Path(output_folder_path)
output_folder_path.mkdir(exist_ok=True)

specification = load_dict_from_file(file_path=specification_file_path)
schema_folder = Path(__file__).parent.parent.parent / "schemas"
specification_schema = load_dict_from_file(file_path=schema_folder / "yaml_conversion_specification_schema.json")
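A brief usage sketch of the change to run_conversion_from_yaml shown in this hunk: when explicit folder paths are passed, they are now coerced to Path objects and created if missing. The paths below are hypothetical, and the import path is assumed from the current package layout.

from neuroconv.tools.yaml_conversion_specification import run_conversion_from_yaml

# Hypothetical paths; with this change, missing data and output folders are created automatically
run_conversion_from_yaml(
    specification_file_path="/path/to/specification.yml",
    data_folder_path="/mnt/efs/source",
    output_folder_path="/mnt/efs/output",
)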