Skip to content

Commit

Permalink
fix some bug for connection heartbeat (#2593)
Browse files Browse the repository at this point in the history
  • Loading branch information
PanternBao authored Apr 16, 2021
1 parent 5e714bf commit 9855806
Showing 1 changed file with 48 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@
import com.actiontech.dble.net.mysql.FieldPacket;
import com.actiontech.dble.net.mysql.RowDataPacket;
import io.netty.util.Timeout;
import io.netty.util.TimerTask;

import java.util.List;
import java.util.concurrent.TimeUnit;
Expand All @@ -27,6 +26,7 @@ public class ConnectionHeartBeatHandler implements ResponseHandler {
private volatile Timeout heartbeatTimeout;
private final BackendConnection conn;
private final MySQLConnectionListener listener;
private boolean returned = false;
private boolean finished = false;

public ConnectionHeartBeatHandler(BackendConnection conn, boolean isBlock, MySQLConnectionListener listener) {
Expand All @@ -41,25 +41,29 @@ public ConnectionHeartBeatHandler(BackendConnection conn, boolean isBlock, MySQL
}

public boolean ping(long timeout) {
conn.ping();
if (heartbeatLock != null) {
final long deadline = System.currentTimeMillis() + timeout;
synchronized (heartbeatLock) {
conn.ping();
try {
heartbeatLock.wait(timeout);
while (!returned) {
timeout = deadline - System.currentTimeMillis();
if (timeout <= 0L) {
returned = true;
} else {
heartbeatLock.wait(timeout);
}
}
} catch (InterruptedException e) {
finished = false;
returned = true;
}
}
return finished;
} else {
heartbeatTimeout = TimerHolder.getTimer().newTimeout(new TimerTask() {
@Override
public void run(Timeout timeout) throws Exception {
conn.closeWithoutRsp("conn heart timeout");
}
}, timeout, TimeUnit.MILLISECONDS);
return true;
heartbeatTimeout = TimerHolder.getTimer().newTimeout(timeout1 -> conn.closeWithoutRsp("conn heart timeout"), timeout, TimeUnit.MILLISECONDS);
conn.ping();
}

return finished;
}

/**
Expand All @@ -73,8 +77,11 @@ public void run(Timeout timeout) throws Exception {
public void okResponse(byte[] ok, BackendConnection con) {
if (heartbeatLock != null) {
synchronized (heartbeatLock) {
finished = true;
heartbeatLock.notifyAll();
if (!returned) {
returned = true;
finished = true;
heartbeatLock.notifyAll();
}
}
return;
}
Expand All @@ -83,27 +90,29 @@ public void okResponse(byte[] ok, BackendConnection con) {
listener.onHeartbeatSuccess(con);
}

/**
* if heart beat returns error than clase the connection and
* start the next one
*
* @param data
* @param con
*/
@Override
public void errorResponse(byte[] data, BackendConnection con) {
public void connectionClose(BackendConnection con, String reason) {
if (heartbeatLock != null) {
synchronized (heartbeatLock) {
if (!returned) {
returned = true;
finished = false;
heartbeatLock.notifyAll();
}
}
}
}

/**
* if when the query going on the conneciton be closed
* than just do nothing and go on for next one
*
* @param con
* @param reason
*/
@Override
public void connectionClose(BackendConnection con, String reason) {
public void fieldEofResponse(byte[] header, List<byte[]> fields, List<FieldPacket> fieldPackets, byte[] eof,
boolean isLeft, BackendConnection con) {
// not called
}

@Override
public boolean rowResponse(byte[] rowNull, RowDataPacket rowPacket, boolean isLeft, BackendConnection con) {
// not called
return false;
}

/**
Expand All @@ -116,26 +125,24 @@ public void rowEofResponse(byte[] eof, boolean isLeft, BackendConnection con) {
// not called
}


@Override
public void fieldEofResponse(byte[] header, List<byte[]> fields, List<FieldPacket> fieldPackets, byte[] eof,
boolean isLeft, BackendConnection con) {
// not called
}

@Override
public boolean rowResponse(byte[] rowNull, RowDataPacket rowPacket, boolean isLeft, BackendConnection con) {
public void connectionError(Throwable e, Object attachment) {
// not called
return false;
}

@Override
public void connectionAcquired(BackendConnection con) {
// not called
}

/**
* if heart beat returns error than clase the connection and
* start the next one
*
* @param data
* @param con
*/
@Override
public void connectionError(Throwable e, Object attachment) {
// not called
public void errorResponse(byte[] data, BackendConnection con) {
}
}

0 comments on commit 9855806

Please sign in to comment.