Skip to content

Commit

Permalink
Transaction Methods Respect Request Queue
Browse files Browse the repository at this point in the history
Motivation:
The current Transaction Methods implementation lacks consideration for the request queue, resulting in potential undefined behavior and issues.

Modifications:
Transaction Methods properly respect the request queue.

Result:
Bug resolved.
Enhanced stability and reliability.
  • Loading branch information
jchrys committed Dec 21, 2023
1 parent d4a9fe0 commit d1b9592
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 16 deletions.
16 changes: 0 additions & 16 deletions src/main/java/io/asyncer/r2dbc/mysql/MySqlConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -185,21 +185,13 @@ public final class MySqlConnection implements Connection, Lifecycle, ConnectionS
@Override
public Mono<Void> beginTransaction() {
return Mono.defer(() -> {
if (isInTransaction()) {
return Mono.empty();
}

return QueryFlow.executeVoid(client, "BEGIN");
});
}

@Override
public Mono<Void> beginTransaction(TransactionDefinition definition) {
return Mono.defer(() -> {
if (isInTransaction()) {
return Mono.empty();
}

return QueryFlow.beginTransaction(client, this, batchSupported, definition);
});
}
Expand All @@ -219,10 +211,6 @@ public Mono<Void> close() {
@Override
public Mono<Void> commitTransaction() {
return Mono.defer(() -> {
if (!isInTransaction()) {
return Mono.empty();
}

return QueryFlow.doneTransaction(client, this, true, lockWaitTimeout, batchSupported);
});
}
Expand Down Expand Up @@ -300,10 +288,6 @@ public Mono<Void> releaseSavepoint(String name) {
@Override
public Mono<Void> rollbackTransaction() {
return Mono.defer(() -> {
if (!isInTransaction()) {
return Mono.empty();
}

return QueryFlow.doneTransaction(client, this, false, lockWaitTimeout, batchSupported);
});
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -292,6 +292,69 @@ void errorPropagteRequestQueue() {
);
}

@Test
void commitTransactionShouldRespectQueuedMessages() {
final String tdl = "CREATE TEMPORARY TABLE test (id INT NOT NULL PRIMARY KEY, name VARCHAR(50))";
complete(connection ->
Mono.from(connection.createStatement(tdl).execute())
.flatMap(IntegrationTestSupport::extractRowsUpdated)
.thenMany(Flux.merge(
connection.beginTransaction(),
connection.createStatement("INSERT INTO test VALUES (1, 'test1')")
.execute(),
connection.commitTransaction()
))
.doOnComplete(() -> assertThat(connection.isInTransaction()).isFalse())
.thenMany(connection.createStatement("SELECT COUNT(*) FROM test").execute())
.flatMap(result ->
Mono.from(result.map((row, metadata) -> row.get(0, Long.class)))
)
.doOnNext(text -> assertThat(text).isEqualTo(1L))
);
}

@Test
void rollbackTransactionShouldRespectQueuedMessages() {
final String tdl = "CREATE TEMPORARY TABLE test (id INT NOT NULL PRIMARY KEY, name VARCHAR(50))";
complete(connection ->
Mono.from(connection.createStatement(tdl).execute())
.flatMap(IntegrationTestSupport::extractRowsUpdated)
.thenMany(Flux.merge(
connection.beginTransaction(),
connection.createStatement("INSERT INTO test VALUES (1, 'test1')")
.execute(),
connection.rollbackTransaction()
))
.doOnComplete(() -> assertThat(connection.isInTransaction()).isFalse())
.thenMany(connection.createStatement("SELECT COUNT(*) FROM test").execute())
.flatMap(result -> Mono.from(result.map((row, metadata) -> row.get(0, Long.class)))
.doOnNext(count -> assertThat(count).isEqualTo(0L)))
);
}


@Test
void beginTransactionShouldRespectQueuedMessages() {
final String tdl = "CREATE TEMPORARY TABLE test (id INT NOT NULL PRIMARY KEY, name VARCHAR(50))";
complete(connection ->
Mono.from(connection.createStatement(tdl).execute())
.flatMap(IntegrationTestSupport::extractRowsUpdated)
.then(Mono.from(connection.beginTransaction()))
.doOnSuccess(ignored -> assertThat(connection.isInTransaction()).isTrue())
.thenMany(Flux.merge(
connection.createStatement("INSERT INTO test VALUES (1, 'test1')").execute(),
connection.commitTransaction(),
connection.beginTransaction()
))
.doOnComplete(() -> assertThat(connection.isInTransaction()).isTrue())
.then(Mono.from(connection.rollbackTransaction()))
.doOnSuccess(ignored -> assertThat(connection.isInTransaction()).isFalse())
.thenMany(connection.createStatement("SELECT COUNT(*) FROM test").execute())
.flatMap(result -> Mono.from(result.map((row, metadata) -> row.get(0, Long.class)))
.doOnNext(count -> assertThat(count).isEqualTo(1L)))
);

}
@Test
void batchCrud() {
// TODO: spilt it to multiple test cases and move it to BatchIntegrationTest
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package io.asyncer.r2dbc.mysql;

import io.asyncer.r2dbc.mysql.constant.SslMode;
import io.r2dbc.spi.R2dbcBadGrammarException;
import io.r2dbc.spi.Result;
import org.jetbrains.annotations.Nullable;
Expand Down Expand Up @@ -87,6 +88,7 @@ static MySqlConnectionConfiguration configuration(
.user("root")
.password(password)
.database(database)
.sslMode(SslMode.DISABLED)
.createDatabaseIfNotExist(createDatabaseIfNotExist)
.autodetectExtensions(autodetectExtensions);

Expand Down

0 comments on commit d1b9592

Please sign in to comment.