Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Thread safety in Observable.Delay #779

Closed
c0nstexpr opened this issue Jun 27, 2024 · 8 comments
Closed

Thread safety in Observable.Delay #779

c0nstexpr opened this issue Jun 27, 2024 · 8 comments

Comments

@c0nstexpr
Copy link

c0nstexpr commented Jun 27, 2024

delay method uses Scheduler parameter to execute not only the delay operation, but also the subscribe body. Different thread context could result in potential race problem.

val currentThread  = Thread.currentThread() // caller thread
observableOf(1).delay(3.seconds, ioScheduler).subscribe {
      val innerThread = Thread.currentThread() // subscribe body's thread
      logger.d { "Is current thread? ${currentThread == innerThread}" } // Answer is NO
}

To keep same thread context after delay, a Scheduler which schedule in current thread context is needed. But current library doesn't provide a straightforward way to create such Scheduler

@arkivanov
Copy link
Contributor

arkivanov commented Jun 27, 2024

Sorry, I think I don't understand the issue. The delay operator only delays emissions, not the subscription phase. You can check the related ReactiveX docs. It is by design that the subscribe {} body is executed on the specified Scheduler. You can specify the required Scheduler for the delay operator, e.g. mainScheduler. This will not block the main thread.

@c0nstexpr
Copy link
Author

Sorry, I think I don't understand the issue. The delay operator only delays emissions, not the subscription phase. You can check the related ReactiveX docs. It is by design that the subscribe {} body is executed on the specified Scheduler. You can specify the required Scheduler for the delay operator, e.g. mainScheduler. This will not block the main thread.

I test the code, and the output does confirm that the code in subscribe body was executed in ioScheduler thread, not the caller thread of subscribe method invoked. That's not what I want.

@arkivanov
Copy link
Contributor

That's not what I want

But this is how the delay operator is supposed to work. You can specify any other scheduler that you would like your subscribe callback to be called on.

@c0nstexpr
Copy link
Author

That's not what I want

But this is how the delay operator is supposed to work. You can specify any other scheduler that you would like your subscribe callback to be called on.

This is the problem I mentioned in second paragraph

To keep same thread context after delay, a Scheduler which schedule in current thread context is needed. But current library doesn't provide a straightforward way to create such Scheduler

@arkivanov
Copy link
Contributor

Can you provide an example of how you would write the code in that case?

@c0nstexpr
Copy link
Author

Like this?

val currentThread  = Thread.currentThread() // caller thread
observableOf(1).delay(3.seconds, ioScheduler)
    .subscribeOn(currentThreadSheculer) // currentThreadSheculer schedule task in caller thread
    .subscribe {
        val innerThread = Thread.currentThread() // subscribe body's thread
        logger.d { "Is current thread? ${currentThread == innerThread}" } // Answer is YES
    }

@arkivanov
Copy link
Contributor

Thanks! Here are a few points.

  1. We should use observeOn to specify the Scheduler for emissions, not subscribeOn.
  2. It looks conceptually impossible to implement currentThreadScheduler, because we can't just schedule a task on a thread, plus the current thread does not necessarily belong to a Scheduler.
  3. Instead we should explicitly specify the desired Scheduler for emissions:

observableOf(1).delay(3.seconds, mainScheduler).subscribe {}

or

observableOf(1).delay(3.seconds, ioScheduler).observeOn(mainScheduler).subscribe {}, which is just a bit less performant version of the first option.

@c0nstexpr
Copy link
Author

Thanks for the information. I have to admit that currentThreadScheduler might not be possible to implement after searching answers for hours.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

3 participants