Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fixed delay between websocket connection and .onmessage subscription … #19

Merged
merged 10 commits into from
Mar 23, 2021
48 changes: 42 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ Lagom.js is built against specific versions of Lagom, the latest are:
|-------------|-------|-----------------|----------|
| 0.1.2-1.5.5 | 1.5.5 | 2.11 <br> 2.12 | 0.6.31 |
| 0.3.2-1.6.2 | 1.6.2 | 2.12 <br> 2.13 | 0.6.33 |
| 0.4.0-1.6.4 | 1.6.4 | 2.12 <br> 2.13 | 1.2.0 |
| 0.5.0-1.6.4 | 1.6.4 | 2.12 <br> 2.13 | 1.2.0 |

Lagom.js moved to Scala.js 1.x starting with version `0.4.0-1.6.2`. Scala.js 0.6 is no longer supported, the last version to support it was `0.3.2-1.6.2`. For all past releases, see [releases](#Releases).

Expand All @@ -27,7 +27,7 @@ Lagom.js provides JavaScript versions of several Lagom artifacts. The two most i
The `lagomjs-scaladsl-api` artifact provides the JavaScript implementation of the Lagom service API:

```sbt
"com.github.mliarakos.lagomjs" %%% "lagomjs-scaladsl-api" % "0.4.0-1.6.4"
"com.github.mliarakos.lagomjs" %%% "lagomjs-scaladsl-api" % "0.5.0-1.6.4"
```

To use it you'll need to configure your service API as a [Scala.js cross project](https://github.com/portable-scala/sbt-crossproject) for the JVM and JS platforms. Then, add the `lagomjs-scaladsl-api` dependency to the JS platform:
Expand All @@ -39,7 +39,7 @@ lazy val `service-api` = crossProject(JVMPlatform, JSPlatform)
libraryDependencies += lagomScaladslApi
)
.jsSettings(
libraryDependencies += "com.github.mliarakos.lagomjs" %%% "lagomjs-scaladsl-api" % "0.4.0-1.6.4"
libraryDependencies += "com.github.mliarakos.lagomjs" %%% "lagomjs-scaladsl-api" % "0.5.0-1.6.4"
)
```

Expand All @@ -50,15 +50,15 @@ This enables your Lagom service definition to be compiled into JavaScript. In ad
The `lagomjs-scaladsl-client` artifact provides the JavaScript implementation of the Lagom service client:

```sbt
"com.github.mliarakos.lagomjs" %%% "lagomjs-scaladsl-client" % "0.4.0-1.6.4"
"com.github.mliarakos.lagomjs" %%% "lagomjs-scaladsl-client" % "0.5.0-1.6.4"
```

You can use it in a Scala.js project along with your service API to generate a service client:

```scala
lazy val `client-js` = project
.settings(
libraryDependencies += "com.github.mliarakos.lagomjs" %%% "lagomjs-scaladsl-client" % "0.4.0-1.6.4"
libraryDependencies += "com.github.mliarakos.lagomjs" %%% "lagomjs-scaladsl-client" % "0.5.0-1.6.4"
)
.enablePlugins(ScalaJSPlugin)
.dependsOn(`service-api`.js)
Expand All @@ -80,6 +80,42 @@ However, the service client does not support a few the features available in Lag
- subscribing to topics: topic definitions are available in the service client, but attempting to subscribe to the topic throws an exception
- advanced service locators: service locators outside of the built-in service locators, such as `AkkaDiscoveryServiceLocator`, are not available

## Configuration

### WebSocket Stream Buffer

Streaming service requests and responses are implemented using WebSockets. When starting a WebSocket connection there is a slight delay between the socket opening and the response stream being set up and ready to consume messages. To compensate for this delay the lagom.js WebSocket client uses a receive buffer to hold messages until the stream is ready. The buffer size can be set through configuration:

```yaml
lagom.client.websocket.bufferSize = 16
```

The buffer is sized by default to compensate for this delay. However, the buffer can also be used for another purpose.

The current WebSocket standard prevents the lagom.js WebSocket client from supporting stream back-pressure for sending or receiving WebSocket data. This can cause overflow errors and stream failure when upstream production is faster than downstream consumption. Normally, standard Akka methods, such as [buffer](https://doc.akka.io/docs/akka/current/stream/stream-rate.html), could be used to mitigate this issue. Unfortunately, these methods generally do not perform well in practice because of the way operations are scheduled by the JavaScript event-loop. A fast upstream often fails the stream before the Akka buffer logic has a chance to run.

The lagom.js receive buffer is implemented to schedule downstream consumption on the event-loop as soon as possible to mitigate this issue. The buffer size can be tuned to attempt to compensate for the fast streaming response. This is useful if the upstream has bursts of throughput that can overwhelm the downstream. However, depending on the use case, it may not be possible to fully compensate for stream rate differences. An upstream rate that is consistently faster than the downstream will eventually overflow the buffer. The buffer can be set to an unbounded size:

```yaml
lagom.client.websocket.bufferSize = unlimited
```

However, an unbounded buffer can negatively impact browser performance and will eventually fail due to lack of memory.

Alternatively, if the upstream logic (before data is sent over the WebSocket) can be modified, then the stream can be throttled. For example, in a `ServiceCall` implementation:

```scala
override def zeros = ServerServiceCall { _ =>
// Throttle the source to 1 element per second
val source = Source.repeat(0).throttle(elements = 1, 1.second)
Future.successful(source)
}
```

The `throttle` operator will back-pressure to achieve the desired rate and since the upstream can slow down it will not overwhelm the downstream. The throttle parameters should be tuned per use case.

WebSocket back-pressure for streams may become available once the [WebSocketStream API](https://web.dev/websocketstream/) is complete and widely available.

## Releases

Lagom.js tracks Lagom and generally doesn't continue development on previous Lagom releases. Since Lagom maintains a stable API within a minor release (e.g., 1.6.x) any version of Lagom.js built for that minor release should work. However, if you need Lagom.js for a specific previous Lagom release the previous Lagom.js releases are listed below. If you have an issue with a previous Lagom.js release please open an [issue](https://github.com/mliarakos/lagom-js/issues) and it will be considered on a case-by-case basis.
Expand All @@ -96,4 +132,4 @@ Lagom.js tracks Lagom and generally doesn't continue development on previous Lag
| 0.4.0-1.6.2 | 1.6.2 | 2.12 <br> 2.13 | 1.0.1 |
| 0.4.0-1.6.3 | 1.6.3 | 2.12 <br> 2.13 | 1.1.1 |
| 0.4.0-1.6.4 | 1.6.4 | 2.12 <br> 2.13 | 1.2.0 |

| 0.5.0-1.6.4 | 1.6.4 | 2.12 <br> 2.13 | 1.2.0 |
8 changes: 1 addition & 7 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ lazy val publishSettings = Seq(

lazy val commonSettings = scalaSettings ++ publishSettings ++ Seq(
organization := "com.github.mliarakos.lagomjs",
version := s"0.4.1-$baseLagomVersion-SNAPSHOT"
version := s"0.5.0-$baseLagomVersion-SNAPSHOT"
)

lazy val commonJsSettings = Seq(
Expand Down Expand Up @@ -232,12 +232,6 @@ lazy val `lagomjs-client` = crossProject(JSPlatform)
sourceTarget
)

val resourcesTarget = file("lagomjs-client") / "js" / "src" / "main" / "resources"
copyToSourceDirectory(
lagomTargetDirectory.value / "service" / "core" / "client" / "src" / "main" / "resources",
resourcesTarget
)

val jsSources = sourceDirectory.value / "main" / "scala"
applyOverrides(sourceTarget, jsSources)
},
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
package com.lightbend.lagom.internal.scaladsl.client

import akka.stream.Materializer
import com.lightbend.lagom.internal.client.WebSocketClient
import com.lightbend.lagom.internal.client.WebSocketClientConfig

import scala.concurrent.ExecutionContext

private[lagom] class ScaladslWebSocketClient()(implicit ec: ExecutionContext)
extends WebSocketClient()(ec)
private[lagom] class ScaladslWebSocketClient(config: WebSocketClientConfig)(
implicit ec: ExecutionContext,
materializer: Materializer
) extends WebSocketClient(config)(ec, materializer)
with ScaladslServiceApiBridge
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import akka.actor.CoordinatedShutdown
import akka.stream.ActorMaterializer
import akka.stream.Materializer
import com.lightbend.lagom.internal.client.CircuitBreakerMetricsProviderImpl
import com.lightbend.lagom.internal.client.WebSocketClientConfig
import com.lightbend.lagom.internal.scaladsl.api.broker.TopicFactoryProvider
import com.lightbend.lagom.internal.scaladsl.client.ScaladslClientMacroImpl
import com.lightbend.lagom.internal.scaladsl.client.ScaladslServiceClient
Expand Down Expand Up @@ -136,7 +137,8 @@ trait LagomServiceClientComponents extends TopicFactoryProvider { self: LagomCon
lazy val serviceResolver: ServiceResolver = new ScaladslServiceResolver(defaultExceptionSerializer)
lazy val defaultExceptionSerializer: ExceptionSerializer = new DefaultExceptionSerializer(environment)

lazy val scaladslWebSocketClient: ScaladslWebSocketClient = new ScaladslWebSocketClient()(executionContext)
lazy val scaladslWebSocketClient: ScaladslWebSocketClient =
new ScaladslWebSocketClient(WebSocketClientConfig(config))(executionContext, materializer)
lazy val serviceClient: ServiceClient = new ScaladslServiceClient(
scaladslWebSocketClient,
serviceInfo,
Expand Down
45 changes: 45 additions & 0 deletions lagomjs-client/js/src/main/resources/reference.conf
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
#//#circuit-breaker-default
# Circuit breakers for calls to other services are configured
# in this section. A child configuration section with the same
# name as the circuit breaker identifier will be used, with fallback
# to the `lagom.circuit-breaker.default` section.
lagom.circuit-breaker {

# Default configuration that is used if a configuration section
# with the circuit breaker identifier is not defined.
default {
# Possibility to disable a given circuit breaker.
enabled = on

# Number of failures before opening the circuit.
max-failures = 10

# Duration of time after which to consider a call a failure.
call-timeout = 10s

# Duration of time in open state after which to attempt to close
# the circuit, by first entering the half-open state.
reset-timeout = 15s

# A whitelist of fqcn of Exceptions that the CircuitBreaker
# should not consider failures. By default all exceptions are
# considered failures.
exception-whitelist = []
}
}
#//#circuit-breaker-default

#//#web-socket-client-default
# This configures the websocket clients used by this service.
# This is a global configuration and it is currently not possible
# to provide different configurations if multiple websocket services
# are consumed.
lagom.client.websocket {

# Size of the internal WebSocket data receive buffer used to
# compensate for the delay between WebSocket connection and stream
# start and for the lack of WebSocket back-pressure.
# Set to "unlimited" for an effectively ulimited buffer size.
bufferSize = 16
}
#//#web-socket-client-default
Loading