Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 62 additions & 0 deletions lib/rx/linq/observable/group_by.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
module Rx
module Observable
def group_by(key_selector, value_selector, duration_selector)
AnonymousObservable.new do |observer|
gate = Mutex.new
group_map = {}
group = CompositeSubscription.new
new_obs = Observer.configure do |o|
o.on_next do |v|
key = nil
value = nil
begin
key = key_selector.call(v)
value = value_selector.call(v)
rescue => err
observer.on_error(err)
group_map.each { |k, s| s.on_error(err) }
next
end
subject = group_map[key]
unless subject
subject = group_map[key] = Subject.new
observer.on_next(subject)
is = nil
duration_obs = Observer.configure do |io|
io.on_error do |err|
group_map.delete(key)
subject.on_error(err)
group.delete(is)
end
io.on_completed do
group_map.delete(key)
subject.on_completed
group.delete(is)
end
end
begin
is = duration_selector.call(subject)
.subscribe(duration_obs)
rescue => err
group_map.each { |k, s| s.on_error(err) }
observer.on_error(err)
next
end
group << is
end
subject.on_next(value)
end
o.on_error do |err|
group_map.each { |k, s| s.on_error(err) }
observer.on_error(err)
end
o.on_completed do
group_map.each { |k, s| s.on_completed }
observer.on_completed
end
end
group << subscribe(new_obs)
end
end
end
end
179 changes: 179 additions & 0 deletions test/rx/linq/observable/test_group_by.rb
Original file line number Diff line number Diff line change
@@ -0,0 +1,179 @@
require 'test_helper'

class TestOperatorGroupBy < Minitest::Test
include Rx::MarbleTesting

def test_emits_time_limited_subjects
source = cold(' 12345|')
expected = msgs('--xx--x|', x: lambda { |x| x.is_a? Rx::Subject })
expected_a = msgs('--5-7|')
expected_b = msgs('---6-8|')
expected_c = msgs('------9|')
duration = cold('---|')

obs = []
actual = scheduler.configure do
source.group_by(
lambda { |x| x % 2 },
lambda { |x| x + 4 },
lambda do |g|
o = scheduler.create_observer
obs << o
g.subscribe(o)
duration
end
)
end
a, b, c = obs
assert_msgs expected, actual
assert_msgs expected_a, a
assert_msgs expected_b, b
assert_msgs expected_c, c
end

def test_propagates_error
source = cold(' 12#')
expected = msgs('--xx#', x: lambda { |x| x.is_a? Rx::Subject })
expected_a = msgs('--1-#')
expected_b = msgs('---2#')

obs = []
actual = scheduler.configure do
source.group_by(
lambda { |x| x },
lambda { |x| x },
lambda do |g|
o = scheduler.create_observer
obs << o
g.subscribe(o)
cold('')
end
)
end
a, b = obs

assert_msgs expected, actual
assert_msgs expected_a, a
assert_msgs expected_b, b
end

def test_key_selector_raises
source = cold(' 12')
expected = msgs('--x#', x: ->(_) { true })
expected_s = msgs('--1#')
duration = cold('')

s = nil
actual = scheduler.configure do
source.group_by(
lambda { |x| raise error if x > 1 ; 1 },
lambda { |x| x },
lambda do |g|
s = scheduler.create_observer
g.subscribe(s)
duration
end
)
end
assert_msgs expected, actual
assert_msgs expected_s, s
end

def test_value_selector_raises
source = cold(' 12')
expected = msgs('--x#', x: ->(_) { true })
expected_s = msgs('--1#')
duration = cold('')

s = nil
actual = scheduler.configure do
source.group_by(
lambda { |x| x },
lambda { |x| raise error if x > 1 ; 1 },
lambda do |g|
s = scheduler.create_observer
g.subscribe(s)
duration
end
)
end
assert_msgs expected, actual
assert_msgs expected_s, s
end

def test_duration_selector_raises
source = cold(' 12|')
expected = msgs('--x(x#)', x: ->(_) { true })
expected_s = msgs('--1#')
duration = cold('')

s = nil
n = 0
actual = scheduler.configure do
source.group_by(
lambda { |x| x },
lambda { |x| x },
lambda do |g|
raise error if (n += 1) > 1
s = scheduler.create_observer
g.subscribe(s)
duration
end
)
end
assert_msgs expected, actual
assert_msgs expected_s, s
end

def test_ignore_duration_selector_values
source = cold(' 111|')
expected = msgs('--x-x|', x: lambda { |x| x.is_a? Rx::Subject })
expected_a = msgs('--1(1|)')
expected_b = msgs('----1|')
duration = cold('2|')

obs = []
actual = scheduler.configure do
source.group_by(
lambda { |x| x },
lambda { |x| x },
lambda do |g|
o = scheduler.create_observer
obs << o
g.subscribe(o)
duration
end
)
end
a, b = obs
assert_msgs expected, actual
assert_msgs expected_a, a
assert_msgs expected_b, b
end

def test_propagate_duration_selector_error
source = cold(' 11|')
expected = msgs('--xx|', x: lambda { |x| x.is_a? Rx::Subject })
expected_a = msgs('--(1#)')
expected_b = msgs('---(1#)')
duration = cold('#')

obs = []
actual = scheduler.configure do
source.group_by(
lambda { |x| x },
lambda { |x| x },
lambda do |g|
o = scheduler.create_observer
obs << o
g.subscribe(o)
duration
end
)
end
a, b = obs
assert_msgs expected, actual
assert_msgs expected_a, a
assert_msgs expected_b, b
end
end