blt/bytelatent/data/iterators/abstract_iterator.py
Pedro Rodriguez 8c61ab5e67
Some checks are pending
Lint with Black / lint (push) Waiting to run
Lint with isort / lint (push) Waiting to run
Fix multiprocessing dataloader checkpointing and use it in the train script (#50)
2025-02-13 11:58:23 -08:00

34 lines
856 B
Python

# Copyright (c) Meta Platforms, Inc. and affiliates.
import abc
from typing import Any, Generator, Generic, TypeVar
# Type of the items yielded by the generator returned from
# StatefulIterator.create_iter().
T = TypeVar("T")
# Type of the state object returned by StatefulIterator.get_state().
C = TypeVar("C")
class StatefulIterator(Generic[T, C], abc.ABC):
    """Abstract interface for an iterator that can report its own state.

    ``T`` is the type of item yielded by :meth:`create_iter`; ``C`` is the
    type of the state object returned by :meth:`get_state`. Both methods are
    abstract, so concrete subclasses must implement them to be instantiable.
    """

    @abc.abstractmethod
    def get_state(self) -> C:
        """Return this iterator's state object (of type ``C``)."""
        ...

    @abc.abstractmethod
    def create_iter(self) -> Generator[T, Any, None]:
        """Return a generator that yields items of type ``T``."""
        ...
class IteratorState(Generic[C]):
    """State object from which a ``StatefulIterator`` can be built.

    NOTE(review): this class does not derive from ``abc.ABC``, so the
    ``@abc.abstractmethod`` marker on :meth:`build` is not enforced when the
    class (or a subclass that omits ``build``) is instantiated. Presumably
    subclasses are still expected to override it -- confirm before relying
    on enforcement.
    """

    @abc.abstractmethod
    def build(self) -> StatefulIterator[T, C]:
        """Construct and return a ``StatefulIterator`` from this state."""
        ...
def get_state_and_refresh(iterator: StatefulIterator):
    """Capture ``iterator``'s state and rebuild a loader and iterator from it.

    Rebuilding is necessary because ``get_state()`` on the multiprocessing
    (mp) iterator shuts down MP in order to persist its state correctly, so
    the data loader and Python iterator have to be restarted afterwards.

    Args:
        iterator: The stateful iterator whose state should be captured.

    Returns:
        A ``(state, data_loader, py_iterator)`` tuple: the state returned by
        ``iterator.get_state()``, the object returned by ``state.build()``,
        and the result of calling ``create_iter()`` on that object.
    """
    snapshot = iterator.get_state()
    rebuilt_loader = snapshot.build()
    # Tuple elements are evaluated left to right, so create_iter() still runs
    # after build(), matching the original statement order.
    return snapshot, rebuilt_loader, rebuilt_loader.create_iter()