-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathrefs.go
410 lines (349 loc) · 11.8 KB
/
refs.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
package main
import (
"bytes"
"database/sql"
"encoding/binary"
"errors"
"fmt"
"math/rand"
"os"
"sync"
"time"
"github.com/lib/pq"
)
// refpackStoragePath is the root directory for refpack files. Each refpack
// lives in a per-month subdirectory below it (see RefpackNameToDirectory).
const refpackStoragePath = "output/refpack/"

// refpack is one open refpack file together with its bookkeeping.
type refpack struct {
	Filename   string   // base name, e.g. gitglob_YYYYMMDDHHMMSS_123.refpack
	NumUpdates uint64   // how many entries have been appended to the file
	File       *os.File // handle the entries are written through
}

// Refpack writer state, shared by the writer, the rotation goroutine and
// shutdown. currentRefpack and its fields are accessed under
// refpackWriterMutex by rotateRefpack, CloseRefpackWriter and RecordRepoRefs.
var (
	haveCurrentRefpack bool
	currentRefpack     *refpack
	refpackWriterMutex sync.Mutex
)
// rotateRefpackRegularly rotates the current refpack at the top of every UTC
// hour. It is meant to run in its own goroutine, and loops until a rotation
// fails.
//
// A failed rotateRefpack leaves refpackWriterMutex locked on purpose, so any
// further rotation attempt would block forever. Instead of silently dropping
// the error and hanging on the next iteration, the failure is reported on
// stderr and the goroutine exits.
func rotateRefpackRegularly() {
	for {
		// Vaguely based on http://stackoverflow.com/a/19549474
		now := time.Now().UTC()
		// UTC hours are aligned with Go's zero time, so Truncate yields the
		// start of the current hour.
		nextTime := now.Truncate(time.Hour).Add(time.Hour)
		wait := nextTime.Sub(now)
		// Make sure we're actually waiting a bit: leap seconds, clock drift,
		// etc. are annoying. If the boundary is too close, skip to the next.
		if wait < time.Minute {
			wait += time.Hour
		}
		<-time.After(wait)
		// We should now be at the top of an hour. Rotate.
		if err := rotateRefpack(); err != nil {
			fmt.Fprintf(os.Stderr, "refpack rotation failed, stopping rotation: %v\n", err)
			return
		}
	}
}
// rotateRefpack closes the current refpack and replaces it with a freshly
// created one, holding refpackWriterMutex for the whole swap so that no entry
// is written to a half-rotated pack.
//
// On failure the mutex is intentionally NOT released: at that point there is
// most likely no safely open refpack, and blocking every writer is preferable
// to letting them write in that state. Note that if initRefpack fails,
// currentRefpack is left nil.
func rotateRefpack() error {
	refpackWriterMutex.Lock()

	if err := closeRefpack(currentRefpack); err != nil {
		return err // lock deliberately kept; see above
	}

	var err error
	if currentRefpack, err = initRefpack(); err != nil {
		return err // lock deliberately kept; see above
	}

	refpackWriterMutex.Unlock()
	return nil
}
// CloseRefpackWriter permanently stops refpack writing, normally at program
// shutdown. It closes the current refpack (if one is open) and returns any
// error from doing so.
//
// refpackWriterMutex is acquired and never released, so any later refpack
// write or rotation blocks forever.
func CloseRefpackWriter() error {
	refpackWriterMutex.Lock() // never unlocked: writing is over for good

	if !haveCurrentRefpack {
		return nil
	}
	haveCurrentRefpack = false
	return closeRefpack(currentRefpack)
}
// RefpackNameToDirectory returns the directory holding the refpack file
// called name. That directory is refpackStoragePath followed by the YYYYMM
// part of the name, e.g.
// "gitglob_20160102030405_123.refpack" -> "output/refpack/201601".
//
// Only the length of name is validated. An error is returned if it differs
// from the length of a generated name ("gitglob_YYYYMMDDHHIISS_123.refpack").
func RefpackNameToDirectory(name string) (string, error) {
	const (
		prefixLen = len("gitglob_")
		monthLen  = len("YYYYMM")
	)
	if len(name) != len("gitglob_YYYYMMDDHHIISS_123.refpack") {
		return "", errors.New("Incorrect refpack name length.")
	}
	base := refpackStoragePath
	// refpackStoragePath already ends in "/"; only add a separator when it
	// does not, so the result never contains "//".
	if base != "" && base[len(base)-1] != '/' {
		base += "/"
	}
	return base + name[prefixLen:prefixLen+monthLen], nil
}
// RefpackNameToFullPath returns the path of the refpack file called name:
// its RefpackNameToDirectory directory, a "/", then name itself. Any error
// from RefpackNameToDirectory is returned unchanged, with an empty path.
func RefpackNameToFullPath(name string) (fullPath string, err error) {
	var dir string
	if dir, err = RefpackNameToDirectory(name); err == nil {
		fullPath = dir + "/" + name
	}
	return fullPath, err
}
// initRefpack creates a new, empty refpack file and returns it opened for
// writing.
//
// The file is named gitglob_<UTC YYYYMMDDhhmmss>_<3 random digits>.refpack
// and placed in the directory given by RefpackNameToDirectory, which is
// created if necessary.
//
// Name collisions are retried with a fresh name. Any other failure to create
// the file is retried, giving up after the third such failure. The previous
// Stat-then-open check looped forever when Stat failed for a reason other
// than "does not exist" (e.g. permission denied), because it never counted
// those failures.
func initRefpack() (*refpack, error) {
	pack := new(refpack)
	openFailures := 0
	for {
		packTime := time.Now().UTC()
		pack.Filename = "gitglob_" + packTime.Format("20060102150405") + "_" +
			fmt.Sprintf("%03d", rand.Intn(1000)) + ".refpack"

		packDir, err := RefpackNameToDirectory(pack.Filename)
		if err != nil {
			return nil, err
		}
		if err := os.MkdirAll(packDir, 0755); err != nil {
			return nil, fmt.Errorf("Unable to create refpack folder: %v", err)
		}
		packFullPath, err := RefpackNameToFullPath(pack.Filename)
		if err != nil {
			return nil, err
		}
		fmt.Println(packFullPath)

		// O_EXCL makes "does it exist?" and "create it" one atomic step, so
		// no separate os.Stat check is needed (or race-free).
		pack.File, err = os.OpenFile(packFullPath,
			os.O_WRONLY|os.O_CREATE|os.O_EXCL, 0644)
		if err == nil {
			break
		}
		if os.IsExist(err) {
			// Name already taken: loop round for a new timestamp/suffix.
			continue
		}
		openFailures++
		if openFailures >= 3 {
			return nil, fmt.Errorf("Unable to create a refpack on disk: %v", err)
		}
	}
	fmt.Println(pack.Filename)
	pack.NumUpdates = 0
	return pack, nil
}
// initRefpackWriting opens the first refpack and starts the hourly rotation
// goroutine. Once writing has been initialised it does nothing and returns
// nil.
//
// If initRefpack fails, currentRefpack is left nil, the rotation goroutine
// is not started and the error is returned.
//
// NOTE(review): haveCurrentRefpack is read and written here without
// refpackWriterMutex. Presumably this is only called once at startup;
// confirm.
func initRefpackWriting() error {
	if haveCurrentRefpack {
		return nil
	}

	var err error
	if currentRefpack, err = initRefpack(); err != nil {
		return err
	}

	go rotateRefpackRegularly()
	haveCurrentRefpack = true
	return nil
}
// closeRefpack closes pack's file.
//
// If nothing was recorded in the pack (NumUpdates == 0), the file is then
// deleted, since an empty refpack carries no information.
//
// The error from Close is reported only when the pack holds updates. Only
// then can a failed close mean recorded entries did not make it to storage;
// previously that error was silently discarded. Any error resolving the
// pack's path is also returned.
func closeRefpack(pack *refpack) error {
	fmt.Printf("closeRefpack() called for %s\n", pack.Filename)

	closeErr := pack.File.Close()

	if pack.NumUpdates > 0 {
		if closeErr != nil {
			return fmt.Errorf("closing refpack %s: %v", pack.Filename, closeErr)
		}
		return nil
	}

	// We didn't write anything, so a failed close loses nothing and the file
	// can simply be deleted.
	fullPath, err := RefpackNameToFullPath(pack.Filename)
	if err != nil {
		return err
	}
	// Best-effort: a leftover empty refpack is harmless, so a removal
	// failure is deliberately not reported.
	_ = os.Remove(fullPath)
	return nil
}
// CalcRefDiffs computes a RefDiffTypeDelta refDiffs describing how newRefs
// differs from oldRefs. Both maps go from ref name to commit hash, and
// oldTime is recorded as the diff's From time. The resulting fields are:
//
//   - NewRefs: refs present only in newRefs.
//   - ChangedRefs: refs present in both with differing hashes (new hash kept).
//   - DeletedRefs: refs present only in oldRefs.
//   - NewHashes: every distinct hash referenced by newRefs, changed or not.
//   - OldHashes: every distinct previous hash of a changed or deleted ref.
//
// Slices built from maps are in no particular order. Neither input map is
// modified; earlier versions deleted matched entries from oldRefs, forcing
// callers to pass a copy.
func CalcRefDiffs(oldRefs, newRefs map[string][]byte,
	oldTime time.Time) refDiffs {
	diffs := refDiffs{
		Type:        RefDiffTypeDelta,
		From:        pq.NullTime{Time: oldTime, Valid: true},
		NewRefs:     make(map[string][]byte),
		ChangedRefs: make(map[string][]byte),
		DeletedRefs: make([]string, 0),
	}
	newHashSet := make(map[[hashLen]byte]bool)
	oldHashSet := make(map[[hashLen]byte]bool)

	for name, newHash := range newRefs {
		newHashSet[sliceToHashArray(newHash)] = true
		oldHash, existed := oldRefs[name]
		switch {
		case !existed:
			// Not in the old set: it must be new.
			diffs.NewRefs[name] = newHash
		case !bytes.Equal(newHash, oldHash):
			// Same ref, different target: record it and remember the old hash.
			diffs.ChangedRefs[name] = newHash
			oldHashSet[sliceToHashArray(oldHash)] = true
		}
		// Unchanged refs are not recorded.
	}

	for name, oldHash := range oldRefs {
		if _, stillPresent := newRefs[name]; stillPresent {
			continue // already classified by the loop above
		}
		// Only in the old set, so it has been removed since then.
		oldHashSet[sliceToHashArray(oldHash)] = true
		diffs.DeletedRefs = append(diffs.DeletedRefs, name)
	}

	for hash := range newHashSet {
		diffs.NewHashes = append(diffs.NewHashes, hash)
	}
	for hash := range oldHashSet {
		diffs.OldHashes = append(diffs.OldHashes, hash)
	}
	return diffs
}
// RecordRepoRefs records a newly observed set of refs for a repository, in
// four steps:
//
//  1. Load the latest stored record for repoId via preparedLatestRefsGet.
//  2. Compute a refDiffs against it. The diff is absolute when there is no
//     previous record or every maxRefDepth fetches, and a delta otherwise.
//  3. Insert the diff via preparedHistoryRefsAdd.
//  4. Append the same diff to the current refpack file.
//
// It returns the diff, the previously stored refs, and any error. The
// previous refs are populated only when a delta was computed.
//
// Refpack entry format:
//
//	[repository address]\x00[type byte][int64 timestamp][int64 fromtimestamp]
//	[uvarint new length][uvarint changed length][uvarint deleted length]
//	[new refs][changed refs][deleted refs]
//	New and changed refs: [20-byte commithash][refname]\n
//	Deleted refs: [refname]\n
//
// Integers are big-endian and timestamps are Unix seconds.
//
// Fixed here:
//   - Any write error used to return with refpackWriterMutex still locked,
//     deadlocking every writer and the rotation goroutine.
//   - currentRefpack was read after unlocking, racing with rotateRefpack.
func RecordRepoRefs(repoPath string, repoId int, timestamp time.Time,
	refs map[string][]byte) (refDiffs, map[string][]byte, error) {
	var (
		diffs      refDiffs
		lastStamp  time.Time
		fetchCount int64
		refNames   []string
		refHashes  pq.ByteaArray
	)
	oldRefs := make(map[string][]byte)

	// Get the last set of revisions in the database.
	err := preparedLatestRefsGet.QueryRow(repoId).Scan(pq.Array(&refNames),
		&refHashes, &lastStamp, &fetchCount)
	if err != nil && err != sql.ErrNoRows {
		return diffs, oldRefs, err
	}

	// Calculate the differences.
	switch {
	case err == sql.ErrNoRows:
		// There was no previous record of this repository: every ref and
		// every hash is new.
		diffs = refDiffs{
			Type:        RefDiffTypeAbsolute,
			From:        pq.NullTime{Valid: false},
			NewRefs:     refs,
			ChangedRefs: make(map[string][]byte),
			DeletedRefs: make([]string, 0),
		}
		newHashMap := make(map[[hashLen]byte]bool)
		for _, newHash := range refs {
			newHashMap[sliceToHashArray(newHash)] = true
		}
		for hash := range newHashMap {
			diffs.NewHashes = append(diffs.NewHashes, hash)
		}
	case fetchCount%maxRefDepth == 0:
		// Write an absolute diff because we have gone long enough without
		// one. This prevents corruption from affecting all of a repository's
		// history, and means "in-between" reflist lookups don't have to apply
		// deltas all the way back to the beginning of a repository.
		// NOTE(review): unlike the branch above, NewHashes/OldHashes are left
		// unset here and oldRefs is not populated. Confirm callers don't
		// rely on them for this case.
		diffs = refDiffs{
			Type:        RefDiffTypeAbsolute,
			From:        pq.NullTime{Time: lastStamp, Valid: true},
			NewRefs:     refs,
			ChangedRefs: make(map[string][]byte),
			DeletedRefs: make([]string, 0),
		}
	default:
		// Store an actual delta. CalcRefDiffs gets its own copy of the old
		// refs so oldRefs stays intact even if CalcRefDiffs modifies its
		// argument.
		calcOldRefs := make(map[string][]byte, len(refNames))
		for i, name := range refNames {
			oldRefs[name] = refHashes[i]
			calcOldRefs[name] = refHashes[i]
		}
		diffs = CalcRefDiffs(calcOldRefs, refs, lastStamp)
	}

	// Write the diffs to the database, then to the refpack.
	if err := insertRepoRefDiffs(repoId, timestamp, diffs); err != nil {
		return diffs, oldRefs, err
	}
	entry := encodeRefpackEntry(repoPath, timestamp, diffs)
	if err := appendToCurrentRefpack(entry); err != nil {
		return diffs, oldRefs, err
	}

	influxWritePoint("update_refs", map[string]string{}, map[string]interface{}{
		"total":     len(refs),
		"new":       len(diffs.NewRefs),
		"changed":   len(diffs.ChangedRefs),
		"deleted":   len(diffs.DeletedRefs),
		"repo_path": repoPath,
	})
	return diffs, oldRefs, nil
}

