From 8b691817a20d84f24787cb7a5948bec1fcaeb70a Mon Sep 17 00:00:00 2001 From: Anton Mishchuk Date: Sat, 24 May 2025 11:53:08 +0200 Subject: [PATCH] Add SplitterTree with the copy and hash modes --- TODO.md | 9 ++- lib/composite/topology.ex | 4 +- lib/mixer_tree.ex | 5 +- lib/splitter_tree.ex | 69 ++++++++++++++++ test/composite/topology/draw_test.exs | 30 ++++++- test/data/output.csv | 111 +------------------------- test/mixer_tree_test.exs | 50 ++++++++---- test/splitter_tree_test.exs | 95 ++++++++++++++++++++++ 8 files changed, 241 insertions(+), 132 deletions(-) create mode 100644 lib/splitter_tree.ex create mode 100644 test/splitter_tree_test.exs diff --git a/TODO.md b/TODO.md index 6c84e9c..9933a44 100644 --- a/TODO.md +++ b/TODO.md @@ -13,6 +13,9 @@ Clustering: Refactoring -- chunk transformer -- hanging streams after delete component -- empty_gen_mix at the end of composite. delete, insert, and replace the last component \ No newline at end of file + +Bugs: +- Topology.draw source, when there are a lot of sources at the beginning. Also check the SplitterTree test + +New features: +- Splitter tree (copy, random, round-robin) \ No newline at end of file diff --git a/lib/composite/topology.ex b/lib/composite/topology.ex index 879d0a6..1b95378 100644 --- a/lib/composite/topology.ex +++ b/lib/composite/topology.ex @@ -16,7 +16,7 @@ defmodule Strom.Composite.Topology do {streams, index} -> {inputs, outputs} = case component do - %Source{} -> {[], Map.keys(outputs)} + %Source{} -> {Map.keys(outputs), Map.keys(outputs)} %Sink{} -> {inputs, []} _ -> {inputs, Map.keys(outputs)} end @@ -91,7 +91,7 @@ defmodule Strom.Composite.Topology do case nils do [] -> - [output | acc] + acc ++ [output] nils when is_list(nils) -> {nil, closest_to_average} = diff --git a/lib/mixer_tree.ex b/lib/mixer_tree.ex index 6823023..1a693f8 100644 --- a/lib/mixer_tree.ex +++ b/lib/mixer_tree.ex @@ -11,7 +11,7 @@ defmodule Strom.MixerTree do list() ) :: Composite.t() - @parts 10 + @parts 2 def new(inputs, output, opts \\ []) when is_list(inputs) or (is_map(inputs) and map_size(inputs) > 0 and is_list(opts)) do @@ -25,12 +25,13 @@ defmodule Strom.MixerTree do inputs |> Enum.chunk_every(parts) |> Enum.reduce({[], [], 0}, fn stream_names, {acc, outputs, counter} -> - output = String.to_atom("mixer_tree_#{level}_#{counter}") + output = String.to_atom("_mt_#{level}#{counter}") mixer = Mixer.new(stream_names, output, opts) {[mixer | acc], [output | outputs], counter + 1} end) mixers = Enum.reverse(mixers) + outputs = Enum.reverse(outputs) if count > parts do mixers ++ build_mixers(outputs, level + 1, parts, final_output, opts) diff --git a/lib/splitter_tree.ex b/lib/splitter_tree.ex new file mode 100644 index 0000000..05c960c --- /dev/null +++ b/lib/splitter_tree.ex @@ -0,0 +1,69 @@ +defmodule Strom.SplitterTree do + @moduledoc "Composite of mixers, use it when you need mixing a lot of streams" + alias Strom.Composite + alias Strom.Splitter + + @type event() :: any() + @parts 2 + @modes [:copy, :hash] + @default_mode :copy + + @spec new(Strom.stream_name(), [Strom.stream_name()], list()) :: Composite.t() + def new(input, outputs, opts \\ []) + when is_atom(input) and is_list(outputs) and is_list(opts) do + {mode, opts} = define_mode(opts) + {parts, opts} = Keyword.pop(opts, :parts, @parts) + splitters = build_splitters(input, outputs, 0, parts, opts, mode) + Composite.new(splitters) + end + + defp build_splitters(input, outputs, level, parts, opts, mode) do + {splitters, local_inputs, count} = + outputs + |> Enum.chunk_every(parts) + |> Enum.reduce({[], [], 0}, fn stream_names, {acc, local_inputs, counter} -> + local_input = String.to_atom("_st_#{level}#{counter}") + local_outputs = build_local_outputs(stream_names, mode) + splitter = Splitter.new(local_input, local_outputs, opts) + {[splitter | acc], [local_input | local_inputs], counter + 1} + end) + + local_inputs = Enum.reverse(local_inputs) + splitters = Enum.reverse(splitters) + + if count > parts do + build_splitters(input, local_inputs, level + 1, parts, opts, mode) ++ splitters + else + local_outputs = build_local_outputs(local_inputs, mode) + [Splitter.new(input, local_outputs, opts) | splitters] + end + end + + defp define_mode(opts) do + {mode, opts} = Keyword.pop(opts, :mode, @default_mode) + + if mode not in @modes do + raise "Mode #{mode} must be in #{@modes}" + end + + {mode, opts} + end + + defp build_local_outputs(stream_names, :copy) do + stream_names + end + + defp build_local_outputs(stream_names, :hash) do + {local_outputs, _} = + Enum.reduce(stream_names, {%{}, 0}, fn output, {acc, index} -> + acc = + Map.put(acc, output, fn event -> + :erlang.phash2(event, length(stream_names)) == index + end) + + {acc, index + 1} + end) + + local_outputs + end +end diff --git a/test/composite/topology/draw_test.exs b/test/composite/topology/draw_test.exs index a608799..09024fa 100644 --- a/test/composite/topology/draw_test.exs +++ b/test/composite/topology/draw_test.exs @@ -1,6 +1,7 @@ defmodule Strom.Composite.Topology.DrawTest do use ExUnit.Case, async: false - alias Strom.{Composite, Mixer, MixerTree, Transformer, Sink, Source, Splitter} + alias Strom.{Composite, Mixer, Transformer, Sink, Source, Splitter} + alias Strom.{MixerTree, SplitterTree} alias Strom.Composite.Topology test "draw example 1" do @@ -35,7 +36,7 @@ defmodule Strom.Composite.Topology.DrawTest do end test "draw mixer tree" do - mixer_tree = MixerTree.new([:s1, :s2, :s3, :s4, :s5, :s6, :s7], :stream, parts: 2) + mixer_tree = MixerTree.new([:s1, :s2, :s3, :s4, :s5, :s6, :s7], :stream, parts: 3) transformer = Transformer.new(:stream, & &1) composite = @@ -56,4 +57,29 @@ defmodule Strom.Composite.Topology.DrawTest do Topology.draw(composite) end + + test "draw several sources and sinks" do + source1 = Source.new(:s1, []) + source2 = Source.new(:s2, []) + source3 = Source.new(:s3, []) + sink1 = Sink.new(:s1, %Sink{}) + sink2 = Sink.new(:s2, %Sink{}) + sink3 = Sink.new(:s3, %Sink{}) + + composite = + [source1, source2, source3, sink1, sink2, sink3] + |> Composite.new() + + Topology.draw(composite) + end + + test "draw mixer_tree (order of streams is important)" do + mixer_tree = MixerTree.new([:s1, :s2, :s3, :s4, :s5, :s6, :s7], :stream, parts: 3) + Topology.draw(mixer_tree) + end + + test "draw splitter_tree" do + splitter_tree = SplitterTree.new(:stream, [:s1, :s2, :s3, :s4, :s5, :s6, :s7], parts: 3) + Topology.draw(splitter_tree) + end end diff --git a/test/data/output.csv b/test/data/output.csv index f65c9b7..4f4b21b 100644 --- a/test/data/output.csv +++ b/test/data/output.csv @@ -1,107 +1,4 @@ -ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3 -ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2 -ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2 -ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3 -ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2 -ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2 -ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3 -ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2 -ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2 -ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3 -ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2 -ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2 -ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3 -ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2 -ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2 -ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3 -ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2 -ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2 -ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3 -ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2 -ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2 -ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3 -ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2 -ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2 -ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3 -ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2 -ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2 -ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3 -ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2 -ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2 -ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3 -ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2 -ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2 -ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3 -ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2 -ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2 -ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3 -ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2 -ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2 -ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3 -ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2 -ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2 -ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3 -ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2 -ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2 -ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3 -ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2 -ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2 -ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3 -ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2 -ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2 -ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3 -ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2 -ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2 -ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3 -ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2 -ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2 -ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3 -ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2 -ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2 -ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3 -ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2 -ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2 -ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3 -ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2 -ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2 -ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3 -ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2 -ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2 -ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3 -ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2 -ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2 -ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3 -ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2 -ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2 -ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3 -ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2 -ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2 -ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3 -ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2 -ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2 -ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3 -ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2 -ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2 -ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3 -ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2 -ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2 -ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3 -ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2 -ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2 -ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3 -ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2 -ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2 -ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3 -ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2 -ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2 -ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3 -ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2 -ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2 -ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3 -ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2 -ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3 -ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2 -ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2 -ORDER_CREATED,2017-04-18T20:00:00.000Z,111,3 -ORDER_CREATED,2017-04-20T09:00:00.000Z,222,2 -ORDER_CREATED,2017-04-21T09:00:00.000Z,333,2 +1 +3 +4 +5 diff --git a/test/mixer_tree_test.exs b/test/mixer_tree_test.exs index e75a44f..34d5a37 100644 --- a/test/mixer_tree_test.exs +++ b/test/mixer_tree_test.exs @@ -1,29 +1,47 @@ defmodule Strom.MixerTreeTest do use ExUnit.Case, async: true - alias Strom.{Composite, MixerTree, Source} + alias Strom.{Composite, MixerTree} - @tag timeout: :infinity - test "messages" do - count = :rand.uniform(100) + test "mixes 5 streams with 2 parts" do + count = 5 + parts = 2 + names = Enum.map(1..count, &String.to_atom("s#{&1}")) + + mixer_tree = + names + |> MixerTree.new(:stream, parts: parts) + |> Composite.start() + + assert length(Composite.components(mixer_tree)) == 6 - names = Enum.map(1..count, &String.to_atom("tick#{&1}")) + flow = + names + |> Enum.reduce(%{}, fn name, acc -> Map.put(acc, name, Enum.to_list(1..10)) end) + |> Composite.call(mixer_tree) + + assert length(Enum.to_list(flow[:stream])) == count * 10 + Composite.stop(mixer_tree) + end + + test "mixes random number of streams" do + count = :rand.uniform(100) - sources = - Enum.map(names, fn name -> - Source.new(name, [:tick]) - end) + names = Enum.map(1..count, &String.to_atom("s#{&1}")) - mixer = MixerTree.new(names, :stream, parts: 5 + :rand.uniform(5)) + parts = 5 + :rand.uniform(5) - composite = - [sources, mixer] - |> Composite.new() + mixer_tree = + names + |> MixerTree.new(:stream, parts: parts) |> Composite.start() - flow = Composite.call(%{}, composite) + flow = + names + |> Enum.reduce(%{}, fn name, acc -> Map.put(acc, name, Enum.to_list(1..10)) end) + |> Composite.call(mixer_tree) - assert length(Enum.to_list(flow[:stream])) == count - Composite.stop(composite) + assert length(Enum.to_list(flow[:stream])) == count * 10 + Composite.stop(mixer_tree) end end diff --git a/test/splitter_tree_test.exs b/test/splitter_tree_test.exs new file mode 100644 index 0000000..373996c --- /dev/null +++ b/test/splitter_tree_test.exs @@ -0,0 +1,95 @@ +defmodule Strom.SplitterTreeTest do + use ExUnit.Case, async: true + + alias Strom.{Composite, SplitterTree} + + test "splits 1 stream into 5 with copying" do + count = 5 + output_streams = Enum.map(1..count, &String.to_atom("s#{&1}")) + parts = 2 + splitter_tree = SplitterTree.new(:stream, output_streams, parts: parts) + + composite = + [splitter_tree] + |> Composite.new() + |> Composite.start() + + max = 10 + flow = Composite.call(%{stream: Enum.to_list(1..max)}, composite) + + Enum.each(output_streams, fn output_stream -> + assert Enum.to_list(flow[output_stream]) == Enum.to_list(1..max) + end) + + Composite.stop(composite) + end + + test "splits 1 stream into 5 with random distribution (hash mode)" do + count = 5 + output_streams = Enum.map(1..count, &String.to_atom("s#{&1}")) + parts = 2 + splitter_tree = SplitterTree.new(:stream, output_streams, parts: parts, mode: :hash) + + composite = + [splitter_tree] + |> Composite.new() + |> Composite.start() + + max = 10 + flow = Composite.call(%{stream: Enum.to_list(1..max)}, composite) + + numbers = + Enum.reduce(output_streams, [], fn output_stream, numbers -> + numbers ++ Enum.to_list(flow[output_stream]) + end) + + assert Enum.sort(numbers) == Enum.to_list(1..max) + + Composite.stop(composite) + end + + test "splits a stream into multiple streams and copies data to each stream" do + count = :rand.uniform(100) + output_streams = Enum.map(1..count, &String.to_atom("s#{&1}")) + parts = 5 + :rand.uniform(5) + splitter_tree = SplitterTree.new(:stream, output_streams, parts: parts) + + composite = + [splitter_tree] + |> Composite.new() + |> Composite.start() + + max = 10 + flow = Composite.call(%{stream: Enum.to_list(1..max)}, composite) + + Enum.each(output_streams, fn output_stream -> + assert Enum.to_list(flow[output_stream]) == Enum.to_list(1..max) + end) + + Composite.stop(composite) + end + + test "splits a stream into multiple streams with the hash mode" do + count = :rand.uniform(100) + output_streams = Enum.map(1..count, &String.to_atom("s#{&1}")) + parts = 5 + :rand.uniform(5) + splitter_tree = SplitterTree.new(:stream, output_streams, parts: parts, mode: :hash) + + composite = + [splitter_tree] + |> Composite.new() + |> Composite.start() + + max = 10 + flow = Composite.call(%{stream: Enum.to_list(1..max)}, composite) + + numbers = + Enum.reduce(output_streams, [], fn output_stream, numbers -> + numbers ++ Enum.to_list(flow[output_stream]) + end) + + assert Enum.sort(numbers) == Enum.to_list(1..max) + + Composite.stop(composite) + end +end