From cf14d1c6dbc01fc0bf91dd9b9e9a2001c054d66e Mon Sep 17 00:00:00 2001 From: Hemil Desai Date: Fri, 6 Dec 2024 20:01:04 -0800 Subject: [PATCH] feat: Add support for loading NeMo 2.0 checkpoints (#412) Signed-off-by: Terry Kong --- .github/workflows/_run_test.yml | 1 + .github/workflows/cicd-main.yml | 3 +- Dockerfile | 8 +- nemo_aligner/utils/utils.py | 145 +++++++++++++++++++++++-- tests/functional/dpo.sh | 9 +- tests/functional/test_cases/dpo-llama3 | 1 - tests/functional/test_cases/e2e-nemo2 | 73 +++++++++++++ 7 files changed, 225 insertions(+), 15 deletions(-) create mode 100755 tests/functional/test_cases/e2e-nemo2 diff --git a/.github/workflows/_run_test.yml b/.github/workflows/_run_test.yml index aa8f88752..ce70894b9 100644 --- a/.github/workflows/_run_test.yml +++ b/.github/workflows/_run_test.yml @@ -57,6 +57,7 @@ jobs: --env HF_HOME=/home/TestData/aligner/hf_home \ --env ALIGNER_CI_DIR=/home/TestData/aligner \ --env ALIGNER_REPO_DIR=/opt/NeMo-Aligner \ + --volume /mnt/datadrive/TestData/aligner/nlp-copy:/home/TestData/aligner/nlp-copy \ --volume /mnt/datadrive/TestData/aligner/checkpoints:/home/TestData/aligner/checkpoints:ro \ --volume /mnt/datadrive/TestData/aligner/hf_home/hub:/home/TestData/aligner/hf_home/hub:ro \ nemoci.azurecr.io/nemo_aligner_container:${{ github.run_id }} \ diff --git a/.github/workflows/cicd-main.yml b/.github/workflows/cicd-main.yml index cf3c2a5a8..25084cadc 100644 --- a/.github/workflows/cicd-main.yml +++ b/.github/workflows/cicd-main.yml @@ -97,9 +97,10 @@ jobs: - sft-llama3 - sft-llama3-cp - rm-llama3 + - e2e-nemo2 with: RUNNER: self-hosted-azure # Fairly aggresive timeout that all functional tests should try to adhere to - TIMEOUT: 8 + TIMEOUT: 10 SCRIPT: | bash /opt/NeMo-Aligner/tests/functional/test_cases/${{ matrix.test_case }} diff --git a/Dockerfile b/Dockerfile index 2ad368dda..30065169c 100644 --- a/Dockerfile +++ b/Dockerfile @@ -103,6 +103,10 @@ RUN git clone https://github.com/NVIDIA/NeMo.git && \ pip install -e ".[nlp]" && \ cd nemo/collections/nlp/data/language_modeling/megatron && make +# TODO: Allow installing from the default branch, but introduce a build +# arg if compatibility starts breaking +RUN pip install --no-cache-dir git+https://github.com/NVIDIA/NeMo-Run.git + # MLM ARG MLM_TAG RUN pip uninstall -y megatron-core && \ @@ -115,7 +119,9 @@ RUN pip uninstall -y megatron-core && \ fi && \ pip install -e . -RUN pip install --no-cache-dir lightning # can remove this when NEMO_TAG is bumped to include lightning install +# TODO: This is redundant since NeMo installs this as of 24.12, but keep +# it until 25.03 to give folks enough time to transition. 
+RUN pip install --no-cache-dir lightning COPY --from=aligner-bump /opt/NeMo-Aligner /opt/NeMo-Aligner RUN cd /opt/NeMo-Aligner && \ diff --git a/nemo_aligner/utils/utils.py b/nemo_aligner/utils/utils.py index 8eb73d295..ad89ca16e 100644 --- a/nemo_aligner/utils/utils.py +++ b/nemo_aligner/utils/utils.py @@ -24,7 +24,7 @@ from copy import deepcopy from dataclasses import replace from functools import partial, wraps -from typing import Iterator, List +from typing import Any, Iterator, List, Optional from unittest.mock import patch import torch @@ -46,16 +46,19 @@ class CustomSaveRestoreConnector(NLPSaveRestoreConnector): the rm head if load_base_model_only is True """ - def __init__(self, *args, load_base_model_only=False, **kwargs): + def __init__(self, *args, load_base_model_only=False, replace_sharded_tensor_key: Optional[str] = None, **kwargs): super().__init__(*args, **kwargs) self.__load_base_model_only = load_base_model_only + self.__replace_sharded_tensor_key = replace_sharded_tensor_key def restore_from(self, *args, **kwargs): if not self.__load_base_model_only: - return super().restore_from(*args, **kwargs) + return super().restore_from(*args, replace_sharded_tensor_key=self.__replace_sharded_tensor_key, **kwargs) with patch.object(GPTRewardModel, "return_rm_head_in_state_dict", False): - output = super().restore_from(*args, **kwargs) + output = super().restore_from( + *args, replace_sharded_tensor_key=self.__replace_sharded_tensor_key, **kwargs + ) return output @@ -87,9 +90,21 @@ def load_from_nemo( ): """load a model using nemo checkpoint """ - connector = CustomSaveRestoreConnector(load_base_model_only=load_base_model_only) assert os.path.exists(restore_path), f"tried to load from {restore_path=} but it does not exist" + is_2_0_ckpt = load_2_0_checkpoint_model_config(restore_path) is not None + if is_2_0_ckpt: + replace_sharded_tensor_key = "module" + else: + replace_sharded_tensor_key = None + + connector = CustomSaveRestoreConnector( + load_base_model_only=load_base_model_only, replace_sharded_tensor_key=replace_sharded_tensor_key + ) + + if is_2_0_ckpt: + connector.model_weights_ckpt = "weights" + # if we gave it a directory, then load as if it was extracted already if os.path.isdir(restore_path): connector.model_extracted_dir = restore_path @@ -107,6 +122,10 @@ def load_from_nemo( save_restore_connector=connector, strict=strict, ) + + if is_2_0_ckpt: + connector.model_weights_ckpt = "model_weights.ckpt" + return (model, model_cfg) if return_updated_cfg else model @@ -131,11 +150,121 @@ def load_checkpoint_model_config(restore_path): return cfg +def load_2_0_checkpoint_model_config(restore_path: str): + try: + from nemo.lightning import io + from nemo.lightning.ckpt_utils import ckpt_to_context_subdir + from nemo.lightning.io.pl import ckpt_to_weights_subdir + + if ( + os.path.isdir(ckpt_to_context_subdir(restore_path)) + and os.path.isdir(ckpt_to_weights_subdir(restore_path, is_saving=False)) + and os.path.isfile(os.path.join(ckpt_to_context_subdir(restore_path), "io.json")) + ): + config = io.load_context(restore_path, subpath="model.config") + tokenizer_cfg = OmegaConf.load(os.path.join(ckpt_to_context_subdir(restore_path), "model.yaml")).tokenizer + + def get_tokenizer_args(tokenizer_cfg): + if "AutoTokenizer" in tokenizer_cfg._target_: + tokenizer_type = "huggingface" + tokenizer_name = ( + tokenizer_cfg.pretrained_model_name + if isinstance(tokenizer_cfg.pretrained_model_name, str) + else tokenizer_cfg.pretrained_model_name.attr + ) + if os.path.isfile( + 
os.path.join(ckpt_to_context_subdir(restore_path), tokenizer_name) + ) or os.path.isdir(os.path.join(ckpt_to_context_subdir(restore_path), tokenizer_name)): + tokenizer_name = os.path.join(ckpt_to_context_subdir(restore_path), tokenizer_name) + + args = { + "library": tokenizer_type, + "type": tokenizer_name, + "use_fast": True, + } + if tokenizer_cfg.get("vocab_file", None): + args["vocab_file"] = os.path.join( + ckpt_to_context_subdir(restore_path), tokenizer_cfg.vocab_file + ) + if tokenizer_cfg.get("merges_file", None): + args["merges_file"] = os.path.join( + ckpt_to_context_subdir(restore_path), tokenizer_cfg.merges_file + ) + + return args + elif "SentencePieceTokenizer" in tokenizer_cfg._target_: + tokenizer_type = "sentencepiece" + tokenizer_name = tokenizer_cfg.model_path + if os.path.isfile( + os.path.join(ckpt_to_context_subdir(restore_path), tokenizer_name) + ) or os.path.isdir(os.path.join(ckpt_to_context_subdir(restore_path), tokenizer_name)): + tokenizer_name = os.path.join(ckpt_to_context_subdir(restore_path), tokenizer_name) + elif not os.path.isfile(tokenizer_name): + raise FileNotFoundError(f"Tokenizer file {tokenizer_name} not found") + + return {"library": tokenizer_type, "type": None, "model": tokenizer_name} + else: + raise ValueError(f"Unknown tokenizer type: {tokenizer_cfg}") + + tokenizer_args = get_tokenizer_args(tokenizer_cfg) + + config_dict = {} + for k, v in config.__dict__.items(): + if isinstance(v, (float, int, str, bool)): + config_dict[k] = v + elif k == "activation_func": + config_dict["activation"] = v.__name__ + + if config_dict["activation"] == "silu": + config_dict["activation"] = "fast-swiglu" + + config_dict["encoder_seq_length"] = config_dict["seq_length"] + + config_dict["mcore_gpt"] = True + config_dict["max_position_embeddings"] = config_dict.get("seq_length") + config_dict["tokenizer"] = tokenizer_args + config_dict["bias"] = config_dict.get("add_bias_linear", True) + config_dict["qkv_bias"] = config_dict.get("add_qkv_bias", False) + + try: + strategy: dict[str, Any] = io.load_context(restore_path, subpath="trainer.strategy").__dict__ + config_dict["gradient_as_bucket_view"] = strategy.get("gradient_as_bucket_view", True) + # TODO: Add any other parameters required from strategy here + except Exception: + # Default to True based on default values in https://github.com/NVIDIA/NeMo/tree/main/nemo/collections/llm/recipes + config_dict["gradient_as_bucket_view"] = True + + try: + precision_plugin: dict[str, Any] = io.load_context(restore_path, subpath="trainer.plugins").__dict__ + config_dict["fp16"] = precision_plugin.get("fp16", False) + config_dict["bf16"] = precision_plugin.get("bf16", True) + # TODO: Add any other parameters required from precision plugin here + except Exception: + # Default to True based on default values in https://github.com/NVIDIA/NeMo/tree/main/nemo/collections/llm/recipes + config_dict["fp16"] = False + config_dict["bf16"] = True + + if not os.path.isfile(os.path.join(restore_path, "model_config.yaml")): + OmegaConf.save(config=OmegaConf.create(config_dict), f=os.path.join(restore_path, "model_config.yaml")) + + return config_dict + except Exception: + # If there's a failure loading the path as a NeMo 2.0 checkpoint, + # return None and continue loading NeMo 1.0 checkpoint. 
+ return None + + return None + + def load_and_override_model_config(restore_path, model_cfg_to_overwrite, remove_meta_info=True): """load the config in the model checkpoint and then overwrite it with whatever is provided """ - checkpoint_cfg = load_checkpoint_model_config(restore_path) + checkpoint_cfg_2_0 = load_2_0_checkpoint_model_config(restore_path) + if checkpoint_cfg_2_0 is not None: + checkpoint_cfg = checkpoint_cfg_2_0 + else: + checkpoint_cfg = load_checkpoint_model_config(restore_path) if remove_meta_info: checkpoint_cfg.pop("target", None) @@ -473,11 +602,11 @@ def convert_to_amp_o2_format(state_dict): def get_iterator_k_split_list(batch: List[str], num_microbatches: int) -> Iterator: """ Generate an iterator to split a list into microbatches of equal size. - + Args: batch (List[str]): The list to be split into microbatches. num_microbatches (int): The number of microbatches to split the list into. - + Returns: Iterator: An iterator that yields the microbatches. """ diff --git a/tests/functional/dpo.sh b/tests/functional/dpo.sh index 6dc939c62..1370801d3 100755 --- a/tests/functional/dpo.sh +++ b/tests/functional/dpo.sh @@ -1,6 +1,7 @@ #!/bin/bash -DATA_DIR=${DATA_DIR} +SCRIPT_DIR=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd ) +cd $SCRIPT_DIR set -eoux pipefail export NCCL_ALGO=Tree @@ -11,7 +12,7 @@ GBS=${GBS:-4} PRETRAINED_CHECKPOINT_NEMO_FILE=${PRETRAINED_CHECKPOINT_NEMO_FILE} -TRAIN_DATA_PATH=${TRAIN_DATA_PATH:-"${DATA_DIR}/dummy-dpo.jsonl"} +TRAIN_DATA_PATH=${TRAIN_DATA_PATH:-./test_data/dummy-dpo.jsonl} VALID_DATA_PATH=$TRAIN_DATA_PATH NAME=${NAME:-"dpo_test"} @@ -49,14 +50,14 @@ torchrun --nproc-per-node 2 ${GPFS}/examples/nlp/gpt/train_gpt_dpo.py \ ++model.dpo.preference_loss_weight=1.0 \ pretrained_checkpoint.restore_from_path=${PRETRAINED_CHECKPOINT_NEMO_FILE} \ "model.data.data_prefix={train: [${TRAIN_DATA_PATH}], validation: [${VALID_DATA_PATH}], test: [${VALID_DATA_PATH}]}" \ - exp_manager.create_checkpoint_callback=False \ + exp_manager.create_checkpoint_callback=${CREATE_CHECKPOINT_CALLBACK:-False} \ model.data.num_workers=2 \ ++model.tensor_model_parallel_size=1 \ ++model.pipeline_model_parallel_size=1 \ trainer.dpo.max_steps=${MAX_STEPS:-3} \ trainer.dpo.val_check_interval=${MAX_STEPS:-3} \ trainer.dpo.limit_val_batches=8 \ - trainer.dpo.save_interval=0 \ + trainer.dpo.save_interval=${SAVE_INTERVAL:-0} \ exp_manager.explicit_log_dir=${RESULTS_DIR} \ ++model.activations_checkpoint_granularity=full \ ++model.activations_checkpoint_method=uniform \ diff --git a/tests/functional/test_cases/dpo-llama3 b/tests/functional/test_cases/dpo-llama3 index 1ba79a9af..763c1c0c6 100755 --- a/tests/functional/test_cases/dpo-llama3 +++ b/tests/functional/test_cases/dpo-llama3 @@ -14,7 +14,6 @@ # limitations under the License. SCRIPT_DIR=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd ) -export DATA_DIR=$SCRIPT_DIR/../test_data cd $SCRIPT_DIR set -eoux pipefail diff --git a/tests/functional/test_cases/e2e-nemo2 b/tests/functional/test_cases/e2e-nemo2 new file mode 100755 index 000000000..c2e15866a --- /dev/null +++ b/tests/functional/test_cases/e2e-nemo2 @@ -0,0 +1,73 @@ +#!/bin/bash +# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. 
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+SCRIPT_DIR=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd )
+cd $SCRIPT_DIR
+
+set -eoux pipefail
+
+# This is an end-to-end test covering the following workflow:
+#
+# 1. train a dummy llama3 nemo 2.0 checkpoint
+# 2. perform dpo on it and save a nemo 1.0 checkpoint
+# 3. convert the nemo 1.0 checkpoint to 2.0
+# 4. perform eval on the newly converted 2.0 checkpoint
+
+PRETRAINING_PATH=/tmp/$(basename $0)-ckpt-dir
+PRETRAINING_CHECKPOINT_PATH=/tmp/$(basename $0)-ckpt-path
+CONVERTED_PATH=/tmp/$(basename $0)-convert-dir
+rm -rf $PRETRAINING_PATH $PRETRAINING_CHECKPOINT_PATH $CONVERTED_PATH
+
+if ! pip show nemo-run &>/dev/null; then
+    echo "[ERROR] nemo-run is needed for this test. Please see the installation instructions: https://github.com/NVIDIA/NeMo-Run?tab=readme-ov-file#install-nemo-run"
+    exit 1
+fi
+
+
+####################
+# Step 1: Pretrain #
+####################
+python /opt/NeMo/tests/collections/llm/megatron_gpt_pretraining.py \
+    --devices=1 \
+    --max-steps=3 \
+    --experiment-dir=${PRETRAINING_PATH} \
+    --vocab-path=${ALIGNER_CI_DIR}/nlp-copy/megatron_gpt/data/gpt/vocab.json \
+    --merges-path=${ALIGNER_CI_DIR}/nlp-copy/megatron_gpt/data/gpt/merges.txt \
+    --data-path=${ALIGNER_CI_DIR}/nlp-copy/megatron_gpt/data/gpt/simple_wiki_gpt_preproc_text_document \
+    --index-mapping-dir=tests/collections/llm/gpt_index_mappings \
+    --no-masked-softmax-fusion
+
+
+#################
+# Step 2: Align #
+#################
+NEMO2_CHECKPOINT=$(find $PRETRAINING_PATH/default/checkpoints -maxdepth 1 -type d -name '*-last' | head -n 1)
+mv $NEMO2_CHECKPOINT $PRETRAINING_CHECKPOINT_PATH
+export PRETRAINED_CHECKPOINT_NEMO_FILE=$PRETRAINING_CHECKPOINT_PATH
+CREATE_CHECKPOINT_CALLBACK=True SAVE_INTERVAL=3 \
+    bash ../dpo.sh
+
+###################
+# Step 3: Convert #
+###################
+export DPO_CHECKPOINT="/tmp/dpo_test/checkpoints/megatron_gpt.nemo"
+python /opt/NeMo/scripts/checkpoint_converters/convert_nemo1_to_nemo2.py --input_path $DPO_CHECKPOINT --output_path $CONVERTED_PATH --model_id ${PRETRAINED_CHECKPOINT_NEMO_FILE}
+
+################
+# Step 4: Eval #
+################
+python /opt/NeMo/scripts/llm/generate.py --model_path $CONVERTED_PATH
+
+echo "[Finished] $0"
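
Note on usage: the NeMo 2.0 handling added in this patch is triggered purely by checkpoint layout. The minimal sketch below (not part of the patch) shows how a caller could run the same check before handing a restore path to the aligner loaders; the helper name is_nemo2_checkpoint and the example path are illustrative assumptions, while ckpt_to_context_subdir, ckpt_to_weights_subdir, and the described fallback behavior come from the code above.

import os

from nemo.lightning.ckpt_utils import ckpt_to_context_subdir
from nemo.lightning.io.pl import ckpt_to_weights_subdir


def is_nemo2_checkpoint(restore_path: str) -> bool:
    """Return True when restore_path uses the NeMo 2.0 context/ + weights/ layout."""
    context_dir = ckpt_to_context_subdir(restore_path)
    weights_dir = ckpt_to_weights_subdir(restore_path, is_saving=False)
    return (
        os.path.isdir(context_dir)
        and os.path.isdir(weights_dir)
        and os.path.isfile(os.path.join(context_dir, "io.json"))
    )


restore_path = "/tmp/e2e-nemo2-ckpt-path"  # hypothetical checkpoint directory
if is_nemo2_checkpoint(restore_path):
    # NeMo 2.0: the config is rebuilt from context/ (load_2_0_checkpoint_model_config),
    # sharded-tensor keys are remapped to "module", and weights are read from weights/.
    print(f"{restore_path} looks like a NeMo 2.0 checkpoint")
else:
    # NeMo 1.0: the config comes from model_config.yaml via load_checkpoint_model_config.
    print(f"{restore_path} will be loaded as a NeMo 1.0 checkpoint")

This mirrors what load_and_override_model_config now does internally: it tries load_2_0_checkpoint_model_config first and only falls back to the NeMo 1.0 model_config.yaml reader when that returns None, so callers do not need to distinguish the two formats themselves.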