Skip to content

Commit

Permalink
[dingo-mpu] fix on synced clear after destroy
Browse files Browse the repository at this point in the history
  • Loading branch information
JYcz committed Feb 10, 2023
1 parent 07bfe84 commit 1422669
Show file tree
Hide file tree
Showing 2 changed files with 75 additions and 58 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import io.dingodb.common.util.FileUtils;
import io.dingodb.common.util.Optional;
import io.dingodb.common.util.Parameters;
import io.dingodb.common.util.Utils;
import io.dingodb.mpu.api.StorageApi;
import io.dingodb.mpu.core.CoreMeta;
import io.dingodb.mpu.instruction.Context;
Expand All @@ -46,7 +47,6 @@
import org.rocksdb.DBOptions;
import org.rocksdb.FileOperationInfo;
import org.rocksdb.FlushJobInfo;
import org.rocksdb.FlushOptions;
import org.rocksdb.LRUCache;
import org.rocksdb.MemTableInfo;
import org.rocksdb.Options;
Expand Down Expand Up @@ -78,6 +78,8 @@
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

import static io.dingodb.common.codec.PrimitiveCodec.encodeLong;
import static io.dingodb.mpu.Constant.API;
Expand Down Expand Up @@ -120,6 +122,7 @@ public class RocksStorage implements Storage {

public final WriteOptions writeOptions;
public final LinkedRunner runner;
private final AtomicInteger runningCount;

public Checkpoint checkPoint;
public RocksDB instruction;
Expand Down Expand Up @@ -166,6 +169,7 @@ public RocksStorage(String label, Path path, final String dbRocksOptionsFile,
final String logRocksOptionsFile, final int ttl, boolean withTtl) throws Exception {
this.label = label;
this.runner = new LinkedRunner(label);
this.runningCount = new AtomicInteger(0);
this.path = path.toAbsolutePath();
this.ttl = ttl;
this.withTtl = withTtl;
Expand Down Expand Up @@ -327,6 +331,7 @@ public synchronized void destroy() throws NullPointerException {
return;
}
destroy = true;
Utils.loop(() -> !runningCount.compareAndSet(0, Integer.MIN_VALUE), TimeUnit.MILLISECONDS.toNanos(10));
this.writeOptions.close();
closeDB();
if (this.icfHandler != null) {
Expand Down Expand Up @@ -395,35 +400,15 @@ public CompletableFuture<Void> transferTo(CoreMeta meta) {
});
}

private void flushMeta() {
try (FlushOptions flushOptions = new FlushOptions().setWaitForFlush(true)) {

if (db != null && mcfHandler != null) {
db.flush(flushOptions, mcfHandler);
}
} catch (RocksDBException e) {
log.error("Flush instruction error.", e);
}
}

private void flushInstruction() {
try (FlushOptions flushOptions = new FlushOptions().setWaitForFlush(true)) {
if (instruction != null) {
instruction.flush(flushOptions);
}
} catch (RocksDBException e) {
log.error("Flush instruction error.", e);
}
}

/*
* Create new RocksDB checkpoint in backup dir
*
* @throws RuntimeException
*/
public void createNewCheckpoint() {
if (destroy) {
return;
if (runningCount.incrementAndGet() < 0) {
runningCount.decrementAndGet();
throw new RuntimeException("Storage already destroy.");
}
try {
if (checkPoint != null) {
Expand All @@ -437,6 +422,8 @@ public void createNewCheckpoint() {
}
} catch (RocksDBException e) {
throw new RuntimeException(e);
} finally {
runningCount.decrementAndGet();
}
}

Expand Down Expand Up @@ -513,8 +500,9 @@ public int compare(File p1,File p2) {
*/
@Override
public void applyBackup() {
if (destroy) {
throw new RuntimeException();
if (runningCount.incrementAndGet() < 0) {
runningCount.decrementAndGet();
throw new RuntimeException("Storage already destroy.");
}
try {
String remoteCheckpointDir = String.format("%s%s", REMOTE_CHECKPOINT_PREFIX, "checkpoint");
Expand Down Expand Up @@ -550,19 +538,24 @@ public void applyBackup() {

} catch (Exception e) {
throw new RuntimeException(e);
} finally {
runningCount.decrementAndGet();
}
}

public void backup() {
if (destroy) {
return;
if (runningCount.incrementAndGet() < 0) {
runningCount.decrementAndGet();
throw new RuntimeException("Storage already destroy.");
}

try {
createNewCheckpoint();
purgeOldCheckpoint(3);
} catch (Exception e) {
throw new RuntimeException(e);
} finally {
runningCount.decrementAndGet();
}
}

Expand All @@ -579,20 +572,24 @@ public String receiveBackup() {

@Override
public long approximateCount() {
if (runningCount.incrementAndGet() < 0) {
runningCount.decrementAndGet();
throw new RuntimeException("Storage already destroy.");
}
try {
if (destroy) {
throw new RuntimeException();
}
return db.getLongProperty(dcfHandler, "rocksdb.estimate-num-keys");
} catch (RocksDBException e) {
throw new RuntimeException(e);
} finally {
runningCount.decrementAndGet();
}
}

@Override
public long approximateSize() {
if (destroy) {
throw new RuntimeException();
if (runningCount.incrementAndGet() < 0) {
runningCount.decrementAndGet();
throw new RuntimeException("Storage already destroy.");
}
try (
Snapshot snapshot = db.getSnapshot();
Expand All @@ -617,15 +614,17 @@ public long approximateSize() {
} finally {
readOptions.setSnapshot(null);
db.releaseSnapshot(snapshot);
runningCount.decrementAndGet();
}
}
return 0;
}

@Override
public void clearClock(long clock) {
if (destroy) {
throw new RuntimeException();
if (runningCount.incrementAndGet() < 0) {
runningCount.decrementAndGet();
throw new RuntimeException("Storage already destroy.");
}
try {
instruction.delete(icfHandler, encodeLong(clock));
Expand All @@ -634,13 +633,16 @@ public void clearClock(long clock) {
}
} catch (RocksDBException e) {
throw new RuntimeException(e);
} finally {
runningCount.decrementAndGet();
}
}

@Override
public long clocked() {
if (destroy) {
throw new RuntimeException();
if (runningCount.incrementAndGet() < 0) {
runningCount.decrementAndGet();
throw new RuntimeException("Storage already destroy.");
}
if (bypassWriteDb) {
return bypassClock;
Expand All @@ -649,6 +651,8 @@ public long clocked() {
return Optional.mapOrGet(db.get(mcfHandler, CLOCK_K), PrimitiveCodec::decodeLong, () -> 0L);
} catch (RocksDBException e) {
throw new RuntimeException(e);
} finally {
runningCount.decrementAndGet();
}
}

Expand Down Expand Up @@ -678,8 +682,9 @@ public void tick(long clock) {

@Override
public void saveInstruction(long clock, byte[] instruction) {
if (destroy) {
throw new RuntimeException();
if (runningCount.incrementAndGet() < 0) {
runningCount.decrementAndGet();
throw new RuntimeException("Storage already destroy.");
}
if (bypassSaveInstruction) {
return;
Expand All @@ -688,13 +693,16 @@ public void saveInstruction(long clock, byte[] instruction) {
this.instruction.put(icfHandler, PrimitiveCodec.encodeLong(clock), instruction);
} catch (RocksDBException e) {
throw new RuntimeException(e);
} finally {
runningCount.decrementAndGet();
}
}

@Override
public byte[] reappearInstruction(long clock) {
if (destroy) {
throw new RuntimeException();
if (runningCount.incrementAndGet() < 0) {
runningCount.decrementAndGet();
throw new RuntimeException("Storage already destroy.");
}
if (bypassSaveInstruction) {
Instruction emptyInstruction = new Instruction(clock, EmptyInstructions.id, EmptyInstructions.EMPTY);
Expand All @@ -704,6 +712,8 @@ public byte[] reappearInstruction(long clock) {
return instruction.get(icfHandler, PrimitiveCodec.encodeLong(clock));
} catch (RocksDBException e) {
throw new RuntimeException(e);
} finally {
runningCount.decrementAndGet();
}
}

Expand Down Expand Up @@ -757,15 +767,7 @@ public void flush(io.dingodb.mpu.storage.Writer writer) {
try {
Instruction instruction = writer.instruction();
WriteBatch batch = ((Writer) writer).writeBatch();
byte[] clockValue = PrimitiveCodec.encodeLong(instruction.clock);
//if (withTtl) {
// clockValue = new byte[Long.BYTES + Integer.BYTES];
// encodeLong(instruction.clock, clockValue, 0);
// encodeInt(Utils.currentSecond(), clockValue);
//} else {
// clockValue = PrimitiveCodec.encodeLong(instruction.clock);
//}
batch.put(mcfHandler, CLOCK_K, clockValue);
batch.put(mcfHandler, CLOCK_K, PrimitiveCodec.encodeLong(instruction.clock));
this.db.write(writeOptions, batch);
} catch (Exception e) {
throw new RuntimeException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

import static io.dingodb.common.codec.PrimitiveCodec.encodeLong;
import static io.dingodb.mpu.Constant.CF_DEFAULT;
Expand All @@ -72,6 +73,7 @@ public class DingoColumnStorage implements Storage {
public Checkpoint checkPoint;
public RocksDB instruction;
public RocksDB db;
private final AtomicInteger runningCount;

private ColumnFamilyHandle mcfHandler;
private ColumnFamilyHandle icfHandler;
Expand All @@ -87,6 +89,7 @@ public class DingoColumnStorage implements Storage {
public DingoColumnStorage(String label, Path path, final String table, TableDefinition definition) throws Exception {
this.label = label;
this.runner = new LinkedRunner(label);
this.runningCount = new AtomicInteger(0);
this.path = path.toAbsolutePath();

RocksConfiguration rocksConfiguration = RocksConfiguration.refreshRocksConfiguration();
Expand Down Expand Up @@ -197,13 +200,16 @@ public void closeDB() {

@Override
public long clocked() {
if (destroy) {
throw new RuntimeException();
if (runningCount.incrementAndGet() < 0) {
runningCount.decrementAndGet();
throw new RuntimeException("Storage already destroy.");
}
try {
return Optional.mapOrGet(db.get(mcfHandler, CLOCK_K), PrimitiveCodec::decodeLong, () -> 0L);
} catch (RocksDBException e) {
throw new RuntimeException(e);
} finally {
runningCount.decrementAndGet();
}
}

Expand Down Expand Up @@ -233,25 +239,31 @@ public void tick(long clock) {

@Override
public void saveInstruction(long clock, byte[] instruction) {
if (destroy) {
throw new RuntimeException();
if (runningCount.incrementAndGet() < 0) {
runningCount.decrementAndGet();
throw new RuntimeException("Storage already destroy.");
}
try {
this.instruction.put(icfHandler, PrimitiveCodec.encodeLong(clock), instruction);
} catch (RocksDBException e) {
throw new RuntimeException(e);
} finally {
runningCount.decrementAndGet();
}
}

@Override
public byte[] reappearInstruction(long clock) {
if (destroy) {
throw new RuntimeException();
if (runningCount.incrementAndGet() < 0) {
runningCount.decrementAndGet();
throw new RuntimeException("Storage already destroy.");
}
try {
return instruction.get(icfHandler, PrimitiveCodec.encodeLong(clock));
} catch (RocksDBException e) {
throw new RuntimeException(e);
} finally {
runningCount.decrementAndGet();
}
}

Expand Down Expand Up @@ -328,8 +340,9 @@ public void applyBackup() {

@Override
public void clearClock(long clock) {
if (destroy) {
throw new RuntimeException();
if (runningCount.incrementAndGet() < 0) {
runningCount.decrementAndGet();
throw new RuntimeException("Storage already destroy.");
}
try {
instruction.delete(icfHandler, encodeLong(clock));
Expand All @@ -338,6 +351,8 @@ public void clearClock(long clock) {
}
} catch (RocksDBException e) {
throw new RuntimeException(e);
} finally {
runningCount.decrementAndGet();
}
}

Expand Down

0 comments on commit 1422669

Please sign in to comment.