diff --git a/bigtable-dataflow-parent/bigtable-beam-import/pom.xml b/bigtable-dataflow-parent/bigtable-beam-import/pom.xml
index e103b62f9a..da6cee11bb 100644
--- a/bigtable-dataflow-parent/bigtable-beam-import/pom.xml
+++ b/bigtable-dataflow-parent/bigtable-beam-import/pom.xml
@@ -189,6 +189,12 @@ limitations under the License.
<version>${mockito.version}</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-inline</artifactId>
+ <version>${mockito.version}</version>
+ <scope>test</scope>
+ </dependency>
diff --git a/bigtable-dataflow-parent/bigtable-beam-import/src/main/java/com/google/cloud/bigtable/beam/hbasesnapshots/ImportJobFromHbaseSnapshot.java b/bigtable-dataflow-parent/bigtable-beam-import/src/main/java/com/google/cloud/bigtable/beam/hbasesnapshots/ImportJobFromHbaseSnapshot.java
index 064736a04b..e659e61a50 100644
--- a/bigtable-dataflow-parent/bigtable-beam-import/src/main/java/com/google/cloud/bigtable/beam/hbasesnapshots/ImportJobFromHbaseSnapshot.java
+++ b/bigtable-dataflow-parent/bigtable-beam-import/src/main/java/com/google/cloud/bigtable/beam/hbasesnapshots/ImportJobFromHbaseSnapshot.java
@@ -15,21 +15,38 @@
*/
package com.google.cloud.bigtable.beam.hbasesnapshots;
-import com.google.bigtable.repackaged.com.google.api.core.InternalExtensionOnly;
+import com.google.api.core.InternalExtensionOnly;
import com.google.cloud.bigtable.beam.CloudBigtableIO;
+import com.google.cloud.bigtable.beam.CloudBigtableTableConfiguration;
import com.google.cloud.bigtable.beam.TemplateUtils;
+import com.google.cloud.bigtable.beam.hbasesnapshots.conf.HBaseSnapshotInputConfigBuilder;
+import com.google.cloud.bigtable.beam.hbasesnapshots.conf.ImportConfig;
+import com.google.cloud.bigtable.beam.hbasesnapshots.conf.SnapshotConfig;
+import com.google.cloud.bigtable.beam.hbasesnapshots.dofn.CleanupHBaseSnapshotRestoreFiles;
+import com.google.cloud.bigtable.beam.hbasesnapshots.dofn.CleanupRestoredSnapshots;
+import com.google.cloud.bigtable.beam.hbasesnapshots.dofn.RestoreSnapshot;
+import com.google.cloud.bigtable.beam.hbasesnapshots.transforms.ListRegions;
+import com.google.cloud.bigtable.beam.hbasesnapshots.transforms.ReadRegions;
import com.google.cloud.bigtable.beam.sequencefiles.HBaseResultToMutationFn;
import com.google.cloud.bigtable.beam.sequencefiles.ImportJob;
import com.google.cloud.bigtable.beam.sequencefiles.Utils;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import java.io.IOException;
import java.util.Arrays;
import java.util.List;
+import java.util.Map;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.PipelineResult;
+import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
+import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.hadoop.format.HadoopFormatIO;
import org.apache.beam.sdk.options.Default;
import org.apache.beam.sdk.options.Description;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.ValueProvider;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.transforms.Wait;
@@ -37,6 +54,7 @@
import org.apache.beam.sdk.values.PCollection;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.hbase.client.Mutation;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
@@ -68,6 +86,16 @@
public class ImportJobFromHbaseSnapshot {
private static final Log LOG = LogFactory.getLog(ImportJobFromHbaseSnapshot.class);
+ @VisibleForTesting
+ static final String MISSING_SNAPSHOT_SOURCEPATH =
+ "Source Path containing hbase snapshots must be specified.";
+
+ @VisibleForTesting
+ static final String MISSING_SNAPSHOT_NAMES =
+ "Snapshots must be specified. Allowed values are '*' (indicating all snapshots under source path) or "
+ + "'prefix*' (snapshots matching certain prefix) or 'snapshotname1:tablename1,snapshotname2:tablename2' "
+ + "(comma seperated list of snapshots)";
+
public interface ImportOptions extends ImportJob.ImportOptions {
@Description("The HBase root dir where HBase snapshot files resides.")
String getHbaseSnapshotSourceDir();
@@ -87,24 +115,163 @@ public interface ImportOptions extends ImportJob.ImportOptions {
@SuppressWarnings("unused")
void setEnableSnappy(Boolean enableSnappy);
+
+ @Description("Path to config file containing snapshot source path/snapshot names.")
+ String getImportConfigFilePath();
+
+ void setImportConfigFilePath(String value);
+
+ @Description(
+ "Snapshots to be imported. Can be '*', 'prefix*' or 'snap1,snap2' or 'snap1:table1,snap2:table2'.")
+ String getSnapshots();
+
+ void setSnapshots(String value);
+
+ @Description("Specifies whether to use dynamic splitting while reading hbase region.")
+ @Default.Boolean(true)
+ boolean getUseDynamicSplitting();
+
+ void setUseDynamicSplitting(boolean value);
}
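+ // Illustrative usage of the options above (values here are hypothetical): snapshots can be passed
+ // inline, e.g. --hbaseSnapshotSourceDir=gs://my-bucket/hbase-export --snapshots=snap1:table1,snap2:table2
+ // or via a JSON config file, e.g. --importConfigFilePath=gs://my-bucket/import-config.json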
public static void main(String[] args) throws Exception {
PipelineOptionsFactory.register(ImportOptions.class);
- ImportOptions opts =
+ ImportOptions options =
PipelineOptionsFactory.fromArgs(args).withValidation().as(ImportOptions.class);
+ // Set default pipeline options so FileSystems can resolve the Google Cloud Storage scheme (gs://)
+ FileSystems.setDefaultPipelineOptions(PipelineOptionsFactory.create().as(GcsOptions.class));
+
LOG.info("Building Pipeline");
- Pipeline pipeline = buildPipeline(opts);
+ Pipeline pipeline = null;
+ // Maintain Backward compatibility until deprecation
+ if (options.getSnapshotName() != null && !options.getSnapshotName().isEmpty()) {
+ pipeline = buildPipeline(options);
+ } else {
+ ImportConfig importConfig =
+ options.getImportConfigFilePath() != null
+ ? buildImportConfigFromConfigFile(options.getImportConfigFilePath())
+ : buildImportConfigFromPipelineOptions(options, options.as(GcsOptions.class));
+
+ LOG.info(
+ String.format(
+ "SourcePath:%s, RestorePath:%s",
+ importConfig.getSourcepath(), importConfig.getRestorepath()));
+ pipeline = buildPipelineWithMultipleSnapshots(options, importConfig);
+ }
+
LOG.info("Running Pipeline");
PipelineResult result = pipeline.run();
- if (opts.getWait()) {
+ if (options.getWait()) {
Utils.waitForPipelineToFinish(result);
}
}
+ @VisibleForTesting
+ static ImportConfig buildImportConfigFromConfigFile(String configFilePath) throws Exception {
+ Gson gson = new GsonBuilder().create();
+ ImportConfig importConfig =
+ gson.fromJson(SnapshotUtils.readFileContents(configFilePath), ImportConfig.class);
+ Preconditions.checkNotNull(importConfig.getSourcepath(), MISSING_SNAPSHOT_SOURCEPATH);
+ Preconditions.checkNotNull(importConfig.getSnapshots(), MISSING_SNAPSHOT_NAMES);
+ SnapshotUtils.setRestorePath(importConfig);
+ return importConfig;
+ }
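+ // Illustrative only: based on the ImportConfig fields, the JSON config file is expected to look
+ // roughly like the following (paths and names here are hypothetical):
+ // {
+ //   "sourcepath": "gs://my-bucket/hbase-export",
+ //   "restorepath": "gs://my-bucket/hbase-restore",
+ //   "snapshots": {"snapshot1": "table1", "snapshot2": "table2"}
+ // }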
+
+ @VisibleForTesting
+ static ImportConfig buildImportConfigFromPipelineOptions(
+ ImportOptions options, GcsOptions gcsOptions) throws IOException {
+ Preconditions.checkArgument(
+ options.getHbaseSnapshotSourceDir() != null, MISSING_SNAPSHOT_SOURCEPATH);
+ Preconditions.checkArgument(options.getSnapshots() != null, MISSING_SNAPSHOT_NAMES);
+
+ Map<String, String> snapshots =
+ SnapshotUtils.isRegex(options.getSnapshots())
+ ? SnapshotUtils.getSnapshotsFromSnapshotPath(
+ options.getHbaseSnapshotSourceDir(),
+ gcsOptions.getGcsUtil(),
+ options.getSnapshots())
+ : SnapshotUtils.getSnapshotsFromString(options.getSnapshots());
+
+ ImportConfig importConfig = new ImportConfig();
+ importConfig.setSourcepath(options.getHbaseSnapshotSourceDir());
+ importConfig.setSnapshotsFromMap(snapshots);
+ SnapshotUtils.setRestorePath(importConfig);
+ return importConfig;
+ }
+
+ /**
+ * Builds the pipeline that supports loading multiple snapshots into Bigtable.
+ *
+ * @param options - Pipeline options
+ * @param importConfig - Configuration representing the snapshot source path, list of snapshots, etc.
+ * @return the constructed pipeline
+ * @throws Exception
+ */
+ static Pipeline buildPipelineWithMultipleSnapshots(
+ ImportOptions options, ImportConfig importConfig) throws Exception {
+ Map<String, String> configurations =
+ SnapshotUtils.getConfiguration(
+ options.getRunner().getSimpleName(),
+ options.getProject(),
+ importConfig.getSourcepath(),
+ importConfig.getHbaseConfiguration());
+
+ List<SnapshotConfig> snapshotConfigs =
+ SnapshotUtils.buildSnapshotConfigs(
+ importConfig.getSnapshots(),
+ configurations,
+ options.getProject(),
+ importConfig.getSourcepath(),
+ importConfig.getRestorepath());
+
+ Pipeline pipeline = Pipeline.create(options);
+
+ PCollection<SnapshotConfig> restoredSnapshots =
+ pipeline
+ .apply("Read Snapshot Configs", Create.of(snapshotConfigs))
+ .apply("Restore Snapshots", ParDo.of(new RestoreSnapshot()));
+
+ // Read records from the restored hbase region files; they are written to Bigtable below
+ PCollection<KV<String, Iterable<Mutation>>> hbaseRecords =
+ restoredSnapshots
+ .apply("List Regions", new ListRegions())
+ .apply("Read Regions", new ReadRegions(options.getUseDynamicSplitting()));
+
+ options.setBigtableTableId(ValueProvider.StaticValueProvider.of("NA"));
+ CloudBigtableTableConfiguration bigtableConfiguration =
+ TemplateUtils.buildImportConfig(options, "HBaseSnapshotImportJob");
+ if (importConfig.getBigtableConfiguration() != null) {
+ CloudBigtableTableConfiguration.Builder builder = bigtableConfiguration.toBuilder();
+ for (Map.Entry<String, String> entry : importConfig.getBigtableConfiguration().entrySet())
+ builder = builder.withConfiguration(entry.getKey(), entry.getValue());
+ bigtableConfiguration = builder.build();
+ }
+
+ hbaseRecords.apply(
+ "Write to BigTable", CloudBigtableIO.writeToMultipleTables(bigtableConfiguration));
+
+ // Clean up all the temporary restored snapshot HLinks after reading all the data
+ restoredSnapshots
+ .apply(Wait.on(hbaseRecords))
+ .apply("Clean restored files", ParDo.of(new CleanupRestoredSnapshots()));
+
+ return pipeline;
+ }
+
+ /**
+ * Builds the pipeline that supports loading a single snapshot into Bigtable. Maintained for
+ * backward compatibility and will be deprecated once its functionality is merged into the
+ * buildPipelineWithMultipleSnapshots method.
+ *
+ * @param opts - Pipeline options
+ * @return the constructed pipeline
+ * @throws Exception
+ */
@VisibleForTesting
static Pipeline buildPipeline(ImportOptions opts) throws Exception {
Pipeline pipeline = Pipeline.create(Utils.tweakOptions(opts));
@@ -133,7 +300,7 @@ static Pipeline buildPipeline(ImportOptions opts) throws Exception {
pipeline
.apply(Create.of(sourceAndRestoreFolders))
.apply(Wait.on(readResult))
- .apply(ParDo.of(new CleanupHBaseSnapshotRestoreFilesFn()));
+ .apply(ParDo.of(new CleanupHBaseSnapshotRestoreFiles()));
return pipeline;
}
diff --git a/bigtable-dataflow-parent/bigtable-beam-import/src/main/java/com/google/cloud/bigtable/beam/hbasesnapshots/SnapshotUtils.java b/bigtable-dataflow-parent/bigtable-beam-import/src/main/java/com/google/cloud/bigtable/beam/hbasesnapshots/SnapshotUtils.java
new file mode 100644
index 0000000000..6d6643f980
--- /dev/null
+++ b/bigtable-dataflow-parent/bigtable-beam-import/src/main/java/com/google/cloud/bigtable/beam/hbasesnapshots/SnapshotUtils.java
@@ -0,0 +1,308 @@
+/*
+ * Copyright 2024 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.bigtable.beam.hbasesnapshots;
+
+import com.google.api.core.InternalApi;
+import com.google.api.services.storage.model.StorageObject;
+import com.google.cloud.bigtable.beam.hbasesnapshots.conf.ImportConfig;
+import com.google.cloud.bigtable.beam.hbasesnapshots.conf.SnapshotConfig;
+import com.google.common.base.Joiner;
+import com.google.common.io.CharStreams;
+import java.io.File;
+import java.io.IOException;
+import java.io.Reader;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.nio.channels.Channels;
+import java.nio.charset.StandardCharsets;
+import java.time.Instant;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.extensions.gcp.util.GcsUtil;
+import org.apache.beam.sdk.extensions.gcp.util.gcsfs.GcsPath;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.util.BackOff;
+import org.apache.beam.sdk.util.FluentBackoff;
+import org.apache.beam.sdk.util.Sleeper;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.HConstants;
+import org.joda.time.Duration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Contains various helper methods to handle different tasks associated with importing HBase
+ * snapshots.
+ */
+@InternalApi("For internal usage only")
+public class SnapshotUtils {
+ private static final Logger LOG = LoggerFactory.getLogger(SnapshotUtils.class);
+ private static final String DIRECTRUNNER = "DirectRunner";
+ private static final String SNAPSHOT_MANIFEST_DIRECTORY = ".hbase-snapshot";
+ private static final String GCS_SCHEME = "gs";
+ private static final Sleeper sleeper = Sleeper.DEFAULT;
+ private static final Object lock = new Object();
+ private static Configuration hbaseConfiguration;
+
+ private SnapshotUtils() {}
+
+ private static String getParentDirectory(String hbaseSnapshotSourceDirectory) {
+ URI hbaseSnapshotSourceUri;
+ try {
+ hbaseSnapshotSourceUri = new URI(hbaseSnapshotSourceDirectory);
+ } catch (URISyntaxException e) {
+ throw new RuntimeException(
+ String.format(
+ "Invalid file path format for snapshot source directory: %s. Valid paths should have file scheme (gs://, file://)",
+ hbaseSnapshotSourceDirectory));
+ }
+
+ if (hbaseSnapshotSourceUri.getScheme() != null
+ && hbaseSnapshotSourceUri.getScheme().equals("gs")) // i.e Cloud Storage file system
+ {
+ return GcsPath.fromUri(hbaseSnapshotSourceUri).getParent().toString();
+ }
+
+ return new File(hbaseSnapshotSourceDirectory).getParent();
+ }
+
+ static String removeSuffixSlashIfExists(String directory) {
+ return directory.endsWith("/") ? directory.substring(0, directory.length() - 1) : directory;
+ }
+
+ static String appendCurrentTimestamp(String directory) {
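+ // For example (timestamp is illustrative): "gs://my-bucket/restore/" becomes
+ // "gs://my-bucket/restore/202403142215".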
+ DateTimeFormatter formatter =
+ DateTimeFormatter.ofPattern("yyyyMMddHHmm").withZone(ZoneId.of("UTC"));
+ return String.join("/", removeSuffixSlashIfExists(directory), formatter.format(Instant.now()));
+ }
+
+ static String getNamedDirectory(String sourceDirectory, String subFoldername) {
+ String parentDirectory = removeSuffixSlashIfExists(getParentDirectory(sourceDirectory));
+ return appendCurrentTimestamp(String.join("/", parentDirectory, subFoldername));
+ }
+
+ /** Builds the configuration combining default and user provided values. */
+ static Map<String, String> getConfiguration(
+ String runner,
+ String project,
+ String sourcedir,
+ @Nullable Map<String, String> hbaseConfiguration) {
+ Map<String, String> configurations = new HashMap<>();
+
+ configurations.put(HConstants.HBASE_DIR, sourcedir);
+ configurations.put(
+ "fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS");
+ configurations.put("fs.gs.project.id", project);
+ configurations.put("google.cloud.auth.service.account.enable", "true");
+
+ if (runner.equals(DIRECTRUNNER)) {
+ // https://github.com/GoogleCloudDataproc/hadoop-connectors/blob/master/gcs/CONFIGURATION.md#authentication
+ configurations.put("fs.gs.auth.type", "APPLICATION_DEFAULT");
+ }
+
+ // Update the default configurations with user supplied configuration values
+ if (hbaseConfiguration != null) {
+ configurations.putAll(hbaseConfiguration);
+ }
+ return configurations;
+ }
+
+ public static Configuration getHBaseConfiguration(Map<String, String> configurations) {
+ if (hbaseConfiguration == null) {
+ synchronized (lock) {
+ if (hbaseConfiguration == null)
+ hbaseConfiguration = createHbaseConfiguration(configurations);
+ }
+ }
+ return hbaseConfiguration;
+ }
+
+ private static Configuration createHbaseConfiguration(Map<String, String> configurations) {
+ LOG.info("Create HBase Configuration instance");
+ Configuration hbaseConfiguration = HBaseConfiguration.create();
+ for (Map.Entry<String, String> entry : configurations.entrySet())
+ hbaseConfiguration.set(entry.getKey(), entry.getValue());
+ return hbaseConfiguration;
+ }
+
+ /**
+ * Builds the snapshot configurations.
+ *
+ * @param snapshotdetails - Snapshot details representing the HBase snapshot name and the
+ * corresponding Bigtable table name.
+ * @param configurations - HBase configuration key/value pairs
+ * @param projectId - Google Cloud project id
+ * @param sourcePath - Source path containing the snapshot files
+ * @param restorePath - Path where snapshot files are stored during job runs
+ * @return the list of snapshot configurations
+ */
+ static List<SnapshotConfig> buildSnapshotConfigs(
+ List<ImportConfig.SnapshotInfo> snapshotdetails,
+ Map<String, String> configurations,
+ String projectId,
+ String sourcePath,
+ String restorePath) {
+
+ return snapshotdetails.stream()
+ .map(
+ snapshotInfo ->
+ SnapshotConfig.builder()
+ .setProjectId(projectId)
+ .setSourceLocation(sourcePath)
+ .setRestoreLocation(restorePath)
+ .setSnapshotName(snapshotInfo.getSnapshotName())
+ .setTableName(snapshotInfo.getbigtableTableName())
+ .setConfigurationDetails(configurations)
+ .build())
+ .collect(Collectors.toList());
+ }
+
+ public static BackOff createBackOff(
+ long backoffInitialIntervalInMillis, long backoffMaxIntervalInMillis, int maxRetries) {
+ return FluentBackoff.DEFAULT
+ .withInitialBackoff(Duration.millis(backoffInitialIntervalInMillis))
+ .withMaxRetries(maxRetries)
+ .withMaxBackoff(Duration.millis(backoffMaxIntervalInMillis))
+ .backoff();
+ }
+
+ /**
+ * Creates the restore path based on the input configuration.
+ *
+ * @param importConfig - Job Configuration
+ */
+ public static void setRestorePath(ImportConfig importConfig) {
+ importConfig.setRestorepath(
+ formatRestorePath(importConfig.getRestorepath(), importConfig.getSourcepath()));
+ }
+
+ /**
+ * Parses the provided input to generate snapshot names and corresponding Bigtable table names.
+ * For a single snapshot, if the snapshot name and table name are the same, only the snapshot
+ * name needs to be provided; if the table name differs, it should be provided in the format
+ * snapshotname:bigtablename. Multiple snapshots can be provided in the
+ * snapshot1:table1,snapshot2:table2 format or the snapshot1,snapshot2 format.
+ *
+ * @param snapshotNames - Snapshot names and corresponding Bigtable table names.
+ */
+ public static Map<String, String> getSnapshotsFromString(String snapshotNames) {
+ Map<String, String> snapshots = new HashMap<>();
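+ // For example, "snap1:table1,snap2" yields {snap1=table1, snap2=snap2}.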
+ for (String snapshotInfo : snapshotNames.split(",")) {
+ String[] snapshotWithTableName = snapshotInfo.split(":");
+ if (snapshotWithTableName.length == 2)
+ snapshots.put(snapshotWithTableName[0], snapshotWithTableName[1]);
+ else if (snapshotWithTableName.length == 1)
+ snapshots.put(snapshotWithTableName[0], snapshotWithTableName[0]);
+ else
+ throw new IllegalArgumentException(
+ "Invalid specification format for snapshots. Expected format is snapshot1:table1,snapshot2:table2");
+ }
+ return snapshots;
+ }
+
+ public static String formatRestorePath(String providedPath, String hbaseSnapshotsPath) {
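+ // When no restore path is provided, a timestamped "restore" directory is derived next to the
+ // snapshot source path; otherwise the current timestamp is appended to the provided path.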
+ return providedPath == null
+ ? SnapshotUtils.getNamedDirectory(hbaseSnapshotsPath, "restore")
+ : SnapshotUtils.appendCurrentTimestamp(providedPath);
+ }
+
+ /**
+ * Reads the list of snapshot names from the snapshot source path.
+ *
+ * @param importSnapshotpath - Path representing the snapshot source directory
+ * @param gcsUtil - GcsUtil instance used to list objects
+ * @param prefix - Specific prefix to be matched, or '*' for all snapshots.
+ * @return map of snapshot names to Bigtable table names
+ * @throws IOException
+ */
+ public static Map<String, String> getSnapshotsFromSnapshotPath(
+ String importSnapshotpath, GcsUtil gcsUtil, String prefix) throws IOException {
+
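+ // For example, prefix "snap" matches snapshot directories such as snap1/ or snapshot-2022/ under
+ // the .hbase-snapshot directory, while "*" matches every snapshot directory.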
+ importSnapshotpath =
+ Joiner.on("/")
+ .join(removeSuffixSlashIfExists(importSnapshotpath), SNAPSHOT_MANIFEST_DIRECTORY);
+ // Build GCS path from given string e.g:
+ // gs://sym-bucket/snapshots/20220309230526/.hbase-snapshot
+ GcsPath gcsPath = GcsPath.fromUri(importSnapshotpath);
+ Map<String, String> snapshots = new HashMap<>();
+
+ List<StorageObject> objects =
+ gcsUtil.listObjects(gcsPath.getBucket(), gcsPath.getObject(), null).getItems();
+ if (objects == null)
+ throw new IllegalStateException(
+ String.format("Snapshot path %s does not contain any snapshots", importSnapshotpath));
+
+ // Build a pattern for the object portion, e.g. if the path is
+ // gs://sym-bucket/snapshots/20220309230526/.hbase-snapshot
+ // the object portion would be snapshots/20220309230526/.hbase-snapshot
+ Pattern pathPattern = Pattern.compile(String.format("%s/(.+?/)", gcsPath.getObject()));
+ Pattern prefixPattern = prefix.equals("*") ? null : Pattern.compile(prefix);
+ Matcher pathMatcher = null;
+ String snapshotName = null;
+ for (StorageObject object : objects) {
+ pathMatcher = pathPattern.matcher(object.getId());
+ if (pathMatcher.find()) {
+ // Group 1 represents the snapshot directory name along with suffix slash (e.g: snapshot1/)
+ snapshotName = pathMatcher.group(1).replace("/", "");
+ if (prefix.equals("*") || prefixPattern.matcher(snapshotName).find())
+ snapshots.put(snapshotName, snapshotName);
+ }
+ }
+
+ return snapshots;
+ }
+
+ /**
+ * Reads the contents of a file.
+ *
+ * @param filePath - Path of the file.
+ * @return the file contents as a string
+ * @throws IOException
+ */
+ public static String readFileContents(String filePath) throws IOException {
+ try (Reader reader =
+ Channels.newReader(
+ FileSystems.open(FileSystems.matchSingleFileSpec(filePath).resourceId()),
+ StandardCharsets.UTF_8.name())) {
+ return CharStreams.toString(reader);
+ }
+ }
+
+ /**
+ * Checks whether the given value contains any of the supported regex metacharacters.
+ *
+ * @param data - text value
+ * @return true if the value contains a metacharacter, false otherwise
+ */
+ public static boolean isRegex(String data) {
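+ // For example, isRegex("snap*") returns true, while isRegex("snap1,snap2") returns false.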
+ String[] metaChars = {"*", "+", "?"};
+ return Arrays.stream(metaChars).anyMatch(data::contains);
+ }
+
+ public static Sleeper getSleeper() {
+ return sleeper;
+ }
+}
diff --git a/bigtable-dataflow-parent/bigtable-beam-import/src/main/java/com/google/cloud/bigtable/beam/hbasesnapshots/coders/RegionConfigCoder.java b/bigtable-dataflow-parent/bigtable-beam-import/src/main/java/com/google/cloud/bigtable/beam/hbasesnapshots/coders/RegionConfigCoder.java
new file mode 100644
index 0000000000..c274e97691
--- /dev/null
+++ b/bigtable-dataflow-parent/bigtable-beam-import/src/main/java/com/google/cloud/bigtable/beam/hbasesnapshots/coders/RegionConfigCoder.java
@@ -0,0 +1,101 @@
+/*
+ * Copyright 2024 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.bigtable.beam.hbasesnapshots.coders;
+
+import com.google.api.core.InternalApi;
+import com.google.cloud.bigtable.beam.hbasesnapshots.conf.RegionConfig;
+import com.google.cloud.bigtable.beam.hbasesnapshots.conf.SnapshotConfig;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.OutputStream;
+import java.util.Collections;
+import java.util.List;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.CoderException;
+import org.apache.beam.sdk.coders.VarLongCoder;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.TableSchema;
+
+/** Implementation of {@link Coder} for encoding and decoding of {@link RegionConfig} */
+@InternalApi("For internal usage only")
+public class RegionConfigCoder extends Coder<RegionConfig> {
+ private static final VarLongCoder longCoder = VarLongCoder.of();
+
+ @Override
+ public void encode(RegionConfig value, OutputStream outStream) throws IOException {
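+ // The SnapshotConfig is written via Java serialization, the RegionInfo and TableDescriptor are
+ // converted to their protobuf messages and written as byte arrays, and the region size is
+ // appended with a VarLongCoder.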
+ ObjectOutputStream objectOutputStream = new ObjectOutputStream(outStream);
+ objectOutputStream.writeObject(value.getSnapshotConfig());
+
+ HBaseProtos.RegionInfo regionInfo = ProtobufUtil.toRegionInfo(value.getRegionInfo());
+ ByteArrayOutputStream boas1 = new ByteArrayOutputStream();
+ regionInfo.writeTo(boas1);
+ objectOutputStream.writeObject(boas1.toByteArray());
+
+ HBaseProtos.TableSchema tableSchema = ProtobufUtil.toTableSchema(value.getTableDescriptor());
+ ByteArrayOutputStream boas2 = new ByteArrayOutputStream();
+ tableSchema.writeTo(boas2);
+ objectOutputStream.writeObject(boas2.toByteArray());
+
+ longCoder.encode(value.getRegionSize(), outStream);
+ }
+
+ @Override
+ public RegionConfig decode(InputStream inStream) throws IOException {
+ ObjectInputStream objectInputStream = new ObjectInputStream(inStream);
+ SnapshotConfig snapshotConfig;
+ try {
+ snapshotConfig = (SnapshotConfig) objectInputStream.readObject();
+ } catch (ClassNotFoundException e) {
+ throw new CoderException("Failed to deserialize RestoredSnapshotConfig", e);
+ }
+
+ RegionInfo regionInfoProto = null;
+ try {
+ regionInfoProto =
+ ProtobufUtil.toRegionInfo(
+ HBaseProtos.RegionInfo.parseFrom((byte[]) objectInputStream.readObject()));
+ } catch (ClassNotFoundException e) {
+ throw new CoderException("Failed to parse regionInfo", e);
+ }
+
+ TableDescriptor tableSchema = null;
+ try {
+ tableSchema =
+ ProtobufUtil.toTableDescriptor(
+ TableSchema.parseFrom((byte[]) objectInputStream.readObject()));
+ } catch (ClassNotFoundException e) {
+ throw new CoderException("Failed to parse tableSchema", e);
+ }
+
+ Long regionsize = longCoder.decode(inStream);
+
+ return RegionConfig.builder()
+ .setSnapshotConfig(snapshotConfig)
+ .setRegionInfo(regionInfoProto)
+ .setTableDescriptor(tableSchema)
+ .setRegionSize(regionsize)
+ .build();
+ }
+
+ @Override
+ public List<? extends Coder<?>> getCoderArguments() {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public void verifyDeterministic() throws Coder.NonDeterministicException {}
+}
diff --git a/bigtable-dataflow-parent/bigtable-beam-import/src/main/java/com/google/cloud/bigtable/beam/hbasesnapshots/coders/package-info.java b/bigtable-dataflow-parent/bigtable-beam-import/src/main/java/com/google/cloud/bigtable/beam/hbasesnapshots/coders/package-info.java
new file mode 100644
index 0000000000..213b6745e4
--- /dev/null
+++ b/bigtable-dataflow-parent/bigtable-beam-import/src/main/java/com/google/cloud/bigtable/beam/hbasesnapshots/coders/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Copyright 2024 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Contains coders to handle serialization and deserialization for different classes used in the
+ * pipeline.
+ */
+package com.google.cloud.bigtable.beam.hbasesnapshots.coders;
diff --git a/bigtable-dataflow-parent/bigtable-beam-import/src/main/java/com/google/cloud/bigtable/beam/hbasesnapshots/HBaseSnapshotInputConfigBuilder.java b/bigtable-dataflow-parent/bigtable-beam-import/src/main/java/com/google/cloud/bigtable/beam/hbasesnapshots/conf/HBaseSnapshotInputConfigBuilder.java
similarity index 98%
rename from bigtable-dataflow-parent/bigtable-beam-import/src/main/java/com/google/cloud/bigtable/beam/hbasesnapshots/HBaseSnapshotInputConfigBuilder.java
rename to bigtable-dataflow-parent/bigtable-beam-import/src/main/java/com/google/cloud/bigtable/beam/hbasesnapshots/conf/HBaseSnapshotInputConfigBuilder.java
index 62b7a81ad5..58ccd34425 100644
--- a/bigtable-dataflow-parent/bigtable-beam-import/src/main/java/com/google/cloud/bigtable/beam/hbasesnapshots/HBaseSnapshotInputConfigBuilder.java
+++ b/bigtable-dataflow-parent/bigtable-beam-import/src/main/java/com/google/cloud/bigtable/beam/hbasesnapshots/conf/HBaseSnapshotInputConfigBuilder.java
@@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package com.google.cloud.bigtable.beam.hbasesnapshots;
+package com.google.cloud.bigtable.beam.hbasesnapshots.conf;
import com.google.common.base.Preconditions;
import org.apache.beam.sdk.extensions.gcp.util.gcsfs.GcsPath;
@@ -37,7 +37,7 @@
* hosted in Google Cloud Storage(GCS) bucket via GCS connector. It uses {@link
* TableSnapshotInputFormat} for reading HBase snapshots.
*/
-class HBaseSnapshotInputConfigBuilder {
+public class HBaseSnapshotInputConfigBuilder {
private static final Log LOG = LogFactory.getLog(HBaseSnapshotInputConfigBuilder.class);
// Batch size used for HBase snapshot scans
diff --git a/bigtable-dataflow-parent/bigtable-beam-import/src/main/java/com/google/cloud/bigtable/beam/hbasesnapshots/conf/ImportConfig.java b/bigtable-dataflow-parent/bigtable-beam-import/src/main/java/com/google/cloud/bigtable/beam/hbasesnapshots/conf/ImportConfig.java
new file mode 100644
index 0000000000..3af7ba983d
--- /dev/null
+++ b/bigtable-dataflow-parent/bigtable-beam-import/src/main/java/com/google/cloud/bigtable/beam/hbasesnapshots/conf/ImportConfig.java
@@ -0,0 +1,175 @@
+/*
+ * Copyright 2024 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.bigtable.beam.hbasesnapshots.conf;
+
+import com.google.gson.TypeAdapter;
+import com.google.gson.annotations.JsonAdapter;
+import com.google.gson.stream.JsonReader;
+import com.google.gson.stream.JsonWriter;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Class representing the job configuration loading the different input values and combinations of
+ * snapshot names (such as all snapshots or matching prefix or explicit names) provided.
+ */
+public final class ImportConfig implements Serializable {
+ private final long DEFAULT_BACKOFF_INITIAL_INTERVAL_MILLIS = 5000; // 5 seconds
+ private final long DEFAULT_BACKOFF_MAX_INTERVAL_MILLIS = 3 * 60 * 1000; // 180 seconds
+ private final int DEFAULT_BACKOFF_MAX_RETRIES = 3;
+
+ @JsonAdapter(SnapshotInfoJsonAdapter.class)
+ public List<SnapshotInfo> snapshots;
+
+ private String sourcepath;
+ private String restorepath;
+ private String runstatuspath;
+ private long backoffInitialIntervalInMillis =
+ DEFAULT_BACKOFF_INITIAL_INTERVAL_MILLIS; // Defaults to 5 seconds
+ private long backoffMaxIntervalInMillis = DEFAULT_BACKOFF_MAX_INTERVAL_MILLIS; // 180 seconds
+ private int backoffMaxretries = DEFAULT_BACKOFF_MAX_RETRIES;
+ private Map<String, String> hbaseConfiguration;
+ private Map<String, String> bigtableConfiguration;
+
+ public void setSnapshotsFromMap(Map<String, String> snapshots) {
+ this.snapshots = new ArrayList<>();
+ snapshots.forEach(
+ (snapshotName, bigtableName) ->
+ this.snapshots.add(new SnapshotInfo(snapshotName, bigtableName)));
+ }
+
+ public String getSourcepath() {
+ return this.sourcepath;
+ }
+
+ public void setSourcepath(String sourcepath) {
+ this.sourcepath = sourcepath;
+ }
+
+ public String getRestorepath() {
+ return restorepath;
+ }
+
+ public void setRestorepath(String restorepath) {
+ this.restorepath = restorepath;
+ }
+
+ public String getRunstatuspath() {
+ return runstatuspath;
+ }
+
+ public void setRunstatuspath(String runstatuspath) {
+ this.runstatuspath = runstatuspath;
+ }
+
+ public long getBackoffInitialIntervalInMillis() {
+ return backoffInitialIntervalInMillis;
+ }
+
+ public void setBackoffInitialIntervalInMillis(long backoffInitialIntervalInMillis) {
+ this.backoffInitialIntervalInMillis = backoffInitialIntervalInMillis;
+ }
+
+ public long getBackoffMaxIntervalInMillis() {
+ return this.backoffMaxIntervalInMillis;
+ }
+
+ public void setBackoffMaxIntervalInMillis(long backoffMaxIntervalInMillis) {
+ this.backoffMaxIntervalInMillis = backoffMaxIntervalInMillis;
+ }
+
+ public int getBackoffMaxretries() {
+ return this.backoffMaxretries;
+ }
+
+ public void setBackoffMaxretries(int backoffMaxretries) {
+ this.backoffMaxretries = backoffMaxretries;
+ }
+
+ public List<SnapshotInfo> getSnapshots() {
+ return this.snapshots;
+ }
+
+ public void setSnapshots(List<SnapshotInfo> snapshots) {
+ this.snapshots = snapshots;
+ }
+
+ public Map<String, String> getHbaseConfiguration() {
+ return this.hbaseConfiguration;
+ }
+
+ public void setHbaseConfiguration(Map<String, String> hbaseConfiguration) {
+ this.hbaseConfiguration = hbaseConfiguration;
+ }
+
+ public Map<String, String> getBigtableConfiguration() {
+ return bigtableConfiguration;
+ }
+
+ public void setBigtableConfiguration(Map<String, String> bigtableConfiguration) {
+ this.bigtableConfiguration = bigtableConfiguration;
+ }
+
+ public static class SnapshotInfo implements Serializable {
+ private final String snapshotName;
+ private final String bigtableTableName;
+
+ public SnapshotInfo(String snapshotName, String tableName) {
+ this.snapshotName = snapshotName;
+ this.bigtableTableName = tableName;
+ }
+
+ public String getSnapshotName() {
+ return snapshotName;
+ }
+
+ public String getbigtableTableName() {
+ return bigtableTableName;
+ }
+ }
+
+ static class SnapshotInfoJsonAdapter extends TypeAdapter<List<SnapshotInfo>> {
+
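+ // Serializes the snapshot list as a single JSON object of the form
+ // {"snapshotName1": "tableName1", "snapshotName2": "tableName2"} and reads it back the same way.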
+ @Override
+ public void write(JsonWriter jsonWriter, List<SnapshotInfo> snapshotInfos) throws IOException {
+ jsonWriter.beginObject();
+ snapshotInfos.forEach(
+ snapshotInfo -> {
+ try {
+ jsonWriter.name(snapshotInfo.getSnapshotName());
+ jsonWriter.value(snapshotInfo.getbigtableTableName());
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ });
+ jsonWriter.endObject();
+ }
+
+ @Override
+ public List<SnapshotInfo> read(JsonReader jsonReader) throws IOException {
+ List<SnapshotInfo> snapshotInfoList = new ArrayList<>();
+ jsonReader.beginObject();
+ while (jsonReader.hasNext()) {
+ snapshotInfoList.add(new SnapshotInfo(jsonReader.nextName(), jsonReader.nextString()));
+ }
+ jsonReader.endObject();
+ return snapshotInfoList;
+ }
+ }
+}
diff --git a/bigtable-dataflow-parent/bigtable-beam-import/src/main/java/com/google/cloud/bigtable/beam/hbasesnapshots/conf/RegionConfig.java b/bigtable-dataflow-parent/bigtable-beam-import/src/main/java/com/google/cloud/bigtable/beam/hbasesnapshots/conf/RegionConfig.java
new file mode 100644
index 0000000000..a946f375e7
--- /dev/null
+++ b/bigtable-dataflow-parent/bigtable-beam-import/src/main/java/com/google/cloud/bigtable/beam/hbasesnapshots/conf/RegionConfig.java
@@ -0,0 +1,62 @@
+/*
+ * Copyright 2024 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.bigtable.beam.hbasesnapshots.conf;
+
+import com.google.auto.value.AutoValue;
+import com.google.cloud.bigtable.beam.hbasesnapshots.coders.RegionConfigCoder;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.coders.DefaultCoder;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+
+/**
+ * An {@link AutoValue} class representing the region configuration, enclosing the {@link
+ * SnapshotConfig}, HBase region info, and HBase table descriptor.
+ */
+@DefaultCoder(RegionConfigCoder.class)
+@AutoValue
+public abstract class RegionConfig {
+ public static Builder builder() {
+ return new AutoValue_RegionConfig.Builder();
+ }
+
+ @Nullable
+ public abstract String getName();
+
+ public abstract SnapshotConfig getSnapshotConfig();
+
+ public abstract RegionInfo getRegionInfo();
+
+ public abstract TableDescriptor getTableDescriptor();
+
+ public abstract Long getRegionSize();
+
+ @AutoValue.Builder
+ public abstract static class Builder {
+
+ public abstract Builder setName(String value);
+
+ public abstract Builder setSnapshotConfig(SnapshotConfig value);
+
+ public abstract Builder setRegionInfo(RegionInfo value);
+
+ public abstract Builder setTableDescriptor(TableDescriptor value);
+
+ public abstract Builder setRegionSize(Long value);
+
+ public abstract RegionConfig build();
+ }
+}
diff --git a/bigtable-dataflow-parent/bigtable-beam-import/src/main/java/com/google/cloud/bigtable/beam/hbasesnapshots/conf/SnapshotConfig.java b/bigtable-dataflow-parent/bigtable-beam-import/src/main/java/com/google/cloud/bigtable/beam/hbasesnapshots/conf/SnapshotConfig.java
new file mode 100644
index 0000000000..81fb0ba7fb
--- /dev/null
+++ b/bigtable-dataflow-parent/bigtable-beam-import/src/main/java/com/google/cloud/bigtable/beam/hbasesnapshots/conf/SnapshotConfig.java
@@ -0,0 +1,89 @@
+/*
+ * Copyright 2024 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.bigtable.beam.hbasesnapshots.conf;
+
+import com.google.auto.value.AutoValue;
+import com.google.auto.value.extension.memoized.Memoized;
+import com.google.cloud.bigtable.beam.hbasesnapshots.SnapshotUtils;
+import java.io.Serializable;
+import java.util.Map;
+import org.apache.beam.sdk.schemas.AutoValueSchema;
+import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+
+/** An {@link AutoValue} class representing the configuration associated with each snapshot. */
+@DefaultSchema(AutoValueSchema.class)
+@AutoValue
+public abstract class SnapshotConfig implements Serializable {
+
+ public static Builder builder() {
+ return new AutoValue_SnapshotConfig.Builder();
+ }
+
+ public abstract String getProjectId();
+
+ public abstract String getSourceLocation();
+
+ @Memoized
+ public Path getSourcePath() {
+ return new Path(getSourceLocation());
+ }
+
+ @Memoized
+ public Path getRestorePath() {
+ return new Path(getRestoreLocation());
+ }
+
+ public abstract String getSnapshotName();
+
+ public abstract String getTableName();
+
+ public abstract String getRestoreLocation();
+
+ @Override
+ public abstract int hashCode();
+
+ @Override
+ public abstract boolean equals(Object obj);
+
+ abstract Map<String, String> getConfigurationDetails();
+
+ public Configuration getConfiguration() {
+ return SnapshotUtils.getHBaseConfiguration(getConfigurationDetails());
+ }
+
+ public abstract Builder toBuilder();
+
+ @AutoValue.Builder
+ public abstract static class Builder {
+
+ public abstract Builder setProjectId(String projectId);
+
+ public abstract Builder setSourceLocation(String value);
+
+ public abstract Builder setSnapshotName(String value);
+
+ public abstract Builder setTableName(String value);
+
+ public abstract Builder setRestoreLocation(String value);
+
+ public abstract Builder setConfigurationDetails(Map<String, String> configuration);
+
+ public abstract SnapshotConfig build();
+ }
+}
diff --git a/bigtable-dataflow-parent/bigtable-beam-import/src/main/java/com/google/cloud/bigtable/beam/hbasesnapshots/conf/package-info.java b/bigtable-dataflow-parent/bigtable-beam-import/src/main/java/com/google/cloud/bigtable/beam/hbasesnapshots/conf/package-info.java
new file mode 100644
index 0000000000..69a683654a
--- /dev/null
+++ b/bigtable-dataflow-parent/bigtable-beam-import/src/main/java/com/google/cloud/bigtable/beam/hbasesnapshots/conf/package-info.java
@@ -0,0 +1,17 @@
+/*
+ * Copyright 2024 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/** Package contains configuration classes used in the pipeline. */
+package com.google.cloud.bigtable.beam.hbasesnapshots.conf;
diff --git a/bigtable-dataflow-parent/bigtable-beam-import/src/main/java/com/google/cloud/bigtable/beam/hbasesnapshots/CleanupHBaseSnapshotRestoreFilesFn.java b/bigtable-dataflow-parent/bigtable-beam-import/src/main/java/com/google/cloud/bigtable/beam/hbasesnapshots/dofn/CleanupHBaseSnapshotRestoreFiles.java
similarity index 91%
rename from bigtable-dataflow-parent/bigtable-beam-import/src/main/java/com/google/cloud/bigtable/beam/hbasesnapshots/CleanupHBaseSnapshotRestoreFilesFn.java
rename to bigtable-dataflow-parent/bigtable-beam-import/src/main/java/com/google/cloud/bigtable/beam/hbasesnapshots/dofn/CleanupHBaseSnapshotRestoreFiles.java
index e0bdca69d5..727d887afa 100644
--- a/bigtable-dataflow-parent/bigtable-beam-import/src/main/java/com/google/cloud/bigtable/beam/hbasesnapshots/CleanupHBaseSnapshotRestoreFilesFn.java
+++ b/bigtable-dataflow-parent/bigtable-beam-import/src/main/java/com/google/cloud/bigtable/beam/hbasesnapshots/dofn/CleanupHBaseSnapshotRestoreFiles.java
@@ -1,5 +1,5 @@
/*
- * Copyright 2021 Google LLC
+ * Copyright 2024 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -13,8 +13,9 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package com.google.cloud.bigtable.beam.hbasesnapshots;
+package com.google.cloud.bigtable.beam.hbasesnapshots.dofn;
+import com.google.api.core.InternalApi;
import com.google.api.services.storage.model.Objects;
import com.google.common.base.Preconditions;
import java.io.IOException;
@@ -32,8 +33,26 @@
* A {@link DoFn} that could be used for cleaning up temp files generated during HBase snapshot
* scans in Google Cloud Storage(GCS) bucket via GCS connector.
*/
-class CleanupHBaseSnapshotRestoreFilesFn extends DoFn<KV<String, String>, Boolean> {
- private static final Log LOG = LogFactory.getLog(CleanupHBaseSnapshotRestoreFilesFn.class);
+@InternalApi("For internal usage only")
+public class CleanupHBaseSnapshotRestoreFiles extends DoFn<KV<String, String>, Boolean> {
+ private static final Log LOG = LogFactory.getLog(CleanupHBaseSnapshotRestoreFiles.class);
+
+ public static String getWorkingBucketName(String hbaseSnapshotDir) {
+ Preconditions.checkArgument(
+ hbaseSnapshotDir.startsWith(GcsPath.SCHEME),
+ "snapshot folder must be hosted in a GCS bucket ");
+
+ return GcsPath.fromUri(hbaseSnapshotDir).getBucket();
+ }
+
+ // getListPrefix converts an absolute restorePath in a Hadoop filesystem
+ // to a matching prefix in a GCS bucket
+ public static String getListPrefix(String restorePath) {
+ Preconditions.checkArgument(
+ restorePath.startsWith("/"),
+ "restore folder must be an absolute path in current filesystem");
+ return restorePath.substring(1);
+ }
@ProcessElement
public void processElement(ProcessContext context) throws IOException {
@@ -65,20 +84,4 @@ public void processElement(ProcessContext context) throws IOException {
gcsUtil.remove(results);
context.output(true);
}
-
- public static String getWorkingBucketName(String hbaseSnapshotDir) {
- Preconditions.checkArgument(
- hbaseSnapshotDir.startsWith(GcsPath.SCHEME),
- "snapshot folder must be hosted in a GCS bucket ");
-
- return GcsPath.fromUri(hbaseSnapshotDir).getBucket();
- }
- // getListPrefix convert absolute restorePath in a Hadoop filesystem
- // to a match prefix in a GCS bucket
- public static String getListPrefix(String restorePath) {
- Preconditions.checkArgument(
- restorePath.startsWith("/"),
- "restore folder must be an absolute path in current filesystem");
- return restorePath.substring(1);
- }
}
diff --git a/bigtable-dataflow-parent/bigtable-beam-import/src/main/java/com/google/cloud/bigtable/beam/hbasesnapshots/dofn/CleanupRestoredSnapshots.java b/bigtable-dataflow-parent/bigtable-beam-import/src/main/java/com/google/cloud/bigtable/beam/hbasesnapshots/dofn/CleanupRestoredSnapshots.java
new file mode 100644
index 0000000000..afc65dfbec
--- /dev/null
+++ b/bigtable-dataflow-parent/bigtable-beam-import/src/main/java/com/google/cloud/bigtable/beam/hbasesnapshots/dofn/CleanupRestoredSnapshots.java
@@ -0,0 +1,61 @@
+/*
+ * Copyright 2024 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.bigtable.beam.hbasesnapshots.dofn;
+
+import com.google.api.core.InternalApi;
+import com.google.cloud.bigtable.beam.hbasesnapshots.conf.SnapshotConfig;
+import com.google.common.annotations.VisibleForTesting;
+import java.io.IOException;
+import java.util.Arrays;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** A {@link DoFn} for cleaning up files from restore path generated during job run. */
+@InternalApi("For internal usage only")
+public class CleanupRestoredSnapshots extends DoFn {
+
+ private static final Logger LOG = LoggerFactory.getLogger(CleanupRestoredSnapshots.class);
+
+ @ProcessElement
+ public void processElement(
+ @Element SnapshotConfig snapshotConfig, OutputReceiver outputReceiver)
+ throws IOException {
+ try {
+ cleanupSnapshot(snapshotConfig);
+ } catch (Exception ex) {
+ LOG.error(
+ "Exception: {}\n StackTrace:{}", ex.getMessage(), Arrays.toString(ex.getStackTrace()));
+ }
+ }
+
+ /**
+ * Removes the snapshot files from restore path.
+ *
+ * @param snapshotConfig - Snapshot Configuration
+ * @throws IOException
+ */
+ @VisibleForTesting
+ void cleanupSnapshot(SnapshotConfig snapshotConfig) throws IOException {
+ Path restorePath = snapshotConfig.getRestorePath();
+ Configuration configuration = snapshotConfig.getConfiguration();
+ FileSystem fileSystem = restorePath.getFileSystem(configuration);
+ fileSystem.delete(restorePath, true);
+ }
+}
diff --git a/bigtable-dataflow-parent/bigtable-beam-import/src/main/java/com/google/cloud/bigtable/beam/hbasesnapshots/dofn/RestoreSnapshot.java b/bigtable-dataflow-parent/bigtable-beam-import/src/main/java/com/google/cloud/bigtable/beam/hbasesnapshots/dofn/RestoreSnapshot.java
new file mode 100644
index 0000000000..df83c5ebf9
--- /dev/null
+++ b/bigtable-dataflow-parent/bigtable-beam-import/src/main/java/com/google/cloud/bigtable/beam/hbasesnapshots/dofn/RestoreSnapshot.java
@@ -0,0 +1,60 @@
+/*
+ * Copyright 2024 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.bigtable.beam.hbasesnapshots.dofn;
+
+import com.google.api.core.InternalApi;
+import com.google.cloud.bigtable.beam.hbasesnapshots.conf.SnapshotConfig;
+import java.io.IOException;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.snapshot.RestoreSnapshotHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link DoFn} for preprocessing the Snapshot files to restore HLinks & References prior to
+ * reading the snapshot.
+ */
+@InternalApi("For internal usage only")
+public class RestoreSnapshot extends DoFn<SnapshotConfig, SnapshotConfig> {
+ private static final Logger LOG = LoggerFactory.getLogger(RestoreSnapshot.class);
+
+ @ProcessElement
+ public void processElement(
+ @Element SnapshotConfig snapshotConfig, OutputReceiver<SnapshotConfig> outputReceiver)
+ throws IOException {
+ restoreSnapshot(snapshotConfig);
+ outputReceiver.output(snapshotConfig);
+ }
+
+ /**
+ * Creates a copy of the snapshot from the source path into the restore path.
+ *
+ * @param snapshotConfig - Snapshot Configuration
+ * @throws IOException
+ */
+ void restoreSnapshot(SnapshotConfig snapshotConfig) throws IOException {
+ Path sourcePath = snapshotConfig.getSourcePath();
+ Path restorePath = snapshotConfig.getRestorePath();
+ Configuration configuration = snapshotConfig.getConfiguration();
+ LOG.info("RestoreSnapshot - sourcePath:{} restorePath: {}", sourcePath, restorePath);
+ FileSystem fileSystem = sourcePath.getFileSystem(configuration);
+ RestoreSnapshotHelper.copySnapshotForScanner(
+ configuration, fileSystem, sourcePath, restorePath, snapshotConfig.getSnapshotName());
+ }
+}
diff --git a/bigtable-dataflow-parent/bigtable-beam-import/src/main/java/com/google/cloud/bigtable/beam/hbasesnapshots/dofn/package-info.java b/bigtable-dataflow-parent/bigtable-beam-import/src/main/java/com/google/cloud/bigtable/beam/hbasesnapshots/dofn/package-info.java
new file mode 100644
index 0000000000..26c28d58d7
--- /dev/null
+++ b/bigtable-dataflow-parent/bigtable-beam-import/src/main/java/com/google/cloud/bigtable/beam/hbasesnapshots/dofn/package-info.java
@@ -0,0 +1,20 @@
+/*
+ * Copyright 2024 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/**
+ * Contains all the {@link org.apache.beam.sdk.transforms.DoFn} implementations used in the
+ * pipeline.
+ */
+package com.google.cloud.bigtable.beam.hbasesnapshots.dofn;
diff --git a/bigtable-dataflow-parent/bigtable-beam-import/src/main/java/com/google/cloud/bigtable/beam/hbasesnapshots/transforms/HbaseRegionSplitTracker.java b/bigtable-dataflow-parent/bigtable-beam-import/src/main/java/com/google/cloud/bigtable/beam/hbasesnapshots/transforms/HbaseRegionSplitTracker.java
new file mode 100644
index 0000000000..d5b9b84ae0
--- /dev/null
+++ b/bigtable-dataflow-parent/bigtable-beam-import/src/main/java/com/google/cloud/bigtable/beam/hbasesnapshots/transforms/HbaseRegionSplitTracker.java
@@ -0,0 +1,83 @@
+/*
+ * Copyright 2024 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.bigtable.beam.hbasesnapshots.transforms;
+
+import com.google.api.core.InternalApi;
+import org.apache.beam.sdk.io.range.ByteKey;
+import org.apache.beam.sdk.io.range.ByteKeyRange;
+import org.apache.beam.sdk.transforms.splittabledofn.ByteKeyRangeTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.transforms.splittabledofn.SplitResult;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link RestrictionTracker} wrapping the {@link ByteKeyRangeTracker} for controlled execution of
+ * dynamic splitting.
+ */
+@InternalApi("For internal usage only")
+public class HbaseRegionSplitTracker extends RestrictionTracker<ByteKeyRange, ByteKey>
+ implements RestrictionTracker.HasProgress {
+
+ private static final Logger LOG = LoggerFactory.getLogger(HbaseRegionSplitTracker.class);
+
+ private final String snapshotName;
+
+ private final String regionName;
+ private final ByteKeyRangeTracker byteKeyRangeTracker;
+
+ private final boolean enableDynamicSplitting;
+
+ public HbaseRegionSplitTracker(
+ String snapshotName, String regionName, ByteKeyRange range, boolean enableDynamicSplitting) {
+ this.snapshotName = snapshotName;
+ this.regionName = regionName;
+ this.byteKeyRangeTracker = ByteKeyRangeTracker.of(range);
+ this.enableDynamicSplitting = enableDynamicSplitting;
+ }
+
+ public ByteKeyRange currentRestriction() {
+ return this.byteKeyRangeTracker.currentRestriction();
+ }
+
+ public SplitResult<ByteKeyRange> trySplit(double fractionOfRemainder) {
+ LOG.info(
+ "Splitting restriction for region:{} in snapshot:{}", this.regionName, this.snapshotName);
+
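+ // Returning null when dynamic splitting is disabled signals to the runner that the restriction
+ // cannot be split, so the region is processed without further subdivision.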
+ return enableDynamicSplitting ? this.byteKeyRangeTracker.trySplit(fractionOfRemainder) : null;
+ }
+
+ public boolean tryClaim(ByteKey key) {
+ return this.byteKeyRangeTracker.tryClaim(key);
+ }
+
+ public void checkDone() throws IllegalStateException {
+ this.byteKeyRangeTracker.checkDone();
+ }
+
+ public RestrictionTracker.IsBounded isBounded() {
+ return this.byteKeyRangeTracker.isBounded();
+ }
+
+ public String toString() {
+ return this.byteKeyRangeTracker.toString();
+ }
+
+ @Override
+ public Progress getProgress() {
+ return this.byteKeyRangeTracker.getProgress();
+ }
+}
diff --git a/bigtable-dataflow-parent/bigtable-beam-import/src/main/java/com/google/cloud/bigtable/beam/hbasesnapshots/transforms/ListRegions.java b/bigtable-dataflow-parent/bigtable-beam-import/src/main/java/com/google/cloud/bigtable/beam/hbasesnapshots/transforms/ListRegions.java
new file mode 100644
index 0000000000..3e45c7b812
--- /dev/null
+++ b/bigtable-dataflow-parent/bigtable-beam-import/src/main/java/com/google/cloud/bigtable/beam/hbasesnapshots/transforms/ListRegions.java
@@ -0,0 +1,118 @@
+/*
+ * Copyright 2024 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.bigtable.beam.hbasesnapshots.transforms;
+
+import com.google.api.core.InternalApi;
+import com.google.cloud.bigtable.beam.hbasesnapshots.coders.RegionConfigCoder;
+import com.google.cloud.bigtable.beam.hbasesnapshots.conf.RegionConfig;
+import com.google.cloud.bigtable.beam.hbasesnapshots.conf.SnapshotConfig;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.Reshuffle;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormatImpl;
+import org.apache.hadoop.hbase.shaded.protobuf.generated.SnapshotProtos;
+import org.apache.hadoop.hbase.snapshot.SnapshotManifest;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link PTransform} that lists the regions from the snapshot manifest and builds the
+ * corresponding {@link RegionConfig} instances.
+ */
+@InternalApi("For internal usage only")
+public class ListRegions
+    extends PTransform<PCollection<SnapshotConfig>, PCollection<RegionConfig>> {
+
+  static class ListRegionsFn extends DoFn<SnapshotConfig, RegionConfig> {
+ private static final Logger LOG = LoggerFactory.getLogger(ListRegionsFn.class);
+
+    private static final long GIGA_BYTE = 1024 * 1024 * 1024;
+
+    private Map<Long, Long> computeRegionSize(SnapshotManifest snapshotManifest) {
+      Map<Long, Long> regionsSize = new HashMap<>();
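+      // Sum the store file sizes across all column families to get each region's on-disk size in bytes.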
+ long totalSize = 0;
+ for (SnapshotProtos.SnapshotRegionManifest regionManifest :
+ snapshotManifest.getRegionManifests()) {
+ totalSize = 0;
+ for (SnapshotProtos.SnapshotRegionManifest.FamilyFiles familyFiles :
+ regionManifest.getFamilyFilesList()) {
+        for (SnapshotProtos.SnapshotRegionManifest.StoreFile storeFile :
+            familyFiles.getStoreFilesList()) {
+          totalSize += storeFile.getFileSize();
+        }
+ }
+ regionsSize.put(regionManifest.getRegionInfo().getRegionId(), totalSize);
+ }
+
+      return regionsSize;
+ }
+
+    /**
+     * Reads the snapshot manifest and emits one {@link RegionConfig} per region, including the
+     * region size.
+     *
+     * @param snapshotConfig snapshot configuration containing the source path
+     * @param outputReceiver receiver for the generated {@link RegionConfig} elements
+     * @throws Exception if the snapshot manifest cannot be read
+     */
+ @ProcessElement
+ public void processElement(
+        @Element SnapshotConfig snapshotConfig, OutputReceiver<RegionConfig> outputReceiver)
+ throws Exception {
+
+ Configuration configuration = snapshotConfig.getConfiguration();
+ Path sourcePath = snapshotConfig.getSourcePath();
+ FileSystem fileSystem = sourcePath.getFileSystem(configuration);
+ SnapshotManifest snapshotManifest =
+ TableSnapshotInputFormatImpl.getSnapshotManifest(
+ configuration, snapshotConfig.getSnapshotName(), sourcePath, fileSystem);
+
+      Map<Long, Long> regionsSize = computeRegionSize(snapshotManifest);
+ TableDescriptor tableDescriptor = snapshotManifest.getTableDescriptor();
+
+ // Read Region info
+      List<? extends RegionInfo> regionInfos =
+ TableSnapshotInputFormatImpl.getRegionInfosFromManifest(snapshotManifest);
+
+      // Emit one RegionConfig per region, pairing it with its size for later splitting
+ regionInfos.stream()
+ .map(
+ regionInfo ->
+ RegionConfig.builder()
+ .setSnapshotConfig(snapshotConfig)
+ .setTableDescriptor(tableDescriptor)
+ .setRegionInfo(regionInfo)
+ .setRegionSize(regionsSize.get(regionInfo.getRegionId()))
+ .build())
+ .forEach(outputReceiver::output);
+ }
+ }
+
+ @Override
+  public PCollection<RegionConfig> expand(PCollection<SnapshotConfig> snapshotConfigs) {
+    return snapshotConfigs
+ .apply("List Regions", ParDo.of(new ListRegionsFn()))
+ .setCoder(new RegionConfigCoder())
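+        // Reshuffle to break fusion so that the downstream region reads can be rebalanced across workers.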
+ .apply(Reshuffle.viaRandomKey());
+ }
+}
diff --git a/bigtable-dataflow-parent/bigtable-beam-import/src/main/java/com/google/cloud/bigtable/beam/hbasesnapshots/transforms/ReadRegions.java b/bigtable-dataflow-parent/bigtable-beam-import/src/main/java/com/google/cloud/bigtable/beam/hbasesnapshots/transforms/ReadRegions.java
new file mode 100644
index 0000000000..f6665dec43
--- /dev/null
+++ b/bigtable-dataflow-parent/bigtable-beam-import/src/main/java/com/google/cloud/bigtable/beam/hbasesnapshots/transforms/ReadRegions.java
@@ -0,0 +1,266 @@
+/*
+ * Copyright 2024 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.bigtable.beam.hbasesnapshots.transforms;
+
+import com.google.api.core.InternalApi;
+import com.google.cloud.bigtable.beam.hbasesnapshots.conf.RegionConfig;
+import com.google.cloud.bigtable.beam.hbasesnapshots.conf.SnapshotConfig;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.coders.CannotProvideCoderException;
+import org.apache.beam.sdk.coders.Coder;
+import org.apache.beam.sdk.coders.KvCoder;
+import org.apache.beam.sdk.io.range.ByteKey;
+import org.apache.beam.sdk.io.range.ByteKeyRange;
+import org.apache.beam.sdk.schemas.NoSuchSchemaException;
+import org.apache.beam.sdk.schemas.SchemaCoder;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.display.DisplayData;
+import org.apache.beam.sdk.transforms.splittabledofn.RestrictionTracker;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.client.*;
+import org.apache.hadoop.hbase.util.RegionSplitter;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A {@link PTransform} that reads the records from each region and creates HBase {@link Mutation}
+ * instances. Each region is split into chunks of a configured size (512 MB), and the pipeline
+ * option {@link
+ * com.google.cloud.bigtable.beam.hbasesnapshots.ImportJobFromHbaseSnapshot.ImportOptions#getUseDynamicSplitting()
+ * useDynamicSplitting} controls whether each chunk may be subdivided further at runtime.
+ */
+@InternalApi("For internal usage only")
+public class ReadRegions
+    extends PTransform<PCollection<RegionConfig>, PCollection<KV<String, List<Mutation>>>> {
+
+ private static final long BYTES_PER_SPLIT = 512 * 1024 * 1024; // 512 MB
+
+ private static final long BYTES_PER_GB = 1024 * 1024 * 1024;
+
+ private final boolean useDynamicSplitting;
+
+ public ReadRegions(boolean useDynamicSplitting) {
+ this.useDynamicSplitting = useDynamicSplitting;
+ }
+
+ @Override
+  public PCollection<KV<String, List<Mutation>>> expand(
+      PCollection<RegionConfig> regionConfig) {
+ Pipeline pipeline = regionConfig.getPipeline();
+    SchemaCoder<SnapshotConfig> snapshotConfigSchemaCoder;
+    Coder<Result> hbaseResultCoder;
+ try {
+ snapshotConfigSchemaCoder = pipeline.getSchemaRegistry().getSchemaCoder(SnapshotConfig.class);
+ hbaseResultCoder = pipeline.getCoderRegistry().getCoder(TypeDescriptor.of(Result.class));
+ } catch (CannotProvideCoderException | NoSuchSchemaException e) {
+ throw new RuntimeException(e);
+ }
+
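+    // Read each region chunk as (snapshot config, result) pairs, then convert every row into
+    // mutations keyed by the destination table name.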
+ return regionConfig
+ .apply("Read snapshot region", ParDo.of(new ReadRegionFn(this.useDynamicSplitting)))
+ .setCoder(KvCoder.of(snapshotConfigSchemaCoder, hbaseResultCoder))
+ .apply("Create Mutation", ParDo.of(new CreateMutationsFn()));
+ }
+
+  static class ReadRegionFn extends DoFn<RegionConfig, KV<SnapshotConfig, Result>> {
+ private static final Logger LOG = LoggerFactory.getLogger(ReadRegionFn.class);
+
+ private final boolean useDynamicSplitting;
+
+ public ReadRegionFn(boolean useDynamicSplitting) {
+ this.useDynamicSplitting = useDynamicSplitting;
+ }
+
+ @ProcessElement
+ public void processElement(
+ @Element RegionConfig regionConfig,
+        OutputReceiver<KV<SnapshotConfig, Result>> outputReceiver,
+        RestrictionTracker<ByteKeyRange, ByteKey> tracker)
+ throws Exception {
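+      // Scan the claimed key range and emit each row until the runner splits off the remainder;
+      // finally try to claim ByteKey.EMPTY so the tracker can mark this restriction as done.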
+      try (ResultScanner scanner = newScanner(regionConfig, tracker.currentRestriction())) {
+        for (Result result : scanner) {
+          if (!tracker.tryClaim(ByteKey.copyFrom(result.getRow()))) {
+            break;
+          }
+          outputReceiver.output(KV.of(regionConfig.getSnapshotConfig(), result));
+        }
+      }
+      tracker.tryClaim(ByteKey.EMPTY);
+ }
+
+    /**
+     * Creates a {@link ClientSideRegionScanner} over the given key range of the region.
+     *
+     * @param regionConfig HBase region configuration
+     * @param byteKeyRange key range covering the start and stop row keys
+     * @return a scanner over the restored region files limited to the given range
+     * @throws Exception if the scanner cannot be created
+     */
+ private ResultScanner newScanner(RegionConfig regionConfig, ByteKeyRange byteKeyRange)
+ throws Exception {
+ Scan scan =
+ new Scan()
+ // Limit scan to split range
+ .withStartRow(byteKeyRange.getStartKey().getBytes())
+ .withStopRow(byteKeyRange.getEndKey().getBytes())
+ .setIsolationLevel(IsolationLevel.READ_UNCOMMITTED)
+ .setCacheBlocks(false);
+
+ SnapshotConfig snapshotConfig = regionConfig.getSnapshotConfig();
+
+ Path sourcePath = snapshotConfig.getSourcePath();
+ Path restorePath = snapshotConfig.getRestorePath();
+ Configuration configuration = snapshotConfig.getConfiguration();
+ FileSystem fileSystem = sourcePath.getFileSystem(configuration);
+
+ return new ClientSideRegionScanner(
+ configuration,
+ fileSystem,
+ restorePath,
+ regionConfig.getTableDescriptor(),
+ regionConfig.getRegionInfo(),
+ scan,
+ null);
+ }
+
+ @GetInitialRestriction
+ public ByteKeyRange getInitialRange(@Element RegionConfig regionConfig) {
+ return ByteKeyRange.of(
+ ByteKey.copyFrom(regionConfig.getRegionInfo().getStartKey()),
+ ByteKey.copyFrom(regionConfig.getRegionInfo().getEndKey()));
+ }
+
+ @GetSize
+ public double getSize(@Element RegionConfig regionConfig) {
+ return BYTES_PER_SPLIT;
+ }
+
+ @NewTracker
+ public HbaseRegionSplitTracker newTracker(
+ @Element RegionConfig regionConfig, @Restriction ByteKeyRange range) {
+ return new HbaseRegionSplitTracker(
+ regionConfig.getSnapshotConfig().getSnapshotName(),
+ regionConfig.getRegionInfo().getEncodedName(),
+ range,
+ useDynamicSplitting);
+ }
+
+ @SplitRestriction
+ public void splitRestriction(
+ @Element RegionConfig regionConfig,
+ @Restriction ByteKeyRange range,
+ OutputReceiver outputReceiver) {
+ try {
+ int numSplits = getSplits(regionConfig.getRegionSize());
+ LOG.info(
+ "Splitting Initial Restriction for SnapshotName: {} - regionname:{} - regionsize(GB):{} - Splits: {}",
+ regionConfig.getSnapshotConfig().getSnapshotName(),
+ regionConfig.getRegionInfo().getEncodedName(),
+ (double) regionConfig.getRegionSize() / BYTES_PER_GB,
+ numSplits);
+ if (numSplits > 1) {
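+          // Divide the region's key range into numSplits uniformly distributed sub-ranges.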
+ RegionSplitter.UniformSplit uniformSplit = new RegionSplitter.UniformSplit();
+ byte[][] splits =
+ uniformSplit.split(
+ range.getStartKey().getBytes(),
+ range.getEndKey().getBytes(),
+ getSplits(regionConfig.getRegionSize()),
+ true);
+ for (int i = 0; i < splits.length - 1; i++)
+ outputReceiver.output(
+ ByteKeyRange.of(ByteKey.copyFrom(splits[i]), ByteKey.copyFrom(splits[i + 1])));
+ } else {
+ outputReceiver.output(range);
+ }
+ } catch (Exception ex) {
+        LOG.warn(
+            "Unable to split range for region:{} in snapshot:{}",
+            regionConfig.getRegionInfo().getEncodedName(),
+            regionConfig.getSnapshotConfig().getSnapshotName(),
+            ex);
+ outputReceiver.output(range);
+ }
+ }
+
+ private int getSplits(long sizeInBytes) {
+ return (int) Math.ceil((double) sizeInBytes / BYTES_PER_SPLIT);
+ }
+
+ @Override
+ public void populateDisplayData(DisplayData.Builder builder) {
+ builder.add(
+ DisplayData.item("DynamicSplitting", useDynamicSplitting ? "Enabled" : "Disabled")
+ .withLabel("Dynamic Splitting"));
+ }
+ }
+
+  /**
+   * A {@link DoFn} that converts an HBase {@link org.apache.hadoop.hbase.client.Result} into a
+   * list of HBase {@link org.apache.hadoop.hbase.client.Mutation}s keyed by the target table name.
+   */
+ static class CreateMutationsFn
+      extends DoFn<KV<SnapshotConfig, Result>, KV<String, List<Mutation>>> {
+ private static final Logger LOG = LoggerFactory.getLogger(CreateMutationsFn.class);
+ private static final int MAX_MUTATIONS_PER_REQUEST = 999_999;
+
+ @ProcessElement
+ public void processElement(
+        @Element KV<SnapshotConfig, Result> element,
+        OutputReceiver<KV<String, List<Mutation>>> outputReceiver)
+ throws IOException {
+
+      List<Cell> cells = element.getValue().listCells();
+      if (cells == null || cells.isEmpty()) {
+        return;
+      }
+
+ String targetTable = element.getKey().getTableName();
+
+ // Limit the number of mutations per Put (server will reject >= 100k mutations per Put)
+ byte[] rowKey = element.getValue().getRow();
+      List<Mutation> mutations = new ArrayList<>();
+
+ int cellCount = 0;
+
+ Put put = null;
+      for (Cell cell : cells) {
+        // Start a new Put for this row whenever the cell count reaches the per-request threshold
+ if (cellCount % MAX_MUTATIONS_PER_REQUEST == 0) {
+ cellCount = 0;
+ put = new Put(rowKey);
+ mutations.add(put);
+ }
+ put.add(cell);
+ cellCount++;
+ }
+
+ outputReceiver.output(KV.of(targetTable, mutations));
+ }
+ }
+}
diff --git a/bigtable-dataflow-parent/bigtable-beam-import/src/main/java/com/google/cloud/bigtable/beam/hbasesnapshots/transforms/package-info.java b/bigtable-dataflow-parent/bigtable-beam-import/src/main/java/com/google/cloud/bigtable/beam/hbasesnapshots/transforms/package-info.java
new file mode 100644
index 0000000000..384f5420c2
--- /dev/null
+++ b/bigtable-dataflow-parent/bigtable-beam-import/src/main/java/com/google/cloud/bigtable/beam/hbasesnapshots/transforms/package-info.java
@@ -0,0 +1,17 @@
+/*
+ * Copyright 2024 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+/** Contains the {@link org.apache.beam.sdk.transforms.PTransform} implementations for the HBase snapshot import pipeline. */
+package com.google.cloud.bigtable.beam.hbasesnapshots.transforms;
diff --git a/bigtable-dataflow-parent/bigtable-beam-import/src/test/java/com/google/cloud/bigtable/beam/hbasesnapshots/EndToEndIT.java b/bigtable-dataflow-parent/bigtable-beam-import/src/test/java/com/google/cloud/bigtable/beam/hbasesnapshots/EndToEndIT.java
index 4edc5f1a7c..7f5a0019c5 100644
--- a/bigtable-dataflow-parent/bigtable-beam-import/src/test/java/com/google/cloud/bigtable/beam/hbasesnapshots/EndToEndIT.java
+++ b/bigtable-dataflow-parent/bigtable-beam-import/src/test/java/com/google/cloud/bigtable/beam/hbasesnapshots/EndToEndIT.java
@@ -19,6 +19,8 @@
import com.google.api.services.storage.model.StorageObject;
import com.google.bigtable.repackaged.com.google.gson.Gson;
import com.google.cloud.bigtable.beam.hbasesnapshots.ImportJobFromHbaseSnapshot.ImportOptions;
+import com.google.cloud.bigtable.beam.hbasesnapshots.conf.HBaseSnapshotInputConfigBuilder;
+import com.google.cloud.bigtable.beam.hbasesnapshots.dofn.CleanupHBaseSnapshotRestoreFiles;
import com.google.cloud.bigtable.beam.sequencefiles.HBaseResultToMutationFn;
import com.google.cloud.bigtable.beam.test_env.EnvSetup;
import com.google.cloud.bigtable.beam.test_env.TestProperties;
@@ -310,8 +312,7 @@ public void testHBaseSnapshotImport() throws Exception {
// The restore directory is stored relative to the snapshot directory and contains the job name
String bucket = GcsPath.fromUri(hbaseSnapshotDir).getBucket();
String restorePathPrefix =
- CleanupHBaseSnapshotRestoreFilesFn.getListPrefix(
- HBaseSnapshotInputConfigBuilder.RESTORE_DIR);
+ CleanupHBaseSnapshotRestoreFiles.getListPrefix(HBaseSnapshotInputConfigBuilder.RESTORE_DIR);
List allObjects = new ArrayList<>();
String nextToken;
do {
@@ -423,8 +424,7 @@ public void testSnappyCompressedHBaseSnapshotImport() throws Exception {
// The restore directory is stored relative to the snapshot directory and contains the job name
String bucket = GcsPath.fromUri(hbaseSnapshotDir).getBucket();
String restorePathPrefix =
- CleanupHBaseSnapshotRestoreFilesFn.getListPrefix(
- HBaseSnapshotInputConfigBuilder.RESTORE_DIR);
+ CleanupHBaseSnapshotRestoreFiles.getListPrefix(HBaseSnapshotInputConfigBuilder.RESTORE_DIR);
List allObjects = new ArrayList<>();
String nextToken;
diff --git a/bigtable-dataflow-parent/bigtable-beam-import/src/test/java/com/google/cloud/bigtable/beam/hbasesnapshots/HBaseSnapshotInputConfigBuilderTest.java b/bigtable-dataflow-parent/bigtable-beam-import/src/test/java/com/google/cloud/bigtable/beam/hbasesnapshots/HBaseSnapshotInputConfigBuilderTest.java
index fb5346f72a..039bd5b3e2 100644
--- a/bigtable-dataflow-parent/bigtable-beam-import/src/test/java/com/google/cloud/bigtable/beam/hbasesnapshots/HBaseSnapshotInputConfigBuilderTest.java
+++ b/bigtable-dataflow-parent/bigtable-beam-import/src/test/java/com/google/cloud/bigtable/beam/hbasesnapshots/HBaseSnapshotInputConfigBuilderTest.java
@@ -18,6 +18,7 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
+import com.google.cloud.bigtable.beam.hbasesnapshots.conf.HBaseSnapshotInputConfigBuilder;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.mapreduce.TableSnapshotInputFormat;
import org.apache.hadoop.mapreduce.InputFormat;
diff --git a/bigtable-dataflow-parent/bigtable-beam-import/src/test/java/com/google/cloud/bigtable/beam/hbasesnapshots/ImportJobFromHbaseSnapshotTest.java b/bigtable-dataflow-parent/bigtable-beam-import/src/test/java/com/google/cloud/bigtable/beam/hbasesnapshots/ImportJobFromHbaseSnapshotTest.java
new file mode 100644
index 0000000000..ea6a246adf
--- /dev/null
+++ b/bigtable-dataflow-parent/bigtable-beam-import/src/test/java/com/google/cloud/bigtable/beam/hbasesnapshots/ImportJobFromHbaseSnapshotTest.java
@@ -0,0 +1,176 @@
+/*
+ * Copyright 2024 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.bigtable.beam.hbasesnapshots;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.notNullValue;
+
+import com.google.api.services.storage.model.Objects;
+import com.google.api.services.storage.model.StorageObject;
+import com.google.cloud.bigtable.beam.hbasesnapshots.conf.ImportConfig;
+import java.io.File;
+import java.util.Arrays;
+import java.util.List;
+import org.apache.beam.sdk.extensions.gcp.options.GcsOptions;
+import org.apache.beam.sdk.extensions.gcp.util.GcsUtil;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.MockitoJUnit;
+import org.mockito.junit.MockitoRule;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Test cases for the {@link ImportJobFromHbaseSnapshot} class. */
+@RunWith(JUnit4.class)
+public class ImportJobFromHbaseSnapshotTest {
+ private static final Logger LOG = LoggerFactory.getLogger(ImportJobFromHbaseSnapshotTest.class);
+
+ @ClassRule public static TemporaryFolder tempFolder = new TemporaryFolder();
+ @Rule public final ExpectedException expectedException = ExpectedException.none();
+
+ @Rule public final MockitoRule mockito = MockitoJUnit.rule();
+ @Mock GcsOptions gcsOptions;
+ @Mock GcsUtil gcsUtilMock;
+ @Mock Objects gcsObjects;
+
+ @Test
+ public void testBuildImportConfigWithMissingSourcePathThrowsException() throws Exception {
+ ImportJobFromHbaseSnapshot.ImportOptions options =
+ SnapshotTestHelper.getPipelineOptions(
+ new String[] {
+ "--snapshots='bookmark-2099:bookmark,malwarescanstate-9087:malwarescan'"
+ });
+
+ expectedException.expect(IllegalArgumentException.class);
+ expectedException.expectMessage(ImportJobFromHbaseSnapshot.MISSING_SNAPSHOT_SOURCEPATH);
+ ImportJobFromHbaseSnapshot.buildImportConfigFromPipelineOptions(options, gcsOptions);
+ }
+
+ @Test
+ public void testBuildImportConfigWithMissingSnapshotsThrowsException() throws Exception {
+ ImportJobFromHbaseSnapshot.ImportOptions options =
+ SnapshotTestHelper.getPipelineOptions(
+ new String[] {"--hbaseSnapshotSourceDir=gs://bucket/data/"});
+
+ expectedException.expect(IllegalArgumentException.class);
+ expectedException.expectMessage(ImportJobFromHbaseSnapshot.MISSING_SNAPSHOT_NAMES);
+ ImportJobFromHbaseSnapshot.buildImportConfigFromPipelineOptions(options, gcsOptions);
+ }
+
+ @Test
+ public void testBuildImportConfigFromSnapshotsString() throws Exception {
+ String sourcePath = "gs://bucket/data/";
+ ImportJobFromHbaseSnapshot.ImportOptions options =
+ SnapshotTestHelper.getPipelineOptions(
+ new String[] {
+ "--hbaseSnapshotSourceDir=" + sourcePath,
+ "--snapshots='bookmark-2099:bookmark,malwarescanstate-9087:malwarescan'"
+ });
+
+ ImportConfig importConfig =
+ ImportJobFromHbaseSnapshot.buildImportConfigFromPipelineOptions(options, gcsOptions);
+ assertThat(importConfig.getSourcepath(), is(sourcePath));
+ assertThat(importConfig.getRestorepath(), notNullValue());
+ assertThat(importConfig.getSnapshots().size(), is(2));
+ }
+
+  private void setUpGcsObjectMocks(List<StorageObject> fakeStorageObjects) throws Exception {
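+    // Stub GcsUtil.listObjects so snapshot discovery sees the fake objects without calling GCS.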
+ Mockito.when(gcsObjects.getItems()).thenReturn(fakeStorageObjects);
+ Mockito.when(gcsUtilMock.listObjects(Mockito.anyString(), Mockito.anyString(), Mockito.any()))
+ .thenReturn(gcsObjects);
+ }
+
+ @Test
+ public void testBuildImportConfigForAllSnapshots() throws Exception {
+ String baseObjectPath = "snapshots/20220309230526";
+ String importSnapshotpath = String.format("gs://sym-bucket/%s", baseObjectPath);
+ ImportJobFromHbaseSnapshot.ImportOptions options =
+ SnapshotTestHelper.getPipelineOptions(
+ new String[] {"--hbaseSnapshotSourceDir=" + importSnapshotpath, "--snapshots=*"});
+ Mockito.when(gcsOptions.getGcsUtil()).thenReturn(gcsUtilMock);
+
+    List<String> snapshotList = Arrays.asList("audit-events", "dlpInfo", "ce-metrics-manifest");
+    List<StorageObject> fakeStorageObjects =
+ SnapshotTestHelper.createFakeStorageObjects(baseObjectPath, snapshotList);
+ setUpGcsObjectMocks(fakeStorageObjects);
+
+ ImportConfig importConfig =
+ ImportJobFromHbaseSnapshot.buildImportConfigFromPipelineOptions(options, gcsOptions);
+ assertThat(importConfig.getSourcepath(), is(importSnapshotpath));
+ assertThat(importConfig.getRestorepath(), notNullValue());
+ assertThat(importConfig.getSnapshots().size(), is(snapshotList.size()));
+ }
+
+ @Test
+ public void testBuildImportConfigFromJsonFileWithMissingPathThrowsException() throws Exception {
+ String config =
+ "{\n"
+ + " \"snapshots\": {\n"
+ + " \"snap_demo1\": \"snap_demo1\",\n"
+ + " \"snap_demo2\": \"snap_demo2\"\n"
+ + " }\n"
+ + "}";
+ File file = tempFolder.newFile();
+ SnapshotTestHelper.writeToFile(file.getAbsolutePath(), config);
+ ImportJobFromHbaseSnapshot.ImportOptions options =
+ SnapshotTestHelper.getPipelineOptions(
+ new String[] {"--importConfigFilePath=" + file.getAbsolutePath()});
+
+ expectedException.expect(NullPointerException.class);
+ expectedException.expectMessage(ImportJobFromHbaseSnapshot.MISSING_SNAPSHOT_SOURCEPATH);
+
+ ImportConfig importConfig =
+ ImportJobFromHbaseSnapshot.buildImportConfigFromConfigFile(
+ options.getImportConfigFilePath());
+ }
+
+ @Test
+ public void testBuildImportConfigFromJsonFile() throws Exception {
+ String importSnapshotpath = "gs://sym-datastore/snapshots/data/snap_demo";
+ String restoreSnapshotpath = "gs://sym-datastore/snapshots/data/restore";
+ String config =
+ String.format(
+ "{\n"
+ + " \"sourcepath\": \"%s\",\n"
+ + " \"restorepath\": \"%s\",\n"
+ + " \"snapshots\": {\n"
+ + " \"snap_demo1\": \"demo1\",\n"
+ + " \"snap_demo2\": \"demo2\"\n"
+ + " }\n"
+ + "}",
+ importSnapshotpath, restoreSnapshotpath);
+
+ File file = tempFolder.newFile();
+ SnapshotTestHelper.writeToFile(file.getAbsolutePath(), config);
+ ImportJobFromHbaseSnapshot.ImportOptions options =
+ SnapshotTestHelper.getPipelineOptions(
+ new String[] {"--importConfigFilePath=" + file.getAbsolutePath()});
+ ImportConfig importConfig =
+ ImportJobFromHbaseSnapshot.buildImportConfigFromConfigFile(
+ options.getImportConfigFilePath());
+ assertThat(importConfig.getSourcepath(), is(importSnapshotpath));
+ assertThat(importConfig.getRestorepath().startsWith(restoreSnapshotpath), is(true));
+ assertThat(importConfig.getSnapshots().get(0).getbigtableTableName(), is("demo1"));
+ }
+}
diff --git a/bigtable-dataflow-parent/bigtable-beam-import/src/test/java/com/google/cloud/bigtable/beam/hbasesnapshots/SnapshotTestHelper.java b/bigtable-dataflow-parent/bigtable-beam-import/src/test/java/com/google/cloud/bigtable/beam/hbasesnapshots/SnapshotTestHelper.java
new file mode 100644
index 0000000000..b0359d946b
--- /dev/null
+++ b/bigtable-dataflow-parent/bigtable-beam-import/src/test/java/com/google/cloud/bigtable/beam/hbasesnapshots/SnapshotTestHelper.java
@@ -0,0 +1,118 @@
+/*
+ * Copyright 2024 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.bigtable.beam.hbasesnapshots;
+
+import com.google.api.services.storage.model.StorageObject;
+import com.google.cloud.bigtable.beam.hbasesnapshots.conf.SnapshotConfig;
+import com.google.common.base.Joiner;
+import com.google.common.io.ByteStreams;
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.nio.channels.Channels;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.WritableByteChannel;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.io.fs.ResourceId;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.util.MimeTypes;
+
+/** Contains various helper methods to handle different tasks while executing tests. */
+public class SnapshotTestHelper {
+ private SnapshotTestHelper() {}
+
+ /**
+ * Helper to generate files for testing.
+ *
+ * @param filePath The path to the file to write.
+ * @param fileContents The content to write.
+ * @throws IOException If an error occurs while creating or writing the file.
+ */
+ static void writeToFile(String filePath, String fileContents) throws IOException {
+
+ ResourceId resourceId = FileSystems.matchNewResource(filePath, false);
+
+ // Write the file contents to the channel and close.
+ try (ReadableByteChannel readChannel =
+ Channels.newChannel(new ByteArrayInputStream(fileContents.getBytes()))) {
+ try (WritableByteChannel writeChannel = FileSystems.create(resourceId, MimeTypes.TEXT)) {
+ ByteStreams.copy(readChannel, writeChannel);
+ }
+ }
+ }
+
+  /**
+   * Builds a {@link SnapshotConfig} with a default test source path.
+   *
+   * @param restorePath path to which snapshots will be restored temporarily
+   * @return the snapshot config
+   */
+ public static SnapshotConfig newSnapshotConfig(String restorePath) {
+ return newSnapshotConfig("testsourcepath", restorePath);
+ }
+
+ public static SnapshotConfig newSnapshotConfig(String sourcePath, String restorePath) {
+ return SnapshotConfig.builder()
+ .setProjectId("testproject")
+ .setSourceLocation(sourcePath)
+ .setRestoreLocation(restorePath)
+ .setSnapshotName("testsnapshot")
+ .setTableName("testtable")
+        .setConfigurationDetails(new HashMap<>())
+ .build();
+ }
+
+ /**
+ * Helper method providing pipeline options.
+ *
+ * @param args list of pipeline arguments.
+ */
+ static ImportJobFromHbaseSnapshot.ImportOptions getPipelineOptions(String[] args) {
+ return PipelineOptionsFactory.fromArgs(args).as(ImportJobFromHbaseSnapshot.ImportOptions.class);
+ }
+
+  /**
+   * Creates fake {@link StorageObject}s mirroring the layout of an exported snapshot.
+   *
+   * @param basePath file system base path
+   * @param objectNames list of snapshot names
+   * @return list of matching storage objects, or null when no names are given
+   */
+  static List<StorageObject> createFakeStorageObjects(String basePath, List<String> objectNames) {
+ if (objectNames == null) return null;
+
+    List<StorageObject> storageObjects = new ArrayList<>();
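+    // Each fake object id mirrors the exported snapshot layout: {basePath}/.hbase-snapshot/{name}/.snapshotinfo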
+ objectNames.forEach(
+ name -> {
+ StorageObject object = new StorageObject();
+ object.setId(Joiner.on("/").join(basePath, ".hbase-snapshot", name, ".snapshotinfo"));
+ storageObjects.add(object);
+ });
+
+ return storageObjects;
+ }
+
+  static Map<String, String> buildMapFromList(String[] values) {
+    if (values.length % 2 != 0)
+      throw new IllegalArgumentException(
+          "Input should contain an even number of values, alternating keys and values for the"
+              + " map.");
+    Map<String, String> data = new HashMap<>();
+ for (int i = 0; i < values.length; i += 2) data.put(values[i], values[i + 1]);
+ return data;
+ }
+}
diff --git a/bigtable-dataflow-parent/bigtable-beam-import/src/test/java/com/google/cloud/bigtable/beam/hbasesnapshots/SnapshotUtilsTest.java b/bigtable-dataflow-parent/bigtable-beam-import/src/test/java/com/google/cloud/bigtable/beam/hbasesnapshots/SnapshotUtilsTest.java
new file mode 100644
index 0000000000..ce7ea4d2de
--- /dev/null
+++ b/bigtable-dataflow-parent/bigtable-beam-import/src/test/java/com/google/cloud/bigtable/beam/hbasesnapshots/SnapshotUtilsTest.java
@@ -0,0 +1,231 @@
+/*
+ * Copyright 2024 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.bigtable.beam.hbasesnapshots;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.*;
+
+import com.google.api.services.storage.model.Objects;
+import com.google.api.services.storage.model.StorageObject;
+import com.google.cloud.bigtable.beam.hbasesnapshots.conf.ImportConfig;
+import com.google.cloud.bigtable.beam.hbasesnapshots.conf.SnapshotConfig;
+import com.google.common.truth.Truth;
+import java.io.IOException;
+import java.time.Instant;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.beam.sdk.extensions.gcp.util.GcsUtil;
+import org.apache.hadoop.conf.Configuration;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.MockitoJUnit;
+import org.mockito.junit.MockitoRule;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Test cases for the {@link SnapshotUtils} class. */
+@RunWith(JUnit4.class)
+public class SnapshotUtilsTest {
+ private static final Logger LOG = LoggerFactory.getLogger(SnapshotUtilsTest.class);
+
+ @ClassRule public static TemporaryFolder tempFolder = new TemporaryFolder();
+ // Preferred way to instantiate mocks in JUnit4 is via the JUnit rule MockitoJUnit
+ @Rule public final MockitoRule mockito = MockitoJUnit.rule();
+ @Mock GcsUtil gcsUtilMock;
+ @Mock Objects gcsObjects;
+
+ @Test
+ public void testRemoveSuffixSlashIfExists() {
+ String path = "gs://bucket/prefix";
+
+ assertThat(SnapshotUtils.removeSuffixSlashIfExists(path), is(path));
+ assertThat(SnapshotUtils.removeSuffixSlashIfExists(path + "/"), is(path));
+ }
+
+ @Test
+ public void testAppendCurrentTimestamp() {
+ String path = "gs://bucket/prefix";
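+    // The appended suffix is expected to be the current UTC time in yyyyMMddHHmm format; the
+    // assertion tolerates the clock ticking over by at most one minute.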
+ DateTimeFormatter formatter =
+ DateTimeFormatter.ofPattern("yyyyMMddHHmm").withZone(ZoneId.of("UTC"));
+ long currentTime = Long.parseLong(formatter.format(Instant.now()));
+ long returnTime =
+ Long.parseLong(SnapshotUtils.appendCurrentTimestamp(path).replace(path + "/", ""));
+ assertThat((returnTime - currentTime), lessThan(2L));
+ }
+
+ @Test
+ public void testgetNamedDirectory() {
+ String path = "gs://bucket/subdir1";
+ String subFolder = "subdir2";
+ String expectedPath = "gs://bucket/subdir2";
+ String retValue = SnapshotUtils.getNamedDirectory(path, subFolder);
+ assertThat(retValue.startsWith(expectedPath), is(true));
+ }
+
+ @Test
+ public void testGetConfigurationWithDataflowRunner() {
+ String projectId = "testproject";
+    Map<String, String> configurations =
+ SnapshotUtils.getConfiguration("DataflowRunner", projectId, "/path/to/sourcedir", null);
+ assertThat(configurations.get("fs.gs.project.id"), is(projectId));
+ assertThat(configurations.get("s.gs.auth.type"), nullValue());
+ }
+
+ @Test
+ public void testGetConfigurationWithDirectRunner() {
+    Map<String, String> hbaseConfiguration =
+ SnapshotTestHelper.buildMapFromList(
+ new String[] {"fs.AbstractFileSystem.gs.impl", "org.apache.hadoop.fs.hdfs"});
+    Map<String, String> configurations =
+ SnapshotUtils.getConfiguration(
+ "DirectRunner", "testproject", "/path/to/sourcedir", hbaseConfiguration);
+ assertThat(
+ configurations.get("fs.AbstractFileSystem.gs.impl"),
+ is(hbaseConfiguration.get("fs.AbstractFileSystem.gs.impl")));
+ assertThat(configurations.get("fs.gs.auth.type"), is("APPLICATION_DEFAULT"));
+ }
+
+ @Test
+ public void testGetHbaseConfiguration() {
+    Map<String, String> configurations =
+ SnapshotTestHelper.buildMapFromList(
+ new String[] {"throttling.enable", "true", "throttling.threshold.ms", "200"});
+ Configuration hbaseConfiguration = SnapshotUtils.getHBaseConfiguration(configurations);
+ Truth.assertThat(hbaseConfiguration.get("throttling.enable")).isEqualTo("true");
+ Truth.assertThat(hbaseConfiguration.get("throttling.threshold.ms")).isEqualTo("200");
+ }
+
+ @Test
+ public void testBuildSnapshotConfigs() {
+ String projectId = "testproject";
+ String sourcePath = "/path/to/sourcedir";
+ String restorePath = "/path/to/restoredir";
+    List<ImportConfig.SnapshotInfo> snapshotInfoList =
+ Arrays.asList(
+ new ImportConfig.SnapshotInfo("snapdemo", "btdemo"),
+ new ImportConfig.SnapshotInfo("bookcontent-9087", "bookcontent"));
+
+    Map<String, String> configuration =
+ SnapshotTestHelper.buildMapFromList(
+ new String[] {"bigtable.row.size", "100", "bigtable.auth.type", "private"});
+
+    List<SnapshotConfig> snapshotConfigs =
+ SnapshotUtils.buildSnapshotConfigs(
+ snapshotInfoList, new HashMap<>(), projectId, sourcePath, restorePath);
+
+ assertThat(snapshotConfigs.size(), is(2));
+ assertThat(snapshotConfigs.get(0).getProjectId(), is(projectId));
+ assertThat(snapshotConfigs.get(0).getSnapshotName(), is("snapdemo"));
+ assertThat(snapshotConfigs.get(1).getSourceLocation(), is(sourcePath));
+ assertThat(snapshotConfigs.get(1).getTableName(), is("bookcontent"));
+ }
+
+ @Test
+ public void testGetSnapshotsFromStringReturnsSameTableName() {
+ String snapshotsWithBigtableTableName = "bookmark-2099";
+    Map<String, String> snapshots =
+ SnapshotUtils.getSnapshotsFromString(snapshotsWithBigtableTableName);
+ assertThat(snapshots.size(), is(equalTo(1)));
+ assertThat(snapshots.get("bookmark-2099"), is("bookmark-2099"));
+ }
+
+ @Test
+ public void testGetSnapshotsFromStringReturnsMultipleTables() {
+ String snapshotsWithBigtableTableName = "snapshot1,snapshot2,snapshot3:mytable3,snapshot4";
+    Map<String, String> snapshots =
+ SnapshotUtils.getSnapshotsFromString(snapshotsWithBigtableTableName);
+ assertThat(snapshots.size(), is(equalTo(4)));
+ assertThat(snapshots.get("snapshot1"), is("snapshot1"));
+ assertThat(snapshots.get("snapshot2"), is("snapshot2"));
+ assertThat(snapshots.get("snapshot3"), is("mytable3"));
+ assertThat(snapshots.get("snapshot4"), is("snapshot4"));
+ }
+
+ @Test
+ public void testGetSnapshotsFromStringReturnsParsedValues() {
+ String snapshotsWithBigtableTableName =
+ "bookmark-2099:bookmark,malwarescanstate-9087:malwarescan";
+    Map<String, String> snapshots =
+ SnapshotUtils.getSnapshotsFromString(snapshotsWithBigtableTableName);
+ assertThat(snapshots.size(), is(equalTo(2)));
+ assertThat(snapshots.get("malwarescanstate-9087"), is("malwarescan"));
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testGetSnapshotsFromStringThrowsException() {
+ String snapshotsWithBigtableTableName =
+ "bookmark-2099:bookmark,malwarescanstate-9087:malwarescan:snapdemo1";
+    Map<String, String> snapshots =
+ SnapshotUtils.getSnapshotsFromString(snapshotsWithBigtableTableName);
+ }
+
+  private void setUpGcsObjectMocks(List<StorageObject> fakeStorageObjects) throws IOException {
+ Mockito.when(gcsObjects.getItems()).thenReturn(fakeStorageObjects);
+ Mockito.when(gcsUtilMock.listObjects(Mockito.anyString(), Mockito.anyString(), Mockito.any()))
+ .thenReturn(gcsObjects);
+ }
+
+  private Map<String, String> getMatchingSnapshotsFromSnapshotPath(
+      List<String> snapshotList, String prefix) throws IOException {
+ String baseObjectPath = "snapshots/20220309230526";
+ String importSnapshotpath = String.format("gs://sym-bucket/%s", baseObjectPath);
+    List<StorageObject> fakeStorageObjects =
+ SnapshotTestHelper.createFakeStorageObjects(baseObjectPath, snapshotList);
+ setUpGcsObjectMocks(fakeStorageObjects);
+ return SnapshotUtils.getSnapshotsFromSnapshotPath(importSnapshotpath, gcsUtilMock, prefix);
+ }
+
+ @Test
+ public void testgetAllSnapshotsFromSnapshotPath() throws IOException {
+    List<String> snapshotList = Arrays.asList("audit-events", "dlpInfo", "ce-metrics-manifest");
+    Map<String, String> snapshots = getMatchingSnapshotsFromSnapshotPath(snapshotList, "*");
+ assertThat(snapshots.size(), is(equalTo(3)));
+ assertThat(snapshots.keySet(), containsInAnyOrder(snapshotList.toArray(new String[0])));
+ }
+
+ @Test
+ public void testgetSubSetSnapshotsFromSnapshotPath() throws IOException {
+    List<String> snapshotList =
+ Arrays.asList(
+ "audit-events",
+ "symphony-attachments",
+ "ce-metrics-manifest",
+ "symphony-attachments-streams");
+    Map<String, String> snapshots =
+        getMatchingSnapshotsFromSnapshotPath(snapshotList, ".*attachments.*");
+    List<String> expectedResult =
+ snapshotList.stream().filter(e -> e.contains("attachments")).collect(Collectors.toList());
+ // LOG.info("Matched:{} and expected:{}", snapshots.size(), expectedResult.size());
+ assertThat(snapshots.size(), is(equalTo(expectedResult.size())));
+ assertThat(snapshots.keySet(), containsInAnyOrder(expectedResult.toArray(new String[0])));
+ }
+
+ @Test(expected = IllegalStateException.class)
+ public void testgetSubSetSnapshotsFromSnapshotPathThrowsException() throws IOException {
+    Map<String, String> snapshots = getMatchingSnapshotsFromSnapshotPath(null, "*");
+ }
+}
diff --git a/bigtable-dataflow-parent/bigtable-beam-import/src/test/java/com/google/cloud/bigtable/beam/hbasesnapshots/dofn/CleanUpRestoredSnapshotsTest.java b/bigtable-dataflow-parent/bigtable-beam-import/src/test/java/com/google/cloud/bigtable/beam/hbasesnapshots/dofn/CleanUpRestoredSnapshotsTest.java
new file mode 100644
index 0000000000..9cda975481
--- /dev/null
+++ b/bigtable-dataflow-parent/bigtable-beam-import/src/test/java/com/google/cloud/bigtable/beam/hbasesnapshots/dofn/CleanUpRestoredSnapshotsTest.java
@@ -0,0 +1,70 @@
+/*
+ * Copyright 2024 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.bigtable.beam.hbasesnapshots.dofn;
+
+import com.google.cloud.bigtable.beam.hbasesnapshots.SnapshotTestHelper;
+import com.google.cloud.bigtable.beam.hbasesnapshots.conf.SnapshotConfig;
+import java.io.File;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.junit.Assert;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Tests the {@link CleanupRestoredSnapshots} functionality. */
+@RunWith(JUnit4.class)
+public class CleanUpRestoredSnapshotsTest {
+ private static final Logger LOG = LoggerFactory.getLogger(CleanUpRestoredSnapshotsTest.class);
+
+ @Rule public final transient TestPipeline pipeline = TestPipeline.create();
+ @ClassRule public static TemporaryFolder tempFolder = new TemporaryFolder();
+
+ @Test
+ public void testDeleteRestoredSnapshot() throws Exception {
+ File restoreDir = tempFolder.newFolder();
+ if (restoreDir.exists()) {
+ LOG.info("Created temp folder: {}", restoreDir.getAbsolutePath());
+ SnapshotConfig snapshotConfig =
+ SnapshotTestHelper.newSnapshotConfig(restoreDir.getAbsolutePath());
+ new CleanupRestoredSnapshots().cleanupSnapshot(snapshotConfig);
+ Assert.assertFalse(restoreDir.exists());
+ } else {
+      LOG.warn(
+          "Skipping testDeleteRestoredSnapshot since the temporary restore folder could not be created: {}",
+          restoreDir.getAbsolutePath());
+ }
+ }
+
+  /**
+   * Tests {@link CleanupRestoredSnapshots} with an invalid path to verify that the exception is
+   * handled internally and does not fail the pipeline.
+   */
+ @Test
+ public void testDeleteRestoredSnapshotWithInvalidPath() throws Exception {
+ pipeline
+ .apply("CreateInput", Create.of(SnapshotTestHelper.newSnapshotConfig("invalid_path")))
+ .apply("DeleteSnapshot", ParDo.of(new CleanupRestoredSnapshots()));
+ pipeline.run();
+ }
+}
diff --git a/bigtable-dataflow-parent/bigtable-beam-import/src/test/java/com/google/cloud/bigtable/beam/hbasesnapshots/CleanupHBaseSnapshotRestoreFilesFnTest.java b/bigtable-dataflow-parent/bigtable-beam-import/src/test/java/com/google/cloud/bigtable/beam/hbasesnapshots/dofn/CleanupHBaseSnapshotRestoreFilesTest.java
similarity index 72%
rename from bigtable-dataflow-parent/bigtable-beam-import/src/test/java/com/google/cloud/bigtable/beam/hbasesnapshots/CleanupHBaseSnapshotRestoreFilesFnTest.java
rename to bigtable-dataflow-parent/bigtable-beam-import/src/test/java/com/google/cloud/bigtable/beam/hbasesnapshots/dofn/CleanupHBaseSnapshotRestoreFilesTest.java
index 0183f856f1..bfe46f5191 100644
--- a/bigtable-dataflow-parent/bigtable-beam-import/src/test/java/com/google/cloud/bigtable/beam/hbasesnapshots/CleanupHBaseSnapshotRestoreFilesFnTest.java
+++ b/bigtable-dataflow-parent/bigtable-beam-import/src/test/java/com/google/cloud/bigtable/beam/hbasesnapshots/dofn/CleanupHBaseSnapshotRestoreFilesTest.java
@@ -13,15 +13,16 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package com.google.cloud.bigtable.beam.hbasesnapshots;
+package com.google.cloud.bigtable.beam.hbasesnapshots.dofn;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThrows;
+import com.google.cloud.bigtable.beam.hbasesnapshots.conf.HBaseSnapshotInputConfigBuilder;
import java.util.UUID;
import org.junit.Test;
-public class CleanupHBaseSnapshotRestoreFilesFnTest {
+public class CleanupHBaseSnapshotRestoreFilesTest {
private static final String TEST_BUCKET_NAME = "test-bucket";
private static final String TEST_SNAPSHOT_PATH = "gs://" + TEST_BUCKET_NAME + "/hbase-export";
private static final String TEST_RESTORE_PATH =
@@ -32,24 +33,24 @@ public class CleanupHBaseSnapshotRestoreFilesFnTest {
public void testGetWorkingBucketName() {
assertEquals(
TEST_BUCKET_NAME,
- CleanupHBaseSnapshotRestoreFilesFn.getWorkingBucketName(TEST_SNAPSHOT_PATH));
+ CleanupHBaseSnapshotRestoreFiles.getWorkingBucketName(TEST_SNAPSHOT_PATH));
assertThrows(
IllegalArgumentException.class,
() -> {
- CleanupHBaseSnapshotRestoreFilesFn.getWorkingBucketName(TEST_BUCKET_NAME);
+ CleanupHBaseSnapshotRestoreFiles.getWorkingBucketName(TEST_BUCKET_NAME);
});
}
@Test
public void testGetListPrefix() {
assertEquals(
- TEST_RESTORE_PREFIX, CleanupHBaseSnapshotRestoreFilesFn.getListPrefix(TEST_RESTORE_PATH));
+ TEST_RESTORE_PREFIX, CleanupHBaseSnapshotRestoreFiles.getListPrefix(TEST_RESTORE_PATH));
assertThrows(
IllegalArgumentException.class,
() -> {
- CleanupHBaseSnapshotRestoreFilesFn.getWorkingBucketName(TEST_RESTORE_PREFIX);
+ CleanupHBaseSnapshotRestoreFiles.getWorkingBucketName(TEST_RESTORE_PREFIX);
});
}
}
diff --git a/bigtable-dataflow-parent/bigtable-beam-import/src/test/java/com/google/cloud/bigtable/beam/hbasesnapshots/dofn/RestoreSnapshotTest.java b/bigtable-dataflow-parent/bigtable-beam-import/src/test/java/com/google/cloud/bigtable/beam/hbasesnapshots/dofn/RestoreSnapshotTest.java
new file mode 100644
index 0000000000..34e2c77552
--- /dev/null
+++ b/bigtable-dataflow-parent/bigtable-beam-import/src/test/java/com/google/cloud/bigtable/beam/hbasesnapshots/dofn/RestoreSnapshotTest.java
@@ -0,0 +1,58 @@
+/*
+ * Copyright 2024 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.bigtable.beam.hbasesnapshots.dofn;
+
+import com.google.cloud.bigtable.beam.hbasesnapshots.SnapshotTestHelper;
+import com.google.cloud.bigtable.beam.hbasesnapshots.conf.SnapshotConfig;
+import org.apache.hadoop.hbase.snapshot.RestoreSnapshotHelper;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.MockedStatic;
+import org.mockito.Mockito;
+
+/** Tests the {@link RestoreSnapshot} functionality. */
+@RunWith(JUnit4.class)
+public class RestoreSnapshotTest {
+
+ @ClassRule public static TemporaryFolder tempFolder = new TemporaryFolder();
+
+ @Test
+ public void testRestoreSnapshot() throws Exception {
+
+ SnapshotConfig snapshotConfig =
+ SnapshotTestHelper.newSnapshotConfig(
+ tempFolder.newFolder().getAbsolutePath(), tempFolder.newFolder().getAbsolutePath());
+
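+    // Stub the static RestoreSnapshotHelper.copySnapshotForScanner call so no real restore work happens.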
+    try (MockedStatic<RestoreSnapshotHelper> restoreSnapshotHelper =
+ Mockito.mockStatic(RestoreSnapshotHelper.class)) {
+ restoreSnapshotHelper
+ .when(
+ () ->
+ RestoreSnapshotHelper.copySnapshotForScanner(
+ snapshotConfig.getConfiguration(),
+ null,
+ snapshotConfig.getSourcePath(),
+ snapshotConfig.getRestorePath(),
+ snapshotConfig.getSnapshotName()))
+ .thenReturn(null);
+
+ new RestoreSnapshot().restoreSnapshot(snapshotConfig);
+ }
+ }
+}