# main.py
import os
import sys
import argparse
import threading
from queue import Queue
from time import sleep
from multiprocessing import Event, Lock, Value

import repository_worker.Worker as Worker
from repository_worker.Download_Worker import download_worker_func
from repository_worker.ShellScript_Worker import script_worker_func
from repository_worker.Compile_Worker import compile_worker_func
import settings
from settings import Settings
from repository_fetcher.Get_Repositories import Get_Repositories

def clear_queue(queue):
    while not queue.empty():
        queue.get()

#
# FUNCTIONS TO INITIALLY FILL THE QUEUES
#
def todo(output_queue, s, end_event):
    # placeholder fill function for queues that have nothing to pre-fill
    return

def fetched(output_queue, s, end_event):
    # find repositories that are already fetched
    for _, dirnames, _ in os.walk(s[settings.ARG_RESULTFOLDER]):
        for dir in dirnames:
            if end_event.is_set():
                return
            project_path = os.path.join(s[settings.ARG_RESULTFOLDER], dir)
            log_path = os.path.join(s[settings.ARG_RESULTFOLDER], dir, settings.FILENAME_WORKER_LOG)
            github_json = os.path.join(s[settings.ARG_RESULTFOLDER], dir, settings.FILENAME_REPO_JSON)
            if os.path.exists(github_json):
                if not os.path.exists(log_path):
                    output_queue.put((project_path, 0))
                else:
                    with open(log_path, 'r') as log:
                        if len(log.readlines()) == 0:
                            output_queue.put((project_path, 0))
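
# Illustrative only: `fetched` assumes a result-folder layout roughly like the one below,
# where the concrete file names come from settings.FILENAME_REPO_JSON and
# settings.FILENAME_WORKER_LOG (their actual values live in the settings module, not here):
#
#   <resultfolder>/
#       some_project/
#           <FILENAME_REPO_JSON>     # present -> repository was fetched
#           <FILENAME_WORKER_LOG>    # missing or empty -> no worker has run yet
#
# Only directories that have the repo JSON but no (or an empty) worker log are enqueued.
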
def fetch(output_queue, s, end_event):
    # fetch new repositories from github
    print("start fetching from github")
    g = Get_Repositories(s)
    # get a generator that fetches and filters projects
    gen = g.getRepositoryGeneratorFromSettings()
    i = 0
    while not end_event.is_set():
        # fill the input queue of the first stage
        try:
            repo = next(gen)
        except StopIteration:
            return
        output_queue.put((repo, i))
        i += 1

def last_state(output_queue, s, end_event, last_state):
    # enqueue projects whose worker log ends in the given state and did not fail or time out
    for dirpath, dirnames, filenames in os.walk(s[settings.ARG_RESULTFOLDER]):
        for dir in dirnames:
            try:
                if end_event.is_set():
                    return
                project_path = os.path.join(s[settings.ARG_RESULTFOLDER], dir)
                log_path = os.path.join(s[settings.ARG_RESULTFOLDER], dir, settings.FILENAME_WORKER_LOG)
                if not os.path.exists(log_path):
                    continue
                with open(log_path, 'r') as log:
                    lines = log.readlines()
                    if not lines:
                        continue
                    last_line = lines[-1]
                    if "Failed" in last_line or "Timeout" in last_line:
                        continue
                    if last_state in last_line:
                        output_queue.put((project_path, 0))
            except Exception as e:
                print(e)
                continue

def downloaded(output_queue, s, end_event):
    last_state(output_queue, s, end_event, "Download")

def compiled(output_queue, s, end_event):
    last_state(output_queue, s, end_event, "Compile")

def after_download_script(output_queue, s, end_event):
    last_state(output_queue, s, end_event, "{}".format(s[settings.ARG_EXEC_AFTER_DOWNLOAD]))
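
# Illustrative only: the exact log line format is written by the worker modules and is not
# shown in this file; `last_state` just does substring matching on the last log line. So a
# worker log whose last line contains, say, "Download" re-enqueues the project for the next
# stage, while a last line containing "Failed" or "Timeout" is skipped.
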
#
# Stats Thread Function
#
def print_stats(s, func, end_event):
    # periodically report success ratios and worker counts, either to a file or to the terminal
    if s[settings.ARG_STATS_TO_FILE]:
        stats_file = os.path.join(s[settings.ARG_RESULTFOLDER], "general_stats.log")
        while not end_event.is_set():
            stats = "{} ...... {}\n".format(
                ", ".join(["{}: {}".format(f["name"] + " success ratio",
                                           "{}/{}".format(f["count_value"].value,
                                                          f["failed_value"].value + f["count_value"].value)) for f in func]),
                ", ".join(["{}: {}".format(f["name"] + " worker", f["worker_count"].value) for f in func]))
            with open(stats_file, "a+") as f:
                f.write(stats)
            sleep(10)
    else:
        # ANSI escape codes to redraw the stats block in place
        UP = "\x1B[{}A".format(len(func) + 4)
        CLR = "\x1B[0K"
        while not end_event.is_set():
            stats = f"{CLR}\n========================================={CLR}\n{CLR}"
            stats += f"{CLR}\n".join(["{}: {}".format(f["name"].split("/")[-1] + " success ratio & worker",
                                                      "{}/{}, {}".format(f["count_value"].value,
                                                                         f["failed_value"].value + f["count_value"].value,
                                                                         f["worker_count"].value)) for f in func])
            stats += f"\n========================================={CLR}\n{CLR}"
            stats += f"{UP}"
            print(stats)
            sleep(1)
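
# Illustrative only (made-up numbers): a line appended to general_stats.log by the file
# branch above would look roughly like
#
#   download success ratio: 12/20, compile success ratio: 7/12 ...... download worker: 4, compile worker: 4
#
# i.e. successes/attempts per stage, followed by the current number of workers per stage.
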
# Example invocation:
# python3 main.py --resultsfolder /Users/marvinvogel/Downloads/test5 --tokenfile ../CREDENTIALS.txt --fetch --download --compile --execonsuccess ../run_cambench_cov.sh
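# Note: the literal flag names in the example above are assumed to correspond to the
# settings.ARG_* constants used below (e.g. --resultsfolder for settings.ARG_RESULTFOLDER);
# the actual flag strings are defined in the settings module, not in this file.
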
if __name__ == '__main__':
    # raised recursion limit to work around unexpected exit errors
    sys.setrecursionlimit(20000)
    argparser = argparse.ArgumentParser(description='Fetch, Filter, Download Projects from Github and Compile Downloaded Projects')
    # general
    argparser.add_argument('--{}'.format(settings.ARG_RESULTFOLDER), help='folder to store results', required=True)
    argparser.add_argument('--{}'.format(settings.ARG_PROCESS_LIMIT), type=int, default=20)
    argparser.add_argument('--{}'.format(settings.ARG_STATS_TO_FILE), action='store_true')
    # fetch
    argfetch = argparser.add_argument_group('fetch', 'Arguments for fetching repositories')
    argfetch.add_argument('--{}'.format(settings.ARG_FETCH), help='fetch repositories', action='store_true')
    argfetch.add_argument('--{}'.format(settings.ARG_TOKEN_FILE), required='--{}'.format(settings.ARG_FETCH) in sys.argv)
    argfetch.add_argument('--{}'.format(settings.ARG_SORT), nargs=1, choices=['best-match', 'stars'], default='stars')
    argfetch.add_argument('--{}'.format(settings.ARG_ORDER), nargs=1, choices=['desc', 'asc'], default='desc')
    argfetch.add_argument('--{}'.format(settings.ARG_FORK), choices=['true', 'false'])
    argfetch.add_argument('--{}'.format(settings.ARG_FILTER), nargs='*', choices=['lastcommitnotolderthan'])
    argfetch.add_argument('--{}'.format(settings.ARG_LAST_SORT), type=int, default=999999999)
    # download
    argdownload = argparser.add_argument_group('download', 'Arguments for cloning repositories')
    argdownload.add_argument('--{}'.format(settings.ARG_DOWNLOAD), help='clone fetched repositories', action='store_true')
    argdownload.add_argument('--{}'.format(settings.ARG_CLEAN_AFTER_FAILURE), help='removes cloned repository after failure', action='store_true')
    argdownload.add_argument('--{}'.format(settings.ARG_EXEC_AFTER_DOWNLOAD), help='path to shell script that is executed after download')
    # compile
    argcompile = argparser.add_argument_group('compile', 'Arguments for compile flag')
    argcompile.add_argument('--{}'.format(settings.ARG_COMPILE), help='compile projects in downloaded repositories', action='store_true')
    # shell
    argcompile.add_argument('--{}'.format(settings.ARG_SHELL_SCRIPT), help='path to shell script that is executed on success')
    args = vars(argparser.parse_args(sys.argv[1:]))
    s = Settings()
    s.parse_args(args)
    # one pipeline stage (and one queue) per enabled step
    required_queues = sum([s[settings.ARG_COMPILE], s[settings.ARG_DOWNLOAD], s[settings.ARG_SHELL_SCRIPT] is not None, s[settings.ARG_EXEC_AFTER_DOWNLOAD] is not None])
    process_limit = max(1, s[settings.ARG_PROCESS_LIMIT])
    queue_limit = max(1, int(process_limit / 2))
    # the trailing None marks that the last stage has no output queue
    queues = [Queue(queue_limit) for i in range(required_queues)] + [None,]
    iolock = Lock()  # general lock might be required for certain io actions
print("set up pipeline")
queue_fill_functions = []
func = []
if s[settings.ARG_FETCH]:
queue_fill_functions += [todo, fetch] # this is not implemented as worker
if s[settings.ARG_DOWNLOAD]:
func += [{"worker_func": download_worker_func, "name": "download"}]
queue_fill_functions += [fetched, downloaded]
if s[settings.ARG_EXEC_AFTER_DOWNLOAD]:
func += [{"worker_func": script_worker_func, "name": s[settings.ARG_EXEC_AFTER_DOWNLOAD]}]
queue_fill_functions += [todo, after_download_script]
if s[settings.ARG_COMPILE]:
func += [{"worker_func": compile_worker_func, "name": "compile"}]
queue_fill_functions += [todo, compiled]
if s[settings.ARG_SHELL_SCRIPT]:
func += [{"worker_func": script_worker_func, "name": s[settings.ARG_SHELL_SCRIPT]}]
queue_fill_functions += [todo,todo] # add dummy function because this cannot fill any queue
    for i in range(required_queues):
        func[i]["input_queue"] = queues[i]
        func[i]["output_queue"] = queues[i+1]
        func[i]["count_value"] = Value('i', 0)
        func[i]["failed_value"] = Value('i', 0)
        func[i]["worker_count"] = Value('i', 0)
    end_event = Event()
    end_event.clear()
    wait_for_finish = Worker.start(func, s, iolock, locks={"gradle": Lock()}, end_event=end_event)
    #worker_process = Process(target=workers, args=(func, s))
    #worker_process.daemon = True
    #worker_process.start()
    print("pipeline is ready")
    stats_thread = threading.Thread(target=print_stats, args=(s, func, end_event), daemon=True)
    stats_thread.start()
print("start filling worker queues...")
for i in range(len(func)-1, -1, -1):
queue_fill_functions[2*i+1](func[i]["output_queue"], s, end_event)
queue_fill_functions[2*i](func[i]["input_queue"], s, end_event)
# indirect end event
func[0]["input_queue"].put((None, 0))
wait_for_finish()
#end_workers()
#exit(0)