From fd50b211a69e08e56c2632ccd219d441d2b4c7d8 Mon Sep 17 00:00:00 2001 From: Ting Date: Sun, 7 Aug 2016 18:17:12 -0400 Subject: [PATCH] file container trace. (#477) --- src/Agent.Listener/_project.json | 2 +- src/Agent.Worker/AsyncCommandContext.cs | 50 +++- src/Agent.Worker/Build/FileContainerServer.cs | 249 +++++++++++++----- src/Agent.Worker/Build/GitCommandManager.cs | 2 +- src/Agent.Worker/_project.json | 2 +- .../_project.json | 2 +- src/Misc/layoutbin/en-US/strings.json | 11 +- src/Test/_project.json | 2 +- 8 files changed, 246 insertions(+), 74 deletions(-) diff --git a/src/Agent.Listener/_project.json b/src/Agent.Listener/_project.json index 3947c50f31..d38c1d9235 100644 --- a/src/Agent.Listener/_project.json +++ b/src/Agent.Listener/_project.json @@ -34,7 +34,7 @@ "version": "1.0.0-*" }, "Microsoft.Win32.Registry": "4.0.0", - "vss-api-netcore": "0.5.22-private", + "vss-api-netcore": "0.5.23-private", "System.ServiceProcess.ServiceController": "4.1.0", "System.IO.Compression.ZipFile": "4.0.1", "System.IO.FileSystem.AccessControl": "4.0.0", diff --git a/src/Agent.Worker/AsyncCommandContext.cs b/src/Agent.Worker/AsyncCommandContext.cs index cd03aa29f4..f28f0df9dd 100644 --- a/src/Agent.Worker/AsyncCommandContext.cs +++ b/src/Agent.Worker/AsyncCommandContext.cs @@ -11,13 +11,32 @@ public interface IAsyncCommandContext : IAgentService Task Task { get; set; } void InitializeCommandContext(IExecutionContext context, string name); void Output(string message); + void Debug(string message); Task WaitAsync(); } public class AsyncCommandContext : AgentService, IAsyncCommandContext { + private class OutputMessage + { + public OutputMessage(OutputType type, string message) + { + Type = type; + Message = message; + } + + public OutputType Type { get; } + public String Message { get; } + } + + private enum OutputType + { + Info, + Debug, + } + private IExecutionContext _executionContext; - private readonly ConcurrentQueue _outputQueue = new ConcurrentQueue(); + private readonly ConcurrentQueue _outputQueue = new ConcurrentQueue(); public string Name { get; private set; } public Task Task { get; set; } @@ -30,7 +49,12 @@ public void InitializeCommandContext(IExecutionContext context, string name) public void Output(string message) { - _outputQueue.Enqueue(message); + _outputQueue.Enqueue(new OutputMessage(OutputType.Info, message)); + } + + public void Debug(string message) + { + _outputQueue.Enqueue(new OutputMessage(OutputType.Debug, message)); } public async Task WaitAsync() @@ -39,12 +63,20 @@ public async Task WaitAsync() // start flushing output queue Trace.Info("Start flush buffered output."); _executionContext.Section($"Async Command Start: {Name}"); - string output; + OutputMessage output; while (!this.Task.IsCompleted) { while (_outputQueue.TryDequeue(out output)) { - _executionContext.Output(output); + switch (output.Type) + { + case OutputType.Info: + _executionContext.Output(output.Message); + break; + case OutputType.Debug: + _executionContext.Debug(output.Message); + break; + } } await Task.WhenAny(Task.Delay(TimeSpan.FromMilliseconds(500)), this.Task); @@ -54,7 +86,15 @@ public async Task WaitAsync() Trace.Verbose("Command task has finished, flush out all remaining buffered output."); while (_outputQueue.TryDequeue(out output)) { - _executionContext.Output(output); + switch (output.Type) + { + case OutputType.Info: + _executionContext.Output(output.Message); + break; + case OutputType.Debug: + _executionContext.Debug(output.Message); + break; + } } _executionContext.Section($"Async Command End: {Name}"); diff --git a/src/Agent.Worker/Build/FileContainerServer.cs b/src/Agent.Worker/Build/FileContainerServer.cs index 6e8d114bc2..143faeceab 100644 --- a/src/Agent.Worker/Build/FileContainerServer.cs +++ b/src/Agent.Worker/Build/FileContainerServer.cs @@ -16,14 +16,16 @@ namespace Microsoft.VisualStudio.Services.Agent.Worker.Build public class FileContainerServer { private readonly ConcurrentQueue _fileUploadQueue = new ConcurrentQueue(); - private readonly ConcurrentBag _exceptionsDuringFileUpload = new ConcurrentBag(); + private readonly ConcurrentDictionary> _fileUploadTraceLog = new ConcurrentDictionary>(); + private readonly ConcurrentDictionary> _fileUploadProgressLog = new ConcurrentDictionary>(); private readonly FileContainerHttpClient _fileContainerHttpClient; private CancellationTokenSource _uploadCancellationTokenSource; + private TaskCompletionSource _uploadFinished; private Guid _projectId; private long _containerId; private string _containerPath; - private int _filesUploaded = 0; + private int _filesProcessed = 0; private string _sourceParentDirectory; public FileContainerServer( @@ -56,7 +58,7 @@ public async Task CopyToContainerAsync( int maxConcurrentUploads = Math.Min(Environment.ProcessorCount, 2); //context.Output($"Max Concurrent Uploads {maxConcurrentUploads}"); - IEnumerable files; + List files; if (File.Exists(source)) { files = new List() { source }; @@ -64,105 +66,228 @@ public async Task CopyToContainerAsync( } else { - files = Directory.EnumerateFiles(source, "*", SearchOption.AllDirectories); + files = Directory.EnumerateFiles(source, "*", SearchOption.AllDirectories).ToList(); _sourceParentDirectory = source.TrimEnd(Path.DirectorySeparatorChar, Path.AltDirectorySeparatorChar); } context.Output(StringUtil.Loc("TotalUploadFiles", files.Count())); - foreach (var file in files) - { - _fileUploadQueue.Enqueue(file); - } - using (_uploadCancellationTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken)) { - List allRunningTasks = new List(); - // start reporting task to keep tracking upload progress. - allRunningTasks.Add(ReportingAsync(context, files.Count(), _uploadCancellationTokenSource.Token)); + // hook up reporting event from file container client. + _fileContainerHttpClient.UploadFileReportTrace += UploadFileTraceReportReceived; + _fileContainerHttpClient.UploadFileReportProgress += UploadFileProgressReportReceived; - // start parallel upload task. - for (int i = 0; i < maxConcurrentUploads; i++) - { - allRunningTasks.Add(ParallelUploadAsync(context, i, _uploadCancellationTokenSource.Token)); - } - - // the only expected type of exception will throw from both parallel upload task and reporting task is OperationCancelledException. try { - await Task.WhenAll(allRunningTasks); - } - catch (OperationCanceledException) - { - // throw aggregate exception for all non-operationcancelexception we catched during file upload. - if (_exceptionsDuringFileUpload.Count > 0) + // try upload all files for the first time. + List failedFiles = await ParallelUploadAsync(context, files, maxConcurrentUploads, _uploadCancellationTokenSource.Token); + + if (failedFiles.Count == 0) { - throw new AggregateException(_exceptionsDuringFileUpload).Flatten(); + // all files have been upload suceed. + context.Output(StringUtil.Loc("FileUploadSucceed")); + return; + } + else + { + context.Output(StringUtil.Loc("FileUploadFailedRetryLater", failedFiles.Count)); + } + + // Delay 1 min then retry failed files. + for (int timer = 60; timer > 0; timer -= 5) + { + context.Output(StringUtil.Loc("FileUploadRetryInSecond", timer)); + await Task.Delay(TimeSpan.FromSeconds(5), _uploadCancellationTokenSource.Token); } - throw; + // Retry upload all failed files. + context.Output(StringUtil.Loc("FileUploadRetry", failedFiles.Count)); + failedFiles = await ParallelUploadAsync(context, failedFiles, maxConcurrentUploads, _uploadCancellationTokenSource.Token); + + if (failedFiles.Count == 0) + { + // all files have been upload suceed after retry. + context.Output(StringUtil.Loc("FileUploadRetrySucceed")); + return; + } + else + { + throw new Exception(StringUtil.Loc("FileUploadFailedAfterRetry")); + } + } + finally + { + _fileContainerHttpClient.UploadFileReportTrace -= UploadFileTraceReportReceived; + _fileContainerHttpClient.UploadFileReportProgress -= UploadFileProgressReportReceived; } } } - private async Task ParallelUploadAsync(IAsyncCommandContext context, int uploaderId, CancellationToken token) + private async Task> ParallelUploadAsync(IAsyncCommandContext context, List files, int concurrentUploads, CancellationToken token) { + // return files that fail to upload + List failedFiles = new List(); + + // nothing needs to upload + if (files.Count == 0) + { + return failedFiles; + } + + // ensure the file upload queue is empty. + if (!_fileUploadQueue.IsEmpty) + { + throw new ArgumentOutOfRangeException(nameof(_fileUploadQueue)); + } + + // enqueue file into upload queue. + foreach (var file in files) + { + _fileUploadQueue.Enqueue(file); + } + + // Start upload monitor task. + _filesProcessed = 0; + _uploadFinished = new TaskCompletionSource(); + _fileUploadTraceLog.Clear(); + _fileUploadProgressLog.Clear(); + Task uploadMonitor = ReportingAsync(context, files.Count(), _uploadCancellationTokenSource.Token); + + // Start parallel upload tasks. + List>> parallelUploadingTasks = new List>>(); + for (int uploader = 0; uploader < concurrentUploads; uploader++) + { + parallelUploadingTasks.Add(UploadAsync(context, uploader, _uploadCancellationTokenSource.Token)); + } + + // Wait for parallel upload finish. + await Task.WhenAll(parallelUploadingTasks); + foreach (var uploadTask in parallelUploadingTasks) + { + // record all failed files. + failedFiles.AddRange(await uploadTask); + } + + // Stop monitor task; + _uploadFinished.TrySetResult(0); + await uploadMonitor; + + return failedFiles; + } + + private async Task> UploadAsync(IAsyncCommandContext context, int uploaderId, CancellationToken token) + { + List failedFiles = new List(); string fileToUpload; Stopwatch uploadTimer = new Stopwatch(); while (_fileUploadQueue.TryDequeue(out fileToUpload)) { token.ThrowIfCancellationRequested(); - try + Interlocked.Increment(ref _filesProcessed); + + using (FileStream fs = File.Open(fileToUpload, FileMode.Open, FileAccess.Read)) { - context.Output(StringUtil.Loc("StartFileUpload", fileToUpload, new FileInfo(fileToUpload).Length)); - using (FileStream fs = File.Open(fileToUpload, FileMode.Open, FileAccess.Read)) + string itemPath = (_containerPath.TrimEnd('/') + "/" + fileToUpload.Remove(0, _sourceParentDirectory.Length + 1)).Replace('\\', '/'); + uploadTimer.Restart(); + bool catchExceptionDuringUpload = false; + HttpResponseMessage response = null; + try { - string itemPath = (_containerPath.TrimEnd('/') + "/" + fileToUpload.Remove(0, _sourceParentDirectory.Length + 1)).Replace('\\', '/'); - uploadTimer.Restart(); - HttpResponseMessage response = null; - try - { - response = await _fileContainerHttpClient.UploadFileAsync(_containerId, itemPath, fs, _projectId, token); - } - catch (OperationCanceledException) when (token.IsCancellationRequested) - { - context.Output(StringUtil.Loc("FileUploadCancelled", fileToUpload)); - throw; - } - catch (Exception ex) + response = await _fileContainerHttpClient.UploadFileAsync(_containerId, itemPath, fs, _projectId, token); + } + catch (OperationCanceledException) when (token.IsCancellationRequested) + { + context.Output(StringUtil.Loc("FileUploadCancelled", fileToUpload)); + throw; + } + catch (Exception ex) + { + catchExceptionDuringUpload = true; + context.Output(StringUtil.Loc("FileUploadFailed", fileToUpload, ex.Message)); + context.Output(ex.ToString()); + } + + uploadTimer.Stop(); + if (catchExceptionDuringUpload || response.StatusCode != System.Net.HttpStatusCode.Created) + { + if (response != null) { - context.Output(StringUtil.Loc("FileUploadFailed", fileToUpload, ex.Message)); - throw; + context.Output(StringUtil.Loc("FileContainerUploadFailed", response.StatusCode, response.ReasonPhrase, fileToUpload, itemPath)); } - uploadTimer.Stop(); - if (response.StatusCode != System.Net.HttpStatusCode.Created) + // output detail upload trace for the file. + ConcurrentQueue logQueue; + if (_fileUploadTraceLog.TryGetValue(itemPath, out logQueue)) { - throw new Exception(StringUtil.Loc("FileContainerUploadFailed", response.StatusCode, response.ReasonPhrase, fileToUpload, itemPath)); + context.Output(StringUtil.Loc("FileUploadDetailTrace", itemPath)); + string message; + while (logQueue.TryDequeue(out message)) + { + context.Output(message); + } } - else + + // tracking file that failed to upload. + failedFiles.Add(fileToUpload); + } + else + { + context.Debug(StringUtil.Loc("FileUploadFinish", fileToUpload, uploadTimer.ElapsedMilliseconds)); + + // debug detail upload trace for the file. + ConcurrentQueue logQueue; + if (_fileUploadTraceLog.TryGetValue(itemPath, out logQueue)) { - context.Output(StringUtil.Loc("FileUploadFinish", fileToUpload, uploadTimer.ElapsedMilliseconds)); + context.Debug($"Detail upload trace for file: {itemPath}"); + string message; + while (logQueue.TryDequeue(out message)) + { + context.Debug(message); + } } } - - Interlocked.Increment(ref _filesUploaded); - } - catch (Exception ex) when (!(ex is OperationCanceledException)) - { - _exceptionsDuringFileUpload.Add(ex); - _uploadCancellationTokenSource.Cancel(); - return; } } + + return failedFiles; } private async Task ReportingAsync(IAsyncCommandContext context, int totalFiles, CancellationToken token) { - while (_filesUploaded != totalFiles) + int traceInterval = 0; + while (!_uploadFinished.Task.IsCompleted && !token.IsCancellationRequested) { - context.Output(StringUtil.Loc("FileUploadProgress", totalFiles, _filesUploaded)); - await Task.Delay(2000, token); + bool hasDetailProgress = false; + foreach (var file in _fileUploadProgressLog) + { + string message; + while (file.Value.TryDequeue(out message)) + { + hasDetailProgress = true; + context.Output(message); + } + } + + // trace total file progress every 25 seconds when there is no file level detail progress + if (++traceInterval % 2 == 0 && !hasDetailProgress) + { + context.Output(StringUtil.Loc("FileUploadProgress", totalFiles, _filesProcessed, (_filesProcessed * 100) / totalFiles)); + } + + await Task.WhenAny(_uploadFinished.Task, Task.Delay(5000, token)); } } + + private void UploadFileTraceReportReceived(object sender, ReportTraceEventArgs e) + { + ConcurrentQueue logQueue = _fileUploadTraceLog.GetOrAdd(e.File, new ConcurrentQueue()); + logQueue.Enqueue(e.Message); + } + + private void UploadFileProgressReportReceived(object sender, ReportProgressEventArgs e) + { + ConcurrentQueue progressQueue = _fileUploadProgressLog.GetOrAdd(e.File, new ConcurrentQueue()); + progressQueue.Enqueue(StringUtil.Loc("FileUploadProgressDetail", e.File, (e.CurrentChunk * 100) / e.TotalChunks)); + } } } \ No newline at end of file diff --git a/src/Agent.Worker/Build/GitCommandManager.cs b/src/Agent.Worker/Build/GitCommandManager.cs index b7e930ee0a..3183268eb1 100644 --- a/src/Agent.Worker/Build/GitCommandManager.cs +++ b/src/Agent.Worker/Build/GitCommandManager.cs @@ -316,7 +316,7 @@ public async Task GitVersion(IExecutionContext context) Version version = null; List outputStrings = new List(); int exitCode = await ExecuteGitCommandAsync(context, IOUtil.GetWorkPath(HostContext), "version", null, outputStrings); - context.Debug($"git version ouput: {string.Join(Environment.NewLine, outputStrings)}"); + context.Debug($"git version output: {string.Join(Environment.NewLine, outputStrings)}"); if (exitCode == 0) { // remove any empty line. diff --git a/src/Agent.Worker/_project.json b/src/Agent.Worker/_project.json index c9422580fe..3cae59ec0f 100644 --- a/src/Agent.Worker/_project.json +++ b/src/Agent.Worker/_project.json @@ -33,7 +33,7 @@ "target": "project", "version": "1.0.0-*" }, - "vss-api-netcore": "0.5.22-private", + "vss-api-netcore": "0.5.23-private", "System.Collections.NonGeneric": "4.0.1", "System.Diagnostics.TraceSource": "4.0.0", "System.Linq": "4.1.0", diff --git a/src/Microsoft.VisualStudio.Services.Agent/_project.json b/src/Microsoft.VisualStudio.Services.Agent/_project.json index 4935391668..bee381eaa9 100644 --- a/src/Microsoft.VisualStudio.Services.Agent/_project.json +++ b/src/Microsoft.VisualStudio.Services.Agent/_project.json @@ -26,7 +26,7 @@ }, "dependencies": { "NETStandard.Library": "1.6.0", - "vss-api-netcore": "0.5.22-private", + "vss-api-netcore": "0.5.23-private", "Microsoft.Win32.Registry": "4.0.0", "Newtonsoft.Json": "8.0.3", "System.Diagnostics.TraceSource": "4.0.0", diff --git a/src/Misc/layoutbin/en-US/strings.json b/src/Misc/layoutbin/en-US/strings.json index 4917dbdb6a..653193a661 100644 --- a/src/Misc/layoutbin/en-US/strings.json +++ b/src/Misc/layoutbin/en-US/strings.json @@ -87,9 +87,17 @@ "FileDoesNotExist": "File '{0}' does not exist or is not accessible.", "FileNotFound": "File not found: '{0}'", "FileUploadCancelled": "File upload has been cancelled during upload file: '{0}'.", + "FileUploadDetailTrace": "Detail upload trace for file that fail to upload: {0}", "FileUploadFailed": "Fail to upload '{0}' due to '{1}'.", + "FileUploadFailedAfterRetry": "File upload failed even after retry.", + "FileUploadFailedRetryLater": "{0} files failed to upload, retry these files after a minute.", "FileUploadFinish": "File: '{0}' tooks {1} milliseconds to finish upload", - "FileUploadProgress": "Total file: {0} ---- Uploaded file: {1}", + "FileUploadProgress": "Total file: {0} ---- Processed file: {1} ({2}%)", + "FileUploadProgressDetail": "Uploading '{0}' ({1}%)", + "FileUploadRetry": "Start retry {0} failed files upload.", + "FileUploadRetryInSecond": "Retry file upload after {0} seconds.", + "FileUploadRetrySucceed": "File upload succeed after retry.", + "FileUploadSucceed": "File upload succeed.", "GenerateAndRunUpdateScript": "Generate and execute update script.", "GetSources": "Get Sources", "GroupDoesNotExists": "Group: {0} does not Exist", @@ -332,7 +340,6 @@ "SourceArtifactProviderNotFound": "Can not find source provider for artifact of type {0}", "SourceDirectoriesNotSpecified": "Source directories field is not specified. Specify valid source directories for code coverage reports to include highlighted source code and try again.", "SourceDirectoriesNotSpecifiedForMultiModule": "Source directories field is not specified. Source directories field is required for a multi module project for code coverage reports to include highlighted source code.", - "StartFileUpload": "Start uploading file: '{0}' ({1} Bytes)", "StepTimedOut": "The task has timed out.", "Success": "Succeeded: ", "SupportedRepositoryEndpointNotFound": "Unable to match any source repository endpoints with any of the supported source providers.", diff --git a/src/Test/_project.json b/src/Test/_project.json index ca031f06c6..bbe1890b7d 100644 --- a/src/Test/_project.json +++ b/src/Test/_project.json @@ -30,7 +30,7 @@ "System.Buffers": "4.0.0", "System.Reflection.TypeExtensions": "4.1.0", "System.Threading.ThreadPool": "4.0.10", - "vss-api-netcore": "0.5.22-private", + "vss-api-netcore": "0.5.23-private", "moq.netcore": "4.4.0-beta8", "xunit": "2.1.0", "xunit.console.netcore": "1.0.3-prerelease-00520-02",