Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

6 stage - Emelyanov Vitaliy, SPbSTU #215

Closed
wants to merge 29 commits into from
Closed
Show file tree
Hide file tree
Changes from 27 commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
0af7602
Stage 2
Feb 28, 2024
2a91217
Report 4 2nd stage
Mar 6, 2024
58a3b0a
3rd stage
Mar 13, 2024
e867ae3
2nd stage fix after review
Mar 17, 2024
6a3a352
3nd stage fix after pre-review
Mar 20, 2024
6937aa0
3rd stage report
Mar 20, 2024
5d1dce7
Add forgotten markdown file
Mar 23, 2024
089100c
Copy reference dao to own dir
Mar 23, 2024
e80716b
Add timestamps to dao
Mar 23, 2024
29a1e50
Merge branch 'main' of github.com:polis-vk/2024-highload-dht into 4_s…
Mar 25, 2024
81b68f7
4th stage
Mar 27, 2024
5ab3f3e
4th stage stylefix
Mar 27, 2024
9b9fbb0
Merge branch 'main' into 4_stage
incubos Mar 28, 2024
d2d4e0c
4th stage report
Apr 3, 2024
8d5e4f5
Merge branch '4_stage' of github.com:GenryEden/2024-highload-dht into…
Apr 3, 2024
356df3c
5th stage
Apr 8, 2024
4355090
Merge branch 'main' of github.com:polis-vk/2024-highload-dht into 5_s…
Apr 8, 2024
f21cad5
5th stage - fix reused connection to old server
Apr 8, 2024
7d6d6f1
5th stage - stylefix
Apr 8, 2024
4dc932a
Merge branch 'main' into 5_stage
incubos Apr 13, 2024
621996c
5th stage - report
Apr 17, 2024
f9471d3
Merge branch '5_stage' of github.com:GenryEden/2024-highload-dht into…
Apr 17, 2024
022c576
5th stage - report stylefix
Apr 17, 2024
804e0be
6th stage
Apr 24, 2024
d5107dd
Merge upstream
Apr 24, 2024
493e151
6th stage report and fixes
Apr 24, 2024
fbd1aa9
Merge branch 'main' into 6_stage
incubos Apr 28, 2024
9957f41
Merge branch 'main' into 6_stage
lamtev May 9, 2024
1e41ced
Merge branch 'main' into 6_stage
incubos May 25, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ compileTestJava {
}

