Skip to content

Commit

Permalink
Merge branch 'main' into develop5
Browse files Browse the repository at this point in the history
  • Loading branch information
AlexeyShik authored May 4, 2024
2 parents 58ecc48 + f0bb09e commit d65daf8
Show file tree
Hide file tree
Showing 627 changed files with 453,842 additions and 4,432 deletions.
1 change: 1 addition & 0 deletions .codeclimate.yml
Original file line number Diff line number Diff line change
Expand Up @@ -79,3 +79,4 @@ exclude_patterns:
- "src/test/"
- "src/main/java/ru/vk/itmo/test/reference/"
- "src/main/java/ru/vk/itmo/test/smirnovdmitrii/application/properties"
- "src/main/java/ru/vk/itmo/test/smirnovdmitrii/server/ProcessResult.java"
34 changes: 34 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -147,3 +147,37 @@ HTTP API расширяется query-параметрами `from` и `ack`, с
Когда всё будет готово, присылайте pull request с изменениями, результатами нагрузочного тестирования и профилирования, а также **анализом результатов по сравнению с предыдущей** (синхронной) версией.
На всех этапах **оценивается и код, и анализ (отчёт)** -- без анализа полученных результатов работа оценивается минимальным количеством баллов.
Не забывайте **отвечать на комментарии в PR** (в том числе автоматизированные) и **исправлять замечания**!

## Этап 6. Range-запросы (soft deadline 2024-04-25 18:29:59 MSK, hard deadline 2024-05-01 23:59:59 MSK)

