Skip to content
This repository was archived by the owner on Nov 27, 2023. It is now read-only.

Commit

Permalink
Merge branch '3.4.x'
Browse files Browse the repository at this point in the history
  • Loading branch information
rhauch committed Aug 30, 2017
2 parents f5044f1 + 780487e commit 076bf36
Show file tree
Hide file tree
Showing 5 changed files with 343 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,26 @@
import org.apache.kafka.common.config.ConfigDef.Width;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.config.types.Password;
import org.apache.kafka.common.utils.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class JdbcSourceConnectorConfig extends AbstractConfig {

private static final Logger LOG = LoggerFactory.getLogger(JdbcSourceConnectorConfig.class);

public static final String CONNECTION_URL_CONFIG = "connection.url";
private static final String CONNECTION_URL_DOC = "JDBC connection URL.";
private static final String CONNECTION_URL_DISPLAY = "JDBC URL";
Expand Down Expand Up @@ -180,8 +189,11 @@ public class JdbcSourceConnectorConfig extends AbstractConfig {
public static final String MODE_GROUP = "Mode";
public static final String CONNECTOR_GROUP = "Connector";


private static final Recommender TABLE_RECOMMENDER = new TableRecommender();
// We want the table recommender to only cache values for a short period of time so that the blacklist and whitelist
// config properties can use a single query.
private static final Recommender TABLE_RECOMMENDER = new CachingRecommender(new TableRecommender(),
Time.SYSTEM,
TimeUnit.SECONDS.toMillis(5));
private static final Recommender MODE_DEPENDENTS_RECOMMENDER = new ModeDependentsRecommender();


Expand Down Expand Up @@ -248,11 +260,12 @@ public List<Object> validValues(String name, Map<String, Object> config) {
String dbUser = (String) config.get(CONNECTION_USER_CONFIG);
Password dbPassword = (Password) config.get(CONNECTION_PASSWORD_CONFIG);
String schemaPattern = (String) config.get(JdbcSourceTaskConfig.SCHEMA_PATTERN_CONFIG);
Set<String> tableTypes = new HashSet<>((List<String>) config.get(JdbcSourceTaskConfig.TABLE_TYPE_CONFIG));
if (dbUrl == null) {
throw new ConfigException(CONNECTION_URL_CONFIG + " cannot be null.");
}
try (Connection db = DriverManager.getConnection(dbUrl, dbUser, dbPassword == null ? null : dbPassword.value())) {
return new LinkedList<Object>(JdbcUtils.getTables(db, schemaPattern));
return new LinkedList<Object>(JdbcUtils.getTables(db, schemaPattern, tableTypes));
} catch (SQLException e) {
throw new ConfigException("Couldn't open connection to " + dbUrl, e);
}
Expand All @@ -264,6 +277,63 @@ public boolean visible(String name, Map<String, Object> config) {
}
}

/**
* A recommender that caches values returned by a delegate, where the cache remains valid for a specified duration
* and as long as the configuration remains unchanged.
*/
static class CachingRecommender implements Recommender {

private final Time time;
private final long cacheDurationInMillis;
private final AtomicReference<CachedRecommenderValues> cachedValues = new AtomicReference<>(new CachedRecommenderValues());
private final Recommender delegate;

public CachingRecommender(Recommender delegate, Time time, long cacheDurationInMillis) {
this.delegate = delegate;
this.time = time;
this.cacheDurationInMillis = cacheDurationInMillis;
}

@Override
public List<Object> validValues(String name, Map<String, Object> config) {
List<Object> results = cachedValues.get().cachedValue(config, time.milliseconds());
if (results != null) {
LOG.debug("Returning cached table names: {}", results);
return results;
}
LOG.trace("Fetching table names");
results = delegate.validValues(name, config);
LOG.debug("Caching table names: {}", results);
cachedValues.set(new CachedRecommenderValues(config, results, time.milliseconds() + cacheDurationInMillis));
return results;
}

@Override
public boolean visible(String name, Map<String, Object> config) {
return true;
}
}

static class CachedRecommenderValues {
private final Map<String, Object> lastConfig;
private final List<Object> results;
private final long expiryTimeInMillis;
public CachedRecommenderValues() {
this(null, null, 0L);
}
public CachedRecommenderValues(Map<String, Object> lastConfig, List<Object> results, long expiryTimeInMillis) {
this.lastConfig = lastConfig;
this.results = results;
this.expiryTimeInMillis = expiryTimeInMillis;
}
public List<Object> cachedValue(Map<String, Object> config, long currentTimeInMillis) {
if (currentTimeInMillis < expiryTimeInMillis && lastConfig != null && lastConfig.equals(config)) {
return results;
}
return null;
}
}

private static class ModeDependentsRecommender implements Recommender {

@Override
Expand Down
51 changes: 40 additions & 11 deletions src/main/java/io/confluent/connect/jdbc/util/JdbcUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ public class JdbcUtils {
* Get a list of tables in the database. This uses the default filters, which only include
* user-defined tables.
* @param conn database connection
* @return a list of tables
* @return a list of tables; never null
* @throws SQLException
*/
public static List<String> getTables(Connection conn, String schemaPattern) throws SQLException {
Expand All @@ -73,28 +73,57 @@ public static List<String> getTables(Connection conn, String schemaPattern) thro
/**
* Get a list of table names in the database.
* @param conn database connection
* @param types a set of table types that should be included in the results
* @param types a set of table types that should be included in the results; may not be null but may be empty if
* the tables should be returned regardless of their type
* @return a list of tables; never null
* @throws SQLException
*/
public static List<String> getTables(Connection conn, String schemaPattern, Set<String> types) throws SQLException {
DatabaseMetaData metadata = conn.getMetaData();
try (ResultSet rs = metadata.getTables(null, schemaPattern, "%", null)) {
String[] tableTypes = types.isEmpty() ? null : getActualTableTypes(metadata, types);

try (ResultSet rs = metadata.getTables(null, schemaPattern, "%", tableTypes)) {
List<String> tableNames = new ArrayList<>();
while (rs.next()) {
if (types.contains(rs.getString(GET_TABLES_TYPE_COLUMN))) {
String colName = rs.getString(GET_TABLES_NAME_COLUMN);
// SQLite JDBC driver does not correctly mark these as system tables
if (metadata.getDatabaseProductName().equals("SQLite") && colName.startsWith("sqlite_")) {
continue;
}

tableNames.add(colName);
String colName = rs.getString(GET_TABLES_NAME_COLUMN);
// SQLite JDBC driver does not correctly mark these as system tables
if (metadata.getDatabaseProductName().equals("SQLite") && colName.startsWith("sqlite_")) {
continue;
}

tableNames.add(colName);
}
return tableNames;
}
}

/**
* Find the available table types that are returned by the JDBC driver that case insensitively match the specified types.
*
* @param metadata the database metadata; may not be null but may be empty if no table types
* @param types the case-independent table types
* @return the array of table types take directly from the list of available types returned by the JDBC driver; never null
* @throws SQLException
*/
protected static String[] getActualTableTypes(DatabaseMetaData metadata, Set<String> types) throws SQLException {
// Compute the uppercase form of the desired types ...
Set<String> uppercaseTypes = new HashSet<>();
for (String type : types) {
if (type != null) uppercaseTypes.add(type.toUpperCase());
}
// Now find out the available table types ...
Set<String> matchingTableTypes = new HashSet<>();
try (ResultSet rs = metadata.getTableTypes()) {
while (rs.next()) {
String tableType = rs.getString(1);
if (tableType != null && uppercaseTypes.contains(tableType.toUpperCase())) {
matchingTableTypes.add(tableType);
}
}
}
return matchingTableTypes.toArray(new String[matchingTableTypes.size()]);
}

/**
* Look up the autoincrement column for the specified table.
* @param conn database connection
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,8 @@ public void testSchemaPatternUsedForConfigValidation() throws Exception {
connProps.put(JdbcSourceConnectorConfig.SCHEMA_PATTERN_CONFIG, "SOME_SCHEMA");

PowerMock.mockStatic(JdbcUtils.class);
EasyMock.expect(JdbcUtils.getTables(EasyMock.anyObject(Connection.class), EasyMock.eq("SOME_SCHEMA")))
EasyMock.expect(JdbcUtils.getTables(EasyMock.anyObject(Connection.class), EasyMock.eq("SOME_SCHEMA"),
EasyMock.eq(JdbcUtils.DEFAULT_TABLE_TYPES)))
.andReturn(new ArrayList<String>())
.atLeastOnce();

Expand Down
Loading

0 comments on commit 076bf36

Please sign in to comment.