Skip to content

Commit

Permalink
[CELEBORN-1885] Fix nullptr exceptions in FetchChunk after worker res…
Browse files Browse the repository at this point in the history
…tart

### What changes were proposed in this pull request?

Handling nullptr exception in FetchHandler after worker restarts.

### Why are the changes needed?

Current code throw nullptr during handleChunkFetchRequest –

```
25/02/27 09:23:25 WARN [data-client-5-1] TransportResponseHandler: Ignoring response for RPC 425133 from phx61-u46.prod.uber.internal/10.154.141.80:19103 (java.lang.NullPointerException: Cannot read field "shuffleKey" because "state" is null
	at org.apache.celeborn.service.deploy.worker.storage.ChunkStreamManager.getShuffleKeyAndFileName(ChunkStreamManager.java:199)
	at org.apache.celeborn.service.deploy.worker.FetchHandler.handleChunkFetchRequest(FetchHandler.scala:493)
	at org.apache.celeborn.service.deploy.worker.FetchHandler.handleRpcRequest(FetchHandler.scala:181)
	at org.apache.celeborn.service.deploy.worker.FetchHandler.receive(FetchHandler.scala:101)
	at org.apache.celeborn.common.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:100)
	at org.apache.celeborn.common.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:84)
	at org.apache.celeborn.common.network.server.TransportChannelHandler.channelRead(TransportChannelHandler.java:156)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:289)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	at org.apache.celeborn.common.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:74)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.base/java.lang.Thread.run(Thread.java:840)
) since it is not outstanding
```
and during handleEndStreamFromClient – 

```
2025-02-27T17:31:03+05:30 WARN [data-client-5-3] TransportResponseHandler: Ignoring response for RPC 461011 from phx61-hkr.prod.uber.internal/10.154.133.115:19103 (java.lang.NullPointerException: Cannot read field "shuffleKey" because "streamState" is null
	at org.apache.celeborn.service.deploy.worker.FetchHandler.handleEndStreamFromClient(FetchHandler.scala:458)
	at org.apache.celeborn.service.deploy.worker.FetchHandler.handleRpcRequest(FetchHandler.scala:174)
	at org.apache.celeborn.service.deploy.worker.FetchHandler.receive(FetchHandler.scala:101)
	at org.apache.celeborn.common.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:100)
	at org.apache.celeborn.common.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:84)
	at org.apache.celeborn.common.network.server.TransportChannelHandler.channelRead(TransportChannelHandler.java:156)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:289)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	at org.apache.celeborn.common.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:74)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
	at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.base/java.lang.Thread.run(Thread.java:840)
) since it is not outstanding
```
Due to which this error does not get handled properly in the `TransportResponseHandler` as ChunkFetchFailure. Instead it considered this failure as `RpcFailure`.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Tested in our staging cluster.

Closes apache#3128 from s0nskar/fix_nullptr_restart.

Authored-by: Sanskar Modi <[email protected]>
Signed-off-by: mingji <[email protected]>
  • Loading branch information
s0nskar authored and SteNicholas committed Mar 10, 2025
1 parent 567e4b0 commit 56a7d21
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,6 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

import scala.Tuple2;

import com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -194,11 +192,6 @@ public StreamState getStreamState(long streamId) {
return streams.get(streamId);
}

public Tuple2<String, String> getShuffleKeyAndFileName(long streamId) {
StreamState state = streams.get(streamId);
return new Tuple2<>(state.shuffleKey, state.fileName);
}

public int getStreamsCount() {
return streams.size();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -459,9 +459,11 @@ class FetchHandler(
streamType match {
case StreamType.ChunkStream =>
val streamState = chunkStreamManager.getStreamState(streamId)
val (shuffleKey, fileName) = (streamState.shuffleKey, streamState.fileName)
workerSource.recordAppActiveConnection(client, shuffleKey)
getRawFileInfo(shuffleKey, fileName).closeStream(streamId)
if (streamState != null) {
val (shuffleKey, fileName) = (streamState.shuffleKey, streamState.fileName)
workerSource.recordAppActiveConnection(client, shuffleKey)
getRawFileInfo(shuffleKey, fileName).closeStream(streamId)
}
case StreamType.CreditStream =>
val shuffleKey = creditStreamManager.getStreamShuffleKey(streamId)
if (shuffleKey != null) {
Expand Down Expand Up @@ -501,9 +503,15 @@ class FetchHandler(
logDebug(s"Received req from ${NettyUtils.getRemoteAddress(client.getChannel)}" +
s" to fetch block $streamChunkSlice")

workerSource.recordAppActiveConnection(
client,
chunkStreamManager.getShuffleKeyAndFileName(streamChunkSlice.streamId)._1)
val streamState = chunkStreamManager.getStreamState(streamChunkSlice.streamId)
if (streamState == null) {
val message = s"Stream ${streamChunkSlice.streamId} is not registered with worker. " +
"This can happen if the worker was restart recently."
logError(message)
workerSource.incCounter(WorkerSource.FETCH_CHUNK_FAIL_COUNT)
client.getChannel.writeAndFlush(new ChunkFetchFailure(streamChunkSlice, message))
return
}

maxChunkBeingTransferred.foreach { threshold =>
val chunksBeingTransferred = chunkStreamManager.chunksBeingTransferred // take high cpu usage
Expand All @@ -518,6 +526,8 @@ class FetchHandler(
}
}

workerSource.recordAppActiveConnection(client, streamState.shuffleKey)

val reqStr = req.toString
workerSource.startTimer(WorkerSource.FETCH_CHUNK_TIME, reqStr)
val fetchTimeMetric = chunkStreamManager.getFetchTimeMetric(streamChunkSlice.streamId)
Expand Down

0 comments on commit 56a7d21

Please sign in to comment.