This repository was archived by the owner on Nov 27, 2023. It is now read-only.

Commit

Merge remote-tracking branch 'jdbc-sink/master' into jdbc-sink-merge
One hitch: had to disable `reuseForks` for tests in pom.xml because of static mocking of DriverManager in the source connector tests
shikhar committed Jul 26, 2016
2 parents d0d4792 + 423ff97 commit 2c6cb8c
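The `reuseForks` note refers to PowerMock's static mocking: once `DriverManager` is mocked statically, the mock cannot be cleanly reset within the same forked JVM, so each test class needs a fresh fork. Below is a minimal sketch of the kind of test that forces this, assuming PowerMock with the Mockito API (both declared in the pom); the test class itself is hypothetical and not part of this commit.

```java
import static org.junit.Assert.assertSame;
import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.mock;
import static org.powermock.api.mockito.PowerMockito.mockStatic;
import static org.powermock.api.mockito.PowerMockito.when;

import java.sql.Connection;
import java.sql.DriverManager;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;

// Hypothetical test, not from this commit: illustrates the static mocking that
// makes forked-JVM reuse unsafe for the surefire plugin.
@RunWith(PowerMockRunner.class)
@PrepareForTest(DriverManager.class)
public class DriverManagerMockingSketchTest {

  @Test
  public void returnsMockedConnection() throws Exception {
    Connection connection = mock(Connection.class);

    // Once DriverManager is statically mocked, the mock is installed for the
    // lifetime of the forked JVM and cannot be cleanly reset, hence
    // <reuseForks>false</reuseForks> in pom.xml.
    mockStatic(DriverManager.class);
    when(DriverManager.getConnection(anyString())).thenReturn(connection);

    assertSame(connection, DriverManager.getConnection("jdbc:sqlite::memory:"));
  }
}
```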
Showing 37 changed files with 4,290 additions and 3 deletions.
checkstyle/checkstyle.xml: 1 addition & 1 deletion
@@ -24,7 +24,7 @@

<!-- header -->
<module name="RegexpHeader">
<property name="header" value="/\*\*\nCopyright .* Confluent Inc."/>
<property name="header" value="/\*\nCopyright .* Confluent Inc."/>
</module>

<module name="TreeWalker">
pom.xml: 118 additions & 2 deletions
@@ -57,6 +57,9 @@
<powermock.version>1.6.2</powermock.version>
<derby.version>10.11.1.1</derby.version>
<commons-io.version>2.4</commons-io.version>
<sqlite.version>3.8.11.2</sqlite.version>
<mockito.version>1.10.19</mockito.version>
<licenses.version>3.1.0-SNAPSHOT</licenses.version>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<confluent.maven.repo>http://packages.confluent.io/maven/</confluent.maven.repo>
</properties>
@@ -76,11 +79,11 @@
<version>${kafka.version}</version>
</dependency>

<!-- JDBC drivers, only included in runtime so they get packaged -->
<dependency>
<groupId>org.xerial</groupId>
<artifactId>sqlite-jdbc</artifactId>
<version>3.8.11.2</version>
<version>${sqlite.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
@@ -90,6 +93,12 @@
<scope>runtime</scope>
</dependency>

<dependency>
<groupId>org.xerial</groupId>
<artifactId>sqlite-jdbc</artifactId>
<version>${sqlite.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
@@ -124,6 +133,11 @@
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>${commons-io.version}</version>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
<version>${mockito.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
@@ -164,6 +178,8 @@
<artifactId>maven-surefire-plugin</artifactId>
<version>2.18.1</version>
<configuration>
<!-- FIXME: needed because PowerMock.mockStatic(DriverManager.class) cannot be reset between tests -->
<reuseForks>false</reuseForks>
<argLine>-Djava.awt.headless=true</argLine>
</configuration>
</plugin>
@@ -215,5 +231,105 @@
</plugins>
</build>
</profile>
<profile>
<id>licenses-package</id>
<build>
<plugins>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>exec-maven-plugin</artifactId>
<version>1.2.1</version>
<executions>
<execution>
<id>create-licenses</id>
<configuration>
<mainClass>io.confluent.licenses.LicenseFinder</mainClass>
<arguments>
<!-- Scans the packaged share/java directory so that all bundled dependencies are picked up. -->
<argument>-i ${project.build.directory}/${project.build.finalName}-package/share/java/kafka-connect-jdbc</argument>
<argument>-o ${project.basedir}/licenses</argument>
<argument>-f</argument>
<argument>-h ${project.build.directory}/${project.build.finalName}-package/share/doc/kafka-connect-jdbc/licenses.html</argument>
<argument>-l ${project.build.directory}/${project.build.finalName}-package/share/doc/kafka-connect-jdbc/licenses</argument>
<argument>-n ${project.build.directory}/${project.build.finalName}-package/share/doc/kafka-connect-jdbc/notices</argument>
<argument>-t ${project.name}</argument>
<argument>-x licenses-${licenses.version}.jar</argument>
</arguments>
</configuration>
<phase>package</phase>
<goals>
<goal>java</goal>
</goals>
</execution>
</executions>
<configuration>
<includeProjectDependencies>true</includeProjectDependencies>
<includePluginDependencies>true</includePluginDependencies>
<executableDependency>
<groupId>io.confluent</groupId>
<artifactId>licenses</artifactId>
</executableDependency>
</configuration>
<dependencies>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>licenses</artifactId>
<version>${licenses.version}</version>
</dependency>
</dependencies>
</plugin>
</plugins>
</build>
</profile>
<profile>
<id>licenses-source</id>
<build>
<plugins>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>exec-maven-plugin</artifactId>
<version>1.2.1</version>
<executions>
<execution>
<id>create-licenses</id>
<configuration>
<mainClass>io.confluent.licenses.LicenseFinder</mainClass>
<arguments>
<!-- Note use of development instead of package so we pick up all dependencies. -->
<argument>-i ${project.build.directory}/${project.build.finalName}-development/share/java/kafka-connect-jdbc</argument>
<argument>-o ${project.basedir}/licenses</argument>
<argument>-f</argument>
<argument>-h ${project.basedir}/licenses.html</argument>
<argument>-l ${project.basedir}/licenses</argument>
<argument>-n ${project.basedir}/notices</argument>
<argument>-t ${project.name}</argument>
<argument>-x licenses-${licenses.version}.jar</argument>
</arguments>
</configuration>
<phase>package</phase>
<goals>
<goal>java</goal>
</goals>
</execution>
</executions>
<configuration>
<includeProjectDependencies>true</includeProjectDependencies>
<includePluginDependencies>true</includePluginDependencies>
<executableDependency>
<groupId>io.confluent</groupId>
<artifactId>licenses</artifactId>
</executableDependency>
</configuration>
<dependencies>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>licenses</artifactId>
<version>${licenses.version}</version>
</dependency>
</dependencies>
</plugin>
</plugins>
</build>
</profile>
</profiles>
</project>
src/main/java/io/confluent/connect/jdbc/JdbcSinkConnector.java: 77 additions
@@ -0,0 +1,77 @@
/*
* Copyright 2016 Confluent Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.confluent.connect.jdbc;

import org.apache.kafka.common.config.Config;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.sink.SinkConnector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import io.confluent.connect.jdbc.sink.JdbcSinkConfig;
import io.confluent.connect.jdbc.sink.JdbcSinkTask;

public final class JdbcSinkConnector extends SinkConnector {
private static final Logger log = LoggerFactory.getLogger(JdbcSinkConnector.class);

private Map<String, String> configProps;

@Override
public Class<? extends Task> taskClass() {
return JdbcSinkTask.class;
}

@Override
public List<Map<String, String>> taskConfigs(int maxTasks) {
log.info("Setting task configurations for {} workers.", maxTasks);
final List<Map<String, String>> configs = new ArrayList<>(maxTasks);
for (int i = 0; i < maxTasks; ++i) {
configs.add(configProps);
}
return configs;
}

@Override
public void start(Map<String, String> props) {
configProps = props;
}

@Override
public void stop() {
}

@Override
public ConfigDef config() {
return JdbcSinkConfig.CONFIG_DEF;
}

@Override
public Config validate(Map<String, String> connectorConfigs) {
// TODO cross-fields validation here: pkFields against the pkMode
return super.validate(connectorConfigs);
}

@Override
public String version() {
// TODO switch to whatever connect-jdbc is doing for versioning
return getClass().getPackage().getImplementationVersion();
}
}
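For orientation, here is a minimal, hypothetical sketch (not part of this commit) of how the class above is driven: `start()` simply stores the raw properties, and `taskConfigs()` hands the same map to every task. The configuration keys used are illustrative assumptions, not taken from this diff.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import io.confluent.connect.jdbc.JdbcSinkConnector;

public class JdbcSinkConnectorSketch {
  public static void main(String[] args) {
    // Hypothetical configuration; key names are assumptions for illustration only.
    Map<String, String> props = new HashMap<>();
    props.put("connection.url", "jdbc:sqlite:test.db");
    props.put("topics", "orders");

    JdbcSinkConnector connector = new JdbcSinkConnector();
    connector.start(props);

    // taskConfigs replicates the connector properties once per task,
    // so all three tasks receive the same map.
    List<Map<String, String>> taskConfigs = connector.taskConfigs(3);
    System.out.println(taskConfigs.size()); // 3
    connector.stop();
  }
}
```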
src/main/java/io/confluent/connect/jdbc/sink/BufferedRecords.java: 131 additions
@@ -0,0 +1,131 @@
/*
* Copyright 2016 Confluent Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package io.confluent.connect.jdbc.sink;

import org.apache.kafka.connect.errors.ConnectException;
import org.apache.kafka.connect.sink.SinkRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import io.confluent.connect.jdbc.sink.dialect.DbDialect;
import io.confluent.connect.jdbc.sink.metadata.FieldsMetadata;
import io.confluent.connect.jdbc.sink.metadata.SchemaPair;

public class BufferedRecords {
private static final Logger log = LoggerFactory.getLogger(BufferedRecords.class);

private final String tableName;
private final JdbcSinkConfig config;
private final DbDialect dbDialect;
private final DbStructure dbStructure;
private final Connection connection;

private List<SinkRecord> records = new ArrayList<>();
private SchemaPair currentSchemaPair;
private FieldsMetadata fieldsMetadata;
private PreparedStatement preparedStatement;
private PreparedStatementBinder preparedStatementBinder;

public BufferedRecords(JdbcSinkConfig config, String tableName, DbDialect dbDialect, DbStructure dbStructure, Connection connection) {
this.tableName = tableName;
this.config = config;
this.dbDialect = dbDialect;
this.dbStructure = dbStructure;
this.connection = connection;
}

public List<SinkRecord> add(SinkRecord record) throws SQLException {
final SchemaPair schemaPair = new SchemaPair(record.keySchema(), record.valueSchema());

if (currentSchemaPair == null) {
currentSchemaPair = schemaPair;
// re-initialize everything that depends on the record schema
fieldsMetadata = FieldsMetadata.extract(tableName, config.pkMode, config.pkFields, currentSchemaPair);
dbStructure.createOrAmendIfNecessary(config, connection, tableName, fieldsMetadata);
final String insertSql = getInsertSql();
log.debug("insertion sql:{}", config.insertMode, insertSql);
preparedStatement = connection.prepareStatement(insertSql);
preparedStatementBinder = new PreparedStatementBinder(preparedStatement, config.pkMode, schemaPair, fieldsMetadata);
}

final List<SinkRecord> flushed;
if (currentSchemaPair.equals(schemaPair)) {
// Continue with current batch state
records.add(record);
if (records.size() >= config.batchSize) {
flushed = flush();
} else {
flushed = Collections.emptyList();
}
} else {
// Each batch needs to have the same SchemaPair, so get the buffered records out, reset state and re-attempt the add
flushed = flush();
currentSchemaPair = null;
flushed.addAll(add(record));
}
return flushed;
}

public List<SinkRecord> flush() throws SQLException {
if (records.isEmpty()) {
return new ArrayList<>();
}
for (SinkRecord record : records) {
preparedStatementBinder.bindRecord(record);
}
int totalUpdateCount = 0;
for (int updateCount : preparedStatement.executeBatch()) {
totalUpdateCount += updateCount;
}
if (totalUpdateCount != records.size()) {
switch (config.insertMode) {
case INSERT:
throw new ConnectException(String.format("Update count (%d) did not sum up to total number of records inserted (%d)",
totalUpdateCount, records.size()));
case UPSERT:
log.trace("Upserted records:{} resulting in in totalUpdateCount:{}", records.size(), totalUpdateCount);
}
}

final List<SinkRecord> flushedRecords = records;
records = new ArrayList<>();
return flushedRecords;
}

private String getInsertSql() {
switch (config.insertMode) {
case INSERT:
return dbDialect.getInsert(tableName, fieldsMetadata.keyFieldNames, fieldsMetadata.nonKeyFieldNames);
case UPSERT:
if (fieldsMetadata.keyFieldNames.isEmpty()) {
throw new ConnectException(String.format(
"Write to table '%s' in UPSERT mode requires key field names to be known, check the primary key configuration", tableName
));
}
return dbDialect.getUpsertQuery(tableName, fieldsMetadata.keyFieldNames, fieldsMetadata.nonKeyFieldNames);
default:
throw new ConnectException("Invalid insert mode");
}
}
}
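A minimal sketch (not from this commit) of how a caller could drive the `add()`/`flush()` cycle above; the helper class, its construction, and the commit handling are assumptions for illustration.

```java
import java.sql.Connection;
import java.sql.SQLException;

import org.apache.kafka.connect.sink.SinkRecord;

import io.confluent.connect.jdbc.sink.BufferedRecords;

// Hypothetical helper: writes one table's worth of records, assuming the
// Connection was opened with auto-commit disabled.
public class BufferedRecordsUsageSketch {
  public static void writeAll(BufferedRecords buffer, Iterable<SinkRecord> batch, Connection connection) throws SQLException {
    for (SinkRecord record : batch) {
      // add() flushes on its own when the configured batch size is reached or
      // when the record's SchemaPair changes; the returned list is what was flushed.
      buffer.add(record);
    }
    buffer.flush();        // flush whatever is still buffered
    connection.commit();   // make the batched writes durable
  }
}
```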