Skip to content

Commit

Permalink
Merge branch 'main' into serras/optics-compose
Browse files Browse the repository at this point in the history
  • Loading branch information
nomisRev authored Jan 16, 2024
2 parents b5f037f + 85d2684 commit 0e99a6d
Show file tree
Hide file tree
Showing 15 changed files with 932 additions and 9 deletions.
116 changes: 116 additions & 0 deletions arrow-libs/fx/arrow-collectors/api/arrow-collectors.api
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
public final class arrow/collectors/Characteristics : java/lang/Enum {
public static final field CONCURRENT Larrow/collectors/Characteristics;
public static final field Companion Larrow/collectors/Characteristics$Companion;
public static final field IDENTITY_FINISH Larrow/collectors/Characteristics;
public static final field UNORDERED Larrow/collectors/Characteristics;
public static fun getEntries ()Lkotlin/enums/EnumEntries;
public static fun valueOf (Ljava/lang/String;)Larrow/collectors/Characteristics;
public static fun values ()[Larrow/collectors/Characteristics;
}

public final class arrow/collectors/Characteristics$Companion {
public final fun getCONCURRENT_UNORDERED ()Ljava/util/Set;
public final fun getIDENTITY ()Ljava/util/Set;
public final fun getIDENTITY_CONCURRENT ()Ljava/util/Set;
public final fun getIDENTITY_CONCURRENT_UNORDERED ()Ljava/util/Set;
public final fun getIDENTITY_UNORDERED ()Ljava/util/Set;
}

public final class arrow/collectors/CollectKt {
public static final fun collect (Ljava/lang/Iterable;Larrow/collectors/NonSuspendCollectorI;)Ljava/lang/Object;
public static final fun collect (Ljava/util/Iterator;Larrow/collectors/NonSuspendCollectorI;)Ljava/lang/Object;
public static final fun collect (Lkotlin/sequences/Sequence;Larrow/collectors/NonSuspendCollectorI;)Ljava/lang/Object;
public static final fun collect (Lkotlinx/coroutines/flow/Flow;Larrow/collectors/CollectorI;ILkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static synthetic fun collect$default (Lkotlinx/coroutines/flow/Flow;Larrow/collectors/CollectorI;ILkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object;
public static final fun parCollect (Ljava/lang/Iterable;Larrow/collectors/CollectorI;ILkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static final fun parCollect (Lkotlin/sequences/Sequence;Larrow/collectors/CollectorI;ILkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static synthetic fun parCollect$default (Ljava/lang/Iterable;Larrow/collectors/CollectorI;ILkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object;
public static synthetic fun parCollect$default (Lkotlin/sequences/Sequence;Larrow/collectors/CollectorI;ILkotlin/coroutines/Continuation;ILjava/lang/Object;)Ljava/lang/Object;
}

public abstract interface class arrow/collectors/CollectorI {
public static final field Companion Larrow/collectors/CollectorI$Companion;
public abstract fun accumulate (Ljava/lang/Object;Ljava/lang/Object;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public abstract fun contramap (Lkotlin/jvm/functions/Function2;)Larrow/collectors/CollectorI;
public abstract fun finish (Ljava/lang/Object;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public abstract fun getCharacteristics ()Ljava/util/Set;
public abstract fun map (Lkotlin/jvm/functions/Function2;)Larrow/collectors/CollectorI;
public abstract fun supply (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public abstract fun zip (Larrow/collectors/CollectorI;)Larrow/collectors/CollectorI;
public abstract fun zip (Larrow/collectors/CollectorI;Lkotlin/jvm/functions/Function3;)Larrow/collectors/CollectorI;
}

public final class arrow/collectors/CollectorI$Companion {
public final fun nonSuspendOf (Lkotlin/jvm/functions/Function0;Lkotlin/jvm/functions/Function2;Lkotlin/jvm/functions/Function1;Ljava/util/Set;)Larrow/collectors/NonSuspendCollectorI;
public static synthetic fun nonSuspendOf$default (Larrow/collectors/CollectorI$Companion;Lkotlin/jvm/functions/Function0;Lkotlin/jvm/functions/Function2;Lkotlin/jvm/functions/Function1;Ljava/util/Set;ILjava/lang/Object;)Larrow/collectors/NonSuspendCollectorI;
public final fun of (Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function3;Lkotlin/jvm/functions/Function2;Ljava/util/Set;)Larrow/collectors/CollectorI;
public static synthetic fun of$default (Larrow/collectors/CollectorI$Companion;Lkotlin/jvm/functions/Function1;Lkotlin/jvm/functions/Function3;Lkotlin/jvm/functions/Function2;Ljava/util/Set;ILjava/lang/Object;)Larrow/collectors/CollectorI;
}

public final class arrow/collectors/CollectorI$DefaultImpls {
public static fun contramap (Larrow/collectors/CollectorI;Lkotlin/jvm/functions/Function2;)Larrow/collectors/CollectorI;
public static fun map (Larrow/collectors/CollectorI;Lkotlin/jvm/functions/Function2;)Larrow/collectors/CollectorI;
public static fun zip (Larrow/collectors/CollectorI;Larrow/collectors/CollectorI;)Larrow/collectors/CollectorI;
public static fun zip (Larrow/collectors/CollectorI;Larrow/collectors/CollectorI;Lkotlin/jvm/functions/Function3;)Larrow/collectors/CollectorI;
}

public final class arrow/collectors/Collectors {
public static final field INSTANCE Larrow/collectors/Collectors;
public final fun bestBy (Lkotlin/jvm/functions/Function2;)Larrow/collectors/NonSuspendCollectorI;
public final fun constant (Ljava/lang/Object;)Larrow/collectors/NonSuspendCollectorI;
public final fun getLength ()Larrow/collectors/NonSuspendCollectorI;
public final fun getSum ()Larrow/collectors/NonSuspendCollectorI;
public final fun intReducer (Lkotlin/jvm/functions/Function0;Lkotlin/jvm/functions/Function2;)Larrow/collectors/NonSuspendCollectorI;
public final fun list ()Larrow/collectors/NonSuspendCollectorI;
public final fun map ()Larrow/collectors/NonSuspendCollectorI;
public final fun mapFromEntries ()Larrow/collectors/NonSuspendCollectorI;
public final fun reducer (Lkotlin/jvm/functions/Function0;Lkotlin/jvm/functions/Function2;Z)Larrow/collectors/NonSuspendCollectorI;
public static synthetic fun reducer$default (Larrow/collectors/Collectors;Lkotlin/jvm/functions/Function0;Lkotlin/jvm/functions/Function2;ZILjava/lang/Object;)Larrow/collectors/NonSuspendCollectorI;
public final fun set ()Larrow/collectors/NonSuspendCollectorI;
}

public final class arrow/collectors/CollectorsKt {
public static final fun concurrentMap (Larrow/collectors/Collectors;)Larrow/collectors/NonSuspendCollectorI;
public static final fun concurrentMapFromEntries (Larrow/collectors/Collectors;)Larrow/collectors/NonSuspendCollectorI;
public static final fun concurrentSet (Larrow/collectors/Collectors;)Larrow/collectors/NonSuspendCollectorI;
}

public final class arrow/collectors/JvmCollectorKt {
public static final fun asCollector (Ljava/util/stream/Collector;)Larrow/collectors/NonSuspendCollectorI;
public static final fun jvm (Larrow/collectors/Collectors;Ljava/util/stream/Collector;)Larrow/collectors/NonSuspendCollectorI;
}

public abstract interface class arrow/collectors/NonSuspendCollectorI : arrow/collectors/CollectorI {
public abstract fun accumulate (Ljava/lang/Object;Ljava/lang/Object;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public abstract fun accumulateNonSuspend (Ljava/lang/Object;Ljava/lang/Object;)V
public abstract fun contramapNonSuspend (Lkotlin/jvm/functions/Function1;)Larrow/collectors/NonSuspendCollectorI;
public abstract fun finish (Ljava/lang/Object;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public abstract fun finishNonSuspend (Ljava/lang/Object;)Ljava/lang/Object;
public abstract fun mapNonSuspend (Lkotlin/jvm/functions/Function1;)Larrow/collectors/NonSuspendCollectorI;
public abstract fun supply (Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public abstract fun supplyNonSuspend ()Ljava/lang/Object;
public abstract fun zip (Larrow/collectors/NonSuspendCollectorI;)Larrow/collectors/NonSuspendCollectorI;
public abstract fun zipNonSuspend (Larrow/collectors/NonSuspendCollectorI;Lkotlin/jvm/functions/Function2;)Larrow/collectors/NonSuspendCollectorI;
}

public final class arrow/collectors/NonSuspendCollectorI$DefaultImpls {
public static fun accumulate (Larrow/collectors/NonSuspendCollectorI;Ljava/lang/Object;Ljava/lang/Object;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static fun contramap (Larrow/collectors/NonSuspendCollectorI;Lkotlin/jvm/functions/Function2;)Larrow/collectors/CollectorI;
public static fun contramapNonSuspend (Larrow/collectors/NonSuspendCollectorI;Lkotlin/jvm/functions/Function1;)Larrow/collectors/NonSuspendCollectorI;
public static fun finish (Larrow/collectors/NonSuspendCollectorI;Ljava/lang/Object;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static fun map (Larrow/collectors/NonSuspendCollectorI;Lkotlin/jvm/functions/Function2;)Larrow/collectors/CollectorI;
public static fun mapNonSuspend (Larrow/collectors/NonSuspendCollectorI;Lkotlin/jvm/functions/Function1;)Larrow/collectors/NonSuspendCollectorI;
public static fun supply (Larrow/collectors/NonSuspendCollectorI;Lkotlin/coroutines/Continuation;)Ljava/lang/Object;
public static fun zip (Larrow/collectors/NonSuspendCollectorI;Larrow/collectors/CollectorI;)Larrow/collectors/CollectorI;
public static fun zip (Larrow/collectors/NonSuspendCollectorI;Larrow/collectors/CollectorI;Lkotlin/jvm/functions/Function3;)Larrow/collectors/CollectorI;
public static fun zip (Larrow/collectors/NonSuspendCollectorI;Larrow/collectors/NonSuspendCollectorI;)Larrow/collectors/NonSuspendCollectorI;
public static fun zipNonSuspend (Larrow/collectors/NonSuspendCollectorI;Larrow/collectors/NonSuspendCollectorI;Lkotlin/jvm/functions/Function2;)Larrow/collectors/NonSuspendCollectorI;
}

public final class arrow/collectors/ZipKt {
public static final fun zip (Larrow/collectors/CollectorI;Larrow/collectors/CollectorI;Larrow/collectors/CollectorI;Lkotlin/jvm/functions/Function4;)Larrow/collectors/CollectorI;
public static final fun zip (Larrow/collectors/CollectorI;Larrow/collectors/CollectorI;Lkotlin/jvm/functions/Function3;)Larrow/collectors/CollectorI;
public static final fun zip (Larrow/collectors/NonSuspendCollectorI;Larrow/collectors/NonSuspendCollectorI;Larrow/collectors/NonSuspendCollectorI;Lkotlin/jvm/functions/Function3;)Larrow/collectors/NonSuspendCollectorI;
public static final fun zip (Larrow/collectors/NonSuspendCollectorI;Larrow/collectors/NonSuspendCollectorI;Lkotlin/jvm/functions/Function2;)Larrow/collectors/NonSuspendCollectorI;
}

65 changes: 65 additions & 0 deletions arrow-libs/fx/arrow-collectors/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
@file:Suppress("DSL_SCOPE_VIOLATION")

import java.time.Duration

plugins {
id(libs.plugins.kotlin.multiplatform.get().pluginId)
alias(libs.plugins.arrowGradleConfig.kotlin)
alias(libs.plugins.arrowGradleConfig.publish)
alias(libs.plugins.spotless)
alias(libs.plugins.kotlinx.kover)
}

apply(from = property("ANIMALSNIFFER_MPP"))

kotlin {
sourceSets {
commonMain {
dependencies {
api(projects.arrowFxCoroutines)
api(projects.arrowAtomic)
api(libs.coroutines.core)
implementation(libs.kotlin.stdlibCommon)
}
}

commonTest {
dependencies {
implementation(libs.kotlin.test)
implementation(libs.kotest.frameworkEngine)
implementation(libs.kotest.assertionsCore)
implementation(libs.kotest.property)
}
}
}

jvm {
tasks.jvmJar {
manifest {
attributes["Automatic-Module-Name"] = "arrow.collectors"
}
}
}

js {
nodejs {
testTask {
useMocha {
timeout = "60s"
}
}
}
browser {
testTask {
useKarma {
useChromeHeadless()
timeout.set(Duration.ofMinutes(1))
}
}
}
}
}

tasks.withType<Test> {
useJUnitPlatform()
}
4 changes: 4 additions & 0 deletions arrow-libs/fx/arrow-collectors/gradle.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
# Maven publishing configuration
pom.name=Arrow Collectors
# Build configuration
kapt.incremental.apt=false
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
package arrow.collectors

import arrow.fx.coroutines.parMap
import arrow.fx.coroutines.parMapUnordered
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.flow.DEFAULT_CONCURRENCY
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.onCompletion
import kotlinx.coroutines.flow.onStart

/**
* Performs collection over the elements of [this].
* The amount of concurrency depends on the
* [Characteristics] of [collector], and can
* be tweaked using the [concurrency] parameter.
*
* [this] is consumed only once during collection.
* We recommend using a cold [Flow] to ensure that
* elements are produced only when needed.
*
* @receiver [Flow] of elements to collect
* @param collector Describes how to collect the values
* @param concurrency Defines the concurrency limit
*/
@OptIn(FlowPreview::class)
public suspend fun <T, R> Flow<T>.collect(
collector: Collector<T, R>,
concurrency: Int = DEFAULT_CONCURRENCY,
): R = collectI(collector, concurrency)

/**
* Performs collection over the elements of [this].
* The amount of concurrency depends on the
* [Characteristics] of [collector], and can
* be tweaked using the [concurrency] parameter.
*
* [this] is iterated only once during collection.
*
* Note: if you need to perform changes on the values
* before collection, we strongly recommend to convert
* the [Iterable] into a [Flow] using [asFlow],
* perform those changes, and then using [collect].
*
* @receiver Sequence of values to collect
* @param collector Describes how to collect the values
* @param concurrency Defines the concurrency limit
*/
@OptIn(FlowPreview::class)
public suspend fun <T, R> Iterable<T>.parCollect(
collector: Collector<T, R>,
concurrency: Int = DEFAULT_CONCURRENCY,
): R = asFlow().collect(collector, concurrency)

/**
* Performs collection over the elements of [this].
* The amount of concurrency depends on the
* [Characteristics] of [collector], and can
* be tweaked using the [concurrency] parameter.
*
* [this] is iterated only once during collection.
*
* Note: if you need to perform changes on the values
* before collection, we strongly recommend to convert
* the [Sequence] into a [Flow] using [asFlow],
* perform those changes, and then using [collect].
*
* @receiver Sequence of values to collect
* @param collector Describes how to collect the values
* @param concurrency Defines the concurrency limit
*/
@OptIn(FlowPreview::class)
public suspend fun <T, R> Sequence<T>.parCollect(
collector: Collector<T, R>,
concurrency: Int = DEFAULT_CONCURRENCY,
): R = asFlow().collect(collector, concurrency)

/**
* Performs collection over the elements of [this]
* in a non-concurrent fashion.
*
* [this] is iterated only once during collection.
*
* @receiver Sequence of values to collect
* @param collector Describes how to collect the values
*/
public fun <T, R> Iterable<T>.collect(
collector: NonSuspendCollector<T, R>
): R = iterator().collectI(collector)

/**
* Performs collection over the elements of [this]
* in a non-concurrent fashion.
*
* [this] is iterated only once during collection.
*
* @receiver Sequence of values to collect
* @param collector Describes how to collect the values
*/
public fun <T, R> Sequence<T>.collect(
collector: NonSuspendCollector<T, R>
): R = iterator().collectI(collector)

public fun <T, R> Iterator<T>.collect(
collector: NonSuspendCollector<T, R>
): R = collectI(collector)

@OptIn(FlowPreview::class, ExperimentalCoroutinesApi::class)
@Suppress("UNINITIALIZED_VARIABLE", "UNCHECKED_CAST")
internal suspend fun <A, T, R> Flow<T>.collectI(
collector: CollectorI<A, T, R>,
concurrency: Int = DEFAULT_CONCURRENCY,
): R {
var accumulator: A
val started = this.onStart { accumulator = collector.supply() }
val continued = when {
Characteristics.CONCURRENT in collector.characteristics -> when {
Characteristics.UNORDERED in collector.characteristics ->
started.parMapUnordered(concurrency) { collector.accumulate(accumulator, it) }

else ->
started.parMap(concurrency) { collector.accumulate(accumulator, it) }
}

else -> started.map { collector.accumulate(accumulator, it) }
}
continued.collect()

return when {
Characteristics.IDENTITY_FINISH in collector.characteristics -> accumulator as R
else -> collector.finish(accumulator)
}
}

@Suppress("UNCHECKED_CAST")
internal fun <A, T, R> Iterator<T>.collectI(
collector: NonSuspendCollectorI<A, T, R>
): R {
val accumulator = collector.supplyNonSuspend()
forEach { collector.accumulateNonSuspend(accumulator, it) }
return when {
Characteristics.IDENTITY_FINISH in collector.characteristics -> accumulator as R
else -> collector.finishNonSuspend(accumulator)
}
}
Loading

0 comments on commit 0e99a6d

Please sign in to comment.