diff --git a/src/main/java/ru/vk/itmo/test/kovalevigor/dao/iterators/ShiftedIterator.java b/src/main/java/ru/vk/itmo/test/kovalevigor/dao/iterators/ShiftedIterator.java index a4dc99e24..fab89a0aa 100644 --- a/src/main/java/ru/vk/itmo/test/kovalevigor/dao/iterators/ShiftedIterator.java +++ b/src/main/java/ru/vk/itmo/test/kovalevigor/dao/iterators/ShiftedIterator.java @@ -35,4 +35,8 @@ public E next() { } return result; } + + public E getValue() { + return value; + } } diff --git a/src/main/java/ru/vk/itmo/test/kovalevigor/reports/html/stage6/optimize_alloc.html b/src/main/java/ru/vk/itmo/test/kovalevigor/reports/html/stage6/optimize_alloc.html new file mode 100644 index 000000000..880f175db --- /dev/null +++ b/src/main/java/ru/vk/itmo/test/kovalevigor/reports/html/stage6/optimize_alloc.html @@ -0,0 +1,379 @@ + + + + + + + +

Flame Graph

+
  
+
Produced by async-profiler
+ +
+

+

Matched:

+ diff --git a/src/main/java/ru/vk/itmo/test/kovalevigor/reports/html/stage6/solo_alloc.html b/src/main/java/ru/vk/itmo/test/kovalevigor/reports/html/stage6/solo_alloc.html new file mode 100644 index 000000000..8104fe94d --- /dev/null +++ b/src/main/java/ru/vk/itmo/test/kovalevigor/reports/html/stage6/solo_alloc.html @@ -0,0 +1,410 @@ + + + + + + + +

Flame Graph

+
  
+
Produced by async-profiler
+ +
+

+

Matched:

+ diff --git a/src/main/java/ru/vk/itmo/test/kovalevigor/reports/html/stage6/solo_cpu.html b/src/main/java/ru/vk/itmo/test/kovalevigor/reports/html/stage6/solo_cpu.html new file mode 100644 index 000000000..23d913e6c --- /dev/null +++ b/src/main/java/ru/vk/itmo/test/kovalevigor/reports/html/stage6/solo_cpu.html @@ -0,0 +1,918 @@ + + + + + + + +

Flame Graph

+
  
+
Produced by async-profiler
+ +
+

+

Matched:

+ diff --git a/src/main/java/ru/vk/itmo/test/kovalevigor/reports/html/stage6/solo_lock.html b/src/main/java/ru/vk/itmo/test/kovalevigor/reports/html/stage6/solo_lock.html new file mode 100644 index 000000000..24b7ef49f --- /dev/null +++ b/src/main/java/ru/vk/itmo/test/kovalevigor/reports/html/stage6/solo_lock.html @@ -0,0 +1,260 @@ + + + + + + + +

Flame Graph

+
  
+
Produced by async-profiler
+ +
+

+

Matched:

