From 923a6aa54c0f006a0fdae5d861640ba45a912d7e Mon Sep 17 00:00:00 2001 From: Quest Date: Wed, 21 Mar 2018 20:42:58 +0100 Subject: [PATCH] .group_by operator --- lib/rx/linq/observable/group_by.rb | 62 ++++++++ test/rx/linq/observable/test_group_by.rb | 179 +++++++++++++++++++++++ 2 files changed, 241 insertions(+) create mode 100644 lib/rx/linq/observable/group_by.rb create mode 100644 test/rx/linq/observable/test_group_by.rb diff --git a/lib/rx/linq/observable/group_by.rb b/lib/rx/linq/observable/group_by.rb new file mode 100644 index 0000000..5185fca --- /dev/null +++ b/lib/rx/linq/observable/group_by.rb @@ -0,0 +1,62 @@ +module Rx + module Observable + def group_by(key_selector, value_selector, duration_selector) + AnonymousObservable.new do |observer| + gate = Mutex.new + group_map = {} + group = CompositeSubscription.new + new_obs = Observer.configure do |o| + o.on_next do |v| + key = nil + value = nil + begin + key = key_selector.call(v) + value = value_selector.call(v) + rescue => err + observer.on_error(err) + group_map.each { |k, s| s.on_error(err) } + next + end + subject = group_map[key] + unless subject + subject = group_map[key] = Subject.new + observer.on_next(subject) + is = nil + duration_obs = Observer.configure do |io| + io.on_error do |err| + group_map.delete(key) + subject.on_error(err) + group.delete(is) + end + io.on_completed do + group_map.delete(key) + subject.on_completed + group.delete(is) + end + end + begin + is = duration_selector.call(subject) + .subscribe(duration_obs) + rescue => err + group_map.each { |k, s| s.on_error(err) } + observer.on_error(err) + next + end + group << is + end + subject.on_next(value) + end + o.on_error do |err| + group_map.each { |k, s| s.on_error(err) } + observer.on_error(err) + end + o.on_completed do + group_map.each { |k, s| s.on_completed } + observer.on_completed + end + end + group << subscribe(new_obs) + end + end + end +end \ No newline at end of file diff --git a/test/rx/linq/observable/test_group_by.rb b/test/rx/linq/observable/test_group_by.rb new file mode 100644 index 0000000..9cb4592 --- /dev/null +++ b/test/rx/linq/observable/test_group_by.rb @@ -0,0 +1,179 @@ +require 'test_helper' + +class TestOperatorGroupBy < Minitest::Test + include Rx::MarbleTesting + + def test_emits_time_limited_subjects + source = cold(' 12345|') + expected = msgs('--xx--x|', x: lambda { |x| x.is_a? Rx::Subject }) + expected_a = msgs('--5-7|') + expected_b = msgs('---6-8|') + expected_c = msgs('------9|') + duration = cold('---|') + + obs = [] + actual = scheduler.configure do + source.group_by( + lambda { |x| x % 2 }, + lambda { |x| x + 4 }, + lambda do |g| + o = scheduler.create_observer + obs << o + g.subscribe(o) + duration + end + ) + end + a, b, c = obs + assert_msgs expected, actual + assert_msgs expected_a, a + assert_msgs expected_b, b + assert_msgs expected_c, c + end + + def test_propagates_error + source = cold(' 12#') + expected = msgs('--xx#', x: lambda { |x| x.is_a? Rx::Subject }) + expected_a = msgs('--1-#') + expected_b = msgs('---2#') + + obs = [] + actual = scheduler.configure do + source.group_by( + lambda { |x| x }, + lambda { |x| x }, + lambda do |g| + o = scheduler.create_observer + obs << o + g.subscribe(o) + cold('') + end + ) + end + a, b = obs + + assert_msgs expected, actual + assert_msgs expected_a, a + assert_msgs expected_b, b + end + + def test_key_selector_raises + source = cold(' 12') + expected = msgs('--x#', x: ->(_) { true }) + expected_s = msgs('--1#') + duration = cold('') + + s = nil + actual = scheduler.configure do + source.group_by( + lambda { |x| raise error if x > 1 ; 1 }, + lambda { |x| x }, + lambda do |g| + s = scheduler.create_observer + g.subscribe(s) + duration + end + ) + end + assert_msgs expected, actual + assert_msgs expected_s, s + end + + def test_value_selector_raises + source = cold(' 12') + expected = msgs('--x#', x: ->(_) { true }) + expected_s = msgs('--1#') + duration = cold('') + + s = nil + actual = scheduler.configure do + source.group_by( + lambda { |x| x }, + lambda { |x| raise error if x > 1 ; 1 }, + lambda do |g| + s = scheduler.create_observer + g.subscribe(s) + duration + end + ) + end + assert_msgs expected, actual + assert_msgs expected_s, s + end + + def test_duration_selector_raises + source = cold(' 12|') + expected = msgs('--x(x#)', x: ->(_) { true }) + expected_s = msgs('--1#') + duration = cold('') + + s = nil + n = 0 + actual = scheduler.configure do + source.group_by( + lambda { |x| x }, + lambda { |x| x }, + lambda do |g| + raise error if (n += 1) > 1 + s = scheduler.create_observer + g.subscribe(s) + duration + end + ) + end + assert_msgs expected, actual + assert_msgs expected_s, s + end + + def test_ignore_duration_selector_values + source = cold(' 111|') + expected = msgs('--x-x|', x: lambda { |x| x.is_a? Rx::Subject }) + expected_a = msgs('--1(1|)') + expected_b = msgs('----1|') + duration = cold('2|') + + obs = [] + actual = scheduler.configure do + source.group_by( + lambda { |x| x }, + lambda { |x| x }, + lambda do |g| + o = scheduler.create_observer + obs << o + g.subscribe(o) + duration + end + ) + end + a, b = obs + assert_msgs expected, actual + assert_msgs expected_a, a + assert_msgs expected_b, b + end + + def test_propagate_duration_selector_error + source = cold(' 11|') + expected = msgs('--xx|', x: lambda { |x| x.is_a? Rx::Subject }) + expected_a = msgs('--(1#)') + expected_b = msgs('---(1#)') + duration = cold('#') + + obs = [] + actual = scheduler.configure do + source.group_by( + lambda { |x| x }, + lambda { |x| x }, + lambda do |g| + o = scheduler.create_observer + obs << o + g.subscribe(o) + duration + end + ) + end + a, b = obs + assert_msgs expected, actual + assert_msgs expected_a, a + assert_msgs expected_b, b + end +end \ No newline at end of file