-
Notifications
You must be signed in to change notification settings - Fork 33
/
Copy pathlmdb_dataset.py
executable file
·177 lines (142 loc) · 5.62 KB
/
lmdb_dataset.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
"""
Copyright (c) Facebook, Inc. and its affiliates.
This source code is licensed under the MIT license found in the
LICENSE file in the root directory of this source tree.
1. Modified the PyG2 data transformation:
Since the S2EF dataset is already generated with PyG2, it does not need to be
converted again; applying the original transformation to it can produce an empty Data object.
"""
import bisect
import logging
import math
import pickle
import random
import warnings
from pathlib import Path
import lmdb
import numpy as np
import torch
from torch.utils.data import Dataset
from torch_geometric.data import Batch, Data
import torch_geometric
from ocpmodels.common import distutils
from ocpmodels.common.registry import registry
#from ocpmodels.common.utils import pyg2_data_transform
def pyg2_data_transform(data: Data):
    """Rebuild a ``Data`` object in the PyG >= 2.0 format when needed.

    Under PyG 2.0 or later, an object whose ``__dict__`` has no ``_store``
    entry is rebuilt as a fresh ``Data`` from its non-``None`` attributes.
    Objects that already carry ``_store`` are returned untouched, as is
    every object under an older PyG version.

    Args:
        data: The object to convert.

    Returns:
        A newly built ``Data`` when conversion applies, otherwise ``data``
        itself.
    """
    # Compare the major version numerically: a plain string comparison such as
    # ``__version__ >= "2.0"`` is lexicographic and would misclassify "10.0".
    major_version = int(torch_geometric.__version__.split(".")[0])
    if major_version >= 2 and "_store" not in data.__dict__:
        return Data(**{k: v for k, v in data.__dict__.items() if v is not None})
    return data
@registry.register_dataset("lmdb_v2")
@registry.register_dataset("single_point_lmdb_v2")
@registry.register_dataset("trajectory_lmdb_v2")
class LmdbDatasetV2(Dataset):
    r"""Dataset class to load from LMDB files containing relaxation
    trajectories or single point computations.

    Useful for Structure to Energy & Force (S2EF), Initial State to
    Relaxed State (IS2RS), and Initial State to Relaxed Energy (IS2RE) tasks.

    ``config["src"]`` may point either to a single LMDB file or to a
    directory; in the latter case every ``*.lmdb`` file in it (in sorted
    order) is opened and the files are indexed as one concatenated dataset.

    Args:
        config (dict): Dataset configuration. Must contain the key ``"src"``.
        transform (callable, optional): Data transform function.
            (default: :obj:`None`)
    """

    def __init__(self, config, transform=None):
        super().__init__()
        self.config = config

        self.path = Path(self.config["src"])
        if not self.path.is_file():
            # Directory mode: one environment per *.lmdb file.
            db_paths = sorted(self.path.glob("*.lmdb"))
            assert len(db_paths) > 0, f"No LMDBs found in '{self.path}'"

            self.metadata_path = self.path / "metadata.npz"

            self._keys, self.envs = [], []
            for db_path in db_paths:
                self.envs.append(self.connect_db(db_path))
                # Each LMDB stores its own sample count under the "length" key.
                length = pickle.loads(
                    self.envs[-1].begin().get("length".encode("ascii"))
                )
                self._keys.append(list(range(length)))

            keylens = [len(k) for k in self._keys]
            # Running totals, used to map a global index to (db, element).
            self._keylen_cumulative = np.cumsum(keylens).tolist()
            self.num_samples = sum(keylens)
        else:
            # Single-file mode: keys are the ASCII-encoded integers
            # 0 .. entries - 1.
            self.metadata_path = self.path.parent / "metadata.npz"
            self.env = self.connect_db(self.path)
            self._keys = [
                f"{j}".encode("ascii")
                for j in range(self.env.stat()["entries"])
            ]
            self.num_samples = len(self._keys)

        self.transform = transform

    def __len__(self):
        """Return the total number of samples across all opened LMDBs."""
        return self.num_samples

    def __getitem__(self, idx):
        """Load, unpickle and (optionally) transform the sample at ``idx``.

        Args:
            idx: Integer position of the sample. Negative values count from
                the end of the dataset.

        Returns:
            The stored data object, passed through ``pyg2_data_transform``
            and then through ``self.transform`` if one was given. In
            directory mode its ``id`` attribute is set to
            ``"<db index>_<element index>"``.

        Raises:
            IndexError: If ``idx`` is outside ``[-len(self), len(self))``.
        """
        # Normalise negative indices up front. Without this, directory mode
        # would map e.g. -1 to the last element of the *first* LMDB instead
        # of the last sample of the whole dataset.
        requested = idx
        if idx < 0:
            idx = idx + self.num_samples
        if not 0 <= idx < self.num_samples:
            raise IndexError(
                f"index {requested} is out of range for dataset with "
                f"{self.num_samples} samples"
            )

        # NOTE: pickle.loads can execute arbitrary code; only point this
        # dataset at LMDB files from a trusted source.
        if not self.path.is_file():
            # Figure out which db this should be indexed from.
            db_idx = bisect.bisect(self._keylen_cumulative, idx)
            # Extract index of element within that db.
            el_idx = idx
            if db_idx != 0:
                el_idx = idx - self._keylen_cumulative[db_idx - 1]
            assert el_idx >= 0

            # Return features.
            datapoint_pickled = (
                self.envs[db_idx]
                .begin()
                .get(f"{self._keys[db_idx][el_idx]}".encode("ascii"))
            )
            data_object = pyg2_data_transform(pickle.loads(datapoint_pickled))
            data_object.id = f"{db_idx}_{el_idx}"
        else:
            datapoint_pickled = self.env.begin().get(self._keys[idx])
            data_object = pyg2_data_transform(pickle.loads(datapoint_pickled))

        if self.transform is not None:
            data_object = self.transform(data_object)

        return data_object

    def connect_db(self, lmdb_path=None):
        """Open the LMDB at ``lmdb_path`` read-only and return the environment.

        The path is treated as a single file (``subdir=False``) and is opened
        without locking, read-ahead or memory initialisation.
        """
        env = lmdb.open(
            str(lmdb_path),
            subdir=False,
            readonly=True,
            lock=False,
            readahead=False,
            meminit=False,
            max_readers=1,
        )
        return env

    def close_db(self):
        """Close every LMDB environment opened by this dataset."""
        if not self.path.is_file():
            for env in self.envs:
                env.close()
        else:
            self.env.close()
