Skip to content

Commit

Permalink
m1l6 - flow
Browse files Browse the repository at this point in the history
  • Loading branch information
BarracudaPff authored and evgnep committed Sep 16, 2023
1 parent 3b998a5 commit bf77f09
Show file tree
Hide file tree
Showing 10 changed files with 750 additions and 1 deletion.
2 changes: 2 additions & 0 deletions gradle.properties
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,7 @@ org.gradle.jvmargs=-Xmx2g -XX:MaxMetaspaceSize=512m -Dfile.encoding=UTF-8
kotlinVersion=1.8.10
coroutinesVersion=1.6.4

jUnitJupiterVersion=5.9.2

jacksonVersion=2.14.1
okhttpVersion=4.9.3
19 changes: 19 additions & 0 deletions m1l6-flows-and-channels/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
plugins {
kotlin("jvm")
}

val coroutinesVersion: String by project
val jUnitJupiterVersion: String by project

dependencies {
implementation(kotlin("stdlib"))
implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:$coroutinesVersion")

testImplementation(kotlin("test-junit"))
testImplementation("org.junit.jupiter:junit-jupiter-api:$jUnitJupiterVersion")
testImplementation("org.junit.jupiter:junit-jupiter-engine:$jUnitJupiterVersion")
}

tasks.test {
useJUnitPlatform()
}
110 changes: 110 additions & 0 deletions m1l6-flows-and-channels/src/test/kotlin/channel/ex1ChannelTest.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
package channel

import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import org.junit.jupiter.api.Test

class ex1ChannelTest {

@Test
fun test1(): Unit = runBlocking {
val channel = Channel<Int>()

launch {
for (x in 1..5) channel.send(x * x)
channel.close()
}

for (value in channel) {
println(value)
}
println("Done!")
}

@Test
fun test2(): Unit = runBlocking {
val channel = Channel<Int>()

launch {
for (x in 1..5) channel.send(x * x)
}

launch {
for (x in 10..15) channel.send(x * x)
}

launch {
delay(2000)
channel.close()
}

for (value in channel) {
println(value)
}
}

@Test
fun test3(): Unit = runBlocking {
val channel = Channel<Int>()

launch {
for (x in 1..5) channel.send(x * x)
}

launch {
for (x in 10..15) channel.send(x * x)
}

launch {
delay(2000)
channel.close()
}

launch {
for (value in channel) {
println("First consumer: $value")
}
}

launch {
for (value in channel) {
println("Second consumer: $value")
}
}
}

@Test
fun test4(): Unit = runBlocking {
val channel = Channel<Int>(
capacity = 5,
onBufferOverflow = BufferOverflow.SUSPEND
) {
// never call, because onBufferOverflow = SUSPEND
println("Call for value: $it")
}

launch {
for (x in 1..10) {
val value = x * x
channel.send(value)
println("Send value: $value")
}
}

launch {
delay(11000)
channel.close()
}

launch {
for (value in channel) {
println("Consumer: $value")
delay(1000)
}
}
}

}
71 changes: 71 additions & 0 deletions m1l6-flows-and-channels/src/test/kotlin/flow/FlowClasses.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package flow

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.awaitClose
import kotlinx.coroutines.channels.trySendBlocking
import kotlinx.coroutines.flow.*
import java.time.Instant
import java.util.*
import kotlin.concurrent.schedule

data class Sample(
val serialNumber: String,
val value: Double,
val timestamp: Instant = Instant.now()
)

interface Detector {
fun samples(): Flow<Sample>
}

class CoroutineDetector(
private val serialNumber: String,
private val sampleDistribution: Sequence<Double>,
private val samplePeriod: Long
) : Detector {
override fun samples(): Flow<Sample> =
flow {
val values = sampleDistribution.iterator()
while (true) {
emit(Sample(serialNumber, values.next()))
delay(samplePeriod)
}
}
}

class BlockingDetector(
private val serialNumber: String,
private val sampleDistribution: Sequence<Double>,
private val samplePeriod: Long
) : Detector {
override fun samples(): Flow<Sample> =
flow {
val values = sampleDistribution.iterator()
while (true) {
emit(Sample(serialNumber, values.next()))
Thread.sleep(samplePeriod)
}
}.flowOn(Dispatchers.IO)
}

