Skip to content

Commit

Permalink
Add a method to rebuild a database.
Browse files Browse the repository at this point in the history
  • Loading branch information
broneill committed Nov 10, 2024
1 parent fc3a8ed commit c44c837
Show file tree
Hide file tree
Showing 10 changed files with 438 additions and 16 deletions.
22 changes: 22 additions & 0 deletions src/main/java/org/cojen/tupl/Database.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@

import javax.net.ssl.SSLContext;

import org.cojen.tupl.core.Rebuilder;

import org.cojen.tupl.diag.CompactionObserver;
import org.cojen.tupl.diag.DatabaseStats;
import org.cojen.tupl.diag.EventListener;
Expand Down Expand Up @@ -117,6 +119,26 @@ public static Database connect(SocketAddress addr, SSLContext context, long... t
return ClientDatabase.connect(addr, context, tokens);
}

/**
* Open an existing database and copy all the data into a new database, which can have a
* different configuration.
*
* @param numThreads pass 0 for default, or if negative, the actual number will be {@code
* (-numThreads * availableProcessors)}.
* @return the newly built database
* @throws IllegalStateException if the new database already exists
*/
public static Database rebuild(DatabaseConfig oldConfig, DatabaseConfig newConfig,
int numThreads)
throws IOException
{
if (numThreads <= 0) {
int procs = Runtime.getRuntime().availableProcessors();
numThreads = numThreads == 0 ? procs : procs * -numThreads;
}
return new Rebuilder(oldConfig.mLauncher, newConfig.mLauncher, numThreads).run();
}

/**
* Returns the given named index, creating it if necessary.
*
Expand Down
153 changes: 153 additions & 0 deletions src/main/java/org/cojen/tupl/core/BTreeCopier.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,153 @@
/*
* Copyright (C) 2024 Cojen.org
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as
* published by the Free Software Foundation, either version 3 of the
* License, or (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <https://www.gnu.org/licenses/>.
*/

package org.cojen.tupl.core;

import java.io.InterruptedIOException;
import java.io.IOException;

import java.util.concurrent.Executor;

import org.cojen.tupl.Index;

import org.cojen.tupl.util.Latch;

