Skip to content

Commit

Permalink
Merge remote-tracking branch 'refs/remotes/upstream/main' into stage-5
Browse files Browse the repository at this point in the history
# Conflicts:
#	src/main/java/ru/vk/itmo/test/shishiginstepan/lua_scripts/random_rw_requests.lua
#	src/main/java/ru/vk/itmo/test/shishiginstepan/server/DistributedDao.java
#	src/main/java/ru/vk/itmo/test/shishiginstepan/server/RemoteDaoNode.java
#	src/main/java/ru/vk/itmo/test/shishiginstepan/server/Server.java
#	src/main/java/ru/vk/itmo/test/shishiginstepan/service/DatabaseService.java
  • Loading branch information
Степан committed Apr 24, 2024
2 parents 01b4134 + 2b4c0fb commit 23256f2
Show file tree
Hide file tree
Showing 389 changed files with 321,352 additions and 3,121 deletions.
1 change: 1 addition & 0 deletions .codeclimate.yml
Original file line number Diff line number Diff line change
Expand Up @@ -79,3 +79,4 @@ exclude_patterns:
- "src/test/"
- "src/main/java/ru/vk/itmo/test/reference/"
- "src/main/java/ru/vk/itmo/test/smirnovdmitrii/application/properties"
- "src/main/java/ru/vk/itmo/test/smirnovdmitrii/server/ProcessResult.java"
16 changes: 16 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -147,3 +147,19 @@ HTTP API расширяется query-параметрами `from` и `ack`, с
Когда всё будет готово, присылайте pull request с изменениями, результатами нагрузочного тестирования и профилирования, а также **анализом результатов по сравнению с предыдущей** (синхронной) версией.
На всех этапах **оценивается и код, и анализ (отчёт)** -- без анализа полученных результатов работа оценивается минимальным количеством баллов.
Не забывайте **отвечать на комментарии в PR** (в том числе автоматизированные) и **исправлять замечания**!

## Этап 6. Range-запросы (soft deadline 2024-04-25 18:29:59 MSK, hard deadline 2024-05-01 23:59:59 MSK)

