diff --git a/README.md b/README.md
index ee026ea..1e161df 100644
--- a/README.md
+++ b/README.md
@@ -12,7 +12,7 @@ Lagom.js is built against specific versions of Lagom, the latest are:
|-------------|-------|-----------------|----------|
| 0.1.2-1.5.5 | 1.5.5 | 2.11
2.12 | 0.6.31 |
| 0.3.2-1.6.2 | 1.6.2 | 2.12
2.13 | 0.6.33 |
-| 0.4.0-1.6.4 | 1.6.4 | 2.12
2.13 | 1.2.0 |
+| 0.4.1-1.6.4 | 1.6.4 | 2.12
2.13 | 1.2.0 |
Lagom.js moved to Scala.js 1.x starting with version `0.4.0-1.6.2`. Scala.js 0.6 is no longer supported, the last version to support it was `0.3.2-1.6.2`. For all past releases, see [releases](#Releases).
@@ -27,7 +27,7 @@ Lagom.js provides JavaScript versions of several Lagom artifacts. The two most i
The `lagomjs-scaladsl-api` artifact provides the JavaScript implementation of the Lagom service API:
```sbt
-"com.github.mliarakos.lagomjs" %%% "lagomjs-scaladsl-api" % "0.4.0-1.6.4"
+"com.github.mliarakos.lagomjs" %%% "lagomjs-scaladsl-api" % "0.4.1-1.6.4"
```
To use it you'll need to configure your service API as a [Scala.js cross project](https://github.com/portable-scala/sbt-crossproject) for the JVM and JS platforms. Then, add the `lagomjs-scaladsl-api` dependency to the JS platform:
@@ -39,7 +39,7 @@ lazy val `service-api` = crossProject(JVMPlatform, JSPlatform)
libraryDependencies += lagomScaladslApi
)
.jsSettings(
- libraryDependencies += "com.github.mliarakos.lagomjs" %%% "lagomjs-scaladsl-api" % "0.4.0-1.6.4"
+ libraryDependencies += "com.github.mliarakos.lagomjs" %%% "lagomjs-scaladsl-api" % "0.4.1-1.6.4"
)
```
@@ -50,7 +50,7 @@ This enables your Lagom service definition to be compiled into JavaScript. In ad
The `lagomjs-scaladsl-client` artifact provides the JavaScript implementation of the Lagom service client:
```sbt
-"com.github.mliarakos.lagomjs" %%% "lagomjs-scaladsl-client" % "0.4.0-1.6.4"
+"com.github.mliarakos.lagomjs" %%% "lagomjs-scaladsl-client" % "0.4.1-1.6.4"
```
You can use it in a Scala.js project along with your service API to generate a service client:
@@ -58,7 +58,7 @@ You can use it in a Scala.js project along with your service API to generate a s
```scala
lazy val `client-js` = project
.settings(
- libraryDependencies += "com.github.mliarakos.lagomjs" %%% "lagomjs-scaladsl-client" % "0.4.0-1.6.4"
+ libraryDependencies += "com.github.mliarakos.lagomjs" %%% "lagomjs-scaladsl-client" % "0.4.1-1.6.4"
)
.enablePlugins(ScalaJSPlugin)
.dependsOn(`service-api`.js)
@@ -80,6 +80,9 @@ However, the service client does not support a few the features available in Lag
- subscribing to topics: topic definitions are available in the service client, but attempting to subscribe to the topic throws an exception
- advanced service locators: service locators outside of the built-in service locators, such as `AkkaDiscoveryServiceLocator`, are not available
+## Known issues
+`ServiceCalls` based on akka-stream `Source` are implemented using Websockets, which don't propagate backpressure. This might change with [WebSocketStream API](https://web.dev/websocketstream/) once it's widely available.
+
## Releases
Lagom.js tracks Lagom and generally doesn't continue development on previous Lagom releases. Since Lagom maintains a stable API within a minor release (e.g., 1.6.x) any version of Lagom.js built for that minor release should work. However, if you need Lagom.js for a specific previous Lagom release the previous Lagom.js releases are listed below. If you have an issue with a previous Lagom.js release please open an [issue](https://github.com/mliarakos/lagom-js/issues) and it will be considered on a case-by-case basis.
@@ -96,4 +99,5 @@ Lagom.js tracks Lagom and generally doesn't continue development on previous Lag
| 0.4.0-1.6.2 | 1.6.2 | 2.12
2.13 | 1.0.1 |
| 0.4.0-1.6.3 | 1.6.3 | 2.12
2.13 | 1.1.1 |
| 0.4.0-1.6.4 | 1.6.4 | 2.12
2.13 | 1.2.0 |
+| 0.4.1-1.6.4 | 1.6.4 | 2.12
2.13 | 1.2.0 |
diff --git a/build.sbt b/build.sbt
index 3cfdda0..334cdca 100644
--- a/build.sbt
+++ b/build.sbt
@@ -50,7 +50,7 @@ lazy val publishSettings = Seq(
lazy val commonSettings = scalaSettings ++ publishSettings ++ Seq(
organization := "com.github.mliarakos.lagomjs",
- version := s"0.4.0-$baseLagomVersion"
+ version := s"0.4.1-$baseLagomVersion"
)
lazy val commonJsSettings = Seq(
diff --git a/lagomjs-client-scaladsl/js/src/main/scala/com/lightbend/lagom/internal/scaladsl/client/ScaladslWebSocketClient.scala b/lagomjs-client-scaladsl/js/src/main/scala/com/lightbend/lagom/internal/scaladsl/client/ScaladslWebSocketClient.scala
index 2821e91..760001c 100644
--- a/lagomjs-client-scaladsl/js/src/main/scala/com/lightbend/lagom/internal/scaladsl/client/ScaladslWebSocketClient.scala
+++ b/lagomjs-client-scaladsl/js/src/main/scala/com/lightbend/lagom/internal/scaladsl/client/ScaladslWebSocketClient.scala
@@ -1,9 +1,11 @@
package com.lightbend.lagom.internal.scaladsl.client
+import akka.stream.Materializer
import com.lightbend.lagom.internal.client.WebSocketClient
+import com.typesafe.config.Config
import scala.concurrent.ExecutionContext
-private[lagom] class ScaladslWebSocketClient()(implicit ec: ExecutionContext)
- extends WebSocketClient()(ec)
+private[lagom] class ScaladslWebSocketClient(config : Config)(implicit ec: ExecutionContext, materializer: Materializer)
+ extends WebSocketClient(config)(ec, materializer)
with ScaladslServiceApiBridge
diff --git a/lagomjs-client-scaladsl/js/src/main/scala/com/lightbend/lagom/scaladsl/client/ServiceClient.scala b/lagomjs-client-scaladsl/js/src/main/scala/com/lightbend/lagom/scaladsl/client/ServiceClient.scala
index 5f8c69c..c66f471 100644
--- a/lagomjs-client-scaladsl/js/src/main/scala/com/lightbend/lagom/scaladsl/client/ServiceClient.scala
+++ b/lagomjs-client-scaladsl/js/src/main/scala/com/lightbend/lagom/scaladsl/client/ServiceClient.scala
@@ -136,7 +136,8 @@ trait LagomServiceClientComponents extends TopicFactoryProvider { self: LagomCon
lazy val serviceResolver: ServiceResolver = new ScaladslServiceResolver(defaultExceptionSerializer)
lazy val defaultExceptionSerializer: ExceptionSerializer = new DefaultExceptionSerializer(environment)
- lazy val scaladslWebSocketClient: ScaladslWebSocketClient = new ScaladslWebSocketClient()(executionContext)
+ lazy val scaladslWebSocketClient: ScaladslWebSocketClient =
+ new ScaladslWebSocketClient(config)(executionContext, materializer)
lazy val serviceClient: ServiceClient = new ScaladslServiceClient(
scaladslWebSocketClient,
serviceInfo,
diff --git a/lagomjs-client/js/src/main/scala/com/lightbend/lagom/internal/client/WebSocketClient.scala b/lagomjs-client/js/src/main/scala/com/lightbend/lagom/internal/client/WebSocketClient.scala
index 7693597..272a3c9 100644
--- a/lagomjs-client/js/src/main/scala/com/lightbend/lagom/internal/client/WebSocketClient.scala
+++ b/lagomjs-client/js/src/main/scala/com/lightbend/lagom/internal/client/WebSocketClient.scala
@@ -4,11 +4,16 @@ import java.net.URI
import java.nio.ByteBuffer
import akka.NotUsed
+import akka.stream.Materializer
+import akka.stream.OverflowStrategy
+import akka.stream.StreamDetachedException
import akka.stream.scaladsl.Flow
+import akka.stream.scaladsl.Keep
import akka.stream.scaladsl.Sink
import akka.stream.scaladsl.Source
import akka.util.ByteString
import com.lightbend.lagom.internal.api.transport.LagomServiceApiBridge
+import com.typesafe.config.Config
import org.reactivestreams.Publisher
import org.reactivestreams.Subscriber
import org.reactivestreams.Subscription
@@ -18,14 +23,21 @@ import org.scalajs.dom.WebSocket
import org.scalajs.dom.raw.MessageEvent
import play.api.http.Status
+import scala.collection.JavaConverters._
import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.concurrent.Promise
import scala.scalajs.js.typedarray.ArrayBuffer
import scala.scalajs.js.typedarray.TypedArrayBuffer
import scala.scalajs.js.typedarray.TypedArrayBufferOps._
+import scala.util.Failure
+import scala.util.Success
-private[lagom] abstract class WebSocketClient()(implicit ec: ExecutionContext) extends LagomServiceApiBridge {
+private[lagom] abstract class WebSocketClient(config: Config)(implicit ec: ExecutionContext, materializer: Materializer)
+ extends LagomServiceApiBridge {
+
+ // Defines the internal buffer size for the websocket when using ServiceCalls containing a Source as it's not possible to use backpressure in the JS websocket implementation
+ val maxBufferSize = Option(config.getInt("lagom.client.websocket.scalajs.maxBufferSize")).filter(_!=0).getOrElse(1024)
/**
* Connect to the given URI
@@ -119,50 +131,73 @@ private[lagom] abstract class WebSocketClient()(implicit ec: ExecutionContext) e
private val NormalClosure = 1000
private var hasSubscription = false
+ // acts as a buffer between the backpressare unaware Websocket and backpressure aware reactive-streams Subscriber
+ val (messageSource, messageSink) = Source
+ .queue[ByteString](maxBufferSize, OverflowStrategy.fail)
+ .toMat(Sink.queue())(Keep.both)
+ .run()
+
+ // fill buffer even before a subscriber is connected, otherwise we might loose elements
+ socket.onmessage = { message =>
+ // The message data should be either an ArrayBuffer or a String
+ // It should not be a Blob because the socket binaryType was set
+ val data = message.data match {
+ case buffer: ArrayBuffer => ByteString.apply(TypedArrayBuffer.wrap(buffer))
+ case data => ByteString.fromString(data.toString)
+ }
+
+ messageSource.offer(data).onComplete {
+ case Failure(exception) =>
+ throw exception
+ case _ =>
+ }
+ }
+
+ socket.onerror = event =>
+ messageSource.fail(new RuntimeException(s"WebSocket error: ${event.`type`}"))
+
+ // acting on the buffer instead of directly on the subscriber e.g. to make sure pending elements are delivered before completion
+ socket.onclose = event => {
+ if (event.code == NormalClosure) {
+ messageSource.complete()
+ } else {
+ // Complete the subscriber with an error because the socket closed with an error
+ // Parse the error reason as an exception
+ val bytes = ByteString.fromString(event.reason)
+ val exception =
+ exceptionSerializerDeserializeWebSocketException(exceptionSerializer, event.code, requestProtocol, bytes)
+ messageSource.fail(exception)
+ }
+ }
+
+ var completed = false
+
override def subscribe(subscriber: Subscriber[_ >: ByteString]): Unit = {
if (!hasSubscription) {
hasSubscription = true
- // Forward messages from the socket to the subscriber
- val onMessage = (message: MessageEvent) => {
- // The message data should be either an ArrayBuffer or a String
- // It should not be a Blob because the socket binaryType was set
- val data = message.data match {
- case buffer: ArrayBuffer => ByteString.apply(TypedArrayBuffer.wrap(buffer))
- case data => ByteString.fromString(data.toString)
- }
- subscriber.onNext(data)
- }
- // Complete the subscriber with an error if the socket encounters an error
- val onError = (event: Event) => {
- subscriber.onError(new RuntimeException(s"WebSocket error: ${event.`type`}"))
- }
- // Complete the subscriber when the socket closes based on the close code
- val onClose = (event: CloseEvent) => {
- if (event.code == NormalClosure) {
- // Successfully complete the subscriber because the socket closed normally
- subscriber.onComplete()
- } else {
- // Complete the subscriber with an error because the socket closed with an error
- // Parse the error reason as an exception
- val bytes = ByteString.fromString(event.reason)
- val exception =
- exceptionSerializerDeserializeWebSocketException(exceptionSerializer, event.code, requestProtocol, bytes)
- subscriber.onError(exception)
- }
- }
-
// Configure the subscriber to start the stream
subscriber.onSubscribe(new Subscription {
- private var hasStarted = false
- // Configure the socket after the initial subscriber request
override def request(n: Long): Unit = {
- if (!hasStarted) {
- hasStarted = true
- socket.onmessage = onMessage
- socket.onerror = onError
- socket.addEventListener[CloseEvent]("close", onClose)
+ // pull as many elements as requested, but only as long as element Source has not completed
+ def pull(): Unit = {
+ messageSink.pull().onComplete {
+ case Failure(_: StreamDetachedException) =>
+ // happens when subscriber requests more items than available or Subscription.request() is called before all requested elements have been onNext'ed
+
+ case Failure(exception) =>
+ subscriber.onError(exception)
+
+ case Success(Some(data)) =>
+ subscriber.onNext(data)
+ pull()
+
+ case Success(None) =>
+ subscriber.onComplete()
+ completed = true
+ }
}
+ if (!completed) pull()
}
// Close the socket if the subscriber cancels
override def cancel(): Unit = socket.close()