Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ходосова Елена, ИТМО DWS, Stage 6 #214

Merged
25 commits merged into from
May 22, 2024
Merged
Show file tree
Hide file tree
Changes from 23 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
423 changes: 348 additions & 75 deletions src/main/java/ru/vk/itmo/test/elenakhodosova/HttpServerImpl.java

Large diffs are not rendered by default.

16 changes: 10 additions & 6 deletions src/main/java/ru/vk/itmo/test/elenakhodosova/Server.java
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

Expand All @@ -29,6 +30,7 @@ private Server() {

public static void main(String[] args) throws IOException {
List<ServiceConfig> nodesConfigs = new ArrayList<>(NODES_COUNT);
ExecutorService executorService = ExecutorServiceConfig.newExecutorService();

Map<Integer, String> nodes = new HashMap<>();
int nodePort = 8080;
Expand All @@ -50,12 +52,14 @@ public static void main(String[] args) throws IOException {
}
for (ServiceConfig config : nodesConfigs) {
ServiceImpl server = new ServiceImpl(config);
try {
server.start().get(1, TimeUnit.SECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException e) {
logger.error("Unable to start service instance: ", e);
Thread.currentThread().interrupt();
}
executorService.execute(() -> {
try {
server.start().get(1, TimeUnit.SECONDS);
} catch (InterruptedException | ExecutionException | TimeoutException | IOException e) {
logger.error("Unable to start service instance: ", e);
Thread.currentThread().interrupt();
}
});
}
}
}
6 changes: 4 additions & 2 deletions src/main/java/ru/vk/itmo/test/elenakhodosova/ServiceImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,12 @@ public CompletableFuture<Void> start() throws IOException {
executorService = ExecutorServiceConfig.newExecutorService();
server = new HttpServerImpl(config, dao, executorService);
server.start();
isServiceStopped.getAndSet(false);
return CompletableFuture.completedFuture(null);
}

@Override
public CompletableFuture<Void> stop() throws IOException {
public synchronized CompletableFuture<Void> stop() throws IOException {
if (isServiceStopped.getAndSet(true)) {
return CompletableFuture.completedFuture(null);
}
Expand All @@ -48,13 +49,14 @@ public CompletableFuture<Void> stop() throws IOException {
executorService.shutdownNow();
}
} catch (InterruptedException e) {
executorService.shutdownNow();
Thread.currentThread().interrupt();
}
dao.close();
return CompletableFuture.completedFuture(null);
}

@ServiceFactory(stage = 3)
@ServiceFactory(stage = 6)
public static class Factory implements ServiceFactory.Factory {

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package ru.vk.itmo.test.elenakhodosova.dao;

public record BaseEntryWithTimestamp<Data>(Data key, Data value, long timestamp) implements EntryWithTimestamp<Data> {
@Override
public String toString() {
return "{" + key + ":" + value + ":" + timestamp + "}";
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package ru.vk.itmo.test.elenakhodosova.dao;

import ru.vk.itmo.dao.Entry;

public interface EntryWithTimestamp<D> extends Entry<D> {
long timestamp();
}
Original file line number Diff line number Diff line change
@@ -1,28 +1,26 @@
package ru.vk.itmo.test.elenakhodosova.dao;

import ru.vk.itmo.dao.Entry;

import java.lang.foreign.MemorySegment;
import java.util.Iterator;
import java.util.NoSuchElementException;

/**
* Filters non tombstone {@link Entry}s.
* Filters non tombstone {@link EntryWithTimestamp}s.
*
* @author incubos
*/
final class LiveFilteringIterator implements Iterator<Entry<MemorySegment>> {
private final Iterator<Entry<MemorySegment>> delegate;
private Entry<MemorySegment> next;
final class LiveFilteringIterator implements Iterator<EntryWithTimestamp<MemorySegment>> {
private final Iterator<EntryWithTimestamp<MemorySegment>> delegate;
private EntryWithTimestamp<MemorySegment> next;

LiveFilteringIterator(final Iterator<Entry<MemorySegment>> delegate) {
LiveFilteringIterator(final Iterator<EntryWithTimestamp<MemorySegment>> delegate) {
this.delegate = delegate;
skipTombstones();
}

private void skipTombstones() {
while (delegate.hasNext()) {
final Entry<MemorySegment> entry = delegate.next();
final EntryWithTimestamp<MemorySegment> entry = delegate.next();
if (entry.value() != null) {
this.next = entry;
break;
Expand All @@ -36,13 +34,13 @@ public boolean hasNext() {
}

@Override
public Entry<MemorySegment> next() {
public EntryWithTimestamp<MemorySegment> next() {
if (!hasNext()) {
throw new NoSuchElementException();
}

// Consume
final Entry<MemorySegment> result = next;
final EntryWithTimestamp<MemorySegment> result = next;
next = null;

skipTombstones();
Expand Down
10 changes: 4 additions & 6 deletions src/main/java/ru/vk/itmo/test/elenakhodosova/dao/MemTable.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
package ru.vk.itmo.test.elenakhodosova.dao;

import ru.vk.itmo.dao.Entry;

import java.lang.foreign.MemorySegment;
import java.util.Iterator;
import java.util.NavigableMap;
Expand All @@ -13,15 +11,15 @@
* @author incubos
*/
final class MemTable {
private final NavigableMap<MemorySegment, Entry<MemorySegment>> map =
private final NavigableMap<MemorySegment, EntryWithTimestamp<MemorySegment>> map =
new ConcurrentSkipListMap<>(
MemorySegmentComparator.INSTANCE);

boolean isEmpty() {
return map.isEmpty();
}

Iterator<Entry<MemorySegment>> get(
Iterator<EntryWithTimestamp<MemorySegment>> get(
final MemorySegment from,
final MemorySegment to) {
if (from == null && to == null) {
Expand All @@ -39,11 +37,11 @@ Iterator<Entry<MemorySegment>> get(
}
}

Entry<MemorySegment> get(final MemorySegment key) {
EntryWithTimestamp<MemorySegment> get(final MemorySegment key) {
return map.get(key);
}

Entry<MemorySegment> upsert(final Entry<MemorySegment> entry) {
EntryWithTimestamp<MemorySegment> upsert(final EntryWithTimestamp<MemorySegment> entry) {
return map.put(entry.key(), entry);
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
package ru.vk.itmo.test.elenakhodosova.dao;

import ru.vk.itmo.dao.Entry;

import java.lang.foreign.MemorySegment;
import java.util.Iterator;
import java.util.List;
Expand All @@ -14,7 +12,7 @@
*
* @author incubos
*/
final class MergingEntryIterator implements Iterator<Entry<MemorySegment>> {
final class MergingEntryIterator implements Iterator<EntryWithTimestamp<MemorySegment>> {
private final Queue<WeightedPeekingEntryIterator> iterators;

MergingEntryIterator(final List<WeightedPeekingEntryIterator> iterators) {
Expand All @@ -29,13 +27,13 @@ public boolean hasNext() {
}

@Override
public Entry<MemorySegment> next() {
public EntryWithTimestamp<MemorySegment> next() {
if (!hasNext()) {
throw new NoSuchElementException();
}

final WeightedPeekingEntryIterator top = iterators.remove();
final Entry<MemorySegment> result = top.next();
final EntryWithTimestamp<MemorySegment> result = top.next();

if (top.hasNext()) {
// Not exhausted
Expand All @@ -51,7 +49,7 @@ public Entry<MemorySegment> next() {
}

// Skip entries with the same key
final Entry<MemorySegment> entry = iterator.peek();
final EntryWithTimestamp<MemorySegment> entry = iterator.peek();
if (MemorySegmentComparator.INSTANCE.compare(result.key(), entry.key()) != 0) {
// Reached another key
break;
Expand Down
20 changes: 9 additions & 11 deletions src/main/java/ru/vk/itmo/test/elenakhodosova/dao/ReferenceDao.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import ru.vk.itmo.dao.Config;
import ru.vk.itmo.dao.Dao;
import ru.vk.itmo.dao.Entry;

import java.io.IOException;
import java.lang.foreign.Arena;
Expand All @@ -22,7 +21,7 @@
*
* @author incubos
*/
public class ReferenceDao implements Dao<MemorySegment, Entry<MemorySegment>> {
public class ReferenceDao implements Dao<MemorySegment, EntryWithTimestamp<MemorySegment>> {
private final Config config;
private final Arena arena;

Expand Down Expand Up @@ -63,7 +62,7 @@ public ReferenceDao(final Config config) throws IOException {
}

@Override
public Iterator<Entry<MemorySegment>> get(
public Iterator<EntryWithTimestamp<MemorySegment>> get(
final MemorySegment from,
final MemorySegment to) {
return new LiveFilteringIterator(
Expand All @@ -73,13 +72,13 @@ public Iterator<Entry<MemorySegment>> get(
}

@Override
public Entry<MemorySegment> get(final MemorySegment key) {
public EntryWithTimestamp<MemorySegment> get(final MemorySegment key) {
// Without lock, just snapshot of table set
return tableSet.get(key);
}

@Override
public void upsert(final Entry<MemorySegment> entry) {
public void upsert(final EntryWithTimestamp<MemorySegment> entry) {
final boolean autoFlush;
lock.readLock().lock();
try {
Expand All @@ -89,7 +88,7 @@ public void upsert(final Entry<MemorySegment> entry) {
}

// Upsert
final Entry<MemorySegment> previous = tableSet.upsert(entry);
final EntryWithTimestamp<MemorySegment> previous = tableSet.upsert(entry);

// Update size estimate
final long size = tableSet.memTableSize.addAndGet(sizeOf(entry) - sizeOf(previous));
Expand All @@ -103,16 +102,16 @@ public void upsert(final Entry<MemorySegment> entry) {
}
}

private static long sizeOf(final Entry<MemorySegment> entry) {
private static long sizeOf(final EntryWithTimestamp<MemorySegment> entry) {
if (entry == null) {
return 0L;
}

if (entry.value() == null) {
return entry.key().byteSize();
return entry.key().byteSize() + Long.BYTES;
}

return entry.key().byteSize() + entry.value().byteSize();
return entry.key().byteSize() + entry.value().byteSize() + Long.BYTES;
}

private void initiateFlush(final boolean auto) {
Expand Down Expand Up @@ -200,8 +199,7 @@ public void compact() throws IOException {
.write(
config.basePath(),
0,
new LiveFilteringIterator(
currentTableSet.allSSTableEntries()));
new LiveFilteringIterator(currentTableSet.allSSTableEntries()));
} catch (IOException e) {
e.printStackTrace();
Runtime.getRuntime().halt(-3);
Expand Down
30 changes: 19 additions & 11 deletions src/main/java/ru/vk/itmo/test/elenakhodosova/dao/SSTable.java
Original file line number Diff line number Diff line change
@@ -1,8 +1,5 @@
package ru.vk.itmo.test.elenakhodosova.dao;

import ru.vk.itmo.dao.BaseEntry;
import ru.vk.itmo.dao.Entry;

import java.lang.foreign.MemorySegment;
import java.lang.foreign.ValueLayout;
import java.util.Collections;
Expand Down Expand Up @@ -88,7 +85,7 @@ private long getLength(final long offset) {
offset);
}

Iterator<Entry<MemorySegment>> get(
Iterator<EntryWithTimestamp<MemorySegment>> get(
final MemorySegment from,
final MemorySegment to) {
assert from == null || to == null || MemorySegmentComparator.INSTANCE.compare(from, to) <= 0;
Expand Down Expand Up @@ -134,7 +131,7 @@ Iterator<Entry<MemorySegment>> get(
return new SliceIterator(fromOffset, toOffset);
}

Entry<MemorySegment> get(final MemorySegment key) {
EntryWithTimestamp<MemorySegment> get(final MemorySegment key) {
final long entry = entryBinarySearch(key);
if (entry < 0) {
return null;
Expand All @@ -146,17 +143,24 @@ Entry<MemorySegment> get(final MemorySegment key) {
// Extract value length
final long valueLength = getLength(offset);
if (valueLength == SSTables.TOMBSTONE_VALUE_LENGTH) {
// Get timestamp
offset += Long.BYTES;
final long timestamp = getLength(offset);
// Tombstone encountered
return new BaseEntry<>(key, null);
return new BaseEntryWithTimestamp<>(key, null, timestamp);
} else {
// Get value
offset += Long.BYTES;
final MemorySegment value = data.asSlice(offset, valueLength);
return new BaseEntry<>(key, value);

// Get timestamp
offset += valueLength;
final long timestamp = getLength(offset);
return new BaseEntryWithTimestamp<>(key, value, timestamp);
}
}

private final class SliceIterator implements Iterator<Entry<MemorySegment>> {
private final class SliceIterator implements Iterator<EntryWithTimestamp<MemorySegment>> {
private long offset;
private final long toOffset;

Expand All @@ -173,7 +177,7 @@ public boolean hasNext() {
}

@Override
public Entry<MemorySegment> next() {
public EntryWithTimestamp<MemorySegment> next() {
if (!hasNext()) {
throw new NoSuchElementException();
}
Expand All @@ -193,11 +197,15 @@ public Entry<MemorySegment> next() {
// Read value
if (valueLength == SSTables.TOMBSTONE_VALUE_LENGTH) {
// Tombstone encountered
return new BaseEntry<>(key, null);
final long timestamp = getLength(offset);
offset += Long.BYTES;
return new BaseEntryWithTimestamp<>(key, null, timestamp);
} else {
final MemorySegment value = data.asSlice(offset, valueLength);
offset += valueLength;
return new BaseEntry<>(key, value);
final long timestamp = getLength(offset);
offset += Long.BYTES;
return new BaseEntryWithTimestamp<>(key, value, timestamp);
}
}
}
Expand Down
Loading
Loading