Skip to content

Commit

Permalink
Use head notif func for current node height, add pubsub metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
natewalck committed Mar 2, 2020
1 parent 7db3911 commit 33af240
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 22 deletions.
12 changes: 11 additions & 1 deletion chain/store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/state"
"github.com/filecoin-project/lotus/chain/vm"
"github.com/filecoin-project/lotus/metrics"
"go.opencensus.io/stats"
"go.opencensus.io/trace"
"go.uber.org/multierr"

Expand Down Expand Up @@ -100,7 +102,15 @@ func NewChainStore(bs bstore.Blockstore, ds dstore.Batching, vmcalls *types.VMSy
return nil
}

cs.headChangeNotifs = append(cs.headChangeNotifs, hcnf)
hcmetric := func(rev, app []*types.TipSet) error {
ctx := context.Background()
for _, r := range app {
stats.Record(ctx, metrics.ChainNodeHeight.M(int64(r.Height())))
}
return nil
}

cs.headChangeNotifs = append(cs.headChangeNotifs, hcnf, hcmetric)

return cs
}
Expand Down
13 changes: 13 additions & 0 deletions chain/sub/incoming.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package sub

import (
"context"
"fmt"
"time"

lru "github.com/hashicorp/golang-lru"
Expand All @@ -10,11 +11,14 @@ import (
connmgr "github.com/libp2p/go-libp2p-core/connmgr"
peer "github.com/libp2p/go-libp2p-peer"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"go.opencensus.io/stats"
"go.opencensus.io/tag"

"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain"
"github.com/filecoin-project/lotus/chain/messagepool"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/metrics"
)

var log = logging.Logger("sub")
Expand Down Expand Up @@ -162,14 +166,23 @@ func NewMessageValidator(mp *messagepool.MessagePool) *MessageValidator {
}

func (mv *MessageValidator) Validate(ctx context.Context, pid peer.ID, msg *pubsub.Message) bool {
ctx, _ = tag.New(ctx, tag.Insert(metrics.PeerID, pid.String()))
m, err := types.DecodeSignedMessage(msg.Message.GetData())
if err != nil {
log.Warnf("failed to decode incoming message: %s", err)
stats.Record(ctx, metrics.MessageDecodeFailure.M(1))
return false
}

if err := mv.mpool.Add(m); err != nil {
log.Warnf("failed to add message from network to message pool (From: %s, To: %s, Nonce: %d, Value: %s): %s", m.Message.From, m.Message.To, m.Message.Nonce, types.FIL(m.Message.Value), err)
ctx, _ = tag.New(
ctx,
tag.Insert(metrics.MessageFrom, m.Message.From.String()),
tag.Insert(metrics.MessageTo, m.Message.To.String()),
tag.Insert(metrics.MessageNonce, fmt.Sprint(m.Message.Nonce)),
)
stats.Record(ctx, metrics.MessageAddFailure.M(1))
return false
}

Expand Down
6 changes: 1 addition & 5 deletions chain/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -396,9 +396,6 @@ func (syncer *Syncer) Sync(ctx context.Context, maybeHead *types.TipSet) error {
)
}

// Record current network chain height when sync is called
stats.Record(ctx, metrics.ChainHeight.M(int64(maybeHead.Height())))

if syncer.store.GetHeaviestTipSet().ParentWeight().GreaterThan(maybeHead.ParentWeight()) {
return nil
}
Expand Down Expand Up @@ -1043,8 +1040,7 @@ func (syncer *Syncer) syncMessagesAndCheckState(ctx context.Context, headers []*
return xerrors.Errorf("message processing failed: %w", err)
}

// Set current node sync height
stats.Record(ctx, metrics.ChainNodeHeight.M(int64(fts.TipSet().Height())))
stats.Record(ctx, metrics.ChainNodeWorkerHeight.M(int64(fts.TipSet().Height())))
ss.SetHeight(fts.TipSet().Height())

return nil
Expand Down
48 changes: 32 additions & 16 deletions metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,26 @@ import (

// Global Tags
var (
Version, _ = tag.NewKey("version")
Commit, _ = tag.NewKey("commit")
RPCMethod, _ = tag.NewKey("method")
Version, _ = tag.NewKey("version")
Commit, _ = tag.NewKey("commit")
RPCMethod, _ = tag.NewKey("method")
PeerID, _ = tag.NewKey("peer_id")
MessageFrom, _ = tag.NewKey("message_from")
MessageTo, _ = tag.NewKey("message_to")
MessageNonce, _ = tag.NewKey("message_nonce")
)

// Measures
var (
LotusInfo = stats.Int64("info", "Arbitrary counter to tag lotus info to", stats.UnitDimensionless)
ChainHeight = stats.Int64("chain/height", "Current Height of the chain", stats.UnitDimensionless)
ChainNodeHeight = stats.Int64("chain/node_height", "Current Height of the node", stats.UnitDimensionless)
PeerCount = stats.Int64("peer/count", "Current number of FIL peers", stats.UnitDimensionless)
RPCInvalidMethod = stats.Int64("rpc/invalid_method", "Total number of invalid RPC methods called", stats.UnitDimensionless)
RPCRequestError = stats.Int64("rpc/request_error", "Total number of request errors handled", stats.UnitDimensionless)
RPCResponseError = stats.Int64("rpc/response_error", "Total number of responses errors handled", stats.UnitDimensionless)
LotusInfo = stats.Int64("info", "Arbitrary counter to tag lotus info to", stats.UnitDimensionless)
ChainNodeHeight = stats.Int64("chain/node_height", "Current Height of the node", stats.UnitDimensionless)
ChainNodeWorkerHeight = stats.Int64("chain/node_worker_height", "Current Height of workers on the node", stats.UnitDimensionless)
MessageAddFailure = stats.Int64("message/add_faliure", "Counter for messages that failed to be added", stats.UnitDimensionless)
MessageDecodeFailure = stats.Int64("message/decode_faliure", "Counter for messages that failed to be decoded", stats.UnitDimensionless)
PeerCount = stats.Int64("peer/count", "Current number of FIL peers", stats.UnitDimensionless)
RPCInvalidMethod = stats.Int64("rpc/invalid_method", "Total number of invalid RPC methods called", stats.UnitDimensionless)
RPCRequestError = stats.Int64("rpc/request_error", "Total number of request errors handled", stats.UnitDimensionless)
RPCResponseError = stats.Int64("rpc/response_error", "Total number of responses errors handled", stats.UnitDimensionless)
)

// DefaultViews is an array of Consensus views for metric gathering purposes
Expand All @@ -34,11 +40,25 @@ var DefaultViews = []*view.View{
TagKeys: []tag.Key{Version, Commit},
},
&view.View{
Measure: ChainHeight,
Measure: ChainNodeHeight,
Aggregation: view.LastValue(),
},
&view.View{
Measure: ChainNodeHeight,
Measure: ChainNodeWorkerHeight,
Aggregation: view.LastValue(),
},
&view.View{
Measure: MessageAddFailure,
Aggregation: view.Count(),
TagKeys: []tag.Key{MessageFrom, MessageTo, MessageNonce},
},
&view.View{
Measure: MessageDecodeFailure,
Aggregation: view.Count(),
TagKeys: []tag.Key{PeerID},
},
&view.View{
Measure: PeerCount,
Aggregation: view.LastValue(),
},
// All RPC related metrics should at the very least tag the RPCMethod
Expand All @@ -57,8 +77,4 @@ var DefaultViews = []*view.View{
Aggregation: view.Count(),
TagKeys: []tag.Key{RPCMethod},
},
&view.View{
Measure: PeerCount,
Aggregation: view.LastValue(),
},
}

0 comments on commit 33af240

Please sign in to comment.