Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Свистухин Андрей, ИТМО DWS, Stage 5 #186

Merged
merged 58 commits into from
May 14, 2024
Merged
Show file tree
Hide file tree
Changes from 50 commits
Commits
Show all changes
58 commits
Select commit Hold shift + click to select a range
ca6190a
[lab-1] init with reference dao and ok tests
Feb 18, 2024
c0ffd6a
[lab-1] changed to my own dao impl
Feb 18, 2024
8ce7866
[lab-1] reformat
Feb 18, 2024
1eae6e8
[lab-1] results
Feb 20, 2024
1ec14c5
[lab-1] results
Feb 20, 2024
150539b
[lab-1] results
Feb 20, 2024
b1cb543
Merge branch 'main' into lab2
Feb 28, 2024
9de63be
[lab-2] init
Feb 28, 2024
10f916f
[lab-2] init
Feb 28, 2024
cd4d836
[lab-2] init
Feb 29, 2024
0ddd946
[lab-2] rollback reference dao
Feb 29, 2024
b5c5ec5
[lab-2] init
Feb 29, 2024
8496b42
Merge branch 'main' into lab2
Feb 29, 2024
f166923
[lab-2] someone has flack test?
Feb 29, 2024
f5a57f0
[lab-2] stage 2
Feb 29, 2024
9b53f9b
[lab-2] rollback andreycheshev package
Feb 29, 2024
c3af33d
Merge remote-tracking branch 'upstream/main' into lab3
Mar 12, 2024
af02092
[lab-3] init
Mar 14, 2024
544da9f
Merge branch 'polis-vk:main' into lab3
stormrvge Mar 14, 2024
2902abc
Merge remote-tracking branch 'upstream/main' into lab3
Mar 14, 2024
c27a160
[lab-3] codeclimate
Mar 14, 2024
f16fb11
Merge remote-tracking branch 'origin/lab3' into lab3
Mar 14, 2024
5591ca1
[lab-3] reports
Mar 20, 2024
4a4b53c
[lab-3] reports
Mar 20, 2024
07237aa
Merge branch 'main' into lab3
stormrvge Mar 25, 2024
59c1274
[lab-3] fix by comments
Mar 25, 2024
fe68b7e
[lab-3] codeclimate
Mar 28, 2024
ebee01f
Merge remote-tracking branch 'upstream/main' into lab4
Mar 28, 2024
3f9943e
[lab-4] dao with timestamp
Mar 28, 2024
8ae8aae
[lab-4] init
Mar 28, 2024
2e65dfc
[lab-4] codeclimate
Mar 28, 2024
8c3c057
[lab-4] codeclimate
Mar 28, 2024
5cbc5d5
[lab-4] codeclimate
Mar 28, 2024
0f19d62
[lab-4] codeclimate
Mar 28, 2024
7c1e615
[lab-4] codeclimate
Mar 28, 2024
161bf45
[lab-4] codeclimate
Mar 28, 2024
1a60fe7
[lab-4] reports
Apr 2, 2024
863dacc
Merge branch 'main' into lab4
stormrvge Apr 3, 2024
2bf673f
[lab-4] fix imports after merge
Apr 3, 2024
f135715
[lab-4] report
Apr 3, 2024
8b0931e
Merge remote-tracking branch 'upstream/main' into lab5
Apr 10, 2024
f1664da
[lab-5] v1
Apr 10, 2024
defaa55
[lab-5] v2?
Apr 10, 2024
bff9239
[lab-5] codeclimate
Apr 11, 2024
3425ed6
[lab-5] only asvistukhin package
Apr 11, 2024
569a0fb
Revert "[lab-5] only asvistukhin package"
Apr 11, 2024
2d6c579
Merge branch 'main' into lab5
incubos Apr 13, 2024
ae7f7ca
Merge branch 'main' into lab5
stormrvge Apr 14, 2024
b503ee1
Merge branch 'main' into lab5
stormrvge Apr 15, 2024
834b5ee
[lab-5] reports
Apr 17, 2024
d8814f6
[lab-5] reports
Apr 17, 2024
c2310c7
[lab-5] reports
Apr 17, 2024
6dff63f
Merge branch 'main' into lab5
stormrvge Apr 25, 2024
405f6aa
[lab-5] fix codeclimate after resolving merge conflict
Apr 25, 2024
c10f085
[lab-5] fixes
May 1, 2024
0aa4829
[lab-5] fixes
May 1, 2024
ee6c565
[lab-5] fixes
May 1, 2024
6e0d9f4
Merge branch 'main' into lab5
atimofeyev May 14, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
140 changes: 84 additions & 56 deletions src/main/java/ru/vk/itmo/test/asvistukhin/ProxyRequestHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,89 +7,117 @@
import org.slf4j.LoggerFactory;
import ru.vk.itmo.ServiceConfig;

