diff --git a/connection.go b/connection.go index f9e6b29..13a50ef 100644 --- a/connection.go +++ b/connection.go @@ -7,6 +7,8 @@ import ( "net" "sync" "time" + + "github.com/rancher/norman/metrics" ) type connection struct { @@ -33,10 +35,12 @@ func newConnection(connID int64, session *Session, proto, address string) *conne session: session, buf: make(chan []byte, 1024), } + metrics.IncSMTotalAddConnectionsForWS(session.clientKey, proto, address) return c } func (c *connection) tunnelClose(err error) { + metrics.IncSMTotalRemoveConnectionsForWS(c.session.clientKey, c.addr.Network(), c.addr.String()) c.writeErr(err) c.doTunnelClose(err) } @@ -79,6 +83,7 @@ func (c *connection) Read(b []byte) (int, error) { n := c.copyData(b) if n > 0 { + metrics.AddSMTotalReceiveBytesOnWS(c.session.clientKey, float64(n)) return n, nil } @@ -95,6 +100,7 @@ func (c *connection) Read(b []byte) (int, error) { c.readBuf = next n = c.copyData(b) + metrics.AddSMTotalReceiveBytesOnWS(c.session.clientKey, float64(n)) return n, nil } @@ -110,12 +116,16 @@ func (c *connection) Write(b []byte) (int, error) { if !c.writeDeadline.IsZero() { deadline = c.writeDeadline.Sub(time.Now()).Nanoseconds() / 1000000 } - return c.session.writeMessage(newMessage(c.connID, deadline, b)) + msg := newMessage(c.connID, deadline, b) + metrics.AddSMTotalTransmitBytesOnWS(c.session.clientKey, float64(len(msg.Bytes()))) + return c.session.writeMessage(msg) } func (c *connection) writeErr(err error) { if err != nil { - c.session.writeMessage(newErrorMessage(c.connID, err)) + msg := newErrorMessage(c.connID, err) + metrics.AddSMTotalTransmitErrorBytesOnWS(c.session.clientKey, float64(len(msg.Bytes()))) + c.session.writeMessage(msg) } } diff --git a/peer.go b/peer.go index c2d7889..47b0ec0 100644 --- a/peer.go +++ b/peer.go @@ -10,6 +10,7 @@ import ( "time" "github.com/gorilla/websocket" + "github.com/rancher/norman/metrics" "github.com/sirupsen/logrus" ) @@ -90,12 +91,14 @@ outer: default: } + metrics.IncSMTotalAddPeerAttempt(p.id) ws, _, err := dialer.Dial(p.url, headers) if err != nil { logrus.Errorf("Failed to connect to peer %s [local ID=%s]: %v", p.url, s.PeerID, err) time.Sleep(5 * time.Second) continue } + metrics.IncSMTotalPeerConnected(p.id) session := NewClientSession(func(string, string) bool { return true }, ws) session.dialer = func(network, address string) (net.Conn, error) { diff --git a/session_manager.go b/session_manager.go index ae26b41..d4fecf4 100644 --- a/session_manager.go +++ b/session_manager.go @@ -8,6 +8,7 @@ import ( "time" "github.com/gorilla/websocket" + "github.com/rancher/norman/metrics" ) type sessionListener interface { @@ -100,6 +101,7 @@ func (sm *sessionManager) add(clientKey string, conn *websocket.Conn, peer bool) } else { sm.clients[clientKey] = append(sm.clients[clientKey], session) } + metrics.IncSMTotalAddWS(clientKey, peer) for l := range sm.listeners { l.sessionAdded(clientKey, session.sessionKey) @@ -109,14 +111,21 @@ func (sm *sessionManager) add(clientKey string, conn *websocket.Conn, peer bool) } func (sm *sessionManager) remove(s *Session) { + var isPeer bool sm.Lock() defer sm.Unlock() - for _, store := range []map[string][]*Session{sm.clients, sm.peers} { + for i, store := range []map[string][]*Session{sm.clients, sm.peers} { var newSessions []*Session for _, v := range store[s.clientKey] { if v.sessionKey == s.sessionKey { + if i == 0 { + isPeer = false + } else { + isPeer = true + } + metrics.IncSMTotalRemoveWS(s.clientKey, isPeer) continue } newSessions = append(newSessions, v)