Skip to content

Commit

Permalink
Refactor error handling in localAddEvent and replicatedAddEvent funct… (
Browse files Browse the repository at this point in the history
#2308)

…ions.
  • Loading branch information
sergekh2 authored Feb 8, 2025
1 parent a69674b commit c6cfd2b
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 10 deletions.
18 changes: 11 additions & 7 deletions core/node/rpc/add_event.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,12 @@ import (
func (s *Service) localAddEvent(
ctx context.Context,
req *connect.Request[AddEventRequest],
streamId StreamId,
localStream *Stream,
streamView *StreamView,
) (*connect.Response[AddEventResponse], error) {
log := logging.FromCtx(ctx)

streamId, err := StreamIdFromBytes(req.Msg.StreamId)
if err != nil {
return nil, AsRiverError(err).Func("localAddEvent")
}

parsedEvent, err := ParseEvent(req.Msg.Event)
if err != nil {
return nil, AsRiverError(err).Func("localAddEvent")
Expand All @@ -37,10 +33,18 @@ func (s *Service) localAddEvent(
log.Debugw("localAddEvent", "parsedEvent", parsedEvent)

newEvents, err := s.addParsedEvent(ctx, streamId, parsedEvent, localStream, streamView)

if err != nil {
err = AsRiverError(err).Func("localAddEvent").Tags(
"eventMiniblock", parsedEvent.MiniblockRef,
"lastLocalMiniblock", streamView.LastBlock().Ref,
)
}

if err != nil && req.Msg.Optional {
// aellis 5/2024 - we only want to wrap errors from canAddEvent,
// currently this is catching all errors, which is not ideal
riverError := AsRiverError(err).Func("localAddEvent")
riverError := AsRiverError(err)
return connect.NewResponse(&AddEventResponse{
Error: &AddEventResponse_Error{
Code: riverError.Code,
Expand All @@ -50,7 +54,7 @@ func (s *Service) localAddEvent(
NewEvents: newEvents,
}), nil
} else if err != nil {
return nil, AsRiverError(err).Func("localAddEvent")
return nil, err
} else {
return connect.NewResponse(&AddEventResponse{
NewEvents: newEvents,
Expand Down
2 changes: 1 addition & 1 deletion core/node/rpc/forwarder.go
Original file line number Diff line number Diff line change
Expand Up @@ -464,7 +464,7 @@ func (s *Service) addEventImpl(
}

if view != nil {
return s.localAddEvent(ctx, req, stream, view)
return s.localAddEvent(ctx, req, streamId, stream, view)
}

if req.Header().Get(RiverNoForwardHeader) == RiverNoForwardValue {
Expand Down
5 changes: 3 additions & 2 deletions core/node/rpc/replicated_add.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,9 @@ func (s *Service) replicatedAddEvent(ctx context.Context, stream *Stream, event
return nil
}

// Check if Err_MINIBLOCK_TOO_NEW code is present.
if AsRiverError(err).IsCodeWithBases(Err_MINIBLOCK_TOO_NEW) {
// Check if Err_MINIBLOCK_TOO_NEW or Err_BAD_PREV_MINIBLOCK_HASH code is present in the error chain.
riverErr := AsRiverError(err)
if riverErr.IsCodeWithBases(Err_MINIBLOCK_TOO_NEW) || riverErr.IsCodeWithBases(Err_BAD_PREV_MINIBLOCK_HASH) {
err = backoff.Wait(ctx, err)
if err != nil {
logging.FromCtx(ctx).Warnw("replicatedAddEvent: no backoff left", "error", err, "attempts", backoff.NumAttempts, "originalDeadline", originalDeadline.String(), "deadline", contextDeadlineLeft(ctx).String())
Expand Down

0 comments on commit c6cfd2b

Please sign in to comment.