Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fixed delay between websocket connection and .onmessage subscription … #19

Merged
merged 10 commits into from
Mar 23, 2021

Conversation

an-tex
Copy link
Contributor

@an-tex an-tex commented Oct 7, 2020

…which can cause lost elements.

In my case making a call to ServiceCall[NotUsed,Source[String,NotUsed]] like myService.myServiceCall.invoke().flatMap(_.runWith(Sink.seq)) caused a delay of ~30ms between opened websocket and connected subscriber. All elements in between were lost.

Also don't push more elements to subscriber than requested.

Annoying that JS Websockets don't support backpressure :(

extends LagomServiceApiBridge {

// Defines the internal buffer size for the websocket when using ServiceCalls containing a Source as it's not possible to use backpressure in the JS websocket implementation
val maxBufferSize = Option(config.getInt("lagom.client.websocket.scalajs.maxBufferSize")).filter(_!=0).getOrElse(1024)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I couldn't figure out how to use the reference.conf to define the default value :( the config stayed empty. Defining it directly in the ClientApplication by overriding com.lightbend.lagom.scaladsl.api.LagomConfigComponent#config works though

@mliarakos
Copy link
Owner

@an-tex, this is great! I should be able to do full review this weekend. I want to build a test case from your example for the feature/integration-test branch so we can protect against regressions. I'll also look into the reference.conf issue to see if there's a quick solution.

@an-tex
Copy link
Contributor Author

an-tex commented Oct 8, 2020

@an-tex, this is great! I should be able to do full review this weekend. I want to build a test case from your example for the feature/integration-test branch so we can protect against regressions. I'll also look into the reference.conf issue to see if there's a quick solution.

Brilliant! Those integration tests make certainly a lot of sense considering the custom JS implementation. Thanks for your work on this :)

@mliarakos
Copy link
Owner

Can you target the PR to the develop branch (rather than master)?

@mliarakos
Copy link
Owner

mliarakos commented Oct 12, 2020

I have the feature/integration-test branch updated with tests for these stream delay cases. I thought I had tests to cover this, but obviously they didn't work. They used service calls like ServiceCall[String, Source[String, NotUsed]]. Lagom implements sending the request string as a single stream element which allows the stream time to get set up for the response. However, your test case uses ServiceCall[NotUsed, Source[String, NotUsed]] which gets the WebSocket running before the response stream is ready because there is no request.

I was looking at implementing tracking of demand for the WebSocketPublisher and I realized I think it can all be done directly with Source.queue, eliminating the WebSocketPublisher. See WebSocketClient.scala in the fix/websocket-delay branch. Does that look right to you, @an-tex?

Still need to look into reference.conf.

@an-tex
Copy link
Contributor Author

an-tex commented Oct 12, 2020

True, much better! Guess no need to push my changes into the develop branch any more?

@mliarakos mliarakos changed the base branch from master to develop October 12, 2020 13:42
@mliarakos
Copy link
Owner

I didn't know it before, but I can add commits directly to the PR. I've changed the target to develop and added my changes to the PR so we can maintain your authorship.

@an-tex
Copy link
Contributor Author

an-tex commented Oct 13, 2020

I didn't know it before, but I can add commits directly to the PR. I've changed the target to develop and added my changes to the PR so we can maintain your authorship.

I don't mind the authorship stuff but cool, thanks mate! :) If you need any further help/review let me know

@mliarakos
Copy link
Owner

Removed mixed notation in config due to akka-js/shocon#28. The mixed notation can be added back when resolved.

@mliarakos
Copy link
Owner

After reading through the Akka docs on stream rate I decided to make the WebSocket receive buffer smaller to just handle the connection delay. I added a discussion to the readme on user approaches to handle the lack of back-pressure. @an-tex do those approaches work for you?

@an-tex
Copy link
Contributor Author

an-tex commented Oct 20, 2020

Sorry for the delay but testing this turned out to be pretty tricky. First I couldn't override the bufferSize in my app, do you have a working approach? I used this in my ClientApplication

  override def config = super.config.withFallback(ConfigFactory.parseString(
    """
      |lagom.client.websocket {
      |   bufferSize = 1024
      |}
      |""".stripMargin))

