diff --git a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/BigQuerySinkConnector.java b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/BigQuerySinkConnector.java index 1986b709a..64852e7b1 100644 --- a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/BigQuerySinkConnector.java +++ b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/BigQuerySinkConnector.java @@ -91,7 +91,7 @@ private BigQuery getBigQuery() { return testBigQuery; } String projectName = config.getString(config.PROJECT_CONFIG); - String key = config.getString(config.KEYFILE_CONFIG); + String key = config.getKeyFile(); String keySource = config.getString(config.KEY_SOURCE_CONFIG); return new BigQueryHelper().setKeySource(keySource).connect(projectName, key); } diff --git a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTask.java b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTask.java index 6f93c9bb4..9aa3d62c7 100644 --- a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTask.java +++ b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTask.java @@ -265,7 +265,7 @@ private BigQuery getBigQuery() { return testBigQuery; } String projectName = config.getString(config.PROJECT_CONFIG); - String keyFile = config.getString(config.KEYFILE_CONFIG); + String keyFile = config.getKeyFile(); String keySource = config.getString(config.KEY_SOURCE_CONFIG); return new BigQueryHelper().setKeySource(keySource).connect(projectName, keyFile); } @@ -302,7 +302,7 @@ private Storage getGcs() { return testGcs; } String projectName = config.getString(config.PROJECT_CONFIG); - String key = config.getString(config.KEYFILE_CONFIG); + String key = config.getKeyFile(); String keySource = config.getString(config.KEY_SOURCE_CONFIG); return new GCSBuilder(projectName).setKey(key).setKeySource(keySource).build(); diff --git a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/config/BigQuerySinkConfig.java b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/config/BigQuerySinkConfig.java index d045f1b63..7d4cf0f22 100644 --- a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/config/BigQuerySinkConfig.java +++ b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/config/BigQuerySinkConfig.java @@ -28,6 +28,7 @@ import com.wepay.kafka.connect.bigquery.convert.SchemaConverter; import org.apache.kafka.common.config.AbstractConfig; +import org.apache.kafka.common.config.types.Password; import org.apache.kafka.common.config.ConfigDef; import org.apache.kafka.common.config.ConfigException; @@ -133,7 +134,7 @@ public class BigQuerySinkConfig extends AbstractConfig { "A class that can be used for automatically creating tables and/or updating schemas"; public static final String KEYFILE_CONFIG = "keyfile"; - private static final ConfigDef.Type KEYFILE_TYPE = ConfigDef.Type.STRING; + private static final ConfigDef.Type KEYFILE_TYPE = ConfigDef.Type.PASSWORD; public static final String KEYFILE_DEFAULT = null; private static final ConfigDef.Importance KEYFILE_IMPORTANCE = ConfigDef.Importance.MEDIUM; private static final String KEYFILE_DOC = @@ -406,6 +407,13 @@ protected static Map.Entry parseMapping(String mapping, String n } } + /** + * Returns the keyfile + */ + public String getKeyFile() { + return Optional.ofNullable(getPassword(KEYFILE_CONFIG)).map(Password::value).orElse(null); + } + /** * Parses a config map, which must be provided as a list of Strings of the form * '<key>=<value>' into a Map. Locates that list, splits its key and value pairs, and diff --git a/kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/SinkPropertiesFactory.java b/kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/SinkPropertiesFactory.java index 3daa31315..61e14530b 100644 --- a/kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/SinkPropertiesFactory.java +++ b/kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/SinkPropertiesFactory.java @@ -63,7 +63,7 @@ public void testProperties(BigQuerySinkConfig config) { config.getList(config.TOPICS_TO_TABLES_CONFIG); config.getList(config.DATASETS_CONFIG); - config.getString(config.KEYFILE_CONFIG); + config.getKeyFile(); config.getString(config.PROJECT_CONFIG); config.getBoolean(config.SANITIZE_TOPICS_CONFIG);