Adds Support of maxNumRowsPerTask in RealtimeToOfflineSegmentsTasksGe… #14578

Closed
wants to merge 13 commits into from
@@ -41,19 +41,39 @@
 public class RealtimeToOfflineSegmentsTaskMetadata extends BaseTaskMetadata {

   private static final String WATERMARK_KEY = "watermarkMs";
+  private static final String SUBTASKS_KEY = "numSubtasks";

   private final String _tableNameWithType;
-  private final long _watermarkMs;
+  private long _watermarkMs;
+  private int _numSubtasksPending;

   public RealtimeToOfflineSegmentsTaskMetadata(String tableNameWithType, long watermarkMs) {
     _tableNameWithType = tableNameWithType;
     _watermarkMs = watermarkMs;
   }

+  public RealtimeToOfflineSegmentsTaskMetadata(String tableNameWithType, long watermarkMs, int numSubtasksPending) {
+    _tableNameWithType = tableNameWithType;
+    _watermarkMs = watermarkMs;
+    _numSubtasksPending = numSubtasksPending;
+  }
+
   public String getTableNameWithType() {
     return _tableNameWithType;
   }

+  public int getNumSubtasksPending() {
+    return _numSubtasksPending;
+  }
+
+  public void setNumSubtasksPending(int numSubtasksPending) {
+    _numSubtasksPending = numSubtasksPending;
+  }
+
+  public void setWatermarkMs(long watermarkMs) {
+    _watermarkMs = watermarkMs;
+  }
+
   /**
    * Get the watermark in millis
    */
@@ -63,12 +83,14 @@ public long getWatermarkMs() {

   public static RealtimeToOfflineSegmentsTaskMetadata fromZNRecord(ZNRecord znRecord) {
     long watermark = znRecord.getLongField(WATERMARK_KEY, 0);
-    return new RealtimeToOfflineSegmentsTaskMetadata(znRecord.getId(), watermark);
+    int subtasksLeft = znRecord.getIntField(SUBTASKS_KEY, 0);
+    return new RealtimeToOfflineSegmentsTaskMetadata(znRecord.getId(), watermark, subtasksLeft);
   }

   public ZNRecord toZNRecord() {
     ZNRecord znRecord = new ZNRecord(_tableNameWithType);
     znRecord.setLongField(WATERMARK_KEY, _watermarkMs);
+    znRecord.setIntField(SUBTASKS_KEY, _numSubtasksPending);
     return znRecord;
   }
 }
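
To make the serialization change above easier to follow, here is a minimal sketch of the round trip with the new field. It assumes a plain Map in place of Helix's ZNRecord so that it runs without any dependencies; TaskMetadataSketch, fromFields and toFields are hypothetical names and not part of this PR. The two keys and the default of 0 for a missing field are taken from the diff.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the metadata class; a plain Map replaces the ZNRecord.
final class TaskMetadataSketch {
  static final String WATERMARK_KEY = "watermarkMs";
  static final String SUBTASKS_KEY = "numSubtasks";

  final String tableNameWithType;
  final long watermarkMs;
  final int numSubtasksPending;

  TaskMetadataSketch(String tableNameWithType, long watermarkMs, int numSubtasksPending) {
    this.tableNameWithType = tableNameWithType;
    this.watermarkMs = watermarkMs;
    this.numSubtasksPending = numSubtasksPending;
  }

  // Mirrors fromZNRecord in the diff: a missing field falls back to 0.
  static TaskMetadataSketch fromFields(String id, Map<String, String> fields) {
    long watermark = Long.parseLong(fields.getOrDefault(WATERMARK_KEY, "0"));
    int pending = Integer.parseInt(fields.getOrDefault(SUBTASKS_KEY, "0"));
    return new TaskMetadataSketch(id, watermark, pending);
  }

  // Mirrors toZNRecord in the diff: both fields are always written.
  Map<String, String> toFields() {
    Map<String, String> fields = new HashMap<>();
    fields.put(WATERMARK_KEY, Long.toString(watermarkMs));
    fields.put(SUBTASKS_KEY, Integer.toString(numSubtasksPending));
    return fields;
  }

  public static void main(String[] args) {
    TaskMetadataSketch written = new TaskMetadataSketch("myTable_REALTIME", 1_700_000_000_000L, 3);
    TaskMetadataSketch read = fromFields(written.tableNameWithType, written.toFields());
    System.out.println(read.watermarkMs + " " + read.numSubtasksPending); // prints "1700000000000 3"

    // A record written before the numSubtasks field existed.
    Map<String, String> legacy = new HashMap<>();
    legacy.put(WATERMARK_KEY, "1700000000000");
    System.out.println(fromFields("myTable_REALTIME", legacy).numSubtasksPending); // prints "0"
  }
}
```

One consequence worth noting: a record that predates this change reads back with a pending count of 0, so a decrement applied to it would go negative unless the count has been written first. The code that writes the count is not part of the hunks shown here.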
@@ -25,6 +25,7 @@
 import java.util.List;
 import java.util.Map;
 import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.helix.zookeeper.zkclient.exception.ZkException;
 import org.apache.pinot.common.metadata.segment.SegmentZKMetadataCustomMapModifier;
 import org.apache.pinot.common.minion.RealtimeToOfflineSegmentsTaskMetadata;
 import org.apache.pinot.core.common.MinionConstants;
@@ -71,7 +72,6 @@ public class RealtimeToOfflineSegmentsTaskExecutor extends BaseMultipleSegmentsC
   private static final Logger LOGGER = LoggerFactory.getLogger(RealtimeToOfflineSegmentsTaskExecutor.class);

   private final MinionTaskZkMetadataManager _minionTaskZkMetadataManager;
-  private int _expectedVersion = Integer.MIN_VALUE;

   public RealtimeToOfflineSegmentsTaskExecutor(MinionTaskZkMetadataManager minionTaskZkMetadataManager,
       MinionConf minionConf) {
@@ -82,7 +82,6 @@ public RealtimeToOfflineSegmentsTaskExecutor(MinionTaskZkMetadataManager minionT
   /**
    * Fetches the RealtimeToOfflineSegmentsTask metadata ZNode for the realtime table.
    * Checks that the <code>watermarkMs</code> from the ZNode matches the windowStartMs in the task configs.
-   * If yes, caches the ZNode version to check during update.
    */
   @Override
   public void preProcess(PinotTaskConfig pinotTaskConfig) {
@@ -103,8 +102,6 @@ public void preProcess(PinotTaskConfig pinotTaskConfig) {
         "watermarkMs in RealtimeToOfflineSegmentsTask metadata: %s shouldn't be larger than windowStartMs: %d in task"
             + " configs for table: %s. ZNode may have been modified by another task",
         realtimeToOfflineSegmentsTaskMetadata.getWatermarkMs(), windowStartMs, realtimeTableName);
-
-    _expectedVersion = realtimeToOfflineSegmentsTaskZNRecord.getVersion();
   }

   @Override
@@ -190,19 +187,45 @@ protected List<SegmentConversionResult> convert(PinotTaskConfig pinotTaskConfig,

   /**
    * Fetches the RealtimeToOfflineSegmentsTask metadata ZNode for the realtime table.
-   * Checks that the version of the ZNode matches with the version cached earlier. If yes, proceeds to update
-   * watermark in the ZNode
+   * Update the number of subtasks pending atomically. If number of subtasks left are zero, proceeds to update
+   * watermark in the ZNode.
    * TODO: Making the minion task update the ZK metadata is an anti-pattern, however cannot see another way to do it
    */
   @Override
   public void postProcess(PinotTaskConfig pinotTaskConfig) {
     Map<String, String> configs = pinotTaskConfig.getConfigs();
     String realtimeTableName = configs.get(MinionConstants.TABLE_NAME_KEY);
-    long waterMarkMs = Long.parseLong(configs.get(RealtimeToOfflineSegmentsTask.WINDOW_END_MS_KEY));
-    RealtimeToOfflineSegmentsTaskMetadata newMinionMetadata =
-        new RealtimeToOfflineSegmentsTaskMetadata(realtimeTableName, waterMarkMs);
-    _minionTaskZkMetadataManager.setTaskMetadataZNRecord(newMinionMetadata, RealtimeToOfflineSegmentsTask.TASK_TYPE,
-        _expectedVersion);
+
+    while (true) {

Contributor

realtimeToOffline is a periodic task that moves segments from the realtime table to the offline table. It runs on a schedule (it should not be allowed to run ad hoc). Why don't we have the task generator advance the watermark instead of doing it from the minion? The task generator needs to handle the minion execution failure scenarios anyway.

Contributor Author

If the task generator advances the watermark, it would need some state to track which segments have already been moved to offline. That is what is proposed in solution 2 in the description.

+      ZNRecord realtimeToOfflineSegmentsTaskZNRecord =
+          _minionTaskZkMetadataManager.getTaskMetadataZNRecord(realtimeTableName,
+              RealtimeToOfflineSegmentsTask.TASK_TYPE);
+      int expectedVersion = realtimeToOfflineSegmentsTaskZNRecord.getVersion();
+
+      RealtimeToOfflineSegmentsTaskMetadata realtimeToOfflineSegmentsTaskMetadata =
+          RealtimeToOfflineSegmentsTaskMetadata.fromZNRecord(realtimeToOfflineSegmentsTaskZNRecord);
+
+      int numSubtasksLeft = realtimeToOfflineSegmentsTaskMetadata.getNumSubtasksPending() - 1;
+      Preconditions.checkState(numSubtasksLeft >= 0,
+          "Num of minion subtasks pending for table: %s should be greater than equal to zero.",
+          realtimeTableName);
+      realtimeToOfflineSegmentsTaskMetadata.setNumSubtasksPending(numSubtasksLeft);
+
+      try {
+        if (numSubtasksLeft == 0) {

Contributor

What happens when a minion task fails before decrementing this counter? This state would never go down to zero, right?

Contributor Author
@noob-se7en, Dec 9, 2024

In that case, the minion job will keep retrying and will eventually fail. The same subtasks will then be picked up in the next iteration.
This is also the case today: if a minion fails after a segment is moved to offline but before the watermark is updated, the same segment gets picked again next time. Since the segment name will be the same, the already existing offline segments get overwritten (not a good approach, though).

Contributor Author

The current approach (before this PR) is also not good; we might have to maintain an attribute in the segment metadata marking that the segment has been moved to offline.

+          long newWaterMarkMs = Long.parseLong(configs.get(RealtimeToOfflineSegmentsTask.WINDOW_END_MS_KEY));
+          realtimeToOfflineSegmentsTaskMetadata.setWatermarkMs(newWaterMarkMs);
+        }
+        _minionTaskZkMetadataManager.setTaskMetadataZNRecord(realtimeToOfflineSegmentsTaskMetadata,
+            RealtimeToOfflineSegmentsTask.TASK_TYPE,
+            expectedVersion);
+        break;
+      } catch (ZkException e) {
+        LOGGER.info(
+            "Version changed while updating num of subtasks left in RTO task metadata for table: {}, Retrying.",
+            realtimeTableName);
+      }
+    }
   }

   @Override
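
The read, decrement, conditional-write loop in postProcess can be sketched in isolation as follows. This is a minimal sketch under stated assumptions, not the PR's implementation: an in-memory AtomicReference stands in for the versioned ZNode, and VersionedStoreSketch, Snapshot, writeIfUnchanged and completeSubtask are hypothetical names. Unlike the while (true) loop in the diff, the sketch bounds the number of retries with a maxAttempts parameter, which is a deliberate swap-in for illustration.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical in-memory stand-in for a versioned ZNode; this is not the Pinot or Helix API.
final class VersionedStoreSketch {

  // Immutable snapshot of the task metadata plus the version it was stored under.
  static final class Snapshot {
    final int version;
    final long watermarkMs;
    final int pending;

    Snapshot(int version, long watermarkMs, int pending) {
      this.version = version;
      this.watermarkMs = watermarkMs;
      this.pending = pending;
    }
  }

  private final AtomicReference<Snapshot> _state;

  VersionedStoreSketch(long watermarkMs, int pending) {
    _state = new AtomicReference<>(new Snapshot(0, watermarkMs, pending));
  }

  Snapshot read() {
    return _state.get();
  }

  // Conditional write: succeeds only if no other writer replaced `expected` in the meantime.
  boolean writeIfUnchanged(Snapshot expected, Snapshot next) {
    return _state.compareAndSet(expected, next);
  }

  // Decrements the pending count; the subtask that brings it to zero also advances the watermark.
  static Snapshot completeSubtask(VersionedStoreSketch store, long windowEndMs, int maxAttempts) {
    for (int attempt = 0; attempt < maxAttempts; attempt++) {
      Snapshot current = store.read();
      int left = current.pending - 1;
      if (left < 0) {
        throw new IllegalStateException("No pending subtasks left to complete");
      }
      long watermarkMs = (left == 0) ? windowEndMs : current.watermarkMs;
      Snapshot next = new Snapshot(current.version + 1, watermarkMs, left);
      if (store.writeIfUnchanged(current, next)) {
        return next;
      }
      // Lost the race to another subtask: re-read and try again.
    }
    throw new IllegalStateException("Gave up after " + maxAttempts + " conflicting updates");
  }

  public static void main(String[] args) {
    VersionedStoreSketch store = new VersionedStoreSketch(100L, 3);
    for (int i = 0; i < 3; i++) {
      Snapshot s = completeSubtask(store, 200L, 5);
      System.out.println("pending=" + s.pending + " watermarkMs=" + s.watermarkMs);
    }
    // prints:
    // pending=2 watermarkMs=100
    // pending=1 watermarkMs=100
    // pending=0 watermarkMs=200
  }
}
```

In this sketch, if one of the three subtasks never calls completeSubtask, the count stays above zero and the watermark remains at 100, which is the situation raised in the review thread above.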