Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

RTO Task Overhaul (BugFix and Support to run multiple subtasks) #14623

Open
wants to merge 75 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 17 commits
Commits
Show all changes
75 commits
Select commit Hold shift + click to select a range
4ab1610
Adds Support of maxNumRowsPerTask in RealtimeToOfflineSegmentsTasksGe…
noob-se7en Dec 2, 2024
c71bcac
refactoring
noob-se7en Dec 2, 2024
d0ca568
nit
noob-se7en Dec 2, 2024
8db838b
nit
noob-se7en Dec 2, 2024
3233c33
fixes bug
noob-se7en Dec 2, 2024
65e6aef
adds initial logic
noob-se7en Dec 6, 2024
fd496bf
changes logic
noob-se7en Dec 6, 2024
fec0b65
fixes bug
noob-se7en Dec 6, 2024
11c84be
clean up
noob-se7en Dec 9, 2024
31b3960
clean up
noob-se7en Dec 9, 2024
aaa72e3
nit
noob-se7en Dec 9, 2024
2992595
addresses PR comment
noob-se7en Dec 9, 2024
58eb51c
nit
noob-se7en Dec 9, 2024
f4ed406
Alternate solution
noob-se7en Dec 9, 2024
07f831c
fixes bugs and nits
noob-se7en Dec 9, 2024
318e89e
lint fix
noob-se7en Dec 10, 2024
b8e0daf
fix multiple consecuritve failure scenrio
noob-se7en Dec 10, 2024
8e80c09
Merge branch 'master' of github.com:apache/pinot into origin/parallel…
noob-se7en Dec 12, 2024
30a9459
refactoring and clean up
noob-se7en Dec 12, 2024
d29af8d
refactoring
noob-se7en Dec 12, 2024
8bcbe0f
refactoring
noob-se7en Dec 12, 2024
01381a2
fixes tests
noob-se7en Dec 13, 2024
75fb4ba
nit
noob-se7en Dec 13, 2024
546f27e
nit
noob-se7en Dec 13, 2024
fae4aaa
nit
noob-se7en Dec 13, 2024
97e8c49
Fixes time window bug
noob-se7en Dec 13, 2024
4ddfbb1
refactoring
noob-se7en Dec 14, 2024
7d3fa68
adds conditions for edge cases
noob-se7en Dec 14, 2024
19b83c6
nit
noob-se7en Dec 14, 2024
68cc920
nit
noob-se7en Dec 14, 2024
84f471a
update
noob-se7en Dec 14, 2024
e365e91
revert time overlap change
noob-se7en Dec 15, 2024
1baf68e
fixes some edge cases
noob-se7en Dec 15, 2024
1754dbc
refactoring
noob-se7en Dec 16, 2024
ff4017e
fixes test
noob-se7en Dec 16, 2024
9d329bc
Adds code docs and clean up
noob-se7en Dec 16, 2024
21185e6
nit
noob-se7en Dec 16, 2024
324a4cf
updates code docs of metadata
noob-se7en Dec 16, 2024
0f27067
Adds test
noob-se7en Dec 16, 2024
0eba7cc
clean up
noob-se7en Dec 17, 2024
02b94fe
handles edge case
noob-se7en Dec 17, 2024
6405c35
Adds tests and clean up
noob-se7en Dec 17, 2024
f77e817
clean up
noob-se7en Dec 17, 2024
5cd9bdf
Merge branch 'master' of github.com:apache/pinot into origin/parallel…
noob-se7en Dec 17, 2024
592f65c
Updates integration test
noob-se7en Dec 17, 2024
cf97b6e
adds code docs
noob-se7en Dec 17, 2024
f2dcef2
handle edge case
noob-se7en Dec 17, 2024
b3cebb4
handle edge case
noob-se7en Dec 17, 2024
e8c1b9b
fixes bug
noob-se7en Dec 17, 2024
9eacf76
test
noob-se7en Dec 18, 2024
9a2eb80
handles edge case
noob-se7en Dec 18, 2024
32628d0
nit
noob-se7en Dec 18, 2024
cc70645
clean up
noob-se7en Dec 18, 2024
903d519
addresses PR comment
noob-se7en Dec 18, 2024
066d925
nit
noob-se7en Dec 18, 2024
66ff5ad
Refactor var names
noob-se7en Dec 23, 2024
b307458
nit
noob-se7en Dec 23, 2024
2d1f086
nit
noob-se7en Dec 23, 2024
f183b85
nit
noob-se7en Dec 23, 2024
46bbd20
minor edge cases
noob-se7en Dec 30, 2024
00cc1bc
nit
noob-se7en Dec 30, 2024
dca5736
fixes lintg
noob-se7en Dec 30, 2024
6040793
nit
noob-se7en Dec 30, 2024
8e418d8
Merge branch 'master' of github.com:apache/pinot into origin/parallel…
noob-se7en Jan 22, 2025
7380368
minor refactoring
noob-se7en Jan 22, 2025
f4371c2
throws exception if failed to delete invalid segment
noob-se7en Jan 22, 2025
d00ea27
Adds logs
noob-se7en Jan 22, 2025
1dda7ca
simplifies code
noob-se7en Jan 25, 2025
97846ff
nit
noob-se7en Jan 25, 2025
97bf146
Adds unit tests
noob-se7en Jan 25, 2025
b12980b
Adds unit tests
noob-se7en Jan 26, 2025
2cdc946
Fixes unit test
noob-se7en Jan 26, 2025
279a339
fixes integration test
noob-se7en Jan 26, 2025
bfad067
fixes log format output
noob-se7en Jan 27, 2025
986e8bf
Attempts to fix test
noob-se7en Jan 29, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,12 @@
*/
package org.apache.pinot.common.minion;

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.helix.zookeeper.datamodel.ZNRecord;


Expand All @@ -41,19 +47,37 @@
public class RealtimeToOfflineSegmentsTaskMetadata extends BaseTaskMetadata {

private static final String WATERMARK_KEY = "watermarkMs";
private static final String SEGMENT_NAME_SEPARATOR = ",";

private final String _tableNameWithType;
private final long _watermarkMs;
private long _watermarkMs;
private final Map<String, List<String>> _realtimeSegmentVsCorrespondingOfflineSegmentMap;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This approach looks more promising.
I think we might have to track this input->output segment on a per task basis and then undo everything if there's a task failure (task's execution should be treated as all or nothing). You could maintain 2 maps where the key of the map is taskId and value is list of segments (inputSegments, outputSegments).
If there's a task failure, you need to undo all that the task has done i.e remove all outputSegments (if they exist in offline table) and redo all inputSegments that the task picked.
The reason for the above is a single input segment can map to multiple output segments and multiple input can map to single output segment. The cleanest approach is undo what the task has done (either in the minion itself & in generator as fallback) if there's failure and retry the input segments.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This code handles M -> N segment conversion. This edge case is handled currently as well, no?


public RealtimeToOfflineSegmentsTaskMetadata(String tableNameWithType, long watermarkMs) {
_tableNameWithType = tableNameWithType;
_watermarkMs = watermarkMs;
_realtimeSegmentVsCorrespondingOfflineSegmentMap = new HashMap<>();
}

public RealtimeToOfflineSegmentsTaskMetadata(String tableNameWithType, long watermarkMs,
Map<String, List<String>> realtimeSegmentVsCorrespondingOfflineSegment) {
_tableNameWithType = tableNameWithType;
_watermarkMs = watermarkMs;
_realtimeSegmentVsCorrespondingOfflineSegmentMap = realtimeSegmentVsCorrespondingOfflineSegment;
}

public String getTableNameWithType() {
return _tableNameWithType;
}

