Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Payload Versioning #12

Open
wants to merge 4 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -15,3 +15,4 @@ spec/reports
test/tmp
test/version_tmp
tmp
node_modules/**/*
15 changes: 15 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,21 @@ MyModel.multiple_man_publish(:seed)

3. Stop the seeder rake task when all of your messages have been processed. You can check your RabbitMQ server

## Versioning

Because you may have different versions of MultipleMan between publishers and subscribers,
MultipleMan attaches **versions** to every message sent. This ensures that updates to payloads,
metadata, etc. will not affect processing of your messages.

In general, a subscriber will not be able to process messages published by a newer version of
MultipleMan. We use **minor versions** to indicate changes that may contain a breaking change
to payload formats.

As a consequence, when upgrading MultipleMan, it's always safe to upgrade patch versions, but
when upgrading to a new major or minor version, ensure that you upgrade your subscribers
prior to upgrading your publishers (if two services both subscribe and publish, you'll need to
update them synchronously.)

## Contributing

1. Fork it
Expand Down
3 changes: 3 additions & 0 deletions lib/multiple_man.rb
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ module MultipleMan
require 'multiple_man/channel_maintenance/gc'
require 'multiple_man/channel_maintenance/reaper'

require 'multiple_man/payload/payload'
require 'multiple_man/payload/v1'

def self.logger
configuration.logger
end
Expand Down
28 changes: 14 additions & 14 deletions lib/multiple_man/listeners/listener.rb
Original file line number Diff line number Diff line change
Expand Up @@ -32,24 +32,24 @@ def init_connection
attr_accessor :subscription, :connection

def listen

MultipleMan.logger.info "Listening for #{subscription.klass} with routing key #{routing_key}."
queue.bind(connection.topic, routing_key: routing_key).subscribe(ack: true) do |delivery_info, _, payload|
process_message(delivery_info, payload)
queue.bind(connection.topic, routing_key: routing_key).subscribe(ack: true) do |delivery_info, properties, payload|
parsed_payload = MultipleMan::Payload.build(delivery_info, properties, JSON.parse(payload).with_indifferent_access)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rails dependency? with_indifferent_access


begin
process_message(parsed_payload)
rescue Exception => ex
handle_error(ex, delivery_info)
else
MultipleMan.logger.debug " Successfully processed!"
queue.channel.acknowledge(delivery_info.delivery_tag, false)
end
end
end

def process_message(delivery_info, payload)
MultipleMan.logger.info "Processing message for #{delivery_info.routing_key}."
begin
payload = JSON.parse(payload).with_indifferent_access
subscription.send(operation(delivery_info, payload), payload)
rescue ex
handle_error(ex, delivery_info)
else
MultipleMan.logger.debug " Successfully processed!"
queue.channel.acknowledge(delivery_info.delivery_tag, false)
end
def process_message(payload)
MultipleMan.logger.info "Processing message for #{payload}."
subscription.send(payload.operation, payload)
end

def handle_error(ex, delivery_info)
Expand Down
9 changes: 4 additions & 5 deletions lib/multiple_man/model_populator.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,9 @@ def initialize(record, fields)
end

def populate(payload)
data = payload[:id].merge(payload[:data])
fields_for(data).each do |field|
fields_for(payload).each do |field|
source, dest = field.is_a?(Array) ? field : [field, field]
populate_field(dest, data[source])
populate_field(dest, payload[source])
end
record
end
Expand All @@ -37,8 +36,8 @@ def populate_field(field, value)
end
end

def fields_for(data)
fields || data.keys
def fields_for(payload)
fields || payload.keys
end
end
end
10 changes: 10 additions & 0 deletions lib/multiple_man/payload/payload.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
class MultipleMan::Payload
def self.build(delivery_info, properties, data)
case properties.headers["version"]
when "1", nil
V1.new(delivery_info, properties, data)
else
raise "This version of MultipleMan does not support the payload version supplied (#{properties.headers["version"]}). Please upgrade to the latest version."
end
end
end
34 changes: 34 additions & 0 deletions lib/multiple_man/payload/v1.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@

