Skip to content

Commit

Permalink
Realtime segment size threshold metrics (apache#14485)
Browse files Browse the repository at this point in the history
* Realtime segment size threshold metrics

* Addressed PR comment
  • Loading branch information
mcvsubbu authored Nov 19, 2024
1 parent 8300838 commit b9965c8
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -165,6 +165,16 @@ public enum ControllerGauge implements AbstractMetrics.Gauge {

TABLE_DISABLED("tableDisabled", false),

// A per-table metric that shows the number of rows we expect to consume for the next segment of
// any partition in the realtime table. This metric is emitted from the segment size based threshold
// computer.
NUM_ROWS_THRESHOLD("numRowsThreshold", false),

// The actual size, in bytes, of the committing segment. This may be smaller than expected when the administrator
// issues a force-commit, or zero when new partitions are detected in the stream (since there is no completing
// segment when the partition is first detected).
COMMITTING_SEGMENT_SIZE("committingSegmentSize", false),

TABLE_REBALANCE_IN_PROGRESS("tableRebalanceInProgress", false);

private final String _gaugeName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public FlushThresholdUpdater getFlushThresholdUpdater(StreamConfig streamConfig)
long flushThresholdSegmentSizeBytes = streamConfig.getFlushThresholdSegmentSizeBytes();
if (flushThresholdRows == 0 || flushThresholdSegmentSizeBytes > 0) {
return _flushThresholdUpdaterMap.computeIfAbsent(realtimeTableName,
k -> new SegmentSizeBasedFlushThresholdUpdater());
k -> new SegmentSizeBasedFlushThresholdUpdater(realtimeTableName));
} else {
_flushThresholdUpdaterMap.remove(realtimeTableName);
return new DefaultFlushThresholdUpdater(StreamConfig.DEFAULT_FLUSH_THRESHOLD_ROWS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@

import javax.annotation.Nullable;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadata;
import org.apache.pinot.common.metrics.ControllerGauge;
import org.apache.pinot.common.metrics.ControllerMetrics;
import org.apache.pinot.spi.stream.StreamConfig;
import org.apache.pinot.spi.utils.builder.TableNameBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -36,9 +39,13 @@
public class SegmentSizeBasedFlushThresholdUpdater implements FlushThresholdUpdater {
public static final Logger LOGGER = LoggerFactory.getLogger(SegmentSizeBasedFlushThresholdUpdater.class);
private final SegmentFlushThresholdComputer _flushThresholdComputer;
private final String _rawTableName;

public SegmentSizeBasedFlushThresholdUpdater() {
private final ControllerMetrics _controllerMetrics = ControllerMetrics.get();

public SegmentSizeBasedFlushThresholdUpdater(String realtimeTableName) {
_flushThresholdComputer = new SegmentFlushThresholdComputer();
_rawTableName = TableNameBuilder.extractRawTableName(realtimeTableName);
}

// synchronized since this method could be called for multiple partitions of the same table in different threads
Expand All @@ -50,5 +57,9 @@ public synchronized void updateFlushThreshold(StreamConfig streamConfig, Segment
_flushThresholdComputer.computeThreshold(streamConfig, committingSegmentDescriptor, committingSegmentZKMetadata,
newSegmentZKMetadata.getSegmentName());
newSegmentZKMetadata.setSizeThresholdToFlushSegment(threshold);

_controllerMetrics.setOrUpdateTableGauge(_rawTableName, ControllerGauge.NUM_ROWS_THRESHOLD, threshold);
_controllerMetrics.setOrUpdateTableGauge(_rawTableName, ControllerGauge.COMMITTING_SEGMENT_SIZE,
committingSegmentDescriptor.getSegmentSizeBytes());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,8 @@ public void testSegmentSizeBasedFlushThreshold() {

for (long[] segmentSizesMB : Arrays.asList(EXPONENTIAL_GROWTH_SEGMENT_SIZES_MB, LOGARITHMIC_GROWTH_SEGMENT_SIZES_MB,
STEPS_SEGMENT_SIZES_MB)) {
SegmentSizeBasedFlushThresholdUpdater flushThresholdUpdater = new SegmentSizeBasedFlushThresholdUpdater();
SegmentSizeBasedFlushThresholdUpdater flushThresholdUpdater =
new SegmentSizeBasedFlushThresholdUpdater(REALTIME_TABLE_NAME);

// Start consumption
SegmentZKMetadata newSegmentZKMetadata = getNewSegmentZKMetadata(0);
Expand Down Expand Up @@ -176,7 +177,8 @@ public void testSegmentSizeBasedFlushThresholdMinPartition() {

for (long[] segmentSizesMB : Arrays.asList(EXPONENTIAL_GROWTH_SEGMENT_SIZES_MB, LOGARITHMIC_GROWTH_SEGMENT_SIZES_MB,
STEPS_SEGMENT_SIZES_MB)) {
SegmentSizeBasedFlushThresholdUpdater flushThresholdUpdater = new SegmentSizeBasedFlushThresholdUpdater();
SegmentSizeBasedFlushThresholdUpdater flushThresholdUpdater =
new SegmentSizeBasedFlushThresholdUpdater(REALTIME_TABLE_NAME);

// Start consumption
SegmentZKMetadata newSegmentZKMetadata = getNewSegmentZKMetadata(1);
Expand Down Expand Up @@ -236,7 +238,8 @@ private long getSegmentSizeBytes(int numRowsConsumed, long[] segmentSizesMB) {

@Test
public void testTimeThreshold() {
SegmentSizeBasedFlushThresholdUpdater flushThresholdUpdater = new SegmentSizeBasedFlushThresholdUpdater();
SegmentSizeBasedFlushThresholdUpdater flushThresholdUpdater =
new SegmentSizeBasedFlushThresholdUpdater(REALTIME_TABLE_NAME);
StreamConfig streamConfig = mockDefaultAutotuneStreamConfig();

// Start consumption
Expand Down Expand Up @@ -269,7 +272,8 @@ public void testTimeThreshold() {

@Test
public void testMinThreshold() {
SegmentSizeBasedFlushThresholdUpdater flushThresholdUpdater = new SegmentSizeBasedFlushThresholdUpdater();
SegmentSizeBasedFlushThresholdUpdater flushThresholdUpdater =
new SegmentSizeBasedFlushThresholdUpdater(REALTIME_TABLE_NAME);
StreamConfig streamConfig = mockDefaultAutotuneStreamConfig();

// Start consumption
Expand Down Expand Up @@ -301,7 +305,8 @@ public void testMinThreshold() {

@Test
public void testSegmentSizeBasedUpdaterWithModifications() {
SegmentSizeBasedFlushThresholdUpdater flushThresholdUpdater = new SegmentSizeBasedFlushThresholdUpdater();
SegmentSizeBasedFlushThresholdUpdater flushThresholdUpdater
= new SegmentSizeBasedFlushThresholdUpdater(REALTIME_TABLE_NAME);

// Use customized stream config
long flushSegmentDesiredSizeBytes = StreamConfig.DEFAULT_FLUSH_THRESHOLD_SEGMENT_SIZE_BYTES / 2;
Expand Down

0 comments on commit b9965c8

Please sign in to comment.