Skip to content

Commit

Permalink
Download cloud files later (#930)
Browse files Browse the repository at this point in the history
* 🎁 Reschedule ImporterJob if downloads aren't done

This commit will add a check in the `ImporterJob` to see if the cloud
files finished downloading.  If they haven't, the job will be
rescheduled until they are.

* 🎁 Download Cloud Files later

This commit will bring in changes from `5.3.1-british_library` to move
the download of cloud files to a background job.

---------

Co-authored-by: Jeremy Friesen <[email protected]>
  • Loading branch information
kirkkwang and jeremyf committed Mar 12, 2024
1 parent 8f8482b commit 7420b9c
Show file tree
Hide file tree
Showing 6 changed files with 43 additions and 10 deletions.
3 changes: 2 additions & 1 deletion app/controllers/bulkrax/importers_controller.rb
Original file line number Diff line number Diff line change
Expand Up @@ -214,10 +214,11 @@ def files_for_import(file, cloud_files)
return if file.blank? && cloud_files.blank?
@importer[:parser_fields]['import_file_path'] = @importer.parser.write_import_file(file) if file.present?
if cloud_files.present?
@importer[:parser_fields]['cloud_file_paths'] = cloud_files
# For BagIt, there will only be one bag, so we get the file_path back and set import_file_path
# For CSV, we expect only file uploads, so we won't get the file_path back
# and we expect the import_file_path to be set already
target = @importer.parser.retrieve_cloud_files(cloud_files)
target = @importer.parser.retrieve_cloud_files(cloud_files, @importer)
@importer[:parser_fields]['import_file_path'] = target if target.present?
end
@importer.save
Expand Down
19 changes: 16 additions & 3 deletions app/jobs/bulkrax/download_cloud_file_job.rb
Original file line number Diff line number Diff line change
@@ -1,18 +1,31 @@
# frozen_string_literal: true

module Bulkrax
class DownloadCloudFileJob < ApplicationJob
queue_as Bulkrax.config.ingest_queue_name

include ActionView::Helpers::NumberHelper

# Retrieve cloud file and write to the imports directory
# Note: if using the file system, the mounted directory in
# browse_everything MUST be shared by web and worker servers
def perform(file, target_file)
retriever = BrowseEverything::Retriever.new
last_logged_time = Time.zone.now
log_interval = 3.seconds

retriever.download(file, target_file) do |filename, retrieved, total|
# The block is still useful for showing progress, but the
# first argument is the filename instead of a chunk of data.
percentage = (retrieved.to_f / total.to_f) * 100
current_time = Time.zone.now

if (current_time - last_logged_time) >= log_interval
# Use number_to_human_size for formatting
readable_retrieved = number_to_human_size(retrieved)
readable_total = number_to_human_size(total)
Rails.logger.info "Downloaded #{readable_retrieved} of #{readable_total}, #{filename}: #{percentage.round}% complete"
last_logged_time = current_time
end
end
Rails.logger.info "Download complete: #{file['url']} to #{target_file}"
end
end
end
20 changes: 18 additions & 2 deletions app/jobs/bulkrax/importer_job.rb
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ class ImporterJob < ApplicationJob

def perform(importer_id, only_updates_since_last_import = false)
importer = Importer.find(importer_id)
return schedule(importer, Time.zone.now + 3.minutes, 'Rescheduling: cloud files are not ready yet') unless all_files_completed?(importer)

importer.current_run
unzip_imported_file(importer.parser)
Expand All @@ -16,6 +17,8 @@ def perform(importer_id, only_updates_since_last_import = false)
importer.set_status_info(e)
end

private

def import(importer, only_updates_since_last_import)
importer.only_updates = only_updates_since_last_import || false
return unless importer.valid_import?
Expand All @@ -36,8 +39,21 @@ def update_current_run_counters(importer)
importer.current_run.save!
end

def schedule(importer)
ImporterJob.set(wait_until: importer.next_import_at).perform_later(importer.id, true)
def schedule(importer, wait_until = importer.next_import_at, message = nil)
Rails.logger.info message if message
ImporterJob.set(wait_until: wait_until).perform_later(importer.id, true)
end

# checks the file sizes of the download files to match the original files
def all_files_completed?(importer)
cloud_files = importer.parser_fields['cloud_file_paths']
original_files = importer.parser_fields['original_file_paths']
return true unless cloud_files.present? && original_files.present?

imported_file_sizes = cloud_files.map { |_, v| v['file_size'].to_i }
original_file_sizes = original_files.map { |imported_file| File.size(imported_file) }

original_file_sizes == imported_file_sizes
end
end
end
2 changes: 1 addition & 1 deletion app/parsers/bulkrax/application_parser.rb
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ def create_entry_and_job(current_record, type, identifier = nil)
end

# Optional, define if using browse everything for file upload
def retrieve_cloud_files(files); end
def retrieve_cloud_files(_files, _importer); end

# @param file [#path, #original_filename] the file object that with the relevant data for the
# import.
Expand Down
2 changes: 1 addition & 1 deletion app/parsers/bulkrax/bagit_parser.rb
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ def write_triples(folder_count, e)
# @todo - investigate getting directory structure
# @todo - investigate using perform_later, and having the importer check for
# DownloadCloudFileJob before it starts
def retrieve_cloud_files(files)
def retrieve_cloud_files(files, _importer)
# There should only be one zip file for Bagit, take the first
return if files['0'].blank?
target_file = File.join(path_for_import, files['0']['file_name'].tr(' ', '_'))
Expand Down
7 changes: 5 additions & 2 deletions app/parsers/bulkrax/csv_parser.rb
Original file line number Diff line number Diff line change
Expand Up @@ -189,9 +189,10 @@ def records_split_count
# @todo - investigate getting directory structure
# @todo - investigate using perform_later, and having the importer check for
# DownloadCloudFileJob before it starts
def retrieve_cloud_files(files)
def retrieve_cloud_files(files, importer)
files_path = File.join(path_for_import, 'files')
FileUtils.mkdir_p(files_path) unless File.exist?(files_path)
target_files = []
files.each_pair do |_key, file|
# fixes bug where auth headers do not get attached properly
if file['auth_header'].present?
Expand All @@ -200,10 +201,12 @@ def retrieve_cloud_files(files)
end
# this only works for uniquely named files
target_file = File.join(files_path, file['file_name'].tr(' ', '_'))
target_files << target_file
# Now because we want the files in place before the importer runs
# Problematic for a large upload
Bulkrax::DownloadCloudFileJob.perform_now(file, target_file)
Bulkrax::DownloadCloudFileJob.perform_later(file, target_file)
end
importer[:parser_fields]['original_file_paths'] = target_files
return nil
end

Expand Down

0 comments on commit 7420b9c

Please sign in to comment.