-
Notifications
You must be signed in to change notification settings - Fork 10
/
Copy pathpeer.go
121 lines (101 loc) · 2.27 KB
/
peer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
package go2p
import (
"github.com/v-braun/awaiter"
"github.com/emirpasic/gods/maps"
"github.com/emirpasic/gods/maps/hashmap"
)
// Peer represents a connection to a remote peer
type Peer struct {
io *adapterIO
send chan *Message
middleware middlewares
emitter *eventEmitter
metadata maps.Map
awaiter awaiter.Awaiter
}
func newPeer(adapter Adapter, middleware middlewares) *Peer {
p := new(Peer)
p.send = make(chan *Message, 10)
p.io = newAdapterIO(adapter)
p.awaiter = awaiter.New()
p.middleware = middleware
p.metadata = hashmap.New()
p.emitter = newEventEmitter()
return p
}
func (p *Peer) start() <-chan struct{} {
done := make(chan struct{})
p.io.emitter.On("disconnect", func(args []interface{}) {
p.emitter.EmitAsync("disconnect", p)
})
p.io.emitter.On("error", func(args []interface{}) {
p.emitter.EmitAsync("error", p, args[0])
})
p.io.start()
p.awaiter.Go(func() {
close(done)
for {
select {
case m := <-p.io.receive:
p.processPipe(m, Receive)
continue
case m := <-p.send:
p.processPipe(m, Send)
continue
case <-p.awaiter.CancelRequested():
return
}
}
})
return done
}
func (p *Peer) processPipe(m *Message, op PipeOperation) {
from := 0
to := len(p.middleware)
pos := 0
if op == Receive {
pos = to
}
pipe := newPipe(p, p.middleware, op, pos, from, to)
err := pipe.process(m)
if err == ErrPipeStopProcessing {
return
}
if err != nil {
p.io.handleError(err, "processPipe")
p.stopInternal()
return
}
if op == Receive {
p.emitter.EmitAsync("message", p, m)
} else {
err := p.io.sendMsg(m)
if err != nil {
p.io.handleError(err, "processPipe")
p.stopInternal()
return
}
}
}
func (p *Peer) stopInternal() {
p.io.adapter.Close()
p.io.awaiter.Cancel()
p.awaiter.Cancel()
}
func (p *Peer) stop() {
p.stopInternal()
p.io.awaiter.AwaitSync()
p.awaiter.AwaitSync()
}
// RemoteAddress returns the remote address of the current peer
func (p *Peer) RemoteAddress() string {
return p.io.adapter.RemoteAddress()
}
// LocalAddress returns the local address of the current peer
func (p *Peer) LocalAddress() string {
return p.io.adapter.LocalAddress()
}
// Metadata returns a map of metadata associated to this peer
func (p *Peer) Metadata() maps.Map {
return p.metadata
}