Skip to content

Commit

Permalink
uploading files to S3 in parallel by using threads
Browse files Browse the repository at this point in the history
  • Loading branch information
glaszig committed Jul 4, 2015
1 parent fce83b5 commit 0cdcd4f
Showing 1 changed file with 66 additions and 42 deletions.
108 changes: 66 additions & 42 deletions lib/octopress-deploy/s3.rb
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ def initialize(options)
@headers = options[:headers] || []
@remote_path = @remote_path.sub(/^\//,'') # remove leading slash
@pull_dir = options[:dir]
@bust_cache_files = []
@thread_pool = []
connect
end

Expand Down Expand Up @@ -78,64 +80,61 @@ def connect
# Write site files to the selected bucket
#
def write_files
puts "Writing #{pluralize('file', site_files.size)}:" if @verbose
files_to_invalidate = []
puts "Writing #{pluralize('file', site_files.size)}#{" (sequential mode)" unless parallel_upload?}:" if @verbose
@bust_cache_files = []

site_files.each do |file|
s3_filename = remote_path(file)
o = @bucket.objects[s3_filename]
file_with_options = get_file_with_metadata(file, s3_filename);

begin
s3sum = o.etag.tr('"','') if o.exists?
rescue AWS::S3::Errors::NoSuchKey
s3sum = ""
if parallel_upload?
threaded { write_file file }
else
write_file file
end
end

if @incremental && (s3sum == Digest::MD5.file(file).hexdigest)
if @verbose
puts "= #{remote_path(file)}"
else
progress('=')
end
else
o.write(file_with_options)
files_to_invalidate.push(file)
if @verbose
puts "+ #{remote_path(file)}"
else
progress('+')
end
end
@thread_pool.each(&:join)
bust_cloudfront_cache
end

# Upload a single site file when needed and report what happened.
#
# A file that write_file? accepts is uploaded and remembered in
# @bust_cache_files; any other file is only reported. The report is a full
# line ("+ path" / "= path") in verbose mode and a one-character progress
# mark otherwise.
#
# file - local path of the site file
#
def write_file(file)
  unless write_file?(file)
    return progress('=') unless @verbose

    return puts("= #{remote_path(file)}")
  end

  s3_upload_file(file)
  @bust_cache_files << file
  return progress('+') unless @verbose

  puts("+ #{remote_path(file)}")
end

invalidate_cache(files_to_invalidate) unless @distro_id.nil?
# Upload one local file to its S3 object.
#
# The file is opened with the block form of File.open so the handle is
# closed as soon as the write returns or raises. Leaving it open would leak
# one descriptor per uploaded file, which adds up quickly when uploads run
# in parallel threads.
#
# file - local path of the file to upload
#
# Returns whatever the S3 object's write call returns.
#
def s3_upload_file(file)
  File.open(file) do |io|
    s3_object(file).write io, s3_object_options(file)
  end
end

def invalidate_cache(files)
def bust_cloudfront_cache
return if @distro_id.nil?

puts "Invalidating cache for #{pluralize('file', site_files.size)}" if @verbose
@cloudfront.create_invalidation(
distribution_id: @distro_id,
invalidation_batch:{
paths:{
quantity: files.size,
items: files.map{|file| "/" + remote_path(file)}
},
quantity: @bust_cache_files.size,
items: @bust_cache_files.map{|file| "/" + remote_path(file)}
},
# String of 8 random chars to uniquely id this invalidation
caller_reference: (0...8).map { ('a'..'z').to_a[rand(26)] }.join
}
) unless files.empty?
) unless @bust_cache_files.empty?
@bust_cache_files = []
end

def get_file_with_metadata(file, s3_filename)
file_with_options = {
:file => file,
:acl => :public_read
}
def s3_object_options(file)
s3_filename = remote_path file
s3_options = { :acl => :public_read }

@headers.each do |conf|
if conf.has_key? 'filename' and s3_filename.match(conf['filename'])
if @verbose
puts "+ #{remote_path(file)} matched pattern #{conf['filename']}"
puts "+ #{s3_filename} matched pattern #{conf['filename']}"
end

if conf.has_key? 'expires'
Expand All @@ -151,24 +150,24 @@ def get_file_with_metadata(file, s3_filename)
expireDate = (Time.now + (60 * 60 * 24 * relative_days[1].to_i)).httpdate
end

file_with_options[:expires] = expireDate
s3_options[:expires] = expireDate
end

if conf.has_key? 'content_type'
file_with_options[:content_type] = conf['content_type']
s3_options[:content_type] = conf['content_type']
end

if conf.has_key? 'cache_control'
file_with_options[:cache_control] = conf['cache_control']
s3_options[:cache_control] = conf['cache_control']
end

if conf.has_key? 'content_encoding'
file_with_options[:content_encoding] = conf['content_encoding']
s3_options[:content_encoding] = conf['content_encoding']
end
end
end

return file_with_options
s3_options
end

# Delete files from the bucket, to ensure a 1:1 match with site files
Expand Down Expand Up @@ -273,9 +272,34 @@ def self.default_config(options={})
#{"verbose: #{options[:verbose] || 'false'}".ljust(40)} # Print out all file operations.
#{"incremental: #{options[:incremental] || 'false'}".ljust(40)} # Only upload new/changed files
#{"delete: #{options[:delete] || 'false'}".ljust(40)} # Remove files from destination which do not match source files.
#{"parallel: #{options[:parallel] || 'true'}".ljust(40)} # Speed up deployment by uploading files in parallel.
CONFIG
end

protected

# Decide whether a local file has to be uploaded.
#
# Outside incremental mode (@incremental is false or nil) every file is
# uploaded, so the answer is true straight away, without hashing the file
# or querying S3. In incremental mode the file is uploaded only when the
# remote object is missing or its ETag (surrounding quotes stripped)
# differs from the MD5 hex digest of the local file.
#
# file - local path of the site file
#
# Returns true when the file should be written, false otherwise.
#
def write_file?(file)
  return true unless @incremental

  remote = s3_object(file)
  remote_sum = remote.exists? ? remote.etag.tr('"', '') : ''
  remote_sum != Digest::MD5.file(file).hexdigest
end

# Look up the bucket entry for a local file, keyed by the file's remote
# path.
#
# file - local path of the site file
#
def s3_object(file)
  @bucket.objects[remote_path(file)]
end

# Tell whether uploads should run in threads.
#
# Returns the value stored under :parallel in @options as-is, so a missing
# key yields nil and is treated as "sequential" by callers.
#
def parallel_upload?
  parallel_setting = @options[:parallel]
  parallel_setting
end

# Run a block in a background thread and track the thread in @thread_pool.
#
# At most max_threads workers are kept in the pool: before a new thread is
# started, the oldest tracked threads are joined (and dropped) until there
# is room. Without this cap a site with thousands of files would start
# thousands of threads at once. Joining also re-raises any error the
# worker hit. Threads still in the pool afterwards must be joined by the
# caller.
#
# max_threads - upper bound on tracked threads (default 8; values below 1
#               are treated as 1)
# blk         - the work to perform
#
# Returns @thread_pool.
#
def threaded(max_threads = 8, &blk)
  limit = [max_threads.to_i, 1].max
  @thread_pool.shift.join while @thread_pool.size >= limit

  @thread_pool << Thread.new(blk) { |operation| operation.call }
end

end
end
end

0 comments on commit 0cdcd4f

Please sign in to comment.