Skip to content

Commit

Permalink
Add SuspendApp to main repo
Browse files Browse the repository at this point in the history
  • Loading branch information
serras committed Feb 7, 2025
1 parent 85dc0e0 commit 0e09570
Show file tree
Hide file tree
Showing 16 changed files with 617 additions and 35 deletions.
14 changes: 14 additions & 0 deletions arrow-libs/suspendapp/suspendapp-ktor/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
plugins {
id("arrow.kotlin")
}

kotlin {
sourceSets {
commonMain {
dependencies {
api(projects.arrowFxCoroutines)
api(libs.ktor.server.core)
}
}
}
}
3 changes: 3 additions & 0 deletions arrow-libs/suspendapp/suspendapp-ktor/gradle.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
# Maven publishing configuration
POM_NAME=SuspendApp Ktor
POM_DESCRIPTION=Ktor engine creation as a Resource, which will gracefully shut down when it's finished.
5 changes: 5 additions & 0 deletions arrow-libs/suspendapp/suspendapp-ktor/knit.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
knit.package=arrow.continuations.suspendapp.examples
knit.dir=src/commonTest/kotlin/examples/

test.package=arrow.continuations.suspendapp.examples.test
test.dir=src/commonTest/kotlin/examples/autogenerated/
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package arrow.continuations.ktor

import arrow.fx.coroutines.Resource
import arrow.fx.coroutines.ResourceScope
import io.ktor.server.application.*
import io.ktor.server.engine.*
import kotlin.time.Duration
import kotlin.time.Duration.Companion.milliseconds
import kotlin.time.Duration.Companion.seconds
import kotlinx.coroutines.delay
import kotlinx.io.files.Path
import kotlinx.io.files.SystemFileSystem

/**
* Ktor [ApplicationEngine] as a [Resource]. This [Resource] will gracefully shut down the server
* When we need to shut down a Ktor service we need to properly take into account a _grace_ period
* where we still handle requests instead of immediately cancelling any in-flight requests.
*
* @param factory Application engine for processing the requests
* @param port Server listening port. Default is set to 80
* @param host Host address. Default is set to "0.0.0.0"
* @param watchPaths specifies path substrings that will be watched for automatic reloading
* @param preWait preWait a duration to wait before beginning the stop process. During this time,
* requests will continue to be accepted. This setting is useful to allow time for the container
* to be removed from the load balancer. This is disabled when `io.ktor.development=true`.
* @param grace grace a duration during which already inflight requests are allowed to continue
* before the shutdown process begins.
* @param timeout timeout a duration after which the server will be forcibly shutdown.
* @param module Represents configured and running web application, capable of handling requests.
*/
public suspend fun <
TEngine : ApplicationEngine,
TConfiguration : ApplicationEngine.Configuration,
> ResourceScope.server(
factory: ApplicationEngineFactory<TEngine, TConfiguration>,
port: Int = 80,
host: String = "0.0.0.0",
watchPaths: List<String> = listOf(WORKING_DIRECTORY_PATH),
preWait: Duration = 30.seconds,
grace: Duration = 500.milliseconds,
timeout: Duration = 500.milliseconds,
module: Application.() -> Unit = {},
): EmbeddedServer<TEngine, TConfiguration> =
install({
embeddedServer(factory, host = host, port = port, watchPaths = watchPaths, module = module)
.apply(EmbeddedServer<TEngine, TConfiguration>::start)
}) { engine, _ ->
engine.release(preWait, grace, timeout)
}

/**
* Ktor [ApplicationEngine] as a [Resource]. This [Resource] will gracefully shut down the server
* When we need to shut down a Ktor service we need to properly take into account a _grace_ period
* where we still handle requests instead of immediately cancelling any in-flight requests.
*
* @param factory Application engine for processing the requests
* @param rootConfig definition of the core configuration of the server, including modules, paths,
* and environment details.
* @param preWait preWait a duration to wait before beginning the stop process. During this time,
* requests will continue to be accepted. This setting is useful to allow time for the container
* to be removed from the load balancer. This is disabled when `io.ktor.development=true`.
* @param grace grace a duration during which already inflight requests are allowed to continue
* before the shutdown process begins.
* @param timeout timeout a duration after which the server will be forcibly shutdown.
*/
public suspend fun <
TEngine : ApplicationEngine,
TConfiguration : ApplicationEngine.Configuration,
> ResourceScope.server(
factory: ApplicationEngineFactory<TEngine, TConfiguration>,
rootConfig: ServerConfig,
configure: TConfiguration.() -> Unit = {},
preWait: Duration = 30.seconds,
grace: Duration = 500.milliseconds,
timeout: Duration = 500.milliseconds,
): EmbeddedServer<TEngine, TConfiguration> =
install({
embeddedServer(factory, rootConfig, configure)
.apply(EmbeddedServer<TEngine, TConfiguration>::start)
}) { engine, _ ->
engine.release(preWait, grace, timeout)
}

