diff --git a/bigtable-dataflow-parent/bigtable-beam-import/src/test/java/com/google/cloud/bigtable/beam/it/CloudBigtableBeamIT.java b/bigtable-dataflow-parent/bigtable-beam-import/src/test/java/com/google/cloud/bigtable/beam/it/CloudBigtableBeamIT.java
index 6564850933..65fe4cdbd2 100644
--- a/bigtable-dataflow-parent/bigtable-beam-import/src/test/java/com/google/cloud/bigtable/beam/it/CloudBigtableBeamIT.java
+++ b/bigtable-dataflow-parent/bigtable-beam-import/src/test/java/com/google/cloud/bigtable/beam/it/CloudBigtableBeamIT.java
@@ -17,6 +17,8 @@
 import static com.google.cloud.bigtable.hbase.BigtableOptionsFactory.BIGTABLE_ADMIN_HOST_KEY;
 import static com.google.cloud.bigtable.hbase.BigtableOptionsFactory.BIGTABLE_HOST_KEY;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.contains;
 
 import com.google.cloud.bigtable.beam.CloudBigtableIO;
 import com.google.cloud.bigtable.beam.CloudBigtableScanConfiguration;
@@ -26,15 +28,18 @@
 import com.google.cloud.bigtable.hbase.BigtableConfiguration;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.List;
 import java.util.Random;
 import java.util.UUID;
 import org.apache.beam.repackaged.core.org.apache.commons.lang3.RandomStringUtils;
+import org.apache.beam.runners.dataflow.DataflowPipelineJob;
 import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
 import org.apache.beam.sdk.Pipeline;
 import org.apache.beam.sdk.PipelineResult;
 import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.metrics.Counter;
+import org.apache.beam.sdk.metrics.Lineage;
 import org.apache.beam.sdk.metrics.Metrics;
 import org.apache.beam.sdk.options.PipelineOptionsFactory;
 import org.apache.beam.sdk.testing.PAssert;
@@ -142,8 +147,9 @@ public void testWriteToBigtable() throws IOException {
     DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
     properties.applyTo(options);
     options.setAppName("testWriteToBigtable-" + System.currentTimeMillis());
+    options.setExperiments(Collections.singletonList("disable_runner_v2"));
     LOG.info(
         String.format("Started writeToBigtable test with jobName as: %s", options.getAppName()));
 
     CloudBigtableTableConfiguration.Builder configBuilder =
         new CloudBigtableTableConfiguration.Builder()
@@ -165,16 +173,19 @@ public void testWriteToBigtable() throws IOException {
       keys.add(RandomStringUtils.randomAlphanumeric(10));
     }
 
-    PipelineResult.State result =
-        Pipeline.create(options)
+    DataflowPipelineJob result =
+        (DataflowPipelineJob) Pipeline.create(options)
             .apply("Keys", Create.of(keys))
             .apply("Create Puts", ParDo.of(WRITE_ONE_TENTH_PERCENT))
             .apply("Write to BT", CloudBigtableIO.writeToTable(config))
             .getPipeline()
-            .run()
-            .waitUntilFinish();
+            .run();
+    LOG.info(
+        String.format("Ran writeToBigtable test with job ID as: %s", result.getJobId()));
+    PipelineResult.State state =
+        result.waitUntilFinish();
 
-    Assert.assertEquals(PipelineResult.State.DONE, result);
+    Assert.assertEquals(PipelineResult.State.DONE, state);
 
     try (ResultScanner scanner =
         connection.getTable(tableName).getScanner(new Scan().setFilter(new KeyOnlyFilter()))) {
@@ -184,6 +197,9 @@ public void testWriteToBigtable() throws IOException {
       }
       Assert.assertEquals(TOTAL_ROW_COUNT, count);
     }
+    assertThat(Lineage.query(result.metrics(), Lineage.Type.SINK),
+        contains(String.format("bigtable:%s.%s.%s", config.getProjectId(), config.getInstanceId(),
+            config.getTableId())));
   }
 
   @Test
