Skip to content

Commit

Permalink
updated websocket output to be handled directly by the source queue b…
Browse files Browse the repository at this point in the history
…uffer
  • Loading branch information
mliarakos committed Oct 12, 2020
1 parent 027566b commit fa6b910
Showing 1 changed file with 29 additions and 65 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,8 @@ import org.reactivestreams.Subscription
import org.scalajs.dom.CloseEvent
import org.scalajs.dom.Event
import org.scalajs.dom.WebSocket
import org.scalajs.dom.raw.MessageEvent
import play.api.http.Status

import scala.collection.JavaConverters._
import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.concurrent.Promise
Expand All @@ -36,8 +34,11 @@ import scala.util.Success
private[lagom] abstract class WebSocketClient(config: Config)(implicit ec: ExecutionContext, materializer: Materializer)
extends LagomServiceApiBridge {

private val NormalClosure = 1000

// Defines the internal buffer size for the websocket when using ServiceCalls containing a Source as it's not possible to use backpressure in the JS websocket implementation
val maxBufferSize = Option(config.getInt("lagom.client.websocket.scalajs.maxBufferSize")).filter(_!=0).getOrElse(1024)
val maxBufferSize =
Option(config.getInt("lagom.client.websocket.scalajs.maxBufferSize")).filter(_ != 0).getOrElse(1024)

/**
* Connect to the given URI
Expand All @@ -64,8 +65,8 @@ private[lagom] abstract class WebSocketClient(config: Config)(implicit ec: Execu
val subscriber = new WebSocketSubscriber(socket, exceptionSerializer)
val socketSink = Sink.fromSubscriber(subscriber)
// Create source that will receive the incoming socket data and send it to the response source
val publisher = new WebSocketPublisher(socket, exceptionSerializer, messageHeaderProtocol(requestHeader))
val socketSource = Source.fromPublisher(publisher)
val socketSource = createSocketSource(socket, exceptionSerializer, messageHeaderProtocol(requestHeader))

// Create flow that represents sending data to and receiving data from the socket
// The sending side is: socketSink -> socket -> service
// The receiving side is: service -> socket -> socketSource
Expand Down Expand Up @@ -123,88 +124,51 @@ private[lagom] abstract class WebSocketClient(config: Config)(implicit ec: Execu
}
}

private class WebSocketPublisher(
private def createSocketSource(
socket: WebSocket,
exceptionSerializer: ExceptionSerializer,
requestProtocol: MessageProtocol
) extends Publisher[ByteString] {
private val NormalClosure = 1000
private var hasSubscription = false

// acts as a buffer between the backpressare unaware Websocket and backpressure aware reactive-streams Subscriber
val (messageSource, messageSink) = Source
.queue[ByteString](maxBufferSize, OverflowStrategy.fail)
.toMat(Sink.queue())(Keep.both)
.run()
): Source[ByteString, NotUsed] = {
// Create a buffer between the back-pressure unaware WebSocket and back-pressure aware stream
val (buffer, socketSource) = Source.queue[ByteString](maxBufferSize, OverflowStrategy.fail).preMaterialize()

// fill buffer even before a subscriber is connected, otherwise we might loose elements
// Forward messages from the socket to the buffer
// Start filling the buffer even before the stream is connected to prevent loss of elements
socket.onmessage = { message =>
// The message data should be either an ArrayBuffer or a String
// It should not be a Blob because the socket binaryType was set
val data = message.data match {
case buffer: ArrayBuffer => ByteString.apply(TypedArrayBuffer.wrap(buffer))
case data => ByteString.fromString(data.toString)
}
case buffer: ArrayBuffer => ByteString.apply(TypedArrayBuffer.wrap(buffer))
case data => ByteString.fromString(data.toString)
}

messageSource.offer(data).onComplete {
case Failure(exception) =>
throw exception
case _ =>
buffer.offer(data).onComplete {
case Failure(exception) => throw exception
case _ =>
}
}

socket.onerror = event =>
messageSource.fail(new RuntimeException(s"WebSocket error: ${event.`type`}"))
// Fail the buffer and ultimately the stream with an error if the socket encounters an error
socket.onerror = event => buffer.fail(new RuntimeException(s"WebSocket error: ${event.`type`}"))

// acting on the buffer instead of directly on the subscriber e.g. to make sure pending elements are delivered before completion
// Complete the buffer and ultimately the stream when the socket closes based on the close code
// This should ensure that pending elements are delivered before completion
socket.onclose = event => {
if (event.code == NormalClosure) {
messageSource.complete()
buffer.complete()
} else {
// Complete the subscriber with an error because the socket closed with an error
// Complete with an error because the socket closed with an error
// Parse the error reason as an exception
val bytes = ByteString.fromString(event.reason)
val exception =
exceptionSerializerDeserializeWebSocketException(exceptionSerializer, event.code, requestProtocol, bytes)
messageSource.fail(exception)
buffer.fail(exception)
}
}

var completed = false

override def subscribe(subscriber: Subscriber[_ >: ByteString]): Unit = {
if (!hasSubscription) {
hasSubscription = true

// Configure the subscriber to start the stream
subscriber.onSubscribe(new Subscription {
override def request(n: Long): Unit = {
// pull as many elements as requested, but only as long as element Source has not completed
def pull(): Unit = {
messageSink.pull().onComplete {
case Failure(_: StreamDetachedException) =>
// happens when subscriber requests more items than available or Subscription.request() is called before all requested elements have been onNext'ed

case Failure(exception) =>
subscriber.onError(exception)

case Success(Some(data)) =>
subscriber.onNext(data)
pull()

case Success(None) =>
subscriber.onComplete()
completed = true
}
}
if (!completed) pull()
}
// Close the socket if the subscriber cancels
override def cancel(): Unit = socket.close()
})
} else {
subscriber.onError(new IllegalStateException("This publisher only supports one subscriber"))
}
}
// Close the socket when the buffer completes
buffer.watchCompletion().onComplete(_ => socket.close())

socketSource
}
}

0 comments on commit fa6b910

Please sign in to comment.