From 2f632ecc91393f6132ebf45f51e1fac4513b7367 Mon Sep 17 00:00:00 2001 From: aforge Date: Fri, 7 Jul 2023 01:14:25 -0700 Subject: [PATCH] Fix a race condition around Session.subs. 1. Session.addSub & Session.delSub should be happening (atomically) together modifications of Topic.sessions. 2. Only one of Session.subscribe() and Session.leave() is allowed to be inflight at any given moment. --- loadtest/tinode.scala | 2 +- server/session.go | 14 +++++++++----- server/sessionstore.go | 41 ++++++++++++++++++++++++++++++++++++++++- server/topic.go | 3 +++ server/topic_proxy.go | 3 +++ 5 files changed, 56 insertions(+), 7 deletions(-) diff --git a/loadtest/tinode.scala b/loadtest/tinode.scala index 3cbb26e96..a083b0167 100644 --- a/loadtest/tinode.scala +++ b/loadtest/tinode.scala @@ -29,7 +29,7 @@ class TinodeBase extends Simulation { val hello = exitBlockOnFail { exec { ws("hi").sendText( - """{"hi":{"id":"afabb3","ver":"0.18.1","ua":"Gatling-Loadtest/1.0; gatling/1.7.0"}}""" + """{"hi":{"id":"afabb3","ver":"0.22.8","ua":"Gatling-Loadtest/1.0; gatling/1.7.0"}}""" ) .await(15 seconds)( ws.checkTextMessage("hi") diff --git a/server/session.go b/server/session.go index 24a4322bc..e8879bbc4 100644 --- a/server/session.go +++ b/server/session.go @@ -117,7 +117,7 @@ type Session struct { bkgTimer *time.Timer // Number of subscribe/unsubscribe requests in flight. - inflightReqs *sync.WaitGroup + inflightReqs *boundedWaitGroup // Synchronizes access to session store in cluster mode: // subscribe/unsubscribe replies are asynchronous. sessionStoreLock sync.Mutex @@ -628,11 +628,12 @@ func (s *Session) subscribe(msg *ClientComMessage) { } } + s.inflightReqs.Add(1) // Session can subscribe to topic on behalf of a single user at a time. if sub := s.getSub(msg.RcptTo); sub != nil { s.queueOut(InfoAlreadySubscribed(msg.Id, msg.Original, msg.Timestamp)) + s.inflightReqs.Done() } else { - s.inflightReqs.Add(1) select { case globals.hub.join <- msg: default: @@ -655,18 +656,21 @@ func (s *Session) leave(msg *ClientComMessage) { return } + s.inflightReqs.Add(1) if sub := s.getSub(msg.RcptTo); sub != nil { // Session is attached to the topic. if (msg.Original == "me" || msg.Original == "fnd") && msg.Leave.Unsub { // User should not unsubscribe from 'me' or 'find'. Just leaving is fine. s.queueOut(ErrPermissionDeniedReply(msg, msg.Timestamp)) + s.inflightReqs.Done() } else { // Unlink from topic, topic will send a reply. - s.delSub(msg.RcptTo) - s.inflightReqs.Add(1) sub.done <- msg } - } else if !msg.Leave.Unsub { + return + } + s.inflightReqs.Done() + if !msg.Leave.Unsub { // Session is not attached to the topic, wants to leave - fine, no change s.queueOut(InfoNotJoined(msg.Id, msg.Original, msg.Timestamp)) } else { diff --git a/server/sessionstore.go b/server/sessionstore.go index c05291857..c5f2bc148 100644 --- a/server/sessionstore.go +++ b/server/sessionstore.go @@ -21,6 +21,43 @@ import ( "github.com/tinode/chat/server/store/types" ) +// WaitGroup with a semaphore functionality +// (limiting number of threads/goroutines accessing the guarded resource simultaneously). +type boundedWaitGroup struct { + wg sync.WaitGroup + sem chan struct{} +} + +func newBoundedWaitGroup(capacity int) *boundedWaitGroup { + return &boundedWaitGroup{sem: make(chan struct{}, capacity)} +} + +func (w *boundedWaitGroup) Add(delta int) { + if delta <= 0 { + return + } + for i := 0; i < delta; i++ { + w.sem <- struct{}{} + } + w.wg.Add(delta) +} + +func (w *boundedWaitGroup) Done() { + select { + case _, ok := <-w.sem: + if !ok { + logs.Err.Panicln("boundedWaitGroup.sem closed.") + } + default: + logs.Err.Panicln("boundedWaitGroup.Done() called before Add().") + } + w.wg.Done() +} + +func (w *boundedWaitGroup) Wait() { + w.wg.Wait() +} + // SessionStore holds live sessions. Long polling sessions are stored in a linked list with // most recent sessions on top. In addition all sessions are stored in a map indexed by session ID. type SessionStore struct { @@ -76,7 +113,9 @@ func (ss *SessionStore) NewSession(conn any, sid string) (*Session, int) { s.bkgTimer = time.NewTimer(time.Hour) s.bkgTimer.Stop() - s.inflightReqs = &sync.WaitGroup{} + // Make sure at most 1 request is modifying session/topic state at any time. + // TODO: use Mutex & CondVar? + s.inflightReqs = newBoundedWaitGroup(1) s.lastTouched = time.Now() diff --git a/server/topic.go b/server/topic.go index 1f191d25f..7e64de5d1 100644 --- a/server/topic.go +++ b/server/topic.go @@ -719,6 +719,9 @@ func (t *Topic) handleLeaveRequest(msg *ClientComMessage, sess *Session) { // User wants to leave without unsubscribing. if pssd, _ := t.remSession(sess, asUid); pssd != nil { + if !sess.isProxy() { + sess.detach <- t.name + } if pssd.isChanSub != asChan { // Cannot address non-channel subscription as channel and vice versa. if msg.init { diff --git a/server/topic_proxy.go b/server/topic_proxy.go index 41e6fbdc4..56edf35c3 100644 --- a/server/topic_proxy.go +++ b/server/topic_proxy.go @@ -138,6 +138,9 @@ func (t *Topic) handleProxyLeaveRequest(msg *ClientComMessage, killTimer *time.T // because by the time the response arrives this session may be already gone from the session store // and we won't be able to find and remove it by its sid. pssd, result := t.remSession(msg.sess, asUid) + if result { + msg.sess.detach <- t.name + } if !msg.init { // Explicitly specify the uid because the master multiplex session needs to know which // of its multiple hosted sessions to delete.