Skip to content

Commit

Permalink
Смирнов Андрей ИТМО КТ Stage 5 (#181)
Browse files Browse the repository at this point in the history
* added first realization

* added wrk2

* remove long line

* style fix

* style fix1

* added profiling

* added stage 2

* codeclimate fix + stage2.md

* codeclimate fix1

* fixes in code provided

* codeclimate fix + added comparing

* images in md added

* executor fix

* style fix

* added first realization

* added better stage readme

* fixes with asyncSend

* removed unused implements

* removed unused import

* added realization

* fixes

* tests passed

* style fix

* style fix1

* style fix2

* flopping

* flopping1

* added first realization

* codeclimate fix

* codeclimate fix1

* codeclimate fix2

* changes provided and readme added

* codeclimate fix

* build rollback

---------

Co-authored-by: Daniil Ushkov <[email protected]>
Co-authored-by: Vadim Tsesko <[email protected]>
Co-authored-by: sandrew2003 <[email protected]>
  • Loading branch information
4 people authored May 13, 2024
1 parent 8028e6a commit abf1e4e
Show file tree
Hide file tree
Showing 24 changed files with 14,516 additions and 95 deletions.
128 changes: 66 additions & 62 deletions src/main/java/ru/vk/itmo/test/smirnovandrew/MyServer.java
Original file line number Diff line number Diff line change
@@ -1,44 +1,39 @@
package ru.vk.itmo.test.smirnovandrew;

import one.nio.http.Header;
import one.nio.http.HttpClient;
import one.nio.http.HttpException;
import one.nio.http.HttpServer;
import one.nio.http.HttpSession;
import one.nio.http.Param;
import one.nio.http.Path;
import one.nio.http.Request;
import one.nio.http.RequestMethod;
import one.nio.http.Response;
import one.nio.pool.PoolException;
import ru.vk.itmo.ServiceConfig;
import ru.vk.itmo.test.reference.dao.ReferenceDao;

import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import java.util.logging.Logger;
import java.util.stream.Collectors;

public class MyServer extends HttpServer {
private static final String ROOT = "/v0/entity";
private static final String X_SENDER_NODE = "X-SenderNode";
private static final String NOT_ENOUGH_REPLICAS = "504 Not Enough Replicas";
private static final long DURATION = 1000L;
private static final int OK_STATUS = 300;
private static final int NOT_FOUND_STATUS = 404;
private static final String HEADER_DELIMITER = ": ";
private final MyServerDao dao;
private final MyExecutor executor;
private final Logger logger;
private final Map<String, HttpClient> httpClients;
private final HttpClient httpClient;
private final RendezvousClusterManager rendezvousClustersManager;
private final ServiceConfig config;

Expand All @@ -64,15 +59,13 @@ public MyServer(
this.dao = new MyServerDao(dao);
this.executor = new MyExecutor(corePoolSize, availableProcessors);
this.logger = Logger.getLogger(MyServer.class.getName());
this.httpClients = config.clusterUrls().stream()
.filter(url -> !Objects.equals(url, config.selfUrl()))
.collect(Collectors.toMap(s -> s, MyServerUtil::createClient, (c, c1) -> c));
this.httpClient = HttpClient.newHttpClient();
}

@Override
public void handleRequest(Request request, HttpSession session) throws IOException {
try {
long exp = System.currentTimeMillis() + DURATION;
long exp = System.currentTimeMillis() + MyServerUtil.DURATION;
executor.execute(() -> {
try {
if (System.currentTimeMillis() > exp) {
Expand All @@ -98,27 +91,31 @@ private static int quorum(int from) {
return from / 2 + 1;
}

private Response sendToAnotherNode(
private HttpRequest toHttpRequest(Request request, String nodeUrl, String params) {
return HttpRequest.newBuilder(URI.create(nodeUrl + MyServerUtil.ROOT + "?" + params))
.method(request.getMethodName(), request.getBody() == null
? HttpRequest.BodyPublishers.noBody()
: HttpRequest.BodyPublishers.ofByteArray(request.getBody()))
.setHeader(MyServerUtil.X_SENDER_NODE, config.selfUrl())
.build();
}

private CompletableFuture<Response> sendToAnotherNode(
Request request,
int ack,
int from,
String id,
String clusterUrl,
Function<MyServerDao, Response> operation
) {
if (Objects.equals(clusterUrl, config.selfUrl())) {
return operation.apply(dao);
return CompletableFuture.completedFuture(operation.apply(dao));
}

var httpClient = httpClients.get(clusterUrl);
var httpRequest = toHttpRequest(request, clusterUrl, String.format("id=%s&from=%d&ack=%d", id, from, ack));

try {
return httpClient.invoke(request);
} catch (InterruptedException e) {
logger.info(e.getMessage());
Thread.currentThread().interrupt();
return new Response(Response.INTERNAL_ERROR, Response.EMPTY);
} catch (HttpException | IOException | PoolException e1) {
logger.info(e1.getMessage());
return new Response(Response.INTERNAL_ERROR, Response.EMPTY);
}
return httpClient.sendAsync(httpRequest, HttpResponse.BodyHandlers.ofByteArray())
.thenApplyAsync(MyServerUtil::processingResponse);
}

private Response handleLocalRequest(
Expand All @@ -128,15 +125,19 @@ private Response handleLocalRequest(
Integer ackParam,
String senderNode,
Function<MyServerDao, Response> operation
) {
Integer from = fromParam;
if (Objects.isNull(from)) {
) throws ExecutionException, InterruptedException, TimeoutException {
final int from;
if (Objects.isNull(fromParam)) {
from = config.clusterUrls().size();
} else {
from = fromParam;
}

Integer ack = ackParam;
if (Objects.isNull(ack)) {
final int ack;
if (Objects.isNull(ackParam)) {
ack = quorum(from);
} else {
ack = ackParam;
}

String paramError = getParametersError(id, from, ack);
Expand All @@ -154,26 +155,29 @@ private Response handleLocalRequest(
return new Response(Response.BAD_REQUEST, Response.EMPTY);
}

var sortedNodes = RendezvousClusterManager.getSortedNodes(from, config);
var sortedNodes = RendezvousClusterManager.getSortedNodes(id, from, config);

if (sortedNodes.stream().map(config.clusterUrls()::get).noneMatch(config.selfUrl()::equals)) {
return sendToAnotherNode(request, clusterUrl, operation);
return sendToAnotherNode(request, ack, from, id, clusterUrl, operation)
.get(MyServerUtil.DURATION, TimeUnit.MILLISECONDS);
}

request.addHeader(String.join(HEADER_DELIMITER, X_SENDER_NODE, config.selfUrl()));
var responses = new ArrayList<Response>();
for (int nodeNumber : sortedNodes) {
var r = sendToAnotherNode(request, config.clusterUrls().get(nodeNumber), operation);
if (r.getStatus() < OK_STATUS
|| (r.getStatus() == NOT_FOUND_STATUS && request.getMethod() == Request.METHOD_GET)) {
responses.add(r);
}
}

if (responses.size() < ack) {
return new Response(NOT_ENOUGH_REPLICAS, Response.EMPTY);
}
return MyServerUtil.getMaxTimestampResponse(responses);
var completableResults = sortedNodes.stream()
.map(nodeNumber -> sendToAnotherNode(
request,
ack,
from,
id,
config.clusterUrls().get(nodeNumber),
operation))
.toList();

return MyServerUtil.getResults(
from,
ack,
completableResults,
logger
);
}

private String getParametersError(String id, Integer from, Integer ack) {
Expand Down Expand Up @@ -201,15 +205,15 @@ private String getParametersError(String id, Integer from, Integer ack) {
return null;
}

@Path(ROOT)
@Path(MyServerUtil.ROOT)
@RequestMethod(Request.METHOD_GET)
public Response get(
@Param(value = "id", required = true) String id,
@Param(value = "from") Integer from,
@Param(value = "ack") Integer ack,
@Header(value = X_SENDER_NODE) String senderNode,
@Header(value = MyServerUtil.X_SENDER_NODE) String senderNode,
Request request
) {
) throws ExecutionException, InterruptedException, TimeoutException {
return handleLocalRequest(
request,
id,
Expand All @@ -220,15 +224,15 @@ public Response get(
);
}

@Path(ROOT)
@Path(MyServerUtil.ROOT)
@RequestMethod(Request.METHOD_DELETE)
public Response delete(
@Param(value = "id", required = true) String id,
@Param(value = "from") Integer from,
@Param(value = "ack") Integer ack,
@Header(value = X_SENDER_NODE) String senderNode,
@Header(value = MyServerUtil.X_SENDER_NODE) String senderNode,
Request request
) {
) throws ExecutionException, InterruptedException, TimeoutException {
return handleLocalRequest(
request,
id,
Expand All @@ -239,15 +243,15 @@ public Response delete(
);
}

@Path(ROOT)
@Path(MyServerUtil.ROOT)
@RequestMethod(Request.METHOD_PUT)
public Response put(
@Param(value = "id", required = true) String id,
@Param(value = "from") Integer from,
@Param(value = "ack") Integer ack,
@Header(value = X_SENDER_NODE) String senderNode,
@Header(value = MyServerUtil.X_SENDER_NODE) String senderNode,
Request request
) {
) throws ExecutionException, InterruptedException, TimeoutException {
request.addHeader("Content-Length: " + request.getBody().length);
request.setBody(request.getBody());

Expand Down Expand Up @@ -275,6 +279,6 @@ public void handleDefault(Request request, HttpSession session) throws IOExcepti
public synchronized void stop() {
this.executor.shutdown();
super.stop();
httpClients.values().forEach(HttpClient::close);
httpClient.close();
}
}
4 changes: 2 additions & 2 deletions src/main/java/ru/vk/itmo/test/smirnovandrew/MyServerDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,11 @@ Response getEntryFromDao(String id) {
ValWithTime valueWithTimestamp = byteArrayToObject(entry.value().toArray(ValueLayout.JAVA_BYTE));
if (valueWithTimestamp.value() == null) {
Response response = new Response(Response.NOT_FOUND, Response.EMPTY);
response.addHeader(MyServerUtil.X_TIMESTAMP + valueWithTimestamp.timestamp());
response.addHeader("X-Timestamp: " + valueWithTimestamp.timestamp());
return response;
}
Response response = new Response(Response.OK, valueWithTimestamp.value());
response.addHeader(MyServerUtil.X_TIMESTAMP + valueWithTimestamp.timestamp());
response.addHeader("X-Timestamp: " + valueWithTimestamp.timestamp());
return response;
} catch (IOException | ClassNotFoundException e) {
return new Response(Response.INTERNAL_ERROR, Response.EMPTY);
Expand Down
Loading

0 comments on commit abf1e4e

Please sign in to comment.