Skip to content

Commit

Permalink
dao
Browse files Browse the repository at this point in the history
  • Loading branch information
smirnovdm2107 committed Feb 18, 2024
1 parent e22da0c commit 7b048ca
Show file tree
Hide file tree
Showing 32 changed files with 2,147 additions and 0 deletions.
100 changes: 100 additions & 0 deletions src/main/java/ru/vk/itmo/test/smirnovdmitrii/dao/DaoImpl.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package ru.vk.itmo.test.smirnovdmitrii.dao;

import ru.vk.itmo.dao.Config;
import ru.vk.itmo.dao.Dao;
import ru.vk.itmo.dao.Entry;
import ru.vk.itmo.test.smirnovdmitrii.dao.inmemory.Flusher;
import ru.vk.itmo.test.smirnovdmitrii.dao.inmemory.InMemoryDao;
import ru.vk.itmo.test.smirnovdmitrii.dao.inmemory.InMemoryDaoImpl;
import ru.vk.itmo.test.smirnovdmitrii.dao.inmemory.SkipListMemtable;
import ru.vk.itmo.test.smirnovdmitrii.dao.outofmemory.FileDao;
import ru.vk.itmo.test.smirnovdmitrii.dao.outofmemory.OutMemoryDao;
import ru.vk.itmo.test.smirnovdmitrii.dao.state.State;
import ru.vk.itmo.test.smirnovdmitrii.dao.state.StateService;
import ru.vk.itmo.test.smirnovdmitrii.dao.util.EqualsComparator;
import ru.vk.itmo.test.smirnovdmitrii.dao.util.MemorySegmentComparator;
import ru.vk.itmo.test.smirnovdmitrii.dao.util.iterators.MergeIterator;
import ru.vk.itmo.test.smirnovdmitrii.dao.util.iterators.WrappedIterator;

import java.io.IOException;
import java.io.UncheckedIOException;
import java.lang.foreign.MemorySegment;
import java.util.Iterator;
import java.util.Objects;
import java.util.concurrent.atomic.AtomicBoolean;

