From c8ac6c06b1959135c95427fc5b74a7060785f275 Mon Sep 17 00:00:00 2001 From: "He-Pin(kerr)" Date: Sat, 4 Jan 2025 23:14:09 +0800 Subject: [PATCH] chore: Make flatMapPrefix javadsl using java.util.List (#271) --- .../apache/pekko/stream/javadsl/FlowTest.java | 30 +++++++++++++++++++ .../pekko/stream/javadsl/SourceTest.java | 25 ++++++++++++++++ ...n-flatmapPrefix-javadsl.backwards.excludes | 24 +++++++++++++++ .../apache/pekko/stream/javadsl/Flow.scala | 4 +-- .../apache/pekko/stream/javadsl/Source.scala | 4 +-- .../apache/pekko/stream/javadsl/SubFlow.scala | 2 +- .../pekko/stream/javadsl/SubSource.scala | 2 +- 7 files changed, 85 insertions(+), 6 deletions(-) create mode 100644 stream/src/main/mima-filters/1.1.x.backwards.excludes/pr-271-use-java-list-in-flatmapPrefix-javadsl.backwards.excludes diff --git a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java index f2e92c88638..70a6aa98c60 100644 --- a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java +++ b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/FlowTest.java @@ -13,6 +13,7 @@ package org.apache.pekko.stream.javadsl; +import com.google.common.collect.Sets; import org.apache.pekko.Done; import org.apache.pekko.NotUsed; import org.apache.pekko.actor.ActorRef; @@ -1663,4 +1664,33 @@ public void mustBeAbleToConvertToJavaInJava() { org.apache.pekko.stream.scaladsl.Flow.apply(); Flow javaFlow = scalaFlow.asJava(); } + + @Test + public void useFlatMapPrefix() { + final List resultList = + Source.range(1, 2) + .via( + Flow.of(Integer.class) + .flatMapPrefix( + 1, prefix -> Flow.of(Integer.class).prepend(Source.from(prefix)))) + .runWith(Sink.seq(), system) + .toCompletableFuture() + .join(); + Assert.assertEquals(Arrays.asList(1, 2), resultList); + } + + @Test + public void useFlatMapPrefixSubSource() { + final Set resultSet = + Source.range(1, 2) + .via( + Flow.of(Integer.class) + .groupBy(2, i -> i % 2) + .flatMapPrefix(1, prefix -> Flow.of(Integer.class).prepend(Source.from(prefix))) + .mergeSubstreams()) + .runWith(Sink.collect(Collectors.toSet()), system) + .toCompletableFuture() + .join(); + Assert.assertEquals(Sets.newHashSet(1, 2), resultSet); + } } diff --git a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java index 8464688043c..e7c37e55a3d 100644 --- a/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java +++ b/stream-tests/src/test/java/org/apache/pekko/stream/javadsl/SourceTest.java @@ -13,6 +13,7 @@ package org.apache.pekko.stream.javadsl; +import com.google.common.collect.Sets; import org.apache.pekko.Done; import org.apache.pekko.NotUsed; import org.apache.pekko.actor.ActorRef; @@ -1504,4 +1505,28 @@ public void flattenOptionalOptional() throws Exception { .get(3, TimeUnit.SECONDS); Assert.assertEquals(Arrays.asList(2, 4, 6, 8, 10), resultList); } + + @Test + public void useFlatMapPrefix() { + final List resultList = + Source.range(1, 2) + .flatMapPrefix(1, prefix -> Flow.of(Integer.class).prepend(Source.from(prefix))) + .runWith(Sink.seq(), system) + .toCompletableFuture() + .join(); + Assert.assertEquals(Arrays.asList(1, 2), resultList); + } + + @Test + public void useFlatMapPrefixSubSource() { + final Set resultSet = + Source.range(1, 2) + .groupBy(2, i -> i % 2) + .flatMapPrefix(1, prefix -> Flow.of(Integer.class).prepend(Source.from(prefix))) + .mergeSubstreams() + .runWith(Sink.collect(Collectors.toSet()), system) + .toCompletableFuture() + .join(); + Assert.assertEquals(Sets.newHashSet(1, 2), resultSet); + } } diff --git a/stream/src/main/mima-filters/1.1.x.backwards.excludes/pr-271-use-java-list-in-flatmapPrefix-javadsl.backwards.excludes b/stream/src/main/mima-filters/1.1.x.backwards.excludes/pr-271-use-java-list-in-flatmapPrefix-javadsl.backwards.excludes new file mode 100644 index 00000000000..be512ed2b8c --- /dev/null +++ b/stream/src/main/mima-filters/1.1.x.backwards.excludes/pr-271-use-java-list-in-flatmapPrefix-javadsl.backwards.excludes @@ -0,0 +1,24 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# Make flatMapPrefix javadsl using java.util.List +ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.stream.javadsl.Flow.flatMapPrefix") +ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.stream.javadsl.Flow.flatMapPrefixMat") +ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.stream.javadsl.Source.flatMapPrefix") +ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.stream.javadsl.Source.flatMapPrefixMat") +ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.stream.javadsl.SubFlow.flatMapPrefix") +ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.stream.javadsl.SubSource.flatMapPrefix") diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala index 91b0385e490..df395c1ad5b 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Flow.scala @@ -2500,7 +2500,7 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr */ def flatMapPrefix[Out2, Mat2]( n: Int, - f: function.Function[java.lang.Iterable[Out], javadsl.Flow[Out, Out2, Mat2]]): javadsl.Flow[In, Out2, Mat] = { + f: function.Function[java.util.List[Out], javadsl.Flow[Out, Out2, Mat2]]): javadsl.Flow[In, Out2, Mat] = { val newDelegate = delegate.flatMapPrefix(n)(seq => f(seq.asJava).asScala) new javadsl.Flow(newDelegate) } @@ -2511,7 +2511,7 @@ final class Flow[In, Out, Mat](delegate: scaladsl.Flow[In, Out, Mat]) extends Gr */ def flatMapPrefixMat[Out2, Mat2, Mat3]( n: Int, - f: function.Function[java.lang.Iterable[Out], javadsl.Flow[Out, Out2, Mat2]], + f: function.Function[java.util.List[Out], javadsl.Flow[Out, Out2, Mat2]], matF: function.Function2[Mat, CompletionStage[Mat2], Mat3]): javadsl.Flow[In, Out2, Mat3] = { val newDelegate = delegate.flatMapPrefixMat(n)(seq => f(seq.asJava).asScala) { (m1, fm2) => matF(m1, fm2.asJava) diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala index bcd289b7857..1f50fea2559 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/Source.scala @@ -3980,7 +3980,7 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ */ def flatMapPrefix[Out2, Mat2]( n: Int, - f: function.Function[java.lang.Iterable[Out], javadsl.Flow[Out, Out2, Mat2]]): javadsl.Source[Out2, Mat] = { + f: function.Function[java.util.List[Out], javadsl.Flow[Out, Out2, Mat2]]): javadsl.Source[Out2, Mat] = { val newDelegate = delegate.flatMapPrefix(n)(seq => f(seq.asJava).asScala) new javadsl.Source(newDelegate) } @@ -3991,7 +3991,7 @@ final class Source[Out, Mat](delegate: scaladsl.Source[Out, Mat]) extends Graph[ */ def flatMapPrefixMat[Out2, Mat2, Mat3]( n: Int, - f: function.Function[java.lang.Iterable[Out], javadsl.Flow[Out, Out2, Mat2]], + f: function.Function[java.util.List[Out], javadsl.Flow[Out, Out2, Mat2]], matF: function.Function2[Mat, CompletionStage[Mat2], Mat3]): javadsl.Source[Out2, Mat3] = { val newDelegate = delegate.flatMapPrefixMat(n)(seq => f(seq.asJava).asScala) { (m1, fm2) => matF(m1, fm2.asJava) diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala index 20f73c4f010..37216e02d61 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubFlow.scala @@ -1761,7 +1761,7 @@ class SubFlow[In, Out, Mat]( */ def flatMapPrefix[Out2, Mat2]( n: Int, - f: function.Function[java.lang.Iterable[Out], javadsl.Flow[Out, Out2, Mat2]]): SubFlow[In, Out2, Mat] = { + f: function.Function[java.util.List[Out], javadsl.Flow[Out, Out2, Mat2]]): SubFlow[In, Out2, Mat] = { val newDelegate = delegate.flatMapPrefix(n)(seq => f(seq.asJava).asScala) new javadsl.SubFlow(newDelegate) } diff --git a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala index 9ac6f9c714b..89e676d9585 100755 --- a/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala +++ b/stream/src/main/scala/org/apache/pekko/stream/javadsl/SubSource.scala @@ -1735,7 +1735,7 @@ class SubSource[Out, Mat]( */ def flatMapPrefix[Out2, Mat2]( n: Int, - f: function.Function[java.lang.Iterable[Out], javadsl.Flow[Out, Out2, Mat2]]): javadsl.SubSource[Out2, Mat] = { + f: function.Function[java.util.List[Out], javadsl.Flow[Out, Out2, Mat2]]): javadsl.SubSource[Out2, Mat] = { val newDelegate = delegate.flatMapPrefix(n)(seq => f(seq.asJava).asScala) new javadsl.SubSource(newDelegate) }