but the bufferSize stayed at 16. So I've just hardcoded it for testing ;)

My local testing (which is obviously not realistic but still shows what can happen) showed that a fast sender can pretty much keep the JS thread busy at the socket.onMessage function in a way that even the extra buffer as in

apiClient.websocketServiceCall.invoke().map ( source => 
// never got here in time... 
source.buffer(10240).runWith(...)
)

didn't get a chance to be connected until the sender actually finished, so it was all about the internal websocket buffer. But even just by adding a .throttle(1,1.millis) on the sender side helped as far as getting the buffer down to 512 to handle the initial peak.

I'll try with an actual remote system tomorrow. But I guess it'll be down to really be careful with fast senders and buffers. From that point of view your impl and docs are looking great

@mliarakos
Copy link
Owner

Experimented with the config and am running into similar issues. I'll keep looking into it. Another good catch!

I've tested this fast sender and not lost elements:

  override def fast = ServerServiceCall { _ =>
    val source = Source(Seq.range(1, 5000))
    Future.successful(source)
  }

Can you post or link the code that's causing the issue so I can try to replicate it?

@mliarakos
Copy link
Owner

I believe the config issue is due to akka-js/shocon#55.

@an-tex
Copy link
Contributor Author

an-tex commented Oct 27, 2020

Experimented with the config and am running into similar issues. I'll keep looking into it. Another good catch!

I've tested this fast sender and not lost elements:

  override def fast = ServerServiceCall { _ =>
    val source = Source(Seq.range(1, 5000))
    Future.successful(source)
  }

Can you post or link the code that's causing the issue so I can try to replicate it?

Have a look here: https://github.com/an-tex/lagom-scalajs-example

As soon as there's a burst of only 32 Elements, even after a Sink.ignore is connected, you'll get a full buffer exception. I've tried locally and from another machine over wifi.

Raising the bufferSize or adding .throttle helps. It's really odd. Seems like the demand from the Sink.ignore isn't getting to the socketSource fast enough. Could be the silly single-threaded JS engine only calling .onmessage but not getting much further?

@mliarakos
Copy link
Owner

Sorry there hasn't been much progress on this lately. I'm seeing all the same issues you are (my previously working example was a fluke). I think you're right, the WebSocket .onmessage is starving the single thread and preventing any progress on the rest of the stream. The only things I've found to work are to slow down the upstream or have a large enough websocket.bufferSize (user created downstream buffers don't seem to help).

I'll see if there's anything else I can do, but if not I'll just have to update the docs to discuss the issue.

@an-tex
Copy link
Contributor Author

an-tex commented Nov 26, 2020

Thanks for the update. Sounds good. Seems not much we can do to fix it properly but rather workaround it. Bummer!

@mliarakos
Copy link
Owner

mliarakos commented Jan 31, 2021

It's been a while, but I think I have a mitigating solution. Using Akka .buffer or Source.queue didn't work well because the Akka operations use Futures, which are scheduled at the back of the JavaScript event-loop queue. Generally, the WebSocket .onmessage would fill up the event-loop queue way faster than Akka, so the Akka operations rarely ran.

To mitigate this I implemented a custom WebSocket stream buffer that schedules buffer operations and downstream consumption on the JavaScript job queue, so it runs as soon as possible and before other elements of the event-loop queue. Generally, this helps the buffer keep up better with a fast upstream, though it can still fail. I also made the buffer allow an unbounded size as a last ditch.

I updated the PR readme with a discussion of the trade-offs.

Changing the config is still wonky because of akka-js/shocon#55. I'll work on a way to make that easier in a separate PR (#21).

@an-tex
Copy link
Contributor Author

an-tex commented Feb 1, 2021

Great! Seems it's the way to go to not rely entirely on akka in js at least for such for internals.

@mliarakos mliarakos merged commit 2b307dd into mliarakos:develop Mar 23, 2021
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants