diff --git a/lagomjs-client/js/src/main/scala/com/lightbend/lagom/internal/client/WebSocketClient.scala b/lagomjs-client/js/src/main/scala/com/lightbend/lagom/internal/client/WebSocketClient.scala index 272a3c9..50033ed 100644 --- a/lagomjs-client/js/src/main/scala/com/lightbend/lagom/internal/client/WebSocketClient.scala +++ b/lagomjs-client/js/src/main/scala/com/lightbend/lagom/internal/client/WebSocketClient.scala @@ -20,10 +20,8 @@ import org.reactivestreams.Subscription import org.scalajs.dom.CloseEvent import org.scalajs.dom.Event import org.scalajs.dom.WebSocket -import org.scalajs.dom.raw.MessageEvent import play.api.http.Status -import scala.collection.JavaConverters._ import scala.concurrent.ExecutionContext import scala.concurrent.Future import scala.concurrent.Promise @@ -36,8 +34,11 @@ import scala.util.Success private[lagom] abstract class WebSocketClient(config: Config)(implicit ec: ExecutionContext, materializer: Materializer) extends LagomServiceApiBridge { + private val NormalClosure = 1000 + // Defines the internal buffer size for the websocket when using ServiceCalls containing a Source as it's not possible to use backpressure in the JS websocket implementation - val maxBufferSize = Option(config.getInt("lagom.client.websocket.scalajs.maxBufferSize")).filter(_!=0).getOrElse(1024) + val maxBufferSize = + Option(config.getInt("lagom.client.websocket.scalajs.maxBufferSize")).filter(_ != 0).getOrElse(1024) /** * Connect to the given URI @@ -64,8 +65,8 @@ private[lagom] abstract class WebSocketClient(config: Config)(implicit ec: Execu val subscriber = new WebSocketSubscriber(socket, exceptionSerializer) val socketSink = Sink.fromSubscriber(subscriber) // Create source that will receive the incoming socket data and send it to the response source - val publisher = new WebSocketPublisher(socket, exceptionSerializer, messageHeaderProtocol(requestHeader)) - val socketSource = Source.fromPublisher(publisher) + val socketSource = createSocketSource(socket, exceptionSerializer, messageHeaderProtocol(requestHeader)) + // Create flow that represents sending data to and receiving data from the socket // The sending side is: socketSink -> socket -> service // The receiving side is: service -> socket -> socketSource @@ -123,88 +124,51 @@ private[lagom] abstract class WebSocketClient(config: Config)(implicit ec: Execu } } - private class WebSocketPublisher( + private def createSocketSource( socket: WebSocket, exceptionSerializer: ExceptionSerializer, requestProtocol: MessageProtocol - ) extends Publisher[ByteString] { - private val NormalClosure = 1000 - private var hasSubscription = false - - // acts as a buffer between the backpressare unaware Websocket and backpressure aware reactive-streams Subscriber - val (messageSource, messageSink) = Source - .queue[ByteString](maxBufferSize, OverflowStrategy.fail) - .toMat(Sink.queue())(Keep.both) - .run() + ): Source[ByteString, NotUsed] = { + // Create a buffer between the back-pressure unaware WebSocket and back-pressure aware stream + val (buffer, socketSource) = Source.queue[ByteString](maxBufferSize, OverflowStrategy.fail).preMaterialize() - // fill buffer even before a subscriber is connected, otherwise we might loose elements + // Forward messages from the socket to the buffer + // Start filling the buffer even before the stream is connected to prevent loss of elements socket.onmessage = { message => // The message data should be either an ArrayBuffer or a String // It should not be a Blob because the socket binaryType was set val data = message.data match { - case buffer: ArrayBuffer => ByteString.apply(TypedArrayBuffer.wrap(buffer)) - case data => ByteString.fromString(data.toString) - } + case buffer: ArrayBuffer => ByteString.apply(TypedArrayBuffer.wrap(buffer)) + case data => ByteString.fromString(data.toString) + } - messageSource.offer(data).onComplete { - case Failure(exception) => - throw exception - case _ => + buffer.offer(data).onComplete { + case Failure(exception) => throw exception + case _ => } } - socket.onerror = event => - messageSource.fail(new RuntimeException(s"WebSocket error: ${event.`type`}")) + // Fail the buffer and ultimately the stream with an error if the socket encounters an error + socket.onerror = event => buffer.fail(new RuntimeException(s"WebSocket error: ${event.`type`}")) - // acting on the buffer instead of directly on the subscriber e.g. to make sure pending elements are delivered before completion + // Complete the buffer and ultimately the stream when the socket closes based on the close code + // This should ensure that pending elements are delivered before completion socket.onclose = event => { if (event.code == NormalClosure) { - messageSource.complete() + buffer.complete() } else { - // Complete the subscriber with an error because the socket closed with an error + // Complete with an error because the socket closed with an error // Parse the error reason as an exception val bytes = ByteString.fromString(event.reason) val exception = exceptionSerializerDeserializeWebSocketException(exceptionSerializer, event.code, requestProtocol, bytes) - messageSource.fail(exception) + buffer.fail(exception) } } - var completed = false - - override def subscribe(subscriber: Subscriber[_ >: ByteString]): Unit = { - if (!hasSubscription) { - hasSubscription = true - - // Configure the subscriber to start the stream - subscriber.onSubscribe(new Subscription { - override def request(n: Long): Unit = { - // pull as many elements as requested, but only as long as element Source has not completed - def pull(): Unit = { - messageSink.pull().onComplete { - case Failure(_: StreamDetachedException) => - // happens when subscriber requests more items than available or Subscription.request() is called before all requested elements have been onNext'ed - - case Failure(exception) => - subscriber.onError(exception) - - case Success(Some(data)) => - subscriber.onNext(data) - pull() - - case Success(None) => - subscriber.onComplete() - completed = true - } - } - if (!completed) pull() - } - // Close the socket if the subscriber cancels - override def cancel(): Unit = socket.close() - }) - } else { - subscriber.onError(new IllegalStateException("This publisher only supports one subscriber")) - } - } + // Close the socket when the buffer completes + buffer.watchCompletion().onComplete(_ => socket.close()) + + socketSource } }