From 0a21bd511ae25f6ed02df237775e027578b43382 Mon Sep 17 00:00:00 2001 From: Bingqin Zhou Date: Fri, 14 Feb 2020 16:40:36 -0800 Subject: [PATCH 01/13] Move autoCreateTable function to BigQueryWriter. --- .../connect/bigquery/BigQuerySinkTask.java | 23 +++-------- .../write/row/AdaptiveBigQueryWriter.java | 38 +++++++++++++++---- 2 files changed, 36 insertions(+), 25 deletions(-) diff --git a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTask.java b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTask.java index 9aa3d62c7..9478d587d 100644 --- a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTask.java +++ b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTask.java @@ -142,8 +142,6 @@ private PartitionedTableId getRecordTable(SinkRecord record) { TableId baseTableId = topicsToBaseTableIds.get(record.topic()); - maybeCreateTable(record, baseTableId); - PartitionedTableId.Builder builder = new PartitionedTableId.Builder(baseTableId); if(usePartitionDecorator) { @@ -161,20 +159,6 @@ private PartitionedTableId getRecordTable(SinkRecord record) { return builder.build(); } - /** - * Create the table which doesn't exist in BigQuery for a (record's) topic when autoCreateTables config is set to true. - * @param record Kafka Sink Record to be streamed into BigQuery. - * @param baseTableId BaseTableId in BigQuery. - */ - private void maybeCreateTable(SinkRecord record, TableId baseTableId) { - BigQuery bigQuery = getBigQuery(); - boolean autoCreateTables = config.getBoolean(config.TABLE_CREATE_CONFIG); - if (autoCreateTables && bigQuery.getTable(baseTableId) == null) { - getSchemaManager(bigQuery).createTable(baseTableId, record.topic()); - logger.info("Table {} does not exist, auto-created table for topic {}", baseTableId, record.topic()); - } - } - private RowToInsert getRecordRow(SinkRecord record) { Map convertedRecord = recordConverter.convertRecord(record, KafkaSchemaRecordType.VALUE); Optional kafkaKeyFieldName = config.getKafkaKeyFieldName(); @@ -284,14 +268,17 @@ private SchemaManager getSchemaManager(BigQuery bigQuery) { private BigQueryWriter getBigQueryWriter() { boolean updateSchemas = config.getBoolean(config.SCHEMA_UPDATE_CONFIG); + boolean autoCreateTables = config.getBoolean(config.TABLE_CREATE_CONFIG); int retry = config.getInt(config.BIGQUERY_RETRY_CONFIG); long retryWait = config.getLong(config.BIGQUERY_RETRY_WAIT_CONFIG); BigQuery bigQuery = getBigQuery(); - if (updateSchemas) { + if (updateSchemas || autoCreateTables) { return new AdaptiveBigQueryWriter(bigQuery, getSchemaManager(bigQuery), retry, - retryWait); + retryWait, + updateSchemas, + autoCreateTables); } else { return new SimpleBigQueryWriter(bigQuery, retry, retryWait); } diff --git a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/write/row/AdaptiveBigQueryWriter.java b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/write/row/AdaptiveBigQueryWriter.java index bbfffb8e8..5df209e66 100644 --- a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/write/row/AdaptiveBigQueryWriter.java +++ b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/write/row/AdaptiveBigQueryWriter.java @@ -37,16 +37,18 @@ import java.util.Map; /** - * A {@link BigQueryWriter} capable of updating BigQuery table schemas. + * A {@link BigQueryWriter} capable of updating BigQuery table schemas and creating non-existed tables automatically. 
*/ public class AdaptiveBigQueryWriter extends BigQueryWriter { private static final Logger logger = LoggerFactory.getLogger(AdaptiveBigQueryWriter.class); // The maximum number of retries we will attempt to write rows after updating a BQ table schema. - private static final int AFTER_UPDATE_RETY_LIMIT = 5; + private static final int AFTER_UPDATE_RETRY_LIMIT = 5; private final BigQuery bigQuery; private final SchemaManager schemaManager; + private final boolean updateSchemas; + private final boolean autoCreateTables; /** * @param bigQuery Used to send write requests to BigQuery. @@ -57,10 +59,14 @@ public class AdaptiveBigQueryWriter extends BigQueryWriter { public AdaptiveBigQueryWriter(BigQuery bigQuery, SchemaManager schemaManager, int retry, - long retryWait) { + long retryWait, + boolean updateSchemas, + boolean autoCreateTables) { super(retry, retryWait); this.bigQuery = bigQuery; this.schemaManager = schemaManager; + this.updateSchemas = updateSchemas; + this.autoCreateTables = autoCreateTables; } private boolean isTableMissingSchema(BigQueryException exception) { @@ -69,6 +75,12 @@ private boolean isTableMissingSchema(BigQueryException exception) { return exception.getReason() != null && exception.getReason().equalsIgnoreCase("invalid"); } + private boolean isTableNotExisted(BigQueryException exception) { + // If a table does not exist, it will raise a BigQueryException that the input is notFound + // Referring to Google Cloud Error Codes Doc: https://cloud.google.com/bigquery/docs/error-messages?hl=en + return exception.getReason() != null && exception.getReason().equalsIgnoreCase("notFound"); + } + /** * Sends the request to BigQuery, then checks the response to see if any errors have occurred. If * any have, and all errors can be blamed upon invalid columns in the rows sent, attempts to @@ -89,11 +101,13 @@ public Map> performWriteRequest( // Should only perform one schema update attempt; may have to continue insert attempts due to // BigQuery schema updates taking up to two minutes to take effect if (writeResponse.hasErrors() - && onlyContainsInvalidSchemaErrors(writeResponse.getInsertErrors())) { + && onlyContainsInvalidSchemaErrors(writeResponse.getInsertErrors()) && updateSchemas) { attemptSchemaUpdate(tableId, topic); } } catch (BigQueryException exception) { - if (isTableMissingSchema(exception)) { + if (isTableNotExisted(exception) && autoCreateTables) { + attemptTableCreate(tableId, topic); + } else if (isTableMissingSchema(exception) && updateSchemas) { attemptSchemaUpdate(tableId, topic); } else { throw exception; @@ -117,10 +131,10 @@ && onlyContainsInvalidSchemaErrors(writeResponse.getInsertErrors())) { return writeResponse.getInsertErrors(); } attemptCount++; - if (attemptCount >= AFTER_UPDATE_RETY_LIMIT) { + if (attemptCount >= AFTER_UPDATE_RETRY_LIMIT) { throw new BigQueryConnectException( "Failed to write rows after BQ schema update within " - + AFTER_UPDATE_RETY_LIMIT + " attempts for: " + tableId.getBaseTableId()); + + AFTER_UPDATE_RETRY_LIMIT + " attempts for: " + tableId.getBaseTableId()); } } logger.debug("table insertion completed successfully"); @@ -136,6 +150,16 @@ private void attemptSchemaUpdate(PartitionedTableId tableId, String topic) { } } + private void attemptTableCreate(PartitionedTableId tableId, String topic) { + try { + schemaManager.createTable(tableId.getBaseTableId(), topic); + logger.info("Table {} does not exist, auto-created table for topic {}", tableId.getBaseTableName(), topic); + } catch (BigQueryException exception) { + throw new 
BigQueryConnectException( + "Failed to create table " + tableId.getBaseTableName(), exception); + } + } + /* * Currently, the only way to determine the cause of an insert all failure is by examining the map * object returned by the insertErrors() method of an insert all response. The only way to From d855548b4fc9fc971321e7527cf6958a11d3ef48 Mon Sep 17 00:00:00 2001 From: Bingqin Zhou Date: Fri, 14 Feb 2020 18:07:19 -0800 Subject: [PATCH 02/13] Add unit tests. --- build.gradle | 2 + .../write/row/BigQueryWriterTest.java | 71 +++++++++++++++++++ 2 files changed, 73 insertions(+) diff --git a/build.gradle b/build.gradle index c1f81ad99..9ede2daaf 100644 --- a/build.gradle +++ b/build.gradle @@ -217,6 +217,8 @@ project(':kcbq-connector') { "junit:junit:$junitVersion", "org.mockito:mockito-core:$mockitoVersion" ) + + testImplementation 'org.mockito:mockito-inline:2.13.0' } artifacts { diff --git a/kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/write/row/BigQueryWriterTest.java b/kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/write/row/BigQueryWriterTest.java index d33287640..3e45e6d16 100644 --- a/kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/write/row/BigQueryWriterTest.java +++ b/kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/write/row/BigQueryWriterTest.java @@ -27,11 +27,13 @@ import com.google.cloud.bigquery.BigQuery; import com.google.cloud.bigquery.BigQueryError; +import com.google.cloud.bigquery.BigQueryException; import com.google.cloud.bigquery.InsertAllRequest; import com.google.cloud.bigquery.InsertAllResponse; import com.google.cloud.storage.Storage; import com.wepay.kafka.connect.bigquery.BigQuerySinkTask; +import com.wepay.kafka.connect.bigquery.SchemaManager; import com.wepay.kafka.connect.bigquery.SinkTaskPropertiesFactory; import com.wepay.kafka.connect.bigquery.config.BigQuerySinkConfig; import com.wepay.kafka.connect.bigquery.config.BigQuerySinkTaskConfig; @@ -95,6 +97,75 @@ public void testBigQueryNoFailure() { verify(bigQuery, times(1)).insertAll(anyObject()); } + @Test + public void testAutoCreateTables() { + final String topic = "test_topic"; + final String dataset = "scratch"; + final Map properties = makeProperties("3", "2000", topic, dataset); + properties.put(BigQuerySinkTaskConfig.TABLE_CREATE_CONFIG, "true"); + + BigQuery bigQuery = mock(BigQuery.class); + Map> emptyMap = mock(Map.class); + when(emptyMap.isEmpty()).thenReturn(true); + + InsertAllResponse insertAllResponse = mock(InsertAllResponse.class); + when(insertAllResponse.hasErrors()).thenReturn(false); + when(insertAllResponse.getInsertErrors()).thenReturn(emptyMap); + + BigQueryException missTableException = mock(BigQueryException.class); + when(missTableException.getReason()).thenReturn("notFound"); + + when(bigQuery.insertAll(anyObject())).thenThrow(missTableException).thenReturn(insertAllResponse); + + SinkTaskContext sinkTaskContext = mock(SinkTaskContext.class); + + Storage storage = mock(Storage.class); + SchemaManager schemaManager = mock(SchemaManager.class); + BigQuerySinkTask testTask = new BigQuerySinkTask(bigQuery, null, storage, schemaManager); + testTask.initialize(sinkTaskContext); + testTask.start(properties); + testTask.put( + Collections.singletonList(spoofSinkRecord(topic, 0, 0, "some_field", "some_value"))); + testTask.flush(Collections.emptyMap()); + + verify(schemaManager, times(1)).createTable(anyObject(), anyObject()); + verify(bigQuery, times(2)).insertAll(anyObject()); + } + + @Test + public void 
testNonAutoCreateTables() { + final String topic = "test_topic"; + final String dataset = "scratch"; + final Map properties = makeProperties("3", "2000", topic, dataset); + + BigQuery bigQuery = mock(BigQuery.class); + + Map> emptyMap = mock(Map.class); + when(emptyMap.isEmpty()).thenReturn(true); + InsertAllResponse insertAllResponse = mock(InsertAllResponse.class); + when(insertAllResponse.hasErrors()).thenReturn(false); + when(insertAllResponse.getInsertErrors()).thenReturn(emptyMap); + + BigQueryException missTableException = mock(BigQueryException.class); + when(missTableException.getReason()).thenReturn("notFound"); + + when(bigQuery.insertAll(anyObject())).thenThrow(missTableException).thenReturn(insertAllResponse); + + SinkTaskContext sinkTaskContext = mock(SinkTaskContext.class); + + Storage storage = mock(Storage.class); + SchemaManager schemaManager = mock(SchemaManager.class); + BigQuerySinkTask testTask = new BigQuerySinkTask(bigQuery, null, storage, schemaManager); + testTask.initialize(sinkTaskContext); + testTask.start(properties); + testTask.put( + Collections.singletonList(spoofSinkRecord(topic, 0, 0, "some_field", "some_value"))); + testTask.flush(Collections.emptyMap()); + + verify(schemaManager, times(0)).createTable(anyObject(), anyObject()); + verify(bigQuery, times(2)).insertAll(anyObject()); + } + @Test public void testBigQueryPartialFailure() { final String topic = "test_topic"; From b7c851dfbfc3bf3ddb3ac71e2586cfc84537f957 Mon Sep 17 00:00:00 2001 From: Bingqin Zhou Date: Fri, 14 Feb 2020 18:12:22 -0800 Subject: [PATCH 03/13] Remove deprecated unit tests. --- .../bigquery/BigQuerySinkTaskTest.java | 78 ------------------- 1 file changed, 78 deletions(-) diff --git a/kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTaskTest.java b/kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTaskTest.java index 760794c68..939310443 100644 --- a/kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTaskTest.java +++ b/kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTaskTest.java @@ -125,84 +125,6 @@ public void testSimplePutWhenSchemaRetrieverIsNotNull() { any(String.class), any(Schema.class)); } - @Test - public void testAutoCreateTables() { - final String dataset = "scratch"; - final String existingTableTopic = "topic-with-existing-table"; - final String nonExistingTableTopic = "topic-without-existing-table"; - final TableId existingTable = TableId.of(dataset, "topic_with_existing_table"); - final TableId nonExistingTable = TableId.of(dataset, "topic_without_existing_table"); - - Map properties = propertiesFactory.getProperties(); - properties.put(BigQuerySinkConfig.TABLE_CREATE_CONFIG, "true"); - properties.put(BigQuerySinkConfig.SCHEMA_RETRIEVER_CONFIG, BigQuerySinkConnectorTest.MockSchemaRetriever.class.getName()); - properties.put(BigQuerySinkConfig.SANITIZE_TOPICS_CONFIG, "true"); - properties.put(BigQuerySinkConfig.DATASETS_CONFIG, String.format(".*=%s", dataset)); - properties.put(BigQuerySinkConfig.TOPICS_CONFIG, existingTableTopic); - - BigQuery bigQuery = mock(BigQuery.class); - Table fakeTable = mock(Table.class); - when(bigQuery.getTable(existingTable)).thenReturn(fakeTable); - when(bigQuery.getTable(nonExistingTable)).thenReturn(null); - InsertAllResponse insertAllResponse = mock(InsertAllResponse.class); - when(bigQuery.insertAll(anyObject())).thenReturn(insertAllResponse); - when(insertAllResponse.hasErrors()).thenReturn(false); - - Storage storage = 
mock(Storage.class); - SchemaRetriever schemaRetriever = mock(SchemaRetriever.class); - SinkTaskContext sinkTaskContext = mock(SinkTaskContext.class); - SchemaManager schemaManager = mock(SchemaManager.class); - - BigQuerySinkTask testTask = new BigQuerySinkTask(bigQuery, schemaRetriever, storage, schemaManager); - testTask.initialize(sinkTaskContext); - testTask.start(properties); - - testTask.put(Collections.singletonList(spoofSinkRecord(nonExistingTableTopic))); - testTask.flush(Collections.emptyMap()); - - verify(schemaManager, never()).createTable(existingTable, existingTableTopic); - verify(schemaManager).createTable(nonExistingTable, nonExistingTableTopic); - } - - @Test - public void testNonAutoCreateTables() { - final String dataset = "scratch"; - final String existingTableTopic = "topic-with-existing-table"; - final String nonExistingTableTopic = "topic-without-existing-table"; - final TableId existingTable = TableId.of(dataset, "topic_with_existing_table"); - final TableId nonExistingTable = TableId.of(dataset, "topic_without_existing_table"); - - Map properties = propertiesFactory.getProperties(); - properties.put(BigQuerySinkConfig.SCHEMA_RETRIEVER_CONFIG, BigQuerySinkConnectorTest.MockSchemaRetriever.class.getName()); - properties.put(BigQuerySinkConfig.SANITIZE_TOPICS_CONFIG, "true"); - properties.put(BigQuerySinkConfig.DATASETS_CONFIG, String.format(".*=%s", dataset)); - properties.put(BigQuerySinkConfig.TOPICS_CONFIG, existingTableTopic); - - BigQuery bigQuery = mock(BigQuery.class); - Table fakeTable = mock(Table.class); - when(bigQuery.getTable(existingTable)).thenReturn(fakeTable); - when(bigQuery.getTable(nonExistingTable)).thenReturn(null); - InsertAllResponse insertAllResponse = mock(InsertAllResponse.class); - when(bigQuery.insertAll(anyObject())).thenReturn(insertAllResponse); - when(insertAllResponse.hasErrors()).thenReturn(false); - - Storage storage = mock(Storage.class); - SchemaRetriever schemaRetriever = mock(SchemaRetriever.class); - SinkTaskContext sinkTaskContext = mock(SinkTaskContext.class); - SchemaManager schemaManager = mock(SchemaManager.class); - - BigQuerySinkTask testTask = new BigQuerySinkTask(bigQuery, schemaRetriever, storage, schemaManager); - testTask.initialize(sinkTaskContext); - testTask.start(properties); - - testTask.put(Collections.singletonList(spoofSinkRecord(nonExistingTableTopic))); - testTask.flush(Collections.emptyMap()); - - verify(schemaManager, never()).createTable(existingTable, existingTableTopic); - verify(schemaManager, never()).createTable(nonExistingTable, existingTableTopic); - } - - @Test public void testEmptyPut() { Map properties = propertiesFactory.getProperties(); From ed3c9b22363771d0772e8159213b51e58fbe10cc Mon Sep 17 00:00:00 2001 From: Bingqin Zhou Date: Fri, 14 Feb 2020 18:22:59 -0800 Subject: [PATCH 04/13] Remove unused import. 
--- .../com/wepay/kafka/connect/bigquery/BigQuerySinkTaskTest.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTaskTest.java b/kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTaskTest.java index 939310443..57c39d659 100644 --- a/kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTaskTest.java +++ b/kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTaskTest.java @@ -27,7 +27,6 @@ import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; -import static org.mockito.Mockito.never; import com.google.cloud.bigquery.BigQuery; import com.google.cloud.bigquery.BigQueryError; @@ -35,7 +34,6 @@ import com.google.cloud.bigquery.InsertAllRequest; import com.google.cloud.bigquery.InsertAllResponse; import com.google.cloud.bigquery.TableId; -import com.google.cloud.bigquery.Table; import com.google.cloud.storage.Storage; import com.wepay.kafka.connect.bigquery.api.SchemaRetriever; From c45aca08ac1753f2647e04ae8263f56ffb5887a1 Mon Sep 17 00:00:00 2001 From: Bingqin Zhou Date: Tue, 18 Feb 2020 10:35:47 -0800 Subject: [PATCH 05/13] Try to fix unit tests in CI builds. --- build.gradle | 2 ++ 1 file changed, 2 insertions(+) diff --git a/build.gradle b/build.gradle index 9ede2daaf..70210e9c4 100644 --- a/build.gradle +++ b/build.gradle @@ -338,6 +338,8 @@ project('kcbq-confluent') { "junit:junit:$junitVersion", "org.mockito:mockito-core:$mockitoVersion" ) + + testImplementation 'org.mockito:mockito-inline:2.13.0' } artifacts { From a2538ed6cd6a28100d26d7129afbb1a1fef79d83 Mon Sep 17 00:00:00 2001 From: Bingqin Zhou Date: Tue, 18 Feb 2020 11:07:53 -0800 Subject: [PATCH 06/13] Try to fix CI build failure. --- build.gradle | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/build.gradle b/build.gradle index 70210e9c4..1a824d4e9 100644 --- a/build.gradle +++ b/build.gradle @@ -22,7 +22,7 @@ project.ext { ioConfluentVersion = '5.3.1' junitVersion = '4.12' kafkaVersion = '2.3.0' - mockitoVersion = '1.10.19' + mockitoVersion = '3.2.4' slf4jVersion = '1.6.1' } @@ -215,10 +215,9 @@ project(':kcbq-connector') { testCompile ( "junit:junit:$junitVersion", - "org.mockito:mockito-core:$mockitoVersion" + "org.mockito:mockito-core:$mockitoVersion", + "org.mockito:mockito-inline:$mockitoVersion" ) - - testImplementation 'org.mockito:mockito-inline:2.13.0' } artifacts { @@ -336,10 +335,9 @@ project('kcbq-confluent') { testCompile ( "junit:junit:$junitVersion", - "org.mockito:mockito-core:$mockitoVersion" + "org.mockito:mockito-core:$mockitoVersion", + "org.mockito:mockito-inline:$mockitoVersion" ) - - testImplementation 'org.mockito:mockito-inline:2.13.0' } artifacts { From 8709d0411756908da3d1c890d15d530a18f656c4 Mon Sep 17 00:00:00 2001 From: Bingqin Zhou Date: Thu, 20 Feb 2020 16:56:01 -0800 Subject: [PATCH 07/13] Add sleep block in performWriteRequest to wait for table creation to take effect. 
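Creating a table (or updating a schema) in BigQuery can take a couple of minutes to become visible to the streaming insert API, so an insert that fails right after the table was created is now retried after a fixed pause instead of immediately. With the constants in the hunk below, the worst case is AFTER_UPDATE_RETRY_LIMIT (5) retries spaced RETRY_WAIT_TIME (30000 ms) apart, i.e. roughly 150 seconds of waiting, which is also why the integration-test container's sleep below goes from 60s to 180s. The pause itself is simply (illustrative schematic of the hunk that follows):

    try {
      Thread.sleep(RETRY_WAIT_TIME); // ~30s; table creation / schema updates may take minutes to propagate
    } catch (InterruptedException e) {
      // no-op, we want to keep retrying the insert
    }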
--- .../bigquery/write/row/AdaptiveBigQueryWriter.java | 10 +++++++++- kcbq-connector/test/docker/connect/connect-docker.sh | 2 +- 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/write/row/AdaptiveBigQueryWriter.java b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/write/row/AdaptiveBigQueryWriter.java index 5df209e66..a16f0bb89 100644 --- a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/write/row/AdaptiveBigQueryWriter.java +++ b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/write/row/AdaptiveBigQueryWriter.java @@ -44,6 +44,8 @@ public class AdaptiveBigQueryWriter extends BigQueryWriter { // The maximum number of retries we will attempt to write rows after updating a BQ table schema. private static final int AFTER_UPDATE_RETRY_LIMIT = 5; + // Wait for about 30s between each retry since both creating table and updating schema take up to 2 minutes to take effect. + private static final int RETRY_WAIT_TIME = 30000; private final BigQuery bigQuery; private final SchemaManager schemaManager; @@ -105,7 +107,8 @@ && onlyContainsInvalidSchemaErrors(writeResponse.getInsertErrors()) && updateSch attemptSchemaUpdate(tableId, topic); } } catch (BigQueryException exception) { - if (isTableNotExisted(exception) && autoCreateTables) { + // Should only perform one table creation attempt. + if (isTableNotExisted(exception) && autoCreateTables && bigQuery.getTable(tableId.getBaseTableId()) == null) { attemptTableCreate(tableId, topic); } else if (isTableMissingSchema(exception) && updateSchemas) { attemptSchemaUpdate(tableId, topic); @@ -136,6 +139,11 @@ && onlyContainsInvalidSchemaErrors(writeResponse.getInsertErrors()) && updateSch "Failed to write rows after BQ schema update within " + AFTER_UPDATE_RETRY_LIMIT + " attempts for: " + tableId.getBaseTableId()); } + try { + Thread.sleep(RETRY_WAIT_TIME); + } catch (InterruptedException e) { + // no-op, we want to keep retrying the insert + } } logger.debug("table insertion completed successfully"); return new HashMap<>(); diff --git a/kcbq-connector/test/docker/connect/connect-docker.sh b/kcbq-connector/test/docker/connect/connect-docker.sh index 862baee65..377e26736 100755 --- a/kcbq-connector/test/docker/connect/connect-docker.sh +++ b/kcbq-connector/test/docker/connect/connect-docker.sh @@ -20,5 +20,5 @@ connect-standalone \ /etc/kafka-connect-bigquery/standalone.properties \ /etc/kafka-connect-bigquery/connector.properties & -sleep 60 +sleep 180 kill $! From 0b2ffece84de7ffd12337ed466b914057040380a Mon Sep 17 00:00:00 2001 From: Bingqin Zhou Date: Tue, 25 Feb 2020 16:35:03 -0800 Subject: [PATCH 08/13] Auto-create tables for GCS Load mode. 
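The GCS batch-load path previously assumed every target table already existed and threw a ConnectException when bigQuery.getTable(tableId) returned null. This patch threads the record's topic and the SchemaManager down through GCSBatchTableWriter into GCSToBQWriter so a missing table can be created on demand before the blob is uploaded. A minimal sketch of the new call path, using the constructor and writeRows signature introduced below (the wiring shown is illustrative, not connector code):

    // GCSToBQWriter now takes a SchemaManager and an autoCreateTables flag ...
    GCSToBQWriter gcsWriter = new GCSToBQWriter(
        storage, bigQuery, schemaManager, retry, retryWait, /* autoCreateTables */ true);
    // ... and writeRows receives the topic, so that when autoCreateTables is set and
    // bigQuery.getTable(tableId) == null it can call schemaManager.createTable(tableId, topic)
    // before uploading the blob to GCS.
    gcsWriter.writeRows(rows, tableId, bucketName, blobName, topic);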
--- .../connect/bigquery/BigQuerySinkTask.java | 9 +- .../write/batch/GCSBatchTableWriter.java | 12 ++- .../write/row/AdaptiveBigQueryWriter.java | 11 +-- .../bigquery/write/row/GCSToBQWriter.java | 31 +++++-- .../bigquery/BigQuerySinkTaskTest.java | 82 +++++++++++++++---- .../write/row/BigQueryWriterTest.java | 22 +++-- .../bigquery/write/row/GCSToBQWriterTest.java | 17 +++- 7 files changed, 143 insertions(+), 41 deletions(-) diff --git a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTask.java b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTask.java index 9478d587d..926f9f443 100644 --- a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTask.java +++ b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTask.java @@ -201,7 +201,8 @@ public void put(Collection records) { if (!tableWriterBuilders.containsKey(table)) { TableWriterBuilder tableWriterBuilder; if (config.getList(config.ENABLE_BATCH_CONFIG).contains(record.topic())) { - String gcsBlobName = record.topic() + "_" + uuid + "_" + Instant.now().toEpochMilli(); + String topic = record.topic(); + String gcsBlobName = topic + "_" + uuid + "_" + Instant.now().toEpochMilli(); String gcsFolderName = config.getString(config.GCS_FOLDER_NAME_CONFIG); if (gcsFolderName != null && !"".equals(gcsFolderName)) { gcsBlobName = gcsFolderName + "/" + gcsBlobName; @@ -211,6 +212,7 @@ public void put(Collection records) { table.getBaseTableId(), config.getString(config.GCS_BUCKET_NAME_CONFIG), gcsBlobName, + topic, recordConverter); } else { tableWriterBuilder = @@ -299,10 +301,13 @@ private GCSToBQWriter getGcsWriter() { BigQuery bigQuery = getBigQuery(); int retry = config.getInt(config.BIGQUERY_RETRY_CONFIG); long retryWait = config.getLong(config.BIGQUERY_RETRY_WAIT_CONFIG); + boolean autoCreateTables = config.getBoolean(config.TABLE_CREATE_CONFIG); return new GCSToBQWriter(getGcs(), bigQuery, + getSchemaManager(bigQuery), retry, - retryWait); + retryWait, + autoCreateTables); } @Override diff --git a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/write/batch/GCSBatchTableWriter.java b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/write/batch/GCSBatchTableWriter.java index 90d92a21b..6cf6536a5 100644 --- a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/write/batch/GCSBatchTableWriter.java +++ b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/write/batch/GCSBatchTableWriter.java @@ -45,6 +45,7 @@ public class GCSBatchTableWriter implements Runnable { private final String bucketName; private final String blobName; + private final String topic; private final List rows; private final GCSToBQWriter writer; @@ -61,10 +62,12 @@ private GCSBatchTableWriter(List rows, GCSToBQWriter writer, TableId tableId, String bucketName, - String baseBlobName) { + String baseBlobName, + String topic) { this.tableId = tableId; this.bucketName = bucketName; this.blobName = baseBlobName; + this.topic = topic; this.rows = rows; this.writer = writer; @@ -73,7 +76,7 @@ private GCSBatchTableWriter(List rows, @Override public void run() { try { - writer.writeRows(rows, tableId, bucketName, blobName); + writer.writeRows(rows, tableId, bucketName, blobName, topic); } catch (ConnectException ex) { throw new ConnectException("Failed to write rows to GCS", ex); } catch (InterruptedException ex) { @@ -87,6 +90,7 @@ public void run() { public static class Builder implements TableWriterBuilder { private final String 
bucketName; private String blobName; + private String topic; private final TableId tableId; @@ -107,10 +111,12 @@ public Builder(GCSToBQWriter writer, TableId tableId, String gcsBucketName, String gcsBlobName, + String topic, RecordConverter> recordConverter) { this.bucketName = gcsBucketName; this.blobName = gcsBlobName; + this.topic = topic; this.tableId = tableId; @@ -133,7 +139,7 @@ public void addRow(RowToInsert rowToInsert) { } public GCSBatchTableWriter build() { - return new GCSBatchTableWriter(rows, writer, tableId, bucketName, blobName); + return new GCSBatchTableWriter(rows, writer, tableId, bucketName, blobName, topic); } } } diff --git a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/write/row/AdaptiveBigQueryWriter.java b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/write/row/AdaptiveBigQueryWriter.java index a16f0bb89..1deeaa168 100644 --- a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/write/row/AdaptiveBigQueryWriter.java +++ b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/write/row/AdaptiveBigQueryWriter.java @@ -19,6 +19,7 @@ import com.google.cloud.bigquery.BigQuery; +import com.google.cloud.bigquery.TableId; import com.google.cloud.bigquery.BigQueryError; import com.google.cloud.bigquery.BigQueryException; import com.google.cloud.bigquery.InsertAllRequest; @@ -109,7 +110,7 @@ && onlyContainsInvalidSchemaErrors(writeResponse.getInsertErrors()) && updateSch } catch (BigQueryException exception) { // Should only perform one table creation attempt. if (isTableNotExisted(exception) && autoCreateTables && bigQuery.getTable(tableId.getBaseTableId()) == null) { - attemptTableCreate(tableId, topic); + attemptTableCreate(tableId.getBaseTableId(), topic); } else if (isTableMissingSchema(exception) && updateSchemas) { attemptSchemaUpdate(tableId, topic); } else { @@ -158,13 +159,13 @@ private void attemptSchemaUpdate(PartitionedTableId tableId, String topic) { } } - private void attemptTableCreate(PartitionedTableId tableId, String topic) { + private void attemptTableCreate(TableId tableId, String topic) { try { - schemaManager.createTable(tableId.getBaseTableId(), topic); - logger.info("Table {} does not exist, auto-created table for topic {}", tableId.getBaseTableName(), topic); + schemaManager.createTable(tableId, topic); + logger.info("Table {} does not exist, auto-created table for topic {}", tableId, topic); } catch (BigQueryException exception) { throw new BigQueryConnectException( - "Failed to create table " + tableId.getBaseTableName(), exception); + "Failed to create table " + tableId, exception); } } diff --git a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/write/row/GCSToBQWriter.java b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/write/row/GCSToBQWriter.java index 1829460fd..d7313919d 100644 --- a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/write/row/GCSToBQWriter.java +++ b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/write/row/GCSToBQWriter.java @@ -19,6 +19,7 @@ import com.google.cloud.bigquery.BigQuery; +import com.google.cloud.bigquery.BigQueryException; import com.google.cloud.bigquery.InsertAllRequest.RowToInsert; import com.google.cloud.bigquery.TableId; import com.google.cloud.storage.Blob; @@ -28,6 +29,8 @@ import com.google.cloud.storage.StorageException; import com.google.gson.Gson; +import com.wepay.kafka.connect.bigquery.SchemaManager; +import com.wepay.kafka.connect.bigquery.exception.BigQueryConnectException; import 
com.wepay.kafka.connect.bigquery.exception.GCSConnectException; import org.apache.kafka.connect.errors.ConnectException; @@ -53,12 +56,16 @@ public class GCSToBQWriter { private final BigQuery bigQuery; + private final SchemaManager schemaManager; + private static final int WAIT_MAX_JITTER = 1000; private static final Random random = new Random(); private int retries; private long retryWaitMs; + private boolean autoCreateTables; + public static final String GCS_METADATA_TABLE_KEY = "sinkTable"; @@ -71,13 +78,17 @@ public class GCSToBQWriter { */ public GCSToBQWriter(Storage storage, BigQuery bigQuery, + SchemaManager schemaManager, int retries, - long retryWaitMs) { + long retryWaitMs, + boolean autoCreateTables) { this.storage = storage; this.bigQuery = bigQuery; + this.schemaManager = schemaManager; this.retries = retries; this.retryWaitMs = retryWaitMs; + this.autoCreateTables = autoCreateTables; } /** @@ -92,7 +103,8 @@ public GCSToBQWriter(Storage storage, public void writeRows(List rows, TableId tableId, String bucketName, - String blobName) throws InterruptedException { + String blobName, + String topic) throws InterruptedException { // Get Source URI BlobId blobId = BlobId.of(bucketName, blobName); @@ -103,9 +115,8 @@ public void writeRows(List rows, // Check if the table specified exists // This error shouldn't be thrown. All tables should be created by the connector at startup - if (bigQuery.getTable(tableId) == null) { - throw new ConnectException( - String.format("Table with TableId %s does not exist.", tableId.getTable())); + if (autoCreateTables && bigQuery.getTable(tableId) == null) { + attemptTableCreate(tableId, topic); } int attemptCount = 0; @@ -184,4 +195,14 @@ private String toJson(List rows) { private void waitRandomTime() throws InterruptedException { Thread.sleep(retryWaitMs + random.nextInt(WAIT_MAX_JITTER)); } + + private void attemptTableCreate(TableId tableId, String topic) { + try { + schemaManager.createTable(tableId, topic); + logger.info("Table {} does not exist, auto-created table for topic {}", tableId, topic); + } catch (BigQueryException exception) { + throw new BigQueryConnectException( + "Failed to create table " + tableId, exception); + } + } } diff --git a/kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTaskTest.java b/kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTaskTest.java index 57c39d659..a7ce9fe1b 100644 --- a/kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTaskTest.java +++ b/kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTaskTest.java @@ -85,7 +85,10 @@ public void testSimplePut() { when(bigQuery.insertAll(anyObject())).thenReturn(insertAllResponse); when(insertAllResponse.hasErrors()).thenReturn(false); - BigQuerySinkTask testTask = new BigQuerySinkTask(bigQuery, null, storage, null); + SchemaRetriever schemaRetriever = mock(SchemaRetriever.class); + SchemaManager schemaManager = mock(SchemaManager.class); + + BigQuerySinkTask testTask = new BigQuerySinkTask(bigQuery, schemaRetriever, storage, schemaManager); testTask.initialize(sinkTaskContext); testTask.start(properties); @@ -111,8 +114,9 @@ public void testSimplePutWhenSchemaRetrieverIsNotNull() { when(insertAllResponse.hasErrors()).thenReturn(false); SchemaRetriever schemaRetriever = mock(SchemaRetriever.class); + SchemaManager schemaManager = mock(SchemaManager.class); - BigQuerySinkTask testTask = new BigQuerySinkTask(bigQuery, schemaRetriever, storage, null); + BigQuerySinkTask 
testTask = new BigQuerySinkTask(bigQuery, schemaRetriever, storage, schemaManager); testTask.initialize(sinkTaskContext); testTask.start(properties); @@ -129,7 +133,10 @@ public void testEmptyPut() { BigQuery bigQuery = mock(BigQuery.class); Storage storage = mock(Storage.class); - BigQuerySinkTask testTask = new BigQuerySinkTask(bigQuery, null, storage, null); + SchemaRetriever schemaRetriever = mock(SchemaRetriever.class); + SchemaManager schemaManager = mock(SchemaManager.class); + + BigQuerySinkTask testTask = new BigQuerySinkTask(bigQuery, schemaRetriever, storage, schemaManager); testTask.start(properties); testTask.put(Collections.emptyList()); @@ -148,7 +155,10 @@ public void testEmptyRecordPut() { BigQuery bigQuery = mock(BigQuery.class); Storage storage = mock(Storage.class); - BigQuerySinkTask testTask = new BigQuerySinkTask(bigQuery, null, storage, null); + SchemaRetriever schemaRetriever = mock(SchemaRetriever.class); + SchemaManager schemaManager = mock(SchemaManager.class); + + BigQuerySinkTask testTask = new BigQuerySinkTask(bigQuery, schemaRetriever, storage, schemaManager); testTask.start(properties); SinkRecord emptyRecord = spoofSinkRecord(topic, simpleSchema, null); @@ -175,7 +185,10 @@ public void testPutWhenPartitioningOnMessageTime() { when(bigQuery.insertAll(anyObject())).thenReturn(insertAllResponse); when(insertAllResponse.hasErrors()).thenReturn(false); - BigQuerySinkTask testTask = new BigQuerySinkTask(bigQuery, null, storage, null); + SchemaRetriever schemaRetriever = mock(SchemaRetriever.class); + SchemaManager schemaManager = mock(SchemaManager.class); + + BigQuerySinkTask testTask = new BigQuerySinkTask(bigQuery, schemaRetriever, storage, schemaManager); testTask.initialize(sinkTaskContext); testTask.start(properties); @@ -205,8 +218,11 @@ public void testPutWhenPartitioningIsSetToTrue() { when(bigQuery.insertAll(anyObject())).thenReturn(insertAllResponse); when(insertAllResponse.hasErrors()).thenReturn(false); - - BigQuerySinkTask testTask = new BigQuerySinkTask(bigQuery, null, storage, null); + + SchemaRetriever schemaRetriever = mock(SchemaRetriever.class); + SchemaManager schemaManager = mock(SchemaManager.class); + + BigQuerySinkTask testTask = new BigQuerySinkTask(bigQuery, schemaRetriever, storage, schemaManager); testTask.initialize(sinkTaskContext); testTask.start(properties); @@ -235,8 +251,11 @@ public void testPutWhenPartitioningIsSetToFalse() { when(bigQuery.insertAll(anyObject())).thenReturn(insertAllResponse); when(insertAllResponse.hasErrors()).thenReturn(false); - - BigQuerySinkTask testTask = new BigQuerySinkTask(bigQuery, null, storage, null); + + SchemaRetriever schemaRetriever = mock(SchemaRetriever.class); + SchemaManager schemaManager = mock(SchemaManager.class); + + BigQuerySinkTask testTask = new BigQuerySinkTask(bigQuery, schemaRetriever, storage, schemaManager); testTask.initialize(sinkTaskContext); testTask.start(properties); @@ -267,7 +286,10 @@ public void testPutWhenPartitioningOnMessageTimeWhenNoTimestampType() { when(bigQuery.insertAll(anyObject())).thenReturn(insertAllResponse); when(insertAllResponse.hasErrors()).thenReturn(false); - BigQuerySinkTask testTask = new BigQuerySinkTask(bigQuery, null, storage, null); + SchemaRetriever schemaRetriever = mock(SchemaRetriever.class); + SchemaManager schemaManager = mock(SchemaManager.class); + + BigQuerySinkTask testTask = new BigQuerySinkTask(bigQuery, schemaRetriever, storage, schemaManager); testTask.initialize(sinkTaskContext); testTask.start(properties); @@ -292,8 +314,11 @@ 
public void testBufferClearOnFlushError() { when(bigQuery.insertAll(any(InsertAllRequest.class))) .thenThrow(new RuntimeException("This is a test")); + SchemaRetriever schemaRetriever = mock(SchemaRetriever.class); + SchemaManager schemaManager = mock(SchemaManager.class); + SinkTaskContext sinkTaskContext = mock(SinkTaskContext.class); - BigQuerySinkTask testTask = new BigQuerySinkTask(bigQuery, null, storage, null); + BigQuerySinkTask testTask = new BigQuerySinkTask(bigQuery, schemaRetriever, storage, schemaManager); testTask.initialize(sinkTaskContext); testTask.start(properties); @@ -313,8 +338,11 @@ public void testEmptyFlush() { BigQuery bigQuery = mock(BigQuery.class); Storage storage = mock(Storage.class); + SchemaRetriever schemaRetriever = mock(SchemaRetriever.class); + SchemaManager schemaManager = mock(SchemaManager.class); + SinkTaskContext sinkTaskContext = mock(SinkTaskContext.class); - BigQuerySinkTask testTask = new BigQuerySinkTask(bigQuery, null, storage, null); + BigQuerySinkTask testTask = new BigQuerySinkTask(bigQuery, schemaRetriever, storage, schemaManager); testTask.initialize(sinkTaskContext); testTask.start(properties); @@ -345,7 +373,10 @@ public void testBigQuery5XXRetry() { SinkTaskContext sinkTaskContext = mock(SinkTaskContext.class); - BigQuerySinkTask testTask = new BigQuerySinkTask(bigQuery, null, storage, null); + SchemaRetriever schemaRetriever = mock(SchemaRetriever.class); + SchemaManager schemaManager = mock(SchemaManager.class); + + BigQuerySinkTask testTask = new BigQuerySinkTask(bigQuery, schemaRetriever, storage, schemaManager); testTask.initialize(sinkTaskContext); testTask.start(properties); testTask.put(Collections.singletonList(spoofSinkRecord(topic))); @@ -379,7 +410,10 @@ public void testBigQuery403Retry() { SinkTaskContext sinkTaskContext = mock(SinkTaskContext.class); - BigQuerySinkTask testTask = new BigQuerySinkTask(bigQuery, null, storage, null); + SchemaRetriever schemaRetriever = mock(SchemaRetriever.class); + SchemaManager schemaManager = mock(SchemaManager.class); + + BigQuerySinkTask testTask = new BigQuerySinkTask(bigQuery, schemaRetriever, storage, schemaManager); testTask.initialize(sinkTaskContext); testTask.start(properties); testTask.put(Collections.singletonList(spoofSinkRecord(topic))); @@ -410,7 +444,10 @@ public void testBigQueryRetryExceeded() { SinkTaskContext sinkTaskContext = mock(SinkTaskContext.class); - BigQuerySinkTask testTask = new BigQuerySinkTask(bigQuery, null, storage, null); + SchemaRetriever schemaRetriever = mock(SchemaRetriever.class); + SchemaManager schemaManager = mock(SchemaManager.class); + + BigQuerySinkTask testTask = new BigQuerySinkTask(bigQuery, schemaRetriever, storage, schemaManager); testTask.initialize(sinkTaskContext); testTask.start(properties); testTask.put(Collections.singletonList(spoofSinkRecord(topic))); @@ -434,8 +471,11 @@ public void testInterruptedException() { when(fakeResponse.getInsertErrors()).thenReturn(Collections.emptyMap()); when(bigQuery.insertAll(any(InsertAllRequest.class))).thenReturn(fakeResponse); + SchemaRetriever schemaRetriever = mock(SchemaRetriever.class); + SchemaManager schemaManager = mock(SchemaManager.class); + SinkTaskContext sinkTaskContext = mock(SinkTaskContext.class); - BigQuerySinkTask testTask = new BigQuerySinkTask(bigQuery, null, storage, null); + BigQuerySinkTask testTask = new BigQuerySinkTask(bigQuery, schemaRetriever, storage, schemaManager); testTask.initialize(sinkTaskContext); testTask.start(properties); @@ -453,8 +493,11 @@ public void 
testConfigException() { Map badProperties = propertiesFactory.getProperties(); badProperties.remove(BigQuerySinkConfig.TOPICS_CONFIG); + SchemaRetriever schemaRetriever = mock(SchemaRetriever.class); + SchemaManager schemaManager = mock(SchemaManager.class); + BigQuerySinkTask testTask = - new BigQuerySinkTask(mock(BigQuery.class), null, mock(Storage.class), null); + new BigQuerySinkTask(mock(BigQuery.class), schemaRetriever, mock(Storage.class), schemaManager); testTask.start(badProperties); } @@ -481,8 +524,11 @@ public void testStop() { when(bigQuery.insertAll(anyObject())).thenReturn(insertAllResponse); when(insertAllResponse.hasErrors()).thenReturn(false); + SchemaRetriever schemaRetriever = mock(SchemaRetriever.class); + SchemaManager schemaManager = mock(SchemaManager.class); + Storage storage = mock(Storage.class); - BigQuerySinkTask testTask = new BigQuerySinkTask(bigQuery, null, storage, null); + BigQuerySinkTask testTask = new BigQuerySinkTask(bigQuery, schemaRetriever, storage, schemaManager); testTask.initialize(sinkTaskContext); testTask.start(properties); testTask.put(Collections.singletonList(spoofSinkRecord(topic))); diff --git a/kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/write/row/BigQueryWriterTest.java b/kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/write/row/BigQueryWriterTest.java index 3e45e6d16..0a1ab98cf 100644 --- a/kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/write/row/BigQueryWriterTest.java +++ b/kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/write/row/BigQueryWriterTest.java @@ -35,6 +35,7 @@ import com.wepay.kafka.connect.bigquery.BigQuerySinkTask; import com.wepay.kafka.connect.bigquery.SchemaManager; import com.wepay.kafka.connect.bigquery.SinkTaskPropertiesFactory; +import com.wepay.kafka.connect.bigquery.api.SchemaRetriever; import com.wepay.kafka.connect.bigquery.config.BigQuerySinkConfig; import com.wepay.kafka.connect.bigquery.config.BigQuerySinkTaskConfig; import com.wepay.kafka.connect.bigquery.exception.BigQueryConnectException; @@ -86,8 +87,11 @@ public void testBigQueryNoFailure() { SinkTaskContext sinkTaskContext = mock(SinkTaskContext.class); + SchemaRetriever schemaRetriever = mock(SchemaRetriever.class); + SchemaManager schemaManager = mock(SchemaManager.class); + Storage storage = mock(Storage.class); - BigQuerySinkTask testTask = new BigQuerySinkTask(bigQuery, null, storage, null); + BigQuerySinkTask testTask = new BigQuerySinkTask(bigQuery, schemaRetriever, storage, schemaManager); testTask.initialize(sinkTaskContext); testTask.start(properties); testTask.put( @@ -120,8 +124,9 @@ public void testAutoCreateTables() { SinkTaskContext sinkTaskContext = mock(SinkTaskContext.class); Storage storage = mock(Storage.class); + SchemaRetriever schemaRetriever = mock(SchemaRetriever.class); SchemaManager schemaManager = mock(SchemaManager.class); - BigQuerySinkTask testTask = new BigQuerySinkTask(bigQuery, null, storage, schemaManager); + BigQuerySinkTask testTask = new BigQuerySinkTask(bigQuery, schemaRetriever, storage, schemaManager); testTask.initialize(sinkTaskContext); testTask.start(properties); testTask.put( @@ -154,8 +159,9 @@ public void testNonAutoCreateTables() { SinkTaskContext sinkTaskContext = mock(SinkTaskContext.class); Storage storage = mock(Storage.class); + SchemaRetriever schemaRetriever = mock(SchemaRetriever.class); SchemaManager schemaManager = mock(SchemaManager.class); - BigQuerySinkTask testTask = new BigQuerySinkTask(bigQuery, null, storage, schemaManager); 
+ BigQuerySinkTask testTask = new BigQuerySinkTask(bigQuery, schemaRetriever, storage, schemaManager); testTask.initialize(sinkTaskContext); testTask.start(properties); testTask.put( @@ -203,8 +209,11 @@ public void testBigQueryPartialFailure() { SinkTaskContext sinkTaskContext = mock(SinkTaskContext.class); + SchemaRetriever schemaRetriever = mock(SchemaRetriever.class); + SchemaManager schemaManager = mock(SchemaManager.class); + Storage storage = mock(Storage.class); - BigQuerySinkTask testTask = new BigQuerySinkTask(bigQuery, null, storage, null); + BigQuerySinkTask testTask = new BigQuerySinkTask(bigQuery, schemaRetriever, storage, schemaManager); testTask.initialize(sinkTaskContext); testTask.start(properties); testTask.put(sinkRecordList); @@ -252,8 +261,11 @@ public void testBigQueryCompleteFailure() { SinkTaskContext sinkTaskContext = mock(SinkTaskContext.class); + SchemaRetriever schemaRetriever = mock(SchemaRetriever.class); + SchemaManager schemaManager = mock(SchemaManager.class); + Storage storage = mock(Storage.class); - BigQuerySinkTask testTask = new BigQuerySinkTask(bigQuery, null, storage, null); + BigQuerySinkTask testTask = new BigQuerySinkTask(bigQuery, schemaRetriever, storage, schemaManager); testTask.initialize(sinkTaskContext); testTask.start(properties); testTask.put(sinkRecordList); diff --git a/kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/write/row/GCSToBQWriterTest.java b/kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/write/row/GCSToBQWriterTest.java index a381f860f..34bff0f8a 100644 --- a/kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/write/row/GCSToBQWriterTest.java +++ b/kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/write/row/GCSToBQWriterTest.java @@ -25,7 +25,9 @@ import com.google.cloud.storage.Storage; import com.google.cloud.storage.StorageException; import com.wepay.kafka.connect.bigquery.BigQuerySinkTask; +import com.wepay.kafka.connect.bigquery.SchemaManager; import com.wepay.kafka.connect.bigquery.SinkTaskPropertiesFactory; +import com.wepay.kafka.connect.bigquery.api.SchemaRetriever; import com.wepay.kafka.connect.bigquery.config.BigQuerySinkConfig; import com.wepay.kafka.connect.bigquery.config.BigQuerySinkTaskConfig; import org.apache.kafka.connect.data.Schema; @@ -68,7 +70,10 @@ public void testGCSNoFailure(){ Storage storage = mock(Storage.class); SinkTaskContext sinkTaskContext = mock(SinkTaskContext.class); - BigQuerySinkTask testTask = new BigQuerySinkTask(bigQuery, null, storage, null); + SchemaRetriever schemaRetriever = mock(SchemaRetriever.class); + SchemaManager schemaManager = mock(SchemaManager.class); + + BigQuerySinkTask testTask = new BigQuerySinkTask(bigQuery, schemaRetriever, storage, schemaManager); testTask.initialize(sinkTaskContext); testTask.start(properties); testTask.put( @@ -90,11 +95,14 @@ public void testGCSSomeFailures(){ Storage storage = mock(Storage.class); SinkTaskContext sinkTaskContext = mock(SinkTaskContext.class); + SchemaRetriever schemaRetriever = mock(SchemaRetriever.class); + SchemaManager schemaManager = mock(SchemaManager.class); + when(storage.create((BlobInfo)anyObject(), (byte[])anyObject())) .thenThrow(new StorageException(500, "internal server error")) // throw first time .thenReturn(null); // return second time. (we don't care about the result.) 
- BigQuerySinkTask testTask = new BigQuerySinkTask(bigQuery, null, storage, null); + BigQuerySinkTask testTask = new BigQuerySinkTask(bigQuery, schemaRetriever, storage, schemaManager); testTask.initialize(sinkTaskContext); testTask.start(properties); testTask.put( @@ -116,10 +124,13 @@ public void testGCSAllFailures(){ Storage storage = mock(Storage.class); SinkTaskContext sinkTaskContext = mock(SinkTaskContext.class); + SchemaRetriever schemaRetriever = mock(SchemaRetriever.class); + SchemaManager schemaManager = mock(SchemaManager.class); + when(storage.create((BlobInfo)anyObject(), (byte[])anyObject())) .thenThrow(new StorageException(500, "internal server error")); - BigQuerySinkTask testTask = new BigQuerySinkTask(bigQuery, null, storage, null); + BigQuerySinkTask testTask = new BigQuerySinkTask(bigQuery, schemaRetriever, storage, schemaManager); testTask.initialize(sinkTaskContext); testTask.start(properties); testTask.put( From c612f6944bc042d6ecd3ad2682f20ccc2792d841 Mon Sep 17 00:00:00 2001 From: Bingqin Zhou Date: Tue, 25 Feb 2020 16:53:07 -0800 Subject: [PATCH 09/13] Update function name and comments. --- .../connect/bigquery/write/row/AdaptiveBigQueryWriter.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/write/row/AdaptiveBigQueryWriter.java b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/write/row/AdaptiveBigQueryWriter.java index 1deeaa168..bfb003669 100644 --- a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/write/row/AdaptiveBigQueryWriter.java +++ b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/write/row/AdaptiveBigQueryWriter.java @@ -78,7 +78,7 @@ private boolean isTableMissingSchema(BigQueryException exception) { return exception.getReason() != null && exception.getReason().equalsIgnoreCase("invalid"); } - private boolean isTableNotExisted(BigQueryException exception) { + private boolean isTableNotExistedException(BigQueryException exception) { // If a table does not exist, it will raise a BigQueryException that the input is notFound // Referring to Google Cloud Error Codes Doc: https://cloud.google.com/bigquery/docs/error-messages?hl=en return exception.getReason() != null && exception.getReason().equalsIgnoreCase("notFound"); @@ -109,7 +109,7 @@ && onlyContainsInvalidSchemaErrors(writeResponse.getInsertErrors()) && updateSch } } catch (BigQueryException exception) { // Should only perform one table creation attempt. - if (isTableNotExisted(exception) && autoCreateTables && bigQuery.getTable(tableId.getBaseTableId()) == null) { + if (isTableNotExistedException(exception) && autoCreateTables && bigQuery.getTable(tableId.getBaseTableId()) == null) { attemptTableCreate(tableId.getBaseTableId(), topic); } else if (isTableMissingSchema(exception) && updateSchemas) { attemptSchemaUpdate(tableId, topic); @@ -118,7 +118,7 @@ && onlyContainsInvalidSchemaErrors(writeResponse.getInsertErrors()) && updateSch } } - // Schema update might be delayed, so multiple insertion attempts may be necessary + // Schema update or table creation might be delayed, so multiple insertion attempts may be necessary int attemptCount = 0; while (writeResponse == null || writeResponse.hasErrors()) { logger.trace("insertion failed"); From 5a8ce72e847be1aa48f02c1dafffb86b0aa685c6 Mon Sep 17 00:00:00 2001 From: Bingqin Zhou Date: Tue, 25 Feb 2020 17:28:20 -0800 Subject: [PATCH 10/13] Change updateSchemas to be autoUpdateSchemas. 
--- .../wepay/kafka/connect/bigquery/BigQuerySinkTask.java | 6 +++--- .../bigquery/write/row/AdaptiveBigQueryWriter.java | 10 +++++----- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTask.java b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTask.java index 926f9f443..0c12e4e59 100644 --- a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTask.java +++ b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTask.java @@ -269,17 +269,17 @@ private SchemaManager getSchemaManager(BigQuery bigQuery) { } private BigQueryWriter getBigQueryWriter() { - boolean updateSchemas = config.getBoolean(config.SCHEMA_UPDATE_CONFIG); + boolean autoUpdateSchemas = config.getBoolean(config.SCHEMA_UPDATE_CONFIG); boolean autoCreateTables = config.getBoolean(config.TABLE_CREATE_CONFIG); int retry = config.getInt(config.BIGQUERY_RETRY_CONFIG); long retryWait = config.getLong(config.BIGQUERY_RETRY_WAIT_CONFIG); BigQuery bigQuery = getBigQuery(); - if (updateSchemas || autoCreateTables) { + if (autoUpdateSchemas || autoCreateTables) { return new AdaptiveBigQueryWriter(bigQuery, getSchemaManager(bigQuery), retry, retryWait, - updateSchemas, + autoUpdateSchemas, autoCreateTables); } else { return new SimpleBigQueryWriter(bigQuery, retry, retryWait); diff --git a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/write/row/AdaptiveBigQueryWriter.java b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/write/row/AdaptiveBigQueryWriter.java index bfb003669..0808ae231 100644 --- a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/write/row/AdaptiveBigQueryWriter.java +++ b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/write/row/AdaptiveBigQueryWriter.java @@ -50,7 +50,7 @@ public class AdaptiveBigQueryWriter extends BigQueryWriter { private final BigQuery bigQuery; private final SchemaManager schemaManager; - private final boolean updateSchemas; + private final boolean autoUpdateSchemas; private final boolean autoCreateTables; /** @@ -63,12 +63,12 @@ public AdaptiveBigQueryWriter(BigQuery bigQuery, SchemaManager schemaManager, int retry, long retryWait, - boolean updateSchemas, + boolean autoUpdateSchemas, boolean autoCreateTables) { super(retry, retryWait); this.bigQuery = bigQuery; this.schemaManager = schemaManager; - this.updateSchemas = updateSchemas; + this.autoUpdateSchemas = autoUpdateSchemas; this.autoCreateTables = autoCreateTables; } @@ -104,14 +104,14 @@ public Map> performWriteRequest( // Should only perform one schema update attempt; may have to continue insert attempts due to // BigQuery schema updates taking up to two minutes to take effect if (writeResponse.hasErrors() - && onlyContainsInvalidSchemaErrors(writeResponse.getInsertErrors()) && updateSchemas) { + && onlyContainsInvalidSchemaErrors(writeResponse.getInsertErrors()) && autoUpdateSchemas) { attemptSchemaUpdate(tableId, topic); } } catch (BigQueryException exception) { // Should only perform one table creation attempt. 
if (isTableNotExistedException(exception) && autoCreateTables && bigQuery.getTable(tableId.getBaseTableId()) == null) { attemptTableCreate(tableId.getBaseTableId(), topic); - } else if (isTableMissingSchema(exception) && updateSchemas) { + } else if (isTableMissingSchema(exception) && autoUpdateSchemas) { attemptSchemaUpdate(tableId, topic); } else { throw exception; From 88f8c633165812f8ae70e8efcf0a71ac1a810535 Mon Sep 17 00:00:00 2001 From: Bingqin Zhou Date: Tue, 25 Feb 2020 18:11:52 -0800 Subject: [PATCH 11/13] Use getCode instead of getReason --- .../connect/bigquery/write/row/AdaptiveBigQueryWriter.java | 2 +- .../kafka/connect/bigquery/write/row/BigQueryWriterTest.java | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/write/row/AdaptiveBigQueryWriter.java b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/write/row/AdaptiveBigQueryWriter.java index 0808ae231..206400574 100644 --- a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/write/row/AdaptiveBigQueryWriter.java +++ b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/write/row/AdaptiveBigQueryWriter.java @@ -81,7 +81,7 @@ private boolean isTableMissingSchema(BigQueryException exception) { private boolean isTableNotExistedException(BigQueryException exception) { // If a table does not exist, it will raise a BigQueryException that the input is notFound // Referring to Google Cloud Error Codes Doc: https://cloud.google.com/bigquery/docs/error-messages?hl=en - return exception.getReason() != null && exception.getReason().equalsIgnoreCase("notFound"); + return exception.getCode() == 404; } /** diff --git a/kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/write/row/BigQueryWriterTest.java b/kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/write/row/BigQueryWriterTest.java index 0a1ab98cf..7c0e871e5 100644 --- a/kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/write/row/BigQueryWriterTest.java +++ b/kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/write/row/BigQueryWriterTest.java @@ -117,7 +117,7 @@ public void testAutoCreateTables() { when(insertAllResponse.getInsertErrors()).thenReturn(emptyMap); BigQueryException missTableException = mock(BigQueryException.class); - when(missTableException.getReason()).thenReturn("notFound"); + when(missTableException.getCode()).thenReturn(404); when(bigQuery.insertAll(anyObject())).thenThrow(missTableException).thenReturn(insertAllResponse); @@ -152,7 +152,7 @@ public void testNonAutoCreateTables() { when(insertAllResponse.getInsertErrors()).thenReturn(emptyMap); BigQueryException missTableException = mock(BigQueryException.class); - when(missTableException.getReason()).thenReturn("notFound"); + when(missTableException.getCode()).thenReturn(404); when(bigQuery.insertAll(anyObject())).thenThrow(missTableException).thenReturn(insertAllResponse); From 97983333dc0164e4e6889147b485713b2598110c Mon Sep 17 00:00:00 2001 From: Bingqin Zhou Date: Wed, 26 Feb 2020 14:30:40 -0800 Subject: [PATCH 12/13] Clean up comments and add missed doc. 
--- .../write/batch/GCSBatchTableWriter.java | 3 ++- .../write/row/AdaptiveBigQueryWriter.java | 16 ++++++++-------- 2 files changed, 10 insertions(+), 9 deletions(-) diff --git a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/write/batch/GCSBatchTableWriter.java b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/write/batch/GCSBatchTableWriter.java index 6cf6536a5..48a9512a6 100644 --- a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/write/batch/GCSBatchTableWriter.java +++ b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/write/batch/GCSBatchTableWriter.java @@ -32,7 +32,6 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; -import java.util.concurrent.atomic.AtomicInteger; /** * Batch Table Writer that uploads records to GCS as a blob @@ -57,6 +56,7 @@ public class GCSBatchTableWriter implements Runnable { * @param bucketName the name of the GCS bucket where the blob should be uploaded * @param baseBlobName the base name of the blob in which the serialized rows should be uploaded. * The full name is [baseBlobName]_[writerId]_ + * @param topic Kafka record topic */ private GCSBatchTableWriter(List rows, GCSToBQWriter writer, @@ -105,6 +105,7 @@ public static class Builder implements TableWriterBuilder { * @param tableId The bigquery table to be written to. * @param gcsBucketName The GCS bucket to write to. * @param gcsBlobName The name of the GCS blob to write. + * @param topic Kafka record topic * @param recordConverter the {@link RecordConverter} to use. */ public Builder(GCSToBQWriter writer, diff --git a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/write/row/AdaptiveBigQueryWriter.java b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/write/row/AdaptiveBigQueryWriter.java index 206400574..8c8b6b696 100644 --- a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/write/row/AdaptiveBigQueryWriter.java +++ b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/write/row/AdaptiveBigQueryWriter.java @@ -43,9 +43,9 @@ public class AdaptiveBigQueryWriter extends BigQueryWriter { private static final Logger logger = LoggerFactory.getLogger(AdaptiveBigQueryWriter.class); - // The maximum number of retries we will attempt to write rows after updating a BQ table schema. - private static final int AFTER_UPDATE_RETRY_LIMIT = 5; - // Wait for about 30s between each retry since both creating table and updating schema take up to 2 minutes to take effect. + // The maximum number of retries we will attempt to write rows after creating a table or updating a BQ table schema. + private static final int RETRY_LIMIT = 5; + // Wait for about 30s between each retry since both creating table and updating schema take up to 2~3 minutes to take effect. private static final int RETRY_WAIT_TIME = 30000; private final BigQuery bigQuery; @@ -101,8 +101,7 @@ public Map> performWriteRequest( try { request = createInsertAllRequest(tableId, rows); writeResponse = bigQuery.insertAll(request); - // Should only perform one schema update attempt; may have to continue insert attempts due to - // BigQuery schema updates taking up to two minutes to take effect + // Should only perform one schema update attempt. 
      if (writeResponse.hasErrors()
          && onlyContainsInvalidSchemaErrors(writeResponse.getInsertErrors()) && autoUpdateSchemas) {
        attemptSchemaUpdate(tableId, topic);
@@ -118,7 +117,8 @@ && onlyContainsInvalidSchemaErrors(writeResponse.getInsertErrors()) && autoUpdat
       }
     }
 
-    // Schema update or table creation might be delayed, so multiple insertion attempts may be necessary
+    // Creating tables or updating table schemas in BigQuery takes up to 2~3 minutes to take effect,
+    // so multiple insertion attempts may be necessary.
     int attemptCount = 0;
     while (writeResponse == null || writeResponse.hasErrors()) {
       logger.trace("insertion failed");
@@ -135,10 +135,10 @@ && onlyContainsInvalidSchemaErrors(writeResponse.getInsertErrors()) && autoUpdat
         return writeResponse.getInsertErrors();
       }
       attemptCount++;
-      if (attemptCount >= AFTER_UPDATE_RETRY_LIMIT) {
+      if (attemptCount >= RETRY_LIMIT) {
         throw new BigQueryConnectException(
             "Failed to write rows after BQ schema update within "
-                + AFTER_UPDATE_RETRY_LIMIT + " attempts for: " + tableId.getBaseTableId());
+                + RETRY_LIMIT + " attempts for: " + tableId.getBaseTableId());
       }
       try {
         Thread.sleep(RETRY_WAIT_TIME);

From ca32148235dff53c05c6eb6f27c8d2037ee203fa Mon Sep 17 00:00:00 2001
From: Bingqin Zhou
Date: Wed, 26 Feb 2020 15:37:47 -0800
Subject: [PATCH 13/13] Add comments about sleeping time in integration test docker container.

---
 kcbq-connector/test/docker/connect/connect-docker.sh | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/kcbq-connector/test/docker/connect/connect-docker.sh b/kcbq-connector/test/docker/connect/connect-docker.sh
index 377e26736..65c2bd606 100755
--- a/kcbq-connector/test/docker/connect/connect-docker.sh
+++ b/kcbq-connector/test/docker/connect/connect-docker.sh
@@ -20,5 +20,7 @@ connect-standalone \
     /etc/kafka-connect-bigquery/standalone.properties \
     /etc/kafka-connect-bigquery/connector.properties &
 
+# Time (seconds) to wait for the process of inserting rows into BigQuery to finish.
+# This time can be adjusted if necessary.
 sleep 180
 kill $!
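For reference, the behaviour added by this series is toggled entirely by the connector's existing TABLE_CREATE_CONFIG and SCHEMA_UPDATE_CONFIG options; no new configuration keys are introduced. A minimal sketch mirroring the test setup used in the series above (makeProperties and the mocks are the test helpers shown in BigQueryWriterTest; this is an illustration, not new API):

    Map<String, String> properties = makeProperties("3", "2000", topic, dataset);
    properties.put(BigQuerySinkTaskConfig.TABLE_CREATE_CONFIG, "true");   // auto-create missing tables
    properties.put(BigQuerySinkTaskConfig.SCHEMA_UPDATE_CONFIG, "true");  // auto-update schemas
    BigQuerySinkTask testTask = new BigQuerySinkTask(bigQuery, schemaRetriever, storage, schemaManager);
    testTask.initialize(sinkTaskContext);
    testTask.start(properties);

With either flag set, BigQuerySinkTask builds an AdaptiveBigQueryWriter (and, for batch-loaded topics, a GCSToBQWriter with autoCreateTables); with both flags off it falls back to SimpleBigQueryWriter, as before.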