-
Notifications
You must be signed in to change notification settings - Fork 3
/
Copy pathserver.go
588 lines (514 loc) · 21 KB
/
server.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
package lime
import (
"context"
"errors"
"fmt"
"github.com/google/uuid"
"go.uber.org/multierr"
"golang.org/x/sync/errgroup"
"log"
"net"
"os"
"reflect"
"runtime"
"sync"
"time"
)
// Server allows the receiving of Lime connections through multiple transport listeners, like TCP and Websockets.
// It handles the session negotiation and authentication through the handlers defined in ServerConfig.
// Finally, it allows the definition of handles for receiving envelopes from the clients.
type Server struct {
config *ServerConfig
mux *EnvelopeMux
listeners []BoundListener
mu sync.Mutex
transportChan chan Transport
shutdown context.CancelFunc
}
// NewServer creates a new instance of the Server type.
func NewServer(config *ServerConfig, mux *EnvelopeMux, listeners ...BoundListener) *Server {
if config == nil {
config = defaultServerConfig
}
if mux == nil || reflect.ValueOf(mux).IsNil() {
panic("nil mux")
}
if len(listeners) == 0 {
panic("empty listeners")
}
return &Server{
config: config,
mux: mux,
listeners: listeners,
transportChan: make(chan Transport, config.Backlog),
}
}
// ListenAndServe starts listening for new connections in the registered transport listeners.
// This is a blocking call which always returns a non nil error.
// In case of a graceful closing, the returned error is ErrServerClosed.
func (srv *Server) ListenAndServe() error {
if srv.shutdown != nil {
return errors.New("server already listening")
}
ctx, cancel := context.WithCancel(context.Background())
srv.shutdown = cancel
if len(srv.listeners) == 0 {
return errors.New("no listeners found")
}
eg, ctx := errgroup.WithContext(ctx)
for _, l := range srv.listeners {
if err := l.Listener.Listen(ctx, l.Addr); err != nil {
return fmt.Errorf("listen error: %w", err)
}
listener := l
eg.Go(func() error {
return acceptTransports(ctx, listener.Listener, srv.transportChan)
})
}
eg.Go(func() error {
srv.consumeTransports(ctx)
return nil
})
err := eg.Wait()
if errors.Is(err, ctx.Err()) {
return ErrServerClosed
}
return err
}
func acceptTransports(ctx context.Context, listener TransportListener, c chan<- Transport) error {
for {
transport, err := listener.Accept(ctx)
if err != nil {
return err
}
select {
case <-ctx.Done():
return ctx.Err()
case c <- transport:
}
}
}
func (srv *Server) consumeTransports(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
case t := <-srv.transportChan:
c := NewServerChannel(t, srv.config.ChannelBufferSize, srv.config.Node, uuid.NewString())
go func() {
srv.handleChannel(ctx, c)
}()
}
}
}
func (srv *Server) handleChannel(ctx context.Context, c *ServerChannel) {
err := c.EstablishSession(
ctx,
srv.config.CompOpts,
srv.config.EncryptOpts,
srv.config.SchemeOpts,
srv.config.Authenticate,
srv.config.Register,
)
if err != nil {
log.Printf("server: establish: %v\n", err)
return
}
established := srv.config.Established
if established != nil {
established(c.sessionID, c)
}
defer func() {
if c.Established() {
// Do not use the shared context since it could be canceled
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
_ = c.FinishSession(ctx)
}
finished := srv.config.Finished
if finished != nil {
finished(c.sessionID)
}
}()
if err = srv.mux.ListenServer(ctx, c); err != nil {
log.Printf("server: listen: %v\n", err)
return
}
}
// Close stops the server by closing the transport listeners and all active sessions.
func (srv *Server) Close() error {
srv.mu.Lock()
defer srv.mu.Unlock()
if srv.shutdown == nil {
return errors.New("server not listening")
}
srv.shutdown()
srv.shutdown = nil
var errs []error
for _, listener := range srv.listeners {
if err := listener.Listener.Close(); err != nil {
errs = append(errs, err)
}
}
close(srv.transportChan)
return multierr.Combine(errs...)
}
// ServerConfig define the configurations for a Server instance.
type ServerConfig struct {
Node Node // Node represents the server's address.
CompOpts []SessionCompression // CompOpts defines the compression options to be used in the session negotiation.
EncryptOpts []SessionEncryption // EncryptOpts defines the encryption options to be used in the session negotiation.
SchemeOpts []AuthenticationScheme // SchemeOpts defines the authentication schemes that should be presented to the clients during session establishment.
Backlog int // Backlog defines the size of the listener's pending connections queue.
ChannelBufferSize int // ChannelBufferSize determines the internal envelope buffer size for the channels.
// Authenticate is called for authenticating a client session.
// It should return an AuthenticationResult instance with DomainRole different of DomainRoleUnknown for a successful authentication.
Authenticate func(ctx context.Context, identity Identity, a Authentication) (*AuthenticationResult, error)
// Register is called for the client Node address registration.
// It receives a candidate node from the client and should return the effective node address that will be assigned
// to the session.
Register func(ctx context.Context, candidate Node, c *ServerChannel) (Node, error)
// Established is called when a session with a node is established.
Established func(sessionID string, c *ServerChannel)
// Finished is called when an established session with a node is finished.
Finished func(sessionID string)
}
var defaultServerConfig = NewServerConfig()
// NewServerConfig creates a new instance of ServerConfig with the default configuration values.
func NewServerConfig() *ServerConfig {
instance, err := os.Hostname()
if err != nil || instance == "" {
instance = uuid.NewString()
}
return &ServerConfig{
Node: Node{
Identity: Identity{
Name: "postmaster",
Domain: "localhost",
},
Instance: instance,
},
CompOpts: []SessionCompression{SessionCompressionNone},
EncryptOpts: []SessionEncryption{SessionEncryptionNone, SessionEncryptionTLS},
SchemeOpts: []AuthenticationScheme{AuthenticationSchemeTransport},
Backlog: runtime.NumCPU() * 8,
ChannelBufferSize: runtime.NumCPU() * 32,
Authenticate: func(ctx context.Context, identity Identity, authentication Authentication) (*AuthenticationResult, error) {
return MemberAuthenticationResult(), nil
},
Register: func(ctx context.Context, node Node, serverChannel *ServerChannel) (Node, error) {
return Node{
Identity: Identity{
Name: node.Name,
Domain: serverChannel.localNode.Domain,
},
Instance: uuid.New().String()}, nil
},
}
}
// ServerBuilder is a helper for building instances of Server.
// Avoid instantiating it directly, use the NewServerBuilder() function instead.
type ServerBuilder struct {
config *ServerConfig
mux *EnvelopeMux
listeners []BoundListener
plainAuth PlainAuthenticator
keyAuth KeyAuthenticator
externalAuth ExternalAuthenticator
}
// NewServerBuilder creates a new ServerBuilder, which is a helper for building Server instances.
// It provides a fluent interface for convenience.
func NewServerBuilder() *ServerBuilder {
return &ServerBuilder{config: NewServerConfig(), mux: &EnvelopeMux{}}
}
// Name sets the server's node name.
func (b *ServerBuilder) Name(name string) *ServerBuilder {
b.config.Node.Name = name
return b
}
// Domain sets the server's node domain.
func (b *ServerBuilder) Domain(domain string) *ServerBuilder {
b.config.Node.Domain = domain
return b
}
// Instance sets the server's node instance.
func (b *ServerBuilder) Instance(instance string) *ServerBuilder {
b.config.Node.Instance = instance
return b
}
// MessageHandlerFunc allows the registration of a function for handling received messages that matches
// the specified predicate. Note that the registration order matters, since the receiving process stops when
// the first predicate match occurs.
func (b *ServerBuilder) MessageHandlerFunc(predicate MessagePredicate, f MessageHandlerFunc) *ServerBuilder {
b.mux.MessageHandlerFunc(predicate, f)
return b
}
// MessagesHandlerFunc allows the registration of a function for handling all received messages.
// This handler should be the last one to be registered, since it will capture all messages received by the client.
func (b *ServerBuilder) MessagesHandlerFunc(f MessageHandlerFunc) *ServerBuilder {
b.mux.MessageHandlerFunc(func(msg *Message) bool { return true }, f)
return b
}
// MessageHandler allows the registration of a MessageHandler.
// Note that the registration order matters, since the receiving process stops when the first predicate match occurs.
func (b *ServerBuilder) MessageHandler(handler MessageHandler) *ServerBuilder {
b.mux.MessageHandler(handler)
return b
}
// NotificationHandlerFunc allows the registration of a function for handling received notifications that matches
// the specified predicate. Note that the registration order matters, since the receiving process stops when
// the first predicate match occurs.
func (b *ServerBuilder) NotificationHandlerFunc(predicate NotificationPredicate, f NotificationHandlerFunc) *ServerBuilder {
b.mux.NotificationHandlerFunc(predicate, f)
return b
}
// NotificationsHandlerFunc allows the registration of a function for handling all received notifications.
// This handler should be the last one to be registered, since it will capture all notifications received by the client.
func (b *ServerBuilder) NotificationsHandlerFunc(f NotificationHandlerFunc) *ServerBuilder {
b.mux.NotificationHandlerFunc(func(not *Notification) bool { return true }, f)
return b
}
// NotificationHandler allows the registration of a NotificationHandler.
// Note that the registration order matters, since the receiving process stops when the first predicate match occurs.
func (b *ServerBuilder) NotificationHandler(handler NotificationHandler) *ServerBuilder {
b.mux.NotificationHandler(handler)
return b
}
// RequestCommandHandlerFunc allows the registration of a function for handling received commands that matches
// the specified predicate. Note that the registration order matters, since the receiving process stops when
// the first predicate match occurs.
func (b *ServerBuilder) RequestCommandHandlerFunc(predicate RequestCommandPredicate, f RequestCommandHandlerFunc) *ServerBuilder {
b.mux.RequestCommandHandlerFunc(predicate, f)
return b
}
// RequestCommandsHandlerFunc allows the registration of a function for handling all received commands.
// This handler should be the last one to be registered, since it will capture all commands received by the client.
func (b *ServerBuilder) RequestCommandsHandlerFunc(f RequestCommandHandlerFunc) *ServerBuilder {
b.mux.RequestCommandHandlerFunc(func(cmd *RequestCommand) bool { return true }, f)
return b
}
// RequestCommandHandler allows the registration of a NotificationHandler.
// Note that the registration order matters, since the receiving process stops when the first predicate match occurs.
func (b *ServerBuilder) RequestCommandHandler(handler RequestCommandHandler) *ServerBuilder {
b.mux.RequestCommandHandler(handler)
return b
}
// AutoReplyPings adds a RequestCommandHandler handler to automatically reply ping requests from the remote node.
func (b *ServerBuilder) AutoReplyPings() *ServerBuilder {
return b.RequestCommandHandlerFunc(
func(cmd *RequestCommand) bool {
return cmd.Method == CommandMethodGet && cmd.URI.Path() == "/ping"
},
func(ctx context.Context, cmd *RequestCommand, s Sender) error {
return s.SendResponseCommand(
ctx,
cmd.SuccessResponseWithResource(&Ping{}))
})
}
// ResponseCommandHandlerFunc allows the registration of a function for handling received commands that matches
// the specified predicate. Note that the registration order matters, since the receiving process stops when
// the first predicate match occurs.
func (b *ServerBuilder) ResponseCommandHandlerFunc(predicate ResponseCommandPredicate, f ResponseCommandHandlerFunc) *ServerBuilder {
b.mux.ResponseCommandHandlerFunc(predicate, f)
return b
}
// ResponseCommandsHandlerFunc allows the registration of a function for handling all received commands.
// This handler should be the last one to be registered, since it will capture all commands received by the client.
func (b *ServerBuilder) ResponseCommandsHandlerFunc(f ResponseCommandHandlerFunc) *ServerBuilder {
b.mux.ResponseCommandHandlerFunc(func(cmd *ResponseCommand) bool { return true }, f)
return b
}
// ResponseCommandHandler allows the registration of a NotificationHandler.
// Note that the registration order matters, since the receiving process stops when the first predicate match occurs.
func (b *ServerBuilder) ResponseCommandHandler(handler ResponseCommandHandler) *ServerBuilder {
b.mux.ResponseCommandHandler(handler)
return b
}
// ListenTCP adds a new TCP transport listener with the specified configuration.
// This method can be called multiple times.
func (b *ServerBuilder) ListenTCP(addr *net.TCPAddr, config *TCPConfig) *ServerBuilder {
listener := NewTCPTransportListener(config)
b.listeners = append(b.listeners, NewBoundListener(listener, addr))
return b
}
// ListenWebsocket adds a new Websocket transport listener with the specified configuration.
// This method can be called multiple times.
func (b *ServerBuilder) ListenWebsocket(addr *net.TCPAddr, config *WebsocketConfig) *ServerBuilder {
listener := NewWebsocketTransportListener(config)
b.listeners = append(b.listeners, NewBoundListener(listener, addr))
return b
}
// ListenInProcess adds a new in-process transport listener with the specified configuration.
// This method can be called multiple times.
func (b *ServerBuilder) ListenInProcess(addr InProcessAddr) *ServerBuilder {
listener := NewInProcessTransportListener(addr)
b.listeners = append(b.listeners, NewBoundListener(listener, addr))
return b
}
// CompressionOptions defines the compression options to be used in the session negotiation.
func (b *ServerBuilder) CompressionOptions(compOpts ...SessionCompression) *ServerBuilder {
if len(compOpts) == 0 {
panic("empty compOpts")
}
b.config.CompOpts = compOpts
return b
}
// EncryptionOptions defines the encryption options to be used in the session negotiation.
func (b *ServerBuilder) EncryptionOptions(encryptOpts ...SessionEncryption) *ServerBuilder {
if len(encryptOpts) == 0 {
panic("empty encryptOpts")
}
b.config.EncryptOpts = encryptOpts
return b
}
// EnableGuestAuthentication enables the use of guest authentication scheme during the authentication of the
// client sessions.
// The guest authentication scheme do not require any credentials from the clients.
func (b *ServerBuilder) EnableGuestAuthentication() *ServerBuilder {
if !contains(b.config.SchemeOpts, AuthenticationSchemeGuest) {
b.config.SchemeOpts = append(b.config.SchemeOpts, AuthenticationSchemeGuest)
}
return b
}
// EnableTransportAuthentication enables the use of transport authentication scheme during the authentication of the
// client sessions.
// The transport authentication will delegate the authentication to the session transport and the form of passing the
// credentials may vary depending on the transport type.
// For instance, in TCP transport connections, the client certificate used during the mutual TLS negotiation is
// considered the credentials by the server.
func (b *ServerBuilder) EnableTransportAuthentication() *ServerBuilder {
if !contains(b.config.SchemeOpts, AuthenticationSchemeTransport) {
b.config.SchemeOpts = append(b.config.SchemeOpts, AuthenticationSchemeTransport)
}
return b
}
// PlainAuthenticator defines a function for authenticating an identity session using a password.
type PlainAuthenticator func(ctx context.Context, identity Identity, password string) (*AuthenticationResult, error)
// EnablePlainAuthentication enables the use of plain authentication scheme during the authentication of the
// client sessions. The provided PlainAuthentication function is called for authenticating any session with this scheme.
func (b *ServerBuilder) EnablePlainAuthentication(a PlainAuthenticator) *ServerBuilder {
if a == nil {
panic("nil authenticator")
}
b.plainAuth = a
if !contains(b.config.SchemeOpts, AuthenticationSchemePlain) {
b.config.SchemeOpts = append(b.config.SchemeOpts, AuthenticationSchemePlain)
}
return b
}
// KeyAuthenticator defines a function for authenticating an identity session using a key.
type KeyAuthenticator func(ctx context.Context, identity Identity, key string) (*AuthenticationResult, error)
// EnableKeyAuthentication enables the use of key authentication scheme during the authentication of the
// client sessions. The provided KeyAuthenticator function is called for authenticating any session with this scheme.
func (b *ServerBuilder) EnableKeyAuthentication(a KeyAuthenticator) *ServerBuilder {
if a == nil {
panic("nil authenticator")
}
b.keyAuth = a
if !contains(b.config.SchemeOpts, AuthenticationSchemeKey) {
b.config.SchemeOpts = append(b.config.SchemeOpts, AuthenticationSchemeKey)
}
return b
}
// ExternalAuthenticator defines a function for authenticating an identity session using tokens emitted by an issuer.
type ExternalAuthenticator func(ctx context.Context, identity Identity, token string, issuer string) (*AuthenticationResult, error)
// EnableExternalAuthentication enables the use of key authentication scheme during the authentication of the
// client sessions. The provided ExternalAuthenticator function is called for authenticating any session with this scheme.
func (b *ServerBuilder) EnableExternalAuthentication(a ExternalAuthenticator) *ServerBuilder {
if a == nil {
panic("nil authenticator")
}
b.externalAuth = a
if !contains(b.config.SchemeOpts, AuthenticationSchemeExternal) {
b.config.SchemeOpts = append(b.config.SchemeOpts, AuthenticationSchemeExternal)
}
return b
}
// ChannelBufferSize determines the internal envelope buffer size for the channels.
func (b *ServerBuilder) ChannelBufferSize(bufferSize int) *ServerBuilder {
b.config.ChannelBufferSize = bufferSize
return b
}
// Register is called for the client Node address registration.
// It receives a candidate node from the client and should return the effective node address that will be assigned
// to the session.
func (b *ServerBuilder) Register(register func(ctx context.Context, candidate Node, c *ServerChannel) (Node, error)) *ServerBuilder {
b.config.Register = register
return b
}
// Established is called when a session with a node is established.
func (b *ServerBuilder) Established(established func(sessionID string, c *ServerChannel)) *ServerBuilder {
b.config.Established = established
return b
}
// Finished is called when an established session with a node is finished.
func (b *ServerBuilder) Finished(finished func(sessionID string)) *ServerBuilder {
b.config.Finished = finished
return b
}
// Build creates a new instance of Server.
func (b *ServerBuilder) Build() *Server {
b.config.Authenticate = buildAuthenticate(b.plainAuth, b.keyAuth, b.externalAuth)
return NewServer(b.config, b.mux, b.listeners...)
}
func buildAuthenticate(plainAuth PlainAuthenticator, keyAuth KeyAuthenticator, externalAuth ExternalAuthenticator) func(
ctx context.Context,
identity Identity,
authentication Authentication,
) (*AuthenticationResult, error) {
return func(ctx context.Context, identity Identity, authentication Authentication) (*AuthenticationResult, error) {
switch a := authentication.(type) {
case *GuestAuthentication:
if _, err := uuid.Parse(identity.Name); err != nil {
return UnknownAuthenticationResult(), nil
}
return MemberAuthenticationResult(), nil
case *TransportAuthentication:
return nil, errors.New("transport auth not implemented yet")
case *PlainAuthentication:
if plainAuth == nil {
return nil, errors.New("plain authenticator is nil")
}
pwd, err := a.GetPasswordFromBase64()
if err != nil {
return nil, fmt.Errorf("plain authenticator: %w", err)
}
return plainAuth(ctx, identity, pwd)
case *KeyAuthentication:
if keyAuth == nil {
return nil, errors.New("key authenticator is nil")
}
key, err := a.GetKeyFromBase64()
if err != nil {
return nil, fmt.Errorf("key authenticator: %w", err)
}
return keyAuth(ctx, identity, key)
case *ExternalAuthentication:
if externalAuth == nil {
return nil, errors.New("external authenticator is nil")
}
return externalAuth(ctx, identity, a.Token, a.Issuer)
}
return nil, errors.New("unknown authentication scheme")
}
}
// BoundListener represents a pair of a TransportListener and a net.Addr values.
type BoundListener struct {
Listener TransportListener
Addr net.Addr
}
func NewBoundListener(listener TransportListener, addr net.Addr) BoundListener {
if listener == nil || reflect.ValueOf(listener).IsNil() {
panic("nil Listener")
}
if addr == nil || reflect.ValueOf(addr).IsZero() {
panic("zero addr value")
}
return BoundListener{
Listener: listener,
Addr: addr,
}
}
// ErrServerClosed is returned by the Server's ListenAndServe,
// method after a call to Close.
var ErrServerClosed = errors.New("lime: Server closed")