Skip to content

Commit

Permalink
Пронин Валентин / ИТМО DWS / Stage 6 (#206)
Browse files Browse the repository at this point in the history
* First version

* Refactor

* Add report + improve code style

* Add a suggestion to the report about compression need

* Stage 2 only code

* PR fixes

* Add report with code optimization

* Add information about Get queries to report

* Add comparison between Array and Linked blocking queue

* Add sharding algorithm

* Refactoring

* First version of code

* Fix codestyle

* Fix codestyle

* Add failure limiter

* Fix codestyle

* Refactor + change some parameters

* Add report and RendezvousHashing (to compare performance between Consistent and Rendezvous hashing algorithms)

* Fix codestyle

* Add png

* Change report

* Add dao without tombstone cleaning

* Fix dao and add template for interaction with replicas

* Fix code for tests

* Fix code style

* Add report

* Remove failure limiter

* Add async interaction

* Refactor

* Fix due to code climate

* Fixes

* Add report

* Merge master

* Add ignore for "FutureReturnValueIgnored" warnings

* Fix codeclimate issues

* Add streaming

* Code climate fixes

* Add report

---------

Co-authored-by: Valentin Pronin <[email protected]>
Co-authored-by: Vadim Tsesko <[email protected]>
  • Loading branch information
3 people authored May 25, 2024
1 parent d15d648 commit 0b04920
Show file tree
Hide file tree
Showing 57 changed files with 20,002 additions and 363 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package ru.vk.itmo.test.proninvalentin;

import one.nio.http.HttpServer;
import one.nio.http.HttpSession;
import one.nio.http.Response;
import one.nio.net.Socket;
import ru.vk.itmo.test.proninvalentin.streaming.StreamingResponse;

import java.io.IOException;

public class CustomHttpSession extends HttpSession {
public CustomHttpSession(Socket socket, HttpServer server) {
super(socket, server);
}

public void safetySendResponse(Response response) {
try {
if (response instanceof StreamingResponse streamingResponse) {
sendStreamResponse(streamingResponse);
} else {
super.sendResponse(response);
}
} catch (IOException e) {
log.error("Error while sending response", e);
scheduleClose();
}
}

private void sendStreamResponse(StreamingResponse response) {
try {
if (response.remaining() < 1) {
safetySendResponse(new Response(Response.OK, Response.EMPTY));
return;
}

response.start(socket);
while (response.remaining() > 0) {
response.writePart(socket);
}
response.finish(socket);
close();
} catch (IOException e) {
log.error("Error while sending response", e);
scheduleClose();
}
}
}
80 changes: 80 additions & 0 deletions src/main/java/ru/vk/itmo/test/proninvalentin/RequestHandler.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
package ru.vk.itmo.test.proninvalentin;

import one.nio.http.Param;
import one.nio.http.Request;
import one.nio.http.Response;
import ru.vk.itmo.test.proninvalentin.dao.ExtendedBaseEntry;
import ru.vk.itmo.test.proninvalentin.dao.ExtendedEntry;
import ru.vk.itmo.test.proninvalentin.dao.ReferenceDao;
import ru.vk.itmo.test.proninvalentin.request_parameter.RangeRequestParameters;
import ru.vk.itmo.test.proninvalentin.streaming.StreamingResponse;
import ru.vk.itmo.test.proninvalentin.utils.Constants;
import ru.vk.itmo.test.proninvalentin.utils.MemorySegmentFactory;
import ru.vk.itmo.test.proninvalentin.utils.Utils;

import java.lang.foreign.MemorySegment;
import java.util.Iterator;

public class RequestHandler {
private final ReferenceDao dao;

private static final String HTTP_TIMESTAMP_HEADER = Constants.HTTP_TIMESTAMP_HEADER + ":";

public RequestHandler(ReferenceDao dao) {
this.dao = dao;
}

public Response handle(Request request, String entryId) {
return switch (request.getMethod()) {
case Request.METHOD_PUT -> upsert(entryId, request);
case Request.METHOD_GET -> get(entryId);
case Request.METHOD_DELETE -> delete(entryId);
default -> new Response(Response.BAD_REQUEST, Response.EMPTY);
};
}

public Response upsert(@Param(value = "id", required = true) String id, Request request) {
if (Utils.isNullOrBlank(id) || request.getBody() == null) {
return new Response(Response.BAD_REQUEST, Response.EMPTY);
}

MemorySegment key = MemorySegmentFactory.fromString(id);
MemorySegment value = MemorySegment.ofArray(request.getBody());
dao.upsert(new ExtendedBaseEntry<>(key, value, System.currentTimeMillis()));
return new Response(Response.CREATED, Response.EMPTY);
}

public Response get(@Param(required = true, value = "id") String id) {
if (Utils.isNullOrBlank(id)) {
return new Response(Response.BAD_REQUEST, Response.EMPTY);
}

MemorySegment key = MemorySegmentFactory.fromString(id);
ExtendedEntry<MemorySegment> entry = dao.get(key);

if (entry == null || entry.value() == null) {
Response response = new Response(Response.NOT_FOUND, Response.EMPTY);
response.addHeader(HTTP_TIMESTAMP_HEADER + (entry != null ? entry.timestamp() : 0));
return response;
}

Response response = new Response(Response.OK, MemorySegmentFactory.toByteArray(entry.value()));
response.addHeader(HTTP_TIMESTAMP_HEADER + entry.timestamp());
return response;
}

public Response delete(@Param(required = true, value = "id") String id) {
if (Utils.isNullOrBlank(id)) {
return new Response(Response.BAD_REQUEST, Response.EMPTY);
}
ExtendedEntry<MemorySegment> deletedMemorySegment =
MemorySegmentFactory.toDeletedMemorySegment(id, System.currentTimeMillis());
dao.upsert(deletedMemorySegment);
return new Response(Response.ACCEPTED, Response.EMPTY);
}

public StreamingResponse getRange(RangeRequestParameters params) {
Iterator<ExtendedEntry<MemorySegment>> entryIterator = dao.get(params.start(), params.end());
return new StreamingResponse(entryIterator);
}
}
Loading

0 comments on commit 0b04920

Please sign in to comment.