generated from ARM-Development/arm-repository-template
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsts.go
399 lines (336 loc) · 11 KB
/
sts.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
package sts
import (
"io"
"time"
"github.com/arm-doe/sts/marshal"
)
const (
// DefLog is the default logs directory name
DefLog = "logs"
// DefLogMsg is the default log messages directory name
// (appended to "logs")
DefLogMsg = "messages"
// DefLogOut is the default outgoing log messages directory name
// (appended to "logs")
DefLogOut = "outgoing_to"
// DefLogIn is the default incoming log messages directory name
// (appended to "logs")
DefLogIn = "incoming_from"
// DefOut is the default outgoing data directory name
DefOut = "outgoing_to"
// DefCache is the default cache directory name
DefCache = ".sts"
// DefStage is the default stage directory name
DefStage = "stage"
// DefFinal is the default final directory name
DefFinal = "incoming_from"
// MethodHTTP indicates HTTP transfer method
MethodHTTP = "http"
// DefaultPort is the default TCP port used for HTTP communication
DefaultPort = 1992
)
// Logger is the generic logging interface
type Logger interface {
Debug(...interface{})
Info(...interface{})
Error(...interface{})
Recent(int) []string
}
// SendLogger is the interface for logging on the sending side
type SendLogger interface {
Sent(Sent)
WasSent(name, hash string, after time.Time, before time.Time) bool
}
// ReceiveLogger is the interface for logging on the incoming side
type ReceiveLogger interface {
Parse(
handler func(name, renamed, hash string, size int64, t time.Time) bool,
after time.Time,
before time.Time) bool
Received(file Received)
WasReceived(
name, hash string,
after time.Time,
before time.Time) bool
}
// FileSource is the interface for reading and deleting from a store of file
// objects on the client (i.e. sending) side
type FileSource interface {
Scan(func(File) bool) ([]File, time.Time, error)
GetOpener() Open
Remove(File) error
Sync(File) (File, error)
IsNotExist(error) bool
}
// FileCache is the interface for caching a collection of files
type FileCache interface {
Iterate(func(Cached) bool)
Get(string) Cached
Add(Hashed)
Done(name string, whileLocked func(Cached))
Reset(string)
Remove(string)
Persist() error
}
// FileQueue is the interface for getting files in the proper order
type FileQueue interface {
Push([]Hashed)
Pop() Sendable
}
// Dispatcher is the interface for broadcasting messages
type Dispatcher interface {
Send(string) error
}
// GateKeeper is the interface for managing the "putting away" of files
// received on the server
type GateKeeper interface {
Recover()
CleanNow()
Prune(time.Duration)
Ready() bool
Scan(version string) ([]byte, error)
Prepare(request []Binned)
Receive(*Partial, io.Reader) error
Received([]Binned) (nRecvd int)
GetFileStatus(relPath string, sent time.Time) int
Stop(bool)
}
// GateKeeperFactory creates GateKeeper instances
type GateKeeperFactory func(source string) GateKeeper
// ClientStatus uses bitmasking to represent the status of a client
type ClientStatus uint
const (
// ClientIsDisabled indicates that transfer is to not occur
ClientIsDisabled ClientStatus = 1 << iota
// ClientIsApproved indicates that transfer should be enabled
ClientIsApproved
// ClientHasUpdatedConfiguration indicates, well, the updated configuration
// should be retrieved
ClientHasUpdatedConfiguration
)
// ClientState is the outer structure for a runner to communicate to the server
// its current state
type ClientState struct {
When string `json:"when"`
Version string `json:"vers"`
GoVersion string `json:"go"`
BuildTime string `json:"built"`
IsActive bool `json:"active"`
IPAddrs []string `json:"ip"`
Messages []string `json:"msg"`
Sources map[string]SourceState `json:"sources"`
}
// SourceState is the structure used to communicate to the server the current
// state of a single running source (an outgoing "client")
type SourceState struct {
FoundCount int64 `json:"foundCnt"`
FoundSize int64 `json:"foundSize"`
QueueCount int64 `json:"qCnt"`
QueueSize int64 `json:"qSize"`
SentCount int64 `json:"sentCnt"`
SentSize int64 `json:"sentSize"`
Throughput Throughput `json:"throughput"`
RecentFiles []*RecentFile `json:"files"`
}
// Throughput is the structure used to keep track of bandwidth usage
type Throughput struct {
Bytes int64 `json:"bytes"`
Seconds float64 `json:"secs"`
PctIdle float64 `json:"idle"`
}
// RecentFile is the data structure that encapsulates the state of a file used
// to communicate client state to the server
type RecentFile struct {
Name string `json:"name"`
Rename string `json:"rename"`
Path string `json:"path"`
Hash string `json:"hash"`
Size int64 `json:"size"`
Time time.Time `json:"time"`
Sent bool `json:"sent"`
}
// ClientManager is responsible for disseminating and collecting information
// about a client (sender)
type ClientManager interface {
GetClientStatus(clientID, clientName, clientOS string) (ClientStatus, error)
GetClientConf(clientID string) (*ClientConf, error)
SetClientConfReceived(clientID string, when time.Time) error
SetClientState(clientID string, state ClientState) error
}
// EncodeClientID creates a decodable composite ID from a key and unique ID
type EncodeClientID func(key, uid string) (clientID string)
// DecodeClientID converts a composite client ID into its original key and
// unique ID
type DecodeClientID func(clientID string) (key, uid string)
// RequestValidator validates an incoming request
type RequestValidator func(source, key string) bool
// DecodePartials decodes the input reader into a slice of Partial instance
// pointers
type DecodePartials func(r io.Reader) ([]*Partial, error)
// Recover is the function type for determining what partial files need to be
// sent since previous run
type Recover func() ([]*Partial, error)
// Open is a generic function for creating a Readable handle to a file
type Open func(File) (Readable, error)
// Rename is a generic function for generating a new name for a file
type Rename func(File) string
// PayloadFactory creates a Payload instance based on the input max size (in
// bytes) and Open function
type PayloadFactory func(int64, Open, Rename) Payload
// Transmit is the function type for actually sending payload over the wire (or
// whatever). It returns some representation of how much was successfully
// transmitted and any error encountered.
type Transmit func(Payload) (int, error)
// RecoverTransmission is the function type for querying the server about a
// failed payload to determine which parts if any were successfully received
// in order to avoid duplication
type RecoverTransmission func(Payload) (int, error)
// PayloadDecoderFactory creates a decoder instance to be used to parse a
// multipart byte stream with the metadata at the beginning
type PayloadDecoderFactory func(
metaLen int, pathSep string, payload io.Reader) (PayloadDecoder, error)
// Validate is the function type for validating files sent by the client were
// successfully received by the server
type Validate func([]Pollable) ([]Polled, error)
// Translate is a general function type for converting one string to another
type Translate func(string) string
// File is the most basic interface for a File object
type File interface {
GetPath() string
GetName() string
GetSize() int64
GetTime() time.Time
GetMeta() []byte
}
// Readable is a wrapper for being able to seek, read, and close a file
// generically
type Readable interface {
io.Reader
io.Seeker
io.Closer
}
// Hashed is the interface a file must implement to include a signature
type Hashed interface {
File
GetHash() string
}
// Cached is the interface a cached file must implement
type Cached interface {
Hashed
IsDone() bool
}
// Recovered is the interface a file must implement to be recovered after a
// partial send
type Recovered interface {
Hashed
GetPrev() string
GetSendSize() int64
Allocate(int64) (int64, int64)
IsAllocated() bool
}
// Sendable is the interface a file must implement to be sent by the client
type Sendable interface {
Hashed
GetPrev() string
GetSlice() (int64, int64)
GetSendSize() int64
}
// Binnable is the interface a file must implement to have a chunk be part of
// a sendable bin
type Binnable interface {
Sendable
GetNextAlloc() (int64, int64)
AddAlloc(int64)
IsAllocated() bool
}
// Binned is the interface for a single file chunk that is part of a payload
type Binned interface {
GetName() string
GetRenamed() string
GetPrev() string
GetFileTime() time.Time
GetFileHash() string
GetFileSize() int64
// GetSendSize will almost always be the same as GetFileSize. The one
// exception is for files recovered where some portion of the file was
// sent earlier.
GetSendSize() int64
GetSlice() (int64, int64)
}
// Payload is the interface to a slice of Binnables that can be read for
// transmission
type Payload interface {
Add(Binnable) bool
Remove(Binned)
IsFull() bool
Split(nParts int) Payload
GetSize() int64
GetParts() []Binned
EncodeHeader() ([]byte, error)
GetEncoder() io.ReadCloser
GetStarted() time.Time
GetCompleted() time.Time
}
// PayloadDecoder is the interface on the receiving side to a sent payload
type PayloadDecoder interface {
GetParts() []Binned
Next() (io.Reader, bool)
}
// Partial is the JSON-encodable struct containing metadata for a single file
// on the receiving end
type Partial struct {
Name string `json:"path"`
Renamed string `json:"renamed"`
Prev string `json:"prev"`
Time marshal.NanoTime `json:"time"`
Size int64 `json:"size"`
Hash string `json:"hash"`
Source string `json:"src"`
Parts []*ByteRange `json:"parts"`
}
// ByteRange is the JSON-encodable struct used by the Partial for tracking a
// a single byte range
type ByteRange struct {
Beg int64 `json:"b"`
End int64 `json:"e"`
}
// Sent is the interface a file must implement to be recorded as sent
type Sent interface {
GetName() string
GetSize() int64
GetHash() string
TimeMs() int64
}
// Pollable is the interface a file must implement to be polled by the client
// for whether or not the server received the file successfully
type Pollable interface {
Sent
GetPrev() string // Needed in case the file needs to be resent
GetStarted() time.Time
}
// Polled is the interface a file must implement as a response to being polled
type Polled interface {
Pollable
NotFound() bool
Waiting() bool
Failed() bool
Received() bool
}
// Received is the interface a file must implement to be logged as received
type Received interface {
GetName() string
GetRenamed() string
GetSize() int64
GetHash() string
}
const (
// ConfirmNone is the indicator that a file has not been confirmed.
ConfirmNone = 0
// ConfirmFailed is the indicator that file confirmation failed.
ConfirmFailed = 1
// ConfirmPassed is the indicator that file confirmation succeeded.
ConfirmPassed = 2
// ConfirmWaiting is the indicator that file confirmation succeeded but its
// predecessor has not been confirmed.
ConfirmWaiting = 3
)