add redpajama data preprocessing code (intel#12)
* add testing scripts

* remove temp-dir for worker

* remove test files

* add redpajama dp code

* ignore all notebook files

* update streaming code

* add write-on-host for streaming

* better line alignment

* move files

* rename folder

* rename folder and add group_files

* debug

* add recovery test scripts

* add additional python packages

* add test flag

* add README and some minor fixes

* change the image name

* change the directory back

* add training stop for the second

* fix typo

* add data source support

* clean up a bit

* restructure folders

* restructure files

* add script headers

* reorder and add READMEs

* revert back due to file movements

* fix typo

* fix lib import

* enable mounting localdisk

* change name of cc

* fix dtype

* performance optimization for streaming

* use the latest ray

* change node

* add new files

* bug fix

* add nltk

* fix hdfs after re-order folders

* set default to false

* use variables instead of credentials

* change the training config path

* update README
faaany authored Jul 11, 2023
1 parent 3d7c7a7 commit cb036c0
Showing 29 changed files with 1,825 additions and 83 deletions.
1 change: 1 addition & 0 deletions .gitignore
@@ -1 +1,2 @@
__pycache__
**.ipynb
42 changes: 12 additions & 30 deletions Finetune/README.md
100644 → 100755
@@ -1,6 +1,5 @@
## Accelerate + Ray
### 1. Prepare environment
### Bare-metal
## 1. Prepare environment
### 1.1 Bare-metal
Follow [LLM Finetune](https://wiki.ith.intel.com/pages/viewpage.action?spaceKey=AppliedML&title=LLM+Finetune).
Please change the ``huggingface accelerate`` repo to [huggingface accelerate](https://github.com/KepingYan/accelerate), branch: FSDP_CPU

@@ -11,22 +10,8 @@ pip install -U "ray[default] @ LINK_TO_WHEEL.whl"
pip install --pre raydp
pip install "ray[tune]" tabulate tensorboard
```

### Using Docker
```bash
# on head node
git clone https://github.com/intel-sandbox/llm-ray.git
cd llm-ray/Finetune
./build-image.sh
# save docker image
docker save -o ray-image.tar ray-llm:latest
# copy over to worker nodes, this is an optional step if all your cluster nodes are NFS-shared
scp ray-image.tar <worker_node_ip>:<destination_path_on_worker_node>
# on worker nodes
docker load -i ray-image.tar
```

### 2. Enable torch_ccl [optional]
## 2. Accelerate + Ray
### 2.1 Enable torch_ccl [optional]
```python
from raydp.torch.config import TorchConfig

@@ -41,7 +26,7 @@ def train_fashion_mnist(...):
...
```
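
For orientation, here is a minimal, hedged sketch of how a `TorchConfig` with the oneCCL backend is typically handed to a Ray `TorchTrainer`; apart from `TorchConfig`/`TorchTrainer` themselves, the names below (training function, worker count) are illustrative rather than taken from this repo.
```python
# Hedged sketch, assuming raydp's TorchConfig accepts backend="ccl".
from raydp.torch.config import TorchConfig
from ray.train.torch import TorchTrainer
from ray.air.config import ScalingConfig

def train_loop_per_worker(config: dict):
    ...  # ordinary PyTorch training code; collectives now run over torch_ccl

trainer = TorchTrainer(
    train_loop_per_worker,
    torch_config=TorchConfig(backend="ccl"),
    scaling_config=ScalingConfig(num_workers=2),
)
trainer.fit()
```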

### 3. Set parameters [optional]
### 2.2 Set parameters [optional]
- FSDP parameters
```python
trainer = AccelerateTrainer(
@@ -73,19 +58,15 @@ def train_fashion_mnist(...):
}
```
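
As a rough sketch of this step (not the repo's exact code), the FSDP parameters end up in the Accelerate configuration passed to the trainer; `my_fsdp_config.yaml` below is a hypothetical Accelerate config file whose `fsdp_config` section holds options such as `fsdp_offload_params` and `fsdp_sharding_strategy`.
```python
# Hedged sketch; names other than AccelerateTrainer/ScalingConfig are illustrative.
from ray.train.huggingface import AccelerateTrainer
from ray.air.config import ScalingConfig

def train_func(config: dict):
    ...  # training loop written with HuggingFace Accelerate

trainer = AccelerateTrainer(
    train_func,
    accelerate_config="my_fsdp_config.yaml",  # hypothetical path, see note above
    scaling_config=ScalingConfig(num_workers=2, resources_per_worker={"CPU": 56}),
)
result = trainer.fit()
```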

### 5. Test Ray TorchTrainer example
#### Bare-metal
### 2.3 Test Ray TorchTrainer example
#### 2.3.1 Bare-metal
```bash
oneccl_bindings_for_pytorch_path=$(python -c "from oneccl_bindings_for_pytorch import cwd; print(cwd)") && source $oneccl_bindings_for_pytorch_path/env/setvars.sh
python -u run_clm_no_trainer_ray.py --model_name_or_path EleutherAI/gpt-j-6B --dataset_name wikitext --dataset_config_name wikitext-2-raw-v1 --per_device_train_batch_size 2 --per_device_eval_batch_size 4 --num_train_epochs 1 --address 10.165.9.53 --num_workers 2
```
#### Using Docker
```bash
python launch_workflow.py -w workflow.yaml
```

## FSDP_CPU + Ray
### 1. Enable fsdp_cpu in Ray
## 3. FSDP_CPU + Ray
### 3.1 Enable fsdp_cpu in Ray
Edit the code in train_loop_utils.py
```python
class _TorchAccelerator(Accelerator):
@@ -112,7 +93,7 @@ def train_func(config: Dict):
...
```
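
Conceptually, the edit amounts to wrapping the prepared model with PyTorch FSDP while the process group runs on CPU; the sketch below is illustrative and not the literal patch.
```python
# Hedged sketch of the fsdp_cpu idea, assuming the process group is already
# initialized with a CPU-capable backend (plain gloo hits the error shown in 3.2).
import torch
from torch.distributed.fsdp import FullyShardedDataParallel as FSDP

def wrap_model_fsdp_cpu(model: torch.nn.Module) -> torch.nn.Module:
    return FSDP(model)  # shards parameters across CPU workers
```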

### 2. enable torch_ccl in Ray
### 3.2 enable torch_ccl in Ray
```bash
pip install --pre raydp
```
@@ -157,11 +138,12 @@ File "env/lib/python3.7/site-packages/ray/train/_internal/worker_group.py", line
RuntimeError: no support for _allgather_base in Gloo process group
```
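The RuntimeError above comes from the gloo backend, which has no `_allgather_base`; switching the process group to the oneCCL backend avoids it. A minimal sketch, assuming the usual torch.distributed environment variables are set:
```python
import os
import torch.distributed as dist
import oneccl_bindings_for_pytorch  # noqa: F401  (side effect: registers the "ccl" backend)

# assumes MASTER_ADDR / MASTER_PORT are exported, as in a normal torch.distributed launch
dist.init_process_group(
    backend="ccl",
    rank=int(os.environ.get("RANK", "0")),
    world_size=int(os.environ.get("WORLD_SIZE", "1")),
)
```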
### 3. Test Fashion MNIST example
### 3.3 Test Fashion MNIST example
```python
python run_minist_fsdp.py
```
## Memory Status
Reference to Applied Machine Learning team ([intel-sandbox/HuggingFace](https://github.com/intel-sandbox/HuggingFace/tree/main/test/memory))
- First, run the finetune code and get the pid of a Ray worker process.
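Once you have the worker pid, one quick way to watch its resident memory is via psutil; this is only a hedged convenience sketch, not the tooling used in the referenced memory test.
```python
# Hedged sketch, assuming psutil is installed in the environment.
import psutil

def print_rss(pid: int) -> None:
    rss_mib = psutil.Process(pid).memory_info().rss / (1024 ** 2)
    print(f"pid {pid}: RSS = {rss_mib:.1f} MiB")

print_rss(12345)  # replace 12345 with the Ray worker pid found above
```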
31 changes: 20 additions & 11 deletions Finetune/llm_pretrain_template.conf
100644 → 100755
@@ -14,10 +14,10 @@
# The type of dataset, now only HuggingfaceDataset is supported.
"type": "GroupDataset",
# The name/path of dataset in huggingface.
"path": "/mnt/DP_disk2/yuliang/workspace/data/group_token5",
"path": "/home/user/tmp/pretrain_data",
# Whether to use the datasets.load_from_disk() interface to load data.
"load_from_disk": False,
# Config of dataset, all items will be transfered to datasets.load_dataset() or datasets.load_from_disk().
# Config of dataset, all items will be transferred to datasets.load_dataset() or datasets.load_from_disk().
"load_config" : {
"streaming": True
}
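For context, a hedged sketch of what `load_from_disk` and `load_config` translate to on the `datasets` side (the repo's actual loader lives elsewhere; this is only illustrative):
```python
from datasets import load_dataset, load_from_disk

path = "/home/user/tmp/pretrain_data"
use_load_from_disk = False            # mirrors "load_from_disk" above
load_config = {"streaming": True}     # mirrors "load_config" above

# every item in load_config is forwarded as a keyword argument
dataset = load_from_disk(path) if use_load_from_disk else load_dataset(path, **load_config)
```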
@@ -58,12 +58,14 @@
"num_train_epochs": 1,
# The max training step of each epoch, if set to None means unlimited.
# In most cases this item is for debugging.
"max_train_step_per_episode": None,
"max_train_step_per_episode": 2,
# The max evaluating step of each epoch, if set to None means unlimited.
# In most cases this item is for debugging.
"max_eval_step_per_episode": 0,
# Output directory. Only absolute path is supported.
"output": "/tmp/output",
# Directory to save stepwise training states. This parameter is mainly used for recovery validation.
"save_state_path": "/home/user/tmp/state",
"dataprocesser": {
# The type of dataprocesser.
"type": "PlainIDProcesser",
@@ -78,37 +80,44 @@
},
"checkpoint": {
# The root path of checkpoint. Only absolute path is supported
"root_path": "/tmp/checkpoint",
"root_path": "/home/user/tmp/checkpoint",
#"step": 39
}
},
"lr_scheduler": {
"enable": True,
"lr_scheduler_type": "linear",
"max_train_steps": 50
},
},
# Ray-related configuration, only used when mode is set to ray
"ray_config": {
# The config of ray.init. All items will be tranfered to ray.init().
# The config of ray.init. All items will be transferred to ray.init().
# For more information, refer to https://docs.ray.io/en/latest/ray-core/api/doc/ray.init.html
"init": {
# Environment variables for ray workers
"runtime_env": {
"env_vars": {
"OMP_NUM_THREADS": "56",
"OMP_NUM_THREADS": "60",
"ACCELERATE_USE_CPU": "True",
"ACCELERATE_MIXED_PRECISION": "no",
"CCL_WORKER_COUNT": "2", # CCL setting
"FI_PROVIDER": "tcp", # Network setting
"FI_TCP_IFACE": "ens39f0",
"CCL_WORKER_COUNT": "4", # CCL setting
#"CCL_LOG_LEVEL": "info",
"WORLD_SIZE": "2", # Enable multi-process
}
},
# The address of the Ray cluster to connect to.
"address": "auto",
# The IP address of the node that we are on.
"_node_ip_address": "127.0.0.1",
"_node_ip_address": "10.165.9.53",
},
"scaling_config": {
# Number of worker.
"num_workers": 2,
"num_workers": 4,
# The amount of resources per worker.
"resources_per_worker": {
"CPU": 56
"CPU": 60
},
# The placement strategy to use for the placement group of the Ray actors.
"placement_strategy": "SPREAD"
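As a hedged sketch (the driver that actually does this lives elsewhere in the repo), the `init` block above is forwarded verbatim to `ray.init()`, while `scaling_config` maps onto Ray AIR's `ScalingConfig`:
```python
import ray
from ray.air.config import ScalingConfig

init_config = {
    "runtime_env": {"env_vars": {"OMP_NUM_THREADS": "60", "ACCELERATE_USE_CPU": "True"}},
    "address": "auto",
}
ray.init(**init_config)  # all items are passed straight through to ray.init()

scaling_config = ScalingConfig(
    num_workers=4,
    resources_per_worker={"CPU": 60},
    placement_strategy="SPREAD",
)
```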
36 changes: 35 additions & 1 deletion Finetune/plugin/trainer/pretrainer.py
100644 → 100755
@@ -1,11 +1,14 @@
import os
import math
import time
import json
import shutil

import torch
import transformers

from ray.air.checkpoint import Checkpoint
from pathlib import Path

from .. import dataprocesser
from .trainer import Trainer
@@ -157,18 +160,43 @@ def prepare(self, model, tokenizer, dataset, optimizer, accelerator):
train_dataloader, eval_dataloader,
)

def _check_and_mkdir(self, path):
path = Path(path)
if not path.exists():
path.mkdir(parents=True)

def _write_json(self, target_dict, save_path):
json_object = json.dumps(target_dict, indent=4)
with open(save_path, "w") as outfile:
outfile.write(json_object)

def train(self):
num_train_epochs = self.config.get("num_train_epochs", 1)
checkpoint = self.config.get("checkpoint")
log_step = self.config.get("log_step", 1)
max_train_step_per_episode = self.config.get("max_train_step_per_episode")
max_eval_step_per_episode = self.config.get("max_eval_step_per_episode")
save_state_path = self.config.get("save_state_path")

if save_state_path is not None and int(self.rank) == 0:
self._check_and_mkdir(save_state_path)
training_state = {}
else:
training_state = None

for idx in range(self.starting_episode, len(self.train_dataloader), 1):
logger.info(f"start train episode {idx}")
if training_state is not None and int(self.rank) == 0:
training_state[f'episode_{idx}'] = {}
self.model.train()
current_train_dataloader = self.train_dataloader[idx]
start = time.time()
for step, batch in enumerate(current_train_dataloader):
if training_state is not None and int(self.rank) == 0:
training_state[f'episode_{idx}'][f'step_{step}'] = {}
training_state[f'episode_{idx}'][f'step_{step}']['data'] = batch['input_ids'][0].tolist()[:50]
training_state[f'episode_{idx}'][f'step_{step}']['learning_rate'] = self.lr_scheduler.state_dict()['_last_lr']

with self.accelerator.accumulate(self.model):
outputs = self.model(**batch)
loss = outputs.loss
@@ -182,6 +210,12 @@ def train(self):
if step % log_step == 0:
logger.info(f"train episode:[{idx}/{len(self.train_dataloader)}]\tstep:[{step}]\tloss:{loss}\tppl:{math.exp(loss)}\ttime:{time.time()-start}")
start = time.time()
if training_state is not None and int(self.rank) == 0:
training_state[f'episode_{idx}'][f'step_{step}']['loss'] = loss.item()
training_state[f'episode_{idx}'][f'step_{step}']['ppl'] = math.exp(loss)
file_name = "stepwise_training_state_recovery" if self.starting_episode > 0 else "stepwise_training_state"
self._write_json(training_state, f"{save_state_path}/{file_name}.json")

if max_train_step_per_episode is not None:
if step >= max_train_step_per_episode:
break
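For reference, the new `_write_json` call produces a `stepwise_training_state.json` shaped roughly like the following (values here are invented for illustration):
```python
example_state = {
    "episode_0": {
        "step_0": {
            "data": [101, 2023, 2003],   # first 50 input_ids of the batch's first sample
            "learning_rate": [5e-05],    # "_last_lr" from the LR scheduler's state dict
            "loss": 10.4,
            "ppl": 32859.6,              # math.exp(loss)
        },
    },
}
```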
23 changes: 0 additions & 23 deletions Finetune/workflow.yaml

This file was deleted.

14 changes: 14 additions & 0 deletions tools/pretrain_recovery_test/README.md
@@ -0,0 +1,14 @@
# How to Run End-to-End Validation of the Recovery Test

## Step 1: Set up Env
Please follow [this guide](../workload_in_containers/README.md) to set up the container environment for this workload. Once the containers are running, you can enter the container on the head node with the following command:
```bash
docker exec -it ray-leader bash
```

## Step 2: Start the script
You can use `test_end2end.sh` to run the end-to-end validation of the Ray recovery mechanism.
```bash
cd tools/pretrain_recovery_test
./test_end2end.sh
```
90 changes: 90 additions & 0 deletions tools/pretrain_recovery_test/compare_logs.py
@@ -0,0 +1,90 @@
import json
import time
import argparse
import os

class bcolors:
HEADER = '\033[95m'
OKBLUE = '\033[94m'
OKCYAN = '\033[96m'
OKGREEN = '\033[92m'
WARNING = '\033[93m'
FAIL = '\033[91m'
ENDC = '\033[0m'
BOLD = '\033[1m'
UNDERLINE = '\033[4m'

def read_json(json_file):

with open(json_file) as file:
parsed_json = json.load(file)

return parsed_json

def get_all_episodes(parsed_json):

parsed_json = dict(sorted(parsed_json.items()))

return parsed_json.keys()

def identify_common_episode(first_json, second_json):

first_episodes = get_all_episodes(first_json)
second_episodes = get_all_episodes(second_json)

common_episodes = list(set(first_episodes).intersection(second_episodes))

    if len(common_episodes) == 0:
        print("the two training runs have no overlapping episode. Check your JSON files!")
        return -1
    elif len(common_episodes) > 1:
        print("the two training runs have more than one overlapping episode. Check your JSON files!")
        return -1
    else:
        return common_episodes[0]

def compare_training_states(json1, json2, step):

step = f'step_{step}'

data_result = json1[step]['data'] == json2[step]['data']
lr_result = json1[step]['learning_rate'] == json2[step]['learning_rate']
loss_result = json1[step]['loss'] == json2[step]['loss']

return data_result, lr_result, loss_result


def main():
parser = argparse.ArgumentParser()
parser.add_argument(
"--file_path",
type=str,
default='/home/user/tmp/state',
help="absolute path of the json files"
)
args = parser.parse_args()

# read the json files
state1 = read_json(os.path.join(args.file_path, 'stepwise_training_state.json'))
state2 = read_json(os.path.join(args.file_path, 'stepwise_training_state_recovery.json'))

    # identify the overlapping episode; stop if the two runs do not share exactly one
    common_episode = identify_common_episode(state1, state2)
    if common_episode == -1:
        return
    print(f"the common episode of the two training runs: {common_episode}\n")

    # compare the training states of the first step in the common episode
    data_result, lr_result, loss_result = compare_training_states(state1[common_episode], state2[common_episode], 0)

# print out the detailed comparison results
print(f"Are the Data the same?\n{data_result}")
print(f"Are the Learning Rate the same?\n{lr_result}")
print(f"Are the Training Loss the same?\n{loss_result}")

if data_result and lr_result and loss_result:
print(f"{bcolors.OKGREEN}\nrecovery tests all passed!{bcolors.ENDC}")
else:
print(f"{bcolors.FAIL}recovery test failed! check the detailed log above.{bcolors.ENDC}")


if __name__ == "__main__":
main()