From 3c4051868566f606cae7d6369a55ccc8b2abc377 Mon Sep 17 00:00:00 2001 From: Manfred Riem Date: Sun, 15 Dec 2024 15:02:44 -0600 Subject: [PATCH] Fixes #4403 - Fix GrizzlyServer to not suspend indefinitely for async requests (#4404) --- .../http/grizzly/GrizzlyHttpServer.java | 2 +- .../coreprofile/distribution/SseBean.java | 30 +++++++ .../distribution/SseBroadcastBean.java | 81 +++++++++++++++++++ .../test/coreprofile/distribution/SseIT.java | 59 ++++++++++++++ 4 files changed, 171 insertions(+), 1 deletion(-) create mode 100644 test/coreprofile/integration/src/main/java/cloud/piranha/test/coreprofile/distribution/SseBroadcastBean.java diff --git a/http/grizzly/src/main/java/cloud/piranha/http/grizzly/GrizzlyHttpServer.java b/http/grizzly/src/main/java/cloud/piranha/http/grizzly/GrizzlyHttpServer.java index e960e118f..f9b4ff9a4 100644 --- a/http/grizzly/src/main/java/cloud/piranha/http/grizzly/GrizzlyHttpServer.java +++ b/http/grizzly/src/main/java/cloud/piranha/http/grizzly/GrizzlyHttpServer.java @@ -164,7 +164,7 @@ public void service(Request request, Response response) throws Exception { GrizzlyHttpServerResponse gResponse = new GrizzlyHttpServerResponse(response); HttpServerProcessorEndState state = httpServerProcessor.process(gRequest, gResponse); if (state == ASYNCED) { - response.suspend(); + response.suspend(60, SECONDS); } } }); diff --git a/test/coreprofile/integration/src/main/java/cloud/piranha/test/coreprofile/distribution/SseBean.java b/test/coreprofile/integration/src/main/java/cloud/piranha/test/coreprofile/distribution/SseBean.java index 90ac6e779..a48a18193 100644 --- a/test/coreprofile/integration/src/main/java/cloud/piranha/test/coreprofile/distribution/SseBean.java +++ b/test/coreprofile/integration/src/main/java/cloud/piranha/test/coreprofile/distribution/SseBean.java @@ -27,7 +27,9 @@ */ package cloud.piranha.test.coreprofile.distribution; +import jakarta.inject.Inject; import jakarta.ws.rs.GET; +import jakarta.ws.rs.POST; import jakarta.ws.rs.Path; import jakarta.ws.rs.Produces; import jakarta.ws.rs.WebApplicationException; @@ -46,6 +48,12 @@ @Path("/sse") public class SseBean { + /** + * Stores the broadcaster. + */ + @Inject + private SseBroadcastBean broadcastBean; + /** * Stores the SSE context. */ @@ -75,4 +83,26 @@ public void string(@Context SseEventSink eventSink) { } }).start(); } + + /** + * Perform a SSE Broadcast. + * + * @param message the message to broadcast. + */ + @Path("broadcast") + @POST + public void broadcast(String message) { + broadcastBean.broadcast("Message"); + } + + /** + * Register to receive messages. + * + * @param eventSink the event sink. + */ + @Path("register") + @GET + public void register(@Context SseEventSink eventSink) { + broadcastBean.register(eventSink); + } } diff --git a/test/coreprofile/integration/src/main/java/cloud/piranha/test/coreprofile/distribution/SseBroadcastBean.java b/test/coreprofile/integration/src/main/java/cloud/piranha/test/coreprofile/distribution/SseBroadcastBean.java new file mode 100644 index 000000000..7f82e2805 --- /dev/null +++ b/test/coreprofile/integration/src/main/java/cloud/piranha/test/coreprofile/distribution/SseBroadcastBean.java @@ -0,0 +1,81 @@ +/* + * Copyright (c) 2002-2024 Manorrock.com. All Rights Reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are met: + * + * 1. Redistributions of source code must retain the above copyright notice, + * this list of conditions and the following disclaimer. + * 2. Redistributions in binary form must reproduce the above copyright + * notice, this list of conditions and the following disclaimer in the + * documentation and/or other materials provided with the distribution. + * 3. Neither the name of the copyright holder nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" + * AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE + * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE + * ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE + * LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR + * CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF + * SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS + * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN + * CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) + * ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE + * POSSIBILITY OF SUCH DAMAGE. + */ +package cloud.piranha.test.coreprofile.distribution; + +import jakarta.enterprise.context.ApplicationScoped; +import jakarta.ws.rs.core.Context; +import jakarta.ws.rs.sse.OutboundSseEvent; +import jakarta.ws.rs.sse.Sse; +import jakarta.ws.rs.sse.SseEventSink; +import java.util.List; +import java.util.concurrent.CopyOnWriteArrayList; + +/** + * The single and one and only SSE broadcast bean. + * + * @author Manfred Riem (mriem@manorrock.com) + */ +@ApplicationScoped +public class SseBroadcastBean { + + /** + * List of SSE event sinks. + */ + private final List sinks = new CopyOnWriteArrayList<>(); + + /** + * Store the SSE. + */ + @Context + private Sse sse; + + /** + * Register the given SSE event sink. + * + * @param sink the SSE event sink. + */ + public void register(SseEventSink sink) { + sinks.add(sink); + } + + /** + * Broadcast the given message 10 times. + * + * @param message the message. + */ + public void broadcast(String message) { + for (int i = 1; i <= 10; i++) { + String eventMessage = message + " #" + i; + OutboundSseEvent event = sse.newEventBuilder() + .name("message") + .data(String.class, eventMessage) + .build(); + sinks.forEach(sink -> sink.send(event)); + } + } +} diff --git a/test/coreprofile/integration/src/test/java/cloud/piranha/test/coreprofile/distribution/SseIT.java b/test/coreprofile/integration/src/test/java/cloud/piranha/test/coreprofile/distribution/SseIT.java index a945ba275..2bc076090 100644 --- a/test/coreprofile/integration/src/test/java/cloud/piranha/test/coreprofile/distribution/SseIT.java +++ b/test/coreprofile/integration/src/test/java/cloud/piranha/test/coreprofile/distribution/SseIT.java @@ -31,8 +31,18 @@ import java.net.http.HttpClient; import java.net.http.HttpRequest; import java.net.http.HttpResponse; +import java.net.http.HttpResponse.BodyHandlers; +import java.time.Duration; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.junit.jupiter.api.Assertions.fail; + +import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; /** @@ -59,4 +69,53 @@ void testSseString() throws Exception { assertNotNull(response.body()); assertTrue(response.body().contains("data: Event 4")); } + + /** + * Test SSE broadcast. + * + * @throw Exception when a serious error occurs. + */ + @Disabled + @Test + void testSseBroadcast() throws Exception { + + List messages = new ArrayList<>(); + HttpClient client = HttpClient.newBuilder() + .connectTimeout(Duration.ofSeconds(120)) + .build(); + + /* + * Register client and collect events. + */ + CompletableFuture future = client.sendAsync(HttpRequest.newBuilder() + .uri(new URI(baseUrl + "/sse/register")) + .build(), BodyHandlers.ofLines()) + .thenAccept(response -> response.body().forEach(message -> { + messages.add(message); + System.out.println("Received message: " + message); + })); + + /* + * Simulate server broadcast. + */ + client.send(HttpRequest.newBuilder() + .uri(new URI(baseUrl + "/sse/broadcast")) + .POST(HttpRequest.BodyPublishers.ofString("Broadcast message")) + .build(), HttpResponse.BodyHandlers.ofString()); + + /* + * Wait for the future to complete. + */ + try { + future.get(10, TimeUnit.SECONDS); + } catch (Exception e) { + fail("Test timed out"); + } + + /* + * Check if we have received 10 events. + */ + assertEquals(10, messages.size(), "Should have received 10 events"); + messages.forEach(System.out::println); + } }