Skip to content

Commit

Permalink
Fixes piranhacloud#4403 - Fix GrizzlyServer to not suspend indefinite…
Browse files Browse the repository at this point in the history
…ly for async requests (piranhacloud#4404)
  • Loading branch information
mnriem authored Dec 15, 2024
1 parent cb65ecf commit 3c40518
Show file tree
Hide file tree
Showing 4 changed files with 171 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ public void service(Request request, Response response) throws Exception {
GrizzlyHttpServerResponse gResponse = new GrizzlyHttpServerResponse(response);
HttpServerProcessorEndState state = httpServerProcessor.process(gRequest, gResponse);
if (state == ASYNCED) {
response.suspend();
response.suspend(60, SECONDS);
}
}
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@
*/
package cloud.piranha.test.coreprofile.distribution;

import jakarta.inject.Inject;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.WebApplicationException;
Expand All @@ -46,6 +48,12 @@
@Path("/sse")
public class SseBean {

/**
* Stores the broadcaster.
*/
@Inject
private SseBroadcastBean broadcastBean;

/**
* Stores the SSE context.
*/
Expand Down Expand Up @@ -75,4 +83,26 @@ public void string(@Context SseEventSink eventSink) {
}
}).start();
}

/**
* Perform a SSE Broadcast.
*
* @param message the message to broadcast.
*/
@Path("broadcast")
@POST
public void broadcast(String message) {
broadcastBean.broadcast("Message");
}

/**
* Register to receive messages.
*
* @param eventSink the event sink.
*/
@Path("register")
@GET
public void register(@Context SseEventSink eventSink) {
broadcastBean.register(eventSink);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
/*
* Copyright (c) 2002-2024 Manorrock.com. All Rights Reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are met:
*
* 1. Redistributions of source code must retain the above copyright notice,
* this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in the
* documentation and/or other materials provided with the distribution.
* 3. Neither the name of the copyright holder nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
* ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
* LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
* CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
* SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
* CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*/
package cloud.piranha.test.coreprofile.distribution;

import jakarta.enterprise.context.ApplicationScoped;
import jakarta.ws.rs.core.Context;
import jakarta.ws.rs.sse.OutboundSseEvent;
import jakarta.ws.rs.sse.Sse;
import jakarta.ws.rs.sse.SseEventSink;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

/**
* The single and one and only SSE broadcast bean.
*
* @author Manfred Riem ([email protected])
*/
@ApplicationScoped
public class SseBroadcastBean {

/**
* List of SSE event sinks.
*/
private final List<SseEventSink> sinks = new CopyOnWriteArrayList<>();

/**
* Store the SSE.
*/
@Context
private Sse sse;

/**
* Register the given SSE event sink.
*
* @param sink the SSE event sink.
*/
public void register(SseEventSink sink) {
sinks.add(sink);
}

/**
* Broadcast the given message 10 times.
*
* @param message the message.
*/
public void broadcast(String message) {
for (int i = 1; i <= 10; i++) {
String eventMessage = message + " #" + i;
OutboundSseEvent event = sse.newEventBuilder()
.name("message")
.data(String.class, eventMessage)
.build();
sinks.forEach(sink -> sink.send(event));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,18 @@
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.net.http.HttpResponse.BodyHandlers;
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.junit.jupiter.api.Assertions.fail;

import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;

/**
Expand All @@ -59,4 +69,53 @@ void testSseString() throws Exception {
assertNotNull(response.body());
assertTrue(response.body().contains("data: Event 4"));
}

/**
* Test SSE broadcast.
*
* @throw Exception when a serious error occurs.
*/
@Disabled
@Test
void testSseBroadcast() throws Exception {

List<String> messages = new ArrayList<>();
HttpClient client = HttpClient.newBuilder()
.connectTimeout(Duration.ofSeconds(120))
.build();

/*
* Register client and collect events.
*/
CompletableFuture<Void> future = client.sendAsync(HttpRequest.newBuilder()
.uri(new URI(baseUrl + "/sse/register"))
.build(), BodyHandlers.ofLines())
.thenAccept(response -> response.body().forEach(message -> {
messages.add(message);
System.out.println("Received message: " + message);
}));

/*
* Simulate server broadcast.
*/
client.send(HttpRequest.newBuilder()
.uri(new URI(baseUrl + "/sse/broadcast"))
.POST(HttpRequest.BodyPublishers.ofString("Broadcast message"))
.build(), HttpResponse.BodyHandlers.ofString());

/*
* Wait for the future to complete.
*/
try {
future.get(10, TimeUnit.SECONDS);
} catch (Exception e) {
fail("Test timed out");
}

/*
* Check if we have received 10 events.
*/
assertEquals(10, messages.size(), "Should have received 10 events");
messages.forEach(System.out::println);
}
}

0 comments on commit 3c40518

Please sign in to comment.