From 4cee32ea8ceab6ca003e8bb79a3a0bde87b4666d Mon Sep 17 00:00:00 2001
From: Pedro Rodriguez
Date: Wed, 12 Feb 2025 18:07:21 +0000
Subject: [PATCH] Test first batch matches

Summary:

Add a test that the first batch produced by the training data loader matches a
pickled reference batch. The existing Arrow iterator and entropy model tests
now pass file_format="arrow" explicitly, and the wandb import is moved below
the other imports in the modules that use it.

Test Plan:

pytest bytelatent/data/test_data.py bytelatent/data/iterators/test_arrow_iterator.py bytelatent/test_entropy_model.py
---
 apps/main/lingua_train.py                 |  3 +-
 .../data/iterators/test_arrow_iterator.py |  3 ++
 bytelatent/data/test_data.py              | 48 +++++++++++++++++++
 bytelatent/metrics.py                     |  6 ++-
 bytelatent/profiling.py                   |  3 +-
 bytelatent/test_entropy_model.py          |  1 +
 bytelatent/train.py                       |  3 +-
 7 files changed, 62 insertions(+), 5 deletions(-)
 create mode 100644 bytelatent/data/test_data.py

diff --git a/apps/main/lingua_train.py b/apps/main/lingua_train.py
index 7925ec6..323b101 100644
--- a/apps/main/lingua_train.py
+++ b/apps/main/lingua_train.py
@@ -14,7 +14,6 @@ from typing import Any, Dict, Optional
 
 import torch
 import torch.distributed
-import wandb
 import xformers.profiler
 from lingua.args import dataclass_from_dict, dump_config, flatten_dict
 from lingua.data import (
@@ -70,6 +69,8 @@ from bytelatent.transformer import (
     tp_parallelize,
 )
 
+import wandb
+
 logger = logging.getLogger()
 
 
diff --git a/bytelatent/data/iterators/test_arrow_iterator.py b/bytelatent/data/iterators/test_arrow_iterator.py
index fd448eb..064217e 100644
--- a/bytelatent/data/iterators/test_arrow_iterator.py
+++ b/bytelatent/data/iterators/test_arrow_iterator.py
@@ -28,6 +28,7 @@ def test_basic_arrow_file():
         row_num=0,
         arrow_batch_size=100,
         s3_profile=None,
+        file_format="arrow",
     )
     arrow_file = initial_state.build()
     start_state = arrow_file.get_state()
@@ -57,6 +58,7 @@
         row_num=251,
         arrow_batch_size=100,
         s3_profile=None,
+        file_format="arrow",
     )
     arrow_file = resumed_state.build()
     for example in arrow_file.create_iter():
@@ -77,6 +79,7 @@
         row_num=0,
         arrow_batch_size=100,
         s3_profile=None,
+        file_format="arrow",
     )
     arrow_file = rank_state.build()
     expected_ids = []
diff --git a/bytelatent/data/test_data.py b/bytelatent/data/test_data.py
new file mode 100644
index 0000000..efb8bcf
--- /dev/null
+++ b/bytelatent/data/test_data.py
@@ -0,0 +1,48 @@
+import os
+import pickle
+
+import pytest
+from omegaconf import OmegaConf
+
+from bytelatent.args import TrainArgs
+from bytelatent.constants import BLT_DATA
+
+
+def get_test_config():
+    if "BLT_INTERNAL" in os.environ:
+        internal_dir = os.environ["BLT_INTERNAL"]
+    else:
+        internal_dir = "../internal-blt/configs"
+    test_config = os.path.join(internal_dir, "tests.yaml")
+    return test_config
+
+
+@pytest.mark.skipif(
+    not os.path.exists(get_test_config()),
+    reason="Skipping since internal config is missing",
+)
+def test_first_batch_matches():
+    test_config_path = get_test_config()
+    default_cfg = OmegaConf.create(TrainArgs().model_dump())
+    file_cfg = OmegaConf.load(test_config_path)
+    merged_cfg = OmegaConf.merge(default_cfg, file_cfg)
+    merged_cfg = OmegaConf.to_container(merged_cfg, resolve=True, throw_on_missing=True)
+    train_args = TrainArgs.model_validate(merged_cfg)
+    # MP doesn't work with async very well, but it doesn't change logic
+    train_args.data.load_async = False
+
+    # Test data created by pickling first batch in train loop then exiting
+    with open(os.path.join(BLT_DATA, "fixtures", "first_batch_0.pickle"), "rb") as f:
+        first_batch = pickle.load(f)
+
+    # Emulate 1 node, 8 gpu training
+    data_loader = train_args.data.build_from_rank(0, 8)
+    batch_iterator = data_loader.create_iter()
+    print("Getting first batch")
+    batch = next(batch_iterator)
+    assert (batch.x == first_batch.x).all()
+    assert (batch.y == first_batch.y).all()
+    assert (batch.mask == first_batch.mask).all()
+    assert (batch.patch_lengths == first_batch.patch_lengths).all()
+    assert batch.ngram_ids is None and first_batch.ngram_ids is None
+    assert batch.is_final == False and first_batch.is_final == False
diff --git a/bytelatent/metrics.py b/bytelatent/metrics.py
index ed805e5..86c6910 100644
--- a/bytelatent/metrics.py
+++ b/bytelatent/metrics.py
@@ -11,11 +11,12 @@ from typing import Any, Union
 import fsspec
 import torch
 import torch.nn as nn
-import wandb
 from pydantic import BaseModel, ConfigDict
 
 from bytelatent.distributed import get_is_master
 
+import wandb
+
 logger = logging.getLogger()
 
 
@@ -198,9 +199,10 @@ def upload_train_to_wandb(
     import json
     from pathlib import Path
 
-    import wandb
     from omegaconf import OmegaConf
 
+    import wandb
+
     cfg = OmegaConf.load(Path(ckpt_dir) / "config.yaml")
     cfg = OmegaConf.to_container(cfg)
 
diff --git a/bytelatent/profiling.py b/bytelatent/profiling.py
index da3c90d..66bcbd6 100644
--- a/bytelatent/profiling.py
+++ b/bytelatent/profiling.py
@@ -7,7 +7,6 @@ import os
 from pathlib import Path
 
 import torch.distributed
-import wandb
 import xformers.profiler
 from pydantic import BaseModel
 from torch.profiler.profiler import profile
@@ -15,6 +14,8 @@ from xformers.profiler import MemSnapshotsProfiler, PyTorchProfiler
 
 from bytelatent.distributed import get_is_master
 
+import wandb
+
 
 class ProfilerArgs(BaseModel):
     run: bool = False
diff --git a/bytelatent/test_entropy_model.py b/bytelatent/test_entropy_model.py
index 9db7ff6..8623eb1 100644
--- a/bytelatent/test_entropy_model.py
+++ b/bytelatent/test_entropy_model.py
@@ -25,6 +25,7 @@ def test_entropy_model():
         row_num=0,
         arrow_batch_size=100,
         s3_profile=None,
+        file_format="arrow",
     )
     arrow_file = initial_state.build()
     tokenizer_args = TokenizerArgs(
diff --git a/bytelatent/train.py b/bytelatent/train.py
index ed84233..a5c0a83 100644
--- a/bytelatent/train.py
+++ b/bytelatent/train.py
@@ -17,7 +17,6 @@ import torch
 import torch.distributed
 import torch.nn.functional
 import torch.nn.functional as F
-import wandb
 import xformers.profiler
 from torch.distributed._tensor import DTensor
 from torch.distributed.checkpoint.stateful import Stateful
@@ -63,6 +62,8 @@ from bytelatent.transformer import (
     tp_parallelize,
 )
 
+import wandb
+
 logger = logging.getLogger()
 
 T = TypeVar("T")
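
Editor's note (not part of the patch): test_first_batch_matches() reads a fixture at
fixtures/first_batch_0.pickle under BLT_DATA, which the in-test comment says was
created by pickling the first batch in the train loop and then exiting. The sketch
below is one way such a fixture could be regenerated; the helper name
dump_first_batch is hypothetical, and rather than instrumenting bytelatent/train.py
it rebuilds the data loader the same way the test does, which only reproduces the
original fixture if the loader is deterministic for a fixed config, rank, and world
size.

# Hypothetical fixture-regeneration helper (editorial sketch, not part of the patch).
# Builds the loader the same way test_first_batch_matches() does and pickles the
# first batch it yields, assuming Batch objects are picklable.
import os
import pickle

from omegaconf import OmegaConf

from bytelatent.args import TrainArgs
from bytelatent.constants import BLT_DATA


def dump_first_batch(config_path: str, rank: int = 0, world_size: int = 8) -> None:
    # Merge TrainArgs defaults with the internal test config, exactly as the test does.
    default_cfg = OmegaConf.create(TrainArgs().model_dump())
    file_cfg = OmegaConf.load(config_path)
    merged_cfg = OmegaConf.to_container(
        OmegaConf.merge(default_cfg, file_cfg), resolve=True, throw_on_missing=True
    )
    train_args = TrainArgs.model_validate(merged_cfg)
    train_args.data.load_async = False

    # Emulate the same 1 node, 8 GPU split the test uses and take a single batch.
    data_loader = train_args.data.build_from_rank(rank, world_size)
    batch = next(data_loader.create_iter())

    out_path = os.path.join(BLT_DATA, "fixtures", f"first_batch_{rank}.pickle")
    os.makedirs(os.path.dirname(out_path), exist_ok=True)
    with open(out_path, "wb") as f:
        pickle.dump(batch, f)

Calling dump_first_batch(get_test_config()) in the same environment the test runs in
(BLT_INTERNAL pointing at the internal configs) would then write the fixture the
test compares against.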