From fb76dd1c7dc39c4979f9cc921cf4a5930dfaa760 Mon Sep 17 00:00:00 2001 From: Simon Cooper Date: Tue, 25 Jun 2019 18:39:43 +0100 Subject: [PATCH] STORM-3408 - update rocks version (#3024) * STORM-3408 - update rocks version to 5.18.3 --- pom.xml | 2 +- .../rocksdb/RocksDbMetricsWriter.java | 20 +- .../metricstore/rocksdb/RocksDbStore.java | 219 ++++++++++-------- 3 files changed, 135 insertions(+), 106 deletions(-) diff --git a/pom.xml b/pom.xml index 989fedbb7ae..5603831ac48 100644 --- a/pom.xml +++ b/pom.xml @@ -329,7 +329,7 @@ 0.9.12 2.3.5 2.3.0 - 5.8.6 + 5.18.3 provided diff --git a/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/RocksDbMetricsWriter.java b/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/RocksDbMetricsWriter.java index 5ae8a0e6f8a..fae3865677f 100644 --- a/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/RocksDbMetricsWriter.java +++ b/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/RocksDbMetricsWriter.java @@ -182,7 +182,12 @@ private int storeMetadataString(KeyType type, String s, long metricTimestamp) th } // attempt to find the string in the database - stringMetadata = store.rocksDbGetStringMetadata(type, s); + try { + stringMetadata = store.rocksDbGetStringMetadata(type, s); + } + catch (RocksDBException e) { + throw new MetricException("Error reading metrics data", e); + } if (stringMetadata != null) { // update to the latest timestamp and add to the string cache stringMetadata.update(metricTimestamp, type); @@ -234,10 +239,15 @@ private void generateUniqueStringIds() throws MetricException { // now scan all metadata and remove any matching string Ids from this list RocksDbKey firstPrefix = RocksDbKey.getPrefix(KeyType.METADATA_STRING_START); RocksDbKey lastPrefix = RocksDbKey.getPrefix(KeyType.METADATA_STRING_END); - store.scanRange(firstPrefix, lastPrefix, (key, value) -> { - unusedIds.remove(key.getMetadataStringId()); - return true; // process all metadata - }); + try { + store.scanRange(firstPrefix, lastPrefix, (key, value) -> { + unusedIds.remove(key.getMetadataStringId()); + return true; // process all metadata + }); + } + catch (RocksDBException e) { + throw new MetricException("Error reading metrics data", e); + } } } diff --git a/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/RocksDbStore.java b/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/RocksDbStore.java index 6b7617eceb2..ba3f08b7ba2 100644 --- a/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/RocksDbStore.java +++ b/storm-server/src/main/java/org/apache/storm/metricstore/rocksdb/RocksDbStore.java @@ -274,7 +274,13 @@ private int lookupMetadataString(KeyType type, String s, Map lo } // attempt to find the string in the database - stringMetadata = rocksDbGetStringMetadata(type, s); + try { + stringMetadata = rocksDbGetStringMetadata(type, s); + } + catch (RocksDBException e) { + throw new MetricException("Error reading metric data", e); + } + if (stringMetadata != null) { id = stringMetadata.getStringId(); @@ -290,7 +296,7 @@ private int lookupMetadataString(KeyType type, String s, Map lo } // scans the database to look for a metadata string and returns the metadata info - StringMetadata rocksDbGetStringMetadata(KeyType type, String s) { + StringMetadata rocksDbGetStringMetadata(KeyType type, String s) throws RocksDBException { RocksDbKey firstKey = RocksDbKey.getInitialKey(type); RocksDbKey lastKey = RocksDbKey.getLastKey(type); final AtomicReference reference = new AtomicReference<>(); @@ -306,20 +312,21 @@ StringMetadata rocksDbGetStringMetadata(KeyType type, String s) { } // scans from key start to the key before end, calling back until callback indicates not to process further - void scanRange(RocksDbKey start, RocksDbKey end, RocksDbScanCallback fn) { + void scanRange(RocksDbKey start, RocksDbKey end, RocksDbScanCallback fn) throws RocksDBException { try (ReadOptions ro = new ReadOptions()) { ro.setTotalOrderSeek(true); - RocksIterator iterator = db.newIterator(ro); - for (iterator.seek(start.getRaw()); iterator.isValid(); iterator.next()) { - RocksDbKey key = new RocksDbKey(iterator.key()); - if (key.compareTo(end) >= 0) { // past limit, quit - return; - } + try (RocksIterator iterator = db.newIterator(ro)) { + for (iterator.seek(start.getRaw()); iterator.isValid(); iterator.next()) { + RocksDbKey key = new RocksDbKey(iterator.key()); + if (key.compareTo(end) >= 0) { // past limit, quit + return; + } - RocksDbValue val = new RocksDbValue(iterator.value()); - if (!fn.cb(key, val)) { - // if cb returns false, we are done with this section of rows - return; + RocksDbValue val = new RocksDbValue(iterator.value()); + if (!fn.cb(key, val)) { + // if cb returns false, we are done with this section of rows + return; + } } } } @@ -448,88 +455,95 @@ private void scanInternal(FilterOptions filter, ScanCallback scanCallback, Rocks endStreamId = streamId; } - ReadOptions ro = new ReadOptions(); - ro.setTotalOrderSeek(true); - - for (AggLevel aggLevel : filter.getAggLevels()) { - - RocksDbKey startKey = RocksDbKey.createMetricKey(aggLevel, startTopologyId, startTime, startMetricId, - startComponentId, startExecutorId, startHostId, startPort, startStreamId); - RocksDbKey endKey = RocksDbKey.createMetricKey(aggLevel, endTopologyId, endTime, endMetricId, - endComponentId, endExecutorId, endHostId, endPort, endStreamId); - - RocksIterator iterator = db.newIterator(ro); - for (iterator.seek(startKey.getRaw()); iterator.isValid(); iterator.next()) { - RocksDbKey key = new RocksDbKey(iterator.key()); - - if (key.compareTo(endKey) > 0) { // past limit, quit - break; - } - - if (startTopologyId != 0 && key.getTopologyId() != startTopologyId) { - continue; - } - - long timestamp = key.getTimestamp(); - if (timestamp < startTime || timestamp > endTime) { - continue; - } - - if (startMetricId != 0 && key.getMetricId() != startMetricId) { - continue; - } - - if (startComponentId != 0 && key.getComponentId() != startComponentId) { - continue; - } - - if (startExecutorId != 0 && key.getExecutorId() != startExecutorId) { - continue; - } - - if (startHostId != 0 && key.getHostnameId() != startHostId) { - continue; - } - - if (startPort != 0 && key.getPort() != startPort) { - continue; - } - - if (startStreamId != 0 && key.getStreamId() != startStreamId) { - continue; - } - - RocksDbValue val = new RocksDbValue(iterator.value()); - - if (scanCallback != null) { - try { - // populate a metric - String metricName = metadataIdToString(KeyType.METRIC_STRING, key.getMetricId(), idToStringCache); - String topologyId = metadataIdToString(KeyType.TOPOLOGY_STRING, key.getTopologyId(), idToStringCache); - String componentId = metadataIdToString(KeyType.COMPONENT_STRING, key.getComponentId(), idToStringCache); - String executorId = metadataIdToString(KeyType.EXEC_ID_STRING, key.getExecutorId(), idToStringCache); - String hostname = metadataIdToString(KeyType.HOST_STRING, key.getHostnameId(), idToStringCache); - String streamId = metadataIdToString(KeyType.STREAM_ID_STRING, key.getStreamId(), idToStringCache); - - Metric metric = new Metric(metricName, timestamp, topologyId, 0.0, componentId, executorId, hostname, - streamId, key.getPort(), aggLevel); - - val.populateMetric(metric); + try (ReadOptions ro = new ReadOptions()) { + ro.setTotalOrderSeek(true); - // callback to caller - scanCallback.cb(metric); - } catch (MetricException e) { - LOG.warn("Failed to report found metric: {}", e.getMessage()); - } - } else { - if (!rawCallback.cb(key, val)) { - return; + for (AggLevel aggLevel : filter.getAggLevels()) { + + RocksDbKey startKey = RocksDbKey.createMetricKey(aggLevel, startTopologyId, startTime, startMetricId, + startComponentId, startExecutorId, startHostId, startPort, startStreamId); + RocksDbKey endKey = RocksDbKey.createMetricKey(aggLevel, endTopologyId, endTime, endMetricId, + endComponentId, endExecutorId, endHostId, endPort, endStreamId); + + try (RocksIterator iterator = db.newIterator(ro)) { + for (iterator.seek(startKey.getRaw()); iterator.isValid(); iterator.next()) { + RocksDbKey key = new RocksDbKey(iterator.key()); + + if (key.compareTo(endKey) > 0) { // past limit, quit + break; + } + + if (startTopologyId != 0 && key.getTopologyId() != startTopologyId) { + continue; + } + + long timestamp = key.getTimestamp(); + if (timestamp < startTime || timestamp > endTime) { + continue; + } + + if (startMetricId != 0 && key.getMetricId() != startMetricId) { + continue; + } + + if (startComponentId != 0 && key.getComponentId() != startComponentId) { + continue; + } + + if (startExecutorId != 0 && key.getExecutorId() != startExecutorId) { + continue; + } + + if (startHostId != 0 && key.getHostnameId() != startHostId) { + continue; + } + + if (startPort != 0 && key.getPort() != startPort) { + continue; + } + + if (startStreamId != 0 && key.getStreamId() != startStreamId) { + continue; + } + + RocksDbValue val = new RocksDbValue(iterator.value()); + + if (scanCallback != null) { + try { + // populate a metric + String metricName = metadataIdToString(KeyType.METRIC_STRING, key.getMetricId(), idToStringCache); + String topologyId = metadataIdToString(KeyType.TOPOLOGY_STRING, key.getTopologyId(), idToStringCache); + String componentId = metadataIdToString(KeyType.COMPONENT_STRING, key.getComponentId(), idToStringCache); + String executorId = metadataIdToString(KeyType.EXEC_ID_STRING, key.getExecutorId(), idToStringCache); + String hostname = metadataIdToString(KeyType.HOST_STRING, key.getHostnameId(), idToStringCache); + String streamId = metadataIdToString(KeyType.STREAM_ID_STRING, key.getStreamId(), idToStringCache); + + Metric metric = new Metric(metricName, timestamp, topologyId, 0.0, componentId, executorId, hostname, + streamId, key.getPort(), aggLevel); + + val.populateMetric(metric); + + // callback to caller + scanCallback.cb(metric); + } + catch (MetricException e) { + LOG.warn("Failed to report found metric: {}", e.getMessage()); + } + } + else { + try { + if (!rawCallback.cb(key, val)) { + return; + } + } + catch (RocksDBException e) { + throw new MetricException("Error reading metrics data", e); + } + } } } } - iterator.close(); } - ro.close(); } // Finds the metadata string that matches the string Id and type provided. The string should exist, as it is @@ -568,7 +582,7 @@ void deleteMetrics(FilterOptions filter) throws MetricException { WriteOptions writeOps = new WriteOptions()) { scanRaw(filter, (RocksDbKey key, RocksDbValue value) -> { - writeBatch.remove(key.getRaw()); + writeBatch.delete(key.getRaw()); return true; }); @@ -603,15 +617,20 @@ void deleteMetadataBefore(long firstValidTimestamp) throws MetricException { // search all metadata strings RocksDbKey topologyMetadataPrefix = RocksDbKey.getPrefix(KeyType.METADATA_STRING_START); RocksDbKey lastPrefix = RocksDbKey.getPrefix(KeyType.METADATA_STRING_END); - scanRange(topologyMetadataPrefix, lastPrefix, (key, value) -> { - // we'll assume the metadata was recently used if still in the cache. - if (!readOnlyStringMetadataCache.contains(key.getMetadataStringId())) { - if (value.getLastTimestamp() < firstValidTimestamp) { - writeBatch.remove(key.getRaw()); + try { + scanRange(topologyMetadataPrefix, lastPrefix, (key, value) -> { + // we'll assume the metadata was recently used if still in the cache. + if (!readOnlyStringMetadataCache.contains(key.getMetadataStringId())) { + if (value.getLastTimestamp() < firstValidTimestamp) { + writeBatch.delete(key.getRaw()); + } } - } - return true; - }); + return true; + }); + } + catch (RocksDBException e) { + throw new MetricException("Error reading metric data", e); + } if (writeBatch.count() > 0) { LOG.info("Deleting {} metadata strings", writeBatch.count()); @@ -630,7 +649,7 @@ void deleteMetadataBefore(long firstValidTimestamp) throws MetricException { } interface RocksDbScanCallback { - boolean cb(RocksDbKey key, RocksDbValue val); // return false to stop scan + boolean cb(RocksDbKey key, RocksDbValue val) throws RocksDBException; // return false to stop scan } }