From bf6cfb1846dc75bd7b854f60e4fd7dd616ee4270 Mon Sep 17 00:00:00 2001 From: Cheng Wang Date: Wed, 19 Jul 2023 14:20:12 -0700 Subject: [PATCH 1/9] add debug log --- .../com/netflix/priam/backup/AbstractFileSystem.java | 11 ++++++++++- .../netflix/priam/backup/BackupFileSystemContext.java | 6 ++++++ .../com/netflix/priam/backup/BackupHelperImpl.java | 8 +++++++- 3 files changed, 23 insertions(+), 2 deletions(-) diff --git a/priam/src/main/java/com/netflix/priam/backup/AbstractFileSystem.java b/priam/src/main/java/com/netflix/priam/backup/AbstractFileSystem.java index ba7f94865..96c6d651c 100644 --- a/priam/src/main/java/com/netflix/priam/backup/AbstractFileSystem.java +++ b/priam/src/main/java/com/netflix/priam/backup/AbstractFileSystem.java @@ -71,7 +71,6 @@ public abstract class AbstractFileSystem implements IBackupFileSystem { // file system. This is to ensure that we don't make too many API calls to remote file system. private final Cache objectCache; - @Inject public AbstractFileSystem( IConfiguration configuration, BackupMetrics backupMetrics, @@ -89,6 +88,16 @@ public AbstractFileSystem( files for "sync" feature which might compete with backups for scheduling. Also, we may want to have different TIMEOUT for each kind of operation (upload/download) based on our file system choices. */ + + logger.info("Initializing AbstractFileSystem ..."); + // Get the current thread's stack trace + StackTraceElement[] stackTraceElements = Thread.currentThread().getStackTrace(); + + // Loop through each stack trace element + for (StackTraceElement element : stackTraceElements) { + logger.info(element.toString()); + } + BlockingQueue uploadQueue = new ArrayBlockingQueue<>(configuration.getBackupQueueSize()); PolledMeter.using(backupMetrics.getRegistry()) diff --git a/priam/src/main/java/com/netflix/priam/backup/BackupFileSystemContext.java b/priam/src/main/java/com/netflix/priam/backup/BackupFileSystemContext.java index a39ffc0cf..491a0ff94 100755 --- a/priam/src/main/java/com/netflix/priam/backup/BackupFileSystemContext.java +++ b/priam/src/main/java/com/netflix/priam/backup/BackupFileSystemContext.java @@ -30,6 +30,12 @@ public BackupFileSystemContext( this.encryptedFs = encryptedFs; } + @Inject + public BackupFileSystemContext( + @Named("backup") IBackupFileSystem fs) { + this.fs = fs; + } + public IBackupFileSystem getFileStrategy(IConfiguration config) { if (!config.isEncryptBackupEnabled()) { diff --git a/priam/src/main/java/com/netflix/priam/backup/BackupHelperImpl.java b/priam/src/main/java/com/netflix/priam/backup/BackupHelperImpl.java index 9f13891f7..c0e2aa875 100644 --- a/priam/src/main/java/com/netflix/priam/backup/BackupHelperImpl.java +++ b/priam/src/main/java/com/netflix/priam/backup/BackupHelperImpl.java @@ -8,10 +8,14 @@ import com.netflix.priam.compress.CompressionType; import com.netflix.priam.config.BackupsToCompress; import com.netflix.priam.config.IConfiguration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.io.File; import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; +import java.nio.file.Paths; import java.time.Instant; import java.util.Set; import java.util.stream.Stream; @@ -19,7 +23,7 @@ import javax.inject.Provider; public class BackupHelperImpl implements BackupHelper { - + private static final Logger logger = LoggerFactory.getLogger(BackupHelperImpl.class); private static final String COMPRESSION_SUFFIX = "-CompressionInfo.db"; private static final String DATA_SUFFIX = "-Data.db"; private final Provider pathFactory; @@ -56,6 +60,8 @@ public ImmutableList> uploadAndDeleteAllFil final ImmutableList.Builder> futures = ImmutableList.builder(); for (AbstractBackupPath bp : getBackupPaths(parent, type)) { + logger.info(String.format("AbstractBackupPath: %s, localPath: %s", bp, Paths.get(bp.getBackupFile().getAbsolutePath()))); + futures.add(fs.uploadAndDelete(bp, target, async)); } return futures.build(); From c9ebe29664f857338d117fe7969de37152434239 Mon Sep 17 00:00:00 2001 From: Cheng Wang Date: Wed, 19 Jul 2023 14:30:31 -0700 Subject: [PATCH 2/9] more debug traces --- .../java/com/netflix/priam/backup/AbstractFileSystem.java | 1 + .../main/java/com/netflix/priam/backup/BackupHelperImpl.java | 5 ++++- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/priam/src/main/java/com/netflix/priam/backup/AbstractFileSystem.java b/priam/src/main/java/com/netflix/priam/backup/AbstractFileSystem.java index 96c6d651c..0dab7d10f 100644 --- a/priam/src/main/java/com/netflix/priam/backup/AbstractFileSystem.java +++ b/priam/src/main/java/com/netflix/priam/backup/AbstractFileSystem.java @@ -167,6 +167,7 @@ protected abstract void downloadFileImpl(final AbstractBackupPath path, String s public ListenableFuture uploadAndDelete( final AbstractBackupPath path, Instant target, boolean async) throws RejectedExecutionException, BackupRestoreException { + logger.info(String.format("uploadAndDelete path: %s", path)); if (async) { return fileUploadExecutor.submit( () -> uploadAndDeleteInternal(path, target, 10 /* retries */)); diff --git a/priam/src/main/java/com/netflix/priam/backup/BackupHelperImpl.java b/priam/src/main/java/com/netflix/priam/backup/BackupHelperImpl.java index c0e2aa875..68e93e4d5 100644 --- a/priam/src/main/java/com/netflix/priam/backup/BackupHelperImpl.java +++ b/priam/src/main/java/com/netflix/priam/backup/BackupHelperImpl.java @@ -60,9 +60,12 @@ public ImmutableList> uploadAndDeleteAllFil final ImmutableList.Builder> futures = ImmutableList.builder(); for (AbstractBackupPath bp : getBackupPaths(parent, type)) { - logger.info(String.format("AbstractBackupPath: %s, localPath: %s", bp, Paths.get(bp.getBackupFile().getAbsolutePath()))); + logger.info(String.format("Before AbstractBackupPath: %s, localPath: %s", bp, Paths.get(bp.getBackupFile().getAbsolutePath()))); futures.add(fs.uploadAndDelete(bp, target, async)); + + logger.info(String.format("After AbstractBackupPath: %s, localPath: %s", bp, Paths.get(bp.getBackupFile().getAbsolutePath()))); + } return futures.build(); } From d10c726558b7f51c00b7b78c6989c2f79b680c35 Mon Sep 17 00:00:00 2001 From: Cheng Wang Date: Thu, 20 Jul 2023 13:54:02 -0700 Subject: [PATCH 3/9] more changes --- .../com/netflix/priam/backup/AbstractFileSystem.java | 5 ++++- .../com/netflix/priam/backupv2/SnapshotMetaTask.java | 10 +++++++--- 2 files changed, 11 insertions(+), 4 deletions(-) diff --git a/priam/src/main/java/com/netflix/priam/backup/AbstractFileSystem.java b/priam/src/main/java/com/netflix/priam/backup/AbstractFileSystem.java index 0dab7d10f..a90c00c7c 100644 --- a/priam/src/main/java/com/netflix/priam/backup/AbstractFileSystem.java +++ b/priam/src/main/java/com/netflix/priam/backup/AbstractFileSystem.java @@ -167,7 +167,7 @@ protected abstract void downloadFileImpl(final AbstractBackupPath path, String s public ListenableFuture uploadAndDelete( final AbstractBackupPath path, Instant target, boolean async) throws RejectedExecutionException, BackupRestoreException { - logger.info(String.format("uploadAndDelete path: %s", path)); + logger.info(String.format("uploadAndDelete path: %s, async: %s", Paths.get(path.getBackupFile().getAbsolutePath()), async)); if (async) { return fileUploadExecutor.submit( () -> uploadAndDeleteInternal(path, target, 10 /* retries */)); @@ -182,6 +182,9 @@ public AbstractBackupPath uploadAndDeleteInternal( throws RejectedExecutionException, BackupRestoreException { Path localPath = Paths.get(path.getBackupFile().getAbsolutePath()); File localFile = localPath.toFile(); + + logger.info(String.format("uploadAndDeleteInternal: %s", localPath)); + Preconditions.checkArgument( localFile.exists(), String.format("Can't upload nonexistent %s", localPath)); Preconditions.checkArgument( diff --git a/priam/src/main/java/com/netflix/priam/backupv2/SnapshotMetaTask.java b/priam/src/main/java/com/netflix/priam/backupv2/SnapshotMetaTask.java index a886b5a1a..3a272fcde 100644 --- a/priam/src/main/java/com/netflix/priam/backupv2/SnapshotMetaTask.java +++ b/priam/src/main/java/com/netflix/priam/backupv2/SnapshotMetaTask.java @@ -60,7 +60,8 @@ * This service will run on CRON as specified by {@link * IBackupRestoreConfig#getSnapshotMetaServiceCronExpression()} The intent of this service is to run * a full snapshot on Cassandra, get the list of the SSTables on disk and then create a - * manifest.json file which will encapsulate the list of the files i.e. capture filesystem at a + * manifest.json file which will e + * ncapsulate the list of the files i.e. capture filesystem at a * moment in time. This manifest.json file will ensure the true filesystem status is exposed (for * external entities) and will be used in future for Priam Backup Version 2 where a file is not * uploaded to backup file system unless SSTable has been modified. This will lead to huge reduction @@ -287,15 +288,18 @@ private void uploadAllFiles(final File backupDir) throws Exception { // We do not want to wait for completion and we just want to add them to queue. This // is to ensure that next run happens on time. AbstractBackupPath.BackupFileType type = AbstractBackupPath.BackupFileType.SST_V2; + + logger.info("enableAsyncSnapshot: {}", config.enableAsyncSnapshot()); + backupHelper - .uploadAndDeleteAllFiles(snapshotDirectory, type, target, true) + .uploadAndDeleteAllFiles(snapshotDirectory, type, target, config.enableAsyncSnapshot()) .forEach(future -> addCallback(future, snapshotDirectory)); // Next, upload secondary indexes type = AbstractBackupPath.BackupFileType.SECONDARY_INDEX_V2; ImmutableList> futures; for (File subDir : getSecondaryIndexDirectories(snapshotDirectory)) { - futures = backupHelper.uploadAndDeleteAllFiles(subDir, type, target, true); + futures = backupHelper.uploadAndDeleteAllFiles(subDir, type, target, config.enableAsyncSnapshot()); if (futures.isEmpty()) { deleteIfEmpty(subDir); } From db7c5254b9a2e738b8889a72ef3bafc5f6a142b6 Mon Sep 17 00:00:00 2001 From: Cheng Wang Date: Mon, 24 Jul 2023 11:50:28 -0700 Subject: [PATCH 4/9] more changes --- .../main/java/com/netflix/priam/backupv2/SnapshotMetaTask.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/priam/src/main/java/com/netflix/priam/backupv2/SnapshotMetaTask.java b/priam/src/main/java/com/netflix/priam/backupv2/SnapshotMetaTask.java index 3a272fcde..be5f49ca9 100644 --- a/priam/src/main/java/com/netflix/priam/backupv2/SnapshotMetaTask.java +++ b/priam/src/main/java/com/netflix/priam/backupv2/SnapshotMetaTask.java @@ -289,8 +289,6 @@ private void uploadAllFiles(final File backupDir) throws Exception { // is to ensure that next run happens on time. AbstractBackupPath.BackupFileType type = AbstractBackupPath.BackupFileType.SST_V2; - logger.info("enableAsyncSnapshot: {}", config.enableAsyncSnapshot()); - backupHelper .uploadAndDeleteAllFiles(snapshotDirectory, type, target, config.enableAsyncSnapshot()) .forEach(future -> addCallback(future, snapshotDirectory)); From b01b9f503c620748116ef42156f7a1419f85bed4 Mon Sep 17 00:00:00 2001 From: Cheng Wang Date: Tue, 25 Jul 2023 16:46:02 -0700 Subject: [PATCH 5/9] more changes --- .../priam/backup/AbstractFileSystem.java | 28 +++++++++++++++---- .../priam/backup/BackupFileSystemContext.java | 1 - .../priam/defaultimpl/PriamGuiceModule.java | 6 ++++ 3 files changed, 28 insertions(+), 7 deletions(-) diff --git a/priam/src/main/java/com/netflix/priam/backup/AbstractFileSystem.java b/priam/src/main/java/com/netflix/priam/backup/AbstractFileSystem.java index a90c00c7c..e9c391518 100644 --- a/priam/src/main/java/com/netflix/priam/backup/AbstractFileSystem.java +++ b/priam/src/main/java/com/netflix/priam/backup/AbstractFileSystem.java @@ -103,12 +103,17 @@ Also, we may want to have different TIMEOUT for each kind of operation (upload/d PolledMeter.using(backupMetrics.getRegistry()) .withName(backupMetrics.uploadQueueSize) .monitorSize(uploadQueue); + + BlockingSubmitThreadPoolExecutor bsExecutor = new BlockingSubmitThreadPoolExecutor( + configuration.getBackupThreads(), + uploadQueue, + configuration.getUploadTimeout()); + logger.info("bsExecutor: {}", bsExecutor); + this.fileUploadExecutor = - MoreExecutors.listeningDecorator( - new BlockingSubmitThreadPoolExecutor( - configuration.getBackupThreads(), - uploadQueue, - configuration.getUploadTimeout())); + MoreExecutors.listeningDecorator(bsExecutor); + + logger.info("fileUploadExecutor: {}", fileUploadExecutor); BlockingQueue downloadQueue = new ArrayBlockingQueue<>(configuration.getDownloadQueueSize()); @@ -169,8 +174,10 @@ public ListenableFuture uploadAndDelete( throws RejectedExecutionException, BackupRestoreException { logger.info(String.format("uploadAndDelete path: %s, async: %s", Paths.get(path.getBackupFile().getAbsolutePath()), async)); if (async) { - return fileUploadExecutor.submit( + ListenableFuture res = fileUploadExecutor.submit( () -> uploadAndDeleteInternal(path, target, 10 /* retries */)); + logger.info("Return for {}: {}", Paths.get(path.getBackupFile().getAbsolutePath()), res); + return res; } else { return Futures.immediateFuture(uploadAndDeleteInternal(path, target, 10 /* retries */)); } @@ -184,6 +191,15 @@ public AbstractBackupPath uploadAndDeleteInternal( File localFile = localPath.toFile(); logger.info(String.format("uploadAndDeleteInternal: %s", localPath)); + StringBuilder sb = new StringBuilder(); + StackTraceElement[] stackTraceElements = Thread.currentThread().getStackTrace(); + for (StackTraceElement element : stackTraceElements) { + sb.append(element.toString()); + sb.append("\n"); + } + // remove the last line + sb.setLength(sb.length() - 1); + logger.info(sb.toString()); Preconditions.checkArgument( localFile.exists(), String.format("Can't upload nonexistent %s", localPath)); diff --git a/priam/src/main/java/com/netflix/priam/backup/BackupFileSystemContext.java b/priam/src/main/java/com/netflix/priam/backup/BackupFileSystemContext.java index 491a0ff94..62ed733cf 100755 --- a/priam/src/main/java/com/netflix/priam/backup/BackupFileSystemContext.java +++ b/priam/src/main/java/com/netflix/priam/backup/BackupFileSystemContext.java @@ -30,7 +30,6 @@ public BackupFileSystemContext( this.encryptedFs = encryptedFs; } - @Inject public BackupFileSystemContext( @Named("backup") IBackupFileSystem fs) { this.fs = fs; diff --git a/priam/src/main/java/com/netflix/priam/defaultimpl/PriamGuiceModule.java b/priam/src/main/java/com/netflix/priam/defaultimpl/PriamGuiceModule.java index 0f68962ac..003b9157b 100644 --- a/priam/src/main/java/com/netflix/priam/defaultimpl/PriamGuiceModule.java +++ b/priam/src/main/java/com/netflix/priam/defaultimpl/PriamGuiceModule.java @@ -39,13 +39,19 @@ import com.netflix.spectator.api.Registry; import org.quartz.SchedulerFactory; import org.quartz.impl.StdSchedulerFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class PriamGuiceModule extends AbstractModule { + protected static final Logger logger = LoggerFactory.getLogger(PriamGuiceModule.class); + @Override protected void configure() { bind(SchedulerFactory.class).to(StdSchedulerFactory.class).asEagerSingleton(); bind(IBackupFileSystem.class).annotatedWith(Names.named("backup")).to(S3FileSystem.class); + logger.info("[chengw]: {}", Thread.currentThread().getStackTrace()); + bind(IBackupFileSystem.class) .annotatedWith(Names.named("encryptedbackup")) .to(S3EncryptedFileSystem.class); From 7c994ab5cf46a804fb434964f236bb71bdd2625d Mon Sep 17 00:00:00 2001 From: Cheng Wang Date: Tue, 25 Jul 2023 18:24:39 -0700 Subject: [PATCH 6/9] more changes --- build.gradle | 2 +- .../priam/backup/AbstractFileSystem.java | 37 +++++++------------ .../priam/backup/BackupHelperImpl.java | 8 ++++ .../priam/backup/IBackupFileSystem.java | 19 +++++----- .../priam/backupv2/SnapshotMetaTask.java | 4 +- 5 files changed, 36 insertions(+), 34 deletions(-) diff --git a/build.gradle b/build.gradle index 5ae51364f..f9efc4ae4 100644 --- a/build.gradle +++ b/build.gradle @@ -36,7 +36,7 @@ allprojects { compile 'com.sun.jersey:jersey-json:1.19.4' compile 'com.sun.jersey:jersey-bundle:1.19.4' compile 'com.sun.jersey.contribs:jersey-guice:1.19.4' - compile 'com.google.guava:guava:21.0' + //compile 'com.google.guava:guava:21.0' compile 'com.google.code.findbugs:jsr305:3.0.2' // AWS Services diff --git a/priam/src/main/java/com/netflix/priam/backup/AbstractFileSystem.java b/priam/src/main/java/com/netflix/priam/backup/AbstractFileSystem.java index e9c391518..cbb2835f1 100644 --- a/priam/src/main/java/com/netflix/priam/backup/AbstractFileSystem.java +++ b/priam/src/main/java/com/netflix/priam/backup/AbstractFileSystem.java @@ -42,7 +42,6 @@ import java.util.List; import java.util.Set; import java.util.concurrent.*; -import javax.inject.Inject; import javax.inject.Provider; import org.apache.commons.collections4.iterators.FilterIterator; import org.apache.commons.collections4.iterators.TransformIterator; @@ -63,6 +62,7 @@ public abstract class AbstractFileSystem implements IBackupFileSystem { private final IConfiguration configuration; protected final BackupMetrics backupMetrics; private final Set tasksQueued; + private final ListeningExecutorService fileUploadExecutor; private final ThreadPoolExecutor fileDownloadExecutor; private final BackupNotificationMgr backupNotificationMgr; @@ -104,27 +104,28 @@ Also, we may want to have different TIMEOUT for each kind of operation (upload/d .withName(backupMetrics.uploadQueueSize) .monitorSize(uploadQueue); - BlockingSubmitThreadPoolExecutor bsExecutor = new BlockingSubmitThreadPoolExecutor( + BlockingSubmitThreadPoolExecutor uploadExecutor = new BlockingSubmitThreadPoolExecutor( configuration.getBackupThreads(), uploadQueue, configuration.getUploadTimeout()); - logger.info("bsExecutor: {}", bsExecutor); + logger.info("uploadExecutor: {}", uploadExecutor); - this.fileUploadExecutor = - MoreExecutors.listeningDecorator(bsExecutor); + this.fileUploadExecutor = MoreExecutors.listeningDecorator(uploadExecutor); - logger.info("fileUploadExecutor: {}", fileUploadExecutor); + logger.info("fileUploadExecutor: {}", this.fileUploadExecutor); BlockingQueue downloadQueue = new ArrayBlockingQueue<>(configuration.getDownloadQueueSize()); PolledMeter.using(backupMetrics.getRegistry()) .withName(backupMetrics.downloadQueueSize) .monitorSize(downloadQueue); - this.fileDownloadExecutor = - new BlockingSubmitThreadPoolExecutor( - configuration.getRestoreThreads(), - downloadQueue, - configuration.getDownloadTimeout()); + BlockingSubmitThreadPoolExecutor downloadExecutor = new BlockingSubmitThreadPoolExecutor( + configuration.getRestoreThreads(), + downloadQueue, + configuration.getDownloadTimeout()); + logger.info("downloadExecutor: {}", downloadExecutor); + + this.fileDownloadExecutor = downloadExecutor; } @Override @@ -174,10 +175,9 @@ public ListenableFuture uploadAndDelete( throws RejectedExecutionException, BackupRestoreException { logger.info(String.format("uploadAndDelete path: %s, async: %s", Paths.get(path.getBackupFile().getAbsolutePath()), async)); if (async) { - ListenableFuture res = fileUploadExecutor.submit( + return fileUploadExecutor.submit( () -> uploadAndDeleteInternal(path, target, 10 /* retries */)); - logger.info("Return for {}: {}", Paths.get(path.getBackupFile().getAbsolutePath()), res); - return res; + } else { return Futures.immediateFuture(uploadAndDeleteInternal(path, target, 10 /* retries */)); } @@ -191,15 +191,6 @@ public AbstractBackupPath uploadAndDeleteInternal( File localFile = localPath.toFile(); logger.info(String.format("uploadAndDeleteInternal: %s", localPath)); - StringBuilder sb = new StringBuilder(); - StackTraceElement[] stackTraceElements = Thread.currentThread().getStackTrace(); - for (StackTraceElement element : stackTraceElements) { - sb.append(element.toString()); - sb.append("\n"); - } - // remove the last line - sb.setLength(sb.length() - 1); - logger.info(sb.toString()); Preconditions.checkArgument( localFile.exists(), String.format("Can't upload nonexistent %s", localPath)); diff --git a/priam/src/main/java/com/netflix/priam/backup/BackupHelperImpl.java b/priam/src/main/java/com/netflix/priam/backup/BackupHelperImpl.java index 68e93e4d5..1246a339e 100644 --- a/priam/src/main/java/com/netflix/priam/backup/BackupHelperImpl.java +++ b/priam/src/main/java/com/netflix/priam/backup/BackupHelperImpl.java @@ -4,6 +4,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; +import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.netflix.priam.compress.CompressionType; import com.netflix.priam.config.BackupsToCompress; @@ -17,6 +18,7 @@ import java.nio.file.Path; import java.nio.file.Paths; import java.time.Instant; +import java.util.List; import java.util.Set; import java.util.stream.Stream; import javax.inject.Inject; @@ -59,11 +61,17 @@ public ImmutableList> uploadAndDeleteAllFil throws Exception { final ImmutableList.Builder> futures = ImmutableList.builder(); + for (AbstractBackupPath bp : getBackupPaths(parent, type)) { logger.info(String.format("Before AbstractBackupPath: %s, localPath: %s", bp, Paths.get(bp.getBackupFile().getAbsolutePath()))); futures.add(fs.uploadAndDelete(bp, target, async)); + ImmutableList> futuresList = futures.build(); + ListenableFuture> allFutures = Futures.allAsList(futuresList); + + List allResults = allFutures.get(); + logger.info(String.format("After AbstractBackupPath: %s, localPath: %s", bp, Paths.get(bp.getBackupFile().getAbsolutePath()))); } diff --git a/priam/src/main/java/com/netflix/priam/backup/IBackupFileSystem.java b/priam/src/main/java/com/netflix/priam/backup/IBackupFileSystem.java index e33ff3bfc..4e89394d2 100644 --- a/priam/src/main/java/com/netflix/priam/backup/IBackupFileSystem.java +++ b/priam/src/main/java/com/netflix/priam/backup/IBackupFileSystem.java @@ -17,6 +17,7 @@ package com.netflix.priam.backup; import com.google.common.util.concurrent.ListenableFuture; + import java.io.FileNotFoundException; import java.nio.file.Path; import java.time.Instant; @@ -71,18 +72,18 @@ default void uploadAndDelete(AbstractBackupPath path, boolean async) * guaranteed as we try to avoid lock on read/write of the files-in-progress. Once uploaded, * files are deleted. Uploads are retried 10 times. * - * @param path AbstractBackupPath to be used to send backup notifications only. + * @param path AbstractBackupPath to be used to send backup notifications only. * @param target The target time of completion of all files in the upload. - * @param async boolean to determine whether the call should block or return immediately and - * upload asynchronously + * @param async boolean to determine whether the call should block or return immediately and + * upload asynchronously * @return The future of the async job to monitor the progress of the job. This will be null if - * file was de-duped for upload. - * @throws BackupRestoreException in case of failure to upload for any reason including file not - * readable or remote file system errors. - * @throws FileNotFoundException If a file as denoted by localPath is not available or is a - * directory. + * file was de-duped for upload. + * @throws BackupRestoreException in case of failure to upload for any reason including file not + * readable or remote file system errors. + * @throws FileNotFoundException If a file as denoted by localPath is not available or is a + * directory. * @throws RejectedExecutionException if the queue is full and TIMEOUT is reached while trying - * to add the work to the queue. + * to add the work to the queue. */ ListenableFuture uploadAndDelete( final AbstractBackupPath path, Instant target, boolean async) diff --git a/priam/src/main/java/com/netflix/priam/backupv2/SnapshotMetaTask.java b/priam/src/main/java/com/netflix/priam/backupv2/SnapshotMetaTask.java index be5f49ca9..06bfbbe87 100644 --- a/priam/src/main/java/com/netflix/priam/backupv2/SnapshotMetaTask.java +++ b/priam/src/main/java/com/netflix/priam/backupv2/SnapshotMetaTask.java @@ -438,7 +438,9 @@ private static void addCallback(ListenableFuture future, Fil FutureCallback callback = new FutureCallback() { @Override - public void onSuccess(AbstractBackupPath result) {} + public void onSuccess(AbstractBackupPath result) { + logger.info("Successfully uploaded contents of snapshotDir {}", snapshotDir); + } @Override public void onFailure(Throwable t) { From 88432abe579cf6de67e9ebd51c641e87b5eaae71 Mon Sep 17 00:00:00 2001 From: Ammar Khaku Date: Tue, 25 Jul 2023 19:56:37 -0700 Subject: [PATCH 7/9] Some more debug logging --- .../com/netflix/priam/backup/AbstractFileSystem.java | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/priam/src/main/java/com/netflix/priam/backup/AbstractFileSystem.java b/priam/src/main/java/com/netflix/priam/backup/AbstractFileSystem.java index cbb2835f1..a3e6f5a57 100644 --- a/priam/src/main/java/com/netflix/priam/backup/AbstractFileSystem.java +++ b/priam/src/main/java/com/netflix/priam/backup/AbstractFileSystem.java @@ -176,7 +176,15 @@ public ListenableFuture uploadAndDelete( logger.info(String.format("uploadAndDelete path: %s, async: %s", Paths.get(path.getBackupFile().getAbsolutePath()), async)); if (async) { return fileUploadExecutor.submit( - () -> uploadAndDeleteInternal(path, target, 10 /* retries */)); + () -> { + logger.info("uploadAndDeleteCallable path: {}", Paths.get(path.getBackupFile().getAbsolutePath())); + try { + return uploadAndDeleteInternal(path, target, 10 /* retries */); + } catch (BackupRestoreException e) { + logger.info("uploadAndDeleteCallable exception path: {}", Paths.get(path.getBackupFile().getAbsolutePath()), e); + throw e; + } + }); } else { return Futures.immediateFuture(uploadAndDeleteInternal(path, target, 10 /* retries */)); From 7949351f0ec19267d2493669c4716eff7c7e28de Mon Sep 17 00:00:00 2001 From: Ammar Khaku Date: Tue, 25 Jul 2023 23:25:08 -0700 Subject: [PATCH 8/9] Back out early resolution of futures --- .../com/netflix/priam/backup/AbstractFileSystem.java | 2 +- .../com/netflix/priam/backup/BackupHelperImpl.java | 10 ---------- 2 files changed, 1 insertion(+), 11 deletions(-) diff --git a/priam/src/main/java/com/netflix/priam/backup/AbstractFileSystem.java b/priam/src/main/java/com/netflix/priam/backup/AbstractFileSystem.java index a3e6f5a57..bbfd7873f 100644 --- a/priam/src/main/java/com/netflix/priam/backup/AbstractFileSystem.java +++ b/priam/src/main/java/com/netflix/priam/backup/AbstractFileSystem.java @@ -177,7 +177,7 @@ public ListenableFuture uploadAndDelete( if (async) { return fileUploadExecutor.submit( () -> { - logger.info("uploadAndDeleteCallable path: {}", Paths.get(path.getBackupFile().getAbsolutePath())); + logger.info("uploadAndDeleteCallableV2 path: {}", Paths.get(path.getBackupFile().getAbsolutePath())); try { return uploadAndDeleteInternal(path, target, 10 /* retries */); } catch (BackupRestoreException e) { diff --git a/priam/src/main/java/com/netflix/priam/backup/BackupHelperImpl.java b/priam/src/main/java/com/netflix/priam/backup/BackupHelperImpl.java index 1246a339e..b8ecc795a 100644 --- a/priam/src/main/java/com/netflix/priam/backup/BackupHelperImpl.java +++ b/priam/src/main/java/com/netflix/priam/backup/BackupHelperImpl.java @@ -4,7 +4,6 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableSet; -import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import com.netflix.priam.compress.CompressionType; import com.netflix.priam.config.BackupsToCompress; @@ -18,7 +17,6 @@ import java.nio.file.Path; import java.nio.file.Paths; import java.time.Instant; -import java.util.List; import java.util.Set; import java.util.stream.Stream; import javax.inject.Inject; @@ -63,15 +61,7 @@ public ImmutableList> uploadAndDeleteAllFil ImmutableList.builder(); for (AbstractBackupPath bp : getBackupPaths(parent, type)) { - logger.info(String.format("Before AbstractBackupPath: %s, localPath: %s", bp, Paths.get(bp.getBackupFile().getAbsolutePath()))); - futures.add(fs.uploadAndDelete(bp, target, async)); - - ImmutableList> futuresList = futures.build(); - ListenableFuture> allFutures = Futures.allAsList(futuresList); - - List allResults = allFutures.get(); - logger.info(String.format("After AbstractBackupPath: %s, localPath: %s", bp, Paths.get(bp.getBackupFile().getAbsolutePath()))); } From 6a45df587b9874afb919c34c93c40a52e32a8739 Mon Sep 17 00:00:00 2001 From: Cheng Wang Date: Wed, 2 Aug 2023 16:30:08 -0700 Subject: [PATCH 9/9] change --- .../com/netflix/priam/backup/AbstractFileSystem.java | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/priam/src/main/java/com/netflix/priam/backup/AbstractFileSystem.java b/priam/src/main/java/com/netflix/priam/backup/AbstractFileSystem.java index bbfd7873f..187df1ff6 100644 --- a/priam/src/main/java/com/netflix/priam/backup/AbstractFileSystem.java +++ b/priam/src/main/java/com/netflix/priam/backup/AbstractFileSystem.java @@ -175,11 +175,12 @@ public ListenableFuture uploadAndDelete( throws RejectedExecutionException, BackupRestoreException { logger.info(String.format("uploadAndDelete path: %s, async: %s", Paths.get(path.getBackupFile().getAbsolutePath()), async)); if (async) { + return fileUploadExecutor.submit( () -> { - logger.info("uploadAndDeleteCallableV2 path: {}", Paths.get(path.getBackupFile().getAbsolutePath())); + // logger.info("uploadAndDeleteCallableV2 path: {}", Paths.get(path.getBackupFile().getAbsolutePath())); try { - return uploadAndDeleteInternal(path, target, 10 /* retries */); + return uploadAndDeleteInternal(path, target, 10); } catch (BackupRestoreException e) { logger.info("uploadAndDeleteCallable exception path: {}", Paths.get(path.getBackupFile().getAbsolutePath()), e); throw e; @@ -187,10 +188,11 @@ public ListenableFuture uploadAndDelete( }); } else { - return Futures.immediateFuture(uploadAndDeleteInternal(path, target, 10 /* retries */)); + return Futures.immediateFuture(uploadAndDeleteInternal(path, target, 10)); } } + @VisibleForTesting public AbstractBackupPath uploadAndDeleteInternal( final AbstractBackupPath path, Instant target, int retry) @@ -198,7 +200,7 @@ public AbstractBackupPath uploadAndDeleteInternal( Path localPath = Paths.get(path.getBackupFile().getAbsolutePath()); File localFile = localPath.toFile(); - logger.info(String.format("uploadAndDeleteInternal: %s", localPath)); + // logger.info(String.format("uploadAndDeleteInternal: %s", localPath)); Preconditions.checkArgument( localFile.exists(), String.format("Can't upload nonexistent %s", localPath));