Skip to content

Commit

Permalink
[pick-1.2][fix](mysql) fix mysql channel infinite blocking (#32741)
Browse files Browse the repository at this point in the history
* [fix](mysql) fix mysql channel infinite blocking (#28808)

Call the Channels blocking method with timeout instead.

Using session variables net_write_timeout and net_read_timeout as the timeout parameter.

* [conf](mysql) opt mysql network timeout to 600s #32545

---------

Co-authored-by: fornaix <[email protected]>
  • Loading branch information
cambyzju and fornaix authored Mar 29, 2024
1 parent 755f667 commit 74123e4
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 7 deletions.
14 changes: 10 additions & 4 deletions fe/fe-core/src/main/java/org/apache/doris/mysql/MysqlChannel.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.util.concurrent.TimeUnit;
import javax.net.ssl.SSLEngine;
import javax.net.ssl.SSLEngineResult;
import javax.net.ssl.SSLException;
Expand Down Expand Up @@ -75,11 +76,13 @@ public class MysqlChannel {

protected volatile MysqlSerializer serializer;

private ConnectContext context;

protected MysqlChannel() {
// For DummyMysqlChannel
}

public MysqlChannel(StreamConnection connection) {
public MysqlChannel(StreamConnection connection, ConnectContext context) {
Preconditions.checkNotNull(connection);
this.sequenceId = 0;
this.isSend = false;
Expand All @@ -100,6 +103,7 @@ public MysqlChannel(StreamConnection connection) {
this.defaultBuffer = ByteBuffer.allocate(16 * 1024);
this.headerByteBuffer = ByteBuffer.allocate(PACKET_HEADER_LEN);
this.sendBuffer = ByteBuffer.allocate(2 * 1024 * 1024);
this.context = context;
}

public void initSslBuffer() {
Expand Down Expand Up @@ -182,7 +186,8 @@ protected int readAll(ByteBuffer dstBuf, boolean isHeader) throws IOException {
}
try {
while (dstBuf.remaining() != 0) {
int ret = Channels.readBlocking(conn.getSourceChannel(), dstBuf);
int ret = Channels.readBlocking(conn.getSourceChannel(), dstBuf, context.getNetReadTimeout(),
TimeUnit.SECONDS);
// return -1 when remote peer close the channel
if (ret == -1) {
decryptData(dstBuf, isHeader);
Expand Down Expand Up @@ -352,12 +357,13 @@ private ByteBuffer expandPacket(ByteBuffer result, int packetLen) {
protected void realNetSend(ByteBuffer buffer) throws IOException {
buffer = encryptData(buffer);
long bufLen = buffer.remaining();
long writeLen = Channels.writeBlocking(conn.getSinkChannel(), buffer);
long writeLen = Channels.writeBlocking(conn.getSinkChannel(), buffer, context.getNetWriteTimeout(),
TimeUnit.SECONDS);
if (bufLen != writeLen) {
throw new IOException("Write mysql packet failed.[write=" + writeLen
+ ", needToWrite=" + bufLen + "]");
}
Channels.flushBlocking(conn.getSinkChannel());
Channels.flushBlocking(conn.getSinkChannel(), context.getNetWriteTimeout(), TimeUnit.SECONDS);
isSend = true;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ public ConnectContext(StreamConnection connection) {
serverCapability = MysqlCapability.DEFAULT_CAPABILITY;
isKilled = false;
if (connection != null) {
mysqlChannel = new MysqlChannel(connection);
mysqlChannel = new MysqlChannel(connection, this);
} else {
mysqlChannel = new DummyMysqlChannel();
}
Expand Down Expand Up @@ -672,5 +672,12 @@ public String getQueryIdentifier() {
return "stmt[" + stmtId + ", " + DebugUtil.printId(queryId) + "]";
}

public int getNetReadTimeout() {
return this.sessionVariable.getNetReadTimeout();
}

public int getNetWriteTimeout() {
return this.sessionVariable.getNetWriteTimeout();
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -384,11 +384,11 @@ public class SessionVariable implements Serializable, Writable {

// The number of seconds to wait for a block to be written to a connection before aborting the write
@VariableMgr.VarAttr(name = NET_WRITE_TIMEOUT)
public int netWriteTimeout = 60;
public int netWriteTimeout = 600;

// The number of seconds to wait for a block to be written to a connection before aborting the write
@VariableMgr.VarAttr(name = NET_READ_TIMEOUT)
public int netReadTimeout = 60;
public int netReadTimeout = 600;

// The current time zone
@VariableMgr.VarAttr(name = TIME_ZONE, needForward = true)
Expand Down

0 comments on commit 74123e4

Please sign in to comment.