import os
from glob import glob
import shutil
from tqdm import tqdm
import dicom2nifti
import numpy as np
import nibabel as nib
from monai.transforms import (
    Compose,
    AddChanneld,  # removed in MONAI >= 1.0; use EnsureChannelFirstd on newer versions
    LoadImaged,
    Resized,
    ToTensord,
    Spacingd,
    Orientationd,
    ScaleIntensityRanged,
    CropForegroundd,
)
from monai.data import DataLoader, Dataset, CacheDataset
from monai.utils import set_determinism
"""
This file is for preporcessing only, it contains all the functions that you need
to make your data ready for training.
You need to install the required libraries if you do not already have them.
pip install os, ...
"""
def create_groups(in_dir, out_dir, Number_slices):
    '''
    This function divides each patient's DICOM slices into groups of `Number_slices`
    (the last part of the patient's path is used to name each group folder).
    `in_dir`: the path to your folders that contain the DICOM files
    `out_dir`: the path where you want to put the grouped folders
    `Number_slices`: the number of slices that you need for your project; groups
    of this size will be created.
    '''

    for patient in glob(in_dir + '/*'):
        patient_name = os.path.basename(os.path.normpath(patient))

        # Calculate the number of groups, i.e. into how many folders of
        # `Number_slices` slices this patient's scan will be divided.
        number_folders = int(len(glob(patient + '/*')) / Number_slices)

        for i in range(number_folders):
            output_path = os.path.join(out_dir, patient_name + '_' + str(i))
            os.mkdir(output_path)

            # Move `Number_slices` files into the group folder to save disk space;
            # glob is re-evaluated on each pass, so already-moved files are skipped.
            for j, file in enumerate(glob(patient + '/*')):
                if j == Number_slices:
                    break
                shutil.move(file, output_path)
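# A minimal usage sketch (the paths below are placeholders, not part of this repo):
# create_groups('datasets/dicom_files', 'datasets/dicom_groups', Number_slices=64)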
def dcm2nifti(in_dir, out_dir):
    '''
    This function converts the DICOM series into NIfTI files after the groups
    have been created with the number of slices that you want.
    `in_dir`: the path to the folder that contains all the patients (the folder of all the groups).
    `out_dir`: the path to the output, i.e. where you want to save the converted NIfTI files.
    '''

    for folder in tqdm(glob(in_dir + '/*')):
        patient_name = os.path.basename(os.path.normpath(folder))
        dicom2nifti.dicom_series_to_nifti(folder, os.path.join(out_dir, patient_name + '.nii.gz'))
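# A minimal usage sketch, run after create_groups (paths are placeholders):
# dcm2nifti('datasets/dicom_groups', 'datasets/nifti_files')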
def find_empty(in_dir):
    '''
    This function helps you find the empty volumes that you may not need for your training:
    instead of opening all the files and searching for the empty ones yourself,
    use this function to do it quickly.
    '''

    list_patients = []
    for patient in glob(os.path.join(in_dir, '*')):
        img = nib.load(patient)
        # Report volumes whose data contains more than two unique values.
        if len(np.unique(img.get_fdata())) > 2:
            print(os.path.basename(os.path.normpath(patient)))
            list_patients.append(os.path.basename(os.path.normpath(patient)))
    return list_patients
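# A minimal usage sketch (the path is a placeholder):
# patients = find_empty('datasets/nifti_files/TrainSegmentation')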
def prepare(in_dir, pixdim=(1.5, 1.5, 1.0), a_min=-200, a_max=200, spatial_size=[128, 128, 64], cache=False):
    """
    This function is for preprocessing. It contains only the basic transforms, but you can
    add more operations that you find in the MONAI documentation:
    https://monai.io/docs.html
    """

    set_determinism(seed=0)

    path_train_volumes = sorted(glob(os.path.join(in_dir, "TrainVolumes", "*.nii.gz")))
    path_train_segmentation = sorted(glob(os.path.join(in_dir, "TrainSegmentation", "*.nii.gz")))

    path_test_volumes = sorted(glob(os.path.join(in_dir, "TestVolumes", "*.nii.gz")))
    path_test_segmentation = sorted(glob(os.path.join(in_dir, "TestSegmentation", "*.nii.gz")))

    train_files = [{"vol": image_name, "seg": label_name} for image_name, label_name in zip(path_train_volumes, path_train_segmentation)]
    test_files = [{"vol": image_name, "seg": label_name} for image_name, label_name in zip(path_test_volumes, path_test_segmentation)]
    train_transforms = Compose(
        [
            LoadImaged(keys=["vol", "seg"]),
            AddChanneld(keys=["vol", "seg"]),
            Spacingd(keys=["vol", "seg"], pixdim=pixdim, mode=("bilinear", "nearest")),
            Orientationd(keys=["vol", "seg"], axcodes="RAS"),
            ScaleIntensityRanged(keys=["vol"], a_min=a_min, a_max=a_max, b_min=0.0, b_max=1.0, clip=True),
            CropForegroundd(keys=["vol", "seg"], source_key="vol"),
            Resized(keys=["vol", "seg"], spatial_size=spatial_size),
            ToTensord(keys=["vol", "seg"]),
        ]
    )

    test_transforms = Compose(
        [
            LoadImaged(keys=["vol", "seg"]),
            AddChanneld(keys=["vol", "seg"]),
            Spacingd(keys=["vol", "seg"], pixdim=pixdim, mode=("bilinear", "nearest")),
            Orientationd(keys=["vol", "seg"], axcodes="RAS"),
            ScaleIntensityRanged(keys=["vol"], a_min=a_min, a_max=a_max, b_min=0.0, b_max=1.0, clip=True),
            CropForegroundd(keys=["vol", "seg"], source_key="vol"),
            Resized(keys=["vol", "seg"], spatial_size=spatial_size),
            ToTensord(keys=["vol", "seg"]),
        ]
    )
    if cache:
        train_ds = CacheDataset(data=train_files, transform=train_transforms, cache_rate=1.0)
        train_loader = DataLoader(train_ds, batch_size=1)

        test_ds = CacheDataset(data=test_files, transform=test_transforms, cache_rate=1.0)
        test_loader = DataLoader(test_ds, batch_size=1)

        return train_loader, test_loader

    else:
        train_ds = Dataset(data=train_files, transform=train_transforms)
        train_loader = DataLoader(train_ds, batch_size=1)

        test_ds = Dataset(data=test_files, transform=test_transforms)
        test_loader = DataLoader(test_ds, batch_size=1)

        return train_loader, test_loader
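

if __name__ == '__main__':
    # A minimal sketch of how `prepare` might be called; `data_dir` is a
    # placeholder and must contain the four sub-folders the function expects
    # (TrainVolumes, TrainSegmentation, TestVolumes, TestSegmentation).
    data_dir = 'datasets/nifti_files'
    train_loader, test_loader = prepare(data_dir, cache=False)

    # Inspect one batch: with the default arguments, each tensor should have
    # the shape [1, 1, 128, 128, 64] (batch, channel, height, width, depth).
    batch = next(iter(train_loader))
    print(batch['vol'].shape, batch['seg'].shape)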