Skip to content

Commit

Permalink
Add metrics counts
Browse files Browse the repository at this point in the history
  • Loading branch information
dramich authored and Alena Prokharchyk committed Nov 18, 2019
1 parent 8cc0ab9 commit c5a31e6
Show file tree
Hide file tree
Showing 3 changed files with 25 additions and 3 deletions.
14 changes: 12 additions & 2 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
"net"
"sync"
"time"

"github.com/rancher/norman/metrics"
)

type connection struct {
Expand All @@ -33,10 +35,12 @@ func newConnection(connID int64, session *Session, proto, address string) *conne
session: session,
buf: make(chan []byte, 1024),
}
metrics.IncSMTotalAddConnectionsForWS(session.clientKey, proto, address)
return c
}

func (c *connection) tunnelClose(err error) {
metrics.IncSMTotalRemoveConnectionsForWS(c.session.clientKey, c.addr.Network(), c.addr.String())
c.writeErr(err)
c.doTunnelClose(err)
}
Expand Down Expand Up @@ -79,6 +83,7 @@ func (c *connection) Read(b []byte) (int, error) {

n := c.copyData(b)
if n > 0 {
metrics.AddSMTotalReceiveBytesOnWS(c.session.clientKey, float64(n))
return n, nil
}

Expand All @@ -95,6 +100,7 @@ func (c *connection) Read(b []byte) (int, error) {

c.readBuf = next
n = c.copyData(b)
metrics.AddSMTotalReceiveBytesOnWS(c.session.clientKey, float64(n))
return n, nil
}

Expand All @@ -110,12 +116,16 @@ func (c *connection) Write(b []byte) (int, error) {
if !c.writeDeadline.IsZero() {
deadline = c.writeDeadline.Sub(time.Now()).Nanoseconds() / 1000000
}
return c.session.writeMessage(newMessage(c.connID, deadline, b))
msg := newMessage(c.connID, deadline, b)
metrics.AddSMTotalTransmitBytesOnWS(c.session.clientKey, float64(len(msg.Bytes())))
return c.session.writeMessage(msg)
}

func (c *connection) writeErr(err error) {
if err != nil {
c.session.writeMessage(newErrorMessage(c.connID, err))
msg := newErrorMessage(c.connID, err)
metrics.AddSMTotalTransmitErrorBytesOnWS(c.session.clientKey, float64(len(msg.Bytes())))
c.session.writeMessage(msg)
}
}

Expand Down
3 changes: 3 additions & 0 deletions peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"time"

"github.com/gorilla/websocket"
"github.com/rancher/norman/metrics"
"github.com/sirupsen/logrus"
)

Expand Down Expand Up @@ -90,12 +91,14 @@ outer:
default:
}

metrics.IncSMTotalAddPeerAttempt(p.id)
ws, _, err := dialer.Dial(p.url, headers)
if err != nil {
logrus.Errorf("Failed to connect to peer %s [local ID=%s]: %v", p.url, s.PeerID, err)
time.Sleep(5 * time.Second)
continue
}
metrics.IncSMTotalPeerConnected(p.id)

session := NewClientSession(func(string, string) bool { return true }, ws)
session.dialer = func(network, address string) (net.Conn, error) {
Expand Down
11 changes: 10 additions & 1 deletion session_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"time"

"github.com/gorilla/websocket"
"github.com/rancher/norman/metrics"
)

type sessionListener interface {
Expand Down Expand Up @@ -100,6 +101,7 @@ func (sm *sessionManager) add(clientKey string, conn *websocket.Conn, peer bool)
} else {
sm.clients[clientKey] = append(sm.clients[clientKey], session)
}
metrics.IncSMTotalAddWS(clientKey, peer)

for l := range sm.listeners {
l.sessionAdded(clientKey, session.sessionKey)
Expand All @@ -109,14 +111,21 @@ func (sm *sessionManager) add(clientKey string, conn *websocket.Conn, peer bool)
}

func (sm *sessionManager) remove(s *Session) {
var isPeer bool
sm.Lock()
defer sm.Unlock()

for _, store := range []map[string][]*Session{sm.clients, sm.peers} {
for i, store := range []map[string][]*Session{sm.clients, sm.peers} {
var newSessions []*Session

for _, v := range store[s.clientKey] {
if v.sessionKey == s.sessionKey {
if i == 0 {
isPeer = false
} else {
isPeer = true
}
metrics.IncSMTotalRemoveWS(s.clientKey, isPeer)
continue
}
newSessions = append(newSessions, v)
Expand Down

0 comments on commit c5a31e6

Please sign in to comment.