Skip to content

Commit

Permalink
fixed delay between websocket connection and .onmessage subscription …
Browse files Browse the repository at this point in the history
…which can cause lost elements
  • Loading branch information
an-tex committed Oct 7, 2020
1 parent 496c985 commit 0a0f21f
Show file tree
Hide file tree
Showing 5 changed files with 88 additions and 46 deletions.
14 changes: 9 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ Lagom.js is built against specific versions of Lagom, the latest are:
|-------------|-------|-----------------|----------|
| 0.1.2-1.5.5 | 1.5.5 | 2.11 <br> 2.12 | 0.6.31 |
| 0.3.2-1.6.2 | 1.6.2 | 2.12 <br> 2.13 | 0.6.33 |
| 0.4.0-1.6.4 | 1.6.4 | 2.12 <br> 2.13 | 1.2.0 |
| 0.4.1-1.6.4 | 1.6.4 | 2.12 <br> 2.13 | 1.2.0 |

Lagom.js moved to Scala.js 1.x starting with version `0.4.0-1.6.2`. Scala.js 0.6 is no longer supported, the last version to support it was `0.3.2-1.6.2`. For all past releases, see [releases](#Releases).

Expand All @@ -27,7 +27,7 @@ Lagom.js provides JavaScript versions of several Lagom artifacts. The two most i
The `lagomjs-scaladsl-api` artifact provides the JavaScript implementation of the Lagom service API:

```sbt
"com.github.mliarakos.lagomjs" %%% "lagomjs-scaladsl-api" % "0.4.0-1.6.4"
"com.github.mliarakos.lagomjs" %%% "lagomjs-scaladsl-api" % "0.4.1-1.6.4"
```

To use it you'll need to configure your service API as a [Scala.js cross project](https://github.com/portable-scala/sbt-crossproject) for the JVM and JS platforms. Then, add the `lagomjs-scaladsl-api` dependency to the JS platform:
Expand All @@ -39,7 +39,7 @@ lazy val `service-api` = crossProject(JVMPlatform, JSPlatform)
libraryDependencies += lagomScaladslApi
)
.jsSettings(
libraryDependencies += "com.github.mliarakos.lagomjs" %%% "lagomjs-scaladsl-api" % "0.4.0-1.6.4"
libraryDependencies += "com.github.mliarakos.lagomjs" %%% "lagomjs-scaladsl-api" % "0.4.1-1.6.4"
)
```

Expand All @@ -50,15 +50,15 @@ This enables your Lagom service definition to be compiled into JavaScript. In ad
The `lagomjs-scaladsl-client` artifact provides the JavaScript implementation of the Lagom service client:

```sbt
"com.github.mliarakos.lagomjs" %%% "lagomjs-scaladsl-client" % "0.4.0-1.6.4"
"com.github.mliarakos.lagomjs" %%% "lagomjs-scaladsl-client" % "0.4.1-1.6.4"
```

You can use it in a Scala.js project along with your service API to generate a service client:

```scala
lazy val `client-js` = project
.settings(
libraryDependencies += "com.github.mliarakos.lagomjs" %%% "lagomjs-scaladsl-client" % "0.4.0-1.6.4"
libraryDependencies += "com.github.mliarakos.lagomjs" %%% "lagomjs-scaladsl-client" % "0.4.1-1.6.4"
)
.enablePlugins(ScalaJSPlugin)
.dependsOn(`service-api`.js)
Expand All @@ -80,6 +80,9 @@ However, the service client does not support a few the features available in Lag
- subscribing to topics: topic definitions are available in the service client, but attempting to subscribe to the topic throws an exception
- advanced service locators: service locators outside of the built-in service locators, such as `AkkaDiscoveryServiceLocator`, are not available

## Known issues
`ServiceCalls` based on akka-stream `Source` are implemented using Websockets, which don't propagate backpressure. This might change with [WebSocketStream API](https://web.dev/websocketstream/) once it's widely available.

## Releases

