mirror of
https://github.com/facebookresearch/blt.git
synced 2025-04-10 19:59:09 +00:00
Let process start before yielding preloaded prefetch buffer, avoid needlessly losing buffer in edge cases
Summary: Test Plan:
This commit is contained in:
parent
ea1fc75862
commit
a828594625
1 changed files with 23 additions and 13 deletions
|
@ -190,7 +190,11 @@ class MultiprocessIterator(StatefulIterator):
|
||||||
logging.info(
|
logging.info(
|
||||||
"Main thread: Emptying the batch_queue until batch.is_final=True is found."
|
"Main thread: Emptying the batch_queue until batch.is_final=True is found."
|
||||||
)
|
)
|
||||||
self.prefetch_buffer = []
|
if self.prefetch_buffer is not None and len(self.prefetch_buffer) > 0:
|
||||||
|
buffer = self.prefetch_buffer
|
||||||
|
else:
|
||||||
|
buffer = []
|
||||||
|
self.prefetch_buffer = buffer
|
||||||
final_batch_received = False
|
final_batch_received = False
|
||||||
while True:
|
while True:
|
||||||
try:
|
try:
|
||||||
|
@ -261,12 +265,14 @@ class MultiprocessIterator(StatefulIterator):
|
||||||
"Attempted to get approximate state, but queue was erroniously empty."
|
"Attempted to get approximate state, but queue was erroniously empty."
|
||||||
)
|
)
|
||||||
self.received_approximate_state_event.set()
|
self.received_approximate_state_event.set()
|
||||||
|
if self.prefetch_buffer is not None and len(self.prefetch_buffer) > 0:
|
||||||
|
buffer = [b.to_python_dict() for b in self.prefetch_buffer]
|
||||||
|
else:
|
||||||
|
buffer = []
|
||||||
return MultiprocessIteratorState(
|
return MultiprocessIteratorState(
|
||||||
base_iterator_state=base_iterator_state,
|
base_iterator_state=base_iterator_state,
|
||||||
n_batches_to_prefetch=self.n_batches_to_prefetch,
|
n_batches_to_prefetch=self.n_batches_to_prefetch,
|
||||||
serialized_prefetch_buffer=json.dumps(
|
serialized_prefetch_buffer=json.dumps(buffer),
|
||||||
[b.to_python_dict() for b in self.prefetch_buffer]
|
|
||||||
),
|
|
||||||
persist_type=self.persist_type,
|
persist_type=self.persist_type,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -281,9 +287,12 @@ class MultiprocessIterator(StatefulIterator):
|
||||||
"State will be invalid if shutdown was forced before state persisted."
|
"State will be invalid if shutdown was forced before state persisted."
|
||||||
)
|
)
|
||||||
if self.producer is None:
|
if self.producer is None:
|
||||||
serialized_prefetch_buffer = json.dumps(
|
if self.prefetch_buffer is not None and len(self.prefetch_buffer) > 0:
|
||||||
[b.to_python_dict() for b in self.prefetch_buffer]
|
serialized_prefetch_buffer = json.dumps(
|
||||||
)
|
[b.to_python_dict() for b in self.prefetch_buffer]
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
serialized_prefetch_buffer = json.dumps([])
|
||||||
return MultiprocessIteratorState(
|
return MultiprocessIteratorState(
|
||||||
base_iterator_state=self.base_iterator.get_state(),
|
base_iterator_state=self.base_iterator.get_state(),
|
||||||
n_batches_to_prefetch=self.n_batches_to_prefetch,
|
n_batches_to_prefetch=self.n_batches_to_prefetch,
|
||||||
|
@ -304,12 +313,6 @@ class MultiprocessIterator(StatefulIterator):
|
||||||
"Iterator may be invalid if shutdown was forced before state persisted."
|
"Iterator may be invalid if shutdown was forced before state persisted."
|
||||||
)
|
)
|
||||||
logging.info("Main thread: Creating MP iterator")
|
logging.info("Main thread: Creating MP iterator")
|
||||||
# First yield from the stored prefetch buffer.
|
|
||||||
if self.prefetch_buffer is not None:
|
|
||||||
while len(self.prefetch_buffer) > 0:
|
|
||||||
item = self.prefetch_buffer.pop(0)
|
|
||||||
yield item
|
|
||||||
self.prefetch_buffer = None
|
|
||||||
|
|
||||||
assert (
|
assert (
|
||||||
self.producer is None
|
self.producer is None
|
||||||
|
@ -349,6 +352,13 @@ class MultiprocessIterator(StatefulIterator):
|
||||||
logger.info("Async dataloader started")
|
logger.info("Async dataloader started")
|
||||||
self.producer.start()
|
self.producer.start()
|
||||||
|
|
||||||
|
# First yield from the stored prefetch buffer.
|
||||||
|
if self.prefetch_buffer is not None:
|
||||||
|
while len(self.prefetch_buffer) > 0:
|
||||||
|
item = self.prefetch_buffer.pop(0)
|
||||||
|
yield item
|
||||||
|
self.prefetch_buffer = None
|
||||||
|
|
||||||
while True:
|
while True:
|
||||||
if self.producer.exitcode is not None:
|
if self.producer.exitcode is not None:
|
||||||
raise RuntimeError(
|
raise RuntimeError(
|
||||||
|
|
Loading…
Add table
Reference in a new issue