/**
* Parallel tree copying utility. All entries from the source tree are copied into a new target
* temporary tree. No threads should be active in the source tree.
*
* @author Brian S. O'Neill
*/
/*P*/
final class BTreeCopier extends BTreeSeparator {
private final int mPageSize;

private final Latch mLatch;
private final Latch.Condition mCondition;

private byte[] mBuf;

private BTree mMerged;
private IOException mException;

/**
* @param dest is only used for calling newTemporaryIndex
* @param executor used for parallel separation; pass null to use only the starting thread
* @param workerCount maximum parallelism; must be at least 1
*/
BTreeCopier(LocalDatabase dest, BTree source, Executor executor, int workerCount) {
super(dest, new BTree[] {source}, executor, workerCount);
mPageSize = dest.stats().pageSize;
mLatch = new Latch();
mCondition = new Latch.Condition();
}

/**
* Returns a new temporary index with all the results, or null if empty.
*/
BTree result() throws IOException {
mLatch.acquireExclusive();
try {
while (true) {
if (mException != null) {
throw mException;
}
if (mMerged != null) {
return mMerged;
}
if (mCondition.await(mLatch) < 0) {
throw new InterruptedIOException();
}
}

} finally {
mLatch.releaseExclusive();
}
}

@Override
protected void finished(Chain<BTree> firstRange) {
BTree merged = firstRange.element();

if (merged != null) {
Chain<BTree> range = firstRange.next();

while (range != null) {
BTree tree = range.element();

if (tree != null) {
try {
merged = BTree.graftTempTree(merged, tree);
} catch (IOException e) {
mException = e;
merged = null;
break;
}
}

range = range.next();
}
}

mMerged = merged;

mLatch.acquireExclusive();
mCondition.signalAll(mLatch);
mLatch.releaseExclusive();
}

@Override
protected void transfer(BTreeCursor source, BTreeCursor target) throws IOException {
target.findNearby(source.key());

long length = source.valueLength();

if (length <= mPageSize) {
source.load();
target.store(source.value());
} else {
byte[] buf = mBuf;

if (buf == null) {
mBuf = buf = new byte[Math.max(source.mTree.mDatabase.stats().pageSize, mPageSize)];
}

target.valueLength(length);

long pos = 0;
while (true) {
int amt = source.valueRead(pos, buf, 0, buf.length);
target.valueWrite(pos, buf, 0, amt);
pos += amt;
if (amt < buf.length) {
break;
}
}

if (pos != length) {
throw new AssertionError("Value isn't fully copied");
}
}

source.next();
}

@Override
protected void skip(BTreeCursor source) throws IOException {
source.next();
}
}
1 change: 1 addition & 0 deletions src/main/java/org/cojen/tupl/core/BTreeMerger.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
/*P*/
abstract class BTreeMerger extends BTreeSeparator {
/**
* @param db is only used for calling newTemporaryIndex
* @param executor used for parallel separation; pass null to use only the starting thread
* @param workerCount maximum parallelism; must be at least 1
*/
Expand Down
28 changes: 24 additions & 4 deletions src/main/java/org/cojen/tupl/core/BTreeSeparator.java
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ abstract class BTreeSeparator extends LongAdder {
}

/**
* @param db is only used for calling newTemporaryIndex
* @param executor used for parallel separation; pass null to use only the starting thread
* @param workerCount maximum parallelism; must be at least 1
*/
Expand Down Expand Up @@ -135,12 +136,32 @@ protected void failed(Throwable cause) {

/**
* Called when separation has finished. When finished normally (not stopped), then all
* source trees are empty, but not deleted.
* source trees are empty, but not deleted, unless the transfer and skip methods are
* overridden.
*
* @param firstRange first separated range; the ranges are ordered lowest to highest.
*/
protected abstract void finished(Chain<BTree> firstRange);

/**
* Copies (or moves) the current entry from the source cursor to the unpositioned target
* cursor, and advance the source cursor to the next key. The source cursor value isn't
* autoloaded.
*
* Note: When this method is overridden, the skip method should be overridden too.
*/
protected void transfer(BTreeCursor source, BTreeCursor target) throws IOException {
target.appendTransfer(source);
}

/**
* Skips (and possibly deletes) the current entry.
*/
protected void skip(BTreeCursor source) throws IOException {
source.store(null);
source.next();
}

private void startWorker(Worker from, int spawnCount, byte[] lowKey, byte[] highKey) {
var worker = new Worker(spawnCount, lowKey, highKey, mSources.length);

Expand Down Expand Up @@ -341,8 +362,7 @@ private void doRun() throws Exception {
scursor.reset();
} else {
if (selector.mSkip) {
scursor.store(null);
scursor.next();
skip(scursor);
selector.mSkip = false;
} else {
if (tcursor == null) {
Expand All @@ -351,7 +371,7 @@ private void doRun() throws Exception {
tcursor.mKeyOnly = true;
tcursor.firstLeaf();
}
tcursor.appendTransfer(scursor);
transfer(scursor, tcursor);
if (++count == 0) {
// Inherited from LongAdder.
add(256);
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/org/cojen/tupl/core/CompressedPageArray.java
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ Snapshot beginSnapshot() throws IOException {
try (var snapDb = launcher.open(false, null)) {
snapArray = new CompressedPageArray
(pageSize(), snapDb, snapDb.registry(), mCompressorFactory);
var snapPageDb = StoredPageDb.open(null, snapArray, null, null, false);
var snapPageDb = StoredPageDb.open(null, snapArray, null, null, false, 0);
redoPos = snapPageDb.snapshotRedoPos();
}
}
Expand Down
16 changes: 16 additions & 0 deletions src/main/java/org/cojen/tupl/core/CoreDatabase.java
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,8 @@ public void rootSwap(Index a, Index b) throws IOException {

public abstract boolean isInTrash(Transaction txn, long treeId) throws IOException;

abstract long databaseId();

abstract boolean isDirectPageAccess();

abstract boolean isCacheOnly();
Expand All @@ -110,6 +112,20 @@ public void rootSwap(Index a, Index b) throws IOException {

abstract Tree registry();

abstract Tree registryKeyMap();

abstract boolean scanAllIndexes(ScanVisitor visitor) throws IOException;

abstract Index anyIndexById(long id) throws IOException;

/**
* Copies all entries from a source index into a new temporary index, which can be null if
* empty. No threads should be active in the source index.
*
* @param workerCount maximum parallelism; must be at least 1
*/
abstract Tree parallelCopy(Index source, int workerCount) throws IOException;

/**
* @return null if none
*/
Expand Down
3 changes: 3 additions & 0 deletions src/main/java/org/cojen/tupl/core/Launcher.java
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,9 @@ public final class Launcher implements Cloneable {
Map<String, PrepareHandler> mPrepareHandlers;
TempFileManager mTempFileManager;

// When 0, the database id is assigned automatically.
long mDatabaseId;

// When true: one index is supported (the registry), no lock file is created, snapshots
// aren't supported, and the database has no redo log.
boolean mBasicMode;
Expand Down
30 changes: 26 additions & 4 deletions src/main/java/org/cojen/tupl/core/LocalDatabase.java
Original file line number Diff line number Diff line change
Expand Up @@ -472,14 +472,17 @@ private LocalDatabase(Launcher launcher, boolean destroy) throws IOException {
debugListener = mEventListener;
}

long databaseId = launcher.mDatabaseId;

if (dataFiles == null) {
PageArray dataPageArray = launcher.mDataPageArray;
if (dataPageArray == null) {
mPageDb = new NonPageDb(pageSize);
} else {
Crypto crypto = launcher.mDataCrypto;
mPageDb = StoredPageDb.open(debugListener, dataPageArray,
launcher.mChecksumFactory, crypto, destroy);
mPageDb = StoredPageDb.open
(debugListener, dataPageArray,
launcher.mChecksumFactory, crypto, destroy, databaseId);
/*P*/ // [|
/*P*/ // fullyMapped = crypto == null && dataPageArray.isFullyMapped();
/*P*/ // ]
Expand All @@ -491,7 +494,7 @@ private LocalDatabase(Launcher launcher, boolean destroy) throws IOException {
try {
pageDb = StoredPageDb.open
(debugListener, explicitPageSize, pageSize, dataFiles, options,
launcher.mChecksumFactory, launcher.mDataCrypto, destroy);
launcher.mChecksumFactory, launcher.mDataCrypto, destroy, databaseId);
} catch (FileNotFoundException e) {
if (!mReadOnly) {
throw e;
Expand Down Expand Up @@ -1335,6 +1338,7 @@ private Tree lookupIndexById(long id) {
/**
* Allows access to internal indexes which can use the redo log.
*/
@Override
Index anyIndexById(long id) throws IOException {
return anyIndexById(null, id);
}
Expand Down Expand Up @@ -2365,6 +2369,13 @@ public Sorter newSorter() {
return new ParallelSorter(this, Runner.current());
}

@Override
Tree parallelCopy(Index source, int workerCount) throws IOException {
var copier = new BTreeCopier(this, (BTree) source, Runner.current(), workerCount);
copier.start();
return copier.result();
}

@Override
public void capacityLimit(long bytes) {
mPageDb.pageLimit(bytes < 0 ? -1 : (bytes / mPageSize));
Expand Down Expand Up @@ -2983,7 +2994,8 @@ synchronized void waitFor() throws IOException {
/**
* @return false if stopped
*/
private boolean scanAllIndexes(ScanVisitor visitor) throws IOException {
@Override
boolean scanAllIndexes(ScanVisitor visitor) throws IOException {
if (!scan(visitor, mRegistry) || !scan(visitor, mRegistryKeyMap)
|| !scan(visitor, openFragmentedTrash(false))
|| !scan(visitor, openCursorRegistry(false))
Expand Down Expand Up @@ -6184,6 +6196,11 @@ void readNode(Node node, long id) throws IOException {
}
}

@Override
long databaseId() {
return mPageDb.databaseId();
}

@Override
boolean isDirectPageAccess() {
/*P*/ // [
Expand Down Expand Up @@ -6218,6 +6235,11 @@ Tree registry() {
return mRegistry;
}

@Override
Tree registryKeyMap() {
return mRegistryKeyMap;
}

@Override
public EventListener eventListener() {
return mEventListener;
Expand Down
Loading

0 comments on commit c44c837

Please sign in to comment.