Skip to content

Commit

Permalink
[fix](cloud) Fix cloud -230 retry not reset ctx state (apache#47326)
Browse files Browse the repository at this point in the history
  • Loading branch information
deardeng authored Jan 27, 2025
1 parent f45984b commit 780024a
Show file tree
Hide file tree
Showing 3 changed files with 35 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1274,15 +1274,15 @@ public void abortTransaction(Long dbId, Long transactionId, String reason,

AbortTxnResponse abortTxnResponse = null;
try {
abortTxnResponse = abortTransactionImpl(dbId, transactionId, reason, null, null);
abortTxnResponse = abortTransactionImpl(dbId, transactionId, reason, null);
} finally {
handleAfterAbort(abortTxnResponse, txnCommitAttachment, transactionId);
}
}

private AbortTxnResponse abortTransactionImpl(Long dbId, Long transactionId, String reason,
TxnCommitAttachment txnCommitAttachment, List<Table> tableList) throws UserException {
LOG.info("try to abort transaction, dbId:{}, transactionId:{}", dbId, transactionId);
TxnCommitAttachment txnCommitAttachment) throws UserException {
LOG.info("try to abort transaction, dbId:{}, transactionId:{}, reason: {}", dbId, transactionId, reason);

AbortTxnRequest.Builder builder = AbortTxnRequest.newBuilder();
builder.setDbId(dbId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -598,6 +598,7 @@ public void queryRetry(TUniqueId queryId) throws Exception {
i, DebugUtil.printId(firstQueryId), DebugUtil.printId(lastQueryId),
DebugUtil.printId(queryId), randomMillis);
Thread.sleep(randomMillis);
context.getState().reset();
} catch (Exception e) {
throw e;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
import com.mysql.cj.jdbc.StatementImpl
import org.apache.doris.regression.suite.ClusterOptions
import org.apache.doris.regression.util.NodeType
import org.apache.doris.regression.suite.SuiteCluster
Expand All @@ -30,6 +31,16 @@ suite("test_retry_e-230", 'docker') {
options.feConfigs.add('sys_log_verbose_modules=org')
options.setBeNum(1)
options.cloudMode = true

def insert_sql = { sql, expected_row_count ->
def stmt = prepareStatement """ ${sql} """
def result = stmt.executeUpdate()
logger.info("insert result: " + result)
def serverInfo = (((StatementImpl) stmt).results).getServerInfo()
logger.info("result server info: " + serverInfo)
assertEquals(result, expected_row_count)
assertTrue(serverInfo.contains("'status':'VISIBLE'"))
}
// 1. connect to master
options.connectToFollower = false
for (def j = 0; j < 2; j++) {
Expand Down Expand Up @@ -57,7 +68,7 @@ suite("test_retry_e-230", 'docker') {
);
"""
for (def i = 1; i <= 5; i++) {
sql "INSERT INTO ${tbl} VALUES (${i}, ${10 * i})"
insert_sql """INSERT INTO ${tbl} VALUES (${i}, ${10 * i})""", 1
}

cluster.injectDebugPoints(NodeType.FE, ['StmtExecutor.retry.longtime' : null])
Expand Down Expand Up @@ -112,36 +123,48 @@ suite("test_retry_e-230", 'docker') {
)
"""

sql """
insert into ${tbl1} values (9,10,11,12), (1,2,3,4)
"""
insert_sql """INSERT INTO ${tbl1} VALUES (9,10,11,12), (1,2,3,4)""", 2

// dp again
cluster.injectDebugPoints(NodeType.BE, ['CloudTablet.capture_rs_readers.return.e-230' : null])

cluster.clearFrontendDebugPoints()
try {
sql """insert into ${tbl2} select * from ${tbl1}"""
assertFalse(true)
} catch (Exception e) {
logger.info("Received expected exception when insert into select: {}", e.getMessage())
assert e.getMessage().contains("[E-230]injected error"), "Unexpected exception message when insert into select"
}

cluster.injectDebugPoints(NodeType.FE, ['StmtExecutor.retry.longtime' : null])

def futrue3 = thread {
Thread.sleep(4000)
cluster.clearBackendDebugPoints()
}

begin = System.currentTimeMillis();
def futrue4 = thread {
def result = try_sql """insert into ${tbl2} select * from ${tbl1}"""
insert_sql """insert into ${tbl2} select * from ${tbl1}""", 2
}

futrue4.get()
cost = System.currentTimeMillis() - begin;
log.info("time cost insert into select : {}", cost)
futrue3.get()
def tbl1Ret = sql_return_maparray """select * from ${tbl1}"""
log.info("tbl1 ret {}", tbl1Ret)
def tbl2Ret = sql_return_maparray """select * from ${tbl2}"""
log.info("tbl2 ret {}", tbl2Ret)
// Compare the results from both tables
assertEquals(tbl1Ret, tbl2Ret, "Data in ${tbl1} and ${tbl2} should be identical")
// fe StmtExecutor retry time, at most 25 * 1.5s + 25 * 2.5s
assertTrue(cost > 4000 && cost < 100000)

} finally {
cluster.clearFrontendDebugPoints()
cluster.clearBackendDebugPoints()
sql """ DROP TABLE IF EXISTS ${tbl} """
sql """ DROP TABLE IF EXISTS ${tbl1} """
sql """ DROP TABLE IF EXISTS ${tbl2} """
}
}
// 2. connect to follower
Expand Down

0 comments on commit 780024a

Please sign in to comment.