Skip to content

Commit

Permalink
2024-11-08 nightly release (1fcd5e1)
Browse files Browse the repository at this point in the history
  • Loading branch information
pytorchbot committed Nov 8, 2024
1 parent 4686780 commit 4c2507f
Show file tree
Hide file tree
Showing 21 changed files with 754 additions and 174 deletions.
2 changes: 1 addition & 1 deletion packaging/pre_build_script_linux.sh
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
set -ex

source packaging/manylinux/python_helper.sh
yum -y install ninja-build zlib-static
yum -y install ninja-build zlib
# Docker path is /__w by default
export WORKSPACE="/__w"
# Install static OpenSSL/libcrypto library
Expand Down
69 changes: 50 additions & 19 deletions test/nodes/test_adapters.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,40 @@
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.

import testslide
from torch.utils.data import IterableDataset, RandomSampler
from torchdata.nodes.adapters import IterableWrapper, MapStyleWrapper, ToIterableDataset
from typing import Any, Dict, Iterator

from .utils import DummyIterableDataset, DummyMapDataset, MockSource
from parameterized import parameterized
from torch.testing._internal.common_utils import TestCase

from torch.utils.data import RandomSampler
from torchdata.nodes.adapters import IterableWrapper, MapStyleWrapper

class TestIterableWrapper(testslide.TestCase):
from torchdata.nodes.types import Stateful

from .utils import DummyIterableDataset, DummyMapDataset, run_test_save_load_state


class _StatefulRange(Stateful):
    """Range-like iterable that can checkpoint and resume its position.

    ``state_dict`` records how many items have been yielded so far;
    ``load_state_dict`` arranges for the *next* ``__iter__`` call to
    resume from that recorded position (once — subsequent epochs start
    over from zero).
    """

    def __init__(self, n: int) -> None:
        self.n = n
        self._num_yielded = 0
        self._next_start = 0

    def __iter__(self) -> Iterator[int]:
        # Consume any pending resume point, then clear it so the epoch
        # after this one begins at zero again.
        self._num_yielded = self._next_start
        self._next_start = 0
        current = self._num_yielded
        while current < self.n:
            self._num_yielded += 1
            yield current
            current += 1

    def state_dict(self) -> Dict[str, Any]:
        # Snapshot only the progress counter; the resume point is derived
        # from it on load.
        return {"_num_yielded": self._num_yielded}

    def load_state_dict(self, state_dict: Dict[str, Any]):
        self._next_start = state_dict["_num_yielded"]


class TestIterableWrapper(TestCase):
def test_iterable(self):
n = 20
node = IterableWrapper(range(n))
Expand Down Expand Up @@ -44,11 +70,19 @@ def test_iterable_dataset(self):
self.assertEqual(row["test_tensor"].item(), i)
self.assertEqual(row["test_str"], f"str_{i}")

@parameterized.expand([0, 5])
def test_save_load_state_fast_forward(self, midpoint: int):
run_test_save_load_state(self, IterableWrapper(range(10)), midpoint)

@parameterized.expand([0, 5])
def test_save_load_state_stateful(self, midpoint: int):
run_test_save_load_state(self, IterableWrapper(_StatefulRange(10)), midpoint)

class TestMapStyle(testslide.TestCase):

class TestMapStyle(TestCase):
def test_default_sampler(self):
n = 20
node = MapStyleWrapper(DummyMapDataset(n))
node = MapStyleWrapper(DummyMapDataset(n), sampler=range(n))
for epoch in range(2):
result = list(node)
self.assertEqual(len(result), n)
Expand Down Expand Up @@ -89,17 +123,14 @@ def test_dict(self):
self.assertEqual(row["test_tensor"].item(), i)
self.assertEqual(row["test_str"], f"str_{i}")

@parameterized.expand([0, 7])
def test_save_load_state_fast_forward(self, midpoint: int):
n = 20
node = MapStyleWrapper(DummyMapDataset(n), sampler=range(n))
run_test_save_load_state(self, node, midpoint)

class TestToIterableDataset(testslide.TestCase):
def test_to_iterable_dataset(self):
@parameterized.expand([0, 7])
def test_save_load_state_stateful(self, midpoint: int):
n = 20
node = MockSource(n)
iterable_ds = ToIterableDataset(node)
self.assertIsInstance(iterable_ds, IterableDataset)
for epoch in range(2):
result = list(iterable_ds)
self.assertEqual(len(result), n)
for i, row in enumerate(result):
self.assertEqual(row["step"], i)
self.assertEqual(row["test_tensor"].item(), i)
self.assertEqual(row["test_str"], f"str_{i}")
node = MapStyleWrapper(DummyMapDataset(n), sampler=_StatefulRange(n))
run_test_save_load_state(self, node, midpoint)
31 changes: 31 additions & 0 deletions test/nodes/test_base_node.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.

from torch.testing._internal.common_utils import TestCase
from torchdata.nodes.adapters import IterableWrapper
from torchdata.nodes.base_node import BaseNodeIterator

