diff --git a/gradle.properties b/gradle.properties index 40f646a..f95ab0c 100644 --- a/gradle.properties +++ b/gradle.properties @@ -5,5 +5,7 @@ org.gradle.jvmargs=-Xmx2g -XX:MaxMetaspaceSize=512m -Dfile.encoding=UTF-8 kotlinVersion=1.8.10 coroutinesVersion=1.6.4 +jUnitJupiterVersion=5.9.2 + jacksonVersion=2.14.1 okhttpVersion=4.9.3 \ No newline at end of file diff --git a/m1l6-flows-and-channels/build.gradle.kts b/m1l6-flows-and-channels/build.gradle.kts new file mode 100644 index 0000000..07953b4 --- /dev/null +++ b/m1l6-flows-and-channels/build.gradle.kts @@ -0,0 +1,19 @@ +plugins { + kotlin("jvm") +} + +val coroutinesVersion: String by project +val jUnitJupiterVersion: String by project + +dependencies { + implementation(kotlin("stdlib")) + implementation("org.jetbrains.kotlinx:kotlinx-coroutines-core:$coroutinesVersion") + + testImplementation(kotlin("test-junit")) + testImplementation("org.junit.jupiter:junit-jupiter-api:$jUnitJupiterVersion") + testImplementation("org.junit.jupiter:junit-jupiter-engine:$jUnitJupiterVersion") +} + +tasks.test { + useJUnitPlatform() +} diff --git a/m1l6-flows-and-channels/src/test/kotlin/channel/ex1ChannelTest.kt b/m1l6-flows-and-channels/src/test/kotlin/channel/ex1ChannelTest.kt new file mode 100644 index 0000000..a51b791 --- /dev/null +++ b/m1l6-flows-and-channels/src/test/kotlin/channel/ex1ChannelTest.kt @@ -0,0 +1,110 @@ +package channel + +import kotlinx.coroutines.channels.BufferOverflow +import kotlinx.coroutines.channels.Channel +import kotlinx.coroutines.delay +import kotlinx.coroutines.launch +import kotlinx.coroutines.runBlocking +import org.junit.jupiter.api.Test + +class ex1ChannelTest { + + @Test + fun test1(): Unit = runBlocking { + val channel = Channel() + + launch { + for (x in 1..5) channel.send(x * x) + channel.close() + } + + for (value in channel) { + println(value) + } + println("Done!") + } + + @Test + fun test2(): Unit = runBlocking { + val channel = Channel() + + launch { + for (x in 1..5) channel.send(x * x) + } + + launch { + for (x in 10..15) channel.send(x * x) + } + + launch { + delay(2000) + channel.close() + } + + for (value in channel) { + println(value) + } + } + + @Test + fun test3(): Unit = runBlocking { + val channel = Channel() + + launch { + for (x in 1..5) channel.send(x * x) + } + + launch { + for (x in 10..15) channel.send(x * x) + } + + launch { + delay(2000) + channel.close() + } + + launch { + for (value in channel) { + println("First consumer: $value") + } + } + + launch { + for (value in channel) { + println("Second consumer: $value") + } + } + } + + @Test + fun test4(): Unit = runBlocking { + val channel = Channel( + capacity = 5, + onBufferOverflow = BufferOverflow.SUSPEND + ) { + // never call, because onBufferOverflow = SUSPEND + println("Call for value: $it") + } + + launch { + for (x in 1..10) { + val value = x * x + channel.send(value) + println("Send value: $value") + } + } + + launch { + delay(11000) + channel.close() + } + + launch { + for (value in channel) { + println("Consumer: $value") + delay(1000) + } + } + } + +} \ No newline at end of file diff --git a/m1l6-flows-and-channels/src/test/kotlin/flow/FlowClasses.kt b/m1l6-flows-and-channels/src/test/kotlin/flow/FlowClasses.kt new file mode 100644 index 0000000..4713edc --- /dev/null +++ b/m1l6-flows-and-channels/src/test/kotlin/flow/FlowClasses.kt @@ -0,0 +1,71 @@ +package flow + +import kotlinx.coroutines.* +import kotlinx.coroutines.channels.awaitClose +import kotlinx.coroutines.channels.trySendBlocking +import kotlinx.coroutines.flow.* +import java.time.Instant +import java.util.* +import kotlin.concurrent.schedule + +data class Sample( + val serialNumber: String, + val value: Double, + val timestamp: Instant = Instant.now() +) + +interface Detector { + fun samples(): Flow +} + +class CoroutineDetector( + private val serialNumber: String, + private val sampleDistribution: Sequence, + private val samplePeriod: Long +) : Detector { + override fun samples(): Flow = + flow { + val values = sampleDistribution.iterator() + while (true) { + emit(Sample(serialNumber, values.next())) + delay(samplePeriod) + } + } +} + +class BlockingDetector( + private val serialNumber: String, + private val sampleDistribution: Sequence, + private val samplePeriod: Long +) : Detector { + override fun samples(): Flow = + flow { + val values = sampleDistribution.iterator() + while (true) { + emit(Sample(serialNumber, values.next())) + Thread.sleep(samplePeriod) + } + }.flowOn(Dispatchers.IO) +} + +class CallbackDetector( + private val serialNumber: String, + private val sampleDistribution: Sequence, + private val samplePeriod: Long +) : Detector { + override fun samples(): Flow = + callbackFlow { + val values = sampleDistribution.iterator() + + val timer = Timer() + timer.schedule(0L, samplePeriod) { + trySendBlocking(Sample(serialNumber, values.next())) + } + timer.schedule(10_000L) { close() } + + awaitClose { timer.cancel() } + } +} + +fun Flow.rollingMax(comparator: Comparator): Flow = + runningReduce { max, current -> maxOf(max, current, comparator) } \ No newline at end of file diff --git a/m1l6-flows-and-channels/src/test/kotlin/flow/ex1FlowOperatorsTest.kt b/m1l6-flows-and-channels/src/test/kotlin/flow/ex1FlowOperatorsTest.kt new file mode 100644 index 0000000..508c07c --- /dev/null +++ b/m1l6-flows-and-channels/src/test/kotlin/flow/ex1FlowOperatorsTest.kt @@ -0,0 +1,151 @@ +package flow + +import kotlinx.coroutines.* +import kotlinx.coroutines.channels.BufferOverflow +import kotlinx.coroutines.flow.* +import org.junit.jupiter.api.Test + +class ex1FlowOperatorsTest { + + @Test + fun test1(): Unit = runBlocking { + flowOf(1, 2, 3, 4) + .onEach { println(it) } + .map { it + 1 } + .filter { it % 2 == 0 } + .collect { println("Result number $it") } + } + + + @OptIn(DelicateCoroutinesApi::class) + private val ApiDispatcher = newSingleThreadContext("Api-Thread") + + @OptIn(DelicateCoroutinesApi::class) + private val DbDispatcher = newSingleThreadContext("Db-Thread") + + fun Flow.printThreadName(msg: String) = + this.onEach { println("Msg = $msg, thread name = ${Thread.currentThread().name}") } + + @Test + fun test2(): Unit = runBlocking { + flowOf(10, 20, 30) + .filter { it % 2 == 0 } + .map { + delay(2000) + it + } + .printThreadName("api call") + .flowOn(ApiDispatcher) + .map { it + 1 } + .printThreadName("db call") + .flowOn(DbDispatcher) + .printThreadName("last operation") + .onEach { println("On each $it") } + .collect() + } + + @Test + fun test3(): Unit = runBlocking { + flow { + while (true) { + emit(1) + delay(1000) + emit(2) + delay(1000) + emit(3) + delay(1000) + throw RuntimeException("Custom error!") + } + } + .onStart { println("On start") } + .onCompletion { println(" On completion") } + .catch { println("Catch: ${it.message}") } + .onEach { println("On each: $it") } + .collect { } + } + + @Test + fun test4(): Unit = runBlocking { + var sleepIndex = 1 + flow { + while (sleepIndex < 3) { + delay(500) + emit(sleepIndex) + } + } + .onEach { println("Send to flow: $it") } + .buffer(3, BufferOverflow.DROP_LATEST) + .onEach { println("Processing : $it") } + .collect { + println("Sleep") + sleepIndex++ + delay(2_000) + } + } + + + fun Flow.zipWithNext(): Flow> = flow { + var prev: T? = null + collect { + if (prev != null) emit(prev!! to it) + prev = it + } + } + + @Test + fun test5(): Unit = runBlocking { + flowOf(1, 2, 3, 4) + .zipWithNext() + .collect { println(it) } + } + + @Test + fun test6(): Unit = runBlocking { + val coldFlow = flowOf(100, 101, 102, 103, 104, 105).onEach { println("Cold: $it") } + + launch { coldFlow.collect() } + launch { coldFlow.collect() } + + val hotFlow = flowOf(200, 201, 202, 203, 204, 205) + .onEach { println("Hot: $it") } + .shareIn(this, SharingStarted.Lazily) + + launch { hotFlow.collect() } + launch { hotFlow.collect() } + + delay(500) + coroutineContext.cancelChildren() + } + + @Test + fun test7(): Unit = runBlocking { + val list = flow { + emit(1) + delay(100) + emit(2) + delay(100) + } + .onEach { println("$it") } + .toList() + + println("List: $list") + } + + @Test + fun test8(): Unit = runBlocking { + val list = flow { + var index = 0 + // If there is an infinite loop here, while (true) + // then we will never output to the console + // println("List: $list") + while (index < 10) { + emit(index++) + delay(100) + } + } + .onEach { println("$it") } + .toList() + + println("List: $list") + } +} \ No newline at end of file diff --git a/m1l6-flows-and-channels/src/test/kotlin/flow/ex2FlowVsSequenceTest.kt b/m1l6-flows-and-channels/src/test/kotlin/flow/ex2FlowVsSequenceTest.kt new file mode 100644 index 0000000..430e102 --- /dev/null +++ b/m1l6-flows-and-channels/src/test/kotlin/flow/ex2FlowVsSequenceTest.kt @@ -0,0 +1,51 @@ +package flow + +import kotlinx.coroutines.delay +import kotlinx.coroutines.flow.Flow +import kotlinx.coroutines.flow.flow +import kotlinx.coroutines.launch +import kotlinx.coroutines.runBlocking +import org.junit.jupiter.api.Test + +class ex2FlowVsSequenceTest { + + private fun simpleSequence(): Sequence = sequence { + for (i in 1..5) { +// delay(1000) // can't use it here + Thread.sleep(1000) + yield(i) + } + } + + private fun simpleFlow(): Flow = flow { + for (i in 1..5) { + delay(1000) + emit(i) + } + } + + @Test + fun sequenceTest(): Unit = runBlocking { + launch { + for (k in 1..5) { + println("I'm not blocked $k") + delay(1000) + } + } + simpleSequence().forEach { println(it) } + } + + @Test + fun flowTest(): Unit = runBlocking { + launch { + for (k in 1..5) { + println("I'm not blocked $k") + delay(1000) + } + } + simpleFlow() + .collect { println(it) } + + println("Flow end") + } +} \ No newline at end of file diff --git a/m1l6-flows-and-channels/src/test/kotlin/flow/ex3FlowTest.kt b/m1l6-flows-and-channels/src/test/kotlin/flow/ex3FlowTest.kt new file mode 100644 index 0000000..8cedc93 --- /dev/null +++ b/m1l6-flows-and-channels/src/test/kotlin/flow/ex3FlowTest.kt @@ -0,0 +1,128 @@ +package flow + +import kotlinx.coroutines.cancelChildren +import kotlinx.coroutines.delay +import kotlinx.coroutines.flow.* +import kotlinx.coroutines.runBlocking +import org.junit.jupiter.api.Test +import java.time.Instant +import kotlin.random.Random + +class ex3FlowTest { + private fun detectors() : List { + val random = Random.Default + val seq = sequence { + while (true) { + yield(random.nextDouble()) + } + } + + return listOf( + CoroutineDetector("coroutine", seq, 500L), + BlockingDetector("blocking", seq, 800L), + CallbackDetector("callback", seq, 2_000L) + ) + } + + @Test + fun test1(): Unit = runBlocking { + // сырые данные от датчиков + detectors() + .map { it.samples() } + .merge() + .onEach { println(it) } + .launchIn(this) + + delay(2000) + coroutineContext.cancelChildren() + } + + @Test + fun test2(): Unit = runBlocking { + // данные от датчиков раз в секунду от каждого (если нового нет, то последнее) + val desiredPeriod = 1000L + detectors() + .map { + it.samples() + .transformLatest { sample -> + //println("Start transformLatest for ${sample.serialNumber}") + emit(sample) + while (true) { + delay(desiredPeriod) + //println("Add old value to flow in transformLatest for = ${sample.serialNumber}") + emit(sample.copy(timestamp = Instant.now())) + } + } + .sample(desiredPeriod) + } + .merge() + .onEach { println(it) } + .launchIn(this) + + delay(5_000) + coroutineContext.cancelChildren() + } + + @Test + fun test3(): Unit = runBlocking { + val desiredPeriod = 1000L + val samples = detectors() + .map { + it.samples() + .transformLatest { sample -> +// println("Start transformLatest for ${sample.serialNumber}") + emit(sample) + while (true) { + delay(desiredPeriod) +// println("Add old value to flow in transformLatest for = ${sample.serialNumber}") + emit(sample.copy(timestamp = Instant.now())) + } + } + .sample(desiredPeriod) + } + .merge() + .shareIn(this, SharingStarted.Lazily) + + samples + .rollingMax(compareBy { it.value }) + .sample(desiredPeriod) + .onEach { println(it) } + .launchIn(this) + + delay(5_000) + coroutineContext.cancelChildren() + } + + @Test + fun test4(): Unit = runBlocking { + val desiredPeriod = 1000L + val flows = detectors() + .map { + it.samples() + .transformLatest { sample -> + emit(sample) + while (true) { + delay(desiredPeriod) + emit(sample.copy(timestamp = Instant.now())) + } + } + .sample(desiredPeriod) + } + + var index = 0 + val samples = combineTransform(flows) { + it.forEach { s -> println("$index: value = $s") } + index++ + + emit(it.maxBy { s -> s.value }) + } + .shareIn(this, SharingStarted.Lazily) + + samples + .onEach { println(it) } + .launchIn(this) + + delay(5_000) + coroutineContext.cancelChildren() + } +} \ No newline at end of file diff --git a/m1l6-flows-and-channels/src/test/kotlin/sequence/ex1SequenceTest.kt b/m1l6-flows-and-channels/src/test/kotlin/sequence/ex1SequenceTest.kt new file mode 100644 index 0000000..0dd811f --- /dev/null +++ b/m1l6-flows-and-channels/src/test/kotlin/sequence/ex1SequenceTest.kt @@ -0,0 +1,215 @@ +package sequence + +import org.junit.jupiter.api.Test + +enum class Color { + YELLOW, + GREEN, + BLUE, + RED, + VIOLET, + +} + +enum class Shape { + SQUARE, + CIRCLE, + TRIANGLE, + RHOMBUS + +} + +data class Figure( + val color: Color, + val shape: Shape +) + +fun processAsList( + words: List, + wordLength: Int, take: Int, + printOperation: Boolean +) { + println("Processing list") + var counter = 0 + + val lengthsList = words + .filter { + counter++ + if (printOperation) println("filter: $it") + + it.length > wordLength + } + .map { + counter++ + if (printOperation) println("length: ${it.length}") + + it.length + } + .take(take) + + println("Lengths of first $take words longer than $wordLength chars:") + println(lengthsList) + + println("List counter: $counter") + println() +} + +fun processAsSeq( + words: List, + wordLength: Int, + take: Int, + printOperation: Boolean +) { + println("Processing sequence") + var counter = 0 + + //convert the List to a Sequence + val wordsSequence = words.asSequence() + + val lengthsSequence = wordsSequence + .filter { + counter++ + if (printOperation) println("filter: $it") + + it.length > wordLength + } + .map { + counter++ + if (printOperation) println("length: ${it.length}") + + it.length + } + .take(take) + .toList() + + println("Lengths of first $take words longer than $wordLength chars") + println(lengthsSequence) + + println("Sequence counter: $counter") + println() +} + +class SequenceTest { + + companion object { + private val testFigure = listOf( + Figure(Color.GREEN, Shape.CIRCLE), + Figure(Color.VIOLET, Shape.SQUARE), + Figure(Color.BLUE, Shape.RHOMBUS), + Figure(Color.RED, Shape.TRIANGLE) + ) + } + + @Test + fun collection() { + var counter = 0 + val figure = testFigure + .map { + counter++ + println("Change color") + it.copy(color = Color.YELLOW) + } + .first { + counter++ + println("Filter by shape") + it.shape == Shape.SQUARE + } + println("Figure: $figure") + println("Counter: $counter") + } + + @Test + fun sequence() { + var counter = 0 + val figure = testFigure.asSequence() + .map { + counter++ + println("Change color") + it.copy(color = Color.YELLOW) + } + .first { + counter++ + println("Filter by shape") + it.shape == Shape.SQUARE + } + println("Figure: $figure") + println("Counter: $counter") + } + + @Test + fun smallSequence() { + val words = "The quick brown fox jumps over the lazy dog".split(" ") + + val wordLength = 3 + val take = 3 + val printOperation = false + + processAsList(words, wordLength, take, printOperation) + processAsSeq(words, wordLength, take, printOperation) + } + + @Test + fun bigSequence() { + val words = ("So, the sequences let you avoid building results of intermediate steps, " + + "therefore improving the performance of the whole collection processing chain. " + + "However, the lazy nature of sequences adds some overhead which may be significant " + + "when processing smaller collections or doing simpler computations. Hence, you should consider both " + + "Sequence and Iterable and decide which one is better for your case.") + .split(" ") + + val wordLength = 3 + val take = 10 + + val printOperation = false + + processAsList(words, wordLength, take, printOperation) + processAsSeq(words, wordLength, take, printOperation) + } + + + @Test + fun collectionIsNotLazy() { + var counter = 0 + val list = listOf(1, 2, 3, 4) + .map { + counter++ + it * it + } + .take(2) + println("List: $list") + println("Counter: $counter") + } + + @Test + fun sequenceIsLazy() { + var counter = 0 + val sequence = sequenceOf(1, 2, 3, 4) + .map { + counter++ + it * it + } + .take(2) + + println("Sequence: $sequence") + println("Counter: $counter") + + val result = sequence + .toList() + + println("Result: $result") + println("Counter: $counter") + } + + @Test + fun blockingCall() { + val sequence = sequenceOf(1, 2, 3) + .map { + println("Make blocking call to API") + Thread.sleep(3000) + it + 1 + } + .toList() + println("Sequence: $sequence") + } + +} \ No newline at end of file diff --git a/readme.md b/readme.md index e7db0e5..d28267f 100644 --- a/readme.md +++ b/readme.md @@ -11,4 +11,5 @@ 2. [m1l2-basic](m1l2-basic) - Основные конструкции Kotlin 3. [m1l3-oop](m1l3-oop) - Объектно-ориентированное программирование 4. [m1l4-dsl](m1l4-dsl) - Предметно ориентированные языки (DSL) -5. [m1l5-coroutines](m1l5-coroutines) - Корутины, ч.1 \ No newline at end of file +5. [m1l5-coroutines](m1l5-coroutines) - Корутины, ч.1 +6. [m1l6-flows-and-channels](m1l6-flows-and-channels) - Корутины, ч.2 - flow, channels \ No newline at end of file diff --git a/settings.gradle.kts b/settings.gradle.kts index 8540b8f..44dbb01 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -12,3 +12,4 @@ include("m1l2-basic") include("m1l3-oop") include("m1l4-dsl") include("m1l5-coroutines") +include("m1l6-flows-and-channels")