From 33af2409e8206213098f3d3216e4e1b283778174 Mon Sep 17 00:00:00 2001 From: Nate Walck Date: Sun, 1 Mar 2020 19:26:09 -0500 Subject: [PATCH] Use head notif func for current node height, add pubsub metrics --- chain/store/store.go | 12 ++++++++++- chain/sub/incoming.go | 13 ++++++++++++ chain/sync.go | 6 +----- metrics/metrics.go | 48 ++++++++++++++++++++++++++++--------------- 4 files changed, 57 insertions(+), 22 deletions(-) diff --git a/chain/store/store.go b/chain/store/store.go index 2f024d6b1e6..f98f1df138d 100644 --- a/chain/store/store.go +++ b/chain/store/store.go @@ -13,6 +13,8 @@ import ( "github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/chain/state" "github.com/filecoin-project/lotus/chain/vm" + "github.com/filecoin-project/lotus/metrics" + "go.opencensus.io/stats" "go.opencensus.io/trace" "go.uber.org/multierr" @@ -100,7 +102,15 @@ func NewChainStore(bs bstore.Blockstore, ds dstore.Batching, vmcalls *types.VMSy return nil } - cs.headChangeNotifs = append(cs.headChangeNotifs, hcnf) + hcmetric := func(rev, app []*types.TipSet) error { + ctx := context.Background() + for _, r := range app { + stats.Record(ctx, metrics.ChainNodeHeight.M(int64(r.Height()))) + } + return nil + } + + cs.headChangeNotifs = append(cs.headChangeNotifs, hcnf, hcmetric) return cs } diff --git a/chain/sub/incoming.go b/chain/sub/incoming.go index b4bd8b3fa7c..93977c6c6ae 100644 --- a/chain/sub/incoming.go +++ b/chain/sub/incoming.go @@ -2,6 +2,7 @@ package sub import ( "context" + "fmt" "time" lru "github.com/hashicorp/golang-lru" @@ -10,11 +11,14 @@ import ( connmgr "github.com/libp2p/go-libp2p-core/connmgr" peer "github.com/libp2p/go-libp2p-peer" pubsub "github.com/libp2p/go-libp2p-pubsub" + "go.opencensus.io/stats" + "go.opencensus.io/tag" "github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/chain" "github.com/filecoin-project/lotus/chain/messagepool" "github.com/filecoin-project/lotus/chain/types" + "github.com/filecoin-project/lotus/metrics" ) var log = logging.Logger("sub") @@ -162,14 +166,23 @@ func NewMessageValidator(mp *messagepool.MessagePool) *MessageValidator { } func (mv *MessageValidator) Validate(ctx context.Context, pid peer.ID, msg *pubsub.Message) bool { + ctx, _ = tag.New(ctx, tag.Insert(metrics.PeerID, pid.String())) m, err := types.DecodeSignedMessage(msg.Message.GetData()) if err != nil { log.Warnf("failed to decode incoming message: %s", err) + stats.Record(ctx, metrics.MessageDecodeFailure.M(1)) return false } if err := mv.mpool.Add(m); err != nil { log.Warnf("failed to add message from network to message pool (From: %s, To: %s, Nonce: %d, Value: %s): %s", m.Message.From, m.Message.To, m.Message.Nonce, types.FIL(m.Message.Value), err) + ctx, _ = tag.New( + ctx, + tag.Insert(metrics.MessageFrom, m.Message.From.String()), + tag.Insert(metrics.MessageTo, m.Message.To.String()), + tag.Insert(metrics.MessageNonce, fmt.Sprint(m.Message.Nonce)), + ) + stats.Record(ctx, metrics.MessageAddFailure.M(1)) return false } diff --git a/chain/sync.go b/chain/sync.go index d04feab9ca8..9d875e01782 100644 --- a/chain/sync.go +++ b/chain/sync.go @@ -396,9 +396,6 @@ func (syncer *Syncer) Sync(ctx context.Context, maybeHead *types.TipSet) error { ) } - // Record current network chain height when sync is called - stats.Record(ctx, metrics.ChainHeight.M(int64(maybeHead.Height()))) - if syncer.store.GetHeaviestTipSet().ParentWeight().GreaterThan(maybeHead.ParentWeight()) { return nil } @@ -1043,8 +1040,7 @@ func (syncer *Syncer) syncMessagesAndCheckState(ctx context.Context, headers []* return xerrors.Errorf("message processing failed: %w", err) } - // Set current node sync height - stats.Record(ctx, metrics.ChainNodeHeight.M(int64(fts.TipSet().Height()))) + stats.Record(ctx, metrics.ChainNodeWorkerHeight.M(int64(fts.TipSet().Height()))) ss.SetHeight(fts.TipSet().Height()) return nil diff --git a/metrics/metrics.go b/metrics/metrics.go index ca32b81b532..e10beda8d4b 100644 --- a/metrics/metrics.go +++ b/metrics/metrics.go @@ -8,20 +8,26 @@ import ( // Global Tags var ( - Version, _ = tag.NewKey("version") - Commit, _ = tag.NewKey("commit") - RPCMethod, _ = tag.NewKey("method") + Version, _ = tag.NewKey("version") + Commit, _ = tag.NewKey("commit") + RPCMethod, _ = tag.NewKey("method") + PeerID, _ = tag.NewKey("peer_id") + MessageFrom, _ = tag.NewKey("message_from") + MessageTo, _ = tag.NewKey("message_to") + MessageNonce, _ = tag.NewKey("message_nonce") ) // Measures var ( - LotusInfo = stats.Int64("info", "Arbitrary counter to tag lotus info to", stats.UnitDimensionless) - ChainHeight = stats.Int64("chain/height", "Current Height of the chain", stats.UnitDimensionless) - ChainNodeHeight = stats.Int64("chain/node_height", "Current Height of the node", stats.UnitDimensionless) - PeerCount = stats.Int64("peer/count", "Current number of FIL peers", stats.UnitDimensionless) - RPCInvalidMethod = stats.Int64("rpc/invalid_method", "Total number of invalid RPC methods called", stats.UnitDimensionless) - RPCRequestError = stats.Int64("rpc/request_error", "Total number of request errors handled", stats.UnitDimensionless) - RPCResponseError = stats.Int64("rpc/response_error", "Total number of responses errors handled", stats.UnitDimensionless) + LotusInfo = stats.Int64("info", "Arbitrary counter to tag lotus info to", stats.UnitDimensionless) + ChainNodeHeight = stats.Int64("chain/node_height", "Current Height of the node", stats.UnitDimensionless) + ChainNodeWorkerHeight = stats.Int64("chain/node_worker_height", "Current Height of workers on the node", stats.UnitDimensionless) + MessageAddFailure = stats.Int64("message/add_faliure", "Counter for messages that failed to be added", stats.UnitDimensionless) + MessageDecodeFailure = stats.Int64("message/decode_faliure", "Counter for messages that failed to be decoded", stats.UnitDimensionless) + PeerCount = stats.Int64("peer/count", "Current number of FIL peers", stats.UnitDimensionless) + RPCInvalidMethod = stats.Int64("rpc/invalid_method", "Total number of invalid RPC methods called", stats.UnitDimensionless) + RPCRequestError = stats.Int64("rpc/request_error", "Total number of request errors handled", stats.UnitDimensionless) + RPCResponseError = stats.Int64("rpc/response_error", "Total number of responses errors handled", stats.UnitDimensionless) ) // DefaultViews is an array of Consensus views for metric gathering purposes @@ -34,11 +40,25 @@ var DefaultViews = []*view.View{ TagKeys: []tag.Key{Version, Commit}, }, &view.View{ - Measure: ChainHeight, + Measure: ChainNodeHeight, Aggregation: view.LastValue(), }, &view.View{ - Measure: ChainNodeHeight, + Measure: ChainNodeWorkerHeight, + Aggregation: view.LastValue(), + }, + &view.View{ + Measure: MessageAddFailure, + Aggregation: view.Count(), + TagKeys: []tag.Key{MessageFrom, MessageTo, MessageNonce}, + }, + &view.View{ + Measure: MessageDecodeFailure, + Aggregation: view.Count(), + TagKeys: []tag.Key{PeerID}, + }, + &view.View{ + Measure: PeerCount, Aggregation: view.LastValue(), }, // All RPC related metrics should at the very least tag the RPCMethod @@ -57,8 +77,4 @@ var DefaultViews = []*view.View{ Aggregation: view.Count(), TagKeys: []tag.Key{RPCMethod}, }, - &view.View{ - Measure: PeerCount, - Aggregation: view.LastValue(), - }, }