from .utils import run_test_save_load_state


class TestBaseNode(TestCase):
    def test_started_finished(self) -> None:
        """started()/finished() must track the iterator lifecycle each epoch."""
        node = IterableWrapper(range(10))
        for _epoch in range(3):  # repeat to confirm flags reset per epoch
            it = iter(node)
            self.assertIsInstance(it, BaseNodeIterator)
            # Fresh iterator: neither flag is set yet.
            self.assertFalse(it.started())
            self.assertFalse(it.finished())

            for _ in it:
                # Mid-iteration: started, but not finished.
                self.assertTrue(it.started())
                self.assertFalse(it.finished())

            # Exhausted iterator: both flags are set.
            self.assertTrue(it.started())
            self.assertTrue(it.finished())

    def test_save_load_state(self):
        """A simple wrapper node round-trips its state at the midpoint."""
        run_test_save_load_state(self, IterableWrapper(range(10)), 5)
16 changes: 13 additions & 3 deletions test/nodes/test_batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,17 @@
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.

import testslide
import itertools

import torch
from parameterized import parameterized
from torch.testing._internal.common_utils import TestCase
from torchdata.nodes.batch import Batcher

from .utils import MockSource
from .utils import MockSource, run_test_save_load_state


class TestBatcher(testslide.TestCase):
class TestBatcher(TestCase):
def test_batcher(self) -> None:
batch_size = 6
src = MockSource(num_samples=20)
Expand All @@ -38,3 +41,10 @@ def test_batcher_drop_last_false(self) -> None:
self.assertEqual(results[i][j]["step"], i * batch_size + j)
self.assertEqual(results[i][j]["test_tensor"], torch.tensor([i * batch_size + j]))
self.assertEqual(results[i][j]["test_str"], f"str_{i * batch_size + j}")

@parameterized.expand(itertools.product([0, 2], [True, False]))
def test_save_load_state_fast_forward(self, midpoint: int, drop_last: bool):
batch_size = 6
src = MockSource(num_samples=20)
node = Batcher(src, batch_size=batch_size, drop_last=drop_last)
run_test_save_load_state(self, node, midpoint)
63 changes: 59 additions & 4 deletions test/nodes/test_map.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,23 @@
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.

import itertools

import unittest
from typing import List

import testslide
from torch.testing._internal.common_utils import IS_WINDOWS, TEST_CUDA
from parameterized import parameterized
from torch.testing._internal.common_utils import IS_WINDOWS, TEST_CUDA, TestCase
from torchdata.nodes.batch import Batcher

from torchdata.nodes.map import Mapper, ParallelMapper
from torchdata.nodes.pin_memory import PinMemory
from torchdata.nodes.prefetch import Prefetcher

from .utils import MockSource, RandomSleepUdf, udf_raises
from .utils import MockSource, RandomSleepUdf, run_test_save_load_state, udf_raises


class TestMap(testslide.TestCase):
class TestMap(TestCase):
def _test_exception_handling_mapper(self, pin_memory, method):
batch_size = 6
multiprocessing_context = None if IS_WINDOWS else "forkserver"
Expand Down Expand Up @@ -104,3 +107,55 @@ def test_in_order_process(self):

def test_out_of_order_process(self):
self._test_map(False, "process")

@parameterized.expand(
itertools.product(
[0, 7, 13],
[True], # TODO: define and fix in_order = False
[0, 1, 9], # TODO: define and fix in_order = False
)
)
def test_save_load_state_thread(self, midpoint: int, in_order: bool, snapshot_frequency: int):
method = "thread"
batch_size = 6
n = 80
multiprocessing_context = None if IS_WINDOWS else "forkserver"
src = MockSource(num_samples=n)
node = Batcher(src, batch_size=batch_size, drop_last=False)
node = ParallelMapper(
node,
RandomSleepUdf(),
num_workers=4,
in_order=in_order,
method=method,
multiprocessing_context=multiprocessing_context,
snapshot_frequency=snapshot_frequency,
)
node = Prefetcher(node, prefetch_factor=2)
run_test_save_load_state(self, node, midpoint)

@parameterized.expand(
itertools.product(
[0, 7, 13],
[True], # TODO: define and fix in_order = False
[0, 1, 9], # TODO: define and fix in_order = False
)
)
def test_save_load_state_process(self, midpoint: int, in_order: bool, snapshot_frequency: int):
method = "process"
batch_size = 6
n = 80
multiprocessing_context = None if IS_WINDOWS else "forkserver"
src = MockSource(num_samples=n)
node = Batcher(src, batch_size=batch_size, drop_last=False)
node = ParallelMapper(
node,
RandomSleepUdf(),
num_workers=4,
in_order=in_order,
method=method,
multiprocessing_context=multiprocessing_context,
snapshot_frequency=snapshot_frequency,
)
node = Prefetcher(node, prefetch_factor=2)
run_test_save_load_state(self, node, midpoint)
22 changes: 18 additions & 4 deletions test/nodes/test_pin_memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,23 +4,25 @@
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.

