Merge remote-tracking branch 'origin/master' into niu/DAOS-17003
Skip-func-hw-test-medium-md-on-ssd: false
Skip-func-hw-test-large-md-on-ssd: false

Signed-off-by: Niu Yawei <[email protected]>
NiuYawei committed Feb 5, 2025
2 parents 8df2186 + 8fe77b9 commit f3d754b
Showing 37 changed files with 1,417 additions and 269 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/ossf-scorecard.yml
@@ -71,6 +71,6 @@ jobs:
# Upload the results to GitHub's code scanning dashboard (optional).
# Commenting out will disable upload of results to your repo's Code Scanning dashboard
- name: "Upload to code-scanning"
uses: github/codeql-action/upload-sarif@b6a472f63d85b9c78a3ac5e89422239fc15e9b3c # v3.28.1
uses: github/codeql-action/upload-sarif@f6091c0113d1dcf9b98e269ee48e8a7e51b7bdd4 # v3.28.5
with:
sarif_file: results.sarif
2 changes: 1 addition & 1 deletion .github/workflows/trivy.yml
@@ -58,7 +58,7 @@ jobs:
trivy-config: 'utils/trivy/trivy.yaml'

- name: Upload Trivy scan results to GitHub Security tab
uses: github/codeql-action/upload-sarif@b6a472f63d85b9c78a3ac5e89422239fc15e9b3c # v3.28.1
uses: github/codeql-action/upload-sarif@f6091c0113d1dcf9b98e269ee48e8a7e51b7bdd4 # v3.28.5
with:
sarif_file: 'trivy-results.sarif'

77 changes: 77 additions & 0 deletions docs/user/pytorch.md
@@ -0,0 +1,77 @@
# DAOS pytorch interface

PyTorch is a fully featured framework for building and training deep learning models, widely used in both research and industry.
PyTorch can load data from a variety of sources, and DAOS can serve as a storage backend both for training data and for model checkpoints.

The [DFS plugin](https://github.com/daos-stack/daos/tree/master/src/client/pydaos/torch) implements the PyTorch interfaces for loading data from DAOS: map-style and iterable-style datasets.
This makes the full feature set of `torch.utils.data.DataLoader` available for loading data from DAOS POSIX containers, including parallel data loading, batching, and shuffling.
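For readers unfamiliar with the map-style contract: a map-style dataset is simply an object exposing `__len__` and `__getitem__`, which is what the plugin's `Dataset` class implements on top of a DAOS container. A stand-alone sketch of that contract, with hypothetical in-memory samples and no DAOS or torch required:

```python
# Minimal illustration of the map-style dataset contract
# (__len__ plus __getitem__) that pydaos.torch.Dataset implements.
class InMemoryDataset:
    def __init__(self, samples):
        # samples: any indexable collection, e.g. a list of byte strings
        self.samples = samples

    def __len__(self):
        return len(self.samples)

    def __getitem__(self, idx):
        return self.samples[idx]

dataset = InMemoryDataset([b"sample0", b"sample1", b"sample2"])
print(len(dataset))   # → 3
print(dataset[1])     # → b'sample1'
```

Any object satisfying this contract can be handed to `torch.utils.data.DataLoader`; the DAOS-backed `Dataset` simply resolves indices to files in the container namespace.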

## Installation

To install the plugin, you need PyTorch installed; please follow the official [PyTorch installation guide](https://pytorch.org/get-started/).
The `pydaos.torch` module ships with the DAOS client package; please refer to the DAOS installation guide for your distribution.


## Usage

To use DAOS as a storage backend for PyTorch, a DAOS agent must be running on the nodes where PyTorch runs, and the container ACLs must be configured to grant access.

Here's an example of using a map-style dataset backed directly by DAOS:

```python
import torch
from torch.utils.data import DataLoader
from pydaos.torch import Dataset

dataset = Dataset(pool='pool', container='container', path='/training/samples')
# That's it: when the Dataset is created, it connects to DAOS, scans the namespace of the container,
# and is then ready to load data from it.

for i, sample in enumerate(dataset):
print(f"Sample {i} size: {len(sample)}")
```

To use the Dataset with a DataLoader, pass it directly to the DataLoader constructor:

```python

dataloader = DataLoader(dataset,
batch_size=4,
shuffle=True,
num_workers=4,
worker_init_fn=dataset.worker_init)

# and use DataLoader as usual
for batch in dataloader:
print(f"Batch size: {len(batch)}")
```

The only notable difference is that you need to pass the dataset's `worker_init` method as the DataLoader's `worker_init_fn`, so that each worker process correctly initializes its own DAOS connection.

## Checkpoints

DAOS can be used to store model checkpoints as well.
PyTorch saves and loads model checkpoints with the [torch.save](https://pytorch.org/docs/main/generated/torch.save.html) and [torch.load](https://pytorch.org/docs/main/generated/torch.load.html) functions.

`pydaos.torch` provides a way to save and load model checkpoints directly to/from a DAOS container (either the same container used for the data, or a different one):

```python
import torch
from pydaos.torch import Checkpoint

# ...

chkp = Checkpoint(pool, cont, prefix='/training/checkpoints')

with chkp.writer('model.pt') as w:
torch.save(model.state_dict(), w)

# Later, to load the model

with chkp.reader('model.pt') as r:
torch.load(r)

```

See the [pydaos.torch](https://github.com/daos-stack/daos/blob/master/src/client/pydaos/torch/Readme.md) plugin README for an example of using checkpoints with the DLIO benchmark.
1 change: 1 addition & 0 deletions mkdocs.yml
@@ -100,6 +100,7 @@ nav:
- 'MPI-IO Support': 'user/mpi-io.md'
- 'HDF5 Support': 'user/hdf5.md'
- 'Python Support': 'user/python.md'
- 'Pytorch Support': 'user/pytorch.md'
- 'Spark and Hadoop': 'user/spark.md'
- 'Data Mover': 'user/datamover.md'
- 'Tensorflow-IO Support': 'user/tensorflow.md'
1 change: 1 addition & 0 deletions requirements-ftest.txt
@@ -4,3 +4,4 @@ avocado-framework-plugin-varianter-yaml-to-mux==82
clustershell
paramiko
distro
torch
2 changes: 2 additions & 0 deletions site_scons/components/__init__.py
@@ -1,4 +1,5 @@
# Copyright 2016-2024 Intel Corporation
# Copyright 2025 Google LLC
# Copyright 2025 Hewlett Packard Enterprise Development LP
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
@@ -315,6 +316,7 @@ def define_components(reqs):
'--default-library', 'static', '../fused'],
['meson', 'setup', '--reconfigure', '../fused'],
['ninja', 'install']],
pkgconfig='fused',
headers=['fused/fuse.h'],
required_progs=['libtoolize', 'ninja', 'meson'],
out_of_src_build=True)
11 changes: 6 additions & 5 deletions site_scons/prereq_tools/base.py
@@ -1,4 +1,5 @@
# Copyright 2016-2024 Intel Corporation
# Copyright 2025 Google LLC
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
@@ -902,7 +903,7 @@ def get_prebuilt_path(self, comp, name):
if not os.path.exists(ipath):
ipath = None
lpath = None
for lib in ['lib64', 'lib']:
for lib in comp.lib_path:
lpath = os.path.join(path, lib)
if os.path.exists(lpath):
break
@@ -1156,7 +1157,7 @@ def _parse_config(self, env, opts, comp_path=None):
if (not self.use_installed and real_comp_path is not None
and not real_comp_path == "/usr"):
path_found = False
for path in ["lib", "lib64"]:
for path in self.lib_path:
config = os.path.join(real_comp_path, path, "pkgconfig")
if not os.path.exists(config):
continue
@@ -1393,7 +1394,7 @@ def _patch_rpaths(self):
if not os.path.exists(comp_path):
return

for libdir in ['lib64', 'lib']:
for libdir in self.lib_path:
path = os.path.join(comp_path, libdir)
if os.path.exists(path):
norigin.append(os.path.normpath(path))
@@ -1405,14 +1406,14 @@ def _patch_rpaths(self):
comp = self.prereqs.get_component(prereq)
subpath = comp.component_prefix
if subpath and not subpath.startswith("/usr"):
for libdir in ['lib64', 'lib']:
for libdir in self.lib_path:
lpath = os.path.join(subpath, libdir)
if not os.path.exists(lpath):
continue
rpath.append(lpath)
continue

for libdir in ['lib64', 'lib']:
for libdir in self.lib_path:
path = os.path.join(rootpath, libdir)
if not os.path.exists(path):
continue
2 changes: 2 additions & 0 deletions src/cart/README.env
@@ -211,3 +211,5 @@ This file lists the environment variables used in CaRT.
traffic congestion. Available options are: "unspec" (default), "best_effort",
"low_latency", "bulk_data".

. CRT_CXI_INIT_RETRY
Retry count for HG_Init_opt2() when initializing the CXI provider (default = 3).
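For example, to raise the retry budget on a client node before launching the application (a config sketch; the variable name and its default of 3 come from the entry above, and the chosen value of 10 is arbitrary):

```shell
# Allow up to 10 HG_Init_opt2() retries when initializing the CXI provider
# (the client sleeps with an increasing delay between attempts).
export CRT_CXI_INIT_RETRY=10
```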
12 changes: 11 additions & 1 deletion src/cart/crt_hg.c
@@ -1,5 +1,6 @@
/*
* (C) Copyright 2016-2024 Intel Corporation.
* (C) Copyright 2025 Hewlett Packard Enterprise Development LP
*
* SPDX-License-Identifier: BSD-2-Clause-Patent
*/
@@ -834,6 +835,7 @@ crt_hg_class_init(crt_provider_t provider, int ctx_idx, bool primary, int iface_
char addr_str[CRT_ADDR_STR_MAX_LEN] = {'\0'};
size_t str_size = CRT_ADDR_STR_MAX_LEN;
struct crt_prov_gdata *prov_data;
uint32_t retry_count = 0;
int rc = DER_SUCCESS;

prov_data = crt_get_prov_gdata(primary, provider);
@@ -869,9 +871,17 @@ crt_hg_class_init(crt_provider_t provider, int ctx_idx, bool primary, int iface_
init_info.traffic_class = (enum na_traffic_class)crt_gdata.cg_swim_tc;
if (thread_mode_single)
init_info.na_init_info.thread_mode = NA_THREAD_MODE_SINGLE;

retry:
hg_class = HG_Init_opt2(info_string, crt_is_service(), HG_VERSION(2, 4), &init_info);
if (hg_class == NULL) {
/** workaround for DAOS-16990, DAOS-17011 - retry a few times on init */
if (provider == CRT_PROV_OFI_CXI && !crt_is_service() &&
retry_count < crt_gdata.cg_hg_init_retry_cnt) {
retry_count++;
D_WARN("Could not initialize HG class; retrying (%d)\n", retry_count);
sleep(retry_count * 5);
goto retry;
}
D_ERROR("Could not initialize HG class.\n");
D_GOTO(out, rc = -DER_HG);
}
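The workaround in the hunk above retries `HG_Init_opt2()` with a linearly increasing sleep (`retry_count * 5` seconds), up to the configured retry budget. The pattern can be sketched in isolation; `fake_init` below is a hypothetical stand-in for the real init call, and the delay is set to zero so the sketch runs instantly:

```python
import time

def init_with_retry(init_fn, retry_max=3, base_delay=0):
    """Retry init_fn until it succeeds or retry_max retries are spent.

    Mirrors the pattern in crt_hg_class_init(): on failure, sleep
    retry_count * base_delay seconds (5 s in the real code) and retry.
    Returns the number of retries that were needed.
    """
    retry_count = 0
    while True:
        if init_fn():
            return retry_count
        if retry_count >= retry_max:
            raise RuntimeError("could not initialize HG class")
        retry_count += 1
        time.sleep(retry_count * base_delay)  # linear backoff

calls = 0
def fake_init():
    global calls
    calls += 1
    return calls >= 3   # fail twice, succeed on the third attempt

retries = init_with_retry(fake_init)
print(retries)  # → 2
```

Note that a budget of `retry_max` retries allows up to `retry_max + 1` total attempts, matching the C code's `retry_count < crt_gdata.cg_hg_init_retry_cnt` check.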
7 changes: 7 additions & 0 deletions src/cart/crt_init.c
@@ -1,5 +1,6 @@
/*
* (C) Copyright 2016-2024 Intel Corporation.
* (C) Copyright 2025 Hewlett Packard Enterprise Development LP
*
* SPDX-License-Identifier: BSD-2-Clause-Patent
*/
@@ -281,11 +282,17 @@ data_init(int server, crt_init_options_t *opt)
if (mem_pin_enable == 1)
mem_pin_workaround();
} else {
int retry_count = 3;

/*
* Client-side envariable to indicate that the cluster
* is running using a secondary provider
*/
crt_env_get(CRT_SECONDARY_PROVIDER, &is_secondary);

/** Client side env for hg_init() retries */
crt_env_get(CRT_CXI_INIT_RETRY, &retry_count);
crt_gdata.cg_hg_init_retry_cnt = retry_count;
}
crt_gdata.cg_provider_is_primary = (is_secondary) ? 0 : 1;

58 changes: 42 additions & 16 deletions src/cart/crt_internal.h
@@ -1,5 +1,6 @@
/*
* (C) Copyright 2016-2024 Intel Corporation.
* (C) Copyright 2025 Hewlett Packard Enterprise Development LP
*
* SPDX-License-Identifier: BSD-2-Clause-Patent
*/
@@ -36,10 +37,17 @@
break; \
\
crt_opc_decode((rpc)->crp_pub.cr_opc, &_module, &_opc); \
D_TRACE_DEBUG(mask, (rpc), "[opc=%#x (%s:%s) rpcid=%#lx rank:tag=%d:%d] " fmt, \
(rpc)->crp_pub.cr_opc, _module, _opc, (rpc)->crp_req_hdr.cch_rpcid, \
(rpc)->crp_pub.cr_ep.ep_rank, (rpc)->crp_pub.cr_ep.ep_tag, \
##__VA_ARGS__); \
if ((rpc)->crp_coll) { \
D_TRACE_DEBUG(mask, (rpc), "[opc=%#x (%s:%s) rpcid=%#lx CORPC] " fmt, \
(rpc)->crp_pub.cr_opc, _module, _opc, \
(rpc)->crp_req_hdr.cch_rpcid, ##__VA_ARGS__); \
} else { \
D_TRACE_DEBUG(mask, (rpc), \
"[opc=%#x (%s:%s) rpcid=%#lx rank:tag=%d:%d] " fmt, \
(rpc)->crp_pub.cr_opc, _module, _opc, \
(rpc)->crp_req_hdr.cch_rpcid, (rpc)->crp_pub.cr_ep.ep_rank, \
(rpc)->crp_pub.cr_ep.ep_tag, ##__VA_ARGS__); \
} \
} while (0)

/* Log an error with an RPC descriptor */
@@ -49,10 +57,16 @@
char *_opc; \
\
crt_opc_decode((rpc)->crp_pub.cr_opc, &_module, &_opc); \
D_TRACE_ERROR((rpc), "[opc=%#x (%s:%s) rpcid=%#lx rank:tag=%d:%d] " fmt, \
(rpc)->crp_pub.cr_opc, _module, _opc, (rpc)->crp_req_hdr.cch_rpcid, \
(rpc)->crp_pub.cr_ep.ep_rank, (rpc)->crp_pub.cr_ep.ep_tag, \
##__VA_ARGS__); \
if ((rpc)->crp_coll) { \
D_TRACE_ERROR((rpc), "[opc=%#x (%s:%s) rpcid=%#lx CORPC] " fmt, \
(rpc)->crp_pub.cr_opc, _module, _opc, \
(rpc)->crp_req_hdr.cch_rpcid, ##__VA_ARGS__); \
} else { \
D_TRACE_ERROR((rpc), "[opc=%#x (%s:%s) rpcid=%#lx rank:tag=%d:%d] " fmt, \
(rpc)->crp_pub.cr_opc, _module, _opc, \
(rpc)->crp_req_hdr.cch_rpcid, (rpc)->crp_pub.cr_ep.ep_rank, \
(rpc)->crp_pub.cr_ep.ep_tag, ##__VA_ARGS__); \
} \
} while (0)

/* Log a warning with an RPC descriptor */
@@ -62,10 +76,16 @@
char *_opc; \
\
crt_opc_decode((rpc)->crp_pub.cr_opc, &_module, &_opc); \
D_TRACE_WARN((rpc), "[opc=%#x (%s:%s) rpcid=%#lx rank:tag=%d:%d] " fmt, \
(rpc)->crp_pub.cr_opc, _module, _opc, (rpc)->crp_req_hdr.cch_rpcid, \
(rpc)->crp_pub.cr_ep.ep_rank, (rpc)->crp_pub.cr_ep.ep_tag, \
##__VA_ARGS__); \
if ((rpc)->crp_coll) { \
D_TRACE_WARN((rpc), "[opc=%#x (%s:%s) rpcid=%#lx CORPC] " fmt, \
(rpc)->crp_pub.cr_opc, _module, _opc, \
(rpc)->crp_req_hdr.cch_rpcid, ##__VA_ARGS__); \
} else { \
D_TRACE_WARN((rpc), "[opc=%#x (%s:%s) rpcid=%#lx rank:tag=%d:%d] " fmt, \
(rpc)->crp_pub.cr_opc, _module, _opc, \
(rpc)->crp_req_hdr.cch_rpcid, (rpc)->crp_pub.cr_ep.ep_rank, \
(rpc)->crp_pub.cr_ep.ep_tag, ##__VA_ARGS__); \
} \
} while (0)

/* Log an info message with an RPC descriptor */
@@ -75,10 +95,16 @@
char *_opc; \
\
crt_opc_decode((rpc)->crp_pub.cr_opc, &_module, &_opc); \
D_TRACE_INFO((rpc), "[opc=%#x (%s:%s) rpcid=%#lx rank:tag=%d:%d] " fmt, \
(rpc)->crp_pub.cr_opc, _module, _opc, (rpc)->crp_req_hdr.cch_rpcid, \
(rpc)->crp_pub.cr_ep.ep_rank, (rpc)->crp_pub.cr_ep.ep_tag, \
##__VA_ARGS__); \
if ((rpc)->crp_coll) { \
D_TRACE_INFO((rpc), "[opc=%#x (%s:%s) rpcid=%#lx CORPC] " fmt, \
(rpc)->crp_pub.cr_opc, _module, _opc, \
(rpc)->crp_req_hdr.cch_rpcid, ##__VA_ARGS__); \
} else { \
D_TRACE_INFO((rpc), "[opc=%#x (%s:%s) rpcid=%#lx rank:tag=%d:%d] " fmt, \
(rpc)->crp_pub.cr_opc, _module, _opc, \
(rpc)->crp_req_hdr.cch_rpcid, (rpc)->crp_pub.cr_ep.ep_rank, \
(rpc)->crp_pub.cr_ep.ep_tag, ##__VA_ARGS__); \
} \
} while (0)
/**
* If \a cond is false, this is equivalent to an RPC_ERROR (i.e., \a mask is
4 changes: 4 additions & 0 deletions src/cart/crt_internal_types.h
@@ -1,5 +1,6 @@
/*
* (C) Copyright 2016-2024 Intel Corporation.
* (C) Copyright 2025 Hewlett Packard Enterprise Development LP
*
* SPDX-License-Identifier: BSD-2-Clause-Patent
*/
@@ -170,6 +171,8 @@ struct crt_gdata {
long cg_num_cores;
/** Inflight rpc quota limit */
uint32_t cg_rpc_quota;
/** Retry count of HG_Init_opt2() on failure when using CXI provider */
uint32_t cg_hg_init_retry_cnt;
};

extern struct crt_gdata crt_gdata;
@@ -197,6 +200,7 @@ struct crt_event_cb_priv {
ENV_STR(CRT_ATTACH_INFO_PATH) \
ENV(CRT_CREDIT_EP_CTX) \
ENV(CRT_CTX_NUM) \
ENV(CRT_CXI_INIT_RETRY) \
ENV(CRT_ENABLE_MEM_PIN) \
ENV_STR(CRT_L_GRP_CFG) \
ENV(CRT_L_RANK) \