Adds Support of maxNumRowsPerTask in RealtimeToOfflineSegmentsTasksGe… #14578
@@ -25,6 +25,7 @@
import java.util.List;
import java.util.Map;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.helix.zookeeper.zkclient.exception.ZkException;
import org.apache.pinot.common.metadata.segment.SegmentZKMetadataCustomMapModifier;
import org.apache.pinot.common.minion.RealtimeToOfflineSegmentsTaskMetadata;
import org.apache.pinot.core.common.MinionConstants;
@@ -71,7 +72,6 @@ public class RealtimeToOfflineSegmentsTaskExecutor extends BaseMultipleSegmentsC
  private static final Logger LOGGER = LoggerFactory.getLogger(RealtimeToOfflineSegmentsTaskExecutor.class);

  private final MinionTaskZkMetadataManager _minionTaskZkMetadataManager;
  private int _expectedVersion = Integer.MIN_VALUE;

  public RealtimeToOfflineSegmentsTaskExecutor(MinionTaskZkMetadataManager minionTaskZkMetadataManager,
      MinionConf minionConf) {
@@ -82,7 +82,6 @@ public RealtimeToOfflineSegmentsTaskExecutor(MinionTaskZkMetadataManager minionT
  /**
   * Fetches the RealtimeToOfflineSegmentsTask metadata ZNode for the realtime table.
   * Checks that the <code>watermarkMs</code> from the ZNode matches the windowStartMs in the task configs.
   * If yes, caches the ZNode version to check during update.
   */
  @Override
  public void preProcess(PinotTaskConfig pinotTaskConfig) {
@@ -103,8 +102,6 @@ public void preProcess(PinotTaskConfig pinotTaskConfig) {
        "watermarkMs in RealtimeToOfflineSegmentsTask metadata: %s shouldn't be larger than windowStartMs: %d in task"
            + " configs for table: %s. ZNode may have been modified by another task",
        realtimeToOfflineSegmentsTaskMetadata.getWatermarkMs(), windowStartMs, realtimeTableName);

    _expectedVersion = realtimeToOfflineSegmentsTaskZNRecord.getVersion();
  }

  @Override
@@ -190,19 +187,45 @@ protected List<SegmentConversionResult> convert(PinotTaskConfig pinotTaskConfig,

  /**
   * Fetches the RealtimeToOfflineSegmentsTask metadata ZNode for the realtime table.
   * Checks that the version of the ZNode matches with the version cached earlier. If yes, proceeds to update
   * watermark in the ZNode
   * Update the number of subtasks pending atomically. If the number of subtasks left is zero, proceeds to update
   * the watermark in the ZNode.
   * TODO: Making the minion task update the ZK metadata is an anti-pattern, however cannot see another way to do it
   */
  @Override
  public void postProcess(PinotTaskConfig pinotTaskConfig) {
    Map<String, String> configs = pinotTaskConfig.getConfigs();
    String realtimeTableName = configs.get(MinionConstants.TABLE_NAME_KEY);
    long waterMarkMs = Long.parseLong(configs.get(RealtimeToOfflineSegmentsTask.WINDOW_END_MS_KEY));
    RealtimeToOfflineSegmentsTaskMetadata newMinionMetadata =
        new RealtimeToOfflineSegmentsTaskMetadata(realtimeTableName, waterMarkMs);
    _minionTaskZkMetadataManager.setTaskMetadataZNRecord(newMinionMetadata, RealtimeToOfflineSegmentsTask.TASK_TYPE,
        _expectedVersion);

    while (true) {
      ZNRecord realtimeToOfflineSegmentsTaskZNRecord =
          _minionTaskZkMetadataManager.getTaskMetadataZNRecord(realtimeTableName,
              RealtimeToOfflineSegmentsTask.TASK_TYPE);
      int expectedVersion = realtimeToOfflineSegmentsTaskZNRecord.getVersion();

      RealtimeToOfflineSegmentsTaskMetadata realtimeToOfflineSegmentsTaskMetadata =
          RealtimeToOfflineSegmentsTaskMetadata.fromZNRecord(realtimeToOfflineSegmentsTaskZNRecord);

      int numSubtasksLeft = realtimeToOfflineSegmentsTaskMetadata.getNumSubtasksPending() - 1;
      Preconditions.checkState(numSubtasksLeft >= 0,
          "Num of minion subtasks pending for table: %s should be greater than or equal to zero.",
          realtimeTableName);
      realtimeToOfflineSegmentsTaskMetadata.setNumSubtasksPending(numSubtasksLeft);

      try {
        if (numSubtasksLeft == 0) {
(Inline review thread on the line above)
Review comment: What happens when a minion task fails before it decrements this counter? This state would never go down to zero, right?
Reply: In that case, the minion job will keep retrying and will fail. The same subtasks will then be picked up in the next iteration.
Reply: The current approach (before this PR) is also not good; we might have to maintain an attribute in the segment metadata marking that the segment has been moved to offline.
          long newWaterMarkMs = Long.parseLong(configs.get(RealtimeToOfflineSegmentsTask.WINDOW_END_MS_KEY));
          realtimeToOfflineSegmentsTaskMetadata.setWatermarkMs(newWaterMarkMs);
        }
        _minionTaskZkMetadataManager.setTaskMetadataZNRecord(realtimeToOfflineSegmentsTaskMetadata,
            RealtimeToOfflineSegmentsTask.TASK_TYPE, expectedVersion);
        break;
      } catch (ZkException e) {
        LOGGER.info(
            "Version changed while updating num of subtasks left in RTO task metadata for table: {}, Retrying.",
            realtimeTableName);
      }
    }
  }

  @Override
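For readers unfamiliar with the pattern in the new postProcess: the retry loop is optimistic concurrency control on a ZNode, where a conditional write succeeds only if the caller's expected version still matches the ZNode's current version. Below is a minimal, self-contained sketch of the same pattern using the plain ZooKeeper client; the path, the counter encoding, and the helper name are illustrative assumptions, not part of this PR or of Pinot's MinionTaskZkMetadataManager API.

import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

public final class VersionedZkUpdateSketch {
  /**
   * Hypothetical helper: atomically decrements an integer counter stored in a ZNode.
   * It reads the data together with its version, modifies it locally, then writes it back
   * conditioned on that version. If another writer raced us, the conditional write fails
   * with BadVersionException and the loop re-reads and retries, mirroring the
   * while (true) / catch (ZkException) loop in postProcess above.
   */
  public static int decrementCounter(ZooKeeper zk, String path)
      throws KeeperException, InterruptedException {
    while (true) {
      Stat stat = new Stat();
      byte[] data = zk.getData(path, false, stat);            // read value + version
      int counter = Integer.parseInt(new String(data)) - 1;   // modify locally
      try {
        // Conditional write: only succeeds if the ZNode is still at stat.getVersion().
        zk.setData(path, Integer.toString(counter).getBytes(), stat.getVersion());
        return counter;
      } catch (KeeperException.BadVersionException e) {
        // Lost the race; loop and retry with a fresh read.
      }
    }
  }
}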
Review comment: Since realtimeToOffline is a periodic task that moves segments from the realtime table to the offline table, it runs on a schedule (it should not be allowed to run ad hoc). Why don't we have the task generator advance the watermark instead of doing it from the minion? The task generator needs to handle minion execution failure scenarios anyway.
Reply: If the task generator advances the watermark, it would need some state to tell which segments have already been moved to offline. That is what is proposed as solution 2 in the description.
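To make the alternative being discussed concrete, here is a rough sketch of how a generator-side watermark advance could look if segments carried a moved-to-offline marker in their custom metadata. Everything in it (the marker key, the map shape, the method name) is a hypothetical illustration of the idea, not code from this PR or from Pinot.

import java.util.List;
import java.util.Map;

public final class WatermarkAdvanceSketch {
  // Hypothetical marker a subtask could stamp into a segment's custom metadata map
  // once that segment's rows have been moved to the offline table.
  private static final String MOVED_TO_OFFLINE_KEY = "movedToOffline";

  /**
   * Computes the watermark the task generator could publish: it advances to windowEndMs
   * only when every realtime segment in the window carries the moved marker; otherwise it
   * keeps the current watermark so the same window is retried on the next scheduled run.
   */
  public static long computeWatermark(long currentWatermarkMs, long windowEndMs,
      List<Map<String, String>> segmentCustomMapsInWindow) {
    for (Map<String, String> customMap : segmentCustomMapsInWindow) {
      if (!"true".equals(customMap.get(MOVED_TO_OFFLINE_KEY))) {
        return currentWatermarkMs; // at least one segment in the window is not moved yet
      }
    }
    return windowEndMs; // entire window moved; safe to advance the watermark
  }
}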