Lagom.js tracks Lagom and generally doesn't continue development on previous Lagom releases. Since Lagom maintains a stable API within a minor release (e.g., 1.6.x) any version of Lagom.js built for that minor release should work. However, if you need Lagom.js for a specific previous Lagom release the previous Lagom.js releases are listed below. If you have an issue with a previous Lagom.js release please open an [issue](https://github.com/mliarakos/lagom-js/issues) and it will be considered on a case-by-case basis.
Expand All @@ -96,4 +99,5 @@ Lagom.js tracks Lagom and generally doesn't continue development on previous Lag
| 0.4.0-1.6.2 | 1.6.2 | 2.12 <br> 2.13 | 1.0.1 |
| 0.4.0-1.6.3 | 1.6.3 | 2.12 <br> 2.13 | 1.1.1 |
| 0.4.0-1.6.4 | 1.6.4 | 2.12 <br> 2.13 | 1.2.0 |
| 0.4.1-1.6.4 | 1.6.4 | 2.12 <br> 2.13 | 1.2.0 |

2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ lazy val publishSettings = Seq(

lazy val commonSettings = scalaSettings ++ publishSettings ++ Seq(
organization := "com.github.mliarakos.lagomjs",
version := s"0.4.0-$baseLagomVersion"
version := s"0.4.1-$baseLagomVersion"
)

lazy val commonJsSettings = Seq(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package com.lightbend.lagom.internal.scaladsl.client

import akka.stream.Materializer
import com.lightbend.lagom.internal.client.WebSocketClient
import com.typesafe.config.Config

import scala.concurrent.ExecutionContext

private[lagom] class ScaladslWebSocketClient()(implicit ec: ExecutionContext)
extends WebSocketClient()(ec)
private[lagom] class ScaladslWebSocketClient(config : Config)(implicit ec: ExecutionContext, materializer: Materializer)
extends WebSocketClient(config)(ec, materializer)
with ScaladslServiceApiBridge
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,8 @@ trait LagomServiceClientComponents extends TopicFactoryProvider { self: LagomCon
lazy val serviceResolver: ServiceResolver = new ScaladslServiceResolver(defaultExceptionSerializer)
lazy val defaultExceptionSerializer: ExceptionSerializer = new DefaultExceptionSerializer(environment)

lazy val scaladslWebSocketClient: ScaladslWebSocketClient = new ScaladslWebSocketClient()(executionContext)
lazy val scaladslWebSocketClient: ScaladslWebSocketClient =
new ScaladslWebSocketClient(config)(executionContext, materializer)
lazy val serviceClient: ServiceClient = new ScaladslServiceClient(
scaladslWebSocketClient,
serviceInfo,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,16 @@ import java.net.URI
import java.nio.ByteBuffer

import akka.NotUsed
import akka.stream.Materializer
import akka.stream.OverflowStrategy
import akka.stream.StreamDetachedException
import akka.stream.scaladsl.Flow
import akka.stream.scaladsl.Keep
import akka.stream.scaladsl.Sink
import akka.stream.scaladsl.Source
import akka.util.ByteString
import com.lightbend.lagom.internal.api.transport.LagomServiceApiBridge
import com.typesafe.config.Config
import org.reactivestreams.Publisher
import org.reactivestreams.Subscriber
import org.reactivestreams.Subscription
Expand All @@ -18,14 +23,21 @@ import org.scalajs.dom.WebSocket
import org.scalajs.dom.raw.MessageEvent
import play.api.http.Status

import scala.collection.JavaConverters._
import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.concurrent.Promise
import scala.scalajs.js.typedarray.ArrayBuffer
import scala.scalajs.js.typedarray.TypedArrayBuffer
import scala.scalajs.js.typedarray.TypedArrayBufferOps._
import scala.util.Failure
import scala.util.Success

private[lagom] abstract class WebSocketClient()(implicit ec: ExecutionContext) extends LagomServiceApiBridge {
private[lagom] abstract class WebSocketClient(config: Config)(implicit ec: ExecutionContext, materializer: Materializer)
extends LagomServiceApiBridge {

// Defines the internal buffer size for the websocket when using ServiceCalls containing a Source as it's not possible to use backpressure in the JS websocket implementation
val maxBufferSize = Option(config.getInt("lagom.client.websocket.scalajs.maxBufferSize")).filter(_!=0).getOrElse(1024)

/**
* Connect to the given URI
Expand Down Expand Up @@ -119,50 +131,73 @@ private[lagom] abstract class WebSocketClient()(implicit ec: ExecutionContext) e
private val NormalClosure = 1000
private var hasSubscription = false

// acts as a buffer between the backpressare unaware Websocket and backpressure aware reactive-streams Subscriber
val (messageSource, messageSink) = Source
.queue[ByteString](maxBufferSize, OverflowStrategy.fail)
.toMat(Sink.queue())(Keep.both)
.run()

// fill buffer even before a subscriber is connected, otherwise we might loose elements
socket.onmessage = { message =>
// The message data should be either an ArrayBuffer or a String
// It should not be a Blob because the socket binaryType was set
val data = message.data match {
case buffer: ArrayBuffer => ByteString.apply(TypedArrayBuffer.wrap(buffer))
case data => ByteString.fromString(data.toString)
}

messageSource.offer(data).onComplete {
case Failure(exception) =>
throw exception
case _ =>
}
}

socket.onerror = event =>
messageSource.fail(new RuntimeException(s"WebSocket error: ${event.`type`}"))

// acting on the buffer instead of directly on the subscriber e.g. to make sure pending elements are delivered before completion
socket.onclose = event => {
if (event.code == NormalClosure) {
messageSource.complete()
} else {
// Complete the subscriber with an error because the socket closed with an error
// Parse the error reason as an exception
val bytes = ByteString.fromString(event.reason)
val exception =
exceptionSerializerDeserializeWebSocketException(exceptionSerializer, event.code, requestProtocol, bytes)
messageSource.fail(exception)
}
}

var completed = false

override def subscribe(subscriber: Subscriber[_ >: ByteString]): Unit = {
if (!hasSubscription) {
hasSubscription = true

// Forward messages from the socket to the subscriber
val onMessage = (message: MessageEvent) => {
// The message data should be either an ArrayBuffer or a String
// It should not be a Blob because the socket binaryType was set
val data = message.data match {
case buffer: ArrayBuffer => ByteString.apply(TypedArrayBuffer.wrap(buffer))
case data => ByteString.fromString(data.toString)
}
subscriber.onNext(data)
}
// Complete the subscriber with an error if the socket encounters an error
val onError = (event: Event) => {
subscriber.onError(new RuntimeException(s"WebSocket error: ${event.`type`}"))
}
// Complete the subscriber when the socket closes based on the close code
val onClose = (event: CloseEvent) => {
if (event.code == NormalClosure) {
// Successfully complete the subscriber because the socket closed normally
subscriber.onComplete()
} else {
// Complete the subscriber with an error because the socket closed with an error
// Parse the error reason as an exception
val bytes = ByteString.fromString(event.reason)
val exception =
exceptionSerializerDeserializeWebSocketException(exceptionSerializer, event.code, requestProtocol, bytes)
subscriber.onError(exception)
}
}

// Configure the subscriber to start the stream
subscriber.onSubscribe(new Subscription {
private var hasStarted = false
// Configure the socket after the initial subscriber request
override def request(n: Long): Unit = {
if (!hasStarted) {
hasStarted = true
socket.onmessage = onMessage
socket.onerror = onError
socket.addEventListener[CloseEvent]("close", onClose)
// pull as many elements as requested, but only as long as element Source has not completed
def pull(): Unit = {
messageSink.pull().onComplete {
case Failure(_: StreamDetachedException) =>
// happens when subscriber requests more items than available or Subscription.request() is called before all requested elements have been onNext'ed

case Failure(exception) =>
subscriber.onError(exception)

case Success(Some(data)) =>
subscriber.onNext(data)
pull()

case Success(None) =>
subscriber.onComplete()
completed = true
}
}
if (!completed) pull()
}
// Close the socket if the subscriber cancels
override def cancel(): Unit = socket.close()
Expand Down

0 comments on commit 0a0f21f

Please sign in to comment.