forked from qhool/pygrader
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathrepo.py
177 lines (152 loc) · 5.47 KB
/
repo.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
import config
import subprocess
from git import *
import re
import operator
from time import *
import datetime
import os
class CommandError(Exception):
def __init__(self,args,value,stderr):
self.args = args
self.value = value
self.stderr = stderr
def __str__(self):
return """Command: {0}
Exited with {1}:
{2}""".format( " ".join(self.args), self.value, self.stderr )
def get_cmd_out(args):
p = subprocess.Popen( args, stdout = subprocess.PIPE,
stderr = subprocess.PIPE )
( out, err ) = p.communicate()
if p.returncode != 0:
raise CommandError( args, p.returncode, err )
return out
def get_student_list(ssh=True):
if ssh:
ssh_args = [ config.system.ssh,
config.repository.user + '@' + config.repository.host,
'ls ' + config.repository.path ]
out = get_cmd_out( ssh_args ).split("\n")
else:
out = filter( lambda d: os.path.isdir(os.path.join('repositories',d)),
os.listdir( 'repositories/' ) )
return set(out).difference( config.repository.exclude, [ '' ] )
def clone_repo(reponame):
git_args = [ config.system.git, "clone",
config.repository.user + '@' + config.repository.host + ':' +
config.repository.path + '/' + reponame,
'repositories/' + reponame ]
out = get_cmd_out( git_args )
_repo_cache = dict()
def get_repo( repo_name, pull = True ):
if _repo_cache.has_key(repo_name):
return _repo_cache[repo_name]
print "get repo for {0}...".format(repo_name),
try:
r = Repo('repositories/' + repo_name)
r.heads.master.checkout()
if pull:
print "pulling...",
r.remote().pull()
except NoSuchPathError:
print "cloning...",
clone_repo(repo_name)
r = Repo('repositories/' + repo_name)
except AssertionError, e:
print "[Assertion: " + str(e) + " ]"
print "done"
_repo_cache[repo_name] = r
return r
def get_solutions_repo(pull = True):
return get_repo(config.assignments.solutions, pull)
def commit_datetime( commit ):
if commit == None:
return None
else:
return datetime.datetime.fromtimestamp( commit.committed_date )
def pick_gen( pick, filt=None ):
def picker(a,b):
if filt != None:
a = filt(a)
b = filt(b)
if b == None:
return a
elif a == None:
return b
else:
return pick(a,b)
return picker
def date_pick_gen( comp, endpoint ):
endpt = endpoint
if type(endpoint) == type(datetime.datetime.now()):
endpt = mktime( endpoint.timetuple() )
def endpt_filt(a):
return a if a != None and comp(endpt,a.committed_date) else None
def comp_picker(a,b):
if comp(a.committed_date,b.committed_date):
return a
else:
return b
return pick_gen( comp_picker, endpt_filt if endpoint != None else None )
def find_commit( head, path_spec, pick, tabs=None, matches = None ):
headtype = type(head)
if re.search( 'Repo', type(head).__name__ ):
if len(head.heads) == 0:
return None
head = head.heads.master.commit
elif re.search( 'Head', type(head).__name__ ):
head = head.commit
parents = head.parents
if tabs != None:
print tabs, head, ": ", ctime(head.committed_date)
found_commit = None
if len(parents) == 0:
parents = [ None ]
def check_blob_match( blob ):
f_commit = found_commit
if tabs != None:
print tabs, " ", blob.path
m = re.search( path_spec, blob.path )
if m != None:
f_commit = pick( f_commit, head )
if type(matches) == type(list()):
matches.append( m.group() )
return f_commit
for p in parents:
if p == None:
for b in head.tree.traverse():
if re.search( 'Blob', type(b).__name__ ):
found_commit = check_blob_match( b )
else:
diffs = p.diff( head )
for d in diffs:
if d.b_blob != None:
found_commit = check_blob_match( d.b_blob )
p_found_commit = \
find_commit( p, path_spec, pick,
None if tabs == None else tabs + "\t",
matches )
found_commit = pick( found_commit, p_found_commit )
if tabs != None:
if found_commit == head:
print tabs, "<-- SELF"
elif found_commit == None:
print tabs, "<-- None"
else:
print tabs, "<==", found_commit, "/" + ctime(found_commit.committed_date)
return found_commit
def find_last_commit( head, path_spec, before=None, tabs=None, matches = None ):
return find_commit( head, path_spec,
date_pick_gen( operator.gt, before ), tabs, matches )
def find_first_commit( head, path_spec, after=None, tabs=None, matches = None ):
return find_commit( head, path_spec,
date_pick_gen( operator.lt, after ), tabs, matches )
def pull_all():
students = get_student_list()
for st in students:
get_student_repo(st)
if __name__ == '__main__':
sols = get_repo('tuser')
cmt = find_last_commit( sols, "test", before=1315319771, tabs="" )
print cmt, ctime(cmt.committed_date), cmt.committed_date