class CallbackDetector(
private val serialNumber: String,
private val sampleDistribution: Sequence<Double>,
private val samplePeriod: Long
) : Detector {
override fun samples(): Flow<Sample> =
callbackFlow {
val values = sampleDistribution.iterator()

val timer = Timer()
timer.schedule(0L, samplePeriod) {
trySendBlocking(Sample(serialNumber, values.next()))
}
timer.schedule(10_000L) { close() }

awaitClose { timer.cancel() }
}
}

fun <T> Flow<T>.rollingMax(comparator: Comparator<T>): Flow<T> =
runningReduce { max, current -> maxOf(max, current, comparator) }
151 changes: 151 additions & 0 deletions m1l6-flows-and-channels/src/test/kotlin/flow/ex1FlowOperatorsTest.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,151 @@
package flow

import kotlinx.coroutines.*
import kotlinx.coroutines.channels.BufferOverflow
import kotlinx.coroutines.flow.*
import org.junit.jupiter.api.Test

class ex1FlowOperatorsTest {

@Test
fun test1(): Unit = runBlocking {
flowOf(1, 2, 3, 4)
.onEach { println(it) }
.map { it + 1 }
.filter { it % 2 == 0 }
.collect { println("Result number $it") }
}


@OptIn(DelicateCoroutinesApi::class)
private val ApiDispatcher = newSingleThreadContext("Api-Thread")

@OptIn(DelicateCoroutinesApi::class)
private val DbDispatcher = newSingleThreadContext("Db-Thread")

fun <T> Flow<T>.printThreadName(msg: String) =
this.onEach { println("Msg = $msg, thread name = ${Thread.currentThread().name}") }

@Test
fun test2(): Unit = runBlocking {
flowOf(10, 20, 30)
.filter { it % 2 == 0 }
.map {
delay(2000)
it
}
.printThreadName("api call")
.flowOn(ApiDispatcher)
.map { it + 1 }
.printThreadName("db call")
.flowOn(DbDispatcher)
.printThreadName("last operation")
.onEach { println("On each $it") }
.collect()
}

@Test
fun test3(): Unit = runBlocking {
flow {
while (true) {
emit(1)
delay(1000)
emit(2)
delay(1000)
emit(3)
delay(1000)
throw RuntimeException("Custom error!")
}
}
.onStart { println("On start") }
.onCompletion { println(" On completion") }
.catch { println("Catch: ${it.message}") }
.onEach { println("On each: $it") }
.collect { }
}

@Test
fun test4(): Unit = runBlocking {
var sleepIndex = 1
flow {
while (sleepIndex < 3) {
delay(500)
emit(sleepIndex)
}
}
.onEach { println("Send to flow: $it") }
.buffer(3, BufferOverflow.DROP_LATEST)
.onEach { println("Processing : $it") }
.collect {
println("Sleep")
sleepIndex++
delay(2_000)
}
}


fun <T> Flow<T>.zipWithNext(): Flow<Pair<T, T>> = flow {
var prev: T? = null
collect {
if (prev != null) emit(prev!! to it)
prev = it
}
}

@Test
fun test5(): Unit = runBlocking {
flowOf(1, 2, 3, 4)
.zipWithNext()
.collect { println(it) }
}

@Test
fun test6(): Unit = runBlocking {
val coldFlow = flowOf(100, 101, 102, 103, 104, 105).onEach { println("Cold: $it") }

launch { coldFlow.collect() }
launch { coldFlow.collect() }

val hotFlow = flowOf(200, 201, 202, 203, 204, 205)
.onEach { println("Hot: $it") }
.shareIn(this, SharingStarted.Lazily)

launch { hotFlow.collect() }
launch { hotFlow.collect() }

delay(500)
coroutineContext.cancelChildren()
}

@Test
fun test7(): Unit = runBlocking {
val list = flow {
emit(1)
delay(100)
emit(2)
delay(100)
}
.onEach { println("$it") }
.toList()

println("List: $list")
}

@Test
fun test8(): Unit = runBlocking {
val list = flow {
var index = 0
// If there is an infinite loop here, while (true)
// then we will never output to the console
// println("List: $list")
while (index < 10) {
emit(index++)
delay(100)
}
}
.onEach { println("$it") }
.toList()

println("List: $list")
}
}
Loading

0 comments on commit bf77f09

Please sign in to comment.