class MultipleMan::Payload::V1
def initialize(delivery_info, properties, payload)
self.payload = payload
self.delivery_info = delivery_info
end

def keys
(payload['data'].keys + payload['id'].keys).uniq
end

def [](value)
payload['data'][value.to_s] || payload['id'][value.to_s]
end

def identify_by
if payload['id'].is_a?(Hash)
payload['id']
else
{'multiple_man_identifier' => payload['id']}
end
end

def operation
payload['operation'] || delivery_info.routing_key.split('.').last
end

def to_s
delivery_info.routing_key
end

private
attr_accessor :payload, :delivery_info
end
8 changes: 4 additions & 4 deletions lib/multiple_man/subscribers/base.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,19 +7,19 @@ def initialize(klass)

attr_reader :klass

def create(_)
def create(payload)
# noop
end

def update(_)
def update(payload)
# noop
end

def destroy(_)
def destroy(payload)
# noop
end

def seed(_)
def seed(payload)
# noop
end

Expand Down
15 changes: 5 additions & 10 deletions lib/multiple_man/subscribers/model_subscriber.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,28 +10,23 @@ def initialize(klass, options)
attr_accessor :options

def create(payload)
id = payload[:id]
model = find_model(id)
MultipleMan::ModelPopulator.new(model, options[:fields]).populate(id: find_conditions(id), data: payload[:data])
model = find_model(payload)
MultipleMan::ModelPopulator.new(model, options[:fields]).populate(payload)
model.save!
end

alias_method :update, :create
alias_method :seed, :create

def destroy(payload)
model = find_model(payload[:id])
model = find_model(payload)
model.destroy!
end

private

def find_model(id)
model_class.where(find_conditions(id)).first || model_class.new
end

def find_conditions(id)
id.kind_of?(Hash) ? cleanse_id(id) : {multiple_man_identifier: id}
def find_model(payload)
model_class.where(cleanse_id(payload.identify_by)).first || model_class.new
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can use .first_or_initialize here

end

def cleanse_id(hash)
Expand Down
2 changes: 1 addition & 1 deletion lib/multiple_man/version.rb
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
module MultipleMan
VERSION = "0.8.1"
VERSION = "1.0.0"
end
21 changes: 4 additions & 17 deletions spec/listeners/listener_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -44,9 +44,9 @@ class MockClass2; end
subscriber = double(MultipleMan::Subscribers::ModelSubscriber, klass: MockClass1, routing_key: "MockClass1.#").as_null_object
listener = described_class.new(subscriber)

connection_stub.should_receive(:acknowledge)
subscriber.should_receive(:create).with({"a" => 1, "b" => 2})
listener.process_message(OpenStruct.new(routing_key: "app.MockClass1.create"), '{"a":1,"b":2}')
subscriber.should_receive(:create).with(instance_of(MultipleMan::Payload::V1))

listener.process_message(MultipleMan::Payload::V1.new(double(:delivery_info, routing_key: 'app.MockClass1.create'), nil, "data" => {'a' => 1, 'b' => 2}))
end

specify "process_message should use the payload to determine the operation if it's available" do
Expand All @@ -55,21 +55,8 @@ class MockClass2; end
subscriber = double(MultipleMan::Subscribers::ModelSubscriber, klass: MockClass1, routing_key: "MockClass1.#").as_null_object
listener = described_class.new(subscriber)

connection_stub.should_receive(:acknowledge)
subscriber.should_receive(:create)
listener.process_message(OpenStruct.new(routing_key: "some random routing key"), '{"operation":"create","data":{"a":1,"b":2}}')
end

it "should nack on failure" do
connection_stub = double(MultipleMan::Connection).as_null_object
MultipleMan::Connection.stub(:new).and_return(connection_stub)
subscriber = double(MultipleMan::Subscribers::ModelSubscriber, klass: MockClass1, routing_key: "MockClass1.#").as_null_object
listener = described_class.new(subscriber)

connection_stub.should_receive(:nack)
MultipleMan.should_receive(:error)
subscriber.should_receive(:create).with({"a" => 1, "b" => 2}).and_raise("fail!")

