Skip to content
This repository has been archived by the owner on Dec 14, 2022. It is now read-only.

Commit

Permalink
fix test
Browse files Browse the repository at this point in the history
  • Loading branch information
jianyun8023 committed Jun 23, 2021
1 parent a216462 commit 50cf074
Showing 1 changed file with 4 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -541,7 +541,7 @@ public void testStartFromSpecific() throws Exception {
-20, -21, -22, 1, 2, 3, 10, 11, 12), Optional.empty());

Map<String, Set<Integer>> expectedData = new HashMap<>();
expectedData.put(topic, new HashSet<>(Arrays.asList(2, 3, 10, 11, 12)));
expectedData.put(topic, new HashSet<>(Arrays.asList(1, 2, 3, 10, 11, 12)));

Map<String, MessageId> offset = new HashMap<>();
offset.put(topic, mids.get(3));
Expand All @@ -554,7 +554,7 @@ public void testStartFromSpecific() throws Exception {
sourceProps.setProperty(TOPIC_SINGLE_OPTION_KEY, topic);
DataStream stream = see.addSource(
new FlinkPulsarRowSource(serviceUrl, adminUrl, sourceProps).setStartFromSpecificOffsets(offset));
stream.flatMap(new CheckAllMessageExist(expectedData, 5)).setParallelism(1);
stream.flatMap(new CheckAllMessageExist(expectedData, 6)).setParallelism(1);

TestUtils.tryExecute(see, "start from specific");
}
Expand All @@ -573,7 +573,7 @@ public void testStartFromExternalSubscription() throws Exception {
admin.topics().createSubscription(TopicName.get(topic).toString(), subName, mids.get(3));

Map<String, Set<Integer>> expectedData = new HashMap<>();
expectedData.put(topic, new HashSet<>(Arrays.asList(2, 3, 10, 11, 12)));
expectedData.put(topic, new HashSet<>(Arrays.asList(1, 2, 3, 10, 11, 12)));

StreamExecutionEnvironment see = StreamExecutionEnvironment.getExecutionEnvironment();
see.getConfig().disableSysoutLogging();
Expand All @@ -583,7 +583,7 @@ public void testStartFromExternalSubscription() throws Exception {
sourceProps.setProperty(TOPIC_SINGLE_OPTION_KEY, topic);
DataStream stream = see.addSource(
new FlinkPulsarRowSource(serviceUrl, adminUrl, sourceProps).setStartFromSubscription(subName));
stream.flatMap(new CheckAllMessageExist(expectedData, 5)).setParallelism(1);
stream.flatMap(new CheckAllMessageExist(expectedData, 6)).setParallelism(1);

TestUtils.tryExecute(see, "start from specific");

Expand Down

0 comments on commit 50cf074

Please sign in to comment.