Skip to content

Commit

Permalink
Fix consensus handler message logging (#2456)
Browse files Browse the repository at this point in the history
  • Loading branch information
StephenButtolph authored Jan 11, 2023
1 parent a3113f7 commit 0f6316c
Show file tree
Hide file tree
Showing 4 changed files with 58 additions and 44 deletions.
70 changes: 35 additions & 35 deletions message/internal_msg_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,8 @@ var (
)

type GetStateSummaryFrontierFailed struct {
ChainID ids.ID
RequestID uint32
ChainID ids.ID `json:"chain_id,omitempty"`
RequestID uint32 `json:"request_id,omitempty"`
}

func (m *GetStateSummaryFrontierFailed) GetChainId() []byte {
Expand Down Expand Up @@ -90,8 +90,8 @@ func InternalGetStateSummaryFrontierFailed(
}

type GetAcceptedStateSummaryFailed struct {
ChainID ids.ID
RequestID uint32
ChainID ids.ID `json:"chain_id,omitempty"`
RequestID uint32 `json:"request_id,omitempty"`
}

func (m *GetAcceptedStateSummaryFailed) GetChainId() []byte {
Expand Down Expand Up @@ -119,9 +119,9 @@ func InternalGetAcceptedStateSummaryFailed(
}

type GetAcceptedFrontierFailed struct {
ChainID ids.ID
RequestID uint32
EngineType p2p.EngineType
ChainID ids.ID `json:"chain_id,omitempty"`
RequestID uint32 `json:"request_id,omitempty"`
EngineType p2p.EngineType `json:"engine_type,omitempty"`
}

func (m *GetAcceptedFrontierFailed) GetChainId() []byte {
Expand Down Expand Up @@ -155,9 +155,9 @@ func InternalGetAcceptedFrontierFailed(
}

type GetAcceptedFailed struct {
ChainID ids.ID
RequestID uint32
EngineType p2p.EngineType
ChainID ids.ID `json:"chain_id,omitempty"`
RequestID uint32 `json:"request_id,omitempty"`
EngineType p2p.EngineType `json:"engine_type,omitempty"`
}

func (m *GetAcceptedFailed) GetChainId() []byte {
Expand Down Expand Up @@ -191,9 +191,9 @@ func InternalGetAcceptedFailed(
}

type GetAncestorsFailed struct {
ChainID ids.ID
RequestID uint32
EngineType p2p.EngineType
ChainID ids.ID `json:"chain_id,omitempty"`
RequestID uint32 `json:"request_id,omitempty"`
EngineType p2p.EngineType `json:"engine_type,omitempty"`
}

func (m *GetAncestorsFailed) GetChainId() []byte {
Expand Down Expand Up @@ -227,9 +227,9 @@ func InternalGetAncestorsFailed(
}

type GetFailed struct {
ChainID ids.ID
RequestID uint32
EngineType p2p.EngineType
ChainID ids.ID `json:"chain_id,omitempty"`
RequestID uint32 `json:"request_id,omitempty"`
EngineType p2p.EngineType `json:"engine_type,omitempty"`
}

func (m *GetFailed) GetChainId() []byte {
Expand Down Expand Up @@ -263,9 +263,9 @@ func InternalGetFailed(
}

type QueryFailed struct {
ChainID ids.ID
RequestID uint32
EngineType p2p.EngineType
ChainID ids.ID `json:"chain_id,omitempty"`
RequestID uint32 `json:"request_id,omitempty"`
EngineType p2p.EngineType `json:"engine_type,omitempty"`
}

func (m *QueryFailed) GetChainId() []byte {
Expand Down Expand Up @@ -299,8 +299,8 @@ func InternalQueryFailed(
}

type AppRequestFailed struct {
ChainID ids.ID
RequestID uint32
ChainID ids.ID `json:"chain_id,omitempty"`
RequestID uint32 `json:"request_id,omitempty"`
}

func (m *AppRequestFailed) GetChainId() []byte {
Expand Down Expand Up @@ -328,10 +328,10 @@ func InternalAppRequestFailed(
}

type CrossChainAppRequest struct {
SourceChainID ids.ID
DestinationChainID ids.ID
RequestID uint32
Message []byte
SourceChainID ids.ID `json:"source_chain_id,omitempty"`
DestinationChainID ids.ID `json:"destination_chain_id,omitempty"`
RequestID uint32 `json:"request_id,omitempty"`
Message []byte `json:"message,omitempty"`
}

func (m *CrossChainAppRequest) GetSourceChainID() ids.ID {
Expand Down Expand Up @@ -368,9 +368,9 @@ func InternalCrossChainAppRequest(
}

type CrossChainAppRequestFailed struct {
SourceChainID ids.ID
DestinationChainID ids.ID
RequestID uint32
SourceChainID ids.ID `json:"source_chain_id,omitempty"`
DestinationChainID ids.ID `json:"destination_chain_id,omitempty"`
RequestID uint32 `json:"request_id,omitempty"`
}

func (m *CrossChainAppRequestFailed) GetSourceChainID() ids.ID {
Expand Down Expand Up @@ -404,10 +404,10 @@ func InternalCrossChainAppRequestFailed(
}

type CrossChainAppResponse struct {
SourceChainID ids.ID
DestinationChainID ids.ID
RequestID uint32
Message []byte
SourceChainID ids.ID `json:"source_chain_id,omitempty"`
DestinationChainID ids.ID `json:"destination_chain_id,omitempty"`
RequestID uint32 `json:"request_id,omitempty"`
Message []byte `json:"message,omitempty"`
}

func (m *CrossChainAppResponse) GetSourceChainID() ids.ID {
Expand Down Expand Up @@ -443,7 +443,7 @@ func InternalCrossChainAppResponse(
}

type Connected struct {
NodeVersion *version.Application
NodeVersion *version.Application `json:"node_version,omitempty"`
}

func InternalConnected(nodeID ids.NodeID, nodeVersion *version.Application) InboundMessage {
Expand All @@ -460,7 +460,7 @@ func InternalConnected(nodeID ids.NodeID, nodeVersion *version.Application) Inbo
// ConnectedSubnet contains the subnet ID of the subnet that the node is
// connected to.
type ConnectedSubnet struct {
SubnetID ids.ID
SubnetID ids.ID `json:"subnet_id,omitempty"`
}

// InternalConnectedSubnet returns a message that indicates the node with [nodeID] is
Expand Down Expand Up @@ -488,7 +488,7 @@ func InternalDisconnected(nodeID ids.NodeID) InboundMessage {
}

type VMMessage struct {
Notification uint32
Notification uint32 `json:"notification,omitempty"`
}

func InternalVMMessage(
Expand Down
2 changes: 1 addition & 1 deletion network/network.go
Original file line number Diff line number Diff line change
Expand Up @@ -661,7 +661,7 @@ func (n *network) Peers(peerID ids.NodeID) ([]ips.ClaimedIPPort, error) {
peerIP := n.peerIPs[validator.NodeID]
n.peersLock.RUnlock()
if !isConnected {
n.peerConfig.Log.Debug(
n.peerConfig.Log.Verbo(
"unable to find validator in connected peers",
zap.Stringer("nodeID", validator.NodeID),
)
Expand Down
2 changes: 1 addition & 1 deletion snow/consensus/snowman/poll/set.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ func (s *set) processFinishedPolls() []ids.Bag {
}

s.log.Verbo("poll finished",
zap.Any("requestID", iter.Key()),
zap.Uint32("requestID", iter.Key()),
zap.Stringer("poll", holder.GetPoll()),
)
s.durPolls.Observe(float64(time.Since(holder.StartTime())))
Expand Down
28 changes: 21 additions & 7 deletions snow/networking/handler/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -420,12 +420,17 @@ func (h *handler) handleSyncMsg(ctx context.Context, msg message.InboundMessage)
var (
nodeID = msg.NodeID()
op = msg.Op()
body = msg.Message()
startTime = h.clock.Time()
)
h.ctx.Log.Debug("forwarding sync message to consensus",
zap.Stringer("nodeID", nodeID),
zap.Stringer("messageOp", op),
zap.Any("message", msg),
)
h.ctx.Log.Verbo("forwarding sync message to consensus",
zap.Stringer("nodeID", nodeID),
zap.Stringer("messageOp", op),
zap.Any("message", body),
)
h.resourceTracker.StartProcessing(nodeID, startTime)
h.ctx.Lock.Lock()
Expand All @@ -448,7 +453,7 @@ func (h *handler) handleSyncMsg(ctx context.Context, msg message.InboundMessage)
zap.Duration("processingTime", processingTime),
zap.Stringer("nodeID", nodeID),
zap.Stringer("messageOp", op),
zap.Any("message", msg),
zap.Any("message", body),
)
}
}()
Expand All @@ -464,7 +469,7 @@ func (h *handler) handleSyncMsg(ctx context.Context, msg message.InboundMessage)
// the timeout has already been cleared. This means the engine
// should be invoked with a failure message if parsing of the
// response fails.
switch msg := msg.Message().(type) {
switch msg := body.(type) {
// State messages should always be sent to the snowman engine
case *p2p.GetStateSummaryFrontier:
return engine.GetStateSummaryFrontier(ctx, nodeID, msg.RequestId)
Expand Down Expand Up @@ -695,12 +700,17 @@ func (h *handler) executeAsyncMsg(ctx context.Context, msg message.InboundMessag
var (
nodeID = msg.NodeID()
op = msg.Op()
body = msg.Message()
startTime = h.clock.Time()
)
h.ctx.Log.Debug("forwarding async message to consensus",
zap.Stringer("nodeID", nodeID),
zap.Stringer("messageOp", op),
zap.Any("message", msg),
)
h.ctx.Log.Verbo("forwarding async message to consensus",
zap.Stringer("nodeID", nodeID),
zap.Stringer("messageOp", op),
zap.Any("message", body),
)
h.resourceTracker.StartProcessing(nodeID, startTime)
defer func() {
Expand All @@ -721,7 +731,7 @@ func (h *handler) executeAsyncMsg(ctx context.Context, msg message.InboundMessag
return err
}

switch m := msg.Message().(type) {
switch m := body.(type) {
case *p2p.AppRequest:
return engine.AppRequest(
ctx,
Expand Down Expand Up @@ -776,11 +786,15 @@ func (h *handler) executeAsyncMsg(ctx context.Context, msg message.InboundMessag
func (h *handler) handleChanMsg(msg message.InboundMessage) error {
var (
op = msg.Op()
body = msg.Message()
startTime = h.clock.Time()
)
h.ctx.Log.Debug("forwarding chan message to consensus",
zap.Stringer("messageOp", op),
zap.Any("message", msg),
)
h.ctx.Log.Verbo("forwarding chan message to consensus",
zap.Stringer("messageOp", op),
zap.Any("message", body),
)
h.ctx.Lock.Lock()
defer func() {
Expand All @@ -802,7 +816,7 @@ func (h *handler) handleChanMsg(msg message.InboundMessage) error {
return err
}

switch msg := msg.Message().(type) {
switch msg := body.(type) {
case *message.VMMessage:
return engine.Notify(context.TODO(), common.Message(msg.Notification))

Expand Down

0 comments on commit 0f6316c

Please sign in to comment.