public Map<String, List<String>> getRealtimeSegmentVsCorrespondingOfflineSegmentMap() {
return _realtimeSegmentVsCorrespondingOfflineSegmentMap;
}

public void setWatermarkMs(long watermarkMs) {
_watermarkMs = watermarkMs;
}

/**
* Get the watermark in millis
*/
Expand All @@ -63,11 +87,30 @@ public long getWatermarkMs() {

public static RealtimeToOfflineSegmentsTaskMetadata fromZNRecord(ZNRecord znRecord) {
long watermark = znRecord.getLongField(WATERMARK_KEY, 0);
return new RealtimeToOfflineSegmentsTaskMetadata(znRecord.getId(), watermark);
Map<String, List<String>> realtimeSegmentVsCorrespondingOfflineSegmentMap = new HashMap<>();
Map<String, String> fields = znRecord.getSimpleFields();
for (Map.Entry<String, String> entry : fields.entrySet()) {
if (entry.getKey().equals(WATERMARK_KEY)) {
continue;
}
String segmentFrom = entry.getKey();
String segmentsTo = entry.getValue();
List<String> segmentsToList =
Arrays.stream(StringUtils.split(segmentsTo, SEGMENT_NAME_SEPARATOR))
.map(String::trim).collect(Collectors.toList());
realtimeSegmentVsCorrespondingOfflineSegmentMap.put(segmentFrom, segmentsToList);
}
return new RealtimeToOfflineSegmentsTaskMetadata(znRecord.getId(), watermark,
realtimeSegmentVsCorrespondingOfflineSegmentMap);
}

public ZNRecord toZNRecord() {
ZNRecord znRecord = new ZNRecord(_tableNameWithType);
for (Map.Entry<String, List<String>> entry : _realtimeSegmentVsCorrespondingOfflineSegmentMap.entrySet()) {
String segmentFrom = entry.getKey();
List<String> segmentTo = entry.getValue();
znRecord.setSimpleField(segmentFrom, StringUtils.join(segmentTo, SEGMENT_NAME_SEPARATOR));
}
znRecord.setLongField(WATERMARK_KEY, _watermarkMs);
return znRecord;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,14 @@
import com.google.common.base.Preconditions;
import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.commons.lang3.StringUtils;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.helix.zookeeper.zkclient.exception.ZkBadVersionException;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadataCustomMapModifier;
import org.apache.pinot.common.minion.RealtimeToOfflineSegmentsTaskMetadata;
import org.apache.pinot.core.common.MinionConstants;
Expand Down Expand Up @@ -71,7 +75,6 @@ public class RealtimeToOfflineSegmentsTaskExecutor extends BaseMultipleSegmentsC
private static final Logger LOGGER = LoggerFactory.getLogger(RealtimeToOfflineSegmentsTaskExecutor.class);

private final MinionTaskZkMetadataManager _minionTaskZkMetadataManager;
private int _expectedVersion = Integer.MIN_VALUE;

public RealtimeToOfflineSegmentsTaskExecutor(MinionTaskZkMetadataManager minionTaskZkMetadataManager,
MinionConf minionConf) {
Expand All @@ -82,7 +85,6 @@ public RealtimeToOfflineSegmentsTaskExecutor(MinionTaskZkMetadataManager minionT
/**
* Fetches the RealtimeToOfflineSegmentsTask metadata ZNode for the realtime table.
* Checks that the <code>watermarkMs</code> from the ZNode matches the windowStartMs in the task configs.
* If yes, caches the ZNode version to check during update.
*/
@Override
public void preProcess(PinotTaskConfig pinotTaskConfig) {
Expand All @@ -103,8 +105,6 @@ public void preProcess(PinotTaskConfig pinotTaskConfig) {
"watermarkMs in RealtimeToOfflineSegmentsTask metadata: %s shouldn't be larger than windowStartMs: %d in task"
+ " configs for table: %s. ZNode may have been modified by another task",
realtimeToOfflineSegmentsTaskMetadata.getWatermarkMs(), windowStartMs, realtimeTableName);

_expectedVersion = realtimeToOfflineSegmentsTaskZNRecord.getVersion();
}

@Override
Expand Down Expand Up @@ -190,19 +190,51 @@ protected List<SegmentConversionResult> convert(PinotTaskConfig pinotTaskConfig,

/**
* Fetches the RealtimeToOfflineSegmentsTask metadata ZNode for the realtime table.
* Checks that the version of the ZNode matches with the version cached earlier. If yes, proceeds to update
* watermark in the ZNode
* TODO: Making the minion task update the ZK metadata is an anti-pattern, however cannot see another way to do it
* Before uploading the segments, updates the metadata with the expected results
* of the successful execution of current subtask.
* The expected result updated in metadata is read by the next iteration of Task Generator.
*/
@Override
public void postProcess(PinotTaskConfig pinotTaskConfig) {
Map<String, String> configs = pinotTaskConfig.getConfigs();
String realtimeTableName = configs.get(MinionConstants.TABLE_NAME_KEY);
long waterMarkMs = Long.parseLong(configs.get(RealtimeToOfflineSegmentsTask.WINDOW_END_MS_KEY));
RealtimeToOfflineSegmentsTaskMetadata newMinionMetadata =
new RealtimeToOfflineSegmentsTaskMetadata(realtimeTableName, waterMarkMs);
_minionTaskZkMetadataManager.setTaskMetadataZNRecord(newMinionMetadata, RealtimeToOfflineSegmentsTask.TASK_TYPE,
_expectedVersion);
protected void preUploadSegments(SegmentUploadContext context)
throws Exception {
super.preUploadSegments(context);
String realtimeTableName = context.getTableNameWithType();
while (true) {
ZNRecord realtimeToOfflineSegmentsTaskZNRecord =
_minionTaskZkMetadataManager.getTaskMetadataZNRecord(realtimeTableName,
RealtimeToOfflineSegmentsTask.TASK_TYPE);
int expectedVersion = realtimeToOfflineSegmentsTaskZNRecord.getVersion();

RealtimeToOfflineSegmentsTaskMetadata realtimeToOfflineSegmentsTaskMetadata =
RealtimeToOfflineSegmentsTaskMetadata.fromZNRecord(realtimeToOfflineSegmentsTaskZNRecord);

Map<String, List<String>> realtimeSegmentVsCorrespondingOfflineSegmentMap =
realtimeToOfflineSegmentsTaskMetadata.getRealtimeSegmentVsCorrespondingOfflineSegmentMap();

List<String> segmentsFrom =
Arrays.stream(StringUtils.split(context.getInputSegmentNames(), MinionConstants.SEGMENT_NAME_SEPARATOR))
.map(String::trim).collect(Collectors.toList());

List<String> segmentsTo =
context.getSegmentConversionResults().stream().map(SegmentConversionResult::getSegmentName)
.collect(Collectors.toList());

for (String segmentFrom : segmentsFrom) {
Preconditions.checkState(!realtimeSegmentVsCorrespondingOfflineSegmentMap.containsKey(segmentFrom));
realtimeSegmentVsCorrespondingOfflineSegmentMap.put(segmentFrom, segmentsTo);
swaminathanmanish marked this conversation as resolved.
Show resolved Hide resolved
}

try {
_minionTaskZkMetadataManager.setTaskMetadataZNRecord(realtimeToOfflineSegmentsTaskMetadata,
RealtimeToOfflineSegmentsTask.TASK_TYPE,
expectedVersion);
break;
} catch (ZkBadVersionException e) {
LOGGER.info(
"Version changed while updating num of subtasks left in RTO task metadata for table: {}, Retrying.",
realtimeTableName);
}
}
}

@Override
Expand Down
Loading
Loading