Skip to content

Commit

Permalink
STORM-3408 - update rocks version (apache#3024)
Browse files Browse the repository at this point in the history
* STORM-3408 - update rocks version to 5.18.3
  • Loading branch information
thecoop authored and srdo committed Jun 25, 2019
1 parent 7fd37cc commit fb76dd1
Show file tree
Hide file tree
Showing 3 changed files with 135 additions and 106 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,7 @@
<jool.version>0.9.12</jool.version>
<caffeine.version>2.3.5</caffeine.version>
<jaxb-version>2.3.0</jaxb-version>
<rocksdb-version>5.8.6</rocksdb-version>
<rocksdb-version>5.18.3</rocksdb-version>

<!-- see intellij profile below... This fixes an annoyance with intellij -->
<provided.scope>provided</provided.scope>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,12 @@ private int storeMetadataString(KeyType type, String s, long metricTimestamp) th
}

// attempt to find the string in the database
stringMetadata = store.rocksDbGetStringMetadata(type, s);
try {
stringMetadata = store.rocksDbGetStringMetadata(type, s);
}
catch (RocksDBException e) {
throw new MetricException("Error reading metrics data", e);
}
if (stringMetadata != null) {
// update to the latest timestamp and add to the string cache
stringMetadata.update(metricTimestamp, type);
Expand Down Expand Up @@ -234,10 +239,15 @@ private void generateUniqueStringIds() throws MetricException {
// now scan all metadata and remove any matching string Ids from this list
RocksDbKey firstPrefix = RocksDbKey.getPrefix(KeyType.METADATA_STRING_START);
RocksDbKey lastPrefix = RocksDbKey.getPrefix(KeyType.METADATA_STRING_END);
store.scanRange(firstPrefix, lastPrefix, (key, value) -> {
unusedIds.remove(key.getMetadataStringId());
return true; // process all metadata
});
try {
store.scanRange(firstPrefix, lastPrefix, (key, value) -> {
unusedIds.remove(key.getMetadataStringId());
return true; // process all metadata
});
}
catch (RocksDBException e) {
throw new MetricException("Error reading metrics data", e);
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,13 @@ private int lookupMetadataString(KeyType type, String s, Map<String, Integer> lo
}

// attempt to find the string in the database
stringMetadata = rocksDbGetStringMetadata(type, s);
try {
stringMetadata = rocksDbGetStringMetadata(type, s);
}
catch (RocksDBException e) {
throw new MetricException("Error reading metric data", e);
}

if (stringMetadata != null) {
id = stringMetadata.getStringId();

Expand All @@ -290,7 +296,7 @@ private int lookupMetadataString(KeyType type, String s, Map<String, Integer> lo
}

// scans the database to look for a metadata string and returns the metadata info
StringMetadata rocksDbGetStringMetadata(KeyType type, String s) {
StringMetadata rocksDbGetStringMetadata(KeyType type, String s) throws RocksDBException {
RocksDbKey firstKey = RocksDbKey.getInitialKey(type);
RocksDbKey lastKey = RocksDbKey.getLastKey(type);
final AtomicReference<StringMetadata> reference = new AtomicReference<>();
Expand All @@ -306,20 +312,21 @@ StringMetadata rocksDbGetStringMetadata(KeyType type, String s) {
}

// scans from key start to the key before end, calling back until callback indicates not to process further
void scanRange(RocksDbKey start, RocksDbKey end, RocksDbScanCallback fn) {
void scanRange(RocksDbKey start, RocksDbKey end, RocksDbScanCallback fn) throws RocksDBException {
try (ReadOptions ro = new ReadOptions()) {
ro.setTotalOrderSeek(true);
RocksIterator iterator = db.newIterator(ro);
for (iterator.seek(start.getRaw()); iterator.isValid(); iterator.next()) {
RocksDbKey key = new RocksDbKey(iterator.key());
if (key.compareTo(end) >= 0) { // past limit, quit
return;
}
try (RocksIterator iterator = db.newIterator(ro)) {
for (iterator.seek(start.getRaw()); iterator.isValid(); iterator.next()) {
RocksDbKey key = new RocksDbKey(iterator.key());
if (key.compareTo(end) >= 0) { // past limit, quit
return;
}

RocksDbValue val = new RocksDbValue(iterator.value());
if (!fn.cb(key, val)) {
// if cb returns false, we are done with this section of rows
return;
RocksDbValue val = new RocksDbValue(iterator.value());
if (!fn.cb(key, val)) {
// if cb returns false, we are done with this section of rows
return;
}
}
}
}
Expand Down Expand Up @@ -448,88 +455,95 @@ private void scanInternal(FilterOptions filter, ScanCallback scanCallback, Rocks
endStreamId = streamId;
}

ReadOptions ro = new ReadOptions();
ro.setTotalOrderSeek(true);

for (AggLevel aggLevel : filter.getAggLevels()) {

RocksDbKey startKey = RocksDbKey.createMetricKey(aggLevel, startTopologyId, startTime, startMetricId,
startComponentId, startExecutorId, startHostId, startPort, startStreamId);
RocksDbKey endKey = RocksDbKey.createMetricKey(aggLevel, endTopologyId, endTime, endMetricId,
endComponentId, endExecutorId, endHostId, endPort, endStreamId);

RocksIterator iterator = db.newIterator(ro);
for (iterator.seek(startKey.getRaw()); iterator.isValid(); iterator.next()) {
RocksDbKey key = new RocksDbKey(iterator.key());

if (key.compareTo(endKey) > 0) { // past limit, quit
break;
}

if (startTopologyId != 0 && key.getTopologyId() != startTopologyId) {
continue;
}

long timestamp = key.getTimestamp();
if (timestamp < startTime || timestamp > endTime) {
continue;
}

if (startMetricId != 0 && key.getMetricId() != startMetricId) {
continue;
}

if (startComponentId != 0 && key.getComponentId() != startComponentId) {
continue;
}

if (startExecutorId != 0 && key.getExecutorId() != startExecutorId) {
continue;
}

if (startHostId != 0 && key.getHostnameId() != startHostId) {
continue;
}

if (startPort != 0 && key.getPort() != startPort) {
continue;
}

if (startStreamId != 0 && key.getStreamId() != startStreamId) {
continue;
}

RocksDbValue val = new RocksDbValue(iterator.value());

if (scanCallback != null) {
try {
// populate a metric
String metricName = metadataIdToString(KeyType.METRIC_STRING, key.getMetricId(), idToStringCache);
String topologyId = metadataIdToString(KeyType.TOPOLOGY_STRING, key.getTopologyId(), idToStringCache);
String componentId = metadataIdToString(KeyType.COMPONENT_STRING, key.getComponentId(), idToStringCache);
String executorId = metadataIdToString(KeyType.EXEC_ID_STRING, key.getExecutorId(), idToStringCache);
String hostname = metadataIdToString(KeyType.HOST_STRING, key.getHostnameId(), idToStringCache);
String streamId = metadataIdToString(KeyType.STREAM_ID_STRING, key.getStreamId(), idToStringCache);

Metric metric = new Metric(metricName, timestamp, topologyId, 0.0, componentId, executorId, hostname,
streamId, key.getPort(), aggLevel);

val.populateMetric(metric);
try (ReadOptions ro = new ReadOptions()) {
ro.setTotalOrderSeek(true);

// callback to caller
scanCallback.cb(metric);
} catch (MetricException e) {
LOG.warn("Failed to report found metric: {}", e.getMessage());
}
} else {
if (!rawCallback.cb(key, val)) {
return;
for (AggLevel aggLevel : filter.getAggLevels()) {

RocksDbKey startKey = RocksDbKey.createMetricKey(aggLevel, startTopologyId, startTime, startMetricId,
startComponentId, startExecutorId, startHostId, startPort, startStreamId);
RocksDbKey endKey = RocksDbKey.createMetricKey(aggLevel, endTopologyId, endTime, endMetricId,
endComponentId, endExecutorId, endHostId, endPort, endStreamId);

try (RocksIterator iterator = db.newIterator(ro)) {
for (iterator.seek(startKey.getRaw()); iterator.isValid(); iterator.next()) {
RocksDbKey key = new RocksDbKey(iterator.key());

if (key.compareTo(endKey) > 0) { // past limit, quit
break;
}

if (startTopologyId != 0 && key.getTopologyId() != startTopologyId) {
continue;
}

long timestamp = key.getTimestamp();
if (timestamp < startTime || timestamp > endTime) {
continue;
}

if (startMetricId != 0 && key.getMetricId() != startMetricId) {
continue;
}

if (startComponentId != 0 && key.getComponentId() != startComponentId) {
continue;
}

if (startExecutorId != 0 && key.getExecutorId() != startExecutorId) {
continue;
}

if (startHostId != 0 && key.getHostnameId() != startHostId) {
continue;
}

if (startPort != 0 && key.getPort() != startPort) {
continue;
}

if (startStreamId != 0 && key.getStreamId() != startStreamId) {
continue;
}

RocksDbValue val = new RocksDbValue(iterator.value());

if (scanCallback != null) {
try {
// populate a metric
String metricName = metadataIdToString(KeyType.METRIC_STRING, key.getMetricId(), idToStringCache);
String topologyId = metadataIdToString(KeyType.TOPOLOGY_STRING, key.getTopologyId(), idToStringCache);
String componentId = metadataIdToString(KeyType.COMPONENT_STRING, key.getComponentId(), idToStringCache);
String executorId = metadataIdToString(KeyType.EXEC_ID_STRING, key.getExecutorId(), idToStringCache);
String hostname = metadataIdToString(KeyType.HOST_STRING, key.getHostnameId(), idToStringCache);
String streamId = metadataIdToString(KeyType.STREAM_ID_STRING, key.getStreamId(), idToStringCache);

Metric metric = new Metric(metricName, timestamp, topologyId, 0.0, componentId, executorId, hostname,
streamId, key.getPort(), aggLevel);

val.populateMetric(metric);

// callback to caller
scanCallback.cb(metric);
}
catch (MetricException e) {
LOG.warn("Failed to report found metric: {}", e.getMessage());
}
}
else {
try {
if (!rawCallback.cb(key, val)) {
return;
}
}
catch (RocksDBException e) {
throw new MetricException("Error reading metrics data", e);
}
}
}
}
}
iterator.close();
}
ro.close();
}

// Finds the metadata string that matches the string Id and type provided. The string should exist, as it is
Expand Down Expand Up @@ -568,7 +582,7 @@ void deleteMetrics(FilterOptions filter) throws MetricException {
WriteOptions writeOps = new WriteOptions()) {

scanRaw(filter, (RocksDbKey key, RocksDbValue value) -> {
writeBatch.remove(key.getRaw());
writeBatch.delete(key.getRaw());
return true;
});

Expand Down Expand Up @@ -603,15 +617,20 @@ void deleteMetadataBefore(long firstValidTimestamp) throws MetricException {
// search all metadata strings
RocksDbKey topologyMetadataPrefix = RocksDbKey.getPrefix(KeyType.METADATA_STRING_START);
RocksDbKey lastPrefix = RocksDbKey.getPrefix(KeyType.METADATA_STRING_END);
scanRange(topologyMetadataPrefix, lastPrefix, (key, value) -> {
// we'll assume the metadata was recently used if still in the cache.
if (!readOnlyStringMetadataCache.contains(key.getMetadataStringId())) {
if (value.getLastTimestamp() < firstValidTimestamp) {
writeBatch.remove(key.getRaw());
try {
scanRange(topologyMetadataPrefix, lastPrefix, (key, value) -> {
// we'll assume the metadata was recently used if still in the cache.
if (!readOnlyStringMetadataCache.contains(key.getMetadataStringId())) {
if (value.getLastTimestamp() < firstValidTimestamp) {
writeBatch.delete(key.getRaw());
}
}
}
return true;
});
return true;
});
}
catch (RocksDBException e) {
throw new MetricException("Error reading metric data", e);
}

if (writeBatch.count() > 0) {
LOG.info("Deleting {} metadata strings", writeBatch.count());
Expand All @@ -630,7 +649,7 @@ void deleteMetadataBefore(long firstValidTimestamp) throws MetricException {
}

interface RocksDbScanCallback {
boolean cb(RocksDbKey key, RocksDbValue val); // return false to stop scan
boolean cb(RocksDbKey key, RocksDbValue val) throws RocksDBException; // return false to stop scan
}
}

0 comments on commit fb76dd1

Please sign in to comment.