You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
I tried flume-ng-kafka-sink and it worked fine. But I noticed that the cpu utilization stay at 100% and never dropped down all the time even at the time the channel is empty.
I looked into the source code and found that "process" function in KafkaSink always return Status.READY even if no events available in channel. That causes the SinkRunner keep running achieving event from channel and get nothing.
Do we need to change to return Status.BACKOFF in "process" function in KafkaSink when it notices that there is no events processed in current round? So that the SinkRunner has a chance to take a rest when there is no event in channel. If this proposal feasible, function "testEmptyChannel" in TestKafkaSink also need to be changed.
Thanks
Yue
The text was updated successfully, but these errors were encountered:
hi @luyue0821 ,We found that one of our flume's cpu utilization also stay at 100%, We also use kafka sink of flume 1.6.But I can't reproduce the issue. And the kafkasink only take event from channel about every 3 seconds. Can you paste your flume config file or tell how to reproduce the issue? thanks very much.
Hi Hari,
I tried flume-ng-kafka-sink and it worked fine. But I noticed that the cpu utilization stay at 100% and never dropped down all the time even at the time the channel is empty.
I looked into the source code and found that "process" function in KafkaSink always return Status.READY even if no events available in channel. That causes the SinkRunner keep running achieving event from channel and get nothing.
Do we need to change to return Status.BACKOFF in "process" function in KafkaSink when it notices that there is no events processed in current round? So that the SinkRunner has a chance to take a rest when there is no event in channel. If this proposal feasible, function "testEmptyChannel" in TestKafkaSink also need to be changed.
Thanks
Yue
The text was updated successfully, but these errors were encountered: