NODE-2622 Delete old data #3912

Merged: 68 commits, Jan 16, 2024

Changes from 53 commits

Commits:
4b4c841
AddressId instead of Address in Data keys
vsuharnikov Nov 6, 2023
4612d11
Removed not used keys
vsuharnikov Nov 6, 2023
a58c78b
Removed not used keys (2)
vsuharnikov Nov 6, 2023
0141ded
Removed not used keys (3): removed BlockReward
vsuharnikov Nov 6, 2023
b80702c
Updated curve25519-java dependency
vsuharnikov Nov 6, 2023
d220d07
Basic implementation of DB cleanup
vsuharnikov Nov 6, 2023
483bbc6
Added a test
vsuharnikov Nov 7, 2023
08601d7
Added new test, fixed an issue with excess deletion
vsuharnikov Nov 7, 2023
41e5765
Implementation with multiGet
vsuharnikov Nov 8, 2023
eaf3fb4
Delete old entries with low priority writes
vsuharnikov Nov 8, 2023
da1137c
Easier review improvements
vsuharnikov Nov 8, 2023
cc1541a
Merge version-1.5.x
vsuharnikov Nov 14, 2023
0361b07
Running node-it on Apple ARM
vsuharnikov Nov 14, 2023
49802a9
setMaxSubcompactions(2)
vsuharnikov Nov 17, 2023
4206494
setMaxBytesForLevelBase(536870912) + fixed compilation
vsuharnikov Nov 18, 2023
68be82a
setMaxBytesForLevelBase(128 MiB)
vsuharnikov Nov 18, 2023
a73a067
- Default setMaxBytesForLevelBase;
vsuharnikov Nov 18, 2023
2697ae1
Additional CF for some history sequences
vsuharnikov Nov 19, 2023
52cd611
Revert "Additional CF for some history sequences"
vsuharnikov Nov 21, 2023
9a7ece8
Write without adjusting a priority
vsuharnikov Nov 21, 2023
cee92fb
Increase write buffer number
vsuharnikov Nov 22, 2023
e29b418
Increase write buffer size for default CF
vsuharnikov Nov 23, 2023
cc08b1a
Background cleanup with LowPriorityWrites
vsuharnikov Nov 24, 2023
45589ac
Delete old data in a batch
vsuharnikov Nov 27, 2023
8622aa1
Delete old data in a batch: bug fix
vsuharnikov Nov 28, 2023
1c790ec
Delete old data in a batch: bug fix (2)
vsuharnikov Nov 28, 2023
95eac27
RocksDB: 8.5.4 -> 8.8.1
vsuharnikov Nov 29, 2023
5865004
Delete old data in a batch: bug fix (3)
vsuharnikov Nov 29, 2023
f3d36ee
Delete old data in a batch: bug fix (4), issue was in Explorer, added…
vsuharnikov Dec 6, 2023
9beba63
Delete in batch: threads for cleanup
vsuharnikov Dec 7, 2023
3034584
- Explorer: shows stats for unknown prefixes instead of falling;
vsuharnikov Dec 11, 2023
a728cb5
RocksDBWriter.deleteOldEntriesThreadPoolExecutor: 1 thread test
vsuharnikov Dec 11, 2023
ae0154a
RocksDBWriter.deleteOldEntriesThreadPoolExecutor: 1 thread test (2)
vsuharnikov Dec 11, 2023
db9ac06
Closing RocksDBWriter
vsuharnikov Dec 13, 2023
15a1da1
Saving last cleanup height to continue after restart
vsuharnikov Dec 13, 2023
e573db5
Some iterator fixes, added useful comments
vsuharnikov Dec 15, 2023
a81ad33
merge version-1.5.x
vsuharnikov Dec 15, 2023
54ac104
Delete old waves balances in batch
vsuharnikov Dec 18, 2023
bde11b3
Point and range deletes
vsuharnikov Dec 19, 2023
18946a4
Revert "Point and range deletes"
vsuharnikov Dec 20, 2023
95578c9
Better deletion of old data at start
vsuharnikov Dec 20, 2023
fea53f9
RocksDBWriter.hasData: we can't rely on KeyTags.ChangedDataKeys, beca…
vsuharnikov Dec 20, 2023
661fd01
What is affected by cleanup-interval
vsuharnikov Dec 20, 2023
d637d1b
Optional cleanup
vsuharnikov Dec 21, 2023
bdb483d
Fixed recursion in Exporter
vsuharnikov Dec 21, 2023
9b0a8c9
Cleanup without loops (eventually it will be cleaned)
vsuharnikov Dec 21, 2023
063b95a
Cleanup before review
vsuharnikov Dec 21, 2023
1a22965
Merge version-1.5.x
vsuharnikov Dec 21, 2023
5a4e4de
Non-overlapping delete ranges
vsuharnikov Dec 24, 2023
9b86363
deleteRange only if it is not the last record
vsuharnikov Dec 24, 2023
a86f2c6
only one job in TPE, fixed ArithmeticException in PoSCalculator
vsuharnikov Dec 24, 2023
e3689ab
- low priority writes;
vsuharnikov Dec 24, 2023
30cfdc7
linter fixes, better default settings
vsuharnikov Dec 26, 2023
2ebe1f9
Merge branch 'version-1.5.x' into NODE-2622-delete-old-data
phearnot Jan 10, 2024
964d0d2
- maxBackgroundJobs: 4 -> 6;
vsuharnikov Dec 28, 2023
9e591a0
- maxBackgroundJobs: 6 -> 8;
vsuharnikov Dec 29, 2023
eeb60c3
- RocksDB: 8.8.1 -> 8.9.1;
vsuharnikov Dec 30, 2023
6bed2c8
- setIncreaseParallelism: 8 -> 7;
vsuharnikov Dec 30, 2023
f0c47be
optimizeLevelStyleCompaction and enable subcompactions
vsuharnikov Jan 2, 2024
830496a
setMaxWriteBufferNumber: 4 -> 6
vsuharnikov Jan 3, 2024
2c55d98
Less write buffers, L0 files and threads for background jobs (this sh…
vsuharnikov Jan 4, 2024
cb3437c
- L0 compaction trigger: 2 -> 3;
vsuharnikov Jan 4, 2024
4cabd16
Review fixes
vsuharnikov Jan 15, 2024
e4d2649
Cleanup disabled by default
vsuharnikov Jan 15, 2024
1451081
RebroadcastTransactionSuite fix: waiting for container start
vsuharnikov Jan 15, 2024
622eddc
Review fix (2)
vsuharnikov Jan 15, 2024
ed5992d
Review fix (3)
vsuharnikov Jan 15, 2024
29e771c
Merge branch 'version-1.5.x' into NODE-2622-delete-old-data
phearnot Jan 15, 2024
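
Several commits above iterate on RocksDB tuning through its Java API. A minimal sketch of where those knobs live in RocksJava, using the values named in the commit messages; the node's actual factory code and final defaults may differ:

import org.rocksdb.{ColumnFamilyOptions, DBOptions, RocksDB}

object RocksDbTuningSketch {
  RocksDB.loadLibrary()

  // Database-wide options touched by the commits above.
  val dbOptions: DBOptions = new DBOptions()
    .setMaxSubcompactions(2)   // 49802a9
    .setMaxBackgroundJobs(8)   // 964d0d2, 9e591a0: 4 -> 6 -> 8
    .setIncreaseParallelism(7) // 6bed2c8: 8 -> 7

  // Per-column-family options.
  val cfOptions: ColumnFamilyOptions = new ColumnFamilyOptions()
    .optimizeLevelStyleCompaction()       // f0c47be
    .setMaxWriteBufferNumber(6)           // 830496a: 4 -> 6
    .setLevel0FileNumCompactionTrigger(3) // cb3437c: 2 -> 3
    .setMaxBytesForLevelBase(128L << 20)  // 68be82a: 128 MiB (a73a067 later reverted to the default)
}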
14 changes: 7 additions & 7 deletions benchmark/src/main/scala/com/wavesplatform/state/DBState.scala
@@ -22,13 +22,12 @@ abstract class DBState extends ScorexLogging {

lazy val rdb: RDB = RDB.open(settings.dbSettings)

-  lazy val rocksDBWriter: RocksDBWriter =
-    new RocksDBWriter(
-      rdb,
-      settings.blockchainSettings,
-      settings.dbSettings.copy(maxCacheSize = 1),
-      settings.enableLightMode
-    )
+  lazy val rocksDBWriter: RocksDBWriter = RocksDBWriter(
+    rdb,
+    settings.blockchainSettings,
+    settings.dbSettings.copy(maxCacheSize = 1),
+    settings.enableLightMode
+  )

AddressScheme.current = new AddressScheme { override val chainId: Byte = 'W' }

@@ -44,6 +43,7 @@ abstract class DBState extends ScorexLogging {

@TearDown
def close(): Unit = {
+    rocksDBWriter.close()
rdb.close()
}
}
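
Throughout this PR, call sites replace new RocksDBWriter(...) with the companion factory RocksDBWriter(...) and gain a matching close() in teardown. The companion object itself is not part of this diff; below is a hypothetical sketch of a lifecycle pattern that would explain both changes — all names are stand-ins, not the node's code:

import java.util.concurrent.{ExecutorService, Executors}

final class Writer(cleanupExecutor: ExecutorService) extends AutoCloseable {
  // Stop background cleanup before the underlying DB handle is closed.
  override def close(): Unit = cleanupExecutor.shutdown()
}

object Writer {
  // The factory wires up background resources, which is why callers
  // use Writer(...) instead of new Writer(...) and must later call close().
  def apply(): Writer = new Writer(Executors.newSingleThreadExecutor())
}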
RollbackBenchmark.scala

@@ -22,7 +22,7 @@ object RollbackBenchmark extends ScorexLogging {
val settings = Application.loadApplicationConfig(Some(new File(args(0))))
val rdb = RDB.open(settings.dbSettings)
val time = new NTP(settings.ntpServer)
-  val rocksDBWriter = new RocksDBWriter(rdb, settings.blockchainSettings, settings.dbSettings, settings.enableLightMode)
+  val rocksDBWriter = RocksDBWriter(rdb, settings.blockchainSettings, settings.dbSettings, settings.enableLightMode)

val issuer = KeyPair(new Array[Byte](32))

@@ -111,6 +111,7 @@ object RollbackBenchmark extends ScorexLogging {
rocksDBWriter.rollbackTo(1)
val end = System.nanoTime()
log.info(f"Rollback took ${(end - start) * 1e-6}%.3f ms")
+    rocksDBWriter.close()
rdb.close()
}
}
BaseState.scala

@@ -104,6 +104,7 @@ trait BaseState {

@TearDown
def close(): Unit = {
+    state.close()
rdb.close()
}
}
RocksDBIteratorBenchmark.scala

@@ -1,8 +1,5 @@
package com.wavesplatform.state

-import java.nio.file.Files
-import java.util.concurrent.TimeUnit
-
import com.google.common.primitives.Ints
import com.typesafe.config.ConfigFactory
import com.wavesplatform.database.RDB
@@ -12,6 +9,10 @@ import org.openjdk.jmh.annotations.*
import org.openjdk.jmh.infra.Blackhole
import org.rocksdb.{ReadOptions, WriteBatch, WriteOptions}

+import java.nio.file.Files
+import java.util.concurrent.TimeUnit
+import scala.util.Using

@OutputTimeUnit(TimeUnit.NANOSECONDS)
@BenchmarkMode(Array(Mode.AverageTime))
@Threads(1)
@@ -60,7 +61,7 @@ object RocksDBIteratorBenchmark {
RDB.open(wavesSettings.dbSettings.copy(directory = dir))
}

-  val keysPrefix = "keysPrefix"
+  val keysPrefix = "keysPrefix" // Must have 10 or more bytes, see RDB.newColumnFamilyOptions
val firstKey: Array[Byte] = keysPrefix.getBytes ++ Ints.toByteArray(1)
val lastKey: Array[Byte] = keysPrefix.getBytes ++ Ints.toByteArray(10000)

@@ -70,14 +71,18 @@

val readOptions: ReadOptions = new ReadOptions().setTotalOrderSeek(false).setPrefixSameAsStart(true)

-  private val wb: WriteBatch = new WriteBatch()
-  kvs.foreach { case (key, value) =>
-    wb.put(key, value)
-  }
-  rdb.db.write(new WriteOptions(), wb)
+  Using.Manager { use =>
+    val wb = use(new WriteBatch())
+    val wo = use(new WriteOptions())
+    kvs.foreach { case (key, value) =>
+      wb.put(key, value)
+    }
+    rdb.db.write(wo, wb)
+  }

@TearDown
def close(): Unit = {
+    readOptions.close()
rdb.close()
}
}
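
The new comment on keysPrefix points at a fixed-length prefix extractor configured in RDB.newColumnFamilyOptions. A sketch of that RocksJava setting, assuming the 10-byte prefix length the comment states; with it, the setPrefixSameAsStart(true) read options used above confine iteration to keys sharing the seek key's prefix:

import org.rocksdb.{ColumnFamilyOptions, ReadOptions, RocksDB}

object PrefixSeekSketch {
  RocksDB.loadLibrary()

  // Keys are indexed by their first 10 bytes, so any prefix used for
  // seeks must be at least that long.
  val cfOptions: ColumnFamilyOptions = new ColumnFamilyOptions()
    .useFixedLengthPrefixExtractor(10)

  // Iterators opened with these options stop at the prefix boundary
  // instead of scanning the whole keyspace.
  val readOptions: ReadOptions = new ReadOptions()
    .setTotalOrderSeek(false)
    .setPrefixSameAsStart(true)
}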
RocksDBSeekForPrevBenchmark.scala

@@ -1,11 +1,7 @@
package com.wavesplatform.state

-import java.nio.file.Files
-import java.util.concurrent.TimeUnit
-
import com.google.common.primitives.{Bytes, Shorts}
import com.typesafe.config.ConfigFactory
-import com.wavesplatform.account.Address
import com.wavesplatform.database.{
AddressId,
CurrentData,
@@ -24,6 +20,10 @@ import org.openjdk.jmh.annotations.*
import org.openjdk.jmh.infra.Blackhole
import org.rocksdb.{ReadOptions, WriteBatch, WriteOptions}

+import java.nio.file.Files
+import java.util.concurrent.TimeUnit
+import scala.util.Using

@OutputTimeUnit(TimeUnit.NANOSECONDS)
@BenchmarkMode(Array(Mode.AverageTime))
@Threads(1)
@@ -63,27 +63,29 @@ object RocksDBSeekForPrevBenchmark {
RDB.open(wavesSettings.dbSettings.copy(directory = dir))
}

-  val address: Address = Address(Array.fill(20)(1.toByte))
val addressId: AddressId = AddressId(1L)

val keyString = "key"
-  val currentDataKey: Array[Byte] = Keys.data(address, keyString).keyBytes
+  val currentDataKey: Array[Byte] = Keys.data(addressId, keyString).keyBytes
val dataNodeKey: Height => Array[Byte] = Keys.dataAt(addressId, "key")(_).keyBytes
val dataNodeKeyPrefix: Array[Byte] = Bytes.concat(Shorts.toByteArray(KeyTags.DataHistory.id.toShort), addressId.toByteArray, keyString.getBytes)

private val dataEntry: StringDataEntry = StringDataEntry(keyString, "value")

val readOptions: ReadOptions = new ReadOptions()

-  private val wb: WriteBatch = new WriteBatch()
-  wb.put(currentDataKey, writeCurrentData(CurrentData(dataEntry, Height(10000), Height(9999))))
-  (1 to 1000).foreach { h =>
-    wb.put(dataNodeKey(Height(h)), writeDataNode(DataNode(dataEntry, Height(h - 1))))
-  }
-  rdb.db.write(new WriteOptions(), wb)
+  Using.Manager { use =>
+    val wb = use(new WriteBatch())
+    wb.put(currentDataKey, writeCurrentData(CurrentData(dataEntry, Height(10000), Height(9999))))
+    (1 to 1000).foreach { h =>
+      wb.put(dataNodeKey(Height(h)), writeDataNode(DataNode(dataEntry, Height(h - 1))))
+    }
+    rdb.db.write(use(new WriteOptions()), wb)
+  }

@TearDown
def close(): Unit = {
+    readOptions.close()
rdb.close()
}
}
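
The two benchmarks above now create WriteBatch and WriteOptions inside scala.util.Using.Manager, which closes every resource registered via use in reverse registration order, even if the block throws. A self-contained sketch of the pattern with stand-in resources:

import scala.util.Using

object UsingManagerSketch extends App {
  // Stand-in for a native RocksDB resource such as WriteBatch or WriteOptions.
  final class Resource(name: String) extends AutoCloseable {
    override def close(): Unit = println(s"closed $name")
  }

  Using.Manager { use =>
    val wb = use(new Resource("WriteBatch"))
    val wo = use(new Resource("WriteOptions"))
    println("write(wo, wb)")
  }
  // Prints: write(wo, wb), closed WriteOptions, closed WriteBatch.
}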
RocksDBWriterBenchmark.scala

@@ -1,8 +1,5 @@
package com.wavesplatform.state

-import java.io.File
-import java.util.concurrent.{ThreadLocalRandom, TimeUnit}
-
import com.typesafe.config.ConfigFactory
import com.wavesplatform.account.*
import com.wavesplatform.api.BlockMeta
@@ -17,7 +14,10 @@ import com.wavesplatform.transaction.Transaction
import org.openjdk.jmh.annotations.*
import org.openjdk.jmh.infra.Blackhole

+import java.io.File
+import java.util.concurrent.{ThreadLocalRandom, TimeUnit}
import scala.io.Codec
+import scala.util.Using

/** Tests over real database. How to test:
* 1. Download a database 2. Import it:
@@ -87,7 +87,7 @@ object RocksDBWriterBenchmark {
RDB.open(wavesSettings.dbSettings)
}

-  val db = new RocksDBWriter(rawDB, wavesSettings.blockchainSettings, wavesSettings.dbSettings, wavesSettings.enableLightMode)
+  val db = RocksDBWriter(rawDB, wavesSettings.blockchainSettings, wavesSettings.dbSettings, wavesSettings.enableLightMode)

def loadBlockInfoAt(height: Int): Option[(BlockMeta, Seq[(TxMeta, Transaction)])] =
loadBlockMetaAt(height).map { meta =>
@@ -102,15 +102,12 @@

@TearDown
def close(): Unit = {
+    db.close()
rawDB.close()
}

protected def load[T](label: String, absolutePath: String)(f: String => T): Vector[T] = {
-    scala.io.Source
-      .fromFile(absolutePath)(Codec.UTF8)
-      .getLines()
-      .map(f)
-      .toVector
+    Using.resource(scala.io.Source.fromFile(absolutePath)(Codec.UTF8))(_.getLines().map(f).toVector)
}
}

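
The load helper now wraps scala.io.Source in Using.resource, which closes the source once the body returns or throws; the previous chain never closed the file handle. A minimal sketch of the same pattern:

import scala.io.{Codec, Source}
import scala.util.Using

object UsingResourceSketch {
  // Reads all lines and closes the underlying handle even if the body throws.
  def loadLines(path: String): Vector[String] =
    Using.resource(Source.fromFile(path)(Codec.UTF8))(_.getLines().toVector)
}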
WavesEnvironmentBenchmark.scala

@@ -21,6 +21,7 @@ import scodec.bits.BitVector
import java.io.File
import java.util.concurrent.{ThreadLocalRandom, TimeUnit}
import scala.io.Codec
+import scala.util.Using

/** Tests over real database. How to test:
* 1. Download a database 2. Import it:
@@ -134,8 +135,8 @@ object WavesEnvironmentBenchmark {
RDB.open(wavesSettings.dbSettings)
}

+  val state = RocksDBWriter(rdb, wavesSettings.blockchainSettings, wavesSettings.dbSettings, wavesSettings.enableLightMode)
  val environment: Environment[Id] = {
-    val state = new RocksDBWriter(rdb, wavesSettings.blockchainSettings, wavesSettings.dbSettings, wavesSettings.enableLightMode)
WavesEnvironment(
AddressScheme.current.chainId,
Coeval.raiseError(new NotImplementedError("`tx` is not implemented")),
Expand All @@ -149,15 +150,12 @@ object WavesEnvironmentBenchmark {

@TearDown
def close(): Unit = {
+    state.close()
rdb.close()
}

protected def load[T](label: String, absolutePath: String)(f: String => T): Vector[T] = {
-    scala.io.Source
-      .fromFile(absolutePath)(Codec.UTF8)
-      .getLines()
-      .map(f)
-      .toVector
+    Using.resource(scala.io.Source.fromFile(absolutePath)(Codec.UTF8))(_.getLines().map(f).toVector)
}
}

BaseTargetChecker.scala

@@ -23,11 +23,11 @@ object BaseTargetChecker {
.withFallback(defaultReference())
.resolve()

-  val settings = WavesSettings.fromRootConfig(sharedConfig)
-  val db = RDB.open(settings.dbSettings.copy(directory = "/tmp/tmp-db"))
-  val ntpTime = new NTP("ntp.pool.org")
-  val (blockchainUpdater, _) = StorageFactory(settings, db, ntpTime, BlockchainUpdateTriggers.noop)
-  val poSSelector = PoSSelector(blockchainUpdater, settings.synchronizationSettings.maxBaseTarget)
+  val settings = WavesSettings.fromRootConfig(sharedConfig)
+  val db = RDB.open(settings.dbSettings.copy(directory = "/tmp/tmp-db"))
+  val ntpTime = new NTP("ntp.pool.org")
+  val (blockchainUpdater, rdbWriter) = StorageFactory(settings, db, ntpTime, BlockchainUpdateTriggers.noop)
+  val poSSelector = PoSSelector(blockchainUpdater, settings.synchronizationSettings.maxBaseTarget)

try {
val genesisBlock =
@@ -51,6 +51,10 @@

f"$address: ${timeDelay * 1e-3}%10.3f s"
}
-  } finally ntpTime.close()
+  } finally {
+    ntpTime.close()
+    rdbWriter.close()
+    db.close()
+  }
}
}
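
The widened finally block closes resources in dependency order: the NTP client, then the writer (whose background cleanup threads may still issue writes), then the database handle itself. A stand-in sketch of why that ordering matters; the classes here are illustrative, not the node's types:

object TeardownOrderSketch extends App {
  final class Db extends AutoCloseable {
    override def close(): Unit = println("db closed")
  }
  // Holds background work against the still-open Db.
  final class Writer(db: Db) extends AutoCloseable {
    override def close(): Unit = println("writer closed")
  }

  val db     = new Db
  val writer = new Writer(db)
  try println("work")
  finally {
    writer.close() // first stop everything that writes...
    db.close()     // ...then release the store it writes to
  }
}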