From 56a7d21e3d8cc13c8246e4c8584daaa16ef87669 Mon Sep 17 00:00:00 2001 From: Sanskar Modi Date: Tue, 4 Mar 2025 22:26:38 +0800 Subject: [PATCH] [CELEBORN-1885] Fix nullptr exceptions in FetchChunk after worker restart MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ### What changes were proposed in this pull request? Handling nullptr exception in FetchHandler after worker restarts. ### Why are the changes needed? Current code throw nullptr during handleChunkFetchRequest – ``` 25/02/27 09:23:25 WARN [data-client-5-1] TransportResponseHandler: Ignoring response for RPC 425133 from phx61-u46.prod.uber.internal/10.154.141.80:19103 (java.lang.NullPointerException: Cannot read field "shuffleKey" because "state" is null at org.apache.celeborn.service.deploy.worker.storage.ChunkStreamManager.getShuffleKeyAndFileName(ChunkStreamManager.java:199) at org.apache.celeborn.service.deploy.worker.FetchHandler.handleChunkFetchRequest(FetchHandler.scala:493) at org.apache.celeborn.service.deploy.worker.FetchHandler.handleRpcRequest(FetchHandler.scala:181) at org.apache.celeborn.service.deploy.worker.FetchHandler.receive(FetchHandler.scala:101) at org.apache.celeborn.common.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:100) at org.apache.celeborn.common.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:84) at org.apache.celeborn.common.network.server.TransportChannelHandler.channelRead(TransportChannelHandler.java:156) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:289) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) at org.apache.celeborn.common.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:74) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166) at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788) at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724) at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650) at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562) at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997) at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) at java.base/java.lang.Thread.run(Thread.java:840) ) since it is not outstanding ``` and during handleEndStreamFromClient –  ``` 2025-02-27T17:31:03+05:30 WARN [data-client-5-3] TransportResponseHandler: Ignoring response for RPC 461011 from phx61-hkr.prod.uber.internal/10.154.133.115:19103 (java.lang.NullPointerException: Cannot read field "shuffleKey" because "streamState" is null at org.apache.celeborn.service.deploy.worker.FetchHandler.handleEndStreamFromClient(FetchHandler.scala:458) at org.apache.celeborn.service.deploy.worker.FetchHandler.handleRpcRequest(FetchHandler.scala:174) at org.apache.celeborn.service.deploy.worker.FetchHandler.receive(FetchHandler.scala:101) at org.apache.celeborn.common.network.server.TransportRequestHandler.processRpcRequest(TransportRequestHandler.java:100) at org.apache.celeborn.common.network.server.TransportRequestHandler.handle(TransportRequestHandler.java:84) at org.apache.celeborn.common.network.server.TransportChannelHandler.channelRead(TransportChannelHandler.java:156) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) at io.netty.handler.timeout.IdleStateHandler.channelRead(IdleStateHandler.java:289) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) at org.apache.celeborn.common.network.util.TransportFrameDecoder.channelRead(TransportFrameDecoder.java:74) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440) at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166) at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788) at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724) at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650) at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562) at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997) at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) at java.base/java.lang.Thread.run(Thread.java:840) ) since it is not outstanding ``` Due to which this error does not get handled properly in the `TransportResponseHandler` as ChunkFetchFailure. Instead it considered this failure as `RpcFailure`. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Tested in our staging cluster. Closes #3128 from s0nskar/fix_nullptr_restart. Authored-by: Sanskar Modi Signed-off-by: mingji --- .../worker/storage/ChunkStreamManager.java | 7 ------ .../service/deploy/worker/FetchHandler.scala | 22 ++++++++++++++----- 2 files changed, 16 insertions(+), 13 deletions(-) diff --git a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/ChunkStreamManager.java b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/ChunkStreamManager.java index 7ea390168f8..9586f519f3e 100644 --- a/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/ChunkStreamManager.java +++ b/worker/src/main/java/org/apache/celeborn/service/deploy/worker/storage/ChunkStreamManager.java @@ -22,8 +22,6 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicLong; -import scala.Tuple2; - import com.google.common.annotations.VisibleForTesting; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -194,11 +192,6 @@ public StreamState getStreamState(long streamId) { return streams.get(streamId); } - public Tuple2 getShuffleKeyAndFileName(long streamId) { - StreamState state = streams.get(streamId); - return new Tuple2<>(state.shuffleKey, state.fileName); - } - public int getStreamsCount() { return streams.size(); } diff --git a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala index 42009835e9b..ae693e93297 100644 --- a/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala +++ b/worker/src/main/scala/org/apache/celeborn/service/deploy/worker/FetchHandler.scala @@ -459,9 +459,11 @@ class FetchHandler( streamType match { case StreamType.ChunkStream => val streamState = chunkStreamManager.getStreamState(streamId) - val (shuffleKey, fileName) = (streamState.shuffleKey, streamState.fileName) - workerSource.recordAppActiveConnection(client, shuffleKey) - getRawFileInfo(shuffleKey, fileName).closeStream(streamId) + if (streamState != null) { + val (shuffleKey, fileName) = (streamState.shuffleKey, streamState.fileName) + workerSource.recordAppActiveConnection(client, shuffleKey) + getRawFileInfo(shuffleKey, fileName).closeStream(streamId) + } case StreamType.CreditStream => val shuffleKey = creditStreamManager.getStreamShuffleKey(streamId) if (shuffleKey != null) { @@ -501,9 +503,15 @@ class FetchHandler( logDebug(s"Received req from ${NettyUtils.getRemoteAddress(client.getChannel)}" + s" to fetch block $streamChunkSlice") - workerSource.recordAppActiveConnection( - client, - chunkStreamManager.getShuffleKeyAndFileName(streamChunkSlice.streamId)._1) + val streamState = chunkStreamManager.getStreamState(streamChunkSlice.streamId) + if (streamState == null) { + val message = s"Stream ${streamChunkSlice.streamId} is not registered with worker. " + + "This can happen if the worker was restart recently." + logError(message) + workerSource.incCounter(WorkerSource.FETCH_CHUNK_FAIL_COUNT) + client.getChannel.writeAndFlush(new ChunkFetchFailure(streamChunkSlice, message)) + return + } maxChunkBeingTransferred.foreach { threshold => val chunksBeingTransferred = chunkStreamManager.chunksBeingTransferred // take high cpu usage @@ -518,6 +526,8 @@ class FetchHandler( } } + workerSource.recordAppActiveConnection(client, streamState.shuffleKey) + val reqStr = req.toString workerSource.startTimer(WorkerSource.FETCH_CHUNK_TIME, reqStr) val fetchTimeMetric = chunkStreamManager.getFetchTimeMetric(streamChunkSlice.streamId)