+ diff --git a/src/main/java/ru/vk/itmo/test/kovalevigor/reports/scripts/sh/base.sh b/src/main/java/ru/vk/itmo/test/kovalevigor/reports/scripts/sh/base.sh index 9481db03f..283802b4b 100755 --- a/src/main/java/ru/vk/itmo/test/kovalevigor/reports/scripts/sh/base.sh +++ b/src/main/java/ru/vk/itmo/test/kovalevigor/reports/scripts/sh/base.sh @@ -1,7 +1,7 @@ #!/bin/bash NAME=$(date +"%s") JFR="../../info-$NAME.jfr" -PREFIX="../../html/stage5" +PREFIX="../../html/stage6" (wrk2 -c 128 -t 8 -L -d $1 -R $2 -s "../lua/$3.lua" http://localhost:8080 > "${PREFIX}/${NAME}_wrk.txt") & ./ap/bin/asprof -t -e cpu,alloc,lock -d $1 -f $JFR MainServer diff --git a/src/main/java/ru/vk/itmo/test/kovalevigor/reports/scripts/sh/get_all.sh b/src/main/java/ru/vk/itmo/test/kovalevigor/reports/scripts/sh/get_all.sh new file mode 100755 index 000000000..b6f4fb59b --- /dev/null +++ b/src/main/java/ru/vk/itmo/test/kovalevigor/reports/scripts/sh/get_all.sh @@ -0,0 +1,12 @@ +#!/bin/bash +NAME=$(date +"%s") +JFR="../../info-$NAME.jfr" +PREFIX="../../html/stage6" +(curl --location --request GET 'localhost:8080/v0/entities?start=k0' > /dev/null) & +./ap/bin/asprof -t -e cpu,alloc,lock -d $1 -f $JFR MainServer + +wait +java -cp ./ap/lib/converter.jar jfr2flame --threads $JFR > "${PREFIX}/${NAME}_cpu.html" +java -cp ./ap/lib/converter.jar jfr2flame --threads --alloc $JFR > "${PREFIX}/${NAME}_alloc.html" +java -cp ./ap/lib/converter.jar jfr2flame --threads --lock $JFR > "${PREFIX}/${NAME}_lock.html" +rm $JFR diff --git a/src/main/java/ru/vk/itmo/test/kovalevigor/reports/stage6.md b/src/main/java/ru/vk/itmo/test/kovalevigor/reports/stage6.md new file mode 100644 index 000000000..566b0f93c --- /dev/null +++ b/src/main/java/ru/vk/itmo/test/kovalevigor/reports/stage6.md @@ -0,0 +1,42 @@ +## Информация + +База заполенна на 700MB + + +## Факты + +Посмотрел код HttpSession, там при вызове обычного write каждый буфер оборачивается в Item. + +Также, если сокет на данный момент заполнен, то все эти айтемы просто выстраиваются в связанный список, +таким образом, если сокет зависнет надолго, то в худшем случае мы выгрузим все наши данные. + +Мне захотелось уйти от этой проблемы, поэтому реализовал свои Items для очереди. + +При этом проблема с аллокацией нового айтема на каждый элемент в этом коде не решена. + +Также мне захотелось учесть тот факт, что MemorySegment'ы могут не влезать в память, поддержку чего добавить можно, +при том не так уж и сложно, несмотря на то, что на данный момент слишком большие данные мы принять все равно не в силах + +## Профилирование + +Запустил один запрос на все данные и начал профилирование. +Сокет работает на 1 клиента, поэтому всегда свободен, и описанной ранее проблемы не возникает. +Что демонстрируют [локи](html%2Fstage6%2Fsolo_lock.html) (они пусты) + +Как мы видим основное [время работы](html%2Fstage6%2Fsolo_cpu.html) занимает запись в сокет + +В плане [аллокаций](html%2Fstage6%2Fsolo_alloc.html) всплывает проблема с аллокацией нового айтема для каждой +уникальной записи в нашем дао. В следствие, чего на аллокацию промежуточного буфера уходит основное время + +При этом эту проблему можно достаточно просто решить, обычным переиспользованием одного элемента очереди, +т.к. считывание у нас происходит последовательно. + +## Оптимизация + +В коммите `643f950eb59595596eb1c237e5413084dd37dd75` использовал один чанк. + +Как видно по [аллокациям](html%2Fstage6%2Foptimize_alloc.html) действительно стало лучше. + +Теперь у нас нет постоянной аллокации буфера, что значительно снизило нагрузку. +Также можно переписать метод перевода числа в hex байты, +но опять же нужно учитывать заполненность буфера сокета (а мне лень) diff --git a/src/main/java/ru/vk/itmo/test/kovalevigor/server/FullServiceInfo.java b/src/main/java/ru/vk/itmo/test/kovalevigor/server/FullServiceInfo.java index 156bcd0a8..32126701d 100644 --- a/src/main/java/ru/vk/itmo/test/kovalevigor/server/FullServiceInfo.java +++ b/src/main/java/ru/vk/itmo/test/kovalevigor/server/FullServiceInfo.java @@ -2,7 +2,7 @@ import ru.vk.itmo.test.kovalevigor.server.strategy.ServerRemoteStrategy; import ru.vk.itmo.test.kovalevigor.server.strategy.ServerStrategy; -import ru.vk.itmo.test.kovalevigor.server.util.Partition; +import ru.vk.itmo.test.kovalevigor.server.strategy.util.Partition; import java.io.IOException; import java.net.http.HttpClient; diff --git a/src/main/java/ru/vk/itmo/test/kovalevigor/server/ServiceFactoryImpl.java b/src/main/java/ru/vk/itmo/test/kovalevigor/server/ServiceFactoryImpl.java index 1bc2c36f4..ce09c5e12 100644 --- a/src/main/java/ru/vk/itmo/test/kovalevigor/server/ServiceFactoryImpl.java +++ b/src/main/java/ru/vk/itmo/test/kovalevigor/server/ServiceFactoryImpl.java @@ -4,7 +4,7 @@ import ru.vk.itmo.ServiceConfig; import ru.vk.itmo.test.ServiceFactory; -@ServiceFactory(stage = 5) +@ServiceFactory(stage = 6) public class ServiceFactoryImpl implements ServiceFactory.Factory { @Override diff --git a/src/main/java/ru/vk/itmo/test/kovalevigor/server/ServiceImpl.java b/src/main/java/ru/vk/itmo/test/kovalevigor/server/ServiceImpl.java index f3cc7fabe..4e915804e 100644 --- a/src/main/java/ru/vk/itmo/test/kovalevigor/server/ServiceImpl.java +++ b/src/main/java/ru/vk/itmo/test/kovalevigor/server/ServiceImpl.java @@ -11,7 +11,7 @@ import ru.vk.itmo.test.kovalevigor.server.strategy.decorators.ServerReplicationStrategyDecorator; import ru.vk.itmo.test.kovalevigor.server.strategy.decorators.ServerRequestValidationStrategyDecorator; import ru.vk.itmo.test.kovalevigor.server.strategy.decorators.ServerSendResponseStrategyDecorator; -import ru.vk.itmo.test.kovalevigor.server.util.ServerUtil; +import ru.vk.itmo.test.kovalevigor.server.strategy.util.ServerUtil; import java.io.IOException; import java.net.http.HttpClient; @@ -19,7 +19,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; -import static ru.vk.itmo.test.kovalevigor.server.util.ServerUtil.REMOTE_TIMEOUT; +import static ru.vk.itmo.test.kovalevigor.server.strategy.util.ServerUtil.REMOTE_TIMEOUT; public class ServiceImpl implements Service { diff --git a/src/main/java/ru/vk/itmo/test/kovalevigor/server/strategy/ServerDaoStrategy.java b/src/main/java/ru/vk/itmo/test/kovalevigor/server/strategy/ServerDaoStrategy.java index ed1582ec6..fb9bcedd8 100644 --- a/src/main/java/ru/vk/itmo/test/kovalevigor/server/strategy/ServerDaoStrategy.java +++ b/src/main/java/ru/vk/itmo/test/kovalevigor/server/strategy/ServerDaoStrategy.java @@ -3,33 +3,49 @@ import one.nio.http.HttpSession; import one.nio.http.Request; import one.nio.http.Response; +import one.nio.net.Session; import ru.vk.itmo.dao.Config; import ru.vk.itmo.dao.Dao; +import ru.vk.itmo.dao.Entry; import ru.vk.itmo.test.kovalevigor.config.DaoServerConfig; import ru.vk.itmo.test.kovalevigor.dao.DaoImpl; import ru.vk.itmo.test.kovalevigor.dao.SSTimeTableManager; import ru.vk.itmo.test.kovalevigor.dao.entry.MSegmentTimeEntry; import ru.vk.itmo.test.kovalevigor.dao.entry.TimeEntry; -import ru.vk.itmo.test.kovalevigor.server.util.Headers; -import ru.vk.itmo.test.kovalevigor.server.util.Parameters; -import ru.vk.itmo.test.kovalevigor.server.util.Paths; -import ru.vk.itmo.test.kovalevigor.server.util.Responses; +import ru.vk.itmo.test.kovalevigor.dao.iterators.ApplyIterator; +import ru.vk.itmo.test.kovalevigor.server.strategy.util.ChainedQueueItem; +import ru.vk.itmo.test.kovalevigor.server.strategy.util.ChunkQueueItem; +import ru.vk.itmo.test.kovalevigor.server.strategy.util.Headers; +import ru.vk.itmo.test.kovalevigor.server.strategy.util.Parameters; +import ru.vk.itmo.test.kovalevigor.server.strategy.util.Paths; +import ru.vk.itmo.test.kovalevigor.server.strategy.util.Responses; import java.io.IOException; import java.lang.foreign.MemorySegment; import java.lang.foreign.ValueLayout; -import java.nio.charset.Charset; -import java.nio.charset.StandardCharsets; import java.time.Instant; +import java.util.Iterator; +import java.util.List; +import java.util.function.Function; import java.util.logging.Logger; import static one.nio.http.Request.METHOD_DELETE; import static one.nio.http.Request.METHOD_GET; import static one.nio.http.Request.METHOD_PUT; -import static ru.vk.itmo.test.kovalevigor.server.util.ServerUtil.createIllegalState; +import static ru.vk.itmo.test.kovalevigor.server.strategy.util.ServerUtil.CHARSET; +import static ru.vk.itmo.test.kovalevigor.server.strategy.util.ServerUtil.createIllegalState; public class ServerDaoStrategy extends ServerRejectStrategy { - private static final Charset CHARSET = StandardCharsets.UTF_8; + private static final byte[] RANGE_HEADER = """ + HTTP/1.1 200 OK\r + Content-Type: text/plain\r + Transfer-Encoding: chunked\r + Connection: keep-alive\r + \r + """.getBytes(CHARSET); + private static final byte[] RANGE_END = "0\r\n\r\n".getBytes(CHARSET); + public static final int BUFFER_SIZE = 1024; + private final Dao> dao; public static final Logger log = Logger.getLogger(ServerDaoStrategy.class.getName()); @@ -53,6 +69,27 @@ public Response handleRequest(Request request, HttpSession session) throws IOExc default -> throw createIllegalState(); }; } + case V0_ENTITIES -> { + MemorySegment startKey = fromString(Parameters.getParameter(request, Parameters.START)); + MemorySegment endKey = fromString(Parameters.getParameter(request, Parameters.END)); + Iterator> iterator = getEntities(startKey, endKey); + if (iterator.hasNext()) { + session.write( + new ChainedQueueItem( + List.of( + mapBytes(RANGE_HEADER), + new ChainedQueueItem( + mapEntry(iterator) + ), + mapBytes(RANGE_END) + ).iterator() + ) + ); + } else { + return Responses.OK.toResponse(); + } + return null; + } default -> throw createIllegalState(); } } @@ -73,6 +110,10 @@ private Response getEntity(MemorySegment key) { return addTimestamp(response, entry); } + private Iterator> getEntities(MemorySegment start, MemorySegment end) { + return dao.get(start, end); + } + private Response upsertEntry( MemorySegment key, MemorySegment value, @@ -98,6 +139,21 @@ private static Response addTimestamp(Response response, TimeEntry entry) { return response; } + private static > Iterator mapEntry(Iterator entryIterator) { + + ChunkQueueItem chunkQueueItem = new ChunkQueueItem(BUFFER_SIZE); + + Function mapEntry = entry -> { + chunkQueueItem.setChunk(entry); + return chunkQueueItem; + }; + + return new ApplyIterator<>( + entryIterator, + mapEntry + ); + } + private static Config mapConfig(DaoServerConfig config) { return new Config( config.basePath, @@ -112,4 +168,8 @@ private static TimeEntry makeEntry(MemorySegment key, MemorySegme private static MemorySegment fromString(final String data) { return data == null ? null : MemorySegment.ofArray(data.getBytes(CHARSET)); } + + private static Session.ArrayQueueItem mapBytes(byte[] data) { + return new Session.ArrayQueueItem(data, 0, data.length, 0); + } } diff --git a/src/main/java/ru/vk/itmo/test/kovalevigor/server/strategy/ServerRejectStrategy.java b/src/main/java/ru/vk/itmo/test/kovalevigor/server/strategy/ServerRejectStrategy.java index 58e389a18..3985c9f5b 100644 --- a/src/main/java/ru/vk/itmo/test/kovalevigor/server/strategy/ServerRejectStrategy.java +++ b/src/main/java/ru/vk/itmo/test/kovalevigor/server/strategy/ServerRejectStrategy.java @@ -3,11 +3,11 @@ import one.nio.http.HttpSession; import one.nio.http.Request; import one.nio.http.Response; -import ru.vk.itmo.test.kovalevigor.server.util.Responses; +import ru.vk.itmo.test.kovalevigor.server.strategy.util.Responses; import java.io.IOException; -import static ru.vk.itmo.test.kovalevigor.server.util.ServerUtil.sendResponseWithoutIo; +import static ru.vk.itmo.test.kovalevigor.server.strategy.util.ServerUtil.sendResponseWithoutIo; public class ServerRejectStrategy implements ServerStrategy { @Override diff --git a/src/main/java/ru/vk/itmo/test/kovalevigor/server/strategy/ServerRemoteStrategy.java b/src/main/java/ru/vk/itmo/test/kovalevigor/server/strategy/ServerRemoteStrategy.java index a61954bb0..37bc3a5ad 100644 --- a/src/main/java/ru/vk/itmo/test/kovalevigor/server/strategy/ServerRemoteStrategy.java +++ b/src/main/java/ru/vk/itmo/test/kovalevigor/server/strategy/ServerRemoteStrategy.java @@ -3,7 +3,7 @@ import one.nio.http.HttpSession; import one.nio.http.Request; import one.nio.http.Response; -import ru.vk.itmo.test.kovalevigor.server.util.Headers; +import ru.vk.itmo.test.kovalevigor.server.strategy.util.Headers; import java.io.IOException; import java.net.URI; @@ -15,9 +15,9 @@ import java.util.logging.Level; import static ru.vk.itmo.test.kovalevigor.server.strategy.ServerDaoStrategy.log; -import static ru.vk.itmo.test.kovalevigor.server.util.ServerUtil.REMOTE_TIMEOUT; -import static ru.vk.itmo.test.kovalevigor.server.util.ServerUtil.REMOTE_TIMEOUT_TIMEUNIT; -import static ru.vk.itmo.test.kovalevigor.server.util.ServerUtil.REMOTE_TIMEOUT_VALUE; +import static ru.vk.itmo.test.kovalevigor.server.strategy.util.ServerUtil.REMOTE_TIMEOUT; +import static ru.vk.itmo.test.kovalevigor.server.strategy.util.ServerUtil.REMOTE_TIMEOUT_TIMEUNIT; +import static ru.vk.itmo.test.kovalevigor.server.strategy.util.ServerUtil.REMOTE_TIMEOUT_VALUE; public class ServerRemoteStrategy extends ServerRejectStrategy { private final HttpClient httpClient; diff --git a/src/main/java/ru/vk/itmo/test/kovalevigor/server/strategy/decorators/AckEitherCompletableFuture.java b/src/main/java/ru/vk/itmo/test/kovalevigor/server/strategy/decorators/AckEitherCompletableFuture.java index 96c781060..24f1b6a3b 100644 --- a/src/main/java/ru/vk/itmo/test/kovalevigor/server/strategy/decorators/AckEitherCompletableFuture.java +++ b/src/main/java/ru/vk/itmo/test/kovalevigor/server/strategy/decorators/AckEitherCompletableFuture.java @@ -1,7 +1,7 @@ package ru.vk.itmo.test.kovalevigor.server.strategy.decorators; import one.nio.http.Response; -import ru.vk.itmo.test.kovalevigor.server.util.Responses; +import ru.vk.itmo.test.kovalevigor.server.strategy.util.Responses; import java.io.IOException; import java.util.Objects; @@ -9,9 +9,9 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; -import static ru.vk.itmo.test.kovalevigor.server.util.ServerUtil.GOOD_STATUSES; -import static ru.vk.itmo.test.kovalevigor.server.util.ServerUtil.logIO; -import static ru.vk.itmo.test.kovalevigor.server.util.ServerUtil.mergeResponses; +import static ru.vk.itmo.test.kovalevigor.server.strategy.util.ServerUtil.GOOD_STATUSES; +import static ru.vk.itmo.test.kovalevigor.server.strategy.util.ServerUtil.logIO; +import static ru.vk.itmo.test.kovalevigor.server.strategy.util.ServerUtil.mergeResponses; public class AckEitherCompletableFuture extends CompletableFuture { private final AtomicReference replicasResponse; diff --git a/src/main/java/ru/vk/itmo/test/kovalevigor/server/strategy/decorators/ServerExecutorStrategyDecorator.java b/src/main/java/ru/vk/itmo/test/kovalevigor/server/strategy/decorators/ServerExecutorStrategyDecorator.java index b3ab3fce7..fdf60482f 100644 --- a/src/main/java/ru/vk/itmo/test/kovalevigor/server/strategy/decorators/ServerExecutorStrategyDecorator.java +++ b/src/main/java/ru/vk/itmo/test/kovalevigor/server/strategy/decorators/ServerExecutorStrategyDecorator.java @@ -6,8 +6,8 @@ import one.nio.server.SelectorThread; import ru.vk.itmo.test.kovalevigor.server.strategy.ServerFull; import ru.vk.itmo.test.kovalevigor.server.strategy.ServerStrategy; -import ru.vk.itmo.test.kovalevigor.server.util.ServerTask; -import ru.vk.itmo.test.kovalevigor.server.util.ServerUtil; +import ru.vk.itmo.test.kovalevigor.server.strategy.util.ServerTask; +import ru.vk.itmo.test.kovalevigor.server.strategy.util.ServerUtil; import java.io.IOException; import java.util.ArrayList; @@ -20,7 +20,7 @@ import java.util.concurrent.TimeUnit; import static ru.vk.itmo.test.kovalevigor.server.strategy.ServerDaoStrategy.log; -import static ru.vk.itmo.test.kovalevigor.server.util.ServerUtil.shutdownAndAwaitTermination; +import static ru.vk.itmo.test.kovalevigor.server.strategy.util.ServerUtil.shutdownAndAwaitTermination; public class ServerExecutorStrategyDecorator extends ServerStrategyDecorator implements RejectedExecutionHandler { private final ThreadPoolExecutor mainExecutor; diff --git a/src/main/java/ru/vk/itmo/test/kovalevigor/server/strategy/decorators/ServerOneExecutorStrategyDecorator.java b/src/main/java/ru/vk/itmo/test/kovalevigor/server/strategy/decorators/ServerOneExecutorStrategyDecorator.java index ee75b54d9..a3ed972d8 100644 --- a/src/main/java/ru/vk/itmo/test/kovalevigor/server/strategy/decorators/ServerOneExecutorStrategyDecorator.java +++ b/src/main/java/ru/vk/itmo/test/kovalevigor/server/strategy/decorators/ServerOneExecutorStrategyDecorator.java @@ -4,7 +4,7 @@ import one.nio.http.Request; import one.nio.http.Response; import ru.vk.itmo.test.kovalevigor.server.strategy.ServerStrategy; -import ru.vk.itmo.test.kovalevigor.server.util.ServerTask; +import ru.vk.itmo.test.kovalevigor.server.strategy.util.ServerTask; import java.io.IOException; import java.util.concurrent.ArrayBlockingQueue; @@ -15,7 +15,7 @@ import java.util.concurrent.TimeUnit; import static ru.vk.itmo.test.kovalevigor.server.strategy.ServerDaoStrategy.log; -import static ru.vk.itmo.test.kovalevigor.server.util.ServerUtil.shutdownAndAwaitTermination; +import static ru.vk.itmo.test.kovalevigor.server.strategy.util.ServerUtil.shutdownAndAwaitTermination; public class ServerOneExecutorStrategyDecorator extends ServerStrategyDecorator implements RejectedExecutionHandler { private final ThreadPoolExecutor executorService; diff --git a/src/main/java/ru/vk/itmo/test/kovalevigor/server/strategy/decorators/ServerReplicationStrategyDecorator.java b/src/main/java/ru/vk/itmo/test/kovalevigor/server/strategy/decorators/ServerReplicationStrategyDecorator.java index 1b70a2268..21c667817 100644 --- a/src/main/java/ru/vk/itmo/test/kovalevigor/server/strategy/decorators/ServerReplicationStrategyDecorator.java +++ b/src/main/java/ru/vk/itmo/test/kovalevigor/server/strategy/decorators/ServerReplicationStrategyDecorator.java @@ -5,9 +5,10 @@ import one.nio.http.Response; import ru.vk.itmo.test.kovalevigor.server.ServiceInfo; import ru.vk.itmo.test.kovalevigor.server.strategy.ServerStrategy; -import ru.vk.itmo.test.kovalevigor.server.util.Headers; -import ru.vk.itmo.test.kovalevigor.server.util.Parameters; -import ru.vk.itmo.test.kovalevigor.server.util.Responses; +import ru.vk.itmo.test.kovalevigor.server.strategy.util.Headers; +import ru.vk.itmo.test.kovalevigor.server.strategy.util.Parameters; +import ru.vk.itmo.test.kovalevigor.server.strategy.util.Paths; +import ru.vk.itmo.test.kovalevigor.server.strategy.util.Responses; import java.io.IOException; import java.util.Collection; @@ -15,9 +16,9 @@ import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; -import static ru.vk.itmo.test.kovalevigor.server.util.ServerUtil.GOOD_STATUSES; -import static ru.vk.itmo.test.kovalevigor.server.util.ServerUtil.REMOTE_TIMEOUT_VALUE; -import static ru.vk.itmo.test.kovalevigor.server.util.ServerUtil.mergeResponses; +import static ru.vk.itmo.test.kovalevigor.server.strategy.util.ServerUtil.GOOD_STATUSES; +import static ru.vk.itmo.test.kovalevigor.server.strategy.util.ServerUtil.REMOTE_TIMEOUT_VALUE; +import static ru.vk.itmo.test.kovalevigor.server.strategy.util.ServerUtil.mergeResponses; public class ServerReplicationStrategyDecorator extends ServerStrategyDecorator { @@ -67,7 +68,7 @@ public CompletableFuture handleRequestAsync( HttpSession session, Executor executor ) { - if (Headers.hasHeader(request, Headers.REPLICATION)) { + if (Paths.getPathOrThrow(request.getPath()).isLocal() || Headers.hasHeader(request, Headers.REPLICATION)) { return super.handleRequestAsync(request, session, executor); } int ack = Parameters.getParameter(request, Parameters.ACK, Integer::parseInt, serviceInfo.getQuorum()); diff --git a/src/main/java/ru/vk/itmo/test/kovalevigor/server/strategy/decorators/ServerRequestValidationStrategyDecorator.java b/src/main/java/ru/vk/itmo/test/kovalevigor/server/strategy/decorators/ServerRequestValidationStrategyDecorator.java index 15a553d04..8b90f0854 100644 --- a/src/main/java/ru/vk/itmo/test/kovalevigor/server/strategy/decorators/ServerRequestValidationStrategyDecorator.java +++ b/src/main/java/ru/vk/itmo/test/kovalevigor/server/strategy/decorators/ServerRequestValidationStrategyDecorator.java @@ -4,9 +4,9 @@ import one.nio.http.Request; import one.nio.http.Response; import ru.vk.itmo.test.kovalevigor.server.strategy.ServerStrategy; -import ru.vk.itmo.test.kovalevigor.server.util.Parameters; -import ru.vk.itmo.test.kovalevigor.server.util.Paths; -import ru.vk.itmo.test.kovalevigor.server.util.Responses; +import ru.vk.itmo.test.kovalevigor.server.strategy.util.Parameters; +import ru.vk.itmo.test.kovalevigor.server.strategy.util.Paths; +import ru.vk.itmo.test.kovalevigor.server.strategy.util.Responses; import java.io.IOException; import java.util.EnumSet; @@ -18,11 +18,13 @@ public class ServerRequestValidationStrategyDecorator extends ServerStrategyDecorator { private static final Map> REQUIRED_PARAMETERS = Map.of( - Paths.V0_ENTITY, EnumSet.of(Parameters.ID) + Paths.V0_ENTITY, EnumSet.of(Parameters.ID), + Paths.V0_ENTITIES, EnumSet.of(Parameters.START) ); private static final Map> ALLOWED_METHODS = Map.of( - Paths.V0_ENTITY, Set.of(Request.METHOD_GET, Request.METHOD_PUT, Request.METHOD_DELETE) + Paths.V0_ENTITY, Set.of(Request.METHOD_GET, Request.METHOD_PUT, Request.METHOD_DELETE), + Paths.V0_ENTITIES, Set.of(Request.METHOD_GET) ); public ServerRequestValidationStrategyDecorator(ServerStrategy httpServer) { diff --git a/src/main/java/ru/vk/itmo/test/kovalevigor/server/strategy/decorators/ServerSendResponseStrategyDecorator.java b/src/main/java/ru/vk/itmo/test/kovalevigor/server/strategy/decorators/ServerSendResponseStrategyDecorator.java index 564c2a1c0..db5b2b4fb 100644 --- a/src/main/java/ru/vk/itmo/test/kovalevigor/server/strategy/decorators/ServerSendResponseStrategyDecorator.java +++ b/src/main/java/ru/vk/itmo/test/kovalevigor/server/strategy/decorators/ServerSendResponseStrategyDecorator.java @@ -12,9 +12,9 @@ import java.util.logging.Level; import static ru.vk.itmo.test.kovalevigor.server.strategy.ServerDaoStrategy.log; -import static ru.vk.itmo.test.kovalevigor.server.util.ServerUtil.closeSession; -import static ru.vk.itmo.test.kovalevigor.server.util.ServerUtil.sendErrorWithoutIo; -import static ru.vk.itmo.test.kovalevigor.server.util.ServerUtil.sendResponseWithoutIo; +import static ru.vk.itmo.test.kovalevigor.server.strategy.util.ServerUtil.closeSession; +import static ru.vk.itmo.test.kovalevigor.server.strategy.util.ServerUtil.sendErrorWithoutIo; +import static ru.vk.itmo.test.kovalevigor.server.strategy.util.ServerUtil.sendResponseWithoutIo; public class ServerSendResponseStrategyDecorator extends ServerStrategyDecorator { diff --git a/src/main/java/ru/vk/itmo/test/kovalevigor/server/strategy/decorators/ServerShardingStrategyDecorator.java b/src/main/java/ru/vk/itmo/test/kovalevigor/server/strategy/decorators/ServerShardingStrategyDecorator.java index ca68ea32f..b731ee44e 100644 --- a/src/main/java/ru/vk/itmo/test/kovalevigor/server/strategy/decorators/ServerShardingStrategyDecorator.java +++ b/src/main/java/ru/vk/itmo/test/kovalevigor/server/strategy/decorators/ServerShardingStrategyDecorator.java @@ -5,13 +5,13 @@ import one.nio.http.Response; import ru.vk.itmo.test.kovalevigor.server.ServiceInfo; import ru.vk.itmo.test.kovalevigor.server.strategy.ServerStrategy; -import ru.vk.itmo.test.kovalevigor.server.util.Headers; -import ru.vk.itmo.test.kovalevigor.server.util.Parameters; +import ru.vk.itmo.test.kovalevigor.server.strategy.util.Headers; +import ru.vk.itmo.test.kovalevigor.server.strategy.util.Parameters; import java.io.IOException; import java.util.Collection; -import static ru.vk.itmo.test.kovalevigor.server.util.ServerUtil.GOOD_STATUSES; +import static ru.vk.itmo.test.kovalevigor.server.strategy.util.ServerUtil.GOOD_STATUSES; public class ServerShardingStrategyDecorator extends ServerStrategyDecorator { private final ServiceInfo serviceInfo; diff --git a/src/main/java/ru/vk/itmo/test/kovalevigor/server/strategy/util/ChainedQueueItem.java b/src/main/java/ru/vk/itmo/test/kovalevigor/server/strategy/util/ChainedQueueItem.java new file mode 100644 index 000000000..b35e930f0 --- /dev/null +++ b/src/main/java/ru/vk/itmo/test/kovalevigor/server/strategy/util/ChainedQueueItem.java @@ -0,0 +1,35 @@ +package ru.vk.itmo.test.kovalevigor.server.strategy.util; + +import one.nio.net.Session; +import one.nio.net.Socket; + +import java.io.IOException; +import java.util.Iterator; + +public class ChainedQueueItem extends Session.QueueItem { + + private final Iterator items; + private Session.QueueItem current; + + public ChainedQueueItem(Iterator items) { + this.items = items; + current = items.next(); + } + + @Override + public int write(Socket socket) throws IOException { + int res; + do { + res = current.write(socket); + if (current.remaining() == 0 && items.hasNext()) { + current = items.next(); + } + } while (res > 0 && remaining() > 0); + return res; + } + + @Override + public int remaining() { + return current.remaining() + (items.hasNext() ? 1 : 0); + } +} diff --git a/src/main/java/ru/vk/itmo/test/kovalevigor/server/strategy/util/ChunkQueueItem.java b/src/main/java/ru/vk/itmo/test/kovalevigor/server/strategy/util/ChunkQueueItem.java new file mode 100644 index 000000000..5a95e4d79 --- /dev/null +++ b/src/main/java/ru/vk/itmo/test/kovalevigor/server/strategy/util/ChunkQueueItem.java @@ -0,0 +1,46 @@ +package ru.vk.itmo.test.kovalevigor.server.strategy.util; + +import ru.vk.itmo.dao.Entry; + +import java.lang.foreign.MemorySegment; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import static ru.vk.itmo.test.kovalevigor.server.strategy.util.ServerUtil.CHARSET; + +public class ChunkQueueItem extends JoinedQueueItem { + public static final MemorySegment CHUNK_LINE_END = MemorySegment.ofArray("\r\n".getBytes(CHARSET)); + public static final MemorySegment KEY_VALUE_SEP = MemorySegment.ofArray("\n".getBytes(CHARSET)); + private final List chunk; + + public ChunkQueueItem(int bufferSize) { + super(bufferSize); + chunk = new ArrayList<>( + Arrays.asList( + null, + CHUNK_LINE_END, + null, + KEY_VALUE_SEP, + null, + CHUNK_LINE_END + ) + ); + } + + public void setChunk(Entry entry) { + long keySize = entry.key().byteSize(); + long valueSize = entry.value() == null ? 0 : entry.value().byteSize(); + long totalSize = keySize + valueSize + KEY_VALUE_SEP.byteSize(); + chunk.set(0, mapToHex(totalSize)); + chunk.set(2, entry.key()); + chunk.set(4, entry.value()); + setItems(chunk.iterator()); + } + + private static MemorySegment mapToHex(long value) { + return MemorySegment.ofArray( + Long.toHexString(value).getBytes(CHARSET) + ); + } +} diff --git a/src/main/java/ru/vk/itmo/test/kovalevigor/server/util/Headers.java b/src/main/java/ru/vk/itmo/test/kovalevigor/server/strategy/util/Headers.java similarity index 96% rename from src/main/java/ru/vk/itmo/test/kovalevigor/server/util/Headers.java rename to src/main/java/ru/vk/itmo/test/kovalevigor/server/strategy/util/Headers.java index 5042c010a..4459dfcc5 100644 --- a/src/main/java/ru/vk/itmo/test/kovalevigor/server/util/Headers.java +++ b/src/main/java/ru/vk/itmo/test/kovalevigor/server/strategy/util/Headers.java @@ -1,4 +1,4 @@ -package ru.vk.itmo.test.kovalevigor.server.util; +package ru.vk.itmo.test.kovalevigor.server.strategy.util; import one.nio.http.Request; import one.nio.http.Response; diff --git a/src/main/java/ru/vk/itmo/test/kovalevigor/server/strategy/util/JoinedQueueItem.java b/src/main/java/ru/vk/itmo/test/kovalevigor/server/strategy/util/JoinedQueueItem.java new file mode 100644 index 000000000..ef3103a5b --- /dev/null +++ b/src/main/java/ru/vk/itmo/test/kovalevigor/server/strategy/util/JoinedQueueItem.java @@ -0,0 +1,77 @@ +package ru.vk.itmo.test.kovalevigor.server.strategy.util; + +import one.nio.net.Session; +import one.nio.net.Socket; + +import java.io.IOException; +import java.lang.foreign.MemorySegment; +import java.lang.foreign.ValueLayout; +import java.util.Iterator; + +public class JoinedQueueItem extends Session.ArrayQueueItem { + + private Iterator items; + private MemorySegment current; + private int itemOffset; + + public JoinedQueueItem(int bufferSize) { + super(new byte[bufferSize], 0, 0, 0); + } + + public void setItems(Iterator items) { + this.items = items; + nextItem(); + fillBuffer(); + } + + @Override + public int write(Socket socket) throws IOException { + int res; + do { + res = super.write(socket); + fillIfNeeded(); + } while (res > 0 && remaining() > 0); + return res; + } + + private void fillIfNeeded() { + if (super.remaining() == 0) { + fillBuffer(); + } + } + + private void fillBuffer() { + offset = 0; + count = 0; + written = 0; + while (count < data.length && currentItemRemain() > 0) { + int length = (int) Math.min(currentItemRemain(), data.length - count); + MemorySegment.copy( + current, ValueLayout.JAVA_BYTE, itemOffset, + data, count, length + ); + count += length; + itemOffset += length; + while (currentItemRemain() == 0 && items.hasNext()) { + nextItem(); + } + } + } + + private long currentItemRemain() { + return current.byteSize() - itemOffset; + } + + @Override + public int remaining() { + return super.remaining() + (currentItemRemain() > 0 ? 1 : 0); + } + + private void nextItem() { + do { + current = items.next(); + } while (current == null && items.hasNext()); + itemOffset = 0; + } +} + diff --git a/src/main/java/ru/vk/itmo/test/kovalevigor/server/util/Parameters.java b/src/main/java/ru/vk/itmo/test/kovalevigor/server/strategy/util/Parameters.java similarity index 89% rename from src/main/java/ru/vk/itmo/test/kovalevigor/server/util/Parameters.java rename to src/main/java/ru/vk/itmo/test/kovalevigor/server/strategy/util/Parameters.java index 7d5a84db3..cb26a042e 100644 --- a/src/main/java/ru/vk/itmo/test/kovalevigor/server/util/Parameters.java +++ b/src/main/java/ru/vk/itmo/test/kovalevigor/server/strategy/util/Parameters.java @@ -1,4 +1,4 @@ -package ru.vk.itmo.test.kovalevigor.server.util; +package ru.vk.itmo.test.kovalevigor.server.strategy.util; import one.nio.http.Request; @@ -7,7 +7,9 @@ public enum Parameters { ID("id"), ACK("ack"), - FROM("from"); + FROM("from"), + START("start"), + END("end"); private final String name; diff --git a/src/main/java/ru/vk/itmo/test/kovalevigor/server/util/Partition.java b/src/main/java/ru/vk/itmo/test/kovalevigor/server/strategy/util/Partition.java similarity index 86% rename from src/main/java/ru/vk/itmo/test/kovalevigor/server/util/Partition.java rename to src/main/java/ru/vk/itmo/test/kovalevigor/server/strategy/util/Partition.java index 35aefda49..2ba8a710a 100644 --- a/src/main/java/ru/vk/itmo/test/kovalevigor/server/util/Partition.java +++ b/src/main/java/ru/vk/itmo/test/kovalevigor/server/strategy/util/Partition.java @@ -1,4 +1,4 @@ -package ru.vk.itmo.test.kovalevigor.server.util; +package ru.vk.itmo.test.kovalevigor.server.strategy.util; import ru.vk.itmo.test.kovalevigor.server.strategy.ServerStrategy; diff --git a/src/main/java/ru/vk/itmo/test/kovalevigor/server/util/Paths.java b/src/main/java/ru/vk/itmo/test/kovalevigor/server/strategy/util/Paths.java similarity index 64% rename from src/main/java/ru/vk/itmo/test/kovalevigor/server/util/Paths.java rename to src/main/java/ru/vk/itmo/test/kovalevigor/server/strategy/util/Paths.java index f3b1ec1bf..2ca03dbc4 100644 --- a/src/main/java/ru/vk/itmo/test/kovalevigor/server/util/Paths.java +++ b/src/main/java/ru/vk/itmo/test/kovalevigor/server/strategy/util/Paths.java @@ -1,12 +1,19 @@ -package ru.vk.itmo.test.kovalevigor.server.util; +package ru.vk.itmo.test.kovalevigor.server.strategy.util; public enum Paths { - V0_ENTITY("/v0/entity"); + V0_ENTITY("/v0/entity"), + V0_ENTITIES("/v0/entities", true); public final String path; + private final boolean isLocal; - Paths(String path) { + Paths(String path, boolean isLocal) { this.path = path; + this.isLocal = isLocal; + } + + Paths(String path) { + this(path, false); } public static Paths getPath(String request) { @@ -26,4 +33,8 @@ public static Paths getPathOrThrow(String request) { } throw new IllegalStateException("Unexpected path"); } + + public boolean isLocal() { + return this.isLocal; + } } diff --git a/src/main/java/ru/vk/itmo/test/kovalevigor/server/util/Responses.java b/src/main/java/ru/vk/itmo/test/kovalevigor/server/strategy/util/Responses.java similarity index 91% rename from src/main/java/ru/vk/itmo/test/kovalevigor/server/util/Responses.java rename to src/main/java/ru/vk/itmo/test/kovalevigor/server/strategy/util/Responses.java index 3ffc84414..a9ecd2cea 100644 --- a/src/main/java/ru/vk/itmo/test/kovalevigor/server/util/Responses.java +++ b/src/main/java/ru/vk/itmo/test/kovalevigor/server/strategy/util/Responses.java @@ -1,8 +1,9 @@ -package ru.vk.itmo.test.kovalevigor.server.util; +package ru.vk.itmo.test.kovalevigor.server.strategy.util; import one.nio.http.Response; public enum Responses { + OK(Response.OK), NOT_FOUND(Response.NOT_FOUND), CREATED(Response.CREATED), ACCEPTED(Response.ACCEPTED), diff --git a/src/main/java/ru/vk/itmo/test/kovalevigor/server/util/ServerTask.java b/src/main/java/ru/vk/itmo/test/kovalevigor/server/strategy/util/ServerTask.java similarity index 81% rename from src/main/java/ru/vk/itmo/test/kovalevigor/server/util/ServerTask.java rename to src/main/java/ru/vk/itmo/test/kovalevigor/server/strategy/util/ServerTask.java index a4c0f377a..6c739743b 100644 --- a/src/main/java/ru/vk/itmo/test/kovalevigor/server/util/ServerTask.java +++ b/src/main/java/ru/vk/itmo/test/kovalevigor/server/strategy/util/ServerTask.java @@ -1,4 +1,4 @@ -package ru.vk.itmo.test.kovalevigor.server.util; +package ru.vk.itmo.test.kovalevigor.server.strategy.util; import one.nio.http.HttpSession; import one.nio.http.Request; @@ -7,8 +7,7 @@ import java.util.logging.Level; import static ru.vk.itmo.test.kovalevigor.server.strategy.ServerDaoStrategy.log; -import static ru.vk.itmo.test.kovalevigor.server.util.ServerUtil.closeSession; -import static ru.vk.itmo.test.kovalevigor.server.util.ServerUtil.sendResponseWithoutIo; +import static ru.vk.itmo.test.kovalevigor.server.strategy.util.ServerUtil.sendResponseWithoutIo; public class ServerTask implements Runnable { public final Request request; @@ -32,7 +31,7 @@ public void run() { task.accept(request, session); } catch (IOException ioException) { log.log(Level.SEVERE, "IO while executing", ioException); - closeSession(session, ioException); + ServerUtil.closeSession(session, ioException); } catch (Exception exception) { log.log(Level.SEVERE, "Exception while executing", exception); sendResponseWithoutIo(session, Responses.INTERNAL_ERROR); diff --git a/src/main/java/ru/vk/itmo/test/kovalevigor/server/util/ServerUtil.java b/src/main/java/ru/vk/itmo/test/kovalevigor/server/strategy/util/ServerUtil.java similarity index 95% rename from src/main/java/ru/vk/itmo/test/kovalevigor/server/util/ServerUtil.java rename to src/main/java/ru/vk/itmo/test/kovalevigor/server/strategy/util/ServerUtil.java index 049a4908a..e054ffec2 100644 --- a/src/main/java/ru/vk/itmo/test/kovalevigor/server/util/ServerUtil.java +++ b/src/main/java/ru/vk/itmo/test/kovalevigor/server/strategy/util/ServerUtil.java @@ -1,9 +1,11 @@ -package ru.vk.itmo.test.kovalevigor.server.util; +package ru.vk.itmo.test.kovalevigor.server.strategy.util; import one.nio.http.HttpSession; import one.nio.http.Response; import java.io.IOException; +import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; import java.time.Duration; import java.util.Set; import java.util.concurrent.ExecutorService; @@ -13,7 +15,7 @@ import static ru.vk.itmo.test.kovalevigor.server.strategy.ServerDaoStrategy.log; public final class ServerUtil { - + public static final Charset CHARSET = StandardCharsets.UTF_8; public static final TimeUnit SHUTDOWN_TIMEOUT_TIME_UNIT = TimeUnit.SECONDS; public static final int SHUTDOWN_TIMEOUT = 60; public static final Set GOOD_STATUSES = Set.of(