From 4409b70741b9c501df31f8186127dcf8a5c5e515 Mon Sep 17 00:00:00 2001 From: Markus Jansen Date: Thu, 29 Apr 2021 16:00:07 +0200 Subject: [PATCH 1/4] Generalized Mojo::Promise::map to also support the 'any' aggregation. --- lib/Mojo/Promise.pm | 28 ++++++++++++-- t/mojo/promise.t | 92 ++++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 115 insertions(+), 5 deletions(-) diff --git a/lib/Mojo/Promise.pm b/lib/Mojo/Promise.pm index 8109bf10f1..29478a6a7e 100644 --- a/lib/Mojo/Promise.pm +++ b/lib/Mojo/Promise.pm @@ -61,7 +61,8 @@ sub finally { shift->_finally(1, @_) } sub map { my ($class, $options, $cb, @items) = (shift, ref $_[0] eq 'HASH' ? shift : {}, @_); - return $class->all(map { $_->$cb } @items) if !$options->{concurrency} || @items <= $options->{concurrency}; + return $options->{any} ? $class->any(map { $_->$cb } @items) : $class->all(map { $_->$cb } @items) + if !$options->{concurrency} || @items <= $options->{concurrency}; my @start = map { $_->$cb } splice @items, 0, $options->{concurrency}; my @wait = map { $start[0]->clone } 0 .. $#items; @@ -69,13 +70,23 @@ sub map { my $start_next = sub { return () unless my $item = shift @items; my ($start_next, $chain) = (__SUB__, shift @wait); - $_->$cb->then(sub { $chain->resolve(@_); $start_next->() }, sub { $chain->reject(@_); @items = () }) for $item; + if ($options->{any}) { + $_->$cb->then(sub { $chain->resolve(@_); @items = () }, sub { $chain->reject(@_); $start_next->() }) for $item; + } + else { + $_->$cb->then(sub { $chain->resolve(@_); $start_next->() }, sub { $chain->reject(@_); @items = () }) for $item; + } return (); }; - $_->then($start_next, sub { }) for @start; + if ($options->{any}) { + $_->then(sub { }, $start_next) for @start; + } + else { + $_->then($start_next, sub { }) for @start; + } - return $class->all(@start, @wait); + return $options->{any} ? $class->any(@start, @wait) : $class->all(@start, @wait); } sub new { @@ -409,6 +420,9 @@ original fulfillment value or rejection reason. Apply a function that returns a L to each item in a list of items while optionally limiting concurrency. Returns a L that collects the results in the same manner as L. If any item's promise is rejected, +any remaining items which have not yet been mapped will not be. + +With the C option, the behaviour can be changed to the same manner as L. If any item's promise is resolved, any remaining items which have not yet been mapped will not be. # Perform 3 requests at a time concurrently @@ -419,6 +433,12 @@ These options are currently available: =over 2 +=item any + + any => 1 + +Changes the aggregation behaviour from 'all' to 'any' if set to a true value. + =item concurrency concurrency => 3 diff --git a/t/mojo/promise.t b/t/mojo/promise.t index 6feddda051..08c205de48 100644 --- a/t/mojo/promise.t +++ b/t/mojo/promise.t @@ -509,7 +509,7 @@ subtest 'Map (with concurrency limit)' => sub { is_deeply \@errors, [], 'promise not rejected'; }; -subtest 'Map (with reject)' => sub { +subtest 'Map (with early reject)' => sub { my (@results, @errors, @started); Mojo::Promise->map( {concurrency => 3}, @@ -525,6 +525,96 @@ subtest 'Map (with reject)' => sub { is_deeply \@started, [1, 2, 3], 'only initial batch started'; }; +subtest 'Map (with later reject)' => sub { + my (@results, @errors, @started); + Mojo::Promise->map( + {concurrency => 3}, + sub { + my $n = $_; + push @started, $n; + Mojo::Promise->resolve->then( + sub { + if ($n >= 5) { Mojo::Promise->reject($n) } + else { Mojo::Promise->resolve($n) } + } + ); + }, + 1 .. 8 + )->then(sub { @results = @_ }, sub { @errors = @_ })->wait; + is_deeply \@results, [], 'promise not resolved'; + is_deeply \@errors, [5], 'correct errors'; + is_deeply \@started, [1, 2, 3, 4, 5, 6, 7], 'only maximum concurrent promises started'; +}; + +subtest 'Map (any, with early success)' => sub { + my (@results, @errors, @started); + Mojo::Promise->map( + {concurrency => 3, any => 1}, + sub { + my $n = $_; + push @started, $n; + Mojo::Promise->resolve->then(sub { Mojo::Promise->resolve($n) }); + }, + 1 .. 5 + )->then(sub { @results = @_ }, sub { @errors = @_ })->wait; + is_deeply \@results, [1], 'promise resolved'; + is_deeply \@errors, [], 'correct errors'; + is_deeply \@started, [1, 2, 3], 'only initial batch started'; +}; + +subtest 'Map (any, with later success)' => sub { + my (@results, @errors, @started); + Mojo::Promise->map( + {concurrency => 3, any => 1}, + sub { + my $n = $_; + push @started, $n; + Mojo::Promise->resolve->then( + sub { + if ($n >= 5) { Mojo::Promise->resolve($n) } + else { Mojo::Promise->reject($n) } + } + ); + }, + 1 .. 7 + )->then(sub { @results = @_ }, sub { @errors = @_ })->wait; + is_deeply \@results, [5], 'promise resolved'; + is_deeply \@errors, [], 'correct errors'; + is_deeply \@started, [1, 2, 3, 4, 5, 6, 7], 'only maximum concurrent promises started'; +}; + +subtest 'Map (any, all rejected)' => sub { + my (@results, @errors, @started); + Mojo::Promise->map( + {any => 1}, + sub { + my $n = $_; + push @started, $n; + Mojo::Promise->resolve->then(sub { Mojo::Promise->reject($n) }); + }, + 1 .. 3 + )->then(sub { @results = @_ }, sub { @errors = @_ })->wait; + is_deeply \@results, [], 'promise rejected'; + is_deeply \@errors, [[1], [2], [3]], 'correct errors'; + is_deeply \@started, [1, 2, 3], 'all started without concurrency'; +}; + +subtest 'Map (concurrency, any, all rejected)' => sub { + my (@results, @errors, @started); + Mojo::Promise->map( + {concurrency => 3, any => 1}, + sub { + my $n = $_; + push @started, $n; + Mojo::Promise->resolve->then(sub { Mojo::Promise->reject($n) }); + }, + 1 .. 5 + )->then(sub { @results = @_ }, sub { @errors = @_ })->wait; + is_deeply \@results, [], 'promise rejected'; + is_deeply \@errors, [[1], [2], [3], [4], [5]], 'correct errors'; + is_deeply \@started, [1, 2, 3, 4, 5], 'all started with concurrency'; +}; + subtest 'Map (custom event loop)' => sub { my $ok; my $loop = Mojo::IOLoop->new; From 9e619345e2ec256da4c1cc17c64ed9317fd27219 Mon Sep 17 00:00:00 2001 From: Markus Jansen Date: Thu, 29 Apr 2021 17:22:24 +0200 Subject: [PATCH 2/4] Generalized the aggregation behaviour of Mojo::Promise::map to support 'any' and 'all_settled'. --- lib/Mojo/Promise.pm | 53 ++++++++++++++++++++++++++++---------------- t/mojo/promise.t | 54 +++++++++++++++++++++++++++++++-------------- 2 files changed, 72 insertions(+), 35 deletions(-) diff --git a/lib/Mojo/Promise.pm b/lib/Mojo/Promise.pm index 29478a6a7e..d12843aa3e 100644 --- a/lib/Mojo/Promise.pm +++ b/lib/Mojo/Promise.pm @@ -61,8 +61,13 @@ sub finally { shift->_finally(1, @_) } sub map { my ($class, $options, $cb, @items) = (shift, ref $_[0] eq 'HASH' ? shift : {}, @_); - return $options->{any} ? $class->any(map { $_->$cb } @items) : $class->all(map { $_->$cb } @items) - if !$options->{concurrency} || @items <= $options->{concurrency}; + my ($akey, $aggregation) + = !defined $options->{aggregation} ? (0, 'all') + : $options->{aggregation} eq 'any' ? (1, 'any') + : $options->{aggregation} eq 'all_settled' ? (2, 'all_settled') + : (0, 'all'); + + return $class->$aggregation(map { $_->$cb } @items) if !$options->{concurrency} || @items <= $options->{concurrency}; my @start = map { $_->$cb } splice @items, 0, $options->{concurrency}; my @wait = map { $start[0]->clone } 0 .. $#items; @@ -70,23 +75,32 @@ sub map { my $start_next = sub { return () unless my $item = shift @items; my ($start_next, $chain) = (__SUB__, shift @wait); - if ($options->{any}) { - $_->$cb->then(sub { $chain->resolve(@_); @items = () }, sub { $chain->reject(@_); $start_next->() }) for $item; - } - else { - $_->$cb->then(sub { $chain->resolve(@_); $start_next->() }, sub { $chain->reject(@_); @items = () }) for $item; - } + $_->$cb->then( + sub { + $chain->resolve(@_); + if ($akey == 1) { @items = () } + else { $start_next->() } + }, + sub { + $chain->reject(@_); + if ($akey == 0) { @items = () } + else { $start_next->() } + } + ) for $item; return (); }; - if ($options->{any}) { + if ($akey == 0) { + $_->then($start_next, sub { }) for @start; + } + elsif ($akey == 1) { $_->then(sub { }, $start_next) for @start; } else { - $_->then($start_next, sub { }) for @start; + $_->then($start_next, $start_next) for @start; } - return $options->{any} ? $class->any(@start, @wait) : $class->all(@start, @wait); + return $class->$aggregation(@start, @wait); } sub new { @@ -417,13 +431,14 @@ original fulfillment value or rejection reason. my $new = Mojo::Promise->map(sub {...}, @items); my $new = Mojo::Promise->map({concurrency => 3}, sub {...}, @items); + my $new = Mojo::Promise->map({aggregation => 'any', concurrency => 3}, sub {...}, @items); + my $new = Mojo::Promise->map({aggregation => 'all_settled', concurrency => 3}, sub {...}, @items); Apply a function that returns a L to each item in a list of items while optionally limiting concurrency. -Returns a L that collects the results in the same manner as L. If any item's promise is rejected, -any remaining items which have not yet been mapped will not be. - -With the C option, the behaviour can be changed to the same manner as L. If any item's promise is resolved, -any remaining items which have not yet been mapped will not be. +Returns a L that collects the results in the same manner as L. +With the C option, the behaviour can be changed to the same manner as L or L. +If nothing or C is specified, and any item's promise is rejected, any remaining items which have not yet been mapped will not be. +If C is specified and any item's promise is resolved, any remaining items which have not yet been mapped will not be. # Perform 3 requests at a time concurrently Mojo::Promise->map({concurrency => 3}, sub { $ua->get_p($_) }, @urls) @@ -433,11 +448,11 @@ These options are currently available: =over 2 -=item any +=item aggregation - any => 1 + aggregation => 'any' -Changes the aggregation behaviour from 'all' to 'any' if set to a true value. +Specifies the aggregation behaviour. Supported values are L (default), L, and L. =item concurrency diff --git a/t/mojo/promise.t b/t/mojo/promise.t index 08c205de48..2ec9ffbc7c 100644 --- a/t/mojo/promise.t +++ b/t/mojo/promise.t @@ -532,12 +532,10 @@ subtest 'Map (with later reject)' => sub { sub { my $n = $_; push @started, $n; - Mojo::Promise->resolve->then( - sub { - if ($n >= 5) { Mojo::Promise->reject($n) } - else { Mojo::Promise->resolve($n) } - } - ); + Mojo::Promise->resolve->then(sub { + if ($n >= 5) { Mojo::Promise->reject($n) } + else { Mojo::Promise->resolve($n) } + }); }, 1 .. 8 )->then(sub { @results = @_ }, sub { @errors = @_ })->wait; @@ -549,7 +547,7 @@ subtest 'Map (with later reject)' => sub { subtest 'Map (any, with early success)' => sub { my (@results, @errors, @started); Mojo::Promise->map( - {concurrency => 3, any => 1}, + {concurrency => 3, aggregation => 'any'}, sub { my $n = $_; push @started, $n; @@ -565,16 +563,14 @@ subtest 'Map (any, with early success)' => sub { subtest 'Map (any, with later success)' => sub { my (@results, @errors, @started); Mojo::Promise->map( - {concurrency => 3, any => 1}, + {concurrency => 3, aggregation => 'any'}, sub { my $n = $_; push @started, $n; - Mojo::Promise->resolve->then( - sub { - if ($n >= 5) { Mojo::Promise->resolve($n) } - else { Mojo::Promise->reject($n) } - } - ); + Mojo::Promise->resolve->then(sub { + if ($n >= 5) { Mojo::Promise->resolve($n) } + else { Mojo::Promise->reject($n) } + }); }, 1 .. 7 )->then(sub { @results = @_ }, sub { @errors = @_ })->wait; @@ -586,7 +582,7 @@ subtest 'Map (any, with later success)' => sub { subtest 'Map (any, all rejected)' => sub { my (@results, @errors, @started); Mojo::Promise->map( - {any => 1}, + {aggregation => 'any'}, sub { my $n = $_; push @started, $n; @@ -602,7 +598,7 @@ subtest 'Map (any, all rejected)' => sub { subtest 'Map (concurrency, any, all rejected)' => sub { my (@results, @errors, @started); Mojo::Promise->map( - {concurrency => 3, any => 1}, + {concurrency => 3, aggregation => 'any'}, sub { my $n = $_; push @started, $n; @@ -615,6 +611,32 @@ subtest 'Map (concurrency, any, all rejected)' => sub { is_deeply \@started, [1, 2, 3, 4, 5], 'all started with concurrency'; }; +subtest 'Map (concurrency, all settled, partially rejected)' => sub { + my (@results, @errors, @started); + Mojo::Promise->map( + {concurrency => 3, aggregation => 'all_settled'}, + sub { + my $n = $_; + push @started, $n; + Mojo::Promise->resolve->then(sub { + if ($n % 2) { Mojo::Promise->resolve($n) } + else { Mojo::Promise->reject($n) } + }); + }, + 1 .. 5 + )->then(sub { @results = @_ }, sub { @errors = @_ })->wait; + my $result = [ + {status => 'fulfilled', value => [1]}, + {status => 'rejected', reason => [2]}, + {status => 'fulfilled', value => [3]}, + {status => 'rejected', reason => [4]}, + {status => 'fulfilled', value => [5]} + ]; + is_deeply \@results, $result, 'promise resolved'; + is_deeply \@errors, [], 'correct errors'; + is_deeply \@started, [1, 2, 3, 4, 5], 'all started with concurrency'; +}; + subtest 'Map (custom event loop)' => sub { my $ok; my $loop = Mojo::IOLoop->new; From adfa25c52b3b21d0434a5d85ca62cc8b6fb18630 Mon Sep 17 00:00:00 2001 From: Markus Jansen Date: Fri, 30 Apr 2021 23:57:14 +0200 Subject: [PATCH 3/4] Added a delay option to Mojo::Promise::map, and introduced the function in the cookbook. --- lib/Mojo/Promise.pm | 47 +++++++++++++++++++---------- lib/Mojolicious/Guides/Cookbook.pod | 37 +++++++++++++++++++++++ t/mojo/promise.t | 28 +++++++++++++++++ 3 files changed, 96 insertions(+), 16 deletions(-) diff --git a/lib/Mojo/Promise.pm b/lib/Mojo/Promise.pm index d12843aa3e..90236e961d 100644 --- a/lib/Mojo/Promise.pm +++ b/lib/Mojo/Promise.pm @@ -75,18 +75,27 @@ sub map { my $start_next = sub { return () unless my $item = shift @items; my ($start_next, $chain) = (__SUB__, shift @wait); - $_->$cb->then( - sub { - $chain->resolve(@_); - if ($akey == 1) { @items = () } - else { $start_next->() } - }, - sub { - $chain->reject(@_); - if ($akey == 0) { @items = () } - else { $start_next->() } - } - ) for $item; + my $exec_next = sub { + $_->$cb->then( + sub { + $chain->resolve(@_); + if ($akey == 1) { @items = () } + else { $start_next->() } + }, + sub { + $chain->reject(@_); + if ($akey == 0) { @items = () } + else { $start_next->() } + } + ) for $item; + return (); + }; + if (!$options->{delay}) { + $exec_next->(); + } + else { + Mojo::IOLoop->timer($options->{delay} => sub { $exec_next->() }); + } return (); }; @@ -432,13 +441,13 @@ original fulfillment value or rejection reason. my $new = Mojo::Promise->map(sub {...}, @items); my $new = Mojo::Promise->map({concurrency => 3}, sub {...}, @items); my $new = Mojo::Promise->map({aggregation => 'any', concurrency => 3}, sub {...}, @items); - my $new = Mojo::Promise->map({aggregation => 'all_settled', concurrency => 3}, sub {...}, @items); + my $new = Mojo::Promise->map({aggregation => 'all_settled', concurrency => 1, delay => 2.5 }, sub {...}, @items); -Apply a function that returns a L to each item in a list of items while optionally limiting concurrency. +Apply a function that returns a L to each item in a list of items while optionally limiting concurrency and inserting delays between processing items. Returns a L that collects the results in the same manner as L. With the C option, the behaviour can be changed to the same manner as L or L. -If nothing or C is specified, and any item's promise is rejected, any remaining items which have not yet been mapped will not be. -If C is specified and any item's promise is resolved, any remaining items which have not yet been mapped will not be. +If nothing or L is specified, and any item's promise is rejected, any remaining items which have not yet been mapped will not be. +If L is specified and any item's promise is resolved, any remaining items which have not yet been mapped will not be. # Perform 3 requests at a time concurrently Mojo::Promise->map({concurrency => 3}, sub { $ua->get_p($_) }, @urls) @@ -460,6 +469,12 @@ Specifies the aggregation behaviour. Supported values are L (default), L 2.5 + +Insert a delay of 2.5 seconds after each processed items. N.B. delay makes sense only in case concurrency is specified. + =back =head2 new diff --git a/lib/Mojolicious/Guides/Cookbook.pod b/lib/Mojolicious/Guides/Cookbook.pod index 06880db74c..0989dcbbda 100644 --- a/lib/Mojolicious/Guides/Cookbook.pod +++ b/lib/Mojolicious/Guides/Cookbook.pod @@ -676,6 +676,43 @@ that create them for you. app->start; +When processing a number of requests towards a non-robust or external resource, the kind of aggregation, maximum +concurrency and an optional delay can be specified using L. + + use Mojolicious::Lite -signatures; + use Mojo::Promise; + use Mojo::URL; + + # Search MetaCPAN for a larger number of items, with mild concurrency and some throttling + get '/' => sub ($c) { + + my $url = Mojo::URL->new('http://fastapi.metacpan.org/v1/module/_search'); + my @items = (qw(perl mojolicious mojo minion)); + + # Render a response once all promises have been resolved + Mojo::Promise->map( + {concurrency => 2, delay => 0.5}, # average max 4 requests/s + sub { + my $item = $_; + $c->ua->get_p($url->clone->query({q => $item}))->then(sub { my @res = @_; Mojo::Promise->resolve($item, @res); }); + }, + @items + )->then(sub (@results) { + $c->render( + json => { + map { + my ($item, $res) = @$_; + $item => $res->result->json('/hits/hits/0/_source/release'); + } @results + } + ); + })->catch(sub ($err) { + $c->reply->exception($err); + })->wait; + }; + + app->start; + To create promises manually you just wrap your continuation-passing style APIs in functions that return promises. Here's an example for how L works internally. diff --git a/t/mojo/promise.t b/t/mojo/promise.t index 2ec9ffbc7c..53b5b58249 100644 --- a/t/mojo/promise.t +++ b/t/mojo/promise.t @@ -637,6 +637,34 @@ subtest 'Map (concurrency, all settled, partially rejected)' => sub { is_deeply \@started, [1, 2, 3, 4, 5], 'all started with concurrency'; }; +subtest 'Map (concurrency, delay, all settled, partially rejected)' => sub { + my (@results, @errors, @started); + Mojo::Promise->map( + {concurrency => 2, delay => 0.1, aggregation => 'all_settled'}, + sub { + my $n = $_; + push @started, $n; + Mojo::Promise->resolve->then(sub { + if ($n % 2) { Mojo::Promise->reject($n) } + else { Mojo::Promise->resolve($n) } + }); + }, + 1 .. 5 + )->then(sub { @results = @_ }, sub { @errors = @_ })->wait; + my $result = [ + {status => 'rejected', reason => [1]}, + {status => 'fulfilled', value => [2]}, + {status => 'rejected', reason => [3]}, + {status => 'fulfilled', value => [4]}, + {status => 'rejected', reason => [5]}, + ]; + is_deeply \@results, $result, 'promise resolved'; + is_deeply \@errors, [], 'correct errors'; + + # is_deeply \@started, [1, 2, 3, 4, 5], 'all started with concurrency'; + is scalar @started, 5, 'all started with concurrency'; +}; + subtest 'Map (custom event loop)' => sub { my $ok; my $loop = Mojo::IOLoop->new; From 1647d5736366531f4277669dcb6ca15589611ee8 Mon Sep 17 00:00:00 2001 From: Markus Jansen Date: Mon, 3 May 2021 16:23:06 +0200 Subject: [PATCH 4/4] Added support for race to Mojo::Promise::map, and simplified the function. --- lib/Mojo/Promise.pm | 27 ++++++++++++--------------- t/mojo/promise.t | 19 +++++++++++++++++++ 2 files changed, 31 insertions(+), 15 deletions(-) diff --git a/lib/Mojo/Promise.pm b/lib/Mojo/Promise.pm index 90236e961d..b886a0e77e 100644 --- a/lib/Mojo/Promise.pm +++ b/lib/Mojo/Promise.pm @@ -61,18 +61,23 @@ sub finally { shift->_finally(1, @_) } sub map { my ($class, $options, $cb, @items) = (shift, ref $_[0] eq 'HASH' ? shift : {}, @_); - my ($akey, $aggregation) - = !defined $options->{aggregation} ? (0, 'all') - : $options->{aggregation} eq 'any' ? (1, 'any') - : $options->{aggregation} eq 'all_settled' ? (2, 'all_settled') - : (0, 'all'); + my $start_next; + my $block_next = sub { }; + + my ($akey, $aggregation, $next_after_fullfil, $next_after_reject) + = !defined $options->{aggregation} ? (0, 'all', \$start_next, \$block_next) + : $options->{aggregation} eq 'any' ? (1, 'any', \$block_next, \$start_next) + : $options->{aggregation} eq 'all_settled' ? (2, 'all_settled', \$start_next, \$start_next) + : $options->{aggregation} eq 'race' ? (3, 'race', \$block_next, \$block_next) + : (0, 'all', \$start_next, \$block_next); return $class->$aggregation(map { $_->$cb } @items) if !$options->{concurrency} || @items <= $options->{concurrency}; my @start = map { $_->$cb } splice @items, 0, $options->{concurrency}; my @wait = map { $start[0]->clone } 0 .. $#items; - my $start_next = sub { + # N.B. $start_next will never be called for $aggregation eq 'race' + $start_next = sub { return () unless my $item = shift @items; my ($start_next, $chain) = (__SUB__, shift @wait); my $exec_next = sub { @@ -99,15 +104,7 @@ sub map { return (); }; - if ($akey == 0) { - $_->then($start_next, sub { }) for @start; - } - elsif ($akey == 1) { - $_->then(sub { }, $start_next) for @start; - } - else { - $_->then($start_next, $start_next) for @start; - } + $_->then($$next_after_fullfil, $$next_after_reject) for @start; return $class->$aggregation(@start, @wait); } diff --git a/t/mojo/promise.t b/t/mojo/promise.t index 53b5b58249..a3de3cd786 100644 --- a/t/mojo/promise.t +++ b/t/mojo/promise.t @@ -611,6 +611,25 @@ subtest 'Map (concurrency, any, all rejected)' => sub { is_deeply \@started, [1, 2, 3, 4, 5], 'all started with concurrency'; }; +subtest 'Map (concurrency, race, 2 of 3 rejected)' => sub { + my (@results, @errors, @started); + Mojo::Promise->map( + {concurrency => 3, aggregation => 'race'}, + sub { + my $n = $_; + push @started, $n; + Mojo::Promise->resolve->then(sub { + if ($n % 2) { Mojo::Promise->reject($n) } + else { Mojo::Promise->resolve($n) } + }); + }, + 1 .. 5 + )->then(sub { @results = @_ }, sub { @errors = @_ })->wait; + is_deeply \@results, [], 'promise rejected'; + is_deeply \@errors, [1], 'correct errors'; + is_deeply \@started, [1, 2, 3], 'only 3 of 5 started with concurrency'; +}; + subtest 'Map (concurrency, all settled, partially rejected)' => sub { my (@results, @errors, @started); Mojo::Promise->map(