Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

maxConnections config #152

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,7 @@ func runServer(ctx *cli.Context) error {
// Create stream server
s, err := datastreamer.NewServer(uint16(port), streamerVersion, streamerSystemID, StSequencer, file,
time.Duration(writeTimeout)*time.Millisecond, time.Duration(inactivityTimeout)*time.Second,
5*time.Second, nil) //nolint:mnd
5*time.Second, nil, 0) //nolint:mnd
if err != nil {
return err
}
Expand Down Expand Up @@ -750,6 +750,7 @@ func checkEntryBlockSanity(
return err
}
blockNum := l2Block.Number
log.Debug("L2BlockNum: ", blockNum)
//Check previous End Block
if sanityBlockEnd != blockNum {
log.Warnf(`(X) SANITY CHECK failed (%d): BlockStart but the previous one is not closed yet?
Expand Down
2 changes: 1 addition & 1 deletion datastreamer/datastreamer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ func TestServer(t *testing.T) {
panic(err)
}
streamServer, err = datastreamer.NewServer(config.Port, 1, 137, streamType,
config.Filename, config.WriteTimeout, config.InactivityTimeout, 5*time.Second, &config.Log)
config.Filename, config.WriteTimeout, config.InactivityTimeout, 5*time.Second, &config.Log, 100)
if err != nil {
panic(err)
}
Expand Down
2 changes: 1 addition & 1 deletion datastreamer/streamrelay.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func NewRelay(server string, port uint16, version uint8, systemID uint64,

// Create server side
r.server, err = NewServer(port, version, systemID, streamType, fileName, writeTimeout,
inactivityTimeout, inactivityCheckInterval, cfg)
inactivityTimeout, inactivityCheckInterval, cfg, 0)
if err != nil {
log.Errorf("Error creating relay server side: %v", err)
return nil, err
Expand Down
16 changes: 9 additions & 7 deletions datastreamer/streamserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ type CommandError uint32
const EntryTypeNotFound = math.MaxUint32

const (
maxConnections = 100 // Maximum number of connected clients
streamBuffer = 256 // Buffers for the stream channel
maxBookmarkLength = 16 // Maximum number of bytes for a bookmark
)
Expand Down Expand Up @@ -116,7 +115,9 @@ type StreamServer struct {
// Time interval to check for client connections that have reached
// the inactivity timeout and kill them
inactivityCheckInterval time.Duration
started bool // Flag server started
// maxConnections specifies the max number of connections open. If it is set to 0, the limit is disabled.
maxConnections uint32
started bool // Flag server started

version uint8
systemID uint64
Expand Down Expand Up @@ -165,7 +166,7 @@ type ResultEntry struct {
// NewServer creates a new data stream server
func NewServer(port uint16, version uint8, systemID uint64, streamType StreamType, fileName string,
writeTimeout time.Duration, inactivityTimeout time.Duration, inactivityCheckInterval time.Duration,
cfg *log.Config) (*StreamServer, error) {
cfg *log.Config, maxConnections uint32) (*StreamServer, error) {
// Create the server data stream
s := StreamServer{
port: port,
Expand All @@ -174,6 +175,7 @@ func NewServer(port uint16, version uint8, systemID uint64, streamType StreamTyp
inactivityTimeout: inactivityTimeout,
inactivityCheckInterval: inactivityCheckInterval,
started: false,
maxConnections: maxConnections,

version: version,
systemID: systemID,
Expand Down Expand Up @@ -284,8 +286,8 @@ func (s *StreamServer) waitConnections() {
}

// Check max connections allowed
if s.getSafeClientsLen() >= maxConnections {
log.Warnf("Unable to accept client connection, maximum number of connections reached (%d)", maxConnections)
if s.maxConnections != 0 && s.getSafeClientsLen() >= s.maxConnections {
log.Warnf("Unable to accept client connection, maximum number of connections reached (%d)", s.maxConnections)
conn.Close()
time.Sleep(timeout)
continue
Expand Down Expand Up @@ -1165,10 +1167,10 @@ func (s *StreamServer) getSafeClient(clientID string) *client {
return s.clients[clientID]
}

func (s *StreamServer) getSafeClientsLen() int {
func (s *StreamServer) getSafeClientsLen() uint32 {
s.mutexClients.RLock()
defer s.mutexClients.RUnlock()
return len(s.clients)
return uint32(len(s.clients))
}

// BookmarkPrintDump prints all bookmarks
Expand Down
Loading