Skip to content

Commit

Permalink
fix: fix issue with clone unregister
Browse files Browse the repository at this point in the history
Make sure that `drop()` won't trigger `unregister` for `Subscription` and `SubscriptionSet` clones
(which has been created with `Clone::clone()`).
  • Loading branch information
parfeon committed Jan 25, 2024
1 parent e21b5b6 commit 5269c95
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 2 deletions.
17 changes: 16 additions & 1 deletion src/dx/subscribe/subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,9 @@ pub struct Subscription<
> {
/// Subscription reference.
pub(super) inner: Arc<SubscriptionRef<T, D>>,

/// Whether subscription is `Clone::clone()` method call result or not.
is_clone: bool,
}

/// Subscription reference
Expand Down Expand Up @@ -185,6 +188,7 @@ where
) -> Self {
Self {
inner: SubscriptionRef::new(client, entity, options),
is_clone: false,
}
}

Expand Down Expand Up @@ -231,6 +235,7 @@ where
pub fn clone_empty(&self) -> Self {
Self {
inner: self.inner.clone_empty(),
is_clone: false,
}
}
}
Expand Down Expand Up @@ -266,6 +271,7 @@ where
fn clone(&self) -> Self {
Self {
inner: self.inner.clone(),
is_clone: true,
}
}
}
Expand All @@ -276,13 +282,22 @@ where
D: Deserializer + Send + Sync + 'static,
{
fn drop(&mut self) {
// Nothing should be done for regular subscription clone.
if self.is_clone {
return;
}

// Unregistering self to clean up subscriptions list if required.
let Some(client) = self.client().upgrade().clone() else {
return;
};

if let Some(manager) = client.subscription_manager(false).write().as_mut() {
if let Some((_, handler)) = self.clones.read().iter().next() {
let mut clones = self.clones.write();

if clones.len().gt(&1) {
clones.retain(|instance_id, _| instance_id.ne(&self.instance_id));
} else if let Some((_, handler)) = clones.iter().next() {
let handler: Weak<dyn EventHandler<T, D> + Send + Sync> = handler.clone();
manager.unregister(&handler);
}
Expand Down
18 changes: 17 additions & 1 deletion src/dx/subscribe/subscription_set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,9 @@ pub struct SubscriptionSet<
> {
/// Subscriptions set reference.
pub(super) inner: Arc<SubscriptionSetRef<T, D>>,

/// Whether subscription set is `Clone::clone()` method call result or not.
is_clone: bool,
}

/// Entities subscriptions set reference.
Expand Down Expand Up @@ -183,6 +186,7 @@ where
) -> Self {
Self {
inner: SubscriptionSetRef::new(entities, options),
is_clone: false,
}
}

Expand All @@ -208,6 +212,7 @@ where
) -> Self {
Self {
inner: SubscriptionSetRef::new_with_subscriptions(subscriptions, options),
is_clone: false,
}
}

Expand Down Expand Up @@ -251,6 +256,7 @@ where
pub fn clone_empty(&self) -> Self {
Self {
inner: self.inner.clone_empty(),
is_clone: false,
}
}

Expand Down Expand Up @@ -351,6 +357,7 @@ where
fn clone(&self) -> Self {
Self {
inner: self.inner.clone(),
is_clone: true,
}
}
}
Expand All @@ -361,13 +368,22 @@ where
D: Deserializer + Send + Sync,
{
fn drop(&mut self) {
// Nothing should be done for regular subscription clone.
if self.is_clone {
return;
}

// Unregistering self to clean up subscriptions list if required.
let Some(client) = self.client().upgrade().clone() else {
return;
};

if let Some(manager) = client.subscription_manager(false).write().as_mut() {
if let Some((_, handler)) = self.clones.read().iter().next() {
let mut clones = self.clones.write();

if clones.len().gt(&1) {
clones.retain(|instance_id, _| instance_id.ne(&self.instance_id));
} else if let Some((_, handler)) = clones.iter().next() {
let handler: Weak<dyn EventHandler<T, D> + Send + Sync> = handler.clone();
manager.unregister(&handler);
}
Expand Down

0 comments on commit 5269c95

Please sign in to comment.