From 1422669fbe4307621153244c1a78d5af187ada41 Mon Sep 17 00:00:00 2001 From: JYcz Date: Thu, 9 Feb 2023 18:45:15 +0800 Subject: [PATCH] [dingo-mpu] fix on synced clear after destroy --- .../mpu/storage/rocks/RocksStorage.java | 102 +++++++++--------- .../store/column/DingoColumnStorage.java | 31 ++++-- 2 files changed, 75 insertions(+), 58 deletions(-) diff --git a/dingo-mirror-processing-unit/src/main/java/io/dingodb/mpu/storage/rocks/RocksStorage.java b/dingo-mirror-processing-unit/src/main/java/io/dingodb/mpu/storage/rocks/RocksStorage.java index b75b41c002..96c28cc8bf 100644 --- a/dingo-mirror-processing-unit/src/main/java/io/dingodb/mpu/storage/rocks/RocksStorage.java +++ b/dingo-mirror-processing-unit/src/main/java/io/dingodb/mpu/storage/rocks/RocksStorage.java @@ -22,6 +22,7 @@ import io.dingodb.common.util.FileUtils; import io.dingodb.common.util.Optional; import io.dingodb.common.util.Parameters; +import io.dingodb.common.util.Utils; import io.dingodb.mpu.api.StorageApi; import io.dingodb.mpu.core.CoreMeta; import io.dingodb.mpu.instruction.Context; @@ -46,7 +47,6 @@ import org.rocksdb.DBOptions; import org.rocksdb.FileOperationInfo; import org.rocksdb.FlushJobInfo; -import org.rocksdb.FlushOptions; import org.rocksdb.LRUCache; import org.rocksdb.MemTableInfo; import org.rocksdb.Options; @@ -78,6 +78,8 @@ import java.util.Comparator; import java.util.List; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import static io.dingodb.common.codec.PrimitiveCodec.encodeLong; import static io.dingodb.mpu.Constant.API; @@ -120,6 +122,7 @@ public class RocksStorage implements Storage { public final WriteOptions writeOptions; public final LinkedRunner runner; + private final AtomicInteger runningCount; public Checkpoint checkPoint; public RocksDB instruction; @@ -166,6 +169,7 @@ public RocksStorage(String label, Path path, final String dbRocksOptionsFile, final String logRocksOptionsFile, final int ttl, boolean withTtl) throws Exception { this.label = label; this.runner = new LinkedRunner(label); + this.runningCount = new AtomicInteger(0); this.path = path.toAbsolutePath(); this.ttl = ttl; this.withTtl = withTtl; @@ -327,6 +331,7 @@ public synchronized void destroy() throws NullPointerException { return; } destroy = true; + Utils.loop(() -> !runningCount.compareAndSet(0, Integer.MIN_VALUE), TimeUnit.MILLISECONDS.toNanos(10)); this.writeOptions.close(); closeDB(); if (this.icfHandler != null) { @@ -395,35 +400,15 @@ public CompletableFuture transferTo(CoreMeta meta) { }); } - private void flushMeta() { - try (FlushOptions flushOptions = new FlushOptions().setWaitForFlush(true)) { - - if (db != null && mcfHandler != null) { - db.flush(flushOptions, mcfHandler); - } - } catch (RocksDBException e) { - log.error("Flush instruction error.", e); - } - } - - private void flushInstruction() { - try (FlushOptions flushOptions = new FlushOptions().setWaitForFlush(true)) { - if (instruction != null) { - instruction.flush(flushOptions); - } - } catch (RocksDBException e) { - log.error("Flush instruction error.", e); - } - } - /* * Create new RocksDB checkpoint in backup dir * * @throws RuntimeException */ public void createNewCheckpoint() { - if (destroy) { - return; + if (runningCount.incrementAndGet() < 0) { + runningCount.decrementAndGet(); + throw new RuntimeException("Storage already destroy."); } try { if (checkPoint != null) { @@ -437,6 +422,8 @@ public void createNewCheckpoint() { } } catch (RocksDBException e) { throw new RuntimeException(e); + } finally { + runningCount.decrementAndGet(); } } @@ -513,8 +500,9 @@ public int compare(File p1,File p2) { */ @Override public void applyBackup() { - if (destroy) { - throw new RuntimeException(); + if (runningCount.incrementAndGet() < 0) { + runningCount.decrementAndGet(); + throw new RuntimeException("Storage already destroy."); } try { String remoteCheckpointDir = String.format("%s%s", REMOTE_CHECKPOINT_PREFIX, "checkpoint"); @@ -550,12 +538,15 @@ public void applyBackup() { } catch (Exception e) { throw new RuntimeException(e); + } finally { + runningCount.decrementAndGet(); } } public void backup() { - if (destroy) { - return; + if (runningCount.incrementAndGet() < 0) { + runningCount.decrementAndGet(); + throw new RuntimeException("Storage already destroy."); } try { @@ -563,6 +554,8 @@ public void backup() { purgeOldCheckpoint(3); } catch (Exception e) { throw new RuntimeException(e); + } finally { + runningCount.decrementAndGet(); } } @@ -579,20 +572,24 @@ public String receiveBackup() { @Override public long approximateCount() { + if (runningCount.incrementAndGet() < 0) { + runningCount.decrementAndGet(); + throw new RuntimeException("Storage already destroy."); + } try { - if (destroy) { - throw new RuntimeException(); - } return db.getLongProperty(dcfHandler, "rocksdb.estimate-num-keys"); } catch (RocksDBException e) { throw new RuntimeException(e); + } finally { + runningCount.decrementAndGet(); } } @Override public long approximateSize() { - if (destroy) { - throw new RuntimeException(); + if (runningCount.incrementAndGet() < 0) { + runningCount.decrementAndGet(); + throw new RuntimeException("Storage already destroy."); } try ( Snapshot snapshot = db.getSnapshot(); @@ -617,6 +614,7 @@ public long approximateSize() { } finally { readOptions.setSnapshot(null); db.releaseSnapshot(snapshot); + runningCount.decrementAndGet(); } } return 0; @@ -624,8 +622,9 @@ public long approximateSize() { @Override public void clearClock(long clock) { - if (destroy) { - throw new RuntimeException(); + if (runningCount.incrementAndGet() < 0) { + runningCount.decrementAndGet(); + throw new RuntimeException("Storage already destroy."); } try { instruction.delete(icfHandler, encodeLong(clock)); @@ -634,13 +633,16 @@ public void clearClock(long clock) { } } catch (RocksDBException e) { throw new RuntimeException(e); + } finally { + runningCount.decrementAndGet(); } } @Override public long clocked() { - if (destroy) { - throw new RuntimeException(); + if (runningCount.incrementAndGet() < 0) { + runningCount.decrementAndGet(); + throw new RuntimeException("Storage already destroy."); } if (bypassWriteDb) { return bypassClock; @@ -649,6 +651,8 @@ public long clocked() { return Optional.mapOrGet(db.get(mcfHandler, CLOCK_K), PrimitiveCodec::decodeLong, () -> 0L); } catch (RocksDBException e) { throw new RuntimeException(e); + } finally { + runningCount.decrementAndGet(); } } @@ -678,8 +682,9 @@ public void tick(long clock) { @Override public void saveInstruction(long clock, byte[] instruction) { - if (destroy) { - throw new RuntimeException(); + if (runningCount.incrementAndGet() < 0) { + runningCount.decrementAndGet(); + throw new RuntimeException("Storage already destroy."); } if (bypassSaveInstruction) { return; @@ -688,13 +693,16 @@ public void saveInstruction(long clock, byte[] instruction) { this.instruction.put(icfHandler, PrimitiveCodec.encodeLong(clock), instruction); } catch (RocksDBException e) { throw new RuntimeException(e); + } finally { + runningCount.decrementAndGet(); } } @Override public byte[] reappearInstruction(long clock) { - if (destroy) { - throw new RuntimeException(); + if (runningCount.incrementAndGet() < 0) { + runningCount.decrementAndGet(); + throw new RuntimeException("Storage already destroy."); } if (bypassSaveInstruction) { Instruction emptyInstruction = new Instruction(clock, EmptyInstructions.id, EmptyInstructions.EMPTY); @@ -704,6 +712,8 @@ public byte[] reappearInstruction(long clock) { return instruction.get(icfHandler, PrimitiveCodec.encodeLong(clock)); } catch (RocksDBException e) { throw new RuntimeException(e); + } finally { + runningCount.decrementAndGet(); } } @@ -757,15 +767,7 @@ public void flush(io.dingodb.mpu.storage.Writer writer) { try { Instruction instruction = writer.instruction(); WriteBatch batch = ((Writer) writer).writeBatch(); - byte[] clockValue = PrimitiveCodec.encodeLong(instruction.clock); - //if (withTtl) { - // clockValue = new byte[Long.BYTES + Integer.BYTES]; - // encodeLong(instruction.clock, clockValue, 0); - // encodeInt(Utils.currentSecond(), clockValue); - //} else { - // clockValue = PrimitiveCodec.encodeLong(instruction.clock); - //} - batch.put(mcfHandler, CLOCK_K, clockValue); + batch.put(mcfHandler, CLOCK_K, PrimitiveCodec.encodeLong(instruction.clock)); this.db.write(writeOptions, batch); } catch (Exception e) { throw new RuntimeException(e); diff --git a/dingo-server/executor/src/main/java/io/dingodb/server/executor/store/column/DingoColumnStorage.java b/dingo-server/executor/src/main/java/io/dingodb/server/executor/store/column/DingoColumnStorage.java index 8a9584664b..230127736b 100644 --- a/dingo-server/executor/src/main/java/io/dingodb/server/executor/store/column/DingoColumnStorage.java +++ b/dingo-server/executor/src/main/java/io/dingodb/server/executor/store/column/DingoColumnStorage.java @@ -50,6 +50,7 @@ import java.util.List; import java.util.UUID; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicInteger; import static io.dingodb.common.codec.PrimitiveCodec.encodeLong; import static io.dingodb.mpu.Constant.CF_DEFAULT; @@ -72,6 +73,7 @@ public class DingoColumnStorage implements Storage { public Checkpoint checkPoint; public RocksDB instruction; public RocksDB db; + private final AtomicInteger runningCount; private ColumnFamilyHandle mcfHandler; private ColumnFamilyHandle icfHandler; @@ -87,6 +89,7 @@ public class DingoColumnStorage implements Storage { public DingoColumnStorage(String label, Path path, final String table, TableDefinition definition) throws Exception { this.label = label; this.runner = new LinkedRunner(label); + this.runningCount = new AtomicInteger(0); this.path = path.toAbsolutePath(); RocksConfiguration rocksConfiguration = RocksConfiguration.refreshRocksConfiguration(); @@ -197,13 +200,16 @@ public void closeDB() { @Override public long clocked() { - if (destroy) { - throw new RuntimeException(); + if (runningCount.incrementAndGet() < 0) { + runningCount.decrementAndGet(); + throw new RuntimeException("Storage already destroy."); } try { return Optional.mapOrGet(db.get(mcfHandler, CLOCK_K), PrimitiveCodec::decodeLong, () -> 0L); } catch (RocksDBException e) { throw new RuntimeException(e); + } finally { + runningCount.decrementAndGet(); } } @@ -233,25 +239,31 @@ public void tick(long clock) { @Override public void saveInstruction(long clock, byte[] instruction) { - if (destroy) { - throw new RuntimeException(); + if (runningCount.incrementAndGet() < 0) { + runningCount.decrementAndGet(); + throw new RuntimeException("Storage already destroy."); } try { this.instruction.put(icfHandler, PrimitiveCodec.encodeLong(clock), instruction); } catch (RocksDBException e) { throw new RuntimeException(e); + } finally { + runningCount.decrementAndGet(); } } @Override public byte[] reappearInstruction(long clock) { - if (destroy) { - throw new RuntimeException(); + if (runningCount.incrementAndGet() < 0) { + runningCount.decrementAndGet(); + throw new RuntimeException("Storage already destroy."); } try { return instruction.get(icfHandler, PrimitiveCodec.encodeLong(clock)); } catch (RocksDBException e) { throw new RuntimeException(e); + } finally { + runningCount.decrementAndGet(); } } @@ -328,8 +340,9 @@ public void applyBackup() { @Override public void clearClock(long clock) { - if (destroy) { - throw new RuntimeException(); + if (runningCount.incrementAndGet() < 0) { + runningCount.decrementAndGet(); + throw new RuntimeException("Storage already destroy."); } try { instruction.delete(icfHandler, encodeLong(clock)); @@ -338,6 +351,8 @@ public void clearClock(long clock) { } } catch (RocksDBException e) { throw new RuntimeException(e); + } finally { + runningCount.decrementAndGet(); } }