import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.HashMap;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;

public class ProxyRequestHandler {
private static final Logger log = LoggerFactory.getLogger(ProxyRequestHandler.class);

private final HashMap<String, HttpClient> nodes;
private final SortedMap<Integer, String> hashRing = new TreeMap<>();
private final String selfUrl;
private final Map<String, HttpClient> clients;
private final Map<String, Integer> urlHashes;

public ProxyRequestHandler(ServiceConfig serviceConfig) {
this.nodes = new HashMap<>(serviceConfig.clusterUrls().size());
this.selfUrl = serviceConfig.selfUrl();

for (int i = 0; i < serviceConfig.clusterUrls().size(); i++) {
String url = serviceConfig.clusterUrls().get(i);
nodes.put(url, HttpClient.newHttpClient());
int hash = Hash.murmur3(url);
hashRing.put(hash, url);
this.clients = new HashMap<>();
for (String url : serviceConfig.clusterUrls()) {
if (!Objects.equals(url, serviceConfig.selfUrl())) {
clients.put(url, HttpClient.newHttpClient());
}
}
this.urlHashes = serviceConfig.clusterUrls().stream()
.collect(Collectors.toMap(url -> url, Hash::murmur3));
}

public synchronized void close() {
nodes.values().forEach(HttpClient::close);
clients.values().forEach(HttpClient::close);
}

public Response proxyRequest(Request request) {
String id = request.getParameter("id=");
String nodeUrl = getNodeByKey(id);
public void proxyRequests(
Request request,
List<String> nodeUrls,
int ack,
List<CompletableFuture<Response>> futures,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Очень плохой паттерн: передавать в параметрах IN и OUT переменные. Лучше стараться так организовать код, чтобы в пармтерах были только IN, а OUT было в return.
И лучше чтобы это было что-то одно

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Нужно было эти futures возвращать return'ом и дальше записывать их в структуру которая лежит по стеку вызовов выше? Я просто подумал, что может быть из-за этого у меня и сходил с ума GC. Но кажется, проблема была всё-таки не в этом

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

да, я бы сделал так, хотя понял что вы пробовали уменьшить число аллокаций

List<Response> collectedResponses,
AtomicInteger unsuccessfulResponsesCount
) {
AtomicInteger responsesCollected = new AtomicInteger();

HttpRequest.BodyPublisher bodyPublisher = request.getBody() == null
? HttpRequest.BodyPublishers.noBody() : HttpRequest.BodyPublishers.ofByteArray(request.getBody());
for (String url : nodeUrls) {
if (unsuccessfulResponsesCount.get() >= ack) {
futures.add(CompletableFuture.completedFuture(null));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

тут наверное планировалось поставить return?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

если учитывать Ваш коммент выше, то получается так))

}

URI uri = URI.create(nodeUrl + request.getPath() + "?id=" + id);
log.debug("Proxy request to {}", uri);
try {
HttpResponse<byte[]> response = nodes.get(nodeUrl).send(
HttpRequest.newBuilder()
.uri(uri)
.method(request.getMethodName(), bodyPublisher)
.build(),
HttpResponse.BodyHandlers.ofByteArray()
);
return parseResponse(response);
} catch (InterruptedException ex) {
log.error("Proxy request thread interrupted", ex);
Thread.currentThread().interrupt();
return new Response(Response.INTERNAL_ERROR, Response.EMPTY);
} catch (IOException ex) {
log.error("IOException during proxy request to another node", ex);
return new Response(Response.INTERNAL_ERROR, Response.EMPTY);
CompletableFuture<Response> futureResponse = proxyRequest(request, url);
CompletableFuture<Response> resultFuture = futureResponse.thenApply(response -> {
boolean success = ServerImpl.isSuccessProcessed(response.getStatus());
if (success && responsesCollected.getAndIncrement() < ack) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

тут что-то не то, на SUCCESS ответ, который будет ack+1 мы почему-то взведем unsuccessfulResponsesCount

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Да, проглядел этот момент. Исправлю.

collectedResponses.add(response);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

здесь идет незащищенное обращение к коллекции из разных потоков, должны быть ошибки

} else {
unsuccessfulResponsesCount.incrementAndGet();
}
return response;
});
futures.add(resultFuture);
}
}

public boolean isNeedProxy(String id) {
return !getNodeByKey(id).equals(selfUrl);
private CompletableFuture<Response> proxyRequest(Request request, String proxiedNodeUrl) {
String id = request.getParameter("id=");
byte[] body = request.getBody();
URI uri = URI.create(proxiedNodeUrl + request.getPath() + "?id=" + id);

HttpRequest.Builder requestBuilder = HttpRequest.newBuilder()
.uri(uri)
.method(
request.getMethodName(),
body == null ? HttpRequest.BodyPublishers.noBody()
: HttpRequest.BodyPublishers.ofByteArray(body)
)
.header(RequestWrapper.SELF_HEADER, "true");

CompletableFuture<HttpResponse<byte[]>> httpResponseFuture = clients.get(proxiedNodeUrl)
.sendAsync(requestBuilder.build(), HttpResponse.BodyHandlers.ofByteArray());

return httpResponseFuture.thenApply(httpResponse -> {
Response response = new Response(proxyResponseCode(httpResponse), httpResponse.body());
long timestamp = Long.parseLong(
httpResponse.headers()
.firstValue(RequestWrapper.NIO_TIMESTAMP_HEADER)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

а эта бесполезная штука так и переехала дальше в этот стейдж?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Нет, она не бесполезная. В ServerImpl идет проверка, есть ли этот хедер или нет

            String timestamp = response.getHeader(RequestWrapper.NIO_TIMESTAMP_STRING_HEADER);

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ага, я ошибся

.orElse("0")
);
response.addHeader(RequestWrapper.NIO_TIMESTAMP_STRING_HEADER + timestamp);
return response;
}).exceptionally(ex -> {
log.error("Exception during proxy request to another node", ex);
return new Response(Response.INTERNAL_ERROR, Response.EMPTY);
});
}

private Response parseResponse(HttpResponse<byte[]> response) {
String responseHttpCode = switch (response.statusCode()) {
case 200 -> Response.OK;
case 201 -> Response.CREATED;
case 202 -> Response.ACCEPTED;
case 400 -> Response.BAD_REQUEST;
case 404 -> Response.NOT_FOUND;
case 405 -> Response.METHOD_NOT_ALLOWED;
case 503 -> Response.SERVICE_UNAVAILABLE;
private String proxyResponseCode(HttpResponse<byte[]> response) {
return switch (response.statusCode()) {
case HttpURLConnection.HTTP_OK -> Response.OK;
case HttpURLConnection.HTTP_CREATED -> Response.CREATED;
case HttpURLConnection.HTTP_ACCEPTED -> Response.ACCEPTED;
case HttpURLConnection.HTTP_BAD_REQUEST -> Response.BAD_REQUEST;
case HttpURLConnection.HTTP_NOT_FOUND -> Response.NOT_FOUND;
default -> Response.INTERNAL_ERROR;
};

return new Response(responseHttpCode, response.body());
}

private String getNodeByKey(String id) {
int hash = Hash.murmur3(id);
SortedMap<Integer, String> tailMap = hashRing.tailMap(hash);
hash = tailMap.isEmpty() ? hashRing.firstKey() : tailMap.firstKey();
return hashRing.get(hash);
public List<String> getNodesByHash(int numOfNodes) {
return urlHashes.entrySet().stream()
.sorted(Map.Entry.<String, Integer>comparingByValue().reversed())
.limit(numOfNodes)
.map(Map.Entry::getKey)
.collect(Collectors.toList());
}
}
99 changes: 99 additions & 0 deletions src/main/java/ru/vk/itmo/test/asvistukhin/RequestHandler.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
package ru.vk.itmo.test.asvistukhin;

import one.nio.http.Param;
import one.nio.http.Request;
import one.nio.http.Response;
import ru.vk.itmo.test.asvistukhin.dao.Dao;
import ru.vk.itmo.test.asvistukhin.dao.TimestampEntry;

import java.lang.foreign.MemorySegment;
import java.lang.foreign.ValueLayout;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

public class RequestHandler {
private final Dao<MemorySegment, TimestampEntry<MemorySegment>> dao;

public RequestHandler(Dao<MemorySegment, TimestampEntry<MemorySegment>> dao) {
this.dao = dao;
}

public Response handle(Request request) {
String id = request.getParameter("id=");
return switch (request.getMethod()) {
case Request.METHOD_GET -> get(id);
case Request.METHOD_PUT -> put(id, request);
case Request.METHOD_DELETE -> delete(id);
default -> new Response(Response.BAD_REQUEST, Response.EMPTY);
};
}

public void handle(
Request request,
List<CompletableFuture<Response>> futures,
List<Response> collectedResponses,
AtomicInteger unsuccessfulResponsesCount
) {
futures.add(new CompletableFuture<Response>().completeAsync(() -> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

а тут мы будем использовать commonPool, что плохо, лучше пользоваться своим пулом

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

А нам ведь нужно создать отдельный пул под это, а не использовать пул из ServerImpl?

Response response = handle(request);
if (ServerImpl.isSuccessProcessed(response.getStatus())) {
collectedResponses.add(response);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

здесь идет незащищенное обращение к коллекции из разных потоков, должны быть ошибки

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

а, и правда
я почему-то подумал, что можно и обычный arraylist сделать, так как у нас для каждого запроса будет создаваться свой лист, но проблема будет именно из-за того, что в разных потоках добавляем туда
изначально у меня и было на скиплисте, но что-то я решил поменять на arraylist

Copy link
Contributor Author

@stormrvge stormrvge May 6, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

А, нет, я на ArrayList не менял
Я передаю в метод

        List<Response> validResponses = new CopyOnWriteArrayList<>();

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

хорошо, значит я проглядел

} else {
unsuccessfulResponsesCount.incrementAndGet();
}

return response;
}));
}

public Response get(@Param(value = "id", required = true) String id) {
if (RequestWrapper.isEmptyParam(id)) {
return new Response(Response.BAD_REQUEST, Response.EMPTY);
}

TimestampEntry<MemorySegment> entry = dao.get(MemorySegment.ofArray(id.getBytes(StandardCharsets.UTF_8)));

Response response;
long timestamp;
if (entry == null || entry.value() == null) {
timestamp = (entry != null ? entry.timestamp() : 0);
response = new Response(Response.NOT_FOUND, Response.EMPTY);
response.addHeader(RequestWrapper.TIMESTAMP_STRING_HEADER + timestamp);
} else {
timestamp = entry.timestamp();
response = Response.ok(entry.value().toArray(ValueLayout.JAVA_BYTE));
response.addHeader(RequestWrapper.TIMESTAMP_STRING_HEADER + timestamp);
}

return response;
}

public Response put(@Param(value = "id", required = true) String id, Request request) {
if (RequestWrapper.isEmptyParam(id) || RequestWrapper.isEmptyRequest(request)) {
return new Response(Response.BAD_REQUEST, Response.EMPTY);
}

MemorySegment key = MemorySegment.ofArray(id.getBytes(StandardCharsets.UTF_8));
MemorySegment value = MemorySegment.ofArray(request.getBody());
dao.upsert(new TimestampEntry<>(key, value, System.currentTimeMillis()));

return new Response(Response.CREATED, Response.EMPTY);
}

public Response delete(@Param(value = "id", required = true) String id) {
if (RequestWrapper.isEmptyParam(id)) {
return new Response(Response.BAD_REQUEST, Response.EMPTY);
}

dao.upsert(
new TimestampEntry<>(
MemorySegment.ofArray(id.getBytes(StandardCharsets.UTF_8)),
null,
System.currentTimeMillis())
);

return new Response(Response.ACCEPTED, Response.EMPTY);
}
}
44 changes: 44 additions & 0 deletions src/main/java/ru/vk/itmo/test/asvistukhin/RequestWrapper.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package ru.vk.itmo.test.asvistukhin;

import one.nio.http.Request;

public class RequestWrapper {
private static final String ID_PARAM = "id=";
private static final String FROM_PARAM = "from=";
private static final String ACK_PARAM = "ack=";
public static final String SELF_HEADER = "X-Self";
public static final String NIO_TIMESTAMP_HEADER = "x-timestamp";
public static final String TIMESTAMP_STRING_HEADER = "X-Timestamp:";
public static final String NIO_TIMESTAMP_STRING_HEADER = "x-timestamp:";

public final String id;
public final int from;
public final int ack;

public RequestWrapper(Request request, int clusterSize) throws IllegalArgumentException {
String idString = request.getParameter(ID_PARAM);
String fromString = request.getParameter(FROM_PARAM);
String ackString = request.getParameter(ACK_PARAM);
if (isEmptyParam(idString)) throw new IllegalArgumentException();

id = idString;
try {
from = isEmptyParam(fromString) ? clusterSize : Integer.parseInt(fromString);
ack = isEmptyParam(ackString) ? (from + 1) / 2 : Integer.parseInt(ackString);
} catch (NumberFormatException ex) {
throw new IllegalArgumentException(ex);
}

if (from <= 0 || ack <= 0 || from < ack || clusterSize < from) {
throw new IllegalArgumentException();
}
}

public static boolean isEmptyParam(String param) {
return param == null || param.isEmpty();
}

public static boolean isEmptyRequest(Request request) {
return request.getBody() == null;
}
}
Loading
Loading