From b4bafc9f4736a4f533103a609fdc793a4ae39a31 Mon Sep 17 00:00:00 2001 From: Ian Clarke Date: Sun, 5 May 2024 15:43:31 -0500 Subject: [PATCH 1/2] wip --- buildSrc/build.gradle.kts | 4 +- src/main/kotlin/kweb/Kweb.kt | 26 ++-- .../kotlin/kweb/client/ClientConnection.kt | 13 +- .../kotlin/kweb/client/RemoteClientState.kt | 15 ++- .../kweb/config/KwebDefaultConfiguration.kt | 2 +- .../kweb/client/ClientConnectionTest.kt | 2 +- src/test/kotlin/kweb/memory/memoryLeak.kt | 118 ++++++++++++++++++ 7 files changed, 158 insertions(+), 22 deletions(-) create mode 100644 src/test/kotlin/kweb/memory/memoryLeak.kt diff --git a/buildSrc/build.gradle.kts b/buildSrc/build.gradle.kts index 201b589ca..04082f1eb 100644 --- a/buildSrc/build.gradle.kts +++ b/buildSrc/build.gradle.kts @@ -16,8 +16,8 @@ dependencies { // files in the project. // Use their Maven coordinates (plus versions), not Gradle plugin IDs! // This should be the only place that Gradle plugin versions are defined, so they are aligned across all build scripts - implementation("org.jetbrains.kotlin:kotlin-gradle-plugin:1.9.22") - implementation("org.jetbrains.kotlin:kotlin-serialization:1.9.22") + implementation("org.jetbrains.kotlin:kotlin-gradle-plugin:1.9.23") + implementation("org.jetbrains.kotlin:kotlin-serialization:1.9.23") } val gradleJvmTarget = 17 diff --git a/src/main/kotlin/kweb/Kweb.kt b/src/main/kotlin/kweb/Kweb.kt index e4fa451d9..1c1e45ce5 100755 --- a/src/main/kotlin/kweb/Kweb.kt +++ b/src/main/kotlin/kweb/Kweb.kt @@ -271,8 +271,8 @@ class Kweb private constructor( val remoteClientState = clientState.getIfPresent(hello.id).ensureSessionExists(this, hello.id) - val currentCC = remoteClientState.clientConnection - remoteClientState.clientConnection = when (currentCC) { + val currentCC = remoteClientState.getClientConnection() + val newClientConnection = when (currentCC) { is Caching -> { val webSocketClientConnection = ClientConnection.WebSocket(this) currentCC.read().forEach { @@ -293,6 +293,7 @@ class Kweb private constructor( ws } } + remoteClientState.updateClientConnection(newClientConnection) try { for (frame in incoming) { @@ -340,7 +341,7 @@ class Kweb private constructor( } } finally { logger.info("WS session disconnected for client id: ${remoteClientState.id}") - remoteClientState.clientConnection = Caching() + remoteClientState.updateClientConnection(Caching("After WS disconnection")) } } @@ -365,12 +366,8 @@ class Kweb private constructor( val httpRequestInfo = HttpRequestInfo(call.request) - - //this doesn't work. I get this error when running the todo Demo - //Caused by: java.lang.IllegalStateException: Client id Nb9_U7:eJ5dw4 not found - //The debugger says that the remoteClientState ID here matches the clientPrefix and the kwebSessionID from a few lines ago. val remoteClientState = clientState.get(kwebSessionId) { - RemoteClientState(id = kwebSessionId, clientConnection = Caching()) + RemoteClientState(id = kwebSessionId, initialClientConnection = Caching("Initial render")) } @@ -396,7 +393,7 @@ class Kweb private constructor( } catch (e: Exception) { logger.error("Exception thrown building page", e) } - logger.debug { "Outbound message queue size after buildPage is ${(remoteClientState.clientConnection as Caching).queueSize()}" } + logger.debug { "Outbound message queue size after buildPage is ${(remoteClientState.getClientConnection() as Caching).queueSize()}" } } } else { try { @@ -404,7 +401,7 @@ class Kweb private constructor( } catch (e: Exception) { logger.error("Exception thrown building page", e) } - logger.debug { "Outbound message queue size after buildPage is ${(remoteClientState.clientConnection as Caching).queueSize()}" } + logger.debug { "Outbound message queue size after buildPage is ${(remoteClientState.getClientConnection() as Caching).queueSize()}" } } for (plugin in plugins) { //this code block looks a little funny now, but I still think moving the message creation out of Kweb.callJs() was the right move @@ -418,11 +415,12 @@ class Kweb private constructor( webBrowser.htmlDocument.set(null) // Don't think this webBrowser will be used again, but not going to risk it - val initialCachedMessages = remoteClientState.clientConnection as Caching + val initialCachedMessages = remoteClientState.getClientConnection() as Caching - remoteClientState.clientConnection = Caching() + // TODO: Verify that this is correct + remoteClientState.updateClientConnection(Caching("Awaiting WS connection cache")) - val initialMessages = initialCachedMessages.read()//the initialCachedMessages queue can only be read once + val initialMessages = initialCachedMessages.read() val cachedFunctions = mutableListOf() val cachedIds = mutableListOf() @@ -504,7 +502,7 @@ class Kweb private constructor( for (client in clientState.asMap().values) { val refreshCall = FunctionCall(js = "window.location.reload(true);") val message = Server2ClientMessage(client.id, refreshCall) - client.clientConnection.send(Json.encodeToString(message)) + client.getClientConnection().send(Json.encodeToString(message)) } } } diff --git a/src/main/kotlin/kweb/client/ClientConnection.kt b/src/main/kotlin/kweb/client/ClientConnection.kt index 5d3f1d4e1..d131f1cdd 100755 --- a/src/main/kotlin/kweb/client/ClientConnection.kt +++ b/src/main/kotlin/kweb/client/ClientConnection.kt @@ -46,9 +46,13 @@ sealed class ClientConnection { sendBuffer.close() } } + + override fun toString(): String { + return "WebSocket()" + } } - class Caching : ClientConnection() { + class Caching(val description : String) : ClientConnection() { private val queue = ConcurrentLinkedQueue() private val lock = ReentrantLock() private val isRead = AtomicBoolean(false) @@ -58,7 +62,7 @@ sealed class ClientConnection { if (isRead.get()) { error("Can't write to queue after it has been read") } else { - logger.debug("Caching '$message' as websocket isn't yet available") + logger.debug("Caching \"${message.take(20)}...\" in $description") queue.add(message) } } @@ -82,7 +86,10 @@ sealed class ClientConnection { return queue.size } } - } + override fun toString(): String { + return "Caching($description)" + } + } } \ No newline at end of file diff --git a/src/main/kotlin/kweb/client/RemoteClientState.kt b/src/main/kotlin/kweb/client/RemoteClientState.kt index 0e7c61a87..e7f61b14c 100755 --- a/src/main/kotlin/kweb/client/RemoteClientState.kt +++ b/src/main/kotlin/kweb/client/RemoteClientState.kt @@ -5,14 +5,27 @@ import kotlinx.serialization.json.Json import kotlinx.serialization.json.JsonElement import kweb.DebugInfo import kweb.util.random +import mu.two.KotlinLogging import java.time.Instant import java.util.concurrent.ConcurrentHashMap -data class RemoteClientState(val id: String, @Volatile var clientConnection: ClientConnection, +private var logger = KotlinLogging.logger {} + +data class RemoteClientState(val id: String, val initialClientConnection: ClientConnection, val eventHandlers: MutableMap Unit> = HashMap(), val onCloseHandlers : ConcurrentHashMap Unit> = ConcurrentHashMap(), val debugTokens: MutableMap = HashMap(), var lastModified: Instant = Instant.now(), var onMessageFunction: ((data: JsonElement?) -> Unit)? = null) { + + private @Volatile var clientConnection = initialClientConnection + + fun getClientConnection() = clientConnection + + fun updateClientConnection(newClientConnection: ClientConnection) { + logger.debug { "Updating client connection from $clientConnection to $newClientConnection" } + clientConnection = newClientConnection + } + fun send(message: Server2ClientMessage) { clientConnection.send(Json.encodeToString(message)) } diff --git a/src/main/kotlin/kweb/config/KwebDefaultConfiguration.kt b/src/main/kotlin/kweb/config/KwebDefaultConfiguration.kt index f0a455920..65c7888d2 100644 --- a/src/main/kotlin/kweb/config/KwebDefaultConfiguration.kt +++ b/src/main/kotlin/kweb/config/KwebDefaultConfiguration.kt @@ -17,5 +17,5 @@ open class KwebDefaultConfiguration : KwebConfiguration() { override val clientStateTimeout: Duration = Accessor.getProperty("kweb.client.state.timeout")?.let { Duration.parse(it) } - ?: Duration.ofHours(1) + ?: Duration.ofMinutes(1) } \ No newline at end of file diff --git a/src/test/kotlin/kweb/client/ClientConnectionTest.kt b/src/test/kotlin/kweb/client/ClientConnectionTest.kt index 07ce9eb21..5b071b648 100644 --- a/src/test/kotlin/kweb/client/ClientConnectionTest.kt +++ b/src/test/kotlin/kweb/client/ClientConnectionTest.kt @@ -5,7 +5,7 @@ import io.kotest.core.spec.style.FunSpec import io.kotest.matchers.shouldBe class ClientConnectionTest : FunSpec({ - val caching = ClientConnection.Caching() + val caching = ClientConnection.Caching("test") test("Adding elements to the queue") { caching.send("Message 1") diff --git a/src/test/kotlin/kweb/memory/memoryLeak.kt b/src/test/kotlin/kweb/memory/memoryLeak.kt new file mode 100644 index 000000000..5b8511cbf --- /dev/null +++ b/src/test/kotlin/kweb/memory/memoryLeak.kt @@ -0,0 +1,118 @@ +package kweb.memory + +import com.ibm.icu.text.DecimalFormat +import kotlinx.coroutines.* +import kweb.* +import kweb.plugins.fomanticUI.fomantic +import kweb.plugins.fomanticUI.fomanticUIPlugin +import kweb.state.KVar +import kweb.state.render +import kotlin.random.Random +import kotlin.time.Duration.Companion.milliseconds +import kotlin.time.Duration.Companion.seconds + +class MemoryTestSever(private val port: Int = 8080) : AutoCloseable { + + private val scope = CoroutineScope(context = Dispatchers.IO) + private lateinit var server: Kweb + private lateinit var webServerMemoryLoadJob: Job + private lateinit var randomDataJob: Job + + suspend fun start() { + val runtime = Runtime.getRuntime() + + var counter = 0 + val randomDataList = (0..100).map { KVar(initialValue = "") } + randomDataJob = scope.launch { + while (isActive) { + randomDataList.forEach { randomData -> + randomData.value = Random.nextDouble( + from = -Double.MAX_VALUE, + until = Double.MAX_VALUE + ).toString() + } + if (counter % 10 == 0) + MemoryLoad( + max = runtime.maxMemory(), + free = runtime.freeMemory(), + total = runtime.totalMemory() + ).printMemoryLoad() + + System.gc() + delay(duration = 100.milliseconds) + } + } + + server = Kweb( + port = port, + debug = false, + plugins = listOf(fomanticUIPlugin) + ) { + doc.body { + route { + path(template = "/") { + div(attributes = fomantic.field) { + label().addText(value = "Random Double") + randomDataList.forEach { randomData -> + div { + val memoryLeak = true + render(randomData) { randomData: String -> + + if(memoryLeak) { + /* With memory Leak */ + input(type = InputType.text) { element -> + element.setReadOnly(true) + } + } else { + /* without memory leak */ + label().text(randomData) + } + } + } + } + } + } + } + } + } + } + + override fun close() { + webServerMemoryLoadJob.cancel() + randomDataJob.cancel() + scope.cancel() + server.close() + } +} + + +data class MemoryLoad( + val max: Long = 0L, + val free: Long = 0L, + val total: Long = 0L +) { + companion object { + val format = DecimalFormat("0.00") + private val MEGA = (1024 * 1024).toLong() + } + + val used: Long by lazy { total - free } + + fun printMemoryLoad() { + println( + "Memory Load: ${format.format(max / MEGA)}MB max, ${format.format(free / MEGA)}MB free, ${ + format.format( + total / MEGA + ) + }MB total, ${format.format(used / MEGA)}MB used" + ) + } +} + + +fun main() = runBlocking { + MemoryTestSever().use { server -> + server.start() + delay(duration = Int.MAX_VALUE.seconds) + } +} \ No newline at end of file From 2f61336a340671ddde5dff401c16f02940a42197 Mon Sep 17 00:00:00 2001 From: Ian Clarke Date: Mon, 6 May 2024 16:20:37 -0500 Subject: [PATCH 2/2] wip --- src/main/kotlin/kweb/Kweb.kt | 135 +++++++++--------- .../kweb/config/KwebDefaultConfiguration.kt | 2 +- src/main/kotlin/kweb/state/KVal.kt | 10 +- src/main/kotlin/kweb/state/render.kt | 2 +- src/main/kotlin/kweb/util/CallbackOwner.kt | 55 +++++++ src/test/kotlin/kweb/memory/memoryLeak.kt | 4 +- src/test/resources/logback-test.xml | 2 +- 7 files changed, 133 insertions(+), 77 deletions(-) create mode 100644 src/main/kotlin/kweb/util/CallbackOwner.kt diff --git a/src/main/kotlin/kweb/Kweb.kt b/src/main/kotlin/kweb/Kweb.kt index 1c1e45ce5..84755a889 100755 --- a/src/main/kotlin/kweb/Kweb.kt +++ b/src/main/kotlin/kweb/Kweb.kt @@ -251,17 +251,6 @@ class Kweb private constructor( } } - private suspend fun RemoteClientState?.ensureSessionExists( - sock: DefaultWebSocketSession, - sessionId: String - ): RemoteClientState { - if (this == null) { - sock.close(CloseReason(CloseReason.Codes.NOT_CONSISTENT, "Session not found. Please reload")) - error("Unable to find server state corresponding to client id ${sessionId}") - } - return this - } - private suspend fun DefaultWebSocketSession.listenForWebsocketConnection() { val hello = Json.decodeFromString((incoming.receive() as Text).readText()) @@ -269,79 +258,89 @@ class Kweb private constructor( error("First message from client isn't 'hello'") } - val remoteClientState = clientState.getIfPresent(hello.id).ensureSessionExists(this, hello.id) + val remoteClientState = clientState.getIfPresent(hello.id) - val currentCC = remoteClientState.getClientConnection() - val newClientConnection = when (currentCC) { - is Caching -> { - val webSocketClientConnection = ClientConnection.WebSocket(this) - currentCC.read().forEach { - webSocketClientConnection.send(it) - } - remoteClientState.addCloseHandler { - webSocketClientConnection.close(CloseReason(4002, "RemoteClientState closed by server, likely due to cache expiry")) + if (remoteClientState == null) { + logger.warn("Client id ${hello.id} not found, closing connection") + this.close(CloseReason(4000, "Client id ${hello.id} not found")) + } else { + + val newClientConnection = when (val currentCC = remoteClientState.getClientConnection()) { + is Caching -> { + val webSocketClientConnection = ClientConnection.WebSocket(this) + currentCC.read().forEach { + webSocketClientConnection.send(it) + } + remoteClientState.addCloseHandler { + webSocketClientConnection.close( + CloseReason( + 4002, + "RemoteClientState closed by server, likely due to cache expiry" + ) + ) + } + webSocketClientConnection } - webSocketClientConnection - } - is ClientConnection.WebSocket -> { - currentCC.close(CloseReason(4001, "Client reconnected via another connection")) - val ws = ClientConnection.WebSocket(this) - remoteClientState.addCloseHandler { - ws.close(CloseReason(4002, "RemoteClientState closed by server, likely due to cache expiry")) + is ClientConnection.WebSocket -> { + currentCC.close(CloseReason(4001, "Client reconnected via another connection")) + val ws = ClientConnection.WebSocket(this) + remoteClientState.addCloseHandler { + ws.close(CloseReason(4002, "RemoteClientState closed by server, likely due to cache expiry")) + } + ws } - ws } - } - remoteClientState.updateClientConnection(newClientConnection) - - try { - for (frame in incoming) { + remoteClientState.updateClientConnection(newClientConnection) - logger.debug { "WebSocket frame of type ${frame.frameType} received" } + try { + for (frame in incoming) { - // Retrieve the clientState so that it doesn't expire, replace it if it - // has expired. - clientState.get(hello.id) { remoteClientState } + logger.debug { "WebSocket frame of type ${frame.frameType} received" } - try { - logger.debug { "Message received from client" } - - if (frame is Text) { - val message = Json.decodeFromString(frame.readText()) - - logger.debug { "Message received: $message" } - if (message.error != null) { - handleError(message.error, remoteClientState) - } else { - when { - message.callback != null -> { - val (resultId, result) = message.callback - val resultHandler = remoteClientState.eventHandlers[resultId] - ?: error("No resultHandler for $resultId, for client ${remoteClientState.id}") - resultHandler(result) - } + // Retrieve the clientState so that it doesn't expire, replace it if it + // has expired. + clientState.get(hello.id) { remoteClientState } - message.keepalive -> { - logger.debug { "keepalive received from client ${hello.id}" } - } + try { + logger.debug { "Message received from client" } + + if (frame is Text) { + val message = Json.decodeFromString(frame.readText()) + + logger.debug { "Message received: $message" } + if (message.error != null) { + handleError(message.error, remoteClientState) + } else { + when { + message.callback != null -> { + val (resultId, result) = message.callback + val resultHandler = remoteClientState.eventHandlers[resultId] + ?: error("No resultHandler for $resultId, for client ${remoteClientState.id}") + resultHandler(result) + } + + message.keepalive -> { + logger.debug { "keepalive received from client ${hello.id}" } + } + + message.onMessageData != null -> { + val data = message.onMessageData + remoteClientState.onMessageFunction!!.invoke(data) + } - message.onMessageData != null -> { - val data = message.onMessageData - remoteClientState.onMessageFunction!!.invoke(data) } - } } + } catch (e: Exception) { + logger.error("Exception while receiving websocket message", e) + kwebConfig.onWebsocketMessageHandlingFailure(e) } - } catch (e: Exception) { - logger.error("Exception while receiving websocket message", e) - kwebConfig.onWebsocketMessageHandlingFailure(e) } + } finally { + logger.info("WS session disconnected for client id: ${remoteClientState.id}") + remoteClientState.updateClientConnection(Caching("After WS disconnection")) } - } finally { - logger.info("WS session disconnected for client id: ${remoteClientState.id}") - remoteClientState.updateClientConnection(Caching("After WS disconnection")) } } diff --git a/src/main/kotlin/kweb/config/KwebDefaultConfiguration.kt b/src/main/kotlin/kweb/config/KwebDefaultConfiguration.kt index 65c7888d2..f0a455920 100644 --- a/src/main/kotlin/kweb/config/KwebDefaultConfiguration.kt +++ b/src/main/kotlin/kweb/config/KwebDefaultConfiguration.kt @@ -17,5 +17,5 @@ open class KwebDefaultConfiguration : KwebConfiguration() { override val clientStateTimeout: Duration = Accessor.getProperty("kweb.client.state.timeout")?.let { Duration.parse(it) } - ?: Duration.ofMinutes(1) + ?: Duration.ofHours(1) } \ No newline at end of file diff --git a/src/main/kotlin/kweb/state/KVal.kt b/src/main/kotlin/kweb/state/KVal.kt index f5957f0cd..555c2636e 100755 --- a/src/main/kotlin/kweb/state/KVal.kt +++ b/src/main/kotlin/kweb/state/KVal.kt @@ -1,5 +1,6 @@ package kweb.state +import kweb.util.CallbackOwner import kweb.util.random import mu.two.KotlinLogging import java.util.concurrent.ConcurrentHashMap @@ -11,7 +12,7 @@ private val logger = KotlinLogging.logger {} * A KVal is a **read-only** observable container for a value of type T. These are typically created by * [KVal.map] or [KVar.map], but can also be created directly. */ -open class KVal(value: T) : AutoCloseable{ +open class KVal(private val kvalOwner : CallbackOwner, value: T) : AutoCloseable{ @Volatile protected var closeReason: CloseReason? = null @@ -24,10 +25,11 @@ open class KVal(value: T) : AutoCloseable{ /** * Add a listener to this KVar. The listener will be called whenever the [value] property changes. */ - fun addListener(listener: (T, T) -> Unit): Long { + fun addListener(owner : CallbackOwner = kvalOwner, listener: (T, T) -> Unit) : Long { verifyNotClosed("add a listener") val handle = random.nextLong() listeners[handle] = listener + owner.onClose(listeners, handle) return handle } @@ -59,11 +61,11 @@ open class KVal(value: T) : AutoCloseable{ * * For bi-directional mappings, see [KVar.map]. */ - fun map(mapper: (T) -> O): KVal { + fun map(owner: CallbackOwner = kvalOwner, mapper: (T) -> O): KVal { if (isClosed) { error("Can't map this var because it was closed due to $closeReason") } - val mappedKVal = KVal(mapper(value)) + val mappedKVal = KVal(owner.child("map"), mapper(value)) val handle = addListener { old, new -> if (!isClosed && !mappedKVal.isClosed) { if (old != new) { diff --git a/src/main/kotlin/kweb/state/render.kt b/src/main/kotlin/kweb/state/render.kt index 475b6d64d..07740b82c 100755 --- a/src/main/kotlin/kweb/state/render.kt +++ b/src/main/kotlin/kweb/state/render.kt @@ -101,10 +101,10 @@ fun ElementCreator<*>.render( renderLoop() this.onCleanup(true) { + value.removeListener(listenerHandle) previousElementCreatorLock.withLock { previousElementCreator.getAndSet(null)?.cleanup() } - value.removeListener(listenerHandle) } return renderFragment diff --git a/src/main/kotlin/kweb/util/CallbackOwner.kt b/src/main/kotlin/kweb/util/CallbackOwner.kt new file mode 100644 index 000000000..9fa9e7847 --- /dev/null +++ b/src/main/kotlin/kweb/util/CallbackOwner.kt @@ -0,0 +1,55 @@ +package kweb.util + +import java.util.concurrent.ConcurrentHashMap +import java.util.concurrent.ConcurrentLinkedQueue + +class CallbackOwner(val label : String, val parent : CallbackOwner? = null) { + private val closeListeners = ConcurrentLinkedQueue<() -> Unit>() + private val closeMaps = ConcurrentLinkedQueue, Long>>() + + fun onClose(map : MutableMap, handle : Long) { + closeMaps.add(map to handle) + } + + fun onClose(cb : () -> Unit) { + closeListeners.add(cb) + } + + fun close() { + closeListeners.forEach { it() } + closeListeners.clear() + closeMaps.forEach { (map, handle) -> + map.remove(handle) + } + closeMaps.clear() + } + + fun child(label : String) : CallbackOwner { + val child = CallbackOwner(label, this) + onClose { + child.close() + } + return child + } +} + +class CallbackMap(private val owner : CallbackOwner) { + private val map = ConcurrentHashMap() + + fun addListener(listener : L) : Long { + val handle = random.nextLong() + map[handle] = listener + owner.onClose(map, handle) + return handle + } + + fun removeListener(handle : Long) { + map.remove(handle) + } + + fun clear() { + map.clear() + } + + val listeners : Collection get() = map.values +} \ No newline at end of file diff --git a/src/test/kotlin/kweb/memory/memoryLeak.kt b/src/test/kotlin/kweb/memory/memoryLeak.kt index 5b8511cbf..e76031ea6 100644 --- a/src/test/kotlin/kweb/memory/memoryLeak.kt +++ b/src/test/kotlin/kweb/memory/memoryLeak.kt @@ -22,7 +22,7 @@ class MemoryTestSever(private val port: Int = 8080) : AutoCloseable { val runtime = Runtime.getRuntime() var counter = 0 - val randomDataList = (0..100).map { KVar(initialValue = "") } + val randomDataList = (0..10).map { KVar(initialValue = "") } randomDataJob = scope.launch { while (isActive) { randomDataList.forEach { randomData -> @@ -39,7 +39,7 @@ class MemoryTestSever(private val port: Int = 8080) : AutoCloseable { ).printMemoryLoad() System.gc() - delay(duration = 100.milliseconds) + delay(duration = 5.seconds) } } diff --git a/src/test/resources/logback-test.xml b/src/test/resources/logback-test.xml index 854409cb3..0ef31f147 100644 --- a/src/test/resources/logback-test.xml +++ b/src/test/resources/logback-test.xml @@ -8,7 +8,7 @@ - +