Skip to content

Commit

Permalink
move subscription check up in call chain, ref tinode#865
Browse files Browse the repository at this point in the history
  • Loading branch information
or-else committed May 25, 2023
1 parent bfbb797 commit f9de546
Show file tree
Hide file tree
Showing 3 changed files with 6 additions and 14 deletions.
2 changes: 0 additions & 2 deletions server/store/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,6 @@ const (
ErrInvalidResponse = StoreError("invalid response")
// ErrRedirected means the subscription request was redirected to another topic.
ErrRedirected = StoreError("redirected")
// ErrAlreadySatisfied means the desired result, such as subscription, is already satisfied.
ErrAlreadySatisfied = StoreError("already satisfied")
)

// Uid is a database-specific record id, suitable to be used as a primary key.
Expand Down
16 changes: 6 additions & 10 deletions server/topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,6 +334,12 @@ func (t *Topic) registerSession(msg *ClientComMessage) {
// Request to add a connection to this topic
if t.isInactive() {
msg.sess.queueOut(ErrLockedReply(msg, types.TimeNow()))
} else if msg.sess.getSub(t.name) != nil {
// Session is already subscribed to topic. Subscription is checked in session.go,
// but there is a gap between topic creation/un-pausing and processing the
// first subscription request, before the topic is linked to session: a client
// may send several subscription requests in that gap.
msg.sess.queueOut(InfoAlreadySubscribed(msg.Id, msg.Original, msg.Timestamp))
} else {
// The topic is alive, so stop the kill timer, if it's ticking. We don't want the topic to die
// while processing the call.
Expand Down Expand Up @@ -1330,16 +1336,6 @@ func (t *Topic) broadcastToSessions(msg *ServerComMessage) {
func (t *Topic) subscriptionReply(asChan bool, msg *ClientComMessage) error {
// The topic is already initialized by the Hub

// Check if the session is already subscribed to topic.
// Subscription is checked in session.go, but there is a gap between topic
// creation + un-pausing and processing the first subscription request,
// before the topic is linked to session: a client may send several subscription
// requests in that gap.
if msg.sess.getSub(t.name) != nil {
msg.sess.queueOut(InfoAlreadySubscribed(msg.Id, msg.Original, msg.Timestamp))
return types.ErrAlreadySatisfied
}

msgsub := msg.Sub

// For newly created topics report topic creation time.
Expand Down
2 changes: 0 additions & 2 deletions server/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,8 +271,6 @@ func decodeStoreErrorExplicitTs(err error, id, topic string, serverTs, incomingR
errmsg = ErrInvalidResponse(id, topic, serverTs, incomingReqTs)
case types.ErrRedirected:
errmsg = InfoUseOther(id, topic, params["topic"].(string), serverTs, incomingReqTs)
case types.ErrAlreadySatisfied:
errmsg = InfoNoAction(id, topic, serverTs, incomingReqTs)
default:
errmsg = ErrUnknownExplicitTs(id, topic, serverTs, incomingReqTs)
}
Expand Down

0 comments on commit f9de546

Please sign in to comment.