Skip to content

Commit

Permalink
dump file can be killed and modify log (#1503)
Browse files Browse the repository at this point in the history
* dump file can be killed and modify log

* true info log level to debug in case of performance test
  • Loading branch information
PanternBao authored and yanhuqing666 committed Nov 18, 2019
1 parent 9a66a4e commit 6aee2ed
Show file tree
Hide file tree
Showing 6 changed files with 111 additions and 41 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,17 @@
*/
public class ManagerConnection extends FrontendConnection {
private static final long AUTH_TIMEOUT = 15 * 1000L;
private volatile boolean skipIdleCheck = false;

public ManagerConnection(NetworkChannel channel) throws IOException {
super(channel);
}

@Override
public boolean isIdleTimeout() {
if (isAuthenticated) {
if (skipIdleCheck) {
return false;
} else if (isAuthenticated) {
return super.isIdleTimeout();
} else {
return TimeUtil.currentTimeMillis() > Math.max(lastWriteTime,
Expand All @@ -50,4 +53,9 @@ public void handle(final byte[] data) {
public void killAndClose(String reason) {
this.close(reason);
}

public void skipIdleCheck(boolean skip) {
this.skipIdleCheck = skip;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,8 @@ public void query(String sql) {
DataHostHandler.handle(sql, c);
break;
case ManagerParse.SPLIT:
new SplitDumpHandler().handle(sql, c, rs >>> SHIFT);
c.skipIdleCheck(true);
SplitDumpHandler.handle(sql, c, rs >>> SHIFT);
break;
default:
c.writeErrMessage(ErrorCode.ER_YES, "Unsupported statement");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import com.actiontech.dble.manager.dump.handler.StatementHandlerManager;
import com.actiontech.dble.route.factory.RouteStrategyFactory;
import com.actiontech.dble.server.parser.ServerParse;
import com.actiontech.dble.util.TimeUtil;
import com.alibaba.druid.sql.ast.SQLStatement;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -17,33 +18,44 @@
*/
public final class DumpFileExecutor implements Runnable {

public static final Logger LOGGER = LoggerFactory.getLogger("dumpFileLog");

// receive statement in dump file
private static final Logger LOGGER = LoggerFactory.getLogger("dumpFileLog");
private BlockingQueue<String> queue;
private DumpFileContext context;
private Thread self;

public DumpFileExecutor(BlockingQueue<String> queue, DumpFileWriter writer, DumpFileConfig config) {
this.queue = queue;
this.context = new DumpFileContext(writer, config);
}

public void start() {
new Thread(this, "dump-file-executor").start();
this.self = new Thread(this, "dump-file-executor");
this.self.start();
}

public void stop() {
this.self.interrupt();
}

@Override
public void run() {
String stmt;
DumpFileWriter writer = context.getWriter();
long startTime = TimeUtil.currentTimeMillis();
LOGGER.info("begin to parse statement in dump file.");
while (true) {
while (!Thread.currentThread().isInterrupted()) {
try {
if (queue.isEmpty()) {
LOGGER.info("dump file reader is too slow, please increase read queue size.");
stmt = queue.take();
if (LOGGER.isDebugEnabled()) {
long endTime = TimeUtil.currentTimeMillis();
if (endTime - startTime > 1000) {
startTime = endTime;
if (queue.isEmpty()) {
LOGGER.debug("dump file reader is slow, you can try increasing read queue size.");
}
}
}

stmt = queue.take();
context.setStmt(stmt);
int type = ServerParse.parse(stmt);
// pre handle
Expand Down Expand Up @@ -80,6 +92,8 @@ public void run() {
context.skipCurrentContext();
LOGGER.warn("current stmt[" + currentStmt + "] error,because:" + e.getMessage());
context.addError("current stmt[" + currentStmt + "] error,because:" + e.getMessage());
} catch (InterruptedException ie) {
LOGGER.warn("dump file executor is interrupted.");
} catch (Exception e) {
LOGGER.warn("dump file executor exit, due to :" + e.getMessage());
try {
Expand All @@ -105,6 +119,7 @@ private boolean preHandle(DumpFileWriter writer, int type, String stmt) throws I
}
// skip view
if ((ServerParse.MYSQL_CMD_COMMENT == type || ServerParse.MYSQL_COMMENT == type) && skipView(stmt)) {
context.skipCurrentContext();
return true;
}
// footer
Expand All @@ -116,10 +131,9 @@ private boolean preHandle(DumpFileWriter writer, int type, String stmt) throws I
}

private boolean skipView(String stmt) {
Matcher matcher = DumpFileReader.HINT.matcher(stmt);
Matcher matcher = DumpFileReader.CREATE_VIEW.matcher(stmt);
if (matcher.find()) {
int type = ServerParse.parse(matcher.group(1));
return type >= ServerParse.CREATE_VIEW && type <= ServerParse.ALTER_VIEW;
context.addError("skip view " + matcher.group(1));
}
return false;
}
Expand Down
30 changes: 19 additions & 11 deletions src/main/java/com/actiontech/dble/manager/dump/DumpFileReader.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.actiontech.dble.manager.dump;

import com.actiontech.dble.backend.mysql.store.fs.FileUtils;
import com.actiontech.dble.manager.ManagerConnection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -18,25 +19,39 @@ public final class DumpFileReader {

public static final Logger LOGGER = LoggerFactory.getLogger("dumpFileLog");
public static final String EOF = "dump file eof";
public static final Pattern HINT = Pattern.compile("/\\*!\\d+\\s+(.*)\\*/", Pattern.CASE_INSENSITIVE);
public static final Pattern CREATE_VIEW = Pattern.compile("CREATE\\s+VIEW\\s+`?([a-zA-Z_0-9\\-_]+)`?\\s+", Pattern.CASE_INSENSITIVE);
private StringBuilder tempStr = new StringBuilder(200);
private BlockingQueue<String> readQueue;
private FileChannel fileChannel;
private long fileLength;
private long readLength;
private int readPercent;

public DumpFileReader(BlockingQueue<String> queue) {
this.readQueue = queue;
}

public void open(String fileName) throws IOException {
this.fileChannel = FileUtils.open(fileName, "r");
this.fileLength = this.fileChannel.size();
}

public void start() throws IOException {
public void start(ManagerConnection c) throws IOException, InterruptedException {
LOGGER.info("begin to read dump file.");
try {
ByteBuffer buffer = ByteBuffer.allocate(0x20000);
int byteRead = fileChannel.read(buffer);
while (byteRead != -1) {
if (c.isClosed()) {
LOGGER.info("finish to read dump file, tha task is interrupted.");
throw new InterruptedException();
}
readLength += byteRead;
float percent = ((float) readLength / (float) fileLength) * 100;
if (((int) percent) - readPercent > 5 || (int) percent == 100) {
readPercent = (int) percent;
LOGGER.info("dump file has bean read " + readPercent + "%");
}
readSQLByEOF(buffer.array(), byteRead);
buffer.clear();
byteRead = fileChannel.read(buffer);
Expand All @@ -45,19 +60,13 @@ public void start() throws IOException {
this.readQueue.put(tempStr.toString());
this.tempStr = null;
}
} catch (IOException e) {
throw e;
} catch (InterruptedException e) {
// ignore
LOGGER.warn("thread for read dump file is interrupted.");
this.readQueue.put(EOF);
} finally {
LOGGER.info("finish to read dump file.");
try {
this.readQueue.put(EOF);
if (fileChannel != null) {
fileChannel.close();
}
} catch (IOException | InterruptedException e) {
} catch (IOException e) {
// ignore
LOGGER.warn("close dump file error:" + e.getMessage());
}
Expand Down Expand Up @@ -98,7 +107,6 @@ private void readSQLByEOF(byte[] linesByte, int byteRead) throws InterruptedExce
this.readQueue.put(lines[len]);
}
}

}

}
45 changes: 33 additions & 12 deletions src/main/java/com/actiontech/dble/manager/dump/DumpFileWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import com.actiontech.dble.DbleServer;
import com.actiontech.dble.backend.mysql.store.fs.FileUtils;
import com.actiontech.dble.util.TimeUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -34,8 +35,19 @@ public void open(String writePath, int writeQueueSize) throws IOException {
}

public void start() {
Thread writer;
for (Map.Entry<String, DataNodeWriter> entry : dataNodeWriters.entrySet()) {
new Thread(entry.getValue(), entry.getKey() + "-writer-" + finished.incrementAndGet()).start();
writer = new Thread(entry.getValue(), entry.getKey() + "-writer-" + finished.incrementAndGet());
writer.start();
entry.getValue().self = writer;
}
}

public void stop() {
for (Map.Entry<String, DataNodeWriter> entry : dataNodeWriters.entrySet()) {
if (entry.getValue().self != null) {
entry.getValue().self.interrupt();
}
}
}

Expand Down Expand Up @@ -68,6 +80,7 @@ class DataNodeWriter implements Runnable {
private BlockingQueue<String> queue;
private int queueSize;
private String dataNode;
private Thread self;

DataNodeWriter(String dataNode, int queueSize) {
this.dataNode = dataNode;
Expand All @@ -83,9 +96,6 @@ void open(String fileName) throws IOException {
}

void write(String stmt) throws InterruptedException {
if (this.queue.size() == queueSize) {
LOGGER.info("dump file write is too slow, please increase write queue size.");
}
this.queue.put(stmt);
}

Expand All @@ -98,25 +108,36 @@ void close() throws IOException {
public void run() {
try {
String stmt;
while (true) {
if (queue.isEmpty()) {
LOGGER.info("dump file executor is too slow, no good way.");
}
long startTime = TimeUtil.currentTimeMillis();
while (!Thread.currentThread().isInterrupted()) {
stmt = this.queue.take();
if (LOGGER.isDebugEnabled()) {
long endTime = TimeUtil.currentTimeMillis();
if (endTime - startTime > 1000) {
startTime = endTime;
if (queue.isEmpty()) {
LOGGER.debug("dump file executor parse statement slowly.");
} else if (this.queue.size() == queueSize) {
LOGGER.debug("dump file writer is slow, you can try increasing write queue size.");
}
}
}

if (stmt.equals(DumpFileReader.EOF)) {
LOGGER.info("finish to write dump file.");
close();
finished.decrementAndGet();
return;
}
if (this.fileChannel != null) {
this.fileChannel.write(ByteBuffer.wrap(stmt.getBytes()));
}
}
} catch (IOException | InterruptedException e) {
finished.decrementAndGet();
LOGGER.warn("write " + dataNode + " dump file error, because:" + e.getMessage());
} catch (IOException e) {
LOGGER.warn("dump file writer[" + dataNode + "] occur error:" + e.getMessage());
} catch (InterruptedException ie) {
LOGGER.warn("dump file writer[" + dataNode + "] is interrupted.");
} finally {
finished.decrementAndGet();
try {
close();
} catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import com.actiontech.dble.manager.response.DumpFileError;
import com.actiontech.dble.net.mysql.OkPacket;
import com.actiontech.dble.util.CollectionUtil;
import com.actiontech.dble.util.StringUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -23,7 +24,10 @@ public final class SplitDumpHandler {
private static final Pattern SPLIT_STMT = Pattern.compile("([^\\s]+)\\s+([^\\s]+)\\s*(-r(\\d+))?\\s*(-w(\\d+))?\\s*(-l(\\d+))?", Pattern.CASE_INSENSITIVE);
public static final Logger LOGGER = LoggerFactory.getLogger("dumpFileLog");

public void handle(String stmt, ManagerConnection c, int offset) {
private SplitDumpHandler() {
}

public static void handle(String stmt, ManagerConnection c, int offset) {
LOGGER.info("begin to split dump file.");
DumpFileConfig config = parseOption(stmt.substring(offset).trim());
if (config == null) {
Expand All @@ -44,33 +48,47 @@ public void handle(String stmt, ManagerConnection c, int offset) {

// thread for process statement
dumpFileExecutor.start();
// start read
// start write
writer.start();
reader.start();
// start read
reader.start(c);
} catch (IOException e) {
LOGGER.info("finish to split dump file.");
c.writeErrMessage(ErrorCode.ER_IO_EXCEPTION, e.getMessage());
return;
} catch (InterruptedException ie) {
LOGGER.info("finish to split dump file, because the task is interrupted.");
// manager connection is closed or waiting blocking queue
dumpFileExecutor.stop();
writer.stop();
return;
}

while (!writer.isFinished()) {
while (!c.isClosed() && !writer.isFinished()) {
LockSupport.parkNanos(1000);
}

if (c.isClosed()) {
dumpFileExecutor.stop();
writer.stop();
return;
}

List<ErrorMsg> errors = dumpFileExecutor.getContext().getErrors();
if (CollectionUtil.isEmpty(errors)) {
OkPacket packet = new OkPacket();
packet.setPacketId(1);
packet.setAffectedRows(0);
packet.setServerStatus(2);
packet.setMessage(StringUtil.encode("please see detail in dump.log.", c.getCharset().getResults()));
packet.write(c);
} else {
DumpFileError.execute(c, errors);
}
LOGGER.info("finish to split dump file.");
}

private DumpFileConfig parseOption(String options) {
private static DumpFileConfig parseOption(String options) {
Matcher m = SPLIT_STMT.matcher(options);
DumpFileConfig config = null;
if (m.matches()) {
Expand Down

0 comments on commit 6aee2ed

Please sign in to comment.