Skip to content

Commit

Permalink
Simplify timer start and stop logic
Browse files Browse the repository at this point in the history
  • Loading branch information
malessi committed Jan 22, 2025
1 parent 2d1f7bb commit 1c56f0d
Showing 1 changed file with 93 additions and 96 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -231,13 +231,7 @@ public PipelineJobOutcome call() throws Exception {
finalManifestLists.stream()
.filter(l -> !dataSetQueue.hasIncompleteManifests(l.getManifests()))
.map(FinalManifestList::getTimestampText)
.forEach(
dataset -> {
if (loadJobMetrics.activeTimerSampleExistsForDataset(dataset))
loadJobMetrics.stopActiveTimerSampleForDataset(dataset);
if (loadJobMetrics.totalTimerSampleExistsForDataset(dataset))
loadJobMetrics.stopTotalTimerSampleForDataset(dataset, false);
});
.forEach(dataset -> loadJobMetrics.stopTimersForDataset(dataset, false));

listener.noDataAvailable();
statusReporter.reportNothingToDo();
Expand Down Expand Up @@ -361,21 +355,15 @@ public PipelineJobOutcome call() throws Exception {
* processing multiple data sets in parallel (which would lead to data
* consistency problems).
*/
if (!loadJobMetrics.activeTimerSampleExistsForDataset(manifestToProcess.getTimestampText()))
loadJobMetrics.startActiveTimerSampleForDataset(
manifestToProcess.getTimestampText(), manifestToProcess.isSyntheticData());
if (!loadJobMetrics.totalTimerSampleExistsForDataset(manifestToProcess.getTimestampText()))
loadJobMetrics.startTotalTimerSampleForDataset(manifestToProcess.getTimestampText());
final var activeManifestTimer =
loadJobMetrics.createActiveTimerForManifest(manifestToProcess).start();
final var totalManifestTimer = Timer.start();
loadJobMetrics.startTimersForDataset(
manifestToProcess.getTimestampText(), manifestToProcess.isSyntheticData());
loadJobMetrics.startTimersForManifest(manifestToProcess);
statusReporter.reportProcessingManifestData(manifestToProcess.getIncomingS3Key());
dataSetQueue.markAsStarted(manifestRecord);
listener.dataAvailable(rifFilesEvent);
statusReporter.reportCompletedManifest(manifestToProcess.getIncomingS3Key());
dataSetQueue.markAsProcessed(manifestRecord);
totalManifestTimer.stop(loadJobMetrics.createTotalTimerForManifest(manifestToProcess));
activeManifestTimer.stop();
loadJobMetrics.stopTimersForManifest(manifestToProcess);
if (!manifestToProcess.isSyntheticData()) {
// Non-synthetic datasets are typically one manifest to one RIF, so we need to look for
// the final manifest list that corresponds to the just-loaded manifest and ensure, via the
Expand All @@ -389,20 +377,11 @@ public PipelineJobOutcome call() throws Exception {
.filter(l -> l.getTimestampText().equals(manifestToProcess.getTimestampText()))
.filter(l -> !dataSetQueue.hasIncompleteManifests(l.getManifests()))
.map(FinalManifestList::getTimestampText)
.forEach(
dataset -> {
if (loadJobMetrics.activeTimerSampleExistsForDataset(dataset))
loadJobMetrics.stopActiveTimerSampleForDataset(dataset);
if (loadJobMetrics.totalTimerSampleExistsForDataset(dataset))
loadJobMetrics.stopTotalTimerSampleForDataset(dataset, false);
});
.forEach(dataset -> loadJobMetrics.stopTimersForDataset(dataset, false));
} else {
// Synthetic datasets contain only a single manifest, so if the currently loading manifest
// is synthetic we can stop the dataset timers immediately after it has loaded
if (loadJobMetrics.activeTimerSampleExistsForDataset(manifestToProcess.getTimestampText()))
loadJobMetrics.stopActiveTimerSampleForDataset(manifestToProcess.getTimestampText());
if (loadJobMetrics.totalTimerSampleExistsForDataset(manifestToProcess.getTimestampText()))
loadJobMetrics.stopTotalTimerSampleForDataset(manifestToProcess.getTimestampText(), true);
loadJobMetrics.stopTimersForDataset(manifestToProcess.getTimestampText(), true);
}

LOGGER.info(LOG_MESSAGE_DATA_SET_COMPLETE);
Expand Down Expand Up @@ -652,21 +631,68 @@ public static final class Metrics {
/** Micrometer {@link MeterRegistry} for the Pipeline application. */
private final MeterRegistry appMetrics;

/**
 * Map of a {@link DataSetManifest} to its active {@link ManifestTimerSet} timer metrics.
 *
 * <p>Entries are added by {@code startTimersForManifest} and looked up by {@code
 * stopTimersForManifest}. NOTE(review): this is a plain {@link HashMap}; presumably it is only
 * touched from a single job thread and {@link DataSetManifest} has stable {@code
 * equals}/{@code hashCode} -- confirm.
 */
private final Map<DataSetManifest, ManifestTimerSet> activeManifestTimersMap = new HashMap<>();

/**
 * Map of a dataset to its active {@link DatasetTimerSet} timer metrics.
 *
 * <p>Keys are the dataset timestamp text passed to {@code startTimersForDataset} and {@code
 * stopTimersForDataset}.
 */
private final Map<String, DatasetTimerSet> activeDatasetTimersMap = new HashMap<>();

/**
 * Starts the active and total processing time timers for a {@link DataSetManifest} that is
 * beginning to be processed. Does nothing if the {@link DataSetManifest} is already being timed;
 * in particular, no additional timer samples are started in that case.
 *
 * @param manifest the {@link DataSetManifest} to measure processing time for
 */
void startTimersForManifest(DataSetManifest manifest) {
  // computeIfAbsent only runs the factory when the manifest is not tracked yet. putIfAbsent
  // evaluates its value argument eagerly, which would start a LongTaskTimer sample on every call
  // and leave the surplus samples running forever because nothing holds a reference to them.
  activeManifestTimersMap.computeIfAbsent(
      manifest,
      key -> new ManifestTimerSet(createActiveTimerForManifest(key).start(), Timer.start()));
}

/**
* Maps a dataset's timestamp text to an active, started {@link LongTaskTimer.Sample}.
* Stops the active and total processing time timers for a {@link DataSetManifest}, if they
* exist.
*
* @implNote This is necessary as the {@link CcwRifLoadJob} does not have any concept of a
* "dataset" as a whole. We cannot sequentially create a timer, start it, and then close it
* in the same method call unlike the other timers; so, we need to store it.
* @param manifest the {@link DataSetManifest} for which its started timers will be stopped
*/
private final Map<String, LongTaskTimer.Sample> activeDatasetTimerSamplesMap = new HashMap<>();
void stopTimersForManifest(DataSetManifest manifest) {
  // Take the timers out of the map while fetching them: once stopped they must not be stopped
  // (and the total time recorded) a second time, and keeping them would both leak the entry and
  // prevent the same manifest from ever being timed again.
  final var manifestTimers = activeManifestTimersMap.remove(manifest);
  if (manifestTimers == null) {
    // Nothing is being timed for this manifest.
    return;
  }

  manifestTimers.activeTimer().stop();
  // The total Timer is only created/registered now, at the point the measured duration is known.
  manifestTimers.totalTimer().stop(createTotalTimerForManifest(manifest));
}

/**
* Maps a dataset's timestamp text to a total, started {@link Timer.Sample}.
* Starts the active and total processing time timers for a dataset that is beginning to be
* processed. Will not start new timers if the dataset is already being timed.
*
* @implNote See the implNote for {@link #activeDatasetTimerSamplesMap} for justification.
* @param datasetTimestampText the dataset to measure processing time for
* @param isSynthetic whether the dataset is synthetic
*/
private final Map<String, Timer.Sample> totalDatasetTimerSamplesMap = new HashMap<>();
void startTimersForDataset(String datasetTimestampText, boolean isSynthetic) {
  // This is invoked for every manifest of a dataset, so the "already timed" case is the common
  // one. computeIfAbsent defers creating and starting the samples until they are really needed;
  // putIfAbsent would start (and then discard) a fresh LongTaskTimer sample on each call, leaving
  // it running with no way to stop it.
  activeDatasetTimersMap.computeIfAbsent(
      datasetTimestampText,
      key ->
          new DatasetTimerSet(
              createActiveTimerForDataset(key, isSynthetic).start(), Timer.start()));
}

/**
 * Stops the active and total processing time timers for a dataset, if they exist, and forgets
 * them so that they cannot be stopped (or recorded) twice.
 *
 * @param datasetTimestampText the dataset for which its processing time timers will be stopped
 * @param isSynthetic whether the dataset is synthetic; passed on when creating the total timer
 *     that the elapsed time is recorded to
 */
void stopTimersForDataset(String datasetTimestampText, boolean isSynthetic) {
  // remove() both looks up and clears the entry. Leaving it in place would make every later call
  // for the same dataset stop the samples again and record another total duration.
  final var datasetTimers = activeDatasetTimersMap.remove(datasetTimestampText);
  if (datasetTimers == null) {
    // The dataset is not being timed (never started, or already stopped).
    return;
  }

  datasetTimers.activeTimer().stop();
  datasetTimers.totalTimer().stop(createTotalTimerForDataset(datasetTimestampText, isSynthetic));
}

/**
* Creates a {@link LongTaskTimer} for a given {@link DataSetManifest} so that the time it takes
Expand All @@ -677,7 +703,7 @@ public static final class Metrics {
* @return the {@link LongTaskTimer} that will be used to actively measure and record the time
* taken to load the {@link DataSetManifest}
*/
LongTaskTimer createActiveTimerForManifest(DataSetManifest manifest) {
private LongTaskTimer createActiveTimerForManifest(DataSetManifest manifest) {
return LongTaskTimer.builder(MANIFEST_PROCESSING_ACTIVE_TIMER_NAME)
.tags(getTagsForManifestMetrics(manifest))
.register(appMetrics);
Expand All @@ -692,7 +718,7 @@ LongTaskTimer createActiveTimerForManifest(DataSetManifest manifest) {
* @return the {@link Timer} that will be used to record the total time taken to load the
* {@link DataSetManifest}
*/
Timer createTotalTimerForManifest(DataSetManifest manifest) {
private Timer createTotalTimerForManifest(DataSetManifest manifest) {
return Timer.builder(MANIFEST_PROCESSING_TOTAL_TIMER_NAME)
.tags(getTagsForManifestMetrics(manifest))
.register(appMetrics);
Expand All @@ -705,45 +731,11 @@ Timer createTotalTimerForManifest(DataSetManifest manifest) {
* @param datasetTimestamp the timestamp text of the dataset to time
* @param isSynthetic whether the dataset is synthetic
*/
void startActiveTimerSampleForDataset(String datasetTimestamp, boolean isSynthetic) {
  // Build the dataset's long task timer, start it, and remember the running sample under the
  // dataset's timestamp text so that it can be stopped later.
  final var startedSample =
      LongTaskTimer.builder(DATASET_PROCESSING_ACTIVE_TIMER_NAME)
          .tags(getTagsForDatasetMetrics(datasetTimestamp, isSynthetic))
          .register(appMetrics)
          .start();
  activeDatasetTimerSamplesMap.put(datasetTimestamp, startedSample);
}

/**
* Stops an already existing, active {@link LongTaskTimer.Sample} for the given dataset provided
* its timestamp text.
*
* @param datasetTimestamp the timestamp text of a dataset for which an existing, active {@link
* LongTaskTimer.Sample} will be stopped
*/
void stopActiveTimerSampleForDataset(String datasetTimestamp) {
  // A single remove() replaces the former get()+remove() pair and hands back the sample, if any.
  final LongTaskTimer.Sample activeSample = activeDatasetTimerSamplesMap.remove(datasetTimestamp);
  // Guard against datasets that have no running sample instead of failing with a
  // NullPointerException.
  if (activeSample != null) {
    activeSample.stop();
  }
}

/**
* Returns whether a dataset has an existing, active {@link LongTaskTimer.Sample} actively
* recording the time it is taking to load the dataset.
*
* @param datasetTimestamp the timestamp text of the dataset
* @return {@code true} if a {@link LongTaskTimer.Sample} exists for the dataset, {@code false}
* otherwise
*/
boolean activeTimerSampleExistsForDataset(String datasetTimestamp) {
  // A dataset is considered actively timed exactly when its timestamp text is a key in the map.
  final boolean sampleIsTracked = activeDatasetTimerSamplesMap.containsKey(datasetTimestamp);
  return sampleIsTracked;
}

/**
* Starts a total {@link Timer.Sample} for the provided dataset given its timestamp text.
*
* @param datasetTimestamp the timestamp text of the dataset to time
*/
void startTotalTimerSampleForDataset(String datasetTimestamp) {
totalDatasetTimerSamplesMap.put(datasetTimestamp, Timer.start());
private LongTaskTimer createActiveTimerForDataset(
    String datasetTimestamp, boolean isSynthetic) {
  // Builder first, then the tags describing the dataset, then registration with the application's
  // meter registry.
  final var timerBuilder = LongTaskTimer.builder(DATASET_PROCESSING_ACTIVE_TIMER_NAME);
  final var datasetTags = getTagsForDatasetMetrics(datasetTimestamp, isSynthetic);
  return timerBuilder.tags(datasetTags).register(appMetrics);
}

/**
Expand All @@ -754,25 +746,10 @@ void startTotalTimerSampleForDataset(String datasetTimestamp) {
* LongTaskTimer.Sample} will be stopped
* @param isSynthetic whether the dataset is synthetic
*/
void stopTotalTimerSampleForDataset(String datasetTimestamp, boolean isSynthetic) {
  // Fetch and clear the sample in one step; bail out if the dataset was never timed rather than
  // throwing a NullPointerException.
  final Timer.Sample totalSample = totalDatasetTimerSamplesMap.remove(datasetTimestamp);
  if (totalSample == null) {
    return;
  }

  // The Timer that receives the measurement is only built once there is something to record.
  final Timer totalTimer =
      Timer.builder(DATASET_PROCESSING_TOTAL_TIMER_NAME)
          .tags(getTagsForDatasetMetrics(datasetTimestamp, isSynthetic))
          .register(appMetrics);
  totalSample.stop(totalTimer);
}

/**
* Returns whether a dataset has an existing, total {@link Timer.Sample} recording the total
* time it is taking to load the dataset.
*
* @param datasetTimestamp the timestamp text of the dataset
* @return {@code true} if a {@link Timer.Sample} exists for the dataset, {@code false}
* otherwise
*/
boolean totalTimerSampleExistsForDataset(String datasetTimestamp) {
return totalDatasetTimerSamplesMap.containsKey(datasetTimestamp);
private Timer createTotalTimerForDataset(String datasetTimestamp, boolean isSynthetic) {
  // Builder first, then the tags describing the dataset, then registration with the application's
  // meter registry.
  final var timerBuilder = Timer.builder(DATASET_PROCESSING_TOTAL_TIMER_NAME);
  final var datasetTags = getTagsForDatasetMetrics(datasetTimestamp, isSynthetic);
  return timerBuilder.tags(datasetTags).register(appMetrics);
}

/**
Expand Down Expand Up @@ -810,5 +787,25 @@ private List<Tag> getTagsForDatasetMetrics(String datasetTimestamp, boolean isSy
Tag.of(TAG_DATA_SET_TIMESTAMP, datasetTimestamp),
Tag.of(TAG_IS_SYNTHETIC, Boolean.toString(isSynthetic)));
}

/**
 * Pairs the two started timer samples that track a single {@link DataSetManifest}.
 *
 * @param activeTimer the running {@link LongTaskTimer.Sample} timing the manifest while it is
 *     being processed
 * @param totalTimer the {@link Timer.Sample} used to time how long processing the manifest takes
 *     in total
 */
private record ManifestTimerSet(
    LongTaskTimer.Sample activeTimer,
    Timer.Sample totalTimer) {}

/**
 * Pairs the two started timer samples that track a single dataset.
 *
 * @param activeTimer the running {@link LongTaskTimer.Sample} timing the dataset while it is
 *     being processed
 * @param totalTimer the {@link Timer.Sample} used to time how long processing the dataset takes
 *     in total
 */
private record DatasetTimerSet(
    LongTaskTimer.Sample activeTimer,
    Timer.Sample totalTimer) {}
}
}

0 comments on commit 1c56f0d

Please sign in to comment.