public class DaoImpl implements Dao<MemorySegment, Entry<MemorySegment>> {
private final InMemoryDao<MemorySegment, Entry<MemorySegment>> inMemoryDao;
private final OutMemoryDao<MemorySegment, Entry<MemorySegment>> outMemoryDao;
private final StateService stateService = new StateService();
private final EqualsComparator<MemorySegment> comparator = new MemorySegmentComparator();
private final AtomicBoolean isClosed = new AtomicBoolean(false);

public DaoImpl(final Config config) {
outMemoryDao = new FileDao(config.basePath(), stateService);
final Flusher flusher = new Flusher(stateService, outMemoryDao, SkipListMemtable::new);
inMemoryDao = new InMemoryDaoImpl(config.flushThresholdBytes(), flusher);
}

@Override
public Iterator<Entry<MemorySegment>> get(final MemorySegment from, final MemorySegment to) {
int id = 0;
final MergeIterator.Builder<MemorySegment, Entry<MemorySegment>> builder
= new MergeIterator.Builder<>(comparator);
final State state = stateService.state();
for (final Iterator<Entry<MemorySegment>> inMemoryIterator : inMemoryDao.get(state, from, to)) {
builder.addIterator(new WrappedIterator<>(id++, inMemoryIterator));
}
for (final Iterator<Entry<MemorySegment>> outMemoryIterator : outMemoryDao.get(state, from, to)) {
builder.addIterator(new WrappedIterator<>(id++, outMemoryIterator));
}
return builder.build();
}

@Override
public Entry<MemorySegment> get(final MemorySegment key) {
Objects.requireNonNull(key);
final State state = stateService.state();
Entry<MemorySegment> result = inMemoryDao.get(state, key);
if (result == null) {
result = outMemoryDao.get(state, key);
}
if (result == null || result.value() == null) {
return null;
}
return result;
}

@Override
public void upsert(final Entry<MemorySegment> entry) {
try {
while (true) {
final State state = stateService.state();
if (inMemoryDao.upsert(state, entry)) {
return;
}
}
} catch (final IOException e) {
throw new UncheckedIOException(e);
}
}

@Override
public void flush() throws IOException {
inMemoryDao.flush();
}

@Override
public void compact() {
outMemoryDao.compact();
}

@Override
public void close() throws IOException {
if (!isClosed.compareAndSet(false, true)) {
return;
}
inMemoryDao.close();
outMemoryDao.close();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
package ru.vk.itmo.test.smirnovdmitrii.dao.inmemory;

import ru.vk.itmo.dao.Entry;
import ru.vk.itmo.test.smirnovdmitrii.dao.outofmemory.OutMemoryDao;
import ru.vk.itmo.test.smirnovdmitrii.dao.state.StateService;

import java.io.Closeable;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.lang.foreign.MemorySegment;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;

public class Flusher implements Closeable {

private final StateService stateService;
private final OutMemoryDao<MemorySegment, Entry<MemorySegment>> outMemoryDao;
private final Supplier<Memtable> memtableSupplier;
private final AtomicBoolean isFlushing = new AtomicBoolean(false);
private final ExecutorService executorService = Executors.newSingleThreadExecutor();

public Flusher(
final StateService stateService,
final OutMemoryDao<MemorySegment, Entry<MemorySegment>> outMemoryDao,
final Supplier<Memtable> memtableSupplier
) {
this.stateService = stateService;
this.outMemoryDao = outMemoryDao;
this.memtableSupplier = memtableSupplier;

stateService.setMemtables(List.of(memtableSupplier.get()));
}

public boolean flush() {
if (isFlushing.compareAndSet(false, true)) {
forceFlush();
return true;
}
return false;
}

public void forceFlush() {
executorService.execute(() -> {
try {
final List<Memtable> memtables = stateService.memtables();
final Memtable memtable = memtables.getFirst();
final Memtable newMemtable = memtableSupplier.get();
// Creating new memory table.
stateService.addMemtable(newMemtable);
// Waiting until all upserts finished and flushing it to disk.
// need a structure only to wait until everyone is has gone
memtable.flushLock().lock();
try {
outMemoryDao.flush(memtable);
stateService.removeMemtable(memtable);
} catch (final IOException e) {
throw new UncheckedIOException(e);
} finally {
memtable.flushLock().unlock();
}
} finally {
isFlushing.set(false);
}
});
}

@Override
public void close() {
forceFlush();
executorService.close();
stateService.setMemtables(null);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package ru.vk.itmo.test.smirnovdmitrii.dao.inmemory;

import ru.vk.itmo.dao.Entry;
import ru.vk.itmo.test.smirnovdmitrii.dao.state.State;

import java.io.IOException;
import java.util.Iterator;
import java.util.List;

public interface InMemoryDao<D, E extends Entry<D>> extends AutoCloseable {

/**
* Maybe flushes memtable to file. Works in background.
*/
void flush() throws IOException;

/**
* Return iterators for every in memory storage sorted from newer to older from key {@code from} to key {@code to}.
* @param from from key.
* @param to to key.
* @return returned iterator.
*/
List<Iterator<E>> get(State state, D from, D to);

/**
* Return entry that associated with key {@code key}. Null if there is no entry with such key.
* @param key key to search.
* @return entry associated with key.
*/
E get(State state, D key);

/**
* Adding entry to in memory dao. If there was entry with same key, then replace it.
* @param entry entry to add.
*/
boolean upsert(State state, E entry) throws IOException;

@Override
void close();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
package ru.vk.itmo.test.smirnovdmitrii.dao.inmemory;

import ru.vk.itmo.dao.Entry;
import ru.vk.itmo.test.smirnovdmitrii.dao.state.State;
import ru.vk.itmo.test.smirnovdmitrii.dao.util.exceptions.TooManyUpsertsException;

import java.lang.foreign.MemorySegment;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class InMemoryDaoImpl implements InMemoryDao<MemorySegment, Entry<MemorySegment>> {

private static final int MAX_MEMTABLES = 2;
private final long flushThresholdBytes;
private final Flusher flusher;

public InMemoryDaoImpl(
final long flushThresholdBytes,
final Flusher flusher
) {
this.flushThresholdBytes = flushThresholdBytes;
this.flusher = flusher;
}

@Override
public List<Iterator<Entry<MemorySegment>>> get(
final State state,
final MemorySegment from,
final MemorySegment to
) {
final List<Iterator<Entry<MemorySegment>>> iterators = new ArrayList<>();
for (final Memtable memtable : state.memtables()) {
iterators.add(memtable.get(from, to));
}
return iterators;
}

@Override
public Entry<MemorySegment> get(
final State state,
final MemorySegment key
) {
for (final Memtable memtable : state.memtables()) {
final Entry<MemorySegment> result = memtable.get(key);
if (result != null) {
return result;
}
}
return null;
}

@Override
public boolean upsert(
final State state,
final Entry<MemorySegment> entry
) {
final List<Memtable> currentMemtables = state.memtables();
final Memtable memtable = currentMemtables.getFirst();
if (memtable.upsertLock().tryLock()) {
try {
if (memtable.size() < flushThresholdBytes) {
memtable.upsert(entry);
return true;
}
} finally {
memtable.upsertLock().unlock();
}
if (flusher.flush()) {
return false;
} else {
if (currentMemtables.size() == MAX_MEMTABLES) {
throw new TooManyUpsertsException("out of memory.");
}
}
}
return false;
// Try again. We get SSTable that will be just replaced.
}

@Override
public void flush() {
flusher.flush();
}

/**
* Flushing memtable on disk.
*/
@Override
public void close() {
flusher.close();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package ru.vk.itmo.test.smirnovdmitrii.dao.inmemory;

import ru.vk.itmo.dao.Entry;

import java.lang.foreign.MemorySegment;
import java.util.Iterator;
import java.util.concurrent.locks.Lock;

/**
* Representation of memtable.
*/
public interface Memtable extends Iterable<Entry<MemorySegment>> {

long size();

void upsert(Entry<MemorySegment> entry);

Entry<MemorySegment> get(MemorySegment key);

Iterator<Entry<MemorySegment>> get(MemorySegment from, MemorySegment to);

void clear();

Lock upsertLock();

Lock flushLock();
}

Loading

0 comments on commit 7b048ca

Please sign in to comment.