Реализуйте получение **диапазона данных текущего узла** с помощью HTTP `GET /v0/entities?start=<ID>[&end=<ID>]`, который возвращает:
* Статус код `200 OK`
* Возможно пустой **отсортированный** (по ключу) набор **ключей** и **значений** в диапазоне ключей от **обязательного** `start` (включая) до **опционального** `end` (не включая)
* Используйте [Chunked transfer encoding](https://en.wikipedia.org/wiki/Chunked_transfer_encoding)
* Чанки в формате `<key>\n<value>`

Диапазон должен отдаваться в **потоковом режиме** без формирования всего ответа в памяти.
Проверить корректность можно, запросив весь диапазон данных предварительно наполненной БД размером больше Java Heap.

### Report
После прохождения модульных тестов, присылайте pull request с изменениями.
Наполните БД большим объёмом данных и отпрофилируйте cpu, alloc и lock при получении range всей базы одним запросом curl'ом.
Присылайте отчёт с анализом результатов и оптимизаций.

## Этап 7. Бонусный (hard deadline 2024-05-15 23:59:59 MSK)

Фичи, которые позволяют получить дополнительные баллы (при условии **добавления набора тестов**, демонстрирующих корректность, где применимо):
* Развёрнутая конструктивная обратная связь по курсу: достоинства и недостатки курса, сложность тем, предложения по улучшению
* Кластерные range-запросы с учётом шардирования и репликации
* Read repair при обнаружении расхождений между репликами
* Expire: возможность указания [времени жизни записей](https://en.wikipedia.org/wiki/Time_to_live)
* Server-side processing: трансформация данных с помощью скрипта, запускаемого на узлах кластера через API
* Нагрузочное тестирование при помощи [Y!CSB](https://github.com/brianfrankcooper/YCSB)
* Нагрузочное тестирование при помощи [Yandex.Tank](https://overload.yandex.net)
* Регулярный автоматический фоновый compaction (модульные и нагрузочные тесты)
* Hinted handoff [по аналогии с Cassandra](https://cassandra.apache.org/doc/latest/operating/hints.html)
* Устранение неконсистентностей между репликами [по аналогии с Cassandra](https://www.datastax.com/blog/advanced-repair-techniques) [nodetool repair](https://docs.datastax.com/en/archived/cassandra/2.0/cassandra/operations/ops_repair_nodes_c.html), например, на основе [Merkle Tree](https://en.wikipedia.org/wiki/Merkle_tree)
* Блочная компрессия данных на основе LZ4/zSTD/...
* Что-нибудь **своё**?

Перед началом работ продумайте и согласуйте с преподавателем её технический дизайн и получите вспомогательные материалы.
41 changes: 41 additions & 0 deletions src/main/java/ru/vk/itmo/test/andreycheshev/ParametersParser.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package ru.vk.itmo.test.andreycheshev;

public class ParametersParser {
private final RendezvousDistributor distributor;
private int ack;
private int from;

public ParametersParser(RendezvousDistributor distributor) {
this.distributor = distributor;
}

public void parseAckFrom(String ackParameter, String fromParameter) throws IllegalArgumentException {
if (ackParameter == null || fromParameter == null) {
setDefault();
} else {
try {
ack = Integer.parseInt(ackParameter);
from = Integer.parseInt(fromParameter);
} catch (Exception e) {
setDefault();
return;
}
if (ack <= 0 || ack > from) {
throw new IllegalArgumentException();
}
}
}

private void setDefault() {
ack = distributor.getQuorumNumber();
from = distributor.getNodeCount();
}

public int getAck() {
return ack;
}

public int getFrom() {
return from;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,12 @@

import one.nio.util.Hash;

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;
import java.util.stream.Collectors;

public class RendezvousDistributor {
private final int nodeCount;
private final int thisNodeNumber;
Expand All @@ -11,25 +17,53 @@ public RendezvousDistributor(int nodeCount, int thisNodeNumber) {
this.thisNodeNumber = thisNodeNumber;
}

public int getNode(String stringKey) { // Rendezvous hashing algorithm.
int key = Hash.murmur3(stringKey);
int maxHash = Integer.MIN_VALUE;
int node = 0;
for (int i = 0; i < nodeCount; i++) {
int currHash = hashCode(key + i);
if (currHash > maxHash) {
maxHash = currHash;
node = thisNodeNumber == i ? -1 : i;
}
}
return node; // Return -1 if this node was selected.
}

private static int hashCode(int key) {
int x = key;
x = ((x >> 16) ^ x) * 0x45d9f3b;
x = ((x >> 16) ^ x) * 0x45d9f3b;
x = (x >> 16) ^ x;
return x;
}

public List<Integer> getQuorumNodes(String stringKey, int quorumNumber) {
PriorityQueue<HashPair> queue = new PriorityQueue<>(
quorumNumber,
Comparator.comparingInt(HashPair::getHash).reversed()
);
int key = Hash.murmur3(stringKey);
for (int i = 0; i < quorumNumber; i++) {
queue.add(new HashPair(hashCode(key + i), i));
}
return queue.stream().map(HashPair::getIndex).collect(Collectors.toCollection(ArrayList::new));
}

public int getNodeCount() {
return nodeCount;
}

public int getQuorumNumber() {
return nodeCount / 2 + 1;
}

public boolean isOurNode(int node) {
return node == thisNodeNumber;
}

private static class HashPair {
int hash;
int index;

public HashPair(int hash, int index) {
this.hash = hash;
this.index = index;
}

public int getHash() {
return hash;
}

public int getIndex() {
return index;
}
}
}
10 changes: 3 additions & 7 deletions src/main/java/ru/vk/itmo/test/andreycheshev/RequestExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,19 +8,19 @@

import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.SocketTimeoutException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import static one.nio.http.Response.INTERNAL_ERROR;
import static one.nio.http.Response.SERVICE_UNAVAILABLE;

public class RequestExecutor {
private static final String TOO_MANY_REQUESTS = "429 Too many requests";
private static final Logger LOGGER = LoggerFactory.getLogger(RequestExecutor.class);

private static final String TOO_MANY_REQUESTS = "429 Too many requests";

private static final int CPU_THREADS_COUNT = Runtime.getRuntime().availableProcessors();
private static final int MAX_CPU_THREADS_TIMES = 1;
private static final int KEEPALIVE_MILLIS = 3000;
Expand All @@ -32,7 +32,6 @@ public class RequestExecutor {

public RequestExecutor(RequestHandler requestHandler) {
this.requestHandler = requestHandler;

this.executor = new ThreadPoolExecutor(
CPU_THREADS_COUNT,
CPU_THREADS_COUNT * MAX_CPU_THREADS_TIMES,
Expand All @@ -58,9 +57,6 @@ public void execute(Request request, HttpSession session) {
} else {
try {
response = requestHandler.handle(request);
} catch (SocketTimeoutException e) {
LOGGER.error("Processing time exceeded on another node in the cluster", e);
response = new Response(SERVICE_UNAVAILABLE, Response.EMPTY);
} catch (Exception ex) {
LOGGER.error("Internal error of the DAO operation", ex);
response = new Response(INTERNAL_ERROR, Response.EMPTY);
Expand Down
Loading

0 comments on commit d65daf8

Please sign in to comment.