Skip to content

Commit

Permalink
add other logics
Browse files Browse the repository at this point in the history
  • Loading branch information
mattisonchao committed Jan 24, 2025
1 parent 84b852c commit 1be8c62
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 7 deletions.
4 changes: 0 additions & 4 deletions common/error_codes.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,6 @@ const (
CodeInvalidSessionTimeout codes.Code = 109
CodeNamespaceNotFound codes.Code = 110
CodeNotificationsNotEnabled codes.Code = 111
CodeFollowerAlreadyPresent codes.Code = 112
CodeFollowerAlreadyFenced codes.Code = 113
)

var (
Expand All @@ -49,6 +47,4 @@ var (
ErrorInvalidSessionTimeout = status.Error(CodeInvalidSessionTimeout, "oxia: invalid session timeout")
ErrorNamespaceNotFound = status.Error(CodeNamespaceNotFound, "oxia: namespace not found")
ErrorNotificationsNotEnabled = status.Error(CodeNotificationsNotEnabled, "oxia: notifications not enabled on namespace")
ErrorFollowerAlreadyPresent = status.Error(CodeFollowerAlreadyPresent, "oxia: follower is already present")
ErrorFollowerAlreadyFenced = status.Error(CodeFollowerAlreadyFenced, "oxia: follower is already fenced")
)
12 changes: 10 additions & 2 deletions coordinator/impl/shard_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -471,6 +471,14 @@ func (s *shardController) keepFencingFollower(ctx context.Context, node model.Se
)
return nil
}
if status.Code(err) == common.CodeInvalidStatus {
s.log.Warn(
"Failed to newTerm, invalid status. Stop trying",
slog.Any("follower", node),
slog.Int64("term", s.Term()),
)
return nil
}
return err
}, backOff, func(err error, duration time.Duration) {
s.log.Warn(
Expand Down Expand Up @@ -498,7 +506,7 @@ func (s *shardController) newTermAndAddFollower(ctx context.Context, node model.

func (s *shardController) internalNewTermAndAddFollower(ctx context.Context, node model.ServerAddress, res chan error) {
fr, err := s.newTerm(ctx, node)
if err != nil && status.Code(err) != common.CodeFollowerAlreadyFenced {
if err != nil {
res <- err
return
}
Expand All @@ -512,7 +520,7 @@ func (s *shardController) internalNewTermAndAddFollower(ctx context.Context, nod
if err = s.addFollower(*s.shardMetadata.Leader, node.Internal, &proto.EntryId{
Term: fr.Term,
Offset: fr.Offset,
}); err != nil && status.Code(err) != common.CodeFollowerAlreadyPresent {
}); err != nil {
res <- err
return
}
Expand Down
2 changes: 1 addition & 1 deletion server/leader_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -378,7 +378,7 @@ func (lc *leaderController) AddFollower(req *proto.AddFollowerRequest) (*proto.A
}

if _, followerAlreadyPresent := lc.followers[req.FollowerName]; followerAlreadyPresent {
return nil, errors.Wrapf(common.ErrorFollowerAlreadyPresent, "follower: %s", req.FollowerName)
return &proto.AddFollowerResponse{}, nil
}

if len(lc.followers) == int(lc.replicationFactor)-1 {
Expand Down

0 comments on commit 1be8c62

Please sign in to comment.