import itertools
import unittest

import testslide
import torch

from torch.testing._internal.common_utils import TEST_CUDA
from parameterized import parameterized

from torch.testing._internal.common_utils import TEST_CUDA, TestCase

from torchdata.nodes.batch import Batcher
from torchdata.nodes.map import Mapper
from torchdata.nodes.pin_memory import PinMemory
from torchdata.nodes.prefetch import Prefetcher

from .utils import Collate, IterInitError, MockSource
from .utils import Collate, IterInitError, MockSource, run_test_save_load_state


@unittest.skipIf(not TEST_CUDA, "CUDA unavailable")
class TestPinMemory(testslide.TestCase):
class TestPinMemory(TestCase):
def test_pin_memory(self) -> None:
batch_size = 6
src = MockSource(num_samples=20)
Expand Down Expand Up @@ -62,3 +64,15 @@ def test_iter_init_error(self):

with self.assertRaisesRegex(ValueError, "Iter Init Error"):
list(root)

@parameterized.expand(itertools.product([0, 7, 33], [0, 1, 9]))
def test_save_load_state_stateful(self, midpoint: int, snapshot_frequency: int):
batch_size = 6
n = 200
node = MockSource(num_samples=n)
node = Batcher(node, batch_size=batch_size, drop_last=False)
node = Mapper(node, Collate())
node = PinMemory(node, snapshot_frequency=snapshot_frequency)
node = Prefetcher(node, prefetch_factor=8)

run_test_save_load_state(self, node, midpoint)
18 changes: 15 additions & 3 deletions test/nodes/test_prefetch.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,18 @@
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.

import testslide
import itertools

import torch
from parameterized import parameterized
from torch.testing._internal.common_utils import TestCase
from torchdata.nodes.batch import Batcher
from torchdata.nodes.prefetch import Prefetcher

from .utils import IterInitError, MockSource
from .utils import IterInitError, MockSource, run_test_save_load_state


class TestPrefetcher(testslide.TestCase):
class TestPrefetcher(TestCase):
def test_prefetcher(self) -> None:
batch_size = 6
src = MockSource(num_samples=20)
Expand All @@ -35,3 +38,12 @@ def test_iter_init_error(self):

with self.assertRaisesRegex(ValueError, "Iter Init Error"):
list(root)

@parameterized.expand(itertools.product([0, 7, 32], [0, 1, 9]))
def test_save_load_state_stateful(self, midpoint: int, snapshot_frequency: int):
batch_size = 6
n = 200
src = MockSource(num_samples=n)
node = Batcher(src, batch_size=batch_size, drop_last=False)
node = Prefetcher(node, prefetch_factor=8, snapshot_frequency=snapshot_frequency)
run_test_save_load_state(self, node, midpoint)
53 changes: 53 additions & 0 deletions test/nodes/test_snapshot_store.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
# Copyright (c) Meta Platforms, Inc. and affiliates.
# All rights reserved.
#
# This source code is licensed under the BSD-style license found in the
# LICENSE file in the root directory of this source tree.

from torch.testing._internal.common_utils import TestCase
from torchdata.nodes.adapters import IterableWrapper
from torchdata.nodes.base_node import BaseNodeIterator
from torchdata.nodes.snapshot_store import DequeSnapshotStore

from .utils import run_test_save_load_state


class TestDequeSnapshotStore(TestCase):
    def test_snapshot_store(self) -> None:
        """Exercise the append/pop_version contract of DequeSnapshotStore."""
        store = DequeSnapshotStore()
        store.append({"a": 1}, 0)
        store.append({"a": 2}, 10)
        self.assertEqual(len(store._deque), 2)

        # Popping an exact stored version returns its snapshot and drops it.
        self.assertEqual(store.pop_version(0), {"a": 1})
        self.assertEqual(len(store._deque), 1)

        # Versions that were never stored return None and leave the deque
        # untouched.
        for missing_version in (1, 7):
            self.assertIsNone(store.pop_version(missing_version))
            self.assertEqual(len(store._deque), 1)

        self.assertEqual(store.pop_version(10), {"a": 2})
        self.assertEqual(len(store._deque), 0)

        # Popping beyond the newest stored version also returns None.
        self.assertIsNone(store.pop_version(11))
        self.assertEqual(len(store._deque), 0)

        # Appending a version <= the last one seen must fail — even after
        # the deque has been drained.
        for stale_state, stale_version in (({"a": 3}, 3), ({"a": 4}, 10)):
            with self.assertRaisesRegex(ValueError, "is not strictly greater than"):
                store.append(stale_state, stale_version)
            self.assertEqual(len(store._deque), 0)

        # Strictly increasing versions are accepted again; popping the
        # newest entry discards everything older.
        store.append({"a": 4}, 11)
        store.append({"a": 5}, 19)
        self.assertEqual(store.pop_version(19), {"a": 5})
        self.assertEqual(len(store._deque), 0)
Loading

0 comments on commit 4c2507f

Please sign in to comment.