Skip to content

Commit

Permalink
Enhance Transactional Methods for Improved Request Queue Handling (#180)
Browse files Browse the repository at this point in the history
Motivation:
The current Transaction Methods implementation lacks consideration for
the request queue, resulting in potential undefined behavior and issues.

Modifications:
Transaction Methods properly respect the request queue.

Result:
Bug resolved.
Enhanced stability and reliability.
  • Loading branch information
jchrys authored Dec 22, 2023
1 parent 14e9342 commit 550f8c3
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 19 deletions.
20 changes: 1 addition & 19 deletions src/main/java/io/asyncer/r2dbc/mysql/MySqlConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -184,22 +184,12 @@ public final class MySqlConnection implements Connection, Lifecycle, ConnectionS

@Override
public Mono<Void> beginTransaction() {
return Mono.defer(() -> {
if (isInTransaction()) {
return Mono.empty();
}

return QueryFlow.executeVoid(client, "BEGIN");
});
return beginTransaction(MySqlTransactionDefinition.empty());
}

@Override
public Mono<Void> beginTransaction(TransactionDefinition definition) {
return Mono.defer(() -> {
if (isInTransaction()) {
return Mono.empty();
}

return QueryFlow.beginTransaction(client, this, batchSupported, definition);
});
}
Expand All @@ -219,10 +209,6 @@ public Mono<Void> close() {
@Override
public Mono<Void> commitTransaction() {
return Mono.defer(() -> {
if (!isInTransaction()) {
return Mono.empty();
}

return QueryFlow.doneTransaction(client, this, true, lockWaitTimeout, batchSupported);
});
}
Expand Down Expand Up @@ -300,10 +286,6 @@ public Mono<Void> releaseSavepoint(String name) {
@Override
public Mono<Void> rollbackTransaction() {
return Mono.defer(() -> {
if (!isInTransaction()) {
return Mono.empty();
}

return QueryFlow.doneTransaction(client, this, false, lockWaitTimeout, batchSupported);
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,69 @@ void errorPropagteRequestQueue() {
);
}

@Test
void commitTransactionShouldRespectQueuedMessages() {
final String tdl = "CREATE TEMPORARY TABLE test (id INT NOT NULL PRIMARY KEY, name VARCHAR(50))";
complete(connection ->
Mono.from(connection.createStatement(tdl).execute())
.flatMap(IntegrationTestSupport::extractRowsUpdated)
.thenMany(Flux.merge(
connection.beginTransaction(),
connection.createStatement("INSERT INTO test VALUES (1, 'test1')")
.execute(),
connection.commitTransaction()
))
.doOnComplete(() -> assertThat(connection.isInTransaction()).isFalse())
.thenMany(connection.createStatement("SELECT COUNT(*) FROM test").execute())
.flatMap(result ->
Mono.from(result.map((row, metadata) -> row.get(0, Long.class)))
)
.doOnNext(text -> assertThat(text).isEqualTo(1L))
);
}

@Test
void rollbackTransactionShouldRespectQueuedMessages() {
final String tdl = "CREATE TEMPORARY TABLE test (id INT NOT NULL PRIMARY KEY, name VARCHAR(50))";
complete(connection ->
Mono.from(connection.createStatement(tdl).execute())
.flatMap(IntegrationTestSupport::extractRowsUpdated)
.thenMany(Flux.merge(
connection.beginTransaction(),
connection.createStatement("INSERT INTO test VALUES (1, 'test1')")
.execute(),
connection.rollbackTransaction()
))
.doOnComplete(() -> assertThat(connection.isInTransaction()).isFalse())
.thenMany(connection.createStatement("SELECT COUNT(*) FROM test").execute())
.flatMap(result -> Mono.from(result.map((row, metadata) -> row.get(0, Long.class)))
.doOnNext(count -> assertThat(count).isEqualTo(0L)))
);
}

@Test
void beginTransactionShouldRespectQueuedMessages() {
final String tdl = "CREATE TEMPORARY TABLE test (id INT NOT NULL PRIMARY KEY, name VARCHAR(50))";
complete(connection ->
Mono.from(connection.createStatement(tdl).execute())
.flatMap(IntegrationTestSupport::extractRowsUpdated)
.then(Mono.from(connection.beginTransaction()))
.doOnSuccess(ignored -> assertThat(connection.isInTransaction()).isTrue())
.thenMany(Flux.merge(
connection.createStatement("INSERT INTO test VALUES (1, 'test1')").execute(),
connection.commitTransaction(),
connection.beginTransaction()
))
.doOnComplete(() -> assertThat(connection.isInTransaction()).isTrue())
.then(Mono.from(connection.rollbackTransaction()))
.doOnSuccess(ignored -> assertThat(connection.isInTransaction()).isFalse())
.thenMany(connection.createStatement("SELECT COUNT(*) FROM test").execute())
.flatMap(result -> Mono.from(result.map((row, metadata) -> row.get(0, Long.class)))
.doOnNext(count -> assertThat(count).isEqualTo(1L)))
);

}

@Test
void batchCrud() {
// TODO: spilt it to multiple test cases and move it to BatchIntegrationTest
Expand Down

0 comments on commit 550f8c3

Please sign in to comment.