Реализуйте получение **диапазона данных текущего узла** с помощью HTTP `GET /v0/entities?start=<ID>[&end=<ID>]`, который возвращает:
* Статус код `200 OK`
* Возможно пустой **отсортированный** (по ключу) набор **ключей** и **значений** в диапазоне ключей от **обязательного** `start` (включая) до **опционального** `end` (не включая)
* Используйте [Chunked transfer encoding](https://en.wikipedia.org/wiki/Chunked_transfer_encoding)
* Чанки в формате `<key>\n<value>`

Диапазон должен отдаваться в **потоковом режиме** без формирования всего ответа в памяти.
Проверить корректность можно, запросив весь диапазон данных предварительно наполненной БД размером больше Java Heap.

### Report
После прохождения модульных тестов, присылайте pull request с изменениями.
Наполните БД большим объёмом данных и отпрофилируйте cpu, alloc и lock при получении range всей базы одним запросом curl'ом.
Присылайте отчёт с анализом результатов и оптимизаций.
108 changes: 61 additions & 47 deletions src/main/java/ru/vk/itmo/test/asvistukhin/ProxyRequestHandler.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,88 +8,102 @@
import ru.vk.itmo.ServiceConfig;

import java.io.IOException;
import java.net.ConnectException;
import java.net.HttpURLConnection;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.HashMap;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.stream.Collectors;

public class ProxyRequestHandler {
private static final Logger log = LoggerFactory.getLogger(ProxyRequestHandler.class);

private final HashMap<String, HttpClient> nodes;
private final SortedMap<Integer, String> hashRing = new TreeMap<>();
private final String selfUrl;
private final Map<String, HttpClient> clients;
private final Map<String, Integer> urlHashes;

public ProxyRequestHandler(ServiceConfig serviceConfig) {
this.nodes = new HashMap<>(serviceConfig.clusterUrls().size());
this.selfUrl = serviceConfig.selfUrl();
this.clients = HashMap.newHashMap(serviceConfig.clusterUrls().size() - 1);

for (int i = 0; i < serviceConfig.clusterUrls().size(); i++) {
String url = serviceConfig.clusterUrls().get(i);
nodes.put(url, HttpClient.newHttpClient());
int hash = Hash.murmur3(url);
hashRing.put(hash, url);
for (String url : serviceConfig.clusterUrls()) {
if (!Objects.equals(url, serviceConfig.selfUrl())) {
clients.put(url, HttpClient.newHttpClient());
}
}
this.urlHashes = HashMap.newHashMap(serviceConfig.clusterUrls().size());
for (String url : serviceConfig.clusterUrls()) {
urlHashes.put(url, Hash.murmur3(url));
}
}

public synchronized void close() {
nodes.values().forEach(HttpClient::close);
clients.values().forEach(HttpClient::close);
}

public Response proxyRequest(Request request) {
String id = request.getParameter("id=");
String nodeUrl = getNodeByKey(id);
public Map<String, Response> proxyRequests(Request request, List<String> nodeUrls) throws IOException {
Map<String, Response> responses = HashMap.newHashMap(nodeUrls.size());
for (String url : nodeUrls) {
Response response = proxyRequest(request, url);
responses.put(url, response);
}

HttpRequest.BodyPublisher bodyPublisher = request.getBody() == null
? HttpRequest.BodyPublishers.noBody() : HttpRequest.BodyPublishers.ofByteArray(request.getBody());
return responses;
}

URI uri = URI.create(nodeUrl + request.getPath() + "?id=" + id);
log.debug("Proxy request to {}", uri);
public Response proxyRequest(Request request, String proxiedNodeUrl) throws IOException {
String id = request.getParameter("id=");
byte[] body = request.getBody();
URI uri = URI.create(proxiedNodeUrl + request.getPath() + "?id=" + id);
try {
HttpResponse<byte[]> response = nodes.get(nodeUrl).send(
HttpResponse<byte[]> httpResponse = clients.get(proxiedNodeUrl).send(
HttpRequest.newBuilder()
.uri(uri)
.method(request.getMethodName(), bodyPublisher)
.method(
request.getMethodName(),
body == null
? HttpRequest.BodyPublishers.noBody()
: HttpRequest.BodyPublishers.ofByteArray(body)
)
.header(RequestWrapper.SELF_HEADER, "true")
.build(),
HttpResponse.BodyHandlers.ofByteArray()
);
return parseResponse(response);
HttpResponse.BodyHandlers.ofByteArray());
Response response = new Response(proxyResponseCode(httpResponse), httpResponse.body());
long timestamp = httpResponse.headers().firstValueAsLong(RequestWrapper.NIO_TIMESTAMP_HEADER).orElse(0);
response.addHeader(RequestWrapper.NIO_TIMESTAMP_STRING_HEADER + timestamp);
return response;
} catch (InterruptedException ex) {
log.error("Proxy request thread interrupted", ex);
Thread.currentThread().interrupt();
return new Response(Response.INTERNAL_ERROR, Response.EMPTY);
} catch (IOException ex) {
log.error("IOException during proxy request to another node", ex);
} catch (IllegalArgumentException ex) {
log.error("IllegalArgumentException during proxy request to another node", ex);
return new Response(Response.INTERNAL_ERROR, Response.EMPTY);
} catch (ConnectException ex) {
log.error("ConnectException during proxy request to another node", ex);
return new Response(Response.INTERNAL_ERROR, Response.EMPTY);
}
}

public boolean isNeedProxy(String id) {
return !getNodeByKey(id).equals(selfUrl);
}

private Response parseResponse(HttpResponse<byte[]> response) {
String responseHttpCode = switch (response.statusCode()) {
case 200 -> Response.OK;
case 201 -> Response.CREATED;
case 202 -> Response.ACCEPTED;
case 400 -> Response.BAD_REQUEST;
case 404 -> Response.NOT_FOUND;
case 405 -> Response.METHOD_NOT_ALLOWED;
case 503 -> Response.SERVICE_UNAVAILABLE;
private String proxyResponseCode(HttpResponse<byte[]> response) {
return switch (response.statusCode()) {
case HttpURLConnection.HTTP_OK -> Response.OK;
case HttpURLConnection.HTTP_CREATED -> Response.CREATED;
case HttpURLConnection.HTTP_ACCEPTED -> Response.ACCEPTED;
case HttpURLConnection.HTTP_BAD_REQUEST -> Response.BAD_REQUEST;
case HttpURLConnection.HTTP_NOT_FOUND -> Response.NOT_FOUND;
default -> Response.INTERNAL_ERROR;
};

return new Response(responseHttpCode, response.body());
}

private String getNodeByKey(String id) {
int hash = Hash.murmur3(id);
SortedMap<Integer, String> tailMap = hashRing.tailMap(hash);
hash = tailMap.isEmpty() ? hashRing.firstKey() : tailMap.firstKey();
return hashRing.get(hash);
public List<String> getNodesByHash(int numOfNodes) {
return urlHashes.entrySet().stream()
.sorted(Map.Entry.<String, Integer>comparingByValue().reversed())
.limit(numOfNodes)
.map(Map.Entry::getKey)
.collect(Collectors.toList());
}
}
78 changes: 78 additions & 0 deletions src/main/java/ru/vk/itmo/test/asvistukhin/RequestHandler.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package ru.vk.itmo.test.asvistukhin;

import one.nio.http.Param;
import one.nio.http.Request;
import one.nio.http.Response;
import ru.vk.itmo.test.asvistukhin.dao.Dao;
import ru.vk.itmo.test.asvistukhin.dao.TimestampEntry;

import java.lang.foreign.MemorySegment;
import java.lang.foreign.ValueLayout;
import java.nio.charset.StandardCharsets;

public class RequestHandler {
private final Dao<MemorySegment, TimestampEntry<MemorySegment>> dao;

public RequestHandler(Dao<MemorySegment, TimestampEntry<MemorySegment>> dao) {
this.dao = dao;
}

public Response handle(Request request) {
String id = request.getParameter("id=");
return switch (request.getMethod()) {
case Request.METHOD_GET -> get(id);
case Request.METHOD_PUT -> put(id, request);
case Request.METHOD_DELETE -> delete(id);
default -> new Response(Response.BAD_REQUEST, Response.EMPTY);
};
}

public Response get(@Param(value = "id", required = true) String id) {
if (RequestWrapper.isEmptyParam(id)) {
return new Response(Response.BAD_REQUEST, Response.EMPTY);
}

TimestampEntry<MemorySegment> entry = dao.get(MemorySegment.ofArray(id.getBytes(StandardCharsets.UTF_8)));

Response response;
long timestamp;
if (entry == null || entry.value() == null) {
timestamp = (entry != null ? entry.timestamp() : 0);
response = new Response(Response.NOT_FOUND, Response.EMPTY);
response.addHeader(RequestWrapper.TIMESTAMP_STRING_HEADER + timestamp);
} else {
timestamp = entry.timestamp();
response = Response.ok(entry.value().toArray(ValueLayout.JAVA_BYTE));
response.addHeader(RequestWrapper.TIMESTAMP_STRING_HEADER + timestamp);
}

return response;
}

public Response put(@Param(value = "id", required = true) String id, Request request) {
if (RequestWrapper.isEmptyParam(id) || RequestWrapper.isEmptyRequest(request)) {
return new Response(Response.BAD_REQUEST, Response.EMPTY);
}

MemorySegment key = MemorySegment.ofArray(id.getBytes(StandardCharsets.UTF_8));
MemorySegment value = MemorySegment.ofArray(request.getBody());
dao.upsert(new TimestampEntry<>(key, value, System.currentTimeMillis()));

return new Response(Response.CREATED, Response.EMPTY);
}

public Response delete(@Param(value = "id", required = true) String id) {
if (RequestWrapper.isEmptyParam(id)) {
return new Response(Response.BAD_REQUEST, Response.EMPTY);
}

dao.upsert(
new TimestampEntry<>(
MemorySegment.ofArray(id.getBytes(StandardCharsets.UTF_8)),
null,
System.currentTimeMillis())
);

return new Response(Response.ACCEPTED, Response.EMPTY);
}
}
44 changes: 44 additions & 0 deletions src/main/java/ru/vk/itmo/test/asvistukhin/RequestWrapper.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package ru.vk.itmo.test.asvistukhin;

import one.nio.http.Request;

public class RequestWrapper {
private static final String ID_PARAM = "id=";
private static final String FROM_PARAM = "from=";
private static final String ACK_PARAM = "ack=";
public static final String SELF_HEADER = "X-Self";
public static final String NIO_TIMESTAMP_HEADER = "x-timestamp";
public static final String TIMESTAMP_STRING_HEADER = "X-Timestamp:";
public static final String NIO_TIMESTAMP_STRING_HEADER = "x-timestamp:";

public final String id;
public final int from;
public final int ack;

public RequestWrapper(Request request, int clusterSize) throws IllegalArgumentException {
String idString = request.getParameter(ID_PARAM);
String fromString = request.getParameter(FROM_PARAM);
String ackString = request.getParameter(ACK_PARAM);
if (isEmptyParam(idString)) throw new IllegalArgumentException();

id = idString;
try {
from = isEmptyParam(fromString) ? clusterSize : Integer.parseInt(fromString);
ack = isEmptyParam(ackString) ? (from + 1) / 2 : Integer.parseInt(ackString);
} catch (NumberFormatException ex) {
throw new IllegalArgumentException(ex);
}

if (from <= 0 || ack <= 0 || from < ack || clusterSize < from) {
throw new IllegalArgumentException();
}
}

public static boolean isEmptyParam(String param) {
return param == null || param.isEmpty();
}

public static boolean isEmptyRequest(Request request) {
return request.getBody() == null;
}
}
Loading

0 comments on commit 23256f2

Please sign in to comment.