-
Notifications
You must be signed in to change notification settings - Fork 10
/
Copy pathpipe.go
136 lines (109 loc) · 2.92 KB
/
pipe.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
package go2p
import (
"errors"
"github.com/sirupsen/logrus"
)
// PipeOperation represents the pipe direction (Send or Receive)
type PipeOperation int
const (
// Send represents an outgoing message pipe processing
Send PipeOperation = iota
// Receive represents an incoming message pipe processing
Receive PipeOperation = iota
)
func (po PipeOperation) String() string {
return [...]string{"Send", "Receive"}[po]
}
// ErrPipeStopProcessing is returned when the pipe has stopped it execution
var ErrPipeStopProcessing = errors.New("pipe stopped")
// Pipe handles the processing of an message
type Pipe struct {
peer *Peer
allActions middlewares
executingActions middlewares
op PipeOperation
pos int // instruction pointer
log *logrus.Entry
}
func newPipe(peer *Peer, allActions middlewares, op PipeOperation, pos int, fromPos int, toPos int) *Pipe {
p := new(Pipe)
p.op = op
p.pos = pos
p.allActions = allActions
p.executingActions = allActions[fromPos:toPos]
p.log = newLogger("pipe")
p.peer = peer
return p
}
func (p *Pipe) process(msg *Message) error {
nextItems := p.executingActions.nextItems(p.op)
for _, m := range nextItems {
p.log.WithFields(logrus.Fields{
"name": m.name,
"pos": m.pos,
"msg-len": len(msg.PayloadGet()),
"op": p.op.String(),
}).Debug("execute middleware")
res, err := m.execute(p.peer, p, msg)
if err != nil {
p.log.WithFields(logrus.Fields{
"name": m.name,
"pos": m.pos,
"msg-len": len(msg.PayloadGet()),
"err": err,
}).Error("middleware error")
return err
} else if res == Stop {
return ErrPipeStopProcessing
}
if p.op == Send {
p.pos++
} else {
p.pos--
}
}
return nil
}
// Send will send the provided message during the current pipe execution.
//
// The message goes only through middlewares that are after the current pipe position
func (p *Pipe) Send(msg *Message) error {
pos := p.pos + 1
from := p.pos + 1
to := len(p.allActions)
if pos > to {
err := p.peer.io.sendMsg(msg)
return err
}
subPipe := newPipe(p.peer, p.allActions, Send, pos, from, to)
if err := subPipe.process(msg); err != nil {
return err
}
err := p.peer.io.sendMsg(msg)
return err
}
// Receive will block the current call until a message was read from the peer or
// an error occurs.
//
// The message goes only through middlewares that are after the current pipe position
func (p *Pipe) Receive() (*Message, error) {
msg, err := p.peer.io.receiveMsg()
if err == nil && msg == nil {
panic("unexpected nil result from peer.receive")
} else if err != nil {
return nil, err
} else {
pos := p.pos + 1
from := p.pos + 1
to := len(p.allActions)
if pos <= to {
subPipe := newPipe(p.peer, p.allActions, Receive, pos, from, to)
err = subPipe.process(msg)
}
}
return msg, err
}
// Operation returns the current pipe operation (Send or Receive)
func (p *Pipe) Operation() PipeOperation {
return p.op
}