@@ -206,6 +222,7 @@ public void testReadFromBigtable() throws IOException {
     DataflowPipelineOptions options = PipelineOptionsFactory.as(DataflowPipelineOptions.class);
     properties.applyTo(options);
     options.setJobName("testReadFromBigtable-" +
System.currentTimeMillis());
+    options.setExperiments(Collections.singletonList("disable_runner_v2"));
 
     LOG.info(
         String.format("Started readFromBigtable test with jobName as: %s", options.getJobName()));
@@ -236,8 +253,15 @@ public void testReadFromBigtable() throws IOException {
     PAssert.thatSingleton(count).isEqualTo(TOTAL_ROW_COUNT);
 
-    PipelineResult.State result = pipeLine.run().waitUntilFinish();
-    Assert.assertEquals(PipelineResult.State.DONE, result);
+    DataflowPipelineJob result = (DataflowPipelineJob) pipeLine.run();
+    LOG.info(
+        String.format("Ran readFromBigtable test with job ID as: %s", result.getJobId()));
+
+    PipelineResult.State state = result.waitUntilFinish();
+    Assert.assertEquals(PipelineResult.State.DONE, state);
+    assertThat(Lineage.query(result.metrics(), Lineage.Type.SOURCE),
+        contains(String.format("bigtable:%s.%s.%s", config.getProjectId(), config.getInstanceId(),
+            config.getTableId())));
   }
 
   private static byte[] createRandomValue() {
diff --git a/bigtable-dataflow-parent/bigtable-hbase-beam/src/main/java/com/google/cloud/bigtable/beam/CloudBigtableIO.java
b/bigtable-dataflow-parent/bigtable-hbase-beam/src/main/java/com/google/cloud/bigtable/beam/CloudBigtableIO.java index dcc0441a8f..b827555b25 100755 --- a/bigtable-dataflow-parent/bigtable-hbase-beam/src/main/java/com/google/cloud/bigtable/beam/CloudBigtableIO.java +++ b/bigtable-dataflow-parent/bigtable-hbase-beam/src/main/java/com/google/cloud/bigtable/beam/CloudBigtableIO.java @@ -82,6 +82,7 @@ import org.apache.hadoop.hbase.util.Bytes; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.beam.sdk.metrics.Lineage; /** * Utilities to create {@link PTransform}s for reading and writing { private final AtomicInteger attempt = new AtomicInteger(3); + private transient boolean reportedLineage; + @VisibleForTesting Reader(CloudBigtableIO.AbstractSource source) { this.source = source; @@ -680,6 +683,7 @@ static class Reader extends BoundedReader { public boolean start() throws IOException { initializeScanner(); workStart = System.currentTimeMillis(); + reportLineageOnce(); return advance(); } @@ -906,6 +910,15 @@ public String toString() { Bytes.toStringBinary(rangeTracker.getStartPosition().getBytes()), Bytes.toStringBinary(rangeTracker.getStopPosition().getBytes())); } + + void reportLineageOnce() { + if (!reportedLineage) { + Lineage.getSources().add( + String.format("bigtable:%s.%s.%s", source.getConfiguration().getProjectId(), + source.getConfiguration().getInstanceId(), source.getConfiguration().getTableId())); + reportedLineage = true; + } + } } ///////////////////// Write Class ///////////////////////////////// @@ -969,6 +982,7 @@ public static class CloudBigtableSingleTableBufferedWriteFn extends BufferedMutatorDoFn { private static final long serialVersionUID = 2L; private transient BufferedMutator mutator; + private transient boolean reportedLineage; public CloudBigtableSingleTableBufferedWriteFn(CloudBigtableTableConfiguration config) { super(config); @@ -1006,6 +1020,16 @@ public synchronized void 
finishBundle(@SuppressWarnings("unused") FinishBundleCo
         logExceptions(null, exception);
         rethrowException(exception);
       }
+      reportLineageOnce();
     }
+
+    void reportLineageOnce() {
+      if (!reportedLineage) {
+        Lineage.getSinks()
+            .add(String.format("bigtable:%s.%s.%s", config.getProjectId(), config.getInstanceId(),
+                ((CloudBigtableTableConfiguration) getConfig()).getTableId()));
+        reportedLineage = true;
+      }
+    }
   }
 
@@ -1029,6 +1053,7 @@ public static class CloudBigtableMultiTableWriteFn
     // Stats
     private transient Map mutators;
+    private transient boolean reportedLineage;
 
     public CloudBigtableMultiTableWriteFn(CloudBigtableConfiguration config) {
       super(config);
@@ -1084,6 +1109,19 @@ public void finishBundle(FinishBundleContext c) throws Exception {
         }
       } finally {
+        // Must run before clear(): reportLineageOnce() reads mutators.keySet(), so
+        // clearing first would silently report no sink lineage at all.
+        reportLineageOnce();
         mutators.clear();
       }
     }
+
+    void reportLineageOnce() {
+      if (!reportedLineage) {
+        for (String tableName : mutators.keySet()) {
+          Lineage.getSinks()
+              .add(String.format("bigtable:%s.%s.%s", config.getProjectId(),
+                  config.getInstanceId(), tableName));
+        }
+        reportedLineage = true;
+      }
+    }
   }
 }
diff --git a/pom.xml b/pom.xml
index 735a596f43..3a1ee37f9e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -80,7 +80,7 @@ limitations under the License.
     1.1.5
     1.3
     4.11.0
-    2.58.0
+    2.60.0
     31.1-jre
     0.31.1