private suspend fun EmbeddedServer<*, *>.release(
preWait: Duration,
grace: Duration,
timeout: Duration,
) {
if (!application.developmentMode) {
environment.log.info(
"prewait delay of ${preWait.inWholeMilliseconds}ms, turn it off using io.ktor.development=true"
)
delay(preWait.inWholeMilliseconds)
}
environment.log.info("Shutting down HTTP server...")
stop(grace.inWholeMilliseconds, timeout.inWholeMicroseconds)
environment.log.info("HTTP server shutdown!")
}

internal val WORKING_DIRECTORY_PATH: String = SystemFileSystem.resolve(Path(".")).toString()
13 changes: 13 additions & 0 deletions arrow-libs/suspendapp/suspendapp/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
plugins {
id("arrow.kotlin")
}

kotlin {
sourceSets {
commonMain {
dependencies {
api(libs.coroutines.core)
}
}
}
}
2 changes: 2 additions & 0 deletions arrow-libs/suspendapp/suspendapp/gradle.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
# Maven publishing configuration
POM_NAME=SuspendApp
5 changes: 5 additions & 0 deletions arrow-libs/suspendapp/suspendapp/knit.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
knit.package=arrow.continuations.suspendapp.examples
knit.dir=src/commonTest/kotlin/examples/

test.package=arrow.continuations.suspendapp.examples.test
test.dir=src/commonTest/kotlin/examples/autogenerated/
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package arrow.continuations

import kotlin.coroutines.CoroutineContext
import kotlinx.coroutines.CoroutineScope

/** KMP constructor for [Process]. */
public expect fun process(): Process