// insertRepoRefDiffs stores diffs for repoId at timestamp via
// preparedHistoryRefsAdd.
func insertRepoRefDiffs(repoId int, timestamp time.Time, diffs refDiffs) error {
	newNames, newHashes := refMapToSlices(diffs.NewRefs)
	changedNames, changedHashes := refMapToSlices(diffs.ChangedRefs)
	_, err := preparedHistoryRefsAdd.Exec(repoId, timestamp, diffs.Type,
		diffs.From, pq.StringArray(newNames), pq.ByteaArray(newHashes),
		pq.StringArray(changedNames), pq.ByteaArray(changedHashes),
		pq.StringArray(diffs.DeletedRefs))
	return err
}

// refMapToSlices flattens a ref-name -> hash map into parallel, non-nil name
// and hash slices (same index = same ref).
func refMapToSlices(refs map[string][]byte) ([]string, [][]byte) {
	names := make([]string, 0, len(refs))
	hashes := make([][]byte, 0, len(refs))
	for name, hash := range refs {
		names = append(names, name)
		hashes = append(hashes, hash)
	}
	return names, hashes
}

// encodeRefpackEntry serialises diffs for repoPath into the refpack entry
// format documented on RecordRepoRefs. Building the entry in memory first
// keeps the locked section short and lets it be written with one call.
// bytes.Buffer writes always return a nil error, so none are checked.
func encodeRefpackEntry(repoPath string, timestamp time.Time, diffs refDiffs) []byte {
	var buf bytes.Buffer
	var scratch [binary.MaxVarintLen64]byte

	// Repository address, NUL-terminated, then the diff type byte.
	buf.WriteString(repoPath)
	buf.WriteByte(0)
	buf.WriteByte(byte(diffs.Type))

	// Timestamps as big-endian 64-bit Unix seconds.
	binary.BigEndian.PutUint64(scratch[:8], uint64(timestamp.Unix()))
	buf.Write(scratch[:8])
	binary.BigEndian.PutUint64(scratch[:8], uint64(diffs.From.Time.Unix()))
	buf.Write(scratch[:8])

	// Section lengths.
	for _, count := range []int{len(diffs.NewRefs), len(diffs.ChangedRefs), len(diffs.DeletedRefs)} {
		n := binary.PutUvarint(scratch[:], uint64(count))
		buf.Write(scratch[:n])
	}

	// New and changed refs: [hash][refname]\n.
	// NOTE(review): hashes are assumed to be 20 bytes, per the format; their
	// length is not checked.
	writeHashedRefs := func(refs map[string][]byte) {
		for name, hash := range refs {
			buf.Write(hash)
			buf.WriteString(name)
			buf.WriteByte('\n')
		}
	}
	writeHashedRefs(diffs.NewRefs)
	writeHashedRefs(diffs.ChangedRefs)

	// Deleted refs: [refname]\n.
	for _, name := range diffs.DeletedRefs {
		buf.WriteString(name)
		buf.WriteByte('\n')
	}
	return buf.Bytes()
}

// appendToCurrentRefpack writes entry to the current refpack in a single
// Write call while holding refpackWriterMutex. On success it counts the entry
// in NumUpdates, then syncs the file outside the lock.
func appendToCurrentRefpack(entry []byte) error {
	file, err := func() (*os.File, error) {
		refpackWriterMutex.Lock()
		// Deferred so that no error path can leave the writer lock held.
		defer refpackWriterMutex.Unlock()
		if currentRefpack == nil {
			return nil, errors.New("no refpack is open for writing")
		}
		// If Write fails part-way, a truncated entry may remain in the file;
		// it is not counted in NumUpdates.
		if _, err := currentRefpack.File.Write(entry); err != nil {
			return nil, err
		}
		currentRefpack.NumUpdates++
		// Capture the file while locked: rotateRefpack may replace
		// currentRefpack as soon as the lock is released.
		return currentRefpack.File, nil
	}()
	if err != nil {
		return err
	}
	// Syncing after the unlock lets other writers proceed; syncing in the
	// middle of another write is okay. A concurrent rotation may already have
	// closed this file, so, as before, a Sync error is not reported.
	_ = file.Sync()
	return nil
}