diff --git a/.gitignore b/.gitignore index d87d4be..16f974f 100644 --- a/.gitignore +++ b/.gitignore @@ -15,3 +15,4 @@ spec/reports test/tmp test/version_tmp tmp +node_modules/**/* diff --git a/README.md b/README.md index 7090aec..4577eb3 100644 --- a/README.md +++ b/README.md @@ -184,6 +184,21 @@ MyModel.multiple_man_publish(:seed) 3. Stop the seeder rake task when all of your messages have been processed. You can check your RabbitMQ server +## Versioning + +Because you may have different versions of MultipleMan between publishers and subscribers, +MultipleMan attaches **versions** to every message sent. This ensures that updates to payloads, +metadata, etc. will not affect processing of your messages. + +In general, a subscriber will not be able to process messages published by a newer version of +MultipleMan. We use **minor versions** to indicate changes that may contain a breaking change +to payload formats. + +As a consequence, when upgrading MultipleMan, it's always safe to upgrade patch versions, but +when upgrading to a new major or minor version, ensure that you upgrade your subscribers +prior to upgrading your publishers (if two services both subscribe and publish, you'll need to +update them synchronously.) + ## Contributing 1. Fork it diff --git a/lib/multiple_man.rb b/lib/multiple_man.rb index 1a01653..52067b5 100644 --- a/lib/multiple_man.rb +++ b/lib/multiple_man.rb @@ -25,6 +25,9 @@ module MultipleMan require 'multiple_man/channel_maintenance/gc' require 'multiple_man/channel_maintenance/reaper' + require 'multiple_man/payload/payload' + require 'multiple_man/payload/v1' + def self.logger configuration.logger end diff --git a/lib/multiple_man/listeners/listener.rb b/lib/multiple_man/listeners/listener.rb index dd5c5dd..d661d2d 100644 --- a/lib/multiple_man/listeners/listener.rb +++ b/lib/multiple_man/listeners/listener.rb @@ -32,24 +32,24 @@ def init_connection attr_accessor :subscription, :connection def listen - MultipleMan.logger.info "Listening for #{subscription.klass} with routing key #{routing_key}." - queue.bind(connection.topic, routing_key: routing_key).subscribe(ack: true) do |delivery_info, _, payload| - process_message(delivery_info, payload) + queue.bind(connection.topic, routing_key: routing_key).subscribe(ack: true) do |delivery_info, properties, payload| + parsed_payload = MultipleMan::Payload.build(delivery_info, properties, JSON.parse(payload).with_indifferent_access) + + begin + process_message(parsed_payload) + rescue Exception => ex + handle_error(ex, delivery_info) + else + MultipleMan.logger.debug " Successfully processed!" + queue.channel.acknowledge(delivery_info.delivery_tag, false) + end end end - def process_message(delivery_info, payload) - MultipleMan.logger.info "Processing message for #{delivery_info.routing_key}." - begin - payload = JSON.parse(payload).with_indifferent_access - subscription.send(operation(delivery_info, payload), payload) - rescue ex - handle_error(ex, delivery_info) - else - MultipleMan.logger.debug " Successfully processed!" - queue.channel.acknowledge(delivery_info.delivery_tag, false) - end + def process_message(payload) + MultipleMan.logger.info "Processing message for #{payload}." + subscription.send(payload.operation, payload) end def handle_error(ex, delivery_info) diff --git a/lib/multiple_man/model_populator.rb b/lib/multiple_man/model_populator.rb index f33d529..4a179e5 100644 --- a/lib/multiple_man/model_populator.rb +++ b/lib/multiple_man/model_populator.rb @@ -7,10 +7,9 @@ def initialize(record, fields) end def populate(payload) - data = payload[:id].merge(payload[:data]) - fields_for(data).each do |field| + fields_for(payload).each do |field| source, dest = field.is_a?(Array) ? field : [field, field] - populate_field(dest, data[source]) + populate_field(dest, payload[source]) end record end @@ -37,8 +36,8 @@ def populate_field(field, value) end end - def fields_for(data) - fields || data.keys + def fields_for(payload) + fields || payload.keys end end end diff --git a/lib/multiple_man/payload/payload.rb b/lib/multiple_man/payload/payload.rb new file mode 100644 index 0000000..a37d091 --- /dev/null +++ b/lib/multiple_man/payload/payload.rb @@ -0,0 +1,10 @@ +class MultipleMan::Payload + def self.build(delivery_info, properties, data) + case properties.headers["version"] + when "1", nil + V1.new(delivery_info, properties, data) + else + raise "This version of MultipleMan does not support the payload version supplied (#{properties.headers["version"]}). Please upgrade to the latest version." + end + end +end diff --git a/lib/multiple_man/payload/v1.rb b/lib/multiple_man/payload/v1.rb new file mode 100644 index 0000000..6ea3081 --- /dev/null +++ b/lib/multiple_man/payload/v1.rb @@ -0,0 +1,34 @@ + +class MultipleMan::Payload::V1 + def initialize(delivery_info, properties, payload) + self.payload = payload + self.delivery_info = delivery_info + end + + def keys + (payload['data'].keys + payload['id'].keys).uniq + end + + def [](value) + payload['data'][value.to_s] || payload['id'][value.to_s] + end + + def identify_by + if payload['id'].is_a?(Hash) + payload['id'] + else + {'multiple_man_identifier' => payload['id']} + end + end + + def operation + payload['operation'] || delivery_info.routing_key.split('.').last + end + + def to_s + delivery_info.routing_key + end + +private + attr_accessor :payload, :delivery_info +end diff --git a/lib/multiple_man/subscribers/base.rb b/lib/multiple_man/subscribers/base.rb index c6ff924..01c920c 100644 --- a/lib/multiple_man/subscribers/base.rb +++ b/lib/multiple_man/subscribers/base.rb @@ -7,19 +7,19 @@ def initialize(klass) attr_reader :klass - def create(_) + def create(payload) # noop end - def update(_) + def update(payload) # noop end - def destroy(_) + def destroy(payload) # noop end - def seed(_) + def seed(payload) # noop end diff --git a/lib/multiple_man/subscribers/model_subscriber.rb b/lib/multiple_man/subscribers/model_subscriber.rb index ca70f03..806e432 100644 --- a/lib/multiple_man/subscribers/model_subscriber.rb +++ b/lib/multiple_man/subscribers/model_subscriber.rb @@ -10,9 +10,8 @@ def initialize(klass, options) attr_accessor :options def create(payload) - id = payload[:id] - model = find_model(id) - MultipleMan::ModelPopulator.new(model, options[:fields]).populate(id: find_conditions(id), data: payload[:data]) + model = find_model(payload) + MultipleMan::ModelPopulator.new(model, options[:fields]).populate(payload) model.save! end @@ -20,18 +19,14 @@ def create(payload) alias_method :seed, :create def destroy(payload) - model = find_model(payload[:id]) + model = find_model(payload) model.destroy! end private - def find_model(id) - model_class.where(find_conditions(id)).first || model_class.new - end - - def find_conditions(id) - id.kind_of?(Hash) ? cleanse_id(id) : {multiple_man_identifier: id} + def find_model(payload) + model_class.where(cleanse_id(payload.identify_by)).first || model_class.new end def cleanse_id(hash) diff --git a/lib/multiple_man/version.rb b/lib/multiple_man/version.rb index 14f1307..5a0633c 100644 --- a/lib/multiple_man/version.rb +++ b/lib/multiple_man/version.rb @@ -1,3 +1,3 @@ module MultipleMan - VERSION = "0.8.1" + VERSION = "1.0.0" end diff --git a/spec/listeners/listener_spec.rb b/spec/listeners/listener_spec.rb index 841dd9b..776aa51 100644 --- a/spec/listeners/listener_spec.rb +++ b/spec/listeners/listener_spec.rb @@ -44,9 +44,9 @@ class MockClass2; end subscriber = double(MultipleMan::Subscribers::ModelSubscriber, klass: MockClass1, routing_key: "MockClass1.#").as_null_object listener = described_class.new(subscriber) - connection_stub.should_receive(:acknowledge) - subscriber.should_receive(:create).with({"a" => 1, "b" => 2}) - listener.process_message(OpenStruct.new(routing_key: "app.MockClass1.create"), '{"a":1,"b":2}') + subscriber.should_receive(:create).with(instance_of(MultipleMan::Payload::V1)) + + listener.process_message(MultipleMan::Payload::V1.new(double(:delivery_info, routing_key: 'app.MockClass1.create'), nil, "data" => {'a' => 1, 'b' => 2})) end specify "process_message should use the payload to determine the operation if it's available" do @@ -55,21 +55,8 @@ class MockClass2; end subscriber = double(MultipleMan::Subscribers::ModelSubscriber, klass: MockClass1, routing_key: "MockClass1.#").as_null_object listener = described_class.new(subscriber) - connection_stub.should_receive(:acknowledge) subscriber.should_receive(:create) - listener.process_message(OpenStruct.new(routing_key: "some random routing key"), '{"operation":"create","data":{"a":1,"b":2}}') - end - - it "should nack on failure" do - connection_stub = double(MultipleMan::Connection).as_null_object - MultipleMan::Connection.stub(:new).and_return(connection_stub) - subscriber = double(MultipleMan::Subscribers::ModelSubscriber, klass: MockClass1, routing_key: "MockClass1.#").as_null_object - listener = described_class.new(subscriber) - - connection_stub.should_receive(:nack) - MultipleMan.should_receive(:error) - subscriber.should_receive(:create).with({"a" => 1, "b" => 2}).and_raise("fail!") - listener.process_message(OpenStruct.new(routing_key: "app.MockClass1.create"), '{"a":1,"b":2}') + listener.process_message(MultipleMan::Payload::V1.new(double(:delivery_info, routing_key: 'app.MockClass1, update'), nil, "operation" => "create", "data" => {'a' => 1, 'b' => 2})) end end diff --git a/spec/model_populator_spec.rb b/spec/model_populator_spec.rb index b1998e9..7335ab3 100644 --- a/spec/model_populator_spec.rb +++ b/spec/model_populator_spec.rb @@ -7,10 +7,14 @@ class MockModel describe "populate" do let(:model) { MockModel.new } - let(:data) { { a: 1, b: 2 } } - let(:id) { { multiple_man_identifier: 'app_1' }} + let(:payload) { MultipleMan::Payload::V1.new(nil, nil, { + 'id' => id, + 'data' => data + })} + let(:data) { { 'a' => 1, 'b' => 2 } } + let(:id) { { 'multiple_man_identifier' => 'app_1' }} let(:fields) { nil } - subject { described_class.new(model, fields).populate(id: id, data: data) } + subject { described_class.new(model, fields).populate(payload) } its(:multiple_man_identifier) { should == 'app_1' } @@ -38,7 +42,7 @@ class MockModel let(:model) { Class.new do attr_accessor :source_id, :id end.new } - let(:data) { { id: 1 }} + let(:data) { { 'id' => 1 }} its(:source_id) { should == 1 } its(:id) { should be_nil } @@ -47,7 +51,7 @@ class MockModel let(:model) { Class.new do attr_accessor :id end.new } - let(:data) { { id: 1 }} + let(:data) { { 'id' => 1 }} its(:id) { should == 1 } end diff --git a/spec/payload/payload_spec.rb b/spec/payload/payload_spec.rb new file mode 100644 index 0000000..c1b6615 --- /dev/null +++ b/spec/payload/payload_spec.rb @@ -0,0 +1,25 @@ +require 'spec_helper' + +describe MultipleMan::Payload do + let(:properties) { Class.new do + attr_accessor :headers + def initialize(version) + self.headers = {"version" => version} + end + end } + + describe "::build" do + it "should assume v1 for a nil version" do + + payload = described_class.build(nil, properties.new(nil), nil) + payload.should be_instance_of(MultipleMan::Payload::V1) + end + it "should support v1" do + payload = described_class.build(nil, properties.new("1"), nil) + payload.should be_instance_of(MultipleMan::Payload::V1) + end + it "should fail on an unknown version" do + expect{ described_class.build(nil, properties.new("3"), nil)}.to raise_exception + end + end +end diff --git a/spec/payload/v1_spec.rb b/spec/payload/v1_spec.rb new file mode 100644 index 0000000..9053978 --- /dev/null +++ b/spec/payload/v1_spec.rb @@ -0,0 +1,55 @@ +require 'spec_helper' + +describe MultipleMan::Payload::V1 do + let(:delivery_info) { + double(:delivery_info, routing_key: 'blah.blah.create') + } + + let(:payload) { + described_class.new(delivery_info, nil, { + 'id' => { + 'id' => 1, + 'database' => 'app' + }, + 'data' => { + 'id' => 1, + 'database' => 'app', + 'foo' => 'bar' + } + }) + } + + it "should return appropriate identify_by keys" do + expect(payload.identify_by).to eq({'id' => 1, 'database' => 'app'}) + end + + it "should return appropriate keys" do + expect(payload.keys).to eq(['id', 'database', 'foo']) + end + + it "should include keys from the id even if they're not in the data" do + payload = described_class.new(nil, nil, {'id' => {'id' => 1}, 'data' => { 'foo' => 'bar'}}) + expect(payload.keys).to include('id') + end + + + it "should construct a multiple man identifier id if none exists" do + payload = described_class.new(delivery_info, nil, {'id' => 1, 'data' => {'foo' => 'bar'}}) + expect(payload.identify_by).to eq({'multiple_man_identifier' => 1}) + end + + it 'should store data appropriately' do + expect(payload['id']).to eq(1) + expect(payload['database']).to eq('app') + expect(payload['foo']).to eq('bar') + end + + it "should have an operation" do + expect(payload.operation).to eq('create') + end + + it "should let payloads override the operation" do + payload = described_class.new(delivery_info, nil, { 'operation' => 'update' }) + expect(payload.operation).to eq('update') + end +end diff --git a/spec/subscribers/model_subscriber_spec.rb b/spec/subscribers/model_subscriber_spec.rb index 7fbce9a..0e8a131 100644 --- a/spec/subscribers/model_subscriber_spec.rb +++ b/spec/subscribers/model_subscriber_spec.rb @@ -5,6 +5,13 @@ class MockClass end + let(:payload) { + MultipleMan::Payload::V1.new(nil, nil, { + 'id' => {'id' => 5 }, + 'data' => {'a' => 1, 'b' => 2} + }) + } + describe "initialize" do it "should listen to the object passed in for to" do subscriber = described_class.new(MockClass, to: 'PublishedClass') @@ -18,23 +25,18 @@ class MockClass MockClass.stub(:where).and_return([mock_object]) mock_populator = double(MultipleMan::ModelPopulator) MultipleMan::ModelPopulator.should_receive(:new).and_return(mock_populator) - mock_populator.should_receive(:populate).with(id: {id:5}, data: {a: 1, b: 2}) + mock_populator.should_receive(:populate).with(payload) mock_object.should_receive(:save!) - described_class.new(MockClass, {}).create({id: {id: 5}, data:{a: 1, b: 2}}) + described_class.new(MockClass, {}).create(payload) end end describe "find_model" do - it "should find by multiple_man_identifier for a single field" do - mock_object = double(MockClass).as_null_object - MockClass.should_receive(:where).with(multiple_man_identifier: 5).and_return([mock_object]) - described_class.new(MockClass, {}).create({id: 5, data:{a: 1, b: 2}}) - end it "should find by the hash for multiple fields" do mock_object = double(MockClass).as_null_object - MockClass.should_receive(:where).with(foo: 'bar').and_return([mock_object]) - described_class.new(MockClass, {}).create({id: {foo: 'bar'}, data:{a: 1, b: 2}}) + MockClass.should_receive(:where).with('id' => 5).and_return([mock_object]) + described_class.new(MockClass, {}).create(payload) end end @@ -44,7 +46,7 @@ class MockClass MockClass.should_receive(:where).and_return([mock_object]) mock_object.should_receive(:destroy!) - described_class.new(MockClass, {}).destroy({id: 1}) + described_class.new(MockClass, {}).destroy(payload) end end end diff --git a/spec/subscribers/registry_spec.rb b/spec/subscribers/registry_spec.rb index 4b8e67a..b471a85 100644 --- a/spec/subscribers/registry_spec.rb +++ b/spec/subscribers/registry_spec.rb @@ -1,11 +1,11 @@ require 'spec_helper' -describe MultipleMan::Subscribers::Registry do - describe "register" do +describe MultipleMan::Subscribers::Registry do + describe "register" do it "should add a subscriber" do subscription = double(:subscriber) described_class.register(subscription) - described_class.subscriptions[0].should == subscription + described_class.subscriptions.should include subscription end end -end \ No newline at end of file +end