Skip to content

Commit

Permalink
Ходосова Елена, ИТМО DWS, Stage 6 (#214)
Browse files Browse the repository at this point in the history
  • Loading branch information
ImLena authored May 22, 2024
1 parent 6aae3db commit c23f9ba
Show file tree
Hide file tree
Showing 31 changed files with 45,036 additions and 161 deletions.
424 changes: 349 additions & 75 deletions src/main/java/ru/vk/itmo/test/elenakhodosova/HttpServerImpl.java

Large diffs are not rendered by default.

16 changes: 10 additions & 6 deletions src/main/java/ru/vk/itmo/test/elenakhodosova/Server.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

Expand All @@ -29,6 +30,7 @@ private Server() {

public static void main(String[] args) throws IOException {
List<ServiceConfig> nodesConfigs = new ArrayList<>(NODES_COUNT);
ExecutorService executorService = ExecutorServiceConfig.newExecutorService();

Map<Integer, String> nodes = new HashMap<>();
int nodePort = 8080;
Expand All @@ -50,12 +52,14 @@ public static void main(String[] args) throws IOException {
}
for (ServiceConfig config : nodesConfigs) {
ServiceImpl server = new ServiceImpl(config);
try {
server.start().get(1, TimeUnit.SECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
logger.error("Unable to start service instance: ", e);
Thread.currentThread().interrupt();
}
executorService.execute(() -> {
try {
server.start().get(1, TimeUnit.SECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException | IOException e) {
logger.error("Unable to start service instance: ", e);
Thread.currentThread().interrupt();
}
});
}
}
}
6 changes: 4 additions & 2 deletions src/main/java/ru/vk/itmo/test/elenakhodosova/ServiceImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,12 @@ public CompletableFuture<Void> start() throws IOException {
executorService = ExecutorServiceConfig.newExecutorService();
server = new HttpServerImpl(config, dao, executorService);
server.start();
isServiceStopped.getAndSet(false);
return CompletableFuture.completedFuture(null);
}

@Override
public CompletableFuture<Void> stop() throws IOException {
public synchronized CompletableFuture<Void> stop() throws IOException {
if (isServiceStopped.getAndSet(true)) {
return CompletableFuture.completedFuture(null);
}
Expand All @@ -48,13 +49,14 @@ public CompletableFuture<Void> stop() throws IOException {
executorService.shutdownNow();
}
} catch (InterruptedException e) {
executorService.shutdownNow();
Thread.currentThread().interrupt();
}
dao.close();
return CompletableFuture.completedFuture(null);
}

@ServiceFactory(stage = 3)
@ServiceFactory(stage = 6)
public static class Factory implements ServiceFactory.Factory {

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package ru.vk.itmo.test.elenakhodosova.dao;

public record BaseEntryWithTimestamp<Data>(Data key, Data value, long timestamp) implements EntryWithTimestamp<Data> {
@Override
public String toString() {
return "{" + key + ":" + value + ":" + timestamp + "}";
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package ru.vk.itmo.test.elenakhodosova.dao;

import ru.vk.itmo.dao.Entry;

public interface EntryWithTimestamp<D> extends Entry<D> {
long timestamp();
}
Original file line number Diff line number Diff line change
@@ -1,28 +1,26 @@
package ru.vk.itmo.test.elenakhodosova.dao;

import ru.vk.itmo.dao.Entry;

import java.lang.foreign.MemorySegment;
import java.util.Iterator;
import java.util.NoSuchElementException;

/**
* Filters non tombstone {@link Entry}s.
* Filters non tombstone {@link EntryWithTimestamp}s.
*
* @author incubos
*/
final class LiveFilteringIterator implements Iterator<Entry<MemorySegment>> {
private final Iterator<Entry<MemorySegment>> delegate;
private Entry<MemorySegment> next;
final class LiveFilteringIterator implements Iterator<EntryWithTimestamp<MemorySegment>> {
private final Iterator<EntryWithTimestamp<MemorySegment>> delegate;
private EntryWithTimestamp<MemorySegment> next;

LiveFilteringIterator(final Iterator<Entry<MemorySegment>> delegate) {
LiveFilteringIterator(final Iterator<EntryWithTimestamp<MemorySegment>> delegate) {
this.delegate = delegate;
skipTombstones();
}

private void skipTombstones() {
while (delegate.hasNext()) {
final Entry<MemorySegment> entry = delegate.next();
final EntryWithTimestamp<MemorySegment> entry = delegate.next();
if (entry.value() != null) {
this.next = entry;
break;
Expand All @@ -36,13 +34,13 @@ public boolean hasNext() {
}

@Override
public Entry<MemorySegment> next() {
public EntryWithTimestamp<MemorySegment> next() {
if (!hasNext()) {
throw new NoSuchElementException();
}

// Consume
final Entry<MemorySegment> result = next;
final EntryWithTimestamp<MemorySegment> result = next;
next = null;

skipTombstones();
Expand Down
10 changes: 4 additions & 6 deletions src/main/java/ru/vk/itmo/test/elenakhodosova/dao/MemTable.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
package ru.vk.itmo.test.elenakhodosova.dao;

import ru.vk.itmo.dao.Entry;

import java.lang.foreign.MemorySegment;
import java.util.Iterator;
import java.util.NavigableMap;
Expand All @@ -13,15 +11,15 @@
* @author incubos
*/
final class MemTable {
private final NavigableMap<MemorySegment, Entry<MemorySegment>> map =
private final NavigableMap<MemorySegment, EntryWithTimestamp<MemorySegment>> map =
new ConcurrentSkipListMap<>(
MemorySegmentComparator.INSTANCE);

boolean isEmpty() {
return map.isEmpty();
}

Iterator<Entry<MemorySegment>> get(
Iterator<EntryWithTimestamp<MemorySegment>> get(
final MemorySegment from,
final MemorySegment to) {
if (from == null && to == null) {
Expand All @@ -39,11 +37,11 @@ Iterator<Entry<MemorySegment>> get(
}
}

Entry<MemorySegment> get(final MemorySegment key) {
EntryWithTimestamp<MemorySegment> get(final MemorySegment key) {
return map.get(key);
}

Entry<MemorySegment> upsert(final Entry<MemorySegment> entry) {
EntryWithTimestamp<MemorySegment> upsert(final EntryWithTimestamp<MemorySegment> entry) {
return map.put(entry.key(), entry);
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
package ru.vk.itmo.test.elenakhodosova.dao;

import ru.vk.itmo.dao.Entry;

import java.lang.foreign.MemorySegment;
import java.util.Iterator;
import java.util.List;
Expand All @@ -14,7 +12,7 @@
*
* @author incubos
*/
final class MergingEntryIterator implements Iterator<Entry<MemorySegment>> {
final class MergingEntryIterator implements Iterator<EntryWithTimestamp<MemorySegment>> {
private final Queue<WeightedPeekingEntryIterator> iterators;

MergingEntryIterator(final List<WeightedPeekingEntryIterator> iterators) {
Expand All @@ -29,13 +27,13 @@ public boolean hasNext() {
}

@Override
public Entry<MemorySegment> next() {
public EntryWithTimestamp<MemorySegment> next() {
if (!hasNext()) {
throw new NoSuchElementException();
}

final WeightedPeekingEntryIterator top = iterators.remove();
final Entry<MemorySegment> result = top.next();
final EntryWithTimestamp<MemorySegment> result = top.next();

if (top.hasNext()) {
// Not exhausted
Expand All @@ -51,7 +49,7 @@ public Entry<MemorySegment> next() {
}

// Skip entries with the same key
final Entry<MemorySegment> entry = iterator.peek();
final EntryWithTimestamp<MemorySegment> entry = iterator.peek();
if (MemorySegmentComparator.INSTANCE.compare(result.key(), entry.key()) != 0) {
// Reached another key
break;
Expand Down
20 changes: 9 additions & 11 deletions src/main/java/ru/vk/itmo/test/elenakhodosova/dao/ReferenceDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import ru.vk.itmo.dao.Config;
import ru.vk.itmo.dao.Dao;
import ru.vk.itmo.dao.Entry;

import java.io.IOException;
import java.lang.foreign.Arena;
Expand All @@ -22,7 +21,7 @@
*
* @author incubos
*/
public class ReferenceDao implements Dao<MemorySegment, Entry<MemorySegment>> {
public class ReferenceDao implements Dao<MemorySegment, EntryWithTimestamp<MemorySegment>> {
private final Config config;
private final Arena arena;

Expand Down Expand Up @@ -63,7 +62,7 @@ public ReferenceDao(final Config config) throws IOException {
}

@Override
public Iterator<Entry<MemorySegment>> get(
public Iterator<EntryWithTimestamp<MemorySegment>> get(
final MemorySegment from,
final MemorySegment to) {
return new LiveFilteringIterator(
Expand All @@ -73,13 +72,13 @@ public Iterator<Entry<MemorySegment>> get(
}

@Override
public Entry<MemorySegment> get(final MemorySegment key) {
public EntryWithTimestamp<MemorySegment> get(final MemorySegment key) {
// Without lock, just snapshot of table set
return tableSet.get(key);
}

@Override
public void upsert(final Entry<MemorySegment> entry) {
public void upsert(final EntryWithTimestamp<MemorySegment> entry) {
final boolean autoFlush;
lock.readLock().lock();
try {
Expand All @@ -89,7 +88,7 @@ public void upsert(final Entry<MemorySegment> entry) {
}

// Upsert
final Entry<MemorySegment> previous = tableSet.upsert(entry);
final EntryWithTimestamp<MemorySegment> previous = tableSet.upsert(entry);

// Update size estimate
final long size = tableSet.memTableSize.addAndGet(sizeOf(entry) - sizeOf(previous));
Expand All @@ -103,16 +102,16 @@ public void upsert(final Entry<MemorySegment> entry) {
}
}

private static long sizeOf(final Entry<MemorySegment> entry) {
private static long sizeOf(final EntryWithTimestamp<MemorySegment> entry) {
if (entry == null) {
return 0L;
}

if (entry.value() == null) {
return entry.key().byteSize();
return entry.key().byteSize() + Long.BYTES;
}

return entry.key().byteSize() + entry.value().byteSize();
return entry.key().byteSize() + entry.value().byteSize() + Long.BYTES;
}

private void initiateFlush(final boolean auto) {
Expand Down Expand Up @@ -200,8 +199,7 @@ public void compact() throws IOException {
.write(
config.basePath(),
0,
new LiveFilteringIterator(
currentTableSet.allSSTableEntries()));
new LiveFilteringIterator(currentTableSet.allSSTableEntries()));
} catch (IOException e) {
e.printStackTrace();
Runtime.getRuntime().halt(-3);
Expand Down
30 changes: 19 additions & 11 deletions src/main/java/ru/vk/itmo/test/elenakhodosova/dao/SSTable.java
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
package ru.vk.itmo.test.elenakhodosova.dao;

import ru.vk.itmo.dao.BaseEntry;
import ru.vk.itmo.dao.Entry;

import java.lang.foreign.MemorySegment;
import java.lang.foreign.ValueLayout;
import java.util.Collections;
Expand Down Expand Up @@ -88,7 +85,7 @@ private long getLength(final long offset) {
offset);
}

Iterator<Entry<MemorySegment>> get(
Iterator<EntryWithTimestamp<MemorySegment>> get(
final MemorySegment from,
final MemorySegment to) {
assert from == null || to == null || MemorySegmentComparator.INSTANCE.compare(from, to) <= 0;
Expand Down Expand Up @@ -134,7 +131,7 @@ Iterator<Entry<MemorySegment>> get(
return new SliceIterator(fromOffset, toOffset);
}

Entry<MemorySegment> get(final MemorySegment key) {
EntryWithTimestamp<MemorySegment> get(final MemorySegment key) {
final long entry = entryBinarySearch(key);
if (entry < 0) {
return null;
Expand All @@ -146,17 +143,24 @@ Entry<MemorySegment> get(final MemorySegment key) {
// Extract value length
final long valueLength = getLength(offset);
if (valueLength == SSTables.TOMBSTONE_VALUE_LENGTH) {
// Get timestamp
offset += Long.BYTES;
final long timestamp = getLength(offset);
// Tombstone encountered
return new BaseEntry<>(key, null);
return new BaseEntryWithTimestamp<>(key, null, timestamp);
} else {
// Get value
offset += Long.BYTES;
final MemorySegment value = data.asSlice(offset, valueLength);
return new BaseEntry<>(key, value);

// Get timestamp
offset += valueLength;
final long timestamp = getLength(offset);
return new BaseEntryWithTimestamp<>(key, value, timestamp);
}
}

private final class SliceIterator implements Iterator<Entry<MemorySegment>> {
private final class SliceIterator implements Iterator<EntryWithTimestamp<MemorySegment>> {
private long offset;
private final long toOffset;

Expand All @@ -173,7 +177,7 @@ public boolean hasNext() {
}

@Override
public Entry<MemorySegment> next() {
public EntryWithTimestamp<MemorySegment> next() {
if (!hasNext()) {
throw new NoSuchElementException();
}
Expand All @@ -193,11 +197,15 @@ public Entry<MemorySegment> next() {
// Read value
if (valueLength == SSTables.TOMBSTONE_VALUE_LENGTH) {
// Tombstone encountered
return new BaseEntry<>(key, null);
final long timestamp = getLength(offset);
offset += Long.BYTES;
return new BaseEntryWithTimestamp<>(key, null, timestamp);
} else {
final MemorySegment value = data.asSlice(offset, valueLength);
offset += valueLength;
return new BaseEntry<>(key, value);
final long timestamp = getLength(offset);
offset += Long.BYTES;
return new BaseEntryWithTimestamp<>(key, value, timestamp);
}
}
}
Expand Down
Loading

0 comments on commit c23f9ba

Please sign in to comment.