class SinglePointLmdbDatasetV2(LmdbDatasetV2):
    """Deprecated subclass of ``LmdbDatasetV2``.

    Behaves exactly like its parent but emits a deprecation warning once
    construction has finished.
    """

    def __init__(self, config, transform=None):
        super().__init__(config, transform)
        # The two literals are concatenated implicitly, so the first one must
        # end with a space to keep the sentences apart.
        warnings.warn(
            "SinglePointLmdbDataset is deprecated and will be removed in the future. "
            "Please use 'LmdbDataset' instead.",
            stacklevel=3,
        )
class TrajectoryLmdbDatasetV2(LmdbDatasetV2):
    """Deprecated subclass of ``LmdbDatasetV2``.

    Behaves exactly like its parent but emits a deprecation warning once
    construction has finished.
    """

    def __init__(self, config, transform=None):
        super().__init__(config, transform)
        # The two literals are concatenated implicitly, so the first one must
        # end with a space to keep the sentences apart.
        warnings.warn(
            "TrajectoryLmdbDataset is deprecated and will be removed in the future. "
            "Please use 'LmdbDataset' instead.",
            stacklevel=3,
        )
def data_list_collater(data_list, otf_graph=False):
    """Collate a list of data objects into a single ``Batch``.

    Unless ``otf_graph`` is set, the number of edges of each sample (the
    length of row 1 of its ``edge_index``) is recorded on the batch as the
    tensor ``batch.neighbors``.

    Args:
        data_list: Data objects to batch together.
        otf_graph (bool): When ``True``, skip the per-sample edge count.

    Returns:
        The batch built by ``Batch.from_data_list``, with ``neighbors``
        attached when edge information was available.
    """
    batch = Batch.from_data_list(data_list)

    if not otf_graph:
        try:
            n_neighbors = [data.edge_index[1, :].shape[0] for data in data_list]
            batch.neighbors = torch.tensor(n_neighbors)
        except (NotImplementedError, TypeError):
            # TypeError covers samples whose edge_index is None (subscripting
            # None fails), so a missing graph yields the warning below
            # instead of crashing the collate step.
            logging.warning(
                "LMDB does not contain edge index information, set otf_graph=True"
            )

    return batch