Skip to content

Commit

Permalink
Супрядкина Дарья ИТМО DWS stage 1 (polis-vk#18)
Browse files Browse the repository at this point in the history
* HW1: add realization

* HW1: fix code style

* HW1: change exception constructor

* HW1: add report

* HW1: add flame graphs description

* HW1: add flame graphs description

* HW1: fix code style

* HW1: fixes according to review

---------

Co-authored-by: atimofeyev <[email protected]>
  • Loading branch information
2 people authored and osokindm committed Mar 6, 2024
1 parent 468286d commit 79dbf0a
Show file tree
Hide file tree
Showing 37 changed files with 4,577 additions and 0 deletions.
123 changes: 123 additions & 0 deletions src/main/java/ru/vk/itmo/test/dariasupriadkina/Server.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
package ru.vk.itmo.test.dariasupriadkina;

import one.nio.http.HttpServer;
import one.nio.http.HttpServerConfig;
import one.nio.http.HttpSession;
import one.nio.http.Param;
import one.nio.http.Path;
import one.nio.http.Request;
import one.nio.http.RequestMethod;
import one.nio.http.Response;
import one.nio.server.AcceptorConfig;
import ru.vk.itmo.ServiceConfig;
import ru.vk.itmo.dao.BaseEntry;
import ru.vk.itmo.dao.Dao;
import ru.vk.itmo.dao.Entry;

import java.io.IOException;
import java.lang.foreign.MemorySegment;
import java.lang.foreign.ValueLayout;
import java.nio.charset.StandardCharsets;
import java.util.Set;

public class Server extends HttpServer {

private final Dao<MemorySegment, Entry<MemorySegment>> dao;
private final Set<Integer> permittedMethods =
Set.of(Request.METHOD_GET, Request.METHOD_PUT, Request.METHOD_DELETE);

public Server(ServiceConfig config, Dao<MemorySegment, Entry<MemorySegment>> dao) throws IOException {
super(createHttpServerConfig(config));
this.dao = dao;
}

private static HttpServerConfig createHttpServerConfig(ServiceConfig serviceConfig) {
HttpServerConfig httpServerConfig = new HttpServerConfig();

AcceptorConfig acceptorConfig = new AcceptorConfig();
acceptorConfig.port = serviceConfig.selfPort();
acceptorConfig.reusePort = true;

httpServerConfig.acceptors = new AcceptorConfig[] {acceptorConfig};
httpServerConfig.closeSessions = true;

return httpServerConfig;
}

@Path("/health")
@RequestMethod(Request.METHOD_GET)
public Response health() {
return Response.ok(Response.OK);
}

@Path("/v0/entity")
@RequestMethod(Request.METHOD_GET)
public Response getHandler(@Param(value = "id", required = true) String id) {
try {
if (id.length() == 0) {
return new Response(Response.BAD_REQUEST, Response.EMPTY);
}
Entry<MemorySegment> entry = getEntryById(id);
if (entry == null || entry.value() == null) {
return new Response(Response.NOT_FOUND, Response.EMPTY);
}
return Response.ok(entry.value().toArray(ValueLayout.JAVA_BYTE));
} catch (Exception e) {
return new Response(Response.INTERNAL_ERROR, Response.EMPTY);
}
}

@Path("/v0/entity")
@RequestMethod(Request.METHOD_PUT)
public Response putHandler(Request request, @Param(value = "id", required = true) String id) {
try {
if (id.length() == 0) {
return new Response(Response.BAD_REQUEST, Response.EMPTY);
}
dao.upsert(convertToEntry(id, request.getBody()));
return new Response(Response.CREATED, Response.EMPTY);
} catch (Exception e) {
return new Response(Response.INTERNAL_ERROR, Response.EMPTY);
}
}

@Path("/v0/entity")
@RequestMethod(Request.METHOD_DELETE)
public Response deleteHandler(@Param(value = "id", required = true) String id) {
try {
if (id.length() == 0) {
return new Response(Response.BAD_REQUEST, Response.EMPTY);
}
dao.upsert(convertToEntry(id, null));
return new Response(Response.ACCEPTED, Response.EMPTY);
} catch (Exception e) {
return new Response(Response.INTERNAL_ERROR, Response.EMPTY);
}
}

@Override
public void handleDefault(Request request, HttpSession session) throws IOException {
Response response;
if (permittedMethods.contains(request.getMethod())) {
response = new Response(Response.BAD_REQUEST, Response.EMPTY);
} else {
response = new Response(Response.METHOD_NOT_ALLOWED, Response.EMPTY);

}
session.sendResponse(response);
}

private Entry<MemorySegment> convertToEntry(String id, byte[] body) {
return new BaseEntry<>(convertByteArrToMemorySegment(id.getBytes(StandardCharsets.UTF_8)),
convertByteArrToMemorySegment(body));
}

private MemorySegment convertByteArrToMemorySegment(byte[] bytes) {
return bytes == null ? null : MemorySegment.ofArray(bytes);
}

private Entry<MemorySegment> getEntryById(String id) {
return dao.get(convertByteArrToMemorySegment(id.getBytes(StandardCharsets.UTF_8)));
}

}
51 changes: 51 additions & 0 deletions src/main/java/ru/vk/itmo/test/dariasupriadkina/ServiceIml.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
package ru.vk.itmo.test.dariasupriadkina;

import ru.vk.itmo.Service;
import ru.vk.itmo.ServiceConfig;
import ru.vk.itmo.dao.Config;
import ru.vk.itmo.dao.Dao;
import ru.vk.itmo.dao.Entry;
import ru.vk.itmo.test.dariasupriadkina.dao.exception.ServiceImplCreationException;
import ru.vk.itmo.test.reference.dao.ReferenceDao;

import java.io.IOException;
import java.lang.foreign.MemorySegment;
import java.util.concurrent.CompletableFuture;

public class ServiceIml implements Service {

private Server server;
private Dao<MemorySegment, Entry<MemorySegment>> dao;
private Config daoConfig;
private ServiceConfig serviceConfig;

public ServiceIml(ServiceConfig serviceConfig, Config daoConfig) {
try {

this.daoConfig = daoConfig;
this.serviceConfig = serviceConfig;

this.dao = new ReferenceDao(daoConfig);

} catch (IOException e) {
throw new ServiceImplCreationException(e);
}
}

@Override
public synchronized CompletableFuture<Void> start() throws IOException {
dao = new ReferenceDao(daoConfig);
server = new Server(serviceConfig, dao);

server.start();
return CompletableFuture.completedFuture(null);
}

@Override
public synchronized CompletableFuture<Void> stop() throws IOException {
server.stop();
dao.close();

return CompletableFuture.completedFuture(null);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package ru.vk.itmo.test.dariasupriadkina;

import ru.vk.itmo.Service;
import ru.vk.itmo.ServiceConfig;
import ru.vk.itmo.dao.Config;
import ru.vk.itmo.test.ServiceFactory;

import java.nio.file.Path;

@ServiceFactory(stage = 1)
public class ServiceImlFactory implements ServiceFactory.Factory {

private static final long FLUSH_THRESHOLD_BYTES = 1024 * 1024;

@Override
public Service create(ServiceConfig serviceConfig) {
Config referenceDaoConfig = new Config(Path.of(serviceConfig.workingDir().toUri()), FLUSH_THRESHOLD_BYTES);
return new ServiceIml(serviceConfig, referenceDaoConfig);
}
}
28 changes: 28 additions & 0 deletions src/main/java/ru/vk/itmo/test/dariasupriadkina/TestServer.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package ru.vk.itmo.test.dariasupriadkina;

import ru.vk.itmo.ServiceConfig;
import ru.vk.itmo.dao.Config;
import ru.vk.itmo.test.reference.dao.ReferenceDao;

import java.io.IOException;
import java.nio.file.Paths;
import java.util.List;

public final class TestServer {

private TestServer() {
}

public static void main(String[] args) throws IOException {
String url = "http://localhost";
ServiceConfig serviceConfig = new ServiceConfig(
8080,
url,
List.of(url),
Paths.get("./"));
ReferenceDao dao = new ReferenceDao(
new Config(serviceConfig.workingDir(), 1024 * 128));
Server server = new Server(serviceConfig, dao);
server.start();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
package ru.vk.itmo.test.dariasupriadkina.dao;

import java.io.IOException;
import java.lang.foreign.MemorySegment;
import java.nio.ByteBuffer;

/**
* Growable buffer with {@link ByteBuffer} and {@link MemorySegment} interface.
*
* @author incubos
*/
final class ByteArraySegment {
private byte[] array;
private MemorySegment segment;

ByteArraySegment(final int capacity) {
this.array = new byte[capacity];
this.segment = MemorySegment.ofArray(array);
}

void withArray(final ArrayConsumer consumer) throws IOException {
consumer.process(array);
}

MemorySegment segment() {
return segment;
}

void ensureCapacity(final long size) {
if (size > Integer.MAX_VALUE) {
throw new IllegalArgumentException("Too big!");
}

final int capacity = (int) size;
if (array.length >= capacity) {
return;
}

// Grow to the nearest bigger power of 2
final int newSize = Integer.highestOneBit(capacity) << 1;
array = new byte[newSize];
segment = MemorySegment.ofArray(array);
}

interface ArrayConsumer {
void process(byte[] array) throws IOException;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package ru.vk.itmo.test.dariasupriadkina.dao;

import ru.vk.itmo.dao.Entry;

import java.lang.foreign.MemorySegment;
import java.util.Iterator;
import java.util.NoSuchElementException;

/**
* Filters non tombstone {@link Entry}s.
*
* @author incubos
*/
final class LiveFilteringIterator implements Iterator<Entry<MemorySegment>> {
private final Iterator<Entry<MemorySegment>> delegate;
private Entry<MemorySegment> next;

LiveFilteringIterator(final Iterator<Entry<MemorySegment>> delegate) {
this.delegate = delegate;
skipTombstones();
}

private void skipTombstones() {
while (delegate.hasNext()) {
final Entry<MemorySegment> entry = delegate.next();
if (entry.value() != null) {
this.next = entry;
break;
}
}
}

@Override
public boolean hasNext() {
return next != null;
}

@Override
public Entry<MemorySegment> next() {
if (!hasNext()) {
throw new NoSuchElementException();
}

// Consume
final Entry<MemorySegment> result = next;
next = null;

skipTombstones();

return result;
}
}
49 changes: 49 additions & 0 deletions src/main/java/ru/vk/itmo/test/dariasupriadkina/dao/MemTable.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package ru.vk.itmo.test.dariasupriadkina.dao;

import ru.vk.itmo.dao.Entry;

import java.lang.foreign.MemorySegment;
import java.util.Iterator;
import java.util.NavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;

/**
* Memory table.
*
* @author incubos
*/
final class MemTable {
private final NavigableMap<MemorySegment, Entry<MemorySegment>> map =
new ConcurrentSkipListMap<>(
MemorySegmentComparator.INSTANCE);

boolean isEmpty() {
return map.isEmpty();
}

Iterator<Entry<MemorySegment>> get(
final MemorySegment from,
final MemorySegment to) {
if (from == null && to == null) {
// All
return map.values().iterator();
} else if (from == null) {
// Head
return map.headMap(to).values().iterator();
} else if (to == null) {
// Tail
return map.tailMap(from).values().iterator();
} else {
// Slice
return map.subMap(from, to).values().iterator();
}
}

Entry<MemorySegment> get(final MemorySegment key) {
return map.get(key);
}

Entry<MemorySegment> upsert(final Entry<MemorySegment> entry) {
return map.put(entry.key(), entry);
}
}
Loading

0 comments on commit 79dbf0a

Please sign in to comment.