Skip to content

Commit

Permalink
refactor: separate sql stores from datasources (#4381)
Browse files Browse the repository at this point in the history
  • Loading branch information
ndr-brt authored Aug 7, 2024
1 parent 0d9ee7e commit b99f4dc
Show file tree
Hide file tree
Showing 27 changed files with 331 additions and 197 deletions.
1 change: 1 addition & 0 deletions docs/developer/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
## Guides
- [Customize Policy Engine](policy-engine.md)
- [Performance Tuning](performance-tuning.md)
- [Sql persistence](sql-persistence.md)

## Build and testing
- [OpenApi Spec Generation](openapi.md)
Expand Down
41 changes: 41 additions & 0 deletions docs/developer/sql-persistence.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
# SQL persistence

Every store in the EDC, intended as persistence for state, comes out of the box with two implementations:
- in-memory
- sql (postgresql dialect)

By default, the `in-memory` stores are provided by the dependency injection, the `sql` implementations can be used by
simply registering the relative extensions (e.g. `asset-index-sql`, `contract-negotiation-store-sql`, ...).

## Configuration

### DataSources

For using `sql` extensions, a `DataSource` is needed, and it should be registered on the `DataSourceRegistry` service.

The `sql-pool-apache-commons` extension takes care to create and register pooled data sources starting from configuration.
It expects at least one data source called `default` that can be configured with `Vault` keys:
```
edc.datasource.default.url=...
edc.datasource.default.name=...
edc.datasource.default.password=...
```
(note: if no vault entries are found for such keys, they will be obtained from the configuration).

Other datasources can be defined using the same settings structure:
```
edc.datasource.<datasource-name>.url=...
edc.datasource.<datasource-name>.name=...
edc.datasource.<datasource-name>.password=...
```

`<datasource-name>` can be a string that then can be used by the stores configuration to use specific data sources.

### Using custom datasource in stores

Using a custom datasource in a store can be done by configuring the setting:
```
edc.sql.store.<store-context>.datasource=<datasource-name>
```

This way the `<store-context>` (that could be `asset`, `contractnegotiation`...) will use the `<datasource-name>` datasource.
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Copyright (c) 2024 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*
* Contributors:
* Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation
*
*/

package org.eclipse.edc.sql.configuration;

import org.eclipse.edc.spi.monitor.Monitor;
import org.eclipse.edc.spi.system.configuration.Config;

import static org.eclipse.edc.transaction.datasource.spi.DataSourceRegistry.DEFAULT_DATASOURCE;

/**
* This interface is only functional to the migration from the old format to the new one.
* Ref. <a href="https://github.com/eclipse-edc/Connector/issues/3811">https://github.com/eclipse-edc/Connector/issues/3811</a>
*/
public interface DataSourceName {

/**
* Extracts datasource name from configuration considering deprecated key as fallback.
*
* @param key the setting key.
* @param deprecatedKey the deprecate setting key.
* @param config the config.
* @param monitor the monitor.
* @return the datasource name
* @deprecated will be removed together with the deprecated settings.
*/
@Deprecated(since = "0.8.1")
static String getDataSourceName(String key, String deprecatedKey, Config config, Monitor monitor) {
var datasourceName = config.getString(key, null);

if (datasourceName != null) {
return datasourceName;
}

var name = config.getString(deprecatedKey, null);

if (name != null) {
monitor.warning("Datasource name setting key %s has been deprecated, please switch to %s".formatted(deprecatedKey, key));
return name;
}

return DEFAULT_DATASOURCE;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,13 @@
import org.eclipse.edc.sql.datasource.ConnectionFactoryDataSource;
import org.eclipse.edc.sql.datasource.ConnectionPoolDataSource;
import org.eclipse.edc.transaction.datasource.spi.DataSourceRegistry;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Supplier;
import javax.sql.DataSource;

import static java.util.Optional.ofNullable;
Expand All @@ -48,24 +44,32 @@ public class CommonsConnectionPoolServiceExtension implements ServiceExtension {
public static final String NAME = "Commons Connection Pool";

public static final String EDC_DATASOURCE_PREFIX = "edc.datasource";
private static final String EDC_DATASOURCE_CONFIG_CONTEXT = EDC_DATASOURCE_PREFIX + ".<name>";

@Setting(value = "JDBC url", required = true, context = EDC_DATASOURCE_CONFIG_CONTEXT)
public static final String URL = "url";
@Setting(value = "Username to be used for the JDBC connection. Can be omitted if not required, or if the user is encoded in the JDBC url.", context = EDC_DATASOURCE_CONFIG_CONTEXT)
public static final String USER = "user";
@Setting(value = "Username to be used for the JDBC connection. Can be omitted if not required, or if the password is encoded in the JDBC url.", context = EDC_DATASOURCE_CONFIG_CONTEXT)
public static final String PASSWORD = "password";

@Setting(value = "Pool max idle connections", type = "int", context = EDC_DATASOURCE_CONFIG_CONTEXT)
public static final String POOL_CONNECTIONS_MAX_IDLE = "pool.connections.max-idle";
@Setting(value = "Pool max total connections", type = "int", context = EDC_DATASOURCE_CONFIG_CONTEXT)
public static final String POOL_CONNECTIONS_MAX_TOTAL = "pool.connections.max-total";
@Setting(value = "Pool min idle connections", type = "int", context = EDC_DATASOURCE_CONFIG_CONTEXT)
public static final String POOL_CONNECTIONS_MIN_IDLE = "pool.connections.min-idle";
@Setting(value = "Pool test on borrow", type = "boolean", context = EDC_DATASOURCE_CONFIG_CONTEXT)
public static final String POOL_CONNECTION_TEST_ON_BORROW = "pool.connection.test.on-borrow";
@Setting(value = "Pool test on create", type = "boolean", context = EDC_DATASOURCE_CONFIG_CONTEXT)
public static final String POOL_CONNECTION_TEST_ON_CREATE = "pool.connection.test.on-create";
@Setting(value = "Pool test on return", type = "boolean", context = EDC_DATASOURCE_CONFIG_CONTEXT)
public static final String POOL_CONNECTION_TEST_ON_RETURN = "pool.connection.test.on-return";
@Setting(value = "Pool test while idle", type = "boolean", context = EDC_DATASOURCE_CONFIG_CONTEXT)
public static final String POOL_CONNECTION_TEST_WHILE_IDLE = "pool.connection.test.while-idle";
@Setting(value = "Pool test query", context = EDC_DATASOURCE_CONFIG_CONTEXT)
public static final String POOL_CONNECTION_TEST_QUERY = "pool.connection.test.query";

@Setting(required = true)
public static final String URL = "url";

@Setting(value = "Username to be used for the JDBC connection. Can be omitted if not required, or if the user is encoded in the JDBC url.")
public static final String USER = "user";
@Setting(value = "Username to be used for the JDBC connection. Can be omitted if not required, or if the password is encoded in the JDBC url.")
public static final String PASSWORD = "password";

private final List<CommonsConnectionPool> commonsConnectionPools = new LinkedList<>();
@Inject
private DataSourceRegistry dataSourceRegistry;

Expand All @@ -78,113 +82,75 @@ public class CommonsConnectionPoolServiceExtension implements ServiceExtension {
@Inject
private Vault vault;

private final List<CommonsConnectionPool> commonsConnectionPools = new LinkedList<>();

@Override
public String name() {
return NAME;
}

@Override
public void initialize(ServiceExtensionContext context) {
var config = context.getConfig(EDC_DATASOURCE_PREFIX);

var namedConnectionPools = createConnectionPools(config);

for (var entry : namedConnectionPools.entrySet()) {
var dataSourceName = entry.getKey();
var commonsConnectionPool = entry.getValue();
commonsConnectionPools.add(commonsConnectionPool);
var connectionPoolDataSource = new ConnectionPoolDataSource(commonsConnectionPool);
context.getConfig(EDC_DATASOURCE_PREFIX).partition().forEach(config -> {
var dataSourceName = config.currentNode();
var dataSource = createDataSource(config);
var connectionPool = createConnectionPool(dataSource, config);
commonsConnectionPools.add(connectionPool);
var connectionPoolDataSource = new ConnectionPoolDataSource(connectionPool);
dataSourceRegistry.register(dataSourceName, connectionPoolDataSource);
}
});
}

@Override
public void shutdown() {
commonsConnectionPools.forEach(CommonsConnectionPool::close);
}

public List<CommonsConnectionPool> getCommonsConnectionPools() {
return commonsConnectionPools;
}

private @NotNull Supplier<@Nullable String> readFromConfig(Config config, String value) {
return () -> {
var entry = EDC_DATASOURCE_PREFIX + "." + config.currentNode() + "." + value;
monitor.warning("Database configuration value '%s' not found in vault, will fall back to Config. Please consider putting database configuration into the vault.".formatted(entry));
return config.getString(value, null);
};
}

private void setIfProvidedString(String key, Consumer<String> setter, Config config) {
setIfProvided(key, setter, config::getString);
}

private void setIfProvidedBoolean(String key, Consumer<Boolean> setter, Config config) {
setIfProvided(key, setter, config::getBoolean);
}

private void setIfProvidedInt(String key, Consumer<Integer> setter, Config config) {
setIfProvided(key, setter, config::getInteger);
}

private <T> void setIfProvided(String key, Consumer<T> setter, BiFunction<String, T, T> getter) {
var value = getter.apply(key, null);
if (value != null) {
setter.accept(value);
}
}

private Map<String, CommonsConnectionPool> createConnectionPools(Config parent) {
Map<String, CommonsConnectionPool> commonsConnectionPools = new HashMap<>();
for (var config : parent.partition().toList()) {
var dataSourceName = config.currentNode();

var dataSource = createDataSource(config);

var commonsConnectionPool = createConnectionPool(dataSource, config);
commonsConnectionPools.put(dataSourceName, commonsConnectionPool);
}
return commonsConnectionPools;
}

private DataSource createDataSource(Config config) {
var rootPath = EDC_DATASOURCE_PREFIX + "." + config.currentNode();

// read values from the vault first, fall back to config
var urlProperty = rootPath + "." + URL;
var jdbcUrl = ofNullable(vault.resolveSecret(urlProperty)).orElseGet(readFromConfig(config, URL));

if (jdbcUrl == null) {
throw new EdcException("Mandatory config '%s' not found. Please provide a value for the '%s' property, either as a secret in the vault or an application property.".formatted(urlProperty, urlProperty));
}

var jdbcUser = ofNullable(vault.resolveSecret(rootPath + "." + USER))
.orElseGet(readFromConfig(config, USER));
var jdbcPassword = ofNullable(vault.resolveSecret(rootPath + "." + PASSWORD))
.orElseGet(readFromConfig(config, PASSWORD));
var jdbcUrl = getSecretOrSetting(rootPath, URL, config)
.orElseThrow(() -> new EdcException("Mandatory url for datasource '%s' not found. Please provide a value for it, either as a secret in the vault or an application property.".formatted(config.currentNode())));
var jdbcUser = getSecretOrSetting(rootPath, USER, config);
var jdbcPassword = getSecretOrSetting(rootPath, PASSWORD, config);

var properties = new Properties();
properties.putAll(config.getRelativeEntries());

// only set if not-null, otherwise Properties#add throws a NPE
ofNullable(jdbcUser).ifPresent(u -> properties.put(USER, u));
ofNullable(jdbcPassword).ifPresent(p -> properties.put(PASSWORD, p));
jdbcUser.ifPresent(u -> properties.put(USER, u));
jdbcPassword.ifPresent(p -> properties.put(PASSWORD, p));

return new ConnectionFactoryDataSource(connectionFactory, jdbcUrl, properties);
}

private Optional<String> getSecretOrSetting(String rootPath, String key, Config config) {
var fullKey = rootPath + "." + key;
return ofNullable(vault.resolveSecret(fullKey))
.or(() -> {
monitor.warning("Datasource configuration value '%s' not found in vault, will fall back to Config. Please consider putting datasource configuration into the vault.".formatted(fullKey));
return Optional.ofNullable(config.getString(key, null));
});
}

private CommonsConnectionPool createConnectionPool(DataSource unPooledDataSource, Config config) {
var builder = CommonsConnectionPoolConfig.Builder.newInstance();

setIfProvidedInt(POOL_CONNECTIONS_MAX_IDLE, builder::maxIdleConnections, config);
setIfProvidedInt(POOL_CONNECTIONS_MAX_TOTAL, builder::maxTotalConnections, config);
setIfProvidedInt(POOL_CONNECTIONS_MIN_IDLE, builder::minIdleConnections, config);
setIfProvidedBoolean(POOL_CONNECTION_TEST_ON_BORROW, builder::testConnectionOnBorrow, config);
setIfProvidedBoolean(POOL_CONNECTION_TEST_ON_CREATE, builder::testConnectionOnCreate, config);
setIfProvidedBoolean(POOL_CONNECTION_TEST_ON_RETURN, builder::testConnectionOnReturn, config);
setIfProvidedBoolean(POOL_CONNECTION_TEST_WHILE_IDLE, builder::testConnectionWhileIdle, config);
setIfProvidedString(POOL_CONNECTION_TEST_QUERY, builder::testQuery, config);
setIfProvided(POOL_CONNECTIONS_MAX_IDLE, config::getInteger, builder::maxIdleConnections);
setIfProvided(POOL_CONNECTIONS_MAX_TOTAL, config::getInteger, builder::maxTotalConnections);
setIfProvided(POOL_CONNECTIONS_MIN_IDLE, config::getInteger, builder::minIdleConnections);
setIfProvided(POOL_CONNECTION_TEST_ON_BORROW, config::getBoolean, builder::testConnectionOnBorrow);
setIfProvided(POOL_CONNECTION_TEST_ON_CREATE, config::getBoolean, builder::testConnectionOnCreate);
setIfProvided(POOL_CONNECTION_TEST_ON_RETURN, config::getBoolean, builder::testConnectionOnReturn);
setIfProvided(POOL_CONNECTION_TEST_WHILE_IDLE, config::getBoolean, builder::testConnectionWhileIdle);
setIfProvided(POOL_CONNECTION_TEST_QUERY, config::getString, builder::testQuery);

return new CommonsConnectionPool(unPooledDataSource, builder.build(), monitor);
}

private <T> void setIfProvided(String key, BiFunction<String, T, T> getter, Consumer<T> setter) {
var value = getter.apply(key, null);
if (value != null) {
setter.accept(value);
}
}
}
Loading

0 comments on commit b99f4dc

Please sign in to comment.