application {
mainClass = 'ru.vk.itmo.test.reference.ReferenceService'
mainClass = 'ru.vk.itmo.test.emelyanovvitaliy.TestServerRunner'
applicationDefaultJvmArgs = ['-Xmx128m', '--enable-preview', '-Xlog:gc']
// + ['-XX:+UseSerialGC']
//
Expand Down
38 changes: 38 additions & 0 deletions src/main/java/ru/vk/itmo/test/emelyanovvitaliy/BoolMerger.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package ru.vk.itmo.test.emelyanovvitaliy;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

public class BoolMerger implements Merger<Boolean> {
private final int from;
private final int ack;
private final AtomicInteger answeredOk;
private final AtomicInteger answeredNotOk;
private final CompletableFuture<Boolean> futureToComplete;

public BoolMerger(int from, int ack) {
this.from = from;
this.ack = ack;
answeredOk = new AtomicInteger(0);
answeredNotOk = new AtomicInteger(0);
futureToComplete = new CompletableFuture<>();
}

@Override
public void acceptResult(Boolean success, Throwable throwable) {
if (success) {
if (answeredOk.incrementAndGet() == ack) {
futureToComplete.complete(true);
}
} else {
if (answeredNotOk.incrementAndGet() == from - ack + 1) {
futureToComplete.complete(false);
}
}
}

@Override
public CompletableFuture<Boolean> getCompletableFuture() {
return futureToComplete;
}
}
22 changes: 22 additions & 0 deletions src/main/java/ru/vk/itmo/test/emelyanovvitaliy/ByteQueueItem.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package ru.vk.itmo.test.emelyanovvitaliy;

import one.nio.net.Session;
import one.nio.net.Socket;

import java.io.IOException;
import java.util.Arrays;

public class ByteQueueItem extends Session.QueueItem {
private final byte[] bytes;

public ByteQueueItem(byte[] bytes) {
this.bytes = Arrays.copyOf(bytes, bytes.length);
// я не хотел копировать тут, меня заставил код стайл(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

как вариант, если есть уверенность в безопасности, можно было бы попробовать вынести в функцию типа byte[] haha(byte[] b) { return b; } или добавить @SuppressWarning

}

@Override
public int write(Socket socket) throws IOException {
socket.write(bytes, 0, bytes.length);
return bytes.length;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import java.lang.foreign.MemorySegment;
import java.lang.foreign.ValueLayout;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;

public abstract class DaoMediator {
public static final long NEVER_TIMESTAMP = Long.MIN_VALUE;
Expand All @@ -23,13 +24,13 @@ protected static byte[] valueFor(Entry<MemorySegment> entry) {

abstract boolean isStopped();

abstract boolean put(Request request);
abstract CompletableFuture<Boolean> put(Request request);

// returns null if error,
// entry with null value and timestamp if the answer is tombstone
// entry with null value and timestamp equals NEVER_TIMESTAMP if no answer
// entry if it exists
abstract TimestampedEntry<MemorySegment> get(Request request);
abstract CompletableFuture<TimestampedEntry<MemorySegment>> get(Request request);

abstract boolean delete(Request request);
abstract CompletableFuture<Boolean> delete(Request request);
}
168 changes: 111 additions & 57 deletions src/main/java/ru/vk/itmo/test/emelyanovvitaliy/DhtServer.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,45 +17,44 @@
import java.io.UncheckedIOException;
import java.lang.foreign.MemorySegment;
import java.lang.foreign.ValueLayout;
import java.util.Iterator;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Logger;

import static one.nio.http.Request.METHOD_DELETE;
import static one.nio.http.Request.METHOD_GET;
import static one.nio.http.Request.METHOD_PUT;

public class DhtServer extends HttpServer {
public static final String NOT_ENOUGH_REPLICAS_STATUS = "504 Not Enough Replicas";
public static final String TIMESTAMP_HEADER = "X-Timestamp: ";
public static final String TIMESTAMP_HEADER = "X-Timestamp";
public static final String ID_KEY = "id=";
protected static final int THREADS_PER_PROCESSOR = 2;
protected static final byte[] EMPTY_BODY = new byte[0];
public static final String START_KEY = "start=";
public static final String END_KEY = "end=";
protected static final int THREADS_PER_PROCESSOR = 1;
protected static final long KEEP_ALIVE_TIME_MILLIS = 1000;
protected static final int REQUEST_TIMEOUT_MILLIS = 1024;
protected static final int THREAD_POOL_TERMINATION_TIMEOUT_SECONDS = 600;
protected static final int TASK_QUEUE_SIZE = Runtime.getRuntime().availableProcessors() * THREADS_PER_PROCESSOR;
private static final Logger LOGGER = Logger.getLogger(DhtServer.class.getName());
private static final String FUTURE_RETURN_VALUE_IGNORED = "FutureReturnValueIgnored";
protected final MergeDaoMediator mergeDaoMediator;
protected final AtomicInteger threadsInPool;
protected final ThreadPoolExecutor threadPoolExecutor;

public DhtServer(ServiceConfig config) throws IOException {
super(createConfig(config));
mergeDaoMediator = new MergeDaoMediator(config.workingDir(), config.selfUrl(), config.clusterUrls());
threadsInPool = new AtomicInteger(0);
AtomicInteger threadsInPool = new AtomicInteger(0);
threadPoolExecutor = new ThreadPoolExecutor(
Runtime.getRuntime().availableProcessors() * THREADS_PER_PROCESSOR,
Runtime.getRuntime().availableProcessors() * THREADS_PER_PROCESSOR,
KEEP_ALIVE_TIME_MILLIS,
TimeUnit.MILLISECONDS,
KEEP_ALIVE_TIME_MILLIS, TimeUnit.MILLISECONDS,
new ArrayBlockingQueue<>(TASK_QUEUE_SIZE),
r -> {
Thread t = new Thread(r);
t.setDaemon(true);
t.setName("DhtServerThreadPool-Thread-" + threadsInPool.incrementAndGet());
return t;
}
r -> new Thread(r, "DhtServerThreadPool-Thread-" + threadsInPool.incrementAndGet())
);
}

Expand All @@ -65,7 +64,7 @@ public synchronized void stop() {
try {
threadPoolExecutor.shutdown();
if (!threadPoolExecutor.awaitTermination(THREAD_POOL_TERMINATION_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
throw new UncheckedTimeoutException("Waited too lot to stop the thread pool");
threadPoolExecutor.shutdownNow();
}
mergeDaoMediator.stop();
} catch (InterruptedException e) {
Expand All @@ -74,73 +73,131 @@ public synchronized void stop() {
}
}

@SuppressWarnings(FUTURE_RETURN_VALUE_IGNORED)
@RequestMethod(METHOD_GET)
@Path("/v0/entity")
public void entity(HttpSession session, Request request) throws IOException {
String id = request.getParameter(ID_KEY);
requestProccessing(id, session,
() -> {
try {
TimestampedEntry<MemorySegment> entry = mergeDaoMediator.get(request);
if (entry == null) {
sendNotEnoughReplicas(session);
} else if (entry.timestamp() == DaoMediator.NEVER_TIMESTAMP) {
session.sendResponse(new Response(Response.NOT_FOUND, EMPTY_BODY));
} else {
Response response;
if (entry.value() == null) {
response = new Response(Response.NOT_FOUND, EMPTY_BODY);
} else {
response = new Response(
Response.OK,
((Entry<MemorySegment>) entry).value().toArray(ValueLayout.JAVA_BYTE)
);
}
response.addHeader(TIMESTAMP_HEADER + entry.timestamp());
session.sendResponse(response);
}
} catch (IOException e) {
throw new UncheckedIOException(e);
CompletableFuture<TimestampedEntry<MemorySegment>> future = mergeDaoMediator.get(request);
future.whenCompleteAsync(
(entry, ex) -> {
try {
if (entry == null) {
sendNotEnoughReplicas(session);
} else if (entry.timestamp() == DaoMediator.NEVER_TIMESTAMP) {
session.sendResponse(new Response(Response.NOT_FOUND, Response.EMPTY));
} else {
Response response;
if (entry.value() == null) {
response = new Response(Response.NOT_FOUND, Response.EMPTY);
} else {
response = new Response(
Response.OK,
((Entry<MemorySegment>) entry).value()
.toArray(ValueLayout.JAVA_BYTE)
);
}
response.addHeader(TIMESTAMP_HEADER + ": " + entry.timestamp());
session.sendResponse(response);
}
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}, threadPoolExecutor
);
} catch (IllegalArgumentException e) {
sendBadRequestResponseUnchecked(session);
}
}
);
}

@SuppressWarnings(FUTURE_RETURN_VALUE_IGNORED)
@RequestMethod(METHOD_GET)
@Path("/v0/entities")
public void entities(@Param(value = "start") String start, HttpSession httpSession, Request request)
throws IOException {
requestProccessing(start, httpSession,
() -> mergeDaoMediator.getRange(request).whenCompleteAsync(
(result, ex) -> {
if (ex == null) {
try {
stream(httpSession, result);
} catch (IOException e) {
throw new UncheckedIOException(e);
}
} else {
LOGGER.warning(ex.toString());
}
}, threadPoolExecutor
)
);
}

private static void stream(HttpSession httpSession, Iterator<TimestampedEntry<MemorySegment>> result)
throws IOException {
if (result == null) {
httpSession.sendResponse(new Response(Response.INTERNAL_ERROR, Response.EMPTY));
} else {
httpSession.write(new HttpChunkedHeaderQueueItem());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Можно ли было бы сделать константой этот объект с целью уменьшений количества аллокаций? И HttpChunkedEntry

Не приложены профили, к сожалению, не посмотреть, видно ли на них эти аллокации

while (result.hasNext()) {
httpSession.write(new HttpChunkedEntry(result.next()));
}
httpSession.write(new EmptyChunk());
httpSession.scheduleClose();
}
}

@SuppressWarnings(FUTURE_RETURN_VALUE_IGNORED)
@RequestMethod(METHOD_PUT)
@Path("/v0/entity")
public void putEntity(@Param(value = "id") String id, HttpSession httpSession, Request request) throws IOException {
requestProccessing(id, httpSession,
() -> {
try {
if (mergeDaoMediator.put(request)) {
httpSession.sendResponse(new Response(Response.CREATED, EMPTY_BODY));
} else {
sendNotEnoughReplicas(httpSession);
}
} catch (IOException e) {
throw new UncheckedIOException(e);
mergeDaoMediator.put(request).whenCompleteAsync(
(success, ex) -> {
try {
if (success) {
httpSession.sendResponse(new Response(Response.CREATED, Response.EMPTY));
} else {
sendNotEnoughReplicas(httpSession);
}
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}, threadPoolExecutor
);
} catch (IllegalArgumentException e) {
sendBadRequestResponseUnchecked(httpSession);
}
}
);
}

@SuppressWarnings(FUTURE_RETURN_VALUE_IGNORED)
@RequestMethod(METHOD_DELETE)
@Path("/v0/entity")
public void deleteEntity(@Param("id") String id, HttpSession httpSession, Request request) throws IOException {
requestProccessing(id, httpSession,
() -> {
try {
if (mergeDaoMediator.delete(request)) {
httpSession.sendResponse(new Response(Response.ACCEPTED, EMPTY_BODY));
} else {
sendNotEnoughReplicas(httpSession);
}
} catch (IOException e) {
throw new UncheckedIOException(e);
mergeDaoMediator.delete(request).whenComplete(
(success, ex) -> {
try {
if (success) {
httpSession.sendResponse(new Response(Response.ACCEPTED, Response.EMPTY));
} else {
sendNotEnoughReplicas(httpSession);
}
} catch (IOException e) {
throw new UncheckedIOException(e);
}
}
);
} catch (IllegalArgumentException e) {
sendBadRequestResponseUnchecked(httpSession);
}
Expand All @@ -154,20 +211,20 @@ public void handleDefault(Request request, HttpSession session) throws IOExcepti
if (requestMethod == METHOD_GET || requestMethod == METHOD_PUT || requestMethod == METHOD_DELETE) {
sendBadRequestResponse(session);
} else {
session.sendResponse(new Response(Response.METHOD_NOT_ALLOWED, EMPTY_BODY));
session.sendResponse(new Response(Response.METHOD_NOT_ALLOWED, Response.EMPTY));
}
}

private void requestProccessing(String id, HttpSession session, Runnable runnable) throws IOException {
if (isKeyIncorrect(id)) {
if (id == null || id.isEmpty()) {
sendBadRequestResponse(session);
} else {
long startTimeInMillis = System.currentTimeMillis();
threadPoolExecutor.execute(
() -> {
try {
if (System.currentTimeMillis() - startTimeInMillis > REQUEST_TIMEOUT_MILLIS) {
session.sendResponse(new Response(Response.PAYMENT_REQUIRED, EMPTY_BODY));
session.sendResponse(new Response(Response.PAYMENT_REQUIRED, Response.EMPTY));
return;
}
runnable.run();
Expand All @@ -180,11 +237,11 @@ private void requestProccessing(String id, HttpSession session, Runnable runnabl
}

private static void sendNotEnoughReplicas(HttpSession session) throws IOException {
session.sendResponse(new Response(NOT_ENOUGH_REPLICAS_STATUS, EMPTY_BODY));
session.sendResponse(new Response(NOT_ENOUGH_REPLICAS_STATUS, Response.EMPTY));
}

private static void sendBadRequestResponse(HttpSession httpSession) throws IOException {
httpSession.sendResponse(new Response(Response.BAD_REQUEST, EMPTY_BODY));
httpSession.sendResponse(new Response(Response.BAD_REQUEST, Response.EMPTY));
}

private void sendBadRequestResponseUnchecked(HttpSession session) {
Expand All @@ -195,16 +252,13 @@ private void sendBadRequestResponseUnchecked(HttpSession session) {
}
}

private static boolean isKeyIncorrect(String key) {
return key == null || key.isEmpty();
}

private static HttpServerConfig createConfig(ServiceConfig serviceConfig) {
HttpServerConfig config = new HttpServerConfig();
AcceptorConfig acceptorConfig = new AcceptorConfig();
acceptorConfig.port = serviceConfig.selfPort();
acceptorConfig.reusePort = true;
config.acceptors = new AcceptorConfig[] {acceptorConfig};
config.keepAlive = 1000;
config.closeSessions = true;
return config;
}
Expand Down
12 changes: 12 additions & 0 deletions src/main/java/ru/vk/itmo/test/emelyanovvitaliy/EmptyChunk.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package ru.vk.itmo.test.emelyanovvitaliy;

import java.nio.charset.StandardCharsets;

public class EmptyChunk extends ByteQueueItem {
private static final byte[] HTTP_CHUNKED_HEADER = "0\r\n\r\n".getBytes(StandardCharsets.UTF_8);

public EmptyChunk() {
super(HTTP_CHUNKED_HEADER);
}
}

Loading
Loading