Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 24 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
# MultipleMan

# WARNING: THIS PROJECT IS NO LONGER MAINTAINED. There will be no future updates unless another maintainer wishes to own it.

[![CircleCI](https://circleci.com/gh/influitive/multiple_man.png)](https://circleci.com/gh/influitive/multiple_man)
NOTE: This is a fork from https://github.com/influitive/multiple_man with some
fixes applied because the original one is not maintained anymore.

MultipleMan synchronizes your ActiveRecord models between Rails
apps, using RabbitMQ to send messages between your applications.
Expand All @@ -20,7 +19,7 @@ It's heavily inspired by Promiscuous, but differs in a few ways:
Add this line to your application's Gemfile:

```ruby
gem 'multiple_man'
gem 'multiple_man', github: 'owen2345/multiple_man'
```

And then execute:
Expand Down Expand Up @@ -205,6 +204,27 @@ MyModel.multiple_man_publish(:seed)

3. Stop the seeder rake task when all of your messages have been processed. You can check your RabbitMQ server

## Fixes and improvements
- Rails 6 deprecation warnings
- Easy rspec integration (Added: ```publish_test_message```, ```listener_for``` methods)
```
describe 'Sample subscribe sync' do
let(:service) { MultipleMan::Runner.new(mode: :general) }
let(:model) { build(:my_model) }
let(:model_name) { model.class.name }
let(:subscribed_fields) { service.listener_for(model_name).options[:fields] }
before { service.run }

it "Create a new Item when listen a new Item event" do
expect do
service.publish_test_message(model_name, {name: 'sample name'}, operation: :create)
end.to change { model.class.count }.by(1)
end
end
```
- Fixed exchange not found error
- Fixed missing attribute ```number``` when using bunny-mock gem

## Contributing

1. Fork it
Expand Down
2 changes: 1 addition & 1 deletion lib/multiple_man/channel_maintenance/gc.rb
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ def push(channel)

queue << AddCommand.new(thread_id, channel)

puts "Opened channel #{channel.number}"
puts "Opened channel #{channel.try(:number)}"
self
end

Expand Down
2 changes: 1 addition & 1 deletion lib/multiple_man/channel_maintenance/reaper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ def initialize(config)
channel = queue.pop
begin
channel.close unless channel.closed?
puts "Channel #{channel.number} closed!"
puts "Channel #{channel.try(:number)} closed!"
rescue Bunny::Exception, Timeout::Error
sleep config.connection_recovery[:time_between_retries]
retry
Expand Down
2 changes: 1 addition & 1 deletion lib/multiple_man/configuration.rb
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ class Configuration

def initialize
self.topic_name = "multiple_man"
self.app_name = Rails.application.class.parent.to_s if defined?(Rails)
self.app_name = Rails.application.class.module_parent.to_s if defined?(Rails)
self.enabled = true
self.worker_concurrency = 1
self.reraise_errors = true
Expand Down
23 changes: 21 additions & 2 deletions lib/multiple_man/runner.rb
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ class Runner
class ShutDown < Error; end
extend Forwardable

attr_reader :listener, :queue
MODES = [:general, :seed].freeze

def initialize(options = {})
Expand All @@ -22,6 +23,22 @@ def run
connection.close
end

# publish a test message using current connection
def publish_test_message(model_name, data, operation: :create, id: :id)
message = {
type: model_name,
data: data.merge(operation: operation)
}
message[:id] = { id => data[:id] }
routing_key = MultipleMan::RoutingKey.new(model_name, operation).to_s
queue.publish(message.to_json, routing_key: routing_key)
end

# find a subscriber listener for a specific class name
def listener_for(klass_name)
listener.send(:subscribers).values.find { |a| a.klass == klass_name }
end

private

attr_reader :mode
Expand Down Expand Up @@ -51,8 +68,10 @@ def preload_framework!
end

def build_listener
listener_class.new(
queue: channel.queue(*queue_params),
@queue = channel.queue(*queue_params)
@queue.channel.exchange(topic_name)
@listener = listener_class.new(
queue: queue,
subscribers: listeners,
topic: topic_name
)
Expand Down
61 changes: 50 additions & 11 deletions spec/runner_spec.rb
Original file line number Diff line number Diff line change
@@ -1,28 +1,67 @@
require 'spec_helper'

describe MultipleMan::Runner do
let(:mock_channel) { double("Channel", prefetch: true, queue: true) }
let(:mock_queue) { double('Queue') }
let(:mock_channel) { double("Channel", prefetch: true, queue: mock_queue) }
let(:mock_connection) { double("Connection", create_channel: mock_channel) }
let(:mock_consumer) { double("Consumer", listen: true) }

it 'boots app and listens on new channel' do
expect(MultipleMan::Connection).to receive(:connection).and_return(mock_connection)
expect(MultipleMan::Consumers::General).to receive(:new).and_return(mock_consumer)
expect(mock_consumer).to receive(:listen)
describe 'run' do
let(:runner) { described_class.new(mode: :general) }
before do
allow(mock_queue).to receive(:channel).and_return(mock_channel)
allow(mock_channel).to receive(:exchange)
allow(MultipleMan::Connection).to receive(:connection).and_return(mock_connection)
allow(MultipleMan::Consumers::General).to receive(:new).and_return(mock_consumer)
end

it 'boots app and listens on new channel' do
expect(mock_consumer).to receive(:listen)
runner.run
end

runner = described_class.new(mode: :general)
runner.run
it 'listener and queue availability' do
runner.run
expect(runner.listener).not_to be_nil
expect(runner.queue).not_to be_nil
end
end

context "shutdown" do
context 'shutdown' do
let(:connection) { MultipleMan::Connection.connection }

it 'closes connections and exits gracefully' do
MultipleMan::Consumers::General.stub(:new) { Process.kill('INT', 0) }
klass = MultipleMan::Consumers::General
allow_any_instance_of(klass).to receive(:listen) do
raise MultipleMan::Runner::ShutDown
end
expect(connection).to receive(:close).at_least(1)
MultipleMan::Runner.new.run
end
end

expect(connection).to receive(:close)
describe 'rspec utilities' do
class MockClass
include MultipleMan::Subscriber
subscribe fields: %i[id name]
attr_accessor :name
end

MultipleMan::Runner.new.run
let(:runner) { MultipleMan::Runner.new(mode: :general) }
before do
allow_any_instance_of(MultipleMan::Consumers::General).to receive(:listen)
end

it '#publish_test_message' do
runner.run
expect(runner.queue).to receive(:publish)
runner.publish_test_message('MockClass', name: 'name')
end

it '#listener_for' do
runner.run
res = runner.listener_for('MockClass')
expect(res).to be_an_instance_of(MultipleMan::Subscribers::ModelSubscriber)
end
end
end