diff --git a/build.gradle b/build.gradle index 5ae51364f..f9efc4ae4 100644 --- a/build.gradle +++ b/build.gradle @@ -36,7 +36,7 @@ allprojects { compile 'com.sun.jersey:jersey-json:1.19.4' compile 'com.sun.jersey:jersey-bundle:1.19.4' compile 'com.sun.jersey.contribs:jersey-guice:1.19.4' - compile 'com.google.guava:guava:21.0' + //compile 'com.google.guava:guava:21.0' compile 'com.google.code.findbugs:jsr305:3.0.2' // AWS Services diff --git a/priam/src/main/java/com/netflix/priam/backup/AbstractFileSystem.java b/priam/src/main/java/com/netflix/priam/backup/AbstractFileSystem.java index ba7f94865..187df1ff6 100644 --- a/priam/src/main/java/com/netflix/priam/backup/AbstractFileSystem.java +++ b/priam/src/main/java/com/netflix/priam/backup/AbstractFileSystem.java @@ -42,7 +42,6 @@ import java.util.List; import java.util.Set; import java.util.concurrent.*; -import javax.inject.Inject; import javax.inject.Provider; import org.apache.commons.collections4.iterators.FilterIterator; import org.apache.commons.collections4.iterators.TransformIterator; @@ -63,6 +62,7 @@ public abstract class AbstractFileSystem implements IBackupFileSystem { private final IConfiguration configuration; protected final BackupMetrics backupMetrics; private final Set tasksQueued; + private final ListeningExecutorService fileUploadExecutor; private final ThreadPoolExecutor fileDownloadExecutor; private final BackupNotificationMgr backupNotificationMgr; @@ -71,7 +71,6 @@ public abstract class AbstractFileSystem implements IBackupFileSystem { // file system. This is to ensure that we don't make too many API calls to remote file system. private final Cache objectCache; - @Inject public AbstractFileSystem( IConfiguration configuration, BackupMetrics backupMetrics, @@ -89,28 +88,44 @@ public AbstractFileSystem( files for "sync" feature which might compete with backups for scheduling. Also, we may want to have different TIMEOUT for each kind of operation (upload/download) based on our file system choices. */ + + logger.info("Initializing AbstractFileSystem ..."); + // Get the current thread's stack trace + StackTraceElement[] stackTraceElements = Thread.currentThread().getStackTrace(); + + // Loop through each stack trace element + for (StackTraceElement element : stackTraceElements) { + logger.info(element.toString()); + } + BlockingQueue uploadQueue = new ArrayBlockingQueue<>(configuration.getBackupQueueSize()); PolledMeter.using(backupMetrics.getRegistry()) .withName(backupMetrics.uploadQueueSize) .monitorSize(uploadQueue); - this.fileUploadExecutor = - MoreExecutors.listeningDecorator( - new BlockingSubmitThreadPoolExecutor( - configuration.getBackupThreads(), - uploadQueue, - configuration.getUploadTimeout())); + + BlockingSubmitThreadPoolExecutor uploadExecutor = new BlockingSubmitThreadPoolExecutor( + configuration.getBackupThreads(), + uploadQueue, + configuration.getUploadTimeout()); + logger.info("uploadExecutor: {}", uploadExecutor); + + this.fileUploadExecutor = MoreExecutors.listeningDecorator(uploadExecutor); + + logger.info("fileUploadExecutor: {}", this.fileUploadExecutor); BlockingQueue downloadQueue = new ArrayBlockingQueue<>(configuration.getDownloadQueueSize()); PolledMeter.using(backupMetrics.getRegistry()) .withName(backupMetrics.downloadQueueSize) .monitorSize(downloadQueue); - this.fileDownloadExecutor = - new BlockingSubmitThreadPoolExecutor( - configuration.getRestoreThreads(), - downloadQueue, - configuration.getDownloadTimeout()); + BlockingSubmitThreadPoolExecutor downloadExecutor = new BlockingSubmitThreadPoolExecutor( + configuration.getRestoreThreads(), + downloadQueue, + configuration.getDownloadTimeout()); + logger.info("downloadExecutor: {}", downloadExecutor); + + this.fileDownloadExecutor = downloadExecutor; } @Override @@ -158,20 +173,35 @@ protected abstract void downloadFileImpl(final AbstractBackupPath path, String s public ListenableFuture uploadAndDelete( final AbstractBackupPath path, Instant target, boolean async) throws RejectedExecutionException, BackupRestoreException { + logger.info(String.format("uploadAndDelete path: %s, async: %s", Paths.get(path.getBackupFile().getAbsolutePath()), async)); if (async) { + return fileUploadExecutor.submit( - () -> uploadAndDeleteInternal(path, target, 10 /* retries */)); + () -> { + // logger.info("uploadAndDeleteCallableV2 path: {}", Paths.get(path.getBackupFile().getAbsolutePath())); + try { + return uploadAndDeleteInternal(path, target, 10); + } catch (BackupRestoreException e) { + logger.info("uploadAndDeleteCallable exception path: {}", Paths.get(path.getBackupFile().getAbsolutePath()), e); + throw e; + } + }); + } else { - return Futures.immediateFuture(uploadAndDeleteInternal(path, target, 10 /* retries */)); + return Futures.immediateFuture(uploadAndDeleteInternal(path, target, 10)); } } + @VisibleForTesting public AbstractBackupPath uploadAndDeleteInternal( final AbstractBackupPath path, Instant target, int retry) throws RejectedExecutionException, BackupRestoreException { Path localPath = Paths.get(path.getBackupFile().getAbsolutePath()); File localFile = localPath.toFile(); + + // logger.info(String.format("uploadAndDeleteInternal: %s", localPath)); + Preconditions.checkArgument( localFile.exists(), String.format("Can't upload nonexistent %s", localPath)); Preconditions.checkArgument( diff --git a/priam/src/main/java/com/netflix/priam/backup/BackupFileSystemContext.java b/priam/src/main/java/com/netflix/priam/backup/BackupFileSystemContext.java index a39ffc0cf..62ed733cf 100755 --- a/priam/src/main/java/com/netflix/priam/backup/BackupFileSystemContext.java +++ b/priam/src/main/java/com/netflix/priam/backup/BackupFileSystemContext.java @@ -30,6 +30,11 @@ public BackupFileSystemContext( this.encryptedFs = encryptedFs; } + public BackupFileSystemContext( + @Named("backup") IBackupFileSystem fs) { + this.fs = fs; + } + public IBackupFileSystem getFileStrategy(IConfiguration config) { if (!config.isEncryptBackupEnabled()) { diff --git a/priam/src/main/java/com/netflix/priam/backup/BackupHelperImpl.java b/priam/src/main/java/com/netflix/priam/backup/BackupHelperImpl.java index 9f13891f7..b8ecc795a 100644 --- a/priam/src/main/java/com/netflix/priam/backup/BackupHelperImpl.java +++ b/priam/src/main/java/com/netflix/priam/backup/BackupHelperImpl.java @@ -8,10 +8,14 @@ import com.netflix.priam.compress.CompressionType; import com.netflix.priam.config.BackupsToCompress; import com.netflix.priam.config.IConfiguration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.io.File; import java.io.IOException; import java.nio.file.Files; import java.nio.file.Path; +import java.nio.file.Paths; import java.time.Instant; import java.util.Set; import java.util.stream.Stream; @@ -19,7 +23,7 @@ import javax.inject.Provider; public class BackupHelperImpl implements BackupHelper { - + private static final Logger logger = LoggerFactory.getLogger(BackupHelperImpl.class); private static final String COMPRESSION_SUFFIX = "-CompressionInfo.db"; private static final String DATA_SUFFIX = "-Data.db"; private final Provider pathFactory; @@ -55,8 +59,11 @@ public ImmutableList> uploadAndDeleteAllFil throws Exception { final ImmutableList.Builder> futures = ImmutableList.builder(); + for (AbstractBackupPath bp : getBackupPaths(parent, type)) { futures.add(fs.uploadAndDelete(bp, target, async)); + logger.info(String.format("After AbstractBackupPath: %s, localPath: %s", bp, Paths.get(bp.getBackupFile().getAbsolutePath()))); + } return futures.build(); } diff --git a/priam/src/main/java/com/netflix/priam/backup/IBackupFileSystem.java b/priam/src/main/java/com/netflix/priam/backup/IBackupFileSystem.java index e33ff3bfc..4e89394d2 100644 --- a/priam/src/main/java/com/netflix/priam/backup/IBackupFileSystem.java +++ b/priam/src/main/java/com/netflix/priam/backup/IBackupFileSystem.java @@ -17,6 +17,7 @@ package com.netflix.priam.backup; import com.google.common.util.concurrent.ListenableFuture; + import java.io.FileNotFoundException; import java.nio.file.Path; import java.time.Instant; @@ -71,18 +72,18 @@ default void uploadAndDelete(AbstractBackupPath path, boolean async) * guaranteed as we try to avoid lock on read/write of the files-in-progress. Once uploaded, * files are deleted. Uploads are retried 10 times. * - * @param path AbstractBackupPath to be used to send backup notifications only. + * @param path AbstractBackupPath to be used to send backup notifications only. * @param target The target time of completion of all files in the upload. - * @param async boolean to determine whether the call should block or return immediately and - * upload asynchronously + * @param async boolean to determine whether the call should block or return immediately and + * upload asynchronously * @return The future of the async job to monitor the progress of the job. This will be null if - * file was de-duped for upload. - * @throws BackupRestoreException in case of failure to upload for any reason including file not - * readable or remote file system errors. - * @throws FileNotFoundException If a file as denoted by localPath is not available or is a - * directory. + * file was de-duped for upload. + * @throws BackupRestoreException in case of failure to upload for any reason including file not + * readable or remote file system errors. + * @throws FileNotFoundException If a file as denoted by localPath is not available or is a + * directory. * @throws RejectedExecutionException if the queue is full and TIMEOUT is reached while trying - * to add the work to the queue. + * to add the work to the queue. */ ListenableFuture uploadAndDelete( final AbstractBackupPath path, Instant target, boolean async) diff --git a/priam/src/main/java/com/netflix/priam/backupv2/SnapshotMetaTask.java b/priam/src/main/java/com/netflix/priam/backupv2/SnapshotMetaTask.java index a886b5a1a..06bfbbe87 100644 --- a/priam/src/main/java/com/netflix/priam/backupv2/SnapshotMetaTask.java +++ b/priam/src/main/java/com/netflix/priam/backupv2/SnapshotMetaTask.java @@ -60,7 +60,8 @@ * This service will run on CRON as specified by {@link * IBackupRestoreConfig#getSnapshotMetaServiceCronExpression()} The intent of this service is to run * a full snapshot on Cassandra, get the list of the SSTables on disk and then create a - * manifest.json file which will encapsulate the list of the files i.e. capture filesystem at a + * manifest.json file which will e + * ncapsulate the list of the files i.e. capture filesystem at a * moment in time. This manifest.json file will ensure the true filesystem status is exposed (for * external entities) and will be used in future for Priam Backup Version 2 where a file is not * uploaded to backup file system unless SSTable has been modified. This will lead to huge reduction @@ -287,15 +288,16 @@ private void uploadAllFiles(final File backupDir) throws Exception { // We do not want to wait for completion and we just want to add them to queue. This // is to ensure that next run happens on time. AbstractBackupPath.BackupFileType type = AbstractBackupPath.BackupFileType.SST_V2; + backupHelper - .uploadAndDeleteAllFiles(snapshotDirectory, type, target, true) + .uploadAndDeleteAllFiles(snapshotDirectory, type, target, config.enableAsyncSnapshot()) .forEach(future -> addCallback(future, snapshotDirectory)); // Next, upload secondary indexes type = AbstractBackupPath.BackupFileType.SECONDARY_INDEX_V2; ImmutableList> futures; for (File subDir : getSecondaryIndexDirectories(snapshotDirectory)) { - futures = backupHelper.uploadAndDeleteAllFiles(subDir, type, target, true); + futures = backupHelper.uploadAndDeleteAllFiles(subDir, type, target, config.enableAsyncSnapshot()); if (futures.isEmpty()) { deleteIfEmpty(subDir); } @@ -436,7 +438,9 @@ private static void addCallback(ListenableFuture future, Fil FutureCallback callback = new FutureCallback() { @Override - public void onSuccess(AbstractBackupPath result) {} + public void onSuccess(AbstractBackupPath result) { + logger.info("Successfully uploaded contents of snapshotDir {}", snapshotDir); + } @Override public void onFailure(Throwable t) { diff --git a/priam/src/main/java/com/netflix/priam/defaultimpl/PriamGuiceModule.java b/priam/src/main/java/com/netflix/priam/defaultimpl/PriamGuiceModule.java index 0f68962ac..003b9157b 100644 --- a/priam/src/main/java/com/netflix/priam/defaultimpl/PriamGuiceModule.java +++ b/priam/src/main/java/com/netflix/priam/defaultimpl/PriamGuiceModule.java @@ -39,13 +39,19 @@ import com.netflix.spectator.api.Registry; import org.quartz.SchedulerFactory; import org.quartz.impl.StdSchedulerFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class PriamGuiceModule extends AbstractModule { + protected static final Logger logger = LoggerFactory.getLogger(PriamGuiceModule.class); + @Override protected void configure() { bind(SchedulerFactory.class).to(StdSchedulerFactory.class).asEagerSingleton(); bind(IBackupFileSystem.class).annotatedWith(Names.named("backup")).to(S3FileSystem.class); + logger.info("[chengw]: {}", Thread.currentThread().getStackTrace()); + bind(IBackupFileSystem.class) .annotatedWith(Names.named("encryptedbackup")) .to(S3EncryptedFileSystem.class);