Skip to content

Commit

Permalink
feat: shedlock over jdbc (#168)
Browse files Browse the repository at this point in the history
  • Loading branch information
KirillKurdyukov authored Nov 18, 2024
1 parent 809ec33 commit 3690721
Show file tree
Hide file tree
Showing 5 changed files with 165 additions and 159 deletions.
18 changes: 4 additions & 14 deletions shedlock-ydb/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

<groupId>tech.ydb.dialects</groupId>
<artifactId>shedlock-ydb</artifactId>
<version>0.2.0</version>
<version>0.3.0</version>

<packaging>jar</packaging>

Expand Down Expand Up @@ -38,8 +38,8 @@

<junit5.version>5.9.3</junit5.version>
<log4j2.version>2.17.2</log4j2.version>
<ydb.sdk.version>2.2.6</ydb.sdk.version>
<ydb.jdbc.version>2.2.2</ydb.jdbc.version>
<ydb.sdk.version>2.3.6</ydb.sdk.version>
<ydb.jdbc.version>2.3.5</ydb.jdbc.version>
<spring.boot.version>3.2.3</spring.boot.version>
<shedlock-spring.version>5.15.0</shedlock-spring.version>
</properties>
Expand All @@ -53,13 +53,6 @@

<dependencyManagement>
<dependencies>
<dependency>
<groupId>tech.ydb</groupId>
<artifactId>ydb-sdk-bom</artifactId>
<version>${ydb.sdk.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-dependencies</artifactId>
Expand All @@ -71,10 +64,6 @@
</dependencyManagement>

<dependencies>
<dependency>
<groupId>tech.ydb</groupId>
<artifactId>ydb-sdk-coordination</artifactId>
</dependency>
<dependency>
<groupId>net.javacrumbs.shedlock</groupId>
<artifactId>shedlock-spring</artifactId>
Expand All @@ -101,6 +90,7 @@
<dependency>
<groupId>tech.ydb.test</groupId>
<artifactId>ydb-junit5-support</artifactId>
<version>${ydb.sdk.version}</version>
<scope>test</scope>
</dependency>
<dependency>
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
package tech.ydb.lock.provider;

import java.sql.SQLException;
import java.util.Optional;
import javax.sql.DataSource;
import net.javacrumbs.shedlock.core.LockConfiguration;
import net.javacrumbs.shedlock.core.LockProvider;
import net.javacrumbs.shedlock.core.SimpleLock;
import net.javacrumbs.shedlock.support.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* @author Kirill Kurdyukov
*/
public class YdbJDBCLockProvider implements LockProvider {
private static final Logger LOGGER = LoggerFactory.getLogger(YdbJDBCLockProvider.class);
private static final String LOCKED_BY = "Hostname=" + Utils.getHostname() + ", " +
"Current PID=" + ProcessHandle.current().pid();

private final DataSource dataSource;

public YdbJDBCLockProvider(DataSource dataSource) {
this.dataSource = dataSource;
}

@Override
public Optional<SimpleLock> lock(LockConfiguration lockConfiguration) {
try (var connection = dataSource.getConnection()) {
var autoCommit = connection.getAutoCommit();
try {
connection.setAutoCommit(false);

var selectPS = connection.prepareStatement("SELECT locked_by, lock_until FROM shedlock " +
"WHERE name = ? AND lock_until > CurrentUtcTimestamp()");

selectPS.setString(1, lockConfiguration.getName());

try (var rs = selectPS.executeQuery()) {
if (rs.next()) {
LOGGER.debug("Instance[{}] acquire lock is failed. Leader is {}, lock_until = {}",
LOCKED_BY, rs.getString(1), rs.getString(2));
return Optional.empty();
}
}

var upsertPS = connection.prepareStatement("" +
"UPSERT INTO shedlock(name, lock_until, locked_at, locked_by) " +
"VALUES (?, Unwrap(CurrentUtcTimestamp() + ?), CurrentUtcTimestamp(), ?)"
);

upsertPS.setObject(1, lockConfiguration.getName());
upsertPS.setObject(2, lockConfiguration.getLockAtMostFor());
upsertPS.setObject(3, LOCKED_BY);
upsertPS.execute();

connection.commit();

LOGGER.debug("Instance[{}] is leader", LOCKED_BY);

return Optional.of(new YdbJDBCLock(lockConfiguration.getName(), dataSource));
} finally {
connection.setAutoCommit(autoCommit);
}
} catch (SQLException e) {
LOGGER.debug("Instance[{}] acquire lock is failed", LOCKED_BY);

return Optional.empty();
}
}

private record YdbJDBCLock(String name, DataSource dataSource) implements SimpleLock {
private static final int ATTEMPT_RELEASE_LOCK = 10;

@Override
public void unlock() {
for (int i = 0; i < ATTEMPT_RELEASE_LOCK; i++) {
try {
doUnlock();

return;
} catch (SQLException e) {
if (i == ATTEMPT_RELEASE_LOCK - 1) {
throw new RuntimeException(e);
}
}
}
}

private void doUnlock() throws SQLException {
try (var connection = dataSource.getConnection()) {
var autoCommit = connection.getAutoCommit();

try {
connection.setAutoCommit(true);
var ps = connection.prepareStatement(
"UPDATE shedlock SET lock_until = CurrentUtcTimestamp() WHERE name = ? and locked_by = ?");
ps.setString(1, name);
ps.setString(2, LOCKED_BY);
ps.execute();
} finally {
connection.setAutoCommit(autoCommit);
}
} catch (SQLException e) {
LOGGER.debug(String.format("Instance[{%s}] release lock is failed", LOCKED_BY), e);

throw e;
}
}
}
}
Original file line number Diff line number Diff line change
@@ -1,10 +1,9 @@
package tech.ydb.lock.provider;

import java.sql.SQLException;
import javax.sql.DataSource;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import tech.ydb.jdbc.YdbConnection;

/**
* @author Kirill Kurdyukov
Expand All @@ -13,11 +12,8 @@
@Configuration
public class YdbLockProviderConfiguration {
@Bean
public YdbCoordinationServiceLockProvider ydbLockProvider(DataSource dataSource) throws SQLException {
var provider = new YdbCoordinationServiceLockProvider(dataSource.getConnection().unwrap(YdbConnection.class));

provider.init();

return provider;
@ConditionalOnBean(DataSource.class)
public YdbJDBCLockProvider ydbLockProvider(DataSource dataSource) {
return new YdbJDBCLockProvider(dataSource);
}
}
Loading

0 comments on commit 3690721

Please sign in to comment.