-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcolab_util.py
143 lines (131 loc) · 5.62 KB
/
colab_util.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
# Fixed version based on https://gist.github.com/Joshua1989/dc7e60aa487430ea704a8cb3f2c5d6a6#file-colab_util-py
# -----------------------------------------------------------------------------
# !pip install -U -q PyDrive
from pydrive.auth import GoogleAuth
from pydrive.drive import GoogleDrive
from google.colab import auth
from oauth2client.client import GoogleCredentials
import os
import subprocess
from pathlib import Path
__all__ = [
'create_archive',
'extract_archive',
'GoogleDriveHandler'
]
# Create an archive file
def create_archive(zip_name, local_file_paths, temp_folder='/tmp', verbose=False):
    """Pack existing files/directories into a gzip-compressed tar archive.

    The archive is written to ``<temp_folder>/<zip_name>``; ``.tar.gz`` is
    appended when ``zip_name`` does not already end with it. Paths that do
    not exist are reported on stdout and skipped. The directory part shared
    by all remaining paths is stripped, so members are stored relative to
    that common directory instead of with their full path.

    Args:
        zip_name: Archive file name, with or without the ``.tar.gz`` suffix.
        local_file_paths: Iterable of file or directory paths (``str`` or
            path-like) to include.
        temp_folder: Directory in which the archive is created.
        verbose: If true, print the stripped prefix, the tar command line
            and the output captured from tar.

    Returns:
        The path of the created archive (``temp_folder`` joined with the
        final archive name).

    Raises:
        subprocess.CalledProcessError: If ``tar`` exits with a non-zero
            status.
    """
    import shlex  # only used to render the command line for verbose output

    suffix = '.tar.gz'
    if not zip_name.endswith(suffix):
        zip_name += suffix
    zip_name = '{0}/{1}'.format(temp_folder, zip_name)

    # Filter out non-existing files and directories.
    existing = []
    for path in local_file_paths:
        path = os.fspath(path)
        if Path(path).exists():
            existing.append(path)
        else:
            print('file {0} does not exist, ignore it'.format(path))

    # Find the common directory prefix to avoid needlessly deep folders in
    # the archive. commonprefix() works character-wise, so cut it back to the
    # last '/' to land on a directory boundary. When the paths share no
    # directory at all (e.g. bare relative names) the prefix is empty and tar
    # runs from the current directory.
    shared = os.path.commonprefix(existing)
    common_prefix = shared[:shared.rfind('/') + 1]
    # A path equal to the prefix itself (trailing '/') is archived as '.'.
    members = [path[len(common_prefix):] or '.' for path in existing]

    # Pass an argument list (no shell) so spaces and shell metacharacters in
    # paths are taken literally; '--' keeps member names that start with '-'
    # from being parsed as options.
    argv = ['tar', '-czvf', zip_name, '-C', common_prefix or '.', '--'] + members
    if verbose:
        print('ignore the common prefix {0}'.format(common_prefix))
        cmd = ' '.join(shlex.quote(arg) for arg in argv)
        print('running shell command:', '\n' + cmd)
    result = subprocess.check_output(argv).decode('utf-8')
    if verbose:
        print(result)

    # Return the path of the tar.gz file.
    return zip_name
def extract_archive(zip_path, target_folder='./', verbose=False):
    """Unpack a tar archive into a directory using the ``tar`` command.

    Args:
        zip_path: Path of the archive to extract.
        target_folder: Directory passed to ``tar -C``; the archive contents
            are extracted there.
        verbose: If true, print the command line and the output captured
            from tar.

    Raises:
        subprocess.CalledProcessError: If ``tar`` exits with a non-zero
            status.
    """
    import shlex  # only used to render the command line for verbose output

    # Argument list instead of a shell string: paths with spaces or shell
    # metacharacters are passed to tar unchanged.
    argv = ['tar', '-xf', os.fspath(zip_path), '-C', os.fspath(target_folder)]
    if verbose:
        cmd = ' '.join(shlex.quote(arg) for arg in argv)
        print('running shell command:', '\n' + cmd)
    result = subprocess.check_output(argv).decode('utf-8')
    if verbose:
        print(result)
