[CELEBORN-1883] Replace HashSet with ConcurrentHashMap.newKeySet for ShuffleFileGroups

### What changes were proposed in this pull request?
Replace the `HashSet` of `PartitionLocation`s in the reducer file groups map with a concurrent set created via `ConcurrentHashMap.newKeySet`.

### Why are the changes needed?
We are seeing race conditions between `handleGetReducerFileGroup` and `tryFinalCommit`, where reducers complete without processing a partition even though it contains data.

### Problematic logs
On the driver side:
```
25/01/31 14:23:02 {} INFO org.apache.celeborn.client.commit.ReducePartitionCommitHandler: Shuffle 23 commit files complete. File count 23200 using 240180 ms
...
25/01/31 14:23:02 {} INFO org.apache.celeborn.client.commit.ReducePartitionCommitHandler: Shuffle 23 partition 11931-0: primary lost, use replica PartitionLocation[
  id-epoch:11931-0
  host-rpcPort-pushPort-fetchPort-replicatePort:10.68.138.242-39557-35555-37139-39685
  mode:REPLICA
  peer:(empty)
  storage hint:StorageInfo{type=SSD, mountPoint='', finalResult=true, filePath=}
  mapIdBitMap:null].
...
25/01/31 14:23:02 {} INFO org.apache.celeborn.client.commit.ReducePartitionCommitHandler: Succeed to handle stageEnd for 23.
```

On the executor side:
```
25/01/31 14:23:02 {executorId=92, jobId=28, partitionId=420, stageId=74, taskAttemptId=82047} INFO org.apache.celeborn.client.ShuffleClientImpl: Shuffle 23 request reducer file group success using 59315 ms, result partition size 12000
...
25/01/31 14:40:54 {executorId=92, partitionId=11931, taskAttemptId=93846} INFO org.apache.spark.executor.Executor: Running task 11931.0 in stage 74.0 (TID 93846)
25/01/31 14:40:54 {jobId=28, executorId=92, taskAttemptId=93846, partitionId=11931, stageId=74} INFO org.apache.spark.shuffle.celeborn.SparkShuffleManager: Shuffle 24 write mode is changed to SORT because partition count 12000 is greater than threshold 2000
25/01/31 14:40:54 {executorId=92, jobId=28, partitionId=11931, stageId=74, taskAttemptId=93846} INFO org.apache.spark.shuffle.celeborn.CelebornShuffleReader: BatchOpenStream for 0 cost 0ms
25/01/31 14:40:54 {} WARN org.apache.celeborn.client.ShuffleClientImpl: Shuffle data is empty for shuffle 23 partition 11931.
```

### How was this patch tested?
No additional tests: I've tried to reproduce the issue, but we've only seen it with a high number of nodes and over long execution times.

### More explanation on why/how this happens

```
// write path
 override def setStageEnd(shuffleId: Int): Unit = {
    getReducerFileGroupRequest synchronized {
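      // per the scenario above, the commit path is expected to have populated reducerFileGroupsMap before the stage is marked ended here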
      stageEndShuffleSet.add(shuffleId)
    }
....

// read path
 override def handleGetReducerFileGroup(context: RpcCallContext, shuffleId: Int): Unit = {
    // Quick return for ended stage, avoid occupy sync lock.
    if (isStageEnd(shuffleId)) {
      replyGetReducerFileGroup(context, shuffleId)
    } else {
      getReducerFileGroupRequest.synchronized {
...

override def isStageEnd(shuffleId: Int): Boolean = {
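    // called outside the getReducerFileGroupRequest lock; visibility relies solely on the set's own guarantees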
    stageEndShuffleSet.contains(shuffleId)
  }
```

Since the concurrency guarantees between the read and write paths rely on ConcurrentHashMap's volatile value reads and writes, there is no guarantee that the contents of a plain HashSet stored as a map value are fully visible to the reader thread.
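
For illustration only (simplified names and a hypothetical object, not the actual `CommitHandler` code), the sketch below contrasts the two value types: with a plain `HashSet` value, `add` calls made after the entry is created are ordinary writes, whereas a set from `ConcurrentHashMap.newKeySet` routes every `add` through the backing `ConcurrentHashMap`.

```scala
import java.util
import java.util.concurrent.ConcurrentHashMap

object KeySetVisibilitySketch {
  def main(args: Array[String]): Unit = {
    // Plain HashSet values: computeIfAbsent publishes the (empty) set safely,
    // but later HashSet.add calls are plain writes with no happens-before edge
    // for a reader thread that only performs a volatile read on the outer map.
    val plainGroups = new ConcurrentHashMap[Integer, util.HashSet[String]]()
    plainGroups
      .computeIfAbsent(Integer.valueOf(11931), (k: Integer) => new util.HashSet[String]())
      .add("11931-0") // a concurrent reader may not observe this element

    // ConcurrentHashMap.newKeySet: the returned Set is backed by a
    // ConcurrentHashMap, so each add is itself a safely published write and a
    // concurrent reader never observes a partially constructed set.
    val concurrentGroups = new ConcurrentHashMap[Integer, util.Set[String]]()
    concurrentGroups
      .computeIfAbsent(Integer.valueOf(11931), (k: Integer) => ConcurrentHashMap.newKeySet[String]())
      .add("11931-0")

    println(concurrentGroups.get(Integer.valueOf(11931))) // [11931-0]
  }
}
```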

Closes apache#3100 from aidar-stripe/main-commit-concurrency-fix.

Authored-by: Aidar Bariev <[email protected]>
Signed-off-by: Shuang <[email protected]>
aidar-stripe authored and SteNicholas committed Mar 10, 2025
1 parent c343adc commit 567e4b0
Showing 1 changed file with 1 addition and 1 deletion.
```diff
@@ -491,7 +491,7 @@ abstract class CommitHandler(
     committedPartitions.values().asScala.foreach { partition =>
       val partitionLocations = reducerFileGroupsMap.get(shuffleId).computeIfAbsent(
         partition.getId,
-        (k: Integer) => new util.HashSet[PartitionLocation]())
+        (k: Integer) => ConcurrentHashMap.newKeySet[PartitionLocation]())
       partitionLocations.add(partition)
     }
   }
```
