-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathworker.go
64 lines (51 loc) · 1.1 KB
/
worker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
package evbundler
import (
"context"
"fmt"
"runtime/trace"
"time"
)
// Worker is the unit that handles events.
type Worker interface {
// Process handles a single Event under the given context and returns a
// Result describing the outcome.
Process(context.Context, Event) *Result
// Close shuts the worker down and reports any error from doing so.
Close() error
// StateTransaction(<-chan WorkerState)
}
// WorkerFunc is the function a worker runs for each Event. The returned error
// is what worker.Process stores in Result.Error.
type WorkerFunc func(context.Context, Event) error
// defaultWorkerFunc has the WorkerFunc signature and handles an event by
// firing it: it calls event.Fire with ctx and returns whatever Fire returns.
func defaultWorkerFunc(ctx context.Context, event Event) error {
	return event.Fire(ctx)
}
// worker pairs a WorkerFunc with the state it is currently in. Its Process
// method runs f for each event.
type worker struct {
// state is the worker's current WorkerState; it is written by setState.
state WorkerState
// f is the function Process invokes for each event.
f WorkerFunc
}
// Process runs the worker's function against ev and returns a Result
// describing the call.
//
// The worker is switched to StateProcess on entry and back to StateActive when
// Process returns; the reset is deferred, so it also happens if the function
// panics. While a runtime trace is being collected the call is wrapped in a
// trace region named after the event.
//
// The returned Result always has Weight 1. Timestamp is the moment just before
// the function was invoked, Latency is the wall-clock time the function took,
// and Error is the function's return value (nil on success).
func (w *worker) Process(ctx context.Context, ev Event) *Result {
	w.setState(StateProcess)
	defer w.setState(StateActive)

	// Formatting the region label allocates on every event, so skip it
	// entirely unless a trace is actually being recorded.
	if trace.IsEnabled() {
		defer trace.StartRegion(ctx, fmt.Sprintf("process event: %q", ev.Name())).End()
	}

	start := time.Now()
	err := w.f(ctx, ev)
	elapsed := time.Since(start)

	return &Result{
		Weight:    1,
		EventName: ev.Name(),
		Error:     err,
		Latency:   elapsed,
		Timestamp: start,
	}
}
// Close provides the Close method required by Worker. It is a no-op and always
// returns nil.
//
// It uses a pointer receiver like the other worker methods, so calling it does
// not copy the struct (and its mutable state field).
func (w *worker) Close() error {
	return nil
}
// WorkerState enumerates the states a worker can be in.
type WorkerState int

// Worker states. StateDead is the zero value.
const (
	// StateDead is rendered as "DEAD".
	StateDead WorkerState = iota
	// StateActive is rendered as "ACTIVE"; Process restores it on return.
	StateActive
	// StateWaiting is rendered as "WAIT_EVENT".
	StateWaiting
	// StateProcess is rendered as "PROCESS_EVENT"; Process sets it on entry.
	StateProcess
)

// String returns the label for ws: "DEAD", "ACTIVE", "WAIT_EVENT" or
// "PROCESS_EVENT". Any value outside the declared constants is rendered as
// "WorkerState(N)" rather than panicking, so it is always safe to log or
// format a WorkerState.
func (ws WorkerState) String() string {
	switch ws {
	case StateDead:
		return "DEAD"
	case StateActive:
		return "ACTIVE"
	case StateWaiting:
		return "WAIT_EVENT"
	case StateProcess:
		return "PROCESS_EVENT"
	default:
		// Convert to int so formatting cannot re-enter String.
		return fmt.Sprintf("WorkerState(%d)", int(ws))
	}
}
// setState records next as the worker's current state.
func (w *worker) setState(next WorkerState) {
	w.state = next
}