class GoogleDriveHandler:
    """Convenience wrapper around a PyDrive ``GoogleDrive`` session.

    Drive entries are addressed by '/'-separated paths of titles, resolved
    one component at a time starting from a parent folder id ('root' by
    default). When several entries in one folder share a title, the one
    listed last is used.
    """

    _FOLDER_TYPE = 'application/vnd.google-apps.folder'

    def __init__(self):
        """Authenticate through ``google.colab.auth`` and open the Drive session."""
        auth.authenticate_user()
        gauth = GoogleAuth()
        gauth.credentials = GoogleCredentials.get_application_default()
        self.drive = GoogleDrive(gauth)

    def _children_by_title(self, folder_id):
        """Return ``{title: entry}`` for the direct children of ``folder_id``."""
        return {entry['title']: entry for entry in self.list_folder(folder_id)}

    def path_to_id(self, rel_path, parent_folder_id='root'):
        """Resolve a '/'-separated title path to a Drive id.

        Args:
            rel_path: Path relative to ``parent_folder_id``; empty components
                (leading, trailing or doubled '/') are ignored.
            parent_folder_id: Id of the folder the path starts from.

        Returns:
            The id of the last path component, or ``parent_folder_id`` when
            the path has no components.

        Raises:
            FileNotFoundError: If a component is not found in its parent.
        """
        current_id = parent_folder_id
        for title in filter(None, rel_path.split('/')):
            children = self._children_by_title(current_id)
            if title not in children:
                raise FileNotFoundError('{0} not exist'.format(title))
            current_id = children[title]['id']
        return current_id

    def list_folder(self, root_folder_id='root', max_depth=0):
        """List the non-trashed children of a folder.

        Args:
            root_folder_id: Id of the folder to list.
            max_depth: How many levels of sub-folders to expand. With the
                default 0, folders are returned without their contents.

        Returns:
            A list of dicts with the keys 'title', 'id', 'link' and
            'mimeType'. Folders expanded because ``max_depth > 0`` also carry
            a 'children' list of the same shape.
        """
        query = "'{0}' in parents and trashed=false".format(root_folder_id)
        entries = []
        for item in self.drive.ListFile({'q': query}).GetList():
            entry = {
                'title': item['title'],
                'id': item['id'],
                'link': item['alternateLink'],
                'mimeType': item['mimeType'],
            }
            if max_depth > 0 and entry['mimeType'] == self._FOLDER_TYPE:
                entry['children'] = self.list_folder(item['id'], max_depth - 1)
            entries.append(entry)
        return entries

    def create_folder(self, folder_name, parent_path=''):
        """Create a folder unless one with that title already exists.

        Args:
            folder_name: Title of the folder.
            parent_path: Title path of the parent folder ('' for the root).

        Returns:
            The id of the new folder, or of the existing folder when one
            with the same title is already present (a notice is printed).

        Raises:
            FileExistsError: If a non-folder entry already uses the title.
            FileNotFoundError: If ``parent_path`` cannot be resolved.
        """
        parent_folder_id = self.path_to_id(parent_path)
        existing = self._children_by_title(parent_folder_id).get(folder_name)
        if existing is None:
            folder = self.drive.CreateFile(
                {
                    'title': folder_name,
                    'mimeType': self._FOLDER_TYPE,
                    'parents': [{'kind': 'drive#fileLink', 'id': parent_folder_id}],
                }
            )
            folder.Upload()
            return folder['id']
        if existing['mimeType'] != self._FOLDER_TYPE:
            raise FileExistsError('{0} already exists as a file'.format(folder_name))
        print('{0} already exists'.format(folder_name))
        return existing['id']

    def upload(self, local_file_path, parent_path='', overwrite=True):
        """Upload a local file into a Drive folder.

        Args:
            local_file_path: Path of the local file; its last component is
                used as the Drive title.
            parent_path: Title path of the destination folder ('' for root).
            overwrite: If true, an existing file with the same title in the
                destination is deleted before uploading. If false, the file
                is uploaded alongside any existing entry.

        Returns:
            The id of the uploaded file.

        Raises:
            FileExistsError: If ``overwrite`` is true and the same-titled
                entry is a folder; it is never deleted to make room for a file.
            FileNotFoundError: If ``parent_path`` cannot be resolved.
        """
        local_file_path = os.fspath(local_file_path)
        parent_folder_id = self.path_to_id(parent_path)
        file_name = os.path.basename(local_file_path)
        existing = self._children_by_title(parent_folder_id).get(file_name)
        if overwrite and existing is not None:
            if existing['mimeType'] == self._FOLDER_TYPE:
                raise FileExistsError('{0} already exists as a folder'.format(file_name))
            # Entries from list_folder() are plain dicts without a Delete()
            # method, so wrap the id in a drive file object first.
            self.drive.CreateFile({'id': existing['id']}).Delete()
        new_file = self.drive.CreateFile(
            {
                'title': file_name,
                'parents': [{'kind': 'drive#fileLink', 'id': parent_folder_id}],
            }
        )
        new_file.SetContentFile(local_file_path)
        new_file.Upload()
        return new_file['id']

    def download(self, local_file_path, target_path):
        """Download the Drive entry at ``target_path`` to ``local_file_path``.

        Args:
            local_file_path: Destination path on the local file system.
            target_path: Title path of the Drive file to fetch.

        Raises:
            FileNotFoundError: If ``target_path`` cannot be resolved.
        """
        target_id = self.path_to_id(target_path)
        self.drive.CreateFile({'id': target_id}).GetContentFile(local_file_path)