/**
* [Process] offers a common API to work with our application's process, installing signal handlers,
* shutdown hooks, running scopes in our process (runBlocking), and exiting the process.
*/
@OptIn(ExperimentalStdlibApi::class)
public interface Process : AutoCloseable {
public fun onSigTerm(block: suspend (code: Int) -> Unit)

public fun onSigInt(block: suspend (code: Int) -> Unit)

public fun onShutdown(block: suspend () -> Unit): suspend () -> Unit

/**
* On JVM, and Native this will use kotlinx.coroutines.runBlocking, On NodeJS we need an infinite
* heartbeat to keep main alive. The heartbeat is fast enough that it isn't silently discarded, as
* longer ticks are, but slow enough that we don't interrupt often.
* https://stackoverflow.com/questions/23622051/how-to-forcibly-keep-a-node-js-process-from-terminating
*/
public fun runScope(context: CoroutineContext, block: suspend CoroutineScope.() -> Unit)

public fun exit(code: Int)

override fun close()
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package arrow.continuations

import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.time.Duration
import kotlinx.coroutines.*

/**
* An unsafe blocking edge that wires the [CoroutineScope] (and structured concurrency) to the
* [SuspendApp], such that the [CoroutineScope] gets cancelled when the `App` is requested to
* gracefully shutdown. => `SIGTERM` & `SIGINT` on Native & NodeJS and a ShutdownHook for JVM.
*
* It applies backpressure to the process such that they can gracefully shutdown.
*
* @param context the [CoroutineContext] where [block] will execute. Use [EmptyCoroutineContext] to
* create an `CoroutineDispatcher` for the main thread and run there instead.
* @param timeout the maximum backpressure time that can be applied to the process. This emulates a
* `SIGKILL` command, and after the [timeout] is passed the App will forcefully shut down
* regardless of finalizers.
* @param block the lambda of the actual application.
*/
@OptIn(ExperimentalStdlibApi::class)
public fun SuspendApp(
context: CoroutineContext = Dispatchers.Default,
uncaught: (Throwable) -> Unit = Throwable::printStackTrace,
timeout: Duration = Duration.INFINITE,
process: Process = process(),
block: suspend CoroutineScope.() -> Unit,
): Unit =
process.use { env ->
env.runScope(context) {
val job =
launch(start = CoroutineStart.LAZY) {
try {
block()
env.exit(0)
} catch (_: SuspendAppShutdown) {} catch (e: Throwable) {
uncaught(e)
env.exit(-1)
}
}
val unregister =
env.onShutdown {
withTimeout(timeout) {
job.cancel(SuspendAppShutdown)
job.join()
}
}
job.start()
job.join()
unregister()
}
}

/** Marker type so track shutdown signal */
private object SuspendAppShutdown : CancellationException("SuspendApp shutting down.")
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package arrow.continuations

import kotlin.coroutines.Continuation
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.EmptyCoroutineContext
import kotlin.coroutines.startCoroutine
import kotlin.js.Promise
import kotlin.time.Duration.Companion.hours
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.DelicateCoroutinesApi
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.delay
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import kotlinx.coroutines.promise

public actual fun process(): Process = JsProcess

public object JsProcess : Process {
override fun onShutdown(block: suspend () -> Unit): suspend () -> Unit {
onSigTerm { code -> exitAfter(128 + code) { block() } }
onSigInt { code -> exitAfter(128 + code) { block() } }
return { /* Nothing to unregister */ }
}

override fun onSigTerm(block: suspend (code: Int) -> Unit): Unit = onSignal("SIGTERM") { block(15) }

override fun onSigInt(block: suspend (code: Int) -> Unit): Unit = onSignal("SIGINT") { block(2) }

@OptIn(DelicateCoroutinesApi::class)
@Suppress("UNUSED_PARAMETER")
private fun onSignal(signal: String, block: suspend () -> Unit) {
@Suppress("UNUSED_VARIABLE")
val provide: () -> Promise<Unit> = { GlobalScope.promise { block() } }
js("process.on(signal, function() {\n" + " provide()\n" + "});")
}

private val jobs: MutableList<Job> = mutableListOf()

override fun runScope(context: CoroutineContext, block: suspend CoroutineScope.() -> Unit) {
val innerJob = Job()
val innerScope = CoroutineScope(innerJob)
suspend {
// An infinite heartbeat to keep main alive.
// The tick is fast enough that it isn't silently discarded, as longer ticks are,
// but slow enough that we don't interrupt often.
// https://stackoverflow.com/questions/23622051/how-to-forcibly-keep-a-node-js-process-from-terminating
val keepAlive: Job =
innerScope.launch {
while (isActive) {
// Schedule a `no-op tick` by returning to main every hour with no actual work but
// just looping.
delay(1.hours)
}
}
runCatching { block(innerScope) }.also { keepAlive.cancelAndJoin() }.getOrThrow()
}
.startCoroutine(Continuation(EmptyCoroutineContext) {})
}

override fun exit(code: Int) {
runCatching { js("process.exit(code)") }
}

override fun close() {
suspend { jobs.forEach { it.cancelAndJoin() } }
.startCoroutine(Continuation(EmptyCoroutineContext) {})
}
}

private inline fun Process.exitAfter(code: Int, block: () -> Unit): Unit =
try {
block()
exit(code)
} catch (e: Throwable) {
e.printStackTrace()
exit(-1)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
package arrow.continuations

import java.util.concurrent.atomic.AtomicBoolean
import kotlin.coroutines.CoroutineContext
import kotlin.system.exitProcess
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.runBlocking
import sun.misc.Signal
import sun.misc.SignalHandler

public actual fun process(): Process = JvmProcess

public object JvmProcess : Process {
override fun onShutdown(block: suspend () -> Unit): suspend () -> Unit {
val isShutdown = AtomicBoolean(false)
fun shutdown() {
if (!isShutdown.getAndSet(true))
runBlocking {
// We don't call exit from ShutdownHook on JVM
try {
block()
} catch (e: Throwable) {
e.printStackTrace()
}
}
}

val hook = Thread(::shutdown, "Arrow-kt SuspendApp JVM ShutdownHook")
Runtime.getRuntime().addShutdownHook(hook)
return {
if (!isShutdown.get()) {
Runtime.getRuntime().removeShutdownHook(hook)
}
}
}

override fun onSigTerm(block: suspend (code: Int) -> Unit): Unit =
addSignalHandler("SIGTERM") { block(15) }

override fun onSigInt(block: suspend (code: Int) -> Unit): Unit =
addSignalHandler("SIGINT") { block(2) }

private fun addSignalHandler(signal: String, action: suspend () -> Unit): Unit =
try {
var handle: SignalHandler? = null
handle =
Signal.handle(Signal(signal)) { prev ->
runBlocking { action() }
if (handle != SignalHandler.SIG_DFL && handle != SignalHandler.SIG_IGN) {
handle?.handle(prev)
}
}
} catch (_: Throwable) {
/* Ignore */
}

override fun runScope(context: CoroutineContext, block: suspend CoroutineScope.() -> Unit): Unit =
runBlocking(context, block)

override fun exit(code: Int): Nothing = exitProcess(code)

override fun close(): Unit = Unit
}
Loading

0 comments on commit 0e09570

Please sign in to comment.