From fad5bff9a40530654b699035b02dee93135a803c Mon Sep 17 00:00:00 2001
From: Ed Summers
Date: Mon, 17 Jul 2023 17:47:46 -0400
Subject: [PATCH] Persist Globus file size

Modify the FetchGlobusJob to use GlobusClient.list_files instead of
GlobusClient.get_filenames so that it has access to the file sizes as
well as the file names.

GlobusService.download_chunk needs to be defined as a no-op, or else the
call to attach the blob raises a NotImplementedError when ActiveStorage
tries to identify the content type of a blob with a non-zero size.

Also add a cleanup:file_sizes rake task for updating the zero-byte file
sizes using the sizes stored in SDR.

Fixes #3230
---
 app/jobs/fetch_globus_job.rb                  | 26 ++++----
 .../active_storage/service/globus_service.rb  |  8 +++
 lib/tasks/cleanup.rake                        | 59 +++++++++++++++++++
 spec/jobs/fetch_globus_job_spec.rb            | 19 +++---
 4 files changed, 92 insertions(+), 20 deletions(-)

diff --git a/app/jobs/fetch_globus_job.rb b/app/jobs/fetch_globus_job.rb
index 5f6ea1d66..debbe9c49 100644
--- a/app/jobs/fetch_globus_job.rb
+++ b/app/jobs/fetch_globus_job.rb
@@ -9,7 +9,7 @@ class FetchGlobusJob < BaseDepositJob
   def perform(work_version)
     work_version.attached_files.destroy_all
 
-    filepaths = filepaths_for(work_version)
+    files = files_for(work_version)
 
     # Since it can take a while (hours) to get the filepaths from Globus API for large
     # deposits we need to ensure that we still have an active database connection
@@ -17,33 +17,35 @@ def perform(work_version)
     # PG::UnableToSend: SSL SYSCALL error: EOF detected
     ActiveRecord::Base.clear_active_connections!
 
-    filepaths.each do |path|
-      next if ignore?(path)
+    files.each do |file|
+      next if ignore?(file.name)
 
-      work_version.attached_files << new_attached_file(path, work_version)
+      work_version.attached_files << new_attached_file(file, work_version)
     end
 
     work_version.upload_type = "browser"
     work_version.fetch_globus_complete!
   end
 
-  def filepaths_for(work_version)
+  def files_for(work_version)
     GlobusClient
-      .get_filenames(path: work_version.globus_endpoint, user_id: work_version.work.owner.email)
-      .map { |filepath| filepath.delete_prefix(work_version.globus_endpoint_fullpath) }
+      .list_files(path: work_version.globus_endpoint, user_id: work_version.work.owner.email)
+      .map do |file|
+        file.tap { file.name = file.name.delete_prefix(work_version.globus_endpoint_fullpath) }
+      end
   end
 
   def ignore?(path)
     path.start_with?("__MACOSX") || path.end_with?(".DS_Store")
   end
 
-  def new_attached_file(path, work_version)
-    AttachedFile.new(path:, work_version:).tap do |attached_file|
+  def new_attached_file(file, work_version)
+    AttachedFile.new(path: file.name, work_version:).tap do |attached_file|
       blob = ActiveStorage::Blob.create_before_direct_upload!(
         key: attached_file.create_globus_active_storage_key,
-        filename: path,
+        filename: file.name,
         service_name: ActiveStorage::Service::GlobusService::SERVICE_NAME,
-        byte_size: 0,
-        checksum: path
+        byte_size: file.size,
+        checksum: file.name
       )
       attached_file.file.attach(blob)
     end
diff --git a/app/services/active_storage/service/globus_service.rb b/app/services/active_storage/service/globus_service.rb
index 4b40a79c4..c40f67235 100644
--- a/app/services/active_storage/service/globus_service.rb
+++ b/app/services/active_storage/service/globus_service.rb
@@ -11,6 +11,14 @@ def download(key, &)
     raise NotImplementedError
   end
 
+  def download_chunk(key, range)
+    # This is called by ActiveStorage::Blob::Identifiable when an
+    # ActiveStorage::Blob is being attached to an AttachedFile to identify the
+    # content type of the file. Since we don't have access to the content
+    # here we don't return anything. Without this no-op, attaching the blob
+    # would raise a NotImplementedError.
+  end
+
   def delete(key)
     # This is called by ActiveSupport when #destroy is called on AttachedFile due to our use
     # of ActiveStorage for file storage. This can happen during a decommission of a work.
diff --git a/lib/tasks/cleanup.rake b/lib/tasks/cleanup.rake
index 3048ff35b..22960212a 100644
--- a/lib/tasks/cleanup.rake
+++ b/lib/tasks/cleanup.rake
@@ -1,5 +1,7 @@
 # frozen_string_literal: true
 
+require "logger"
+
 namespace :cleanup do
   desc "Remove unattached files"
   task uploads: :environment do
@@ -8,4 +10,61 @@ namespace :cleanup do
       .where("DATE(active_storage_blobs.created_at) = ?", 7.days.ago.to_date)
       .find_each(&:purge_later)
   end
+
+  desc "Update zero-length files"
+  task file_sizes: :environment do
+    logger = Logger.new($stdout)
+
+    # find druids that have zero-length files in active storage
+    sql =
+      <<-SQL
+        SELECT
+          druid,
+          work_versions.id AS work_version_id,
+          active_storage_blobs.filename AS filename,
+          active_storage_blobs.id AS blob_id
+        FROM works
+        JOIN work_versions ON work_versions.work_id = works.id
+        JOIN attached_files ON attached_files.work_version_id = work_versions.id
+        JOIN active_storage_attachments ON active_storage_attachments.record_id = attached_files.id
+        JOIN active_storage_blobs ON active_storage_blobs.id = active_storage_attachments.blob_id
+        WHERE active_storage_blobs.byte_size = 0
+        AND druid IS NOT NULL;
+      SQL
+
+    # update the blob with the file size from SDR
+    objects = {}
+    ActiveRecord::Base.connection.execute(sql).each do |result|
+      # look up the druid if we haven't seen it already
+      unless objects.key?(result["druid"])
+        begin
+          objects[result["druid"]] = Repository.find(result["druid"])
+        rescue RuntimeError
+          logger.error("Unable to look up #{result['druid']} in SDR")
+          next
+        end
+      end
+      object = objects[result["druid"]]
+
+      # find the file in the structural metadata
+      sdr_file = nil
+      object.structural.contains.each do |fileset|
+        sdr_file ||= fileset.structural.contains.find { |file| file.filename == result["filename"] }
+      end
+
+      # update the blob!
+      if sdr_file
+        blob = ActiveStorage::Blob.find(result["blob_id"])
+        if blob.byte_size == 0
+          blob.byte_size = sdr_file.size
+          blob.save
+          logger.info("updated blob #{blob.id} size to #{sdr_file.size}")
+        else
+          logger.error(%(blob #{blob.id} for #{result["druid"]} doesn't have zero byte size!))
+        end
+      else
+        logger.error(%(couldn't find #{result["filename"]} for #{result["druid"]}))
+      end
+    end
+  end
 end
diff --git a/spec/jobs/fetch_globus_job_spec.rb b/spec/jobs/fetch_globus_job_spec.rb
index e21148acd..9e9c5404d 100644
--- a/spec/jobs/fetch_globus_job_spec.rb
+++ b/spec/jobs/fetch_globus_job_spec.rb
@@ -13,15 +13,17 @@
 
   let(:work) { build(:work) }
 
+  let(:file_info) { GlobusClient::Endpoint::FileInfo }
+
   before do
-    allow(GlobusClient).to receive(:get_filenames).and_return(
+    allow(GlobusClient).to receive(:list_files).and_return(
       [
-        "/uploads/jstanford/work333/version1/file1.txt",
-        "/uploads/jstanford/work333/version1/__MACOSX/._file1.txt",
-        "/uploads/jstanford/work333/version1/dir1/file2.txt",
-        "/uploads/jstanford/work333/version1/__MACOSX/dir1/._file2.txt",
-        "/uploads/jstanford/work333/version1/dir2/.DS_Store",
-        "/uploads/jstanford/work333/version1/__MACOSX/dir2/._.DS_Store"
+        file_info.new("/uploads/jstanford/work333/version1/file1.txt", 24601),
+        file_info.new("/uploads/jstanford/work333/version1/__MACOSX/._file1.txt", 1),
+        file_info.new("/uploads/jstanford/work333/version1/dir1/file2.txt", 1),
+        file_info.new("/uploads/jstanford/work333/version1/__MACOSX/dir1/._file2.txt", 1),
+        file_info.new("/uploads/jstanford/work333/version1/dir2/.DS_Store", 1),
+        file_info.new("/uploads/jstanford/work333/version1/__MACOSX/dir2/._.DS_Store", 1)
       ]
     )
     work.update!(head: first_work_version)
@@ -36,9 +38,10 @@
     expect(AttachedFile.find_by(id: attached_file.id)).to be_nil
     attached_file = first_work_version.reload.attached_files.first
     expect(attached_file.path).to eq("file1.txt")
+    expect(attached_file.byte_size).to eq(24601)
     expect(attached_file.blob.service_name).to eq("globus")
     expect(attached_file.blob.key).to eq("#{first_work_version.work.id}/1/file1.txt")
-    expect(GlobusClient).to have_received(:get_filenames).with(path: "jstanford/work333/version1",
+    expect(GlobusClient).to have_received(:list_files).with(path: "jstanford/work333/version1",
                                                                user_id: work.owner.email)
   end
 end
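
Note (not part of the patch): a minimal sketch of how the one-off backfill
might be run and spot-checked, assuming the app's standard rake entry point
and the ActiveStorage::Blob model used above. The console query is
illustrative only; "globus" is the service name the spec asserts on.

    # Run the backfill task on the relevant environment, e.g.:
    #   bin/rake cleanup:file_sizes
    #
    # Then, from a Rails console, check whether any Globus blobs are
    # still recorded with a zero byte_size:
    zero_sized = ActiveStorage::Blob.where(service_name: "globus", byte_size: 0)
    puts "#{zero_sized.count} globus blobs still have byte_size = 0"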