listener.process_message(OpenStruct.new(routing_key: "app.MockClass1.create"), '{"a":1,"b":2}')
listener.process_message(MultipleMan::Payload::V1.new(double(:delivery_info, routing_key: 'app.MockClass1, update'), nil, "operation" => "create", "data" => {'a' => 1, 'b' => 2}))
end
end
14 changes: 9 additions & 5 deletions spec/model_populator_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,14 @@ class MockModel

describe "populate" do
let(:model) { MockModel.new }
let(:data) { { a: 1, b: 2 } }
let(:id) { { multiple_man_identifier: 'app_1' }}
let(:payload) { MultipleMan::Payload::V1.new(nil, nil, {
'id' => id,
'data' => data
})}
let(:data) { { 'a' => 1, 'b' => 2 } }
let(:id) { { 'multiple_man_identifier' => 'app_1' }}
let(:fields) { nil }
subject { described_class.new(model, fields).populate(id: id, data: data) }
subject { described_class.new(model, fields).populate(payload) }

its(:multiple_man_identifier) { should == 'app_1' }

Expand Down Expand Up @@ -38,7 +42,7 @@ class MockModel
let(:model) { Class.new do
attr_accessor :source_id, :id
end.new }
let(:data) { { id: 1 }}
let(:data) { { 'id' => 1 }}

its(:source_id) { should == 1 }
its(:id) { should be_nil }
Expand All @@ -47,7 +51,7 @@ class MockModel
let(:model) { Class.new do
attr_accessor :id
end.new }
let(:data) { { id: 1 }}
let(:data) { { 'id' => 1 }}

its(:id) { should == 1 }
end
Expand Down
25 changes: 25 additions & 0 deletions spec/payload/payload_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
require 'spec_helper'

describe MultipleMan::Payload do
let(:properties) { Class.new do
attr_accessor :headers
def initialize(version)
self.headers = {"version" => version}
end
end }

describe "::build" do
it "should assume v1 for a nil version" do

payload = described_class.build(nil, properties.new(nil), nil)
payload.should be_instance_of(MultipleMan::Payload::V1)
end
it "should support v1" do
payload = described_class.build(nil, properties.new("1"), nil)
payload.should be_instance_of(MultipleMan::Payload::V1)
end
it "should fail on an unknown version" do
expect{ described_class.build(nil, properties.new("3"), nil)}.to raise_exception
end
end
end
55 changes: 55 additions & 0 deletions spec/payload/v1_spec.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
require 'spec_helper'

describe MultipleMan::Payload::V1 do
let(:delivery_info) {
double(:delivery_info, routing_key: 'blah.blah.create')
}

let(:payload) {
described_class.new(delivery_info, nil, {
'id' => {
'id' => 1,
'database' => 'app'
},
'data' => {
'id' => 1,
'database' => 'app',
'foo' => 'bar'
}
})
}

it "should return appropriate identify_by keys" do
expect(payload.identify_by).to eq({'id' => 1, 'database' => 'app'})
end

it "should return appropriate keys" do
expect(payload.keys).to eq(['id', 'database', 'foo'])
end

it "should include keys from the id even if they're not in the data" do
payload = described_class.new(nil, nil, {'id' => {'id' => 1}, 'data' => { 'foo' => 'bar'}})
expect(payload.keys).to include('id')
end


it "should construct a multiple man identifier id if none exists" do
payload = described_class.new(delivery_info, nil, {'id' => 1, 'data' => {'foo' => 'bar'}})
expect(payload.identify_by).to eq({'multiple_man_identifier' => 1})
end

it 'should store data appropriately' do
expect(payload['id']).to eq(1)
expect(payload['database']).to eq('app')
expect(payload['foo']).to eq('bar')
end

it "should have an operation" do
expect(payload.operation).to eq('create')
end

it "should let payloads override the operation" do
payload = described_class.new(delivery_info, nil, { 'operation' => 'update' })
expect(payload.operation).to eq('update')
end
end
Loading