diff --git a/app/controllers/bulkrax/importers_controller.rb b/app/controllers/bulkrax/importers_controller.rb index 7fcbeff0..284caafe 100644 --- a/app/controllers/bulkrax/importers_controller.rb +++ b/app/controllers/bulkrax/importers_controller.rb @@ -214,10 +214,11 @@ def files_for_import(file, cloud_files) return if file.blank? && cloud_files.blank? @importer[:parser_fields]['import_file_path'] = @importer.parser.write_import_file(file) if file.present? if cloud_files.present? + @importer[:parser_fields]['cloud_file_paths'] = cloud_files # For BagIt, there will only be one bag, so we get the file_path back and set import_file_path # For CSV, we expect only file uploads, so we won't get the file_path back # and we expect the import_file_path to be set already - target = @importer.parser.retrieve_cloud_files(cloud_files) + target = @importer.parser.retrieve_cloud_files(cloud_files, @importer) @importer[:parser_fields]['import_file_path'] = target if target.present? end @importer.save diff --git a/app/jobs/bulkrax/download_cloud_file_job.rb b/app/jobs/bulkrax/download_cloud_file_job.rb index 313c2f01..2b29bee2 100644 --- a/app/jobs/bulkrax/download_cloud_file_job.rb +++ b/app/jobs/bulkrax/download_cloud_file_job.rb @@ -1,18 +1,31 @@ # frozen_string_literal: true - module Bulkrax class DownloadCloudFileJob < ApplicationJob queue_as Bulkrax.config.ingest_queue_name + include ActionView::Helpers::NumberHelper + # Retrieve cloud file and write to the imports directory # Note: if using the file system, the mounted directory in # browse_everything MUST be shared by web and worker servers def perform(file, target_file) retriever = BrowseEverything::Retriever.new + last_logged_time = Time.zone.now + log_interval = 3.seconds + retriever.download(file, target_file) do |filename, retrieved, total| - # The block is still useful for showing progress, but the - # first argument is the filename instead of a chunk of data. 
+ percentage = (retrieved.to_f / total.to_f) * 100 + current_time = Time.zone.now + + if (current_time - last_logged_time) >= log_interval + # Use number_to_human_size for formatting + readable_retrieved = number_to_human_size(retrieved) + readable_total = number_to_human_size(total) + Rails.logger.info "Downloaded #{readable_retrieved} of #{readable_total}, #{filename}: #{percentage.round}% complete" + last_logged_time = current_time + end end + Rails.logger.info "Download complete: #{file['url']} to #{target_file}" end end end diff --git a/app/jobs/bulkrax/importer_job.rb b/app/jobs/bulkrax/importer_job.rb index 9fb0f445..48e4ae8e 100644 --- a/app/jobs/bulkrax/importer_job.rb +++ b/app/jobs/bulkrax/importer_job.rb @@ -6,6 +6,7 @@ class ImporterJob < ApplicationJob def perform(importer_id, only_updates_since_last_import = false) importer = Importer.find(importer_id) + return schedule(importer, Time.zone.now + 3.minutes, 'Rescheduling: cloud files are not ready yet') unless all_files_completed?(importer) importer.current_run unzip_imported_file(importer.parser) @@ -16,6 +17,8 @@ def perform(importer_id, only_updates_since_last_import = false) importer.set_status_info(e) end + private + def import(importer, only_updates_since_last_import) importer.only_updates = only_updates_since_last_import || false return unless importer.valid_import? @@ -36,8 +39,21 @@ def update_current_run_counters(importer) importer.current_run.save! 
end - def schedule(importer) - ImporterJob.set(wait_until: importer.next_import_at).perform_later(importer.id, true) + def schedule(importer, wait_until = importer.next_import_at, message = nil) + Rails.logger.info message if message + ImporterJob.set(wait_until: wait_until).perform_later(importer.id, true) + end + + # checks the file sizes of the download files to match the original files + def all_files_completed?(importer) + cloud_files = importer.parser_fields['cloud_file_paths'] + original_files = importer.parser_fields['original_file_paths'] + return true unless cloud_files.present? && original_files.present? + + imported_file_sizes = cloud_files.map { |_, v| v['file_size'].to_i } + original_file_sizes = original_files.map { |imported_file| File.size(imported_file) } + + original_file_sizes == imported_file_sizes end end end diff --git a/app/parsers/bulkrax/application_parser.rb b/app/parsers/bulkrax/application_parser.rb index b85383f4..c0302008 100644 --- a/app/parsers/bulkrax/application_parser.rb +++ b/app/parsers/bulkrax/application_parser.rb @@ -272,7 +272,7 @@ def create_entry_and_job(current_record, type, identifier = nil) end # Optional, define if using browse everything for file upload - def retrieve_cloud_files(files); end + def retrieve_cloud_files(_files, _importer); end # @param file [#path, #original_filename] the file object that with the relevant data for the # import. 
diff --git a/app/parsers/bulkrax/bagit_parser.rb b/app/parsers/bulkrax/bagit_parser.rb index a7f2bbd5..1d804b04 100644 --- a/app/parsers/bulkrax/bagit_parser.rb +++ b/app/parsers/bulkrax/bagit_parser.rb @@ -166,7 +166,7 @@ def write_triples(folder_count, e) # @todo - investigate getting directory structure # @todo - investigate using perform_later, and having the importer check for # DownloadCloudFileJob before it starts - def retrieve_cloud_files(files) + def retrieve_cloud_files(files, _importer) # There should only be one zip file for Bagit, take the first return if files['0'].blank? target_file = File.join(path_for_import, files['0']['file_name'].tr(' ', '_')) diff --git a/app/parsers/bulkrax/csv_parser.rb b/app/parsers/bulkrax/csv_parser.rb index 515178be..67ef41f3 100644 --- a/app/parsers/bulkrax/csv_parser.rb +++ b/app/parsers/bulkrax/csv_parser.rb @@ -189,9 +189,10 @@ def records_split_count # @todo - investigate getting directory structure # @todo - investigate using perform_later, and having the importer check for # DownloadCloudFileJob before it starts - def retrieve_cloud_files(files) + def retrieve_cloud_files(files, importer) files_path = File.join(path_for_import, 'files') FileUtils.mkdir_p(files_path) unless File.exist?(files_path) + target_files = [] files.each_pair do |_key, file| # fixes bug where auth headers do not get attached properly if file['auth_header'].present? @@ -200,10 +201,12 @@ def retrieve_cloud_files(files) end # this only works for uniquely named files target_file = File.join(files_path, file['file_name'].tr(' ', '_')) + target_files << target_file # Now because we want the files in place before the importer runs # Problematic for a large upload - Bulkrax::DownloadCloudFileJob.perform_now(file, target_file) + Bulkrax::DownloadCloudFileJob.perform_later(file, target_file) end + importer[:parser_fields]['original_file_paths'] = target_files return nil end