diff --git a/hbase-migration-tools/bigtable-hbase-replication/bigtable-hbase-replication-core/src/main/java/com/google/cloud/bigtable/hbase/replication/CloudBigtableReplicationTask.java b/hbase-migration-tools/bigtable-hbase-replication/bigtable-hbase-replication-core/src/main/java/com/google/cloud/bigtable/hbase/replication/CloudBigtableReplicationTask.java
index 060553a3a1..43c97f4ded 100644
--- a/hbase-migration-tools/bigtable-hbase-replication/bigtable-hbase-replication-core/src/main/java/com/google/cloud/bigtable/hbase/replication/CloudBigtableReplicationTask.java
+++ b/hbase-migration-tools/bigtable-hbase-replication/bigtable-hbase-replication-core/src/main/java/com/google/cloud/bigtable/hbase/replication/CloudBigtableReplicationTask.java
@@ -16,21 +16,37 @@
 package com.google.cloud.bigtable.hbase.replication;
 
+import static com.google.cloud.bigtable.hbase.replication.configuration.HBaseToCloudBigtableReplicationConfiguration.DEFAULT_FILTER_LARGE_ROWS;
+import static com.google.cloud.bigtable.hbase.replication.configuration.HBaseToCloudBigtableReplicationConfiguration.DEFAULT_FILTER_LARGE_ROWS_THRESHOLD_IN_BYTES;
+import static com.google.cloud.bigtable.hbase.replication.configuration.HBaseToCloudBigtableReplicationConfiguration.DEFAULT_FILTER_MAX_CELLS_PER_MUTATION;
+import static com.google.cloud.bigtable.hbase.replication.configuration.HBaseToCloudBigtableReplicationConfiguration.DEFAULT_FILTER_MAX_CELLS_PER_MUTATION_THRESHOLD;
+import static com.google.cloud.bigtable.hbase.replication.configuration.HBaseToCloudBigtableReplicationConfiguration.FILTER_LARGE_ROWS_KEY;
+import static com.google.cloud.bigtable.hbase.replication.configuration.HBaseToCloudBigtableReplicationConfiguration.FILTER_LARGE_ROWS_THRESHOLD_IN_BYTES_KEY;
+import static com.google.cloud.bigtable.hbase.replication.configuration.HBaseToCloudBigtableReplicationConfiguration.FILTER_MAX_CELLS_PER_MUTATION_KEY;
+import static com.google.cloud.bigtable.hbase.replication.configuration.HBaseToCloudBigtableReplicationConfiguration.FILTER_MAX_CELLS_PER_MUTATION_THRESHOLD_KEY;
+import static com.google.cloud.bigtable.hbase.replication.metrics.HBaseToCloudBigtableReplicationMetrics.DROPPED_INCOMPATIBLE_MUTATION_MAX_CELLS_METRIC_KEY;
+import static com.google.cloud.bigtable.hbase.replication.metrics.HBaseToCloudBigtableReplicationMetrics.DROPPED_INCOMPATIBLE_MUTATION_METRIC_KEY;
+import static com.google.cloud.bigtable.hbase.replication.metrics.HBaseToCloudBigtableReplicationMetrics.DROPPED_INCOMPATIBLE_MUTATION_ROW_SIZE_METRIC_KEY;
+import static com.google.cloud.bigtable.hbase.replication.metrics.HBaseToCloudBigtableReplicationMetrics.INCOMPATIBLE_MUTATION_METRIC_KEY;
+
 import com.google.bigtable.repackaged.com.google.api.client.util.Preconditions;
 import com.google.bigtable.repackaged.com.google.api.core.InternalApi;
 import com.google.bigtable.repackaged.com.google.common.annotations.VisibleForTesting;
 import com.google.bigtable.repackaged.com.google.common.base.Objects;
+import com.google.cloud.bigtable.hbase.replication.metrics.MetricsExporter;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.Callable;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellUtil;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.TableName;
 import org.apache.hadoop.hbase.client.Connection;
 import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Mutation;
 import org.apache.hadoop.hbase.client.Put;
 import org.apache.hadoop.hbase.client.RowMutations;
 import org.apache.hadoop.hbase.client.Table;
@@ -140,6 +156,7 @@ static MutationBuilder getMutationBuilder(Cell cell) {
   private final Connection connection;
   private final String tableName;
   private final Map<ByteRange, List<Cell>> cellsToReplicateByRow;
+  private MetricsExporter metricsExporter;
 
   public CloudBigtableReplicationTask(
       String tableName, Connection connection, Map<ByteRange, List<Cell>> entriesToReplicate)
@@ -149,6 +166,16 @@ public CloudBigtableReplicationTask(
     this.tableName = tableName;
   }
 
+  public CloudBigtableReplicationTask(
+      String tableName,
+      Connection connection,
+      Map<ByteRange, List<Cell>> entriesToReplicate,
+      MetricsExporter metricsExporter)
+      throws IOException {
+    this(tableName, connection, entriesToReplicate);
+    this.metricsExporter = metricsExporter;
+  }
+
   /**
    * Replicates the list of WAL entries into CBT.
    *
@@ -160,6 +187,7 @@ public Boolean call() {
 
     try {
       Table table = connection.getTable(TableName.valueOf(tableName));
+      Configuration conf = this.connection.getConfiguration();
 
       // Collect all the cells to replicate in this call.
       // All mutations in a WALEdit are atomic, this atomicity must be preserved. The order of WAL
@@ -183,18 +211,28 @@
         // Create a rowMutations and add it to the list to be flushed to CBT.
         RowMutations rowMutations =
             buildRowMutations(cellsByRow.getKey().deepCopyToNewArray(), cellsByRow.getValue());
-        rowMutationsList.add(rowMutations);
-      }
 
-      Object[] results = new Object[rowMutationsList.size()];
-      table.batch(rowMutationsList, results);
+        // Verify that the row mutations are within the size and cell-count thresholds.
+        boolean logAndSkipIncompatibleRowMutations =
+            verifyRowMutationThresholds(rowMutations, conf, this.metricsExporter);
+
+        if (!logAndSkipIncompatibleRowMutations) {
+          rowMutationsList.add(rowMutations);
+        }
+      }
 
-      // Make sure that there were no errors returned via results.
-      for (Object result : results) {
-        if (result != null && result instanceof Throwable) {
-          LOG.error("Encountered error while replicating wal entry.", (Throwable) result);
-          succeeded = false;
-          break;
+      // Commit the batch.
+      if (!rowMutationsList.isEmpty()) {
+        Object[] results = new Object[rowMutationsList.size()];
+        table.batch(rowMutationsList, results);
+
+        // Make sure that there were no errors returned via results.
+        for (Object result : results) {
+          if (result != null && result instanceof Throwable) {
+            LOG.error("Encountered error while replicating wal entry.", (Throwable) result);
+            succeeded = false;
+            break;
+          }
         }
       }
     } catch (Throwable t) {
@@ -222,6 +260,106 @@ static RowMutations buildRowMutations(byte[] rowKey, List<Cell> cellList) throws
     return rowMutationBuffer;
   }
 
+  // Verify the shape of the row mutations against the configured thresholds. A RowMutations may
+  // contain many mutations, and each mutation may contain many cells. The conditions that may be
+  // configured to be evaluated include: 1) max cells in a single mutation, 2) total mutations in
+  // the RowMutations, and 3) max byte size of all mutations in the RowMutations.
+  @VisibleForTesting
+  static boolean verifyRowMutationThresholds(
+      RowMutations rowMutations, Configuration conf, MetricsExporter metricsExporter) {
+    boolean logAndSkipIncompatibleRowMutations = false;
+
+    // Check whether threshold filtering is enabled for large rows or max cells.
+    if (conf.getBoolean(FILTER_LARGE_ROWS_KEY, DEFAULT_FILTER_LARGE_ROWS)
+        || conf.getBoolean(
+            FILTER_MAX_CELLS_PER_MUTATION_KEY, DEFAULT_FILTER_MAX_CELLS_PER_MUTATION)) {
+
+      // Iterate over the row mutations.
+      long totalByteSize = 0L;
+      int maxCellCountOfMutations = 0;
+      for (Mutation m : rowMutations.getMutations()) {
+        totalByteSize += m.heapSize();
+        if (maxCellCountOfMutations < m.size()) maxCellCountOfMutations = m.size();
+      }
+
+      // Check for large rows.
+      int maxSize =
+          conf.getInt(
+              FILTER_LARGE_ROWS_THRESHOLD_IN_BYTES_KEY,
+              DEFAULT_FILTER_LARGE_ROWS_THRESHOLD_IN_BYTES);
+      if (conf.getBoolean(FILTER_LARGE_ROWS_KEY, DEFAULT_FILTER_LARGE_ROWS)
+          && totalByteSize > maxSize) {
+
+        // The row exceeds the limit; log and skip it.
+        logAndSkipIncompatibleRowMutations = true;
+        incrementDroppedIncompatibleMutationsRowSizeExceeded(metricsExporter);
+        LOG.warn(
+            "Dropping mutation, row mutations length, "
+                + totalByteSize
+                + ", exceeds filter length threshold ("
+                + FILTER_LARGE_ROWS_THRESHOLD_IN_BYTES_KEY
+                + "), "
+                + maxSize
+                + ", mutation row key: "
+                + Bytes.toStringBinary(rowMutations.getRow()));
+      }
+
+      // Check for max cells or max mutations.
+      int maxCellsOrMutations =
+          conf.getInt(
+              FILTER_MAX_CELLS_PER_MUTATION_THRESHOLD_KEY,
+              DEFAULT_FILTER_MAX_CELLS_PER_MUTATION_THRESHOLD);
+      if (conf.getBoolean(FILTER_MAX_CELLS_PER_MUTATION_KEY, DEFAULT_FILTER_MAX_CELLS_PER_MUTATION)
+          && (rowMutations.getMutations().size() > maxCellsOrMutations
+              || maxCellCountOfMutations > maxCellsOrMutations)) {
+
+        // The row exceeds the limit; log and skip it.
+        logAndSkipIncompatibleRowMutations = true;
+        incrementDroppedIncompatibleMutationsMaxCellsExceeded(metricsExporter);
+        LOG.warn(
+            "Dropping mutation, row mutation size with total mutations, "
+                + rowMutations.getMutations().size()
+                + ", or max cells per mutation, "
+                + maxCellCountOfMutations
+                + ", exceeds filter size ("
+                + FILTER_MAX_CELLS_PER_MUTATION_KEY
+                + "), "
+                + maxCellsOrMutations
+                + ", mutation row key: "
+                + Bytes.toStringBinary(rowMutations.getRow()));
+      }
+    }
+    return logAndSkipIncompatibleRowMutations;
+  }
+
+  private static void incrementMetric(
+      MetricsExporter metricsExporter, String metricName, int delta) {
+    if (metricsExporter == null) return;
+    metricsExporter.incCounters(metricName, delta);
+  }
+
+  private static void incrementDroppedIncompatibleMutationsRowSizeExceeded(
+      MetricsExporter metricsExporter) {
+    incrementMetric(metricsExporter, DROPPED_INCOMPATIBLE_MUTATION_ROW_SIZE_METRIC_KEY, 1);
+    incrementIncompatibleMutations(metricsExporter);
+    incrementDroppedIncompatibleMutations(metricsExporter);
+  }
+
+  private static void incrementDroppedIncompatibleMutationsMaxCellsExceeded(
+      MetricsExporter metricsExporter) {
+    incrementMetric(metricsExporter, DROPPED_INCOMPATIBLE_MUTATION_MAX_CELLS_METRIC_KEY, 1);
+    incrementIncompatibleMutations(metricsExporter);
+    incrementDroppedIncompatibleMutations(metricsExporter);
+  }
+
+  private static void incrementIncompatibleMutations(MetricsExporter metricsExporter) {
+    incrementMetric(metricsExporter, INCOMPATIBLE_MUTATION_METRIC_KEY, 1);
+  }
+
+  private static void incrementDroppedIncompatibleMutations(MetricsExporter metricsExporter) {
+    incrementMetric(metricsExporter, DROPPED_INCOMPATIBLE_MUTATION_METRIC_KEY, 1);
+  }
+
   @Override
   public boolean equals(Object o) {
     if (this == o) {
diff --git a/hbase-migration-tools/bigtable-hbase-replication/bigtable-hbase-replication-core/src/main/java/com/google/cloud/bigtable/hbase/replication/CloudBigtableReplicator.java b/hbase-migration-tools/bigtable-hbase-replication/bigtable-hbase-replication-core/src/main/java/com/google/cloud/bigtable/hbase/replication/CloudBigtableReplicator.java
index 89f59e5dd8..7fbe4f7cfd 100644
--- a/hbase-migration-tools/bigtable-hbase-replication/bigtable-hbase-replication-core/src/main/java/com/google/cloud/bigtable/hbase/replication/CloudBigtableReplicator.java
+++ b/hbase-migration-tools/bigtable-hbase-replication/bigtable-hbase-replication-core/src/main/java/com/google/cloud/bigtable/hbase/replication/CloudBigtableReplicator.java
@@ -414,7 +414,8 @@ private Future<Boolean> replicateBatch(
       String tableName, Map<ByteRange, List<Cell>> batchToReplicate) {
     try {
       CloudBigtableReplicationTask replicationTask =
-          new CloudBigtableReplicationTask(tableName, sharedResources.connection, batchToReplicate);
+          new CloudBigtableReplicationTask(
+              tableName, sharedResources.connection, batchToReplicate, metricsExporter);
       return sharedResources.executorService.submit(replicationTask);
     } catch (Exception ex) {
       if (ex instanceof InterruptedException) {
diff --git a/hbase-migration-tools/bigtable-hbase-replication/bigtable-hbase-replication-core/src/main/java/com/google/cloud/bigtable/hbase/replication/adapters/IncompatibleMutationAdapter.java b/hbase-migration-tools/bigtable-hbase-replication/bigtable-hbase-replication-core/src/main/java/com/google/cloud/bigtable/hbase/replication/adapters/IncompatibleMutationAdapter.java
index 1f0559ba42..daec3f50ed 100644
--- a/hbase-migration-tools/bigtable-hbase-replication/bigtable-hbase-replication-core/src/main/java/com/google/cloud/bigtable/hbase/replication/adapters/IncompatibleMutationAdapter.java
+++ b/hbase-migration-tools/bigtable-hbase-replication/bigtable-hbase-replication-core/src/main/java/com/google/cloud/bigtable/hbase/replication/adapters/IncompatibleMutationAdapter.java
@@ -16,7 +16,14 @@
 package com.google.cloud.bigtable.hbase.replication.adapters;
 
+import static com.google.cloud.bigtable.hbase.replication.configuration.HBaseToCloudBigtableReplicationConfiguration.DEFAULT_FILTER_LARGE_CELLS;
+import static com.google.cloud.bigtable.hbase.replication.configuration.HBaseToCloudBigtableReplicationConfiguration.DEFAULT_FILTER_LARGE_CELLS_THRESHOLD_IN_BYTES;
+import static com.google.cloud.bigtable.hbase.replication.configuration.HBaseToCloudBigtableReplicationConfiguration.FILTER_LARGE_CELLS_KEY;
+import static com.google.cloud.bigtable.hbase.replication.configuration.HBaseToCloudBigtableReplicationConfiguration.FILTER_LARGE_CELLS_THRESHOLD_IN_BYTES_KEY;
+import static com.google.cloud.bigtable.hbase.replication.metrics.HBaseToCloudBigtableReplicationMetrics.DROPPED_INCOMPATIBLE_MUTATION_CELL_SIZE_METRIC_KEY;
+import static com.google.cloud.bigtable.hbase.replication.metrics.HBaseToCloudBigtableReplicationMetrics.DROPPED_INCOMPATIBLE_MUTATION_MAX_CELLS_METRIC_KEY;
 import static com.google.cloud.bigtable.hbase.replication.metrics.HBaseToCloudBigtableReplicationMetrics.DROPPED_INCOMPATIBLE_MUTATION_METRIC_KEY;
+import static com.google.cloud.bigtable.hbase.replication.metrics.HBaseToCloudBigtableReplicationMetrics.DROPPED_INCOMPATIBLE_MUTATION_ROW_SIZE_METRIC_KEY;
 import static com.google.cloud.bigtable.hbase.replication.metrics.HBaseToCloudBigtableReplicationMetrics.INCOMPATIBLE_MUTATION_DELETES_METRICS_KEY;
 import static com.google.cloud.bigtable.hbase.replication.metrics.HBaseToCloudBigtableReplicationMetrics.INCOMPATIBLE_MUTATION_METRIC_KEY;
 import static com.google.cloud.bigtable.hbase.replication.metrics.HBaseToCloudBigtableReplicationMetrics.INCOMPATIBLE_MUTATION_TIMESTAMP_OVERFLOW_METRIC_KEY;
@@ -76,6 +83,12 @@ private void incrementPutsInFutureMutations() {
     metricsExporter.incCounters(PUTS_IN_FUTURE_METRIC_KEY, 1);
   }
 
+  private void incrementDroppedIncompatibleMutationsCellSizeExceeded() {
+    incrementIncompatibleMutations();
+    incrementDroppedIncompatibleMutations();
+    metricsExporter.incCounters(DROPPED_INCOMPATIBLE_MUTATION_CELL_SIZE_METRIC_KEY, 1);
+  }
+
   /**
    * Creates an IncompatibleMutationAdapter with HBase configuration, MetricSource, and CBT
    * connection.
@@ -100,6 +113,9 @@ public IncompatibleMutationAdapter(
     metricsExporter.incCounters(INCOMPATIBLE_MUTATION_DELETES_METRICS_KEY, 0);
     metricsExporter.incCounters(INCOMPATIBLE_MUTATION_TIMESTAMP_OVERFLOW_METRIC_KEY, 0);
     metricsExporter.incCounters(PUTS_IN_FUTURE_METRIC_KEY, 0);
+    metricsExporter.incCounters(DROPPED_INCOMPATIBLE_MUTATION_CELL_SIZE_METRIC_KEY, 0);
+    metricsExporter.incCounters(DROPPED_INCOMPATIBLE_MUTATION_ROW_SIZE_METRIC_KEY, 0);
+    metricsExporter.incCounters(DROPPED_INCOMPATIBLE_MUTATION_MAX_CELLS_METRIC_KEY, 0);
   }
 
   private boolean isValidDelete(Cell delete) {
@@ -142,6 +158,29 @@ public final List<Cell> adaptIncompatibleMutations(BigtableWALEntry walEntry) {
       // All puts are valid.
       if (cell.getTypeByte() == KeyValue.Type.Put.getCode()) {
 
+        // Check the max cell size.
+        if (conf.getBoolean(FILTER_LARGE_CELLS_KEY, DEFAULT_FILTER_LARGE_CELLS)
+            && cell.getValueLength()
+                > conf.getInt(
+                    FILTER_LARGE_CELLS_THRESHOLD_IN_BYTES_KEY,
+                    DEFAULT_FILTER_LARGE_CELLS_THRESHOLD_IN_BYTES)) {
+          // Drop the cell, as it exceeds the size to be filtered.
+          incrementDroppedIncompatibleMutationsCellSizeExceeded();
+
+          LOG.warn(
+              "Dropping mutation, cell value length, "
+                  + cell.getValueLength()
+                  + ", exceeds filter length ("
+                  + FILTER_LARGE_CELLS_THRESHOLD_IN_BYTES_KEY
+                  + "), "
+                  + conf.getInt(
+                      FILTER_LARGE_CELLS_THRESHOLD_IN_BYTES_KEY,
+                      DEFAULT_FILTER_LARGE_CELLS_THRESHOLD_IN_BYTES)
+                  + ", cell: "
+                  + cell);
+          continue;
+        }
+
         // Flag if a put is issued for a future timestamp.
         // Do not log, as we might fill up disk space due to the condition being true from clock skew.
         if (cell.getTimestamp() > walEntry.getWalWriteTime()) {
diff --git a/hbase-migration-tools/bigtable-hbase-replication/bigtable-hbase-replication-core/src/main/java/com/google/cloud/bigtable/hbase/replication/configuration/HBaseToCloudBigtableReplicationConfiguration.java b/hbase-migration-tools/bigtable-hbase-replication/bigtable-hbase-replication-core/src/main/java/com/google/cloud/bigtable/hbase/replication/configuration/HBaseToCloudBigtableReplicationConfiguration.java
index d44a4cef31..f4b9a12d59 100644
--- a/hbase-migration-tools/bigtable-hbase-replication/bigtable-hbase-replication-core/src/main/java/com/google/cloud/bigtable/hbase/replication/configuration/HBaseToCloudBigtableReplicationConfiguration.java
+++ b/hbase-migration-tools/bigtable-hbase-replication/bigtable-hbase-replication-core/src/main/java/com/google/cloud/bigtable/hbase/replication/configuration/HBaseToCloudBigtableReplicationConfiguration.java
@@ -54,6 +54,48 @@ private HBaseToCloudBigtableReplicationConfiguration() {}
   // TODO maybe it should depend on the number of processors on the VM.
   public static final int DEFAULT_THREAD_COUNT = 10;
 
+  /**
+   * Determines if row mutations that exceed the value of FILTER_LARGE_ROWS_THRESHOLD_IN_BYTES_KEY
+   * should be logged and dropped.
+   */
+  public static final String FILTER_LARGE_ROWS_KEY =
+      "google.bigtable.replication.filter_large_rows";
+
+  public static final Boolean DEFAULT_FILTER_LARGE_ROWS = false;
+
+  /**
+   * Determines the size in bytes above which row mutations should be logged and dropped when
+   * replicating to Bigtable. Default: the approximate row size accepted for a batch mutation
+   * request.
+   */
+  public static final String FILTER_LARGE_ROWS_THRESHOLD_IN_BYTES_KEY =
+      "google.bigtable.replication.large_rows_threshold_bytes";
+
+  public static final Integer DEFAULT_FILTER_LARGE_ROWS_THRESHOLD_IN_BYTES = 190 * 1024 * 1024;
+
+  /**
+   * Determines if row mutations whose total mutation count or per-mutation cell count exceeds the
+   * value of FILTER_MAX_CELLS_PER_MUTATION_THRESHOLD_KEY should be logged and dropped when
+   * replicating to Bigtable. Default threshold: the approximate max cells accepted for a batch
+   * mutation request.
+   */
+  public static final String FILTER_MAX_CELLS_PER_MUTATION_KEY =
+      "google.bigtable.replication.filter_max_cells_per_mutation";
+
+  public static final Boolean DEFAULT_FILTER_MAX_CELLS_PER_MUTATION = false;
+  public static final String FILTER_MAX_CELLS_PER_MUTATION_THRESHOLD_KEY =
+      "google.bigtable.replication.max_cells_per_mutation";
+  public static final Integer DEFAULT_FILTER_MAX_CELLS_PER_MUTATION_THRESHOLD = 100_000 - 1;
+
+  /**
+   * Determines if cells that exceed the value of FILTER_LARGE_CELLS_THRESHOLD_IN_BYTES_KEY should
+   * be logged and dropped.
+   */
+  public static final String FILTER_LARGE_CELLS_KEY =
+      "google.bigtable.replication.filter_large_cells";
+
+  public static final Boolean DEFAULT_FILTER_LARGE_CELLS = false;
+  public static final String FILTER_LARGE_CELLS_THRESHOLD_IN_BYTES_KEY =
+      "google.bigtable.replication.large_cells_threshold_bytes";
+  public static final Integer DEFAULT_FILTER_LARGE_CELLS_THRESHOLD_IN_BYTES = 100 * 1024 * 1024;
+
   /**
    * Determines the size of request to CBT. This parameter controls the number of concurrent RPCs to
    * Cloud Bigtable.
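[Reviewer's note, not part of the patch: all three filters above default to off and are driven entirely by Configuration keys. A minimal sketch of turning them on programmatically using only the Hadoop Configuration API; the class name is hypothetical and the threshold values are illustrative, not tuning guidance.]

import org.apache.hadoop.conf.Configuration;

public class ReplicationFilterConfigExample {
  public static void main(String[] args) {
    // Keys mirror the constants defined in HBaseToCloudBigtableReplicationConfiguration.
    Configuration conf = new Configuration(false);
    conf.setBoolean("google.bigtable.replication.filter_large_rows", true);
    conf.setInt("google.bigtable.replication.large_rows_threshold_bytes", 190 * 1024 * 1024);
    conf.setBoolean("google.bigtable.replication.filter_max_cells_per_mutation", true);
    conf.setInt("google.bigtable.replication.max_cells_per_mutation", 100_000 - 1);
    conf.setBoolean("google.bigtable.replication.filter_large_cells", true);
    conf.setInt("google.bigtable.replication.large_cells_threshold_bytes", 100 * 1024 * 1024);

    // Sanity check: read one flag back with its default.
    System.out.println(
        "filter_large_rows = "
            + conf.getBoolean("google.bigtable.replication.filter_large_rows", false));
  }
}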
diff --git a/hbase-migration-tools/bigtable-hbase-replication/bigtable-hbase-replication-core/src/main/java/com/google/cloud/bigtable/hbase/replication/metrics/HBaseToCloudBigtableReplicationMetrics.java b/hbase-migration-tools/bigtable-hbase-replication/bigtable-hbase-replication-core/src/main/java/com/google/cloud/bigtable/hbase/replication/metrics/HBaseToCloudBigtableReplicationMetrics.java
index 1b04d8a211..cca0409e54 100644
--- a/hbase-migration-tools/bigtable-hbase-replication/bigtable-hbase-replication-core/src/main/java/com/google/cloud/bigtable/hbase/replication/metrics/HBaseToCloudBigtableReplicationMetrics.java
+++ b/hbase-migration-tools/bigtable-hbase-replication/bigtable-hbase-replication-core/src/main/java/com/google/cloud/bigtable/hbase/replication/metrics/HBaseToCloudBigtableReplicationMetrics.java
@@ -34,6 +34,14 @@ private HBaseToCloudBigtableReplicationMetrics() {}
       "bigtableIncompatibleDeleteMutations";
   public static final String INCOMPATIBLE_MUTATION_TIMESTAMP_OVERFLOW_METRIC_KEY =
       "bigtableIncompatibleTimestampOverflowMutation";
+
+  public static final String DROPPED_INCOMPATIBLE_MUTATION_ROW_SIZE_METRIC_KEY =
+      "bigtableIncompatibleMutationsRowSizeExceeded";
+  public static final String DROPPED_INCOMPATIBLE_MUTATION_MAX_CELLS_METRIC_KEY =
+      "bigtableIncompatibleMutationsMaxCellsPerMutationExceeded";
+  public static final String DROPPED_INCOMPATIBLE_MUTATION_CELL_SIZE_METRIC_KEY =
+      "bigtableIncompatibleMutationsCellSizeExceeded";
+
   public static final String PUTS_IN_FUTURE_METRIC_KEY = "bigtablePutsInFutureMutations";
 
   /**
diff --git a/hbase-migration-tools/bigtable-hbase-replication/bigtable-hbase-replication-core/src/test/java/com/google/cloud/bigtable/hbase/replication/CloudBigtableReplicationTaskTest.java b/hbase-migration-tools/bigtable-hbase-replication/bigtable-hbase-replication-core/src/test/java/com/google/cloud/bigtable/hbase/replication/CloudBigtableReplicationTaskTest.java
index 17958434a5..8e27400b89 100644
--- a/hbase-migration-tools/bigtable-hbase-replication/bigtable-hbase-replication-core/src/test/java/com/google/cloud/bigtable/hbase/replication/CloudBigtableReplicationTaskTest.java
+++ b/hbase-migration-tools/bigtable-hbase-replication/bigtable-hbase-replication-core/src/test/java/com/google/cloud/bigtable/hbase/replication/CloudBigtableReplicationTaskTest.java
@@ -16,6 +16,18 @@
 package com.google.cloud.bigtable.hbase.replication;
 
+import static com.google.cloud.bigtable.hbase.replication.configuration.HBaseToCloudBigtableReplicationConfiguration.FILTER_LARGE_ROWS_KEY;
+import static com.google.cloud.bigtable.hbase.replication.configuration.HBaseToCloudBigtableReplicationConfiguration.FILTER_LARGE_ROWS_THRESHOLD_IN_BYTES_KEY;
+import static com.google.cloud.bigtable.hbase.replication.configuration.HBaseToCloudBigtableReplicationConfiguration.FILTER_MAX_CELLS_PER_MUTATION_KEY;
+import static com.google.cloud.bigtable.hbase.replication.configuration.HBaseToCloudBigtableReplicationConfiguration.FILTER_MAX_CELLS_PER_MUTATION_THRESHOLD_KEY;
+import static com.google.cloud.bigtable.hbase.replication.metrics.HBaseToCloudBigtableReplicationMetrics.DROPPED_INCOMPATIBLE_MUTATION_CELL_SIZE_METRIC_KEY;
+import static com.google.cloud.bigtable.hbase.replication.metrics.HBaseToCloudBigtableReplicationMetrics.DROPPED_INCOMPATIBLE_MUTATION_MAX_CELLS_METRIC_KEY;
+import static com.google.cloud.bigtable.hbase.replication.metrics.HBaseToCloudBigtableReplicationMetrics.DROPPED_INCOMPATIBLE_MUTATION_METRIC_KEY;
+import static com.google.cloud.bigtable.hbase.replication.metrics.HBaseToCloudBigtableReplicationMetrics.DROPPED_INCOMPATIBLE_MUTATION_ROW_SIZE_METRIC_KEY;
+import static com.google.cloud.bigtable.hbase.replication.metrics.HBaseToCloudBigtableReplicationMetrics.INCOMPATIBLE_MUTATION_DELETES_METRICS_KEY;
+import static com.google.cloud.bigtable.hbase.replication.metrics.HBaseToCloudBigtableReplicationMetrics.INCOMPATIBLE_MUTATION_METRIC_KEY;
+import static com.google.cloud.bigtable.hbase.replication.metrics.HBaseToCloudBigtableReplicationMetrics.INCOMPATIBLE_MUTATION_TIMESTAMP_OVERFLOW_METRIC_KEY;
+import static com.google.cloud.bigtable.hbase.replication.metrics.HBaseToCloudBigtableReplicationMetrics.PUTS_IN_FUTURE_METRIC_KEY;
 import static com.google.cloud.bigtable.hbase.replication.utils.TestUtils.CF1;
 import static com.google.cloud.bigtable.hbase.replication.utils.TestUtils.CF2;
 import static com.google.cloud.bigtable.hbase.replication.utils.TestUtils.COL_QUALIFIER;
@@ -37,11 +49,14 @@ import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+import com.google.cloud.bigtable.hbase.replication.metrics.MetricsExporter;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Random;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.Connection;
@@ -74,16 +89,26 @@ public class CloudBigtableReplicationTaskTest {
 
   @Mock private Table mockTable;
 
+  @Mock MetricsExporter metricsExporter;
+
   @Captor private ArgumentCaptor<List<RowMutations>> captor;
 
   @Before
   public void setUp() throws Exception {
     when(mockConnection.getTable(TABLE_NAME)).thenReturn(mockTable);
+    metricsExporter.incCounters(DROPPED_INCOMPATIBLE_MUTATION_METRIC_KEY, 0);
+    metricsExporter.incCounters(INCOMPATIBLE_MUTATION_METRIC_KEY, 0);
+    metricsExporter.incCounters(INCOMPATIBLE_MUTATION_DELETES_METRICS_KEY, 0);
+    metricsExporter.incCounters(INCOMPATIBLE_MUTATION_TIMESTAMP_OVERFLOW_METRIC_KEY, 0);
+    metricsExporter.incCounters(PUTS_IN_FUTURE_METRIC_KEY, 0);
+    metricsExporter.incCounters(DROPPED_INCOMPATIBLE_MUTATION_CELL_SIZE_METRIC_KEY, 0);
+    metricsExporter.incCounters(DROPPED_INCOMPATIBLE_MUTATION_ROW_SIZE_METRIC_KEY, 0);
+    metricsExporter.incCounters(DROPPED_INCOMPATIBLE_MUTATION_MAX_CELLS_METRIC_KEY, 0);
   }
 
   @After
   public void tearDown() throws Exception {
-    reset(mockConnection, mockTable);
+    reset(mockConnection, mockTable, metricsExporter);
   }
 
   @Test
@@ -96,7 +121,7 @@ public void batchCallFailsPartially() throws IOException, InterruptedException {
     cellsToReplicate.put(new SimpleByteRange(ROW_KEY), Arrays.asList(cell));
     CloudBigtableReplicationTask replicationTaskUnderTest =
         new CloudBigtableReplicationTask(
-            TABLE_NAME.getNameAsString(), mockConnection, cellsToReplicate);
+            TABLE_NAME.getNameAsString(), mockConnection, cellsToReplicate, metricsExporter);
 
     assertFalse(replicationTaskUnderTest.call());
 
@@ -122,7 +147,7 @@ public void batchCallFails() throws IOException, InterruptedException {
     cellsToReplicate.put(new SimpleByteRange(getRowKey(1)), Arrays.asList(cell2));
     CloudBigtableReplicationTask replicationTaskUnderTest =
         new CloudBigtableReplicationTask(
-            TABLE_NAME.getNameAsString(), mockConnection, cellsToReplicate);
+            TABLE_NAME.getNameAsString(), mockConnection, cellsToReplicate, metricsExporter);
 
     assertFalse(replicationTaskUnderTest.call());
 
@@ -143,7 +168,7 @@ public void batchCallSucceeds() throws IOException, InterruptedException {
     CloudBigtableReplicationTask replicationTaskUnderTest =
         new CloudBigtableReplicationTask(
-            TABLE_NAME.getNameAsString(), mockConnection, cellsToReplicate);
+            TABLE_NAME.getNameAsString(), mockConnection, cellsToReplicate, metricsExporter);
 
     assertTrue(replicationTaskUnderTest.call());
 
@@ -232,4 +257,83 @@ public void testCreateRowMutationsPutAndDeleteAlternate() throws IOException {
         expectedRowMutations,
         CloudBigtableReplicationTask.buildRowMutations(ROW_KEY, cellsToReplicate));
   }
+
+  @Test
+  public void testFilterLargeRowMutationExceedsThreshold() throws IOException {
+    Configuration conf = new Configuration();
+    conf.set(FILTER_LARGE_ROWS_KEY, "true");
+    conf.setInt(FILTER_LARGE_ROWS_THRESHOLD_IN_BYTES_KEY, 10_000);
+
+    Cell put1 = getPutCellCustomSize(10, ROW_KEY, CF1, COL_QUALIFIER, TIMESTAMP);
+    Cell put2 = getPutCellCustomSize(10, ROW_KEY, CF1, COL_QUALIFIER_2, TIMESTAMP);
+    Cell put3 = getPutCellCustomSize(10, ROW_KEY, CF2, COL_QUALIFIER, TIMESTAMP);
+    RowMutations rowMutationsSmallerSize =
+        CloudBigtableReplicationTask.buildRowMutations(ROW_KEY, Arrays.asList(put1, put2, put3));
+
+    put1 = getPutCellCustomSize(10000, ROW_KEY, CF1, COL_QUALIFIER, TIMESTAMP);
+    put2 = getPutCellCustomSize(10000, ROW_KEY, CF1, COL_QUALIFIER_2, TIMESTAMP);
+    put3 = getPutCellCustomSize(10000, ROW_KEY, CF2, COL_QUALIFIER, TIMESTAMP);
+    RowMutations rowMutationsLargerSize =
+        CloudBigtableReplicationTask.buildRowMutations(ROW_KEY, Arrays.asList(put1, put2, put3));
+
+    boolean logAndSkipIncompatibleRowMutationsDoesNotExceed =
+        CloudBigtableReplicationTask.verifyRowMutationThresholds(
+            rowMutationsSmallerSize, conf, metricsExporter);
+    boolean logAndSkipIncompatibleRowMutationsExceeds =
+        CloudBigtableReplicationTask.verifyRowMutationThresholds(
+            rowMutationsLargerSize, conf, metricsExporter);
+    Assert.assertEquals(false, logAndSkipIncompatibleRowMutationsDoesNotExceed);
+    Assert.assertEquals(true, logAndSkipIncompatibleRowMutationsExceeds);
+
+    verify(metricsExporter).incCounters(DROPPED_INCOMPATIBLE_MUTATION_ROW_SIZE_METRIC_KEY, 1);
+    verify(metricsExporter).incCounters(DROPPED_INCOMPATIBLE_MUTATION_MAX_CELLS_METRIC_KEY, 0);
+    verify(metricsExporter).incCounters(INCOMPATIBLE_MUTATION_METRIC_KEY, 1);
+    verify(metricsExporter).incCounters(DROPPED_INCOMPATIBLE_MUTATION_METRIC_KEY, 1);
+  }
+
+  @Test
+  public void testFilterMaxCellsPerMutationExceedsThreshold() throws IOException {
+    Configuration conf = new Configuration();
+    conf.set(FILTER_MAX_CELLS_PER_MUTATION_KEY, "true");
+    conf.setInt(FILTER_MAX_CELLS_PER_MUTATION_THRESHOLD_KEY, 2);
+
+    Cell put1 = getPutCellCustomSize(10, ROW_KEY, CF1, COL_QUALIFIER, TIMESTAMP);
+    RowMutations rowMutationsCellsLessThanMax =
+        CloudBigtableReplicationTask.buildRowMutations(ROW_KEY, Arrays.asList(put1));
+
+    put1 = getPutCellCustomSize(10000, ROW_KEY, CF1, COL_QUALIFIER, TIMESTAMP);
+    Cell put2 = getPutCellCustomSize(10000, ROW_KEY, CF1, COL_QUALIFIER_2, TIMESTAMP);
+    RowMutations rowMutationsCellsEqualToMax =
+        CloudBigtableReplicationTask.buildRowMutations(ROW_KEY, Arrays.asList(put1, put2));
+
+    put1 = getPutCellCustomSize(10000, ROW_KEY, CF1, COL_QUALIFIER, TIMESTAMP);
+    put2 = getPutCellCustomSize(10000, ROW_KEY, CF1, COL_QUALIFIER_2, TIMESTAMP);
+    Cell put3 = getPutCellCustomSize(10, ROW_KEY, CF2, COL_QUALIFIER, TIMESTAMP);
+    RowMutations rowMutationsCellsGreaterThanMax =
+        CloudBigtableReplicationTask.buildRowMutations(ROW_KEY, Arrays.asList(put1, put2, put3));
+
+    boolean logAndSkipIncompatibleRowMutationsDoesNotExceed =
+        CloudBigtableReplicationTask.verifyRowMutationThresholds(
+            rowMutationsCellsLessThanMax, conf, metricsExporter);
+    boolean logAndSkipIncompatibleRowMutationsDoesNotExceedEqual =
+        CloudBigtableReplicationTask.verifyRowMutationThresholds(
+            rowMutationsCellsEqualToMax, conf, metricsExporter);
+    boolean logAndSkipIncompatibleRowMutationsExceeds =
+        CloudBigtableReplicationTask.verifyRowMutationThresholds(
+            rowMutationsCellsGreaterThanMax, conf, metricsExporter);
+    Assert.assertEquals(false, logAndSkipIncompatibleRowMutationsDoesNotExceed);
+    Assert.assertEquals(false, logAndSkipIncompatibleRowMutationsDoesNotExceedEqual);
+    Assert.assertEquals(true, logAndSkipIncompatibleRowMutationsExceeds);
+
+    verify(metricsExporter).incCounters(DROPPED_INCOMPATIBLE_MUTATION_ROW_SIZE_METRIC_KEY, 0);
+    verify(metricsExporter).incCounters(DROPPED_INCOMPATIBLE_MUTATION_MAX_CELLS_METRIC_KEY, 1);
+    verify(metricsExporter).incCounters(INCOMPATIBLE_MUTATION_METRIC_KEY, 1);
+    verify(metricsExporter).incCounters(DROPPED_INCOMPATIBLE_MUTATION_METRIC_KEY, 1);
+  }
+
+  private Cell getPutCellCustomSize(int numBytes, byte[] rowkey, byte[] cf, byte[] qf, long ts) {
+    byte[] b = new byte[numBytes];
+    new Random().nextBytes(b);
+    return new KeyValue(rowkey, cf, qf, ts, KeyValue.Type.Put, b);
+  }
 }
diff --git a/hbase-migration-tools/bigtable-hbase-replication/bigtable-hbase-replication-core/src/test/java/com/google/cloud/bigtable/hbase/replication/CloudBigtableReplicatorTest.java b/hbase-migration-tools/bigtable-hbase-replication/bigtable-hbase-replication-core/src/test/java/com/google/cloud/bigtable/hbase/replication/CloudBigtableReplicatorTest.java
index 7c3a4ed821..14ab4d7281 100644
--- a/hbase-migration-tools/bigtable-hbase-replication/bigtable-hbase-replication-core/src/test/java/com/google/cloud/bigtable/hbase/replication/CloudBigtableReplicatorTest.java
+++ b/hbase-migration-tools/bigtable-hbase-replication/bigtable-hbase-replication-core/src/test/java/com/google/cloud/bigtable/hbase/replication/CloudBigtableReplicatorTest.java
@@ -179,7 +179,7 @@ public void testReplicateDoesNotSplitInBatches() throws IOException {
     verify(mockExecutorService)
         .submit(
             new CloudBigtableReplicationTask(
-                TABLE_NAME_STRING, mockConnection, expectedBatchOfWal));
+                TABLE_NAME_STRING, mockConnection, expectedBatchOfWal, mockMetricExporter));
     Mockito.verifyNoMoreInteractions(mockMetricExporter, mockExecutorService);
     Mockito.verifyNoInteractions(mockConnection);
   }
@@ -235,11 +235,11 @@ public void testReplicateSplitsBatchesOnRowBoundary() throws IOException {
     verify(mockExecutorService)
         .submit(
             new CloudBigtableReplicationTask(
-                TABLE_NAME_STRING, mockConnection, expectedBatchOfWal1));
+                TABLE_NAME_STRING, mockConnection, expectedBatchOfWal1, mockMetricExporter));
     verify(mockExecutorService)
         .submit(
             new CloudBigtableReplicationTask(
-                TABLE_NAME_STRING, mockConnection, expectedBatchOfWal2));
+                TABLE_NAME_STRING, mockConnection, expectedBatchOfWal2, mockMetricExporter));
     Mockito.verifyNoMoreInteractions(mockMetricExporter, mockExecutorService);
     Mockito.verifyNoInteractions(mockConnection);
   }
@@ -296,11 +296,11 @@ public void testReplicateSplitsBatchesOnTableBoundary() throws IOException {
     verify(mockExecutorService)
         .submit(
             new CloudBigtableReplicationTask(
-                TABLE_NAME_STRING, mockConnection, expectedBatchOfWal1));
+                TABLE_NAME_STRING, mockConnection, expectedBatchOfWal1, mockMetricExporter));
     verify(mockExecutorService)
         .submit(
             new CloudBigtableReplicationTask(
-                TABLE_NAME_STRING_2, mockConnection, expectedBatchOfWal2));
+                TABLE_NAME_STRING_2, mockConnection, expectedBatchOfWal2, mockMetricExporter));
     Mockito.verifyNoMoreInteractions(mockMetricExporter, mockExecutorService);
     Mockito.verifyNoInteractions(mockConnection);
   }
@@ -353,11 +353,11 @@ public void testReplicateFailsOnAnyFailure() throws IOException {
     verify(mockExecutorService)
         .submit(
             new CloudBigtableReplicationTask(
-                TABLE_NAME_STRING, mockConnection, expectedBatchOfWal1));
+                TABLE_NAME_STRING, mockConnection, expectedBatchOfWal1, mockMetricExporter));
     verify(mockExecutorService)
         .submit(
             new CloudBigtableReplicationTask(
-                TABLE_NAME_STRING, mockConnection, expectedBatchOfWal2));
+                TABLE_NAME_STRING, mockConnection, expectedBatchOfWal2, mockMetricExporter));
     Mockito.verifyNoMoreInteractions(mockMetricExporter, mockExecutorService);
     Mockito.verifyNoInteractions(mockConnection);
   }
@@ -410,11 +410,11 @@ public void testReplicateFailsOnAnyFutureFailure() throws IOException {
     verify(mockExecutorService)
         .submit(
             new CloudBigtableReplicationTask(
-                TABLE_NAME_STRING, mockConnection, expectedBatchOfWal1));
+                TABLE_NAME_STRING, mockConnection, expectedBatchOfWal1, mockMetricExporter));
     verify(mockExecutorService)
         .submit(
             new CloudBigtableReplicationTask(
-                TABLE_NAME_STRING, mockConnection, expectedBatchOfWal2));
+                TABLE_NAME_STRING, mockConnection, expectedBatchOfWal2, mockMetricExporter));
     Mockito.verifyNoMoreInteractions(mockMetricExporter, mockExecutorService);
     Mockito.verifyNoInteractions(mockConnection);
   }
@@ -461,11 +461,11 @@ public void testReplicateFailsToSubmitTask() throws IOException {
     verify(mockExecutorService)
         .submit(
             new CloudBigtableReplicationTask(
-                TABLE_NAME_STRING, mockConnection, expectedBatchOfWal1));
+                TABLE_NAME_STRING, mockConnection, expectedBatchOfWal1, mockMetricExporter));
     verify(mockExecutorService)
         .submit(
             new CloudBigtableReplicationTask(
-                TABLE_NAME_STRING, mockConnection, expectedBatchOfWal2));
+                TABLE_NAME_STRING, mockConnection, expectedBatchOfWal2, mockMetricExporter));
     Mockito.verifyNoMoreInteractions(mockMetricExporter, mockExecutorService);
     Mockito.verifyNoInteractions(mockConnection);
   }
@@ -529,7 +529,7 @@ public void testBidirectionalReplicationAddsSpecialMutation() throws IOException
     verify(mockExecutorService)
         .submit(
             new CloudBigtableReplicationTask(
-                TABLE_NAME_STRING, mockConnection, expectedBatchOfWal));
+                TABLE_NAME_STRING, mockConnection, expectedBatchOfWal, mockMetricExporter));
     verifyNoMoreInteractions(mockMetricExporter, mockExecutorService);
     verifyNoInteractions(mockConnection);
   }
@@ -657,7 +657,7 @@ public void testBidirectionalReplicationDropsOneReplicatesOther() throws IOExcep
     verify(mockExecutorService)
         .submit(
             new CloudBigtableReplicationTask(
-                TABLE_NAME_STRING, mockConnection, expectedBatchOfWal));
+                TABLE_NAME_STRING, mockConnection, expectedBatchOfWal, mockMetricExporter));
     verifyNoMoreInteractions(mockMetricExporter, mockExecutorService);
     verifyNoInteractions(mockConnection);
   }
@@ -746,7 +746,7 @@ public void testBidirectionalReplicationCustomSpecialColumnQualifier() throws IO
     verify(mockExecutorService)
        .submit(
             new CloudBigtableReplicationTask(
-                TABLE_NAME_STRING, mockConnection, expectedBatchOfWal));
+                TABLE_NAME_STRING, mockConnection, expectedBatchOfWal, mockMetricExporter));
     verifyNoMoreInteractions(mockMetricExporter, mockExecutorService);
     verifyNoInteractions(mockConnection);
   }
@@ -811,7 +811,7 @@ public void testBidirectionalReplicationMultipleMutations() throws IOException {
     verify(mockExecutorService)
         .submit(
             new CloudBigtableReplicationTask(
-                TABLE_NAME_STRING, mockConnection, expectedBatchOfWal));
+                TABLE_NAME_STRING, mockConnection, expectedBatchOfWal, mockMetricExporter));
     verifyNoMoreInteractions(mockMetricExporter, mockExecutorService);
     verifyNoInteractions(mockConnection);
   }
diff --git a/hbase-migration-tools/bigtable-hbase-replication/bigtable-hbase-replication-core/src/test/java/com/google/cloud/bigtable/hbase/replication/adapters/IncompatibleMutationAdapterTest.java b/hbase-migration-tools/bigtable-hbase-replication/bigtable-hbase-replication-core/src/test/java/com/google/cloud/bigtable/hbase/replication/adapters/IncompatibleMutationAdapterTest.java
index cd48b94e9a..52bc9f54e4 100644
--- a/hbase-migration-tools/bigtable-hbase-replication/bigtable-hbase-replication-core/src/test/java/com/google/cloud/bigtable/hbase/replication/adapters/IncompatibleMutationAdapterTest.java
+++ b/hbase-migration-tools/bigtable-hbase-replication/bigtable-hbase-replication-core/src/test/java/com/google/cloud/bigtable/hbase/replication/adapters/IncompatibleMutationAdapterTest.java
@@ -16,6 +16,10 @@
 package com.google.cloud.bigtable.hbase.replication.adapters;
 
+import static com.google.cloud.bigtable.hbase.replication.configuration.HBaseToCloudBigtableReplicationConfiguration.DEFAULT_FILTER_LARGE_CELLS;
+import static com.google.cloud.bigtable.hbase.replication.configuration.HBaseToCloudBigtableReplicationConfiguration.FILTER_LARGE_CELLS_KEY;
+import static com.google.cloud.bigtable.hbase.replication.configuration.HBaseToCloudBigtableReplicationConfiguration.FILTER_LARGE_CELLS_THRESHOLD_IN_BYTES_KEY;
+import static com.google.cloud.bigtable.hbase.replication.metrics.HBaseToCloudBigtableReplicationMetrics.DROPPED_INCOMPATIBLE_MUTATION_CELL_SIZE_METRIC_KEY;
 import static com.google.cloud.bigtable.hbase.replication.metrics.HBaseToCloudBigtableReplicationMetrics.DROPPED_INCOMPATIBLE_MUTATION_METRIC_KEY;
 import static com.google.cloud.bigtable.hbase.replication.metrics.HBaseToCloudBigtableReplicationMetrics.INCOMPATIBLE_MUTATION_DELETES_METRICS_KEY;
 import static com.google.cloud.bigtable.hbase.replication.metrics.HBaseToCloudBigtableReplicationMetrics.INCOMPATIBLE_MUTATION_METRIC_KEY;
@@ -35,6 +39,7 @@
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Random;
 import java.util.Set;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Cell;
@@ -70,6 +75,9 @@ static class TestIncompatibleMutationAdapter extends IncompatibleMutationAdapter
      */
     Set<Integer> incompatibleMutations = new HashSet<>();
 
+    // Configuration used by the adapter under test.
+    private Configuration conf;
+
     /**
      * Creates an IncompatibleMutationAdapter with HBase configuration, MetricSource, and CBT Table.
      *
@@ -83,6 +91,7 @@ static class TestIncompatibleMutationAdapter extends IncompatibleMutationAdapter
     public TestIncompatibleMutationAdapter(
         Configuration conf, MetricsExporter metricsExporter, Connection connection) {
       super(conf, metricsExporter, connection);
+      this.conf = conf;
     }
 
     @Override
@@ -311,4 +320,127 @@ public void testFuturePutAreFlagged() {
     verify(metricsExporter, times(1)).incCounters(PUTS_IN_FUTURE_METRIC_KEY, 1);
     verifyNoMoreInteractions(metricsExporter);
   }
+
+  @Test
+  public void testFilterLargeCellsFlag() {
+    int numBytes = 150 * 1024 * 1024;
+    byte[] largeCellValBytes = new byte[numBytes];
+    new Random().nextBytes(largeCellValBytes);
+
+    ArrayList<Cell> walEntryCells = new ArrayList<>();
+    Cell incompatibleLargePut =
+        new KeyValue(rowKey, cf, qual, 0, KeyValue.Type.Put, largeCellValBytes);
+    Cell put = new KeyValue(rowKey, cf, qual, 0, KeyValue.Type.Put, val);
+    walEntryCells.add(put);
+    walEntryCells.add(incompatibleLargePut);
+    BigtableWALEntry walEntry =
+        new BigtableWALEntry(System.currentTimeMillis(), walEntryCells, tableName);
+
+    Assert.assertEquals(
+        conf.getBoolean(FILTER_LARGE_CELLS_KEY, DEFAULT_FILTER_LARGE_CELLS),
+        DEFAULT_FILTER_LARGE_CELLS);
+    Assert.assertEquals(
+        Arrays.asList(put, incompatibleLargePut),
+        incompatibleMutationAdapter.adaptIncompatibleMutations(walEntry));
+  }
+
+  @Test
+  public void testFilterLargeCellsDefaultSize() {
+    conf.set(FILTER_LARGE_CELLS_KEY, "true");
+
+    ArrayList<Cell> walEntryCells = new ArrayList<>();
+
+    int numBytes = 100 * 1024 * 1024 - 1;
+    byte[] b = new byte[numBytes];
+    new Random().nextBytes(b);
+    Cell putSmallerSize = new KeyValue(rowKey, cf, qual, 0, KeyValue.Type.Put, b);
+
+    numBytes = 100 * 1024 * 1024;
+    b = new byte[numBytes];
+    new Random().nextBytes(b);
+    Cell putEqualSize = new KeyValue(rowKey, cf, qual, 0, KeyValue.Type.Put, b);
+
+    numBytes = 100 * 1024 * 1024 + 1;
+    b = new byte[numBytes];
+    new Random().nextBytes(b);
+    Cell putLargerSize = new KeyValue(rowKey, cf, qual, 0, KeyValue.Type.Put, b);
+
+    Cell put = new KeyValue(rowKey, cf, qual, 0, KeyValue.Type.Put, val);
+
+    walEntryCells.add(put);
+    walEntryCells.add(putSmallerSize);
+    walEntryCells.add(putEqualSize);
+    walEntryCells.add(putLargerSize);
+
+    BigtableWALEntry walEntry =
+        new BigtableWALEntry(System.currentTimeMillis(), walEntryCells, tableName);
+
+    Assert.assertEquals(conf.getBoolean(FILTER_LARGE_CELLS_KEY, DEFAULT_FILTER_LARGE_CELLS), true);
+    Assert.assertEquals(
+        Arrays.asList(put, putSmallerSize, putEqualSize),
+        incompatibleMutationAdapter.adaptIncompatibleMutations(walEntry));
+
+    verify(metricsExporter).incCounters(INCOMPATIBLE_MUTATION_METRIC_KEY, 0);
+    verify(metricsExporter).incCounters(DROPPED_INCOMPATIBLE_MUTATION_METRIC_KEY, 0);
+    verify(metricsExporter).incCounters(INCOMPATIBLE_MUTATION_DELETES_METRICS_KEY, 0);
+    verify(metricsExporter).incCounters(INCOMPATIBLE_MUTATION_TIMESTAMP_OVERFLOW_METRIC_KEY, 0);
+    verify(metricsExporter).incCounters(DROPPED_INCOMPATIBLE_MUTATION_CELL_SIZE_METRIC_KEY, 0);
+    verify(metricsExporter).incCounters(PUTS_IN_FUTURE_METRIC_KEY, 0);
+
+    verify(metricsExporter, times(1)).incCounters(INCOMPATIBLE_MUTATION_METRIC_KEY, 1);
+    verify(metricsExporter, times(1)).incCounters(DROPPED_INCOMPATIBLE_MUTATION_METRIC_KEY, 1);
+    verify(metricsExporter, times(1))
+        .incCounters(DROPPED_INCOMPATIBLE_MUTATION_CELL_SIZE_METRIC_KEY, 1);
+    verifyNoMoreInteractions(metricsExporter);
+  }
+
+  @Test
+  public void testFilterLargeCellsCustomSize() {
+    conf.set(FILTER_LARGE_CELLS_KEY, "true");
+    conf.setInt(FILTER_LARGE_CELLS_THRESHOLD_IN_BYTES_KEY, 10 * 1024 * 1024);
+
+    ArrayList<Cell> walEntryCells = new ArrayList<>();
+
+    int numBytes = 10 * 1024 * 1024 - 1;
+    byte[] b = new byte[numBytes];
+    new Random().nextBytes(b);
+    Cell putSmallerSize = new KeyValue(rowKey, cf, qual, 0, KeyValue.Type.Put, b);
+
+    numBytes = 10 * 1024 * 1024;
+    b = new byte[numBytes];
+    new Random().nextBytes(b);
+    Cell putEqualSize = new KeyValue(rowKey, cf, qual, 0, KeyValue.Type.Put, b);
+
+    numBytes = 10 * 1024 * 1024 + 1;
+    b = new byte[numBytes];
+    new Random().nextBytes(b);
+    Cell putLargerSize = new KeyValue(rowKey, cf, qual, 0, KeyValue.Type.Put, b);
+
+    Cell put = new KeyValue(rowKey, cf, qual, 0, KeyValue.Type.Put, val);
+
+    walEntryCells.add(put);
+    walEntryCells.add(putSmallerSize);
+    walEntryCells.add(putEqualSize);
+    walEntryCells.add(putLargerSize);
+
+    BigtableWALEntry walEntry =
+        new BigtableWALEntry(System.currentTimeMillis(), walEntryCells, tableName);
+
+    Assert.assertEquals(conf.getBoolean(FILTER_LARGE_CELLS_KEY, DEFAULT_FILTER_LARGE_CELLS), true);
+    Assert.assertEquals(
+        Arrays.asList(put, putSmallerSize, putEqualSize),
+        incompatibleMutationAdapter.adaptIncompatibleMutations(walEntry));
+
+    verify(metricsExporter).incCounters(INCOMPATIBLE_MUTATION_METRIC_KEY, 0);
+    verify(metricsExporter).incCounters(DROPPED_INCOMPATIBLE_MUTATION_METRIC_KEY, 0);
+    verify(metricsExporter).incCounters(INCOMPATIBLE_MUTATION_DELETES_METRICS_KEY, 0);
+    verify(metricsExporter).incCounters(INCOMPATIBLE_MUTATION_TIMESTAMP_OVERFLOW_METRIC_KEY, 0);
+    verify(metricsExporter).incCounters(DROPPED_INCOMPATIBLE_MUTATION_CELL_SIZE_METRIC_KEY, 0);
+    verify(metricsExporter).incCounters(PUTS_IN_FUTURE_METRIC_KEY, 0);
+    verify(metricsExporter, times(1)).incCounters(INCOMPATIBLE_MUTATION_METRIC_KEY, 1);
+    verify(metricsExporter, times(1)).incCounters(DROPPED_INCOMPATIBLE_MUTATION_METRIC_KEY, 1);
+    verify(metricsExporter, times(1))
+        .incCounters(DROPPED_INCOMPATIBLE_MUTATION_CELL_SIZE_METRIC_KEY, 1);
+    verifyNoMoreInteractions(metricsExporter);
+  }
 }
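[Reviewer's note, not part of the patch: the new row-level check can also be exercised without a cluster or mocks. A minimal sketch, assuming the class sits in the com.google.cloud.bigtable.hbase.replication package (verifyRowMutationThresholds is package-private) with the HBase client jars on the classpath; VerifyThresholdsExample is hypothetical. Passing null for the MetricsExporter is tolerated by the helper's null check.]

package com.google.cloud.bigtable.hbase.replication; // same package: the method is package-private

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.RowMutations;
import org.apache.hadoop.hbase.util.Bytes;

public class VerifyThresholdsExample {
  public static void main(String[] args) throws Exception {
    // Enable only the large-row filter, with a deliberately tiny 1 KiB threshold.
    Configuration conf = new Configuration(false);
    conf.setBoolean("google.bigtable.replication.filter_large_rows", true);
    conf.setInt("google.bigtable.replication.large_rows_threshold_bytes", 1024);

    // Build a single-put RowMutations carrying a ~2 KiB cell value.
    byte[] row = Bytes.toBytes("example-row");
    Put put = new Put(row);
    put.add(
        new KeyValue(
            row, Bytes.toBytes("cf"), Bytes.toBytes("q"), 0L, KeyValue.Type.Put, new byte[2048]));
    RowMutations rowMutations = new RowMutations(row);
    rowMutations.add(put);

    // heapSize() of the put exceeds the 1 KiB threshold, so this should return true
    // (log and skip the row).
    boolean dropped =
        CloudBigtableReplicationTask.verifyRowMutationThresholds(rowMutations, conf, null);
    System.out.println("logAndSkipIncompatibleRowMutations = " + dropped);
  }
}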