From 860b7548783bf0db8f15897af8d2cceb728b9da8 Mon Sep 17 00:00:00 2001 From: Anton Mishchuk Date: Tue, 20 May 2025 23:06:43 +0200 Subject: [PATCH 1/5] First draft of the topology drawing --- lib/composite/topology.ex | 164 ++++++++++++++++++ lib/gen_mix/streams.ex | 11 +- lib/mixer_tree.ex | 2 + .../manipulations/delete_components_test.exs | 2 +- .../manipulations/insert_components_test.exs | 35 +++- .../manipulations/replace_components_test.exs | 2 +- test/composite/topology/draw_test.exs | 59 +++++++ test/mixer_test.exs | 17 +- 8 files changed, 285 insertions(+), 7 deletions(-) create mode 100644 lib/composite/topology.ex rename test/{ => composite}/manipulations/delete_components_test.exs (99%) rename test/{ => composite}/manipulations/insert_components_test.exs (77%) rename test/{ => composite}/manipulations/replace_components_test.exs (97%) create mode 100644 test/composite/topology/draw_test.exs diff --git a/lib/composite/topology.ex b/lib/composite/topology.ex new file mode 100644 index 0000000..e10810d --- /dev/null +++ b/lib/composite/topology.ex @@ -0,0 +1,164 @@ +defmodule Strom.Composite.Topology do + @moduledoc """ + Draws a topology of a composite + """ + alias Strom.{Composite, Mixer, Splitter, Source, Transformer, Sink} + + @info_width 50 + def draw(%Composite{components: components}) do + Enum.reduce(components, {[], 0}, fn %{inputs: inputs, outputs: outputs} = component, + {streams, index} -> + streams = + draw_line(index, component, Enum.uniq(streams ++ inputs), inputs, Map.keys(outputs)) + + {streams, index + 1} + end) + end + + defp draw_line(index, component, streams_after_inputs, inputs, outputs) do + draw_stream_names(streams_after_inputs) + input_positions = draw_streams(streams_after_inputs, inputs) + + {streams_after_outputs, output_positions} = + find_place_for_outputs(streams_after_inputs, inputs, outputs) + + average_position = average_position(input_positions, output_positions) + + draw_component_description(index, component) + draw_component(component, streams_after_outputs, outputs, average_position) + + streams_after_outputs + end + + defp find_place_for_outputs(streams, inputs, outputs) do + ended = inputs -- outputs + + streams = + Enum.reduce(streams, [], fn name, acc -> + case Enum.member?(ended, name) do + true -> + [nil | acc] + + false -> + [name | acc] + end + end) + |> Enum.reverse() + + Enum.reduce(outputs, {streams, []}, fn output, {acc, positions} -> + case Enum.member?(streams, output) do + true -> + {acc, positions} + + false -> + nils = + Enum.zip(acc, 0..length(acc)) + |> Enum.filter(fn {name, _} -> is_nil(name) end) + + case nils do + [] -> + {[output | acc], [length(acc) | positions]} + + nils when is_list(nils) -> + index = round(Enum.sum_by(nils, fn {_, index} -> index end) / length(nils)) + {List.replace_at(acc, index, output), [index | positions]} + end + end + end) + end + + defp average_position(input_positions, output_positions) do + case {input_positions, output_positions} do + {[], positions} when is_list(positions) -> + round(Enum.sum(positions) / length(positions)) + + {positions, []} when is_list(positions) -> + round(Enum.sum(positions) / length(positions)) + + {in_pos, out_pos} when is_list(in_pos) and is_list(out_pos) -> + round((Enum.sum(in_pos) + Enum.sum(out_pos)) / (length(in_pos) + length(out_pos))) + end + end + + defp draw_stream_names(streams) do + string = + streams + |> Enum.filter(& &1) + |> Enum.map(&to_string/1) + |> Enum.join(" ") + + IO.write(format_to_width(string, @info_width)) + end + + defp draw_streams(streams, inputs) do + {_, input_positions} = + Enum.reduce(streams, {0, []}, fn name, {counter, acc} -> + cond do + Enum.member?(inputs, name) -> + IO.write("\u275A ") + {counter + 1, [counter | acc]} + + is_nil(name) -> + IO.write(" ") + {counter + 1, acc} + + true -> + IO.write("| ") + {counter + 1, acc} + end + end) + + IO.puts("") + input_positions + end + + defp draw_component_description(index, component) do + case component do + %Mixer{} -> + IO.write(format_to_width("Mixer (#{index})", @info_width)) + + %Splitter{} -> + IO.write(format_to_width("Splitter (#{index})", @info_width)) + + %Transformer{} -> + IO.write(format_to_width("Transformer (#{index})", @info_width)) + + %Source{} -> + IO.write(format_to_width("Source (#{index})", @info_width)) + + %Sink{} -> + IO.write(format_to_width("Sink (#{index})", @info_width)) + end + end + + defp draw_component(component, streams_after_outputs, outputs, average_position) do + Enum.with_index(streams_after_outputs, fn name, index -> + if index == average_position do + component_character(component) + else + if name do + if Enum.member?(outputs, name) do + IO.write(". ") + else + IO.write("| ") + end + else + IO.write(" ") + end + end + end) + + IO.puts("") + end + + defp component_character(%Mixer{}), do: IO.write("Y ") + defp component_character(%Splitter{}), do: IO.write("\u039B ") + defp component_character(%Transformer{}), do: IO.write("\u23FA ") + defp component_character(%Source{}), do: IO.write("\u25BC ") + defp component_character(%Sink{}), do: IO.write("\u25B2 ") + + defp format_to_width(string, width) do + string = Enum.join(List.duplicate(" ", width), "") <> string + String.slice(string, -(width - 2), width - 2) <> " " + end +end diff --git a/lib/gen_mix/streams.ex b/lib/gen_mix/streams.ex index d27190c..8521da7 100644 --- a/lib/gen_mix/streams.ex +++ b/lib/gen_mix/streams.ex @@ -13,9 +13,14 @@ defmodule Strom.GenMix.Streams do sub_flow = build_sub_flow(gm.outputs, gm_pid) - flow - |> Map.drop(gm.inputs) - |> Map.merge(sub_flow) + flow = Map.drop(flow, gm.inputs) + + Enum.reduce(sub_flow, flow, fn {name, stream}, flow -> + case Map.get(flow, name) do + nil -> Map.put(flow, name, stream) + existing_stream -> Map.put(flow, name, Stream.concat(existing_stream, stream)) + end + end) end defp build_sub_flow(outputs, gm_pid) do diff --git a/lib/mixer_tree.ex b/lib/mixer_tree.ex index 31b4106..6823023 100644 --- a/lib/mixer_tree.ex +++ b/lib/mixer_tree.ex @@ -30,6 +30,8 @@ defmodule Strom.MixerTree do {[mixer | acc], [output | outputs], counter + 1} end) + mixers = Enum.reverse(mixers) + if count > parts do mixers ++ build_mixers(outputs, level + 1, parts, final_output, opts) else diff --git a/test/manipulations/delete_components_test.exs b/test/composite/manipulations/delete_components_test.exs similarity index 99% rename from test/manipulations/delete_components_test.exs rename to test/composite/manipulations/delete_components_test.exs index abbdead..feb0bbf 100644 --- a/test/manipulations/delete_components_test.exs +++ b/test/composite/manipulations/delete_components_test.exs @@ -1,4 +1,4 @@ -defmodule Strom.DeleteComponentsTest do +defmodule Strom.Composite.Manipulations.DeleteComponentsTest do use ExUnit.Case, async: false import Strom.TestHelper alias Strom.{Composite, Mixer, Transformer} diff --git a/test/manipulations/insert_components_test.exs b/test/composite/manipulations/insert_components_test.exs similarity index 77% rename from test/manipulations/insert_components_test.exs rename to test/composite/manipulations/insert_components_test.exs index 75e0543..5b8f26b 100644 --- a/test/manipulations/insert_components_test.exs +++ b/test/composite/manipulations/insert_components_test.exs @@ -1,4 +1,4 @@ -defmodule Strom.InsertComponentsTest do +defmodule Strom.Composite.Manipulations.InsertComponentsTest do use ExUnit.Case, async: false import Strom.TestHelper alias Strom.{Splitter, Transformer} @@ -90,4 +90,37 @@ defmodule Strom.InsertComponentsTest do list = Task.await(task) assert length(list) == 20 end + + def func(event) do + IO.puts("event: #{event}") + + if event < 10 do + composite = %Composite{name: :composite} + + Composite.insert(composite, event, [ + # transformer2 = Transformer.new(:stream, & &1, nil, chunk: 1), + Transformer.new(:stream, &func/1, nil, chunk: 1) + ]) + |> IO.inspect() + + event + 1 + else + event |> IO.inspect(label: "after") + end + end + + # TODO + # test "insert on the fly" do + # transformer1 = Transformer.new(:stream, &func/1, nil, chunk: 1) + # transformer2 = Transformer.new(:stream, & &1, nil, chunk: 1) + + # composite = + # [transformer1, transformer2] + # |> Composite.new(:composite) + # |> Composite.start() + + # %{stream: stream} = Composite.call(%{stream: [1, 11, 2, 12, 3, 13]}, composite) + + # Enum.to_list(stream) |> IO.inspect() + # end end diff --git a/test/manipulations/replace_components_test.exs b/test/composite/manipulations/replace_components_test.exs similarity index 97% rename from test/manipulations/replace_components_test.exs rename to test/composite/manipulations/replace_components_test.exs index 06a2ba0..0f35887 100644 --- a/test/manipulations/replace_components_test.exs +++ b/test/composite/manipulations/replace_components_test.exs @@ -1,4 +1,4 @@ -defmodule Strom.ReplaceComponentsTest do +defmodule Strom.Composite.Manipulations.ReplaceComponentsTest do use ExUnit.Case, async: false import Strom.TestHelper alias Strom.{Mixer, Splitter, Transformer} diff --git a/test/composite/topology/draw_test.exs b/test/composite/topology/draw_test.exs new file mode 100644 index 0000000..bd852c7 --- /dev/null +++ b/test/composite/topology/draw_test.exs @@ -0,0 +1,59 @@ +defmodule Strom.Composite.Topology.DrawTest do + use ExUnit.Case, async: false + alias Strom.{Composite, Mixer, MixerTree, Transformer, Sink, Source, Splitter} + alias Strom.Composite.Topology + + test "draw example 1" do + source = Source.new(:stream1, []) + transformer1 = Transformer.new(:stream1, & &1) + splitter = Splitter.new(:stream1, [:stream4, :stream5]) + transformer5 = Transformer.new(:stream5, & &1) + + transformer2 = Transformer.new(:stream2, & &1) + mixer = Mixer.new([:stream1, :stream2, :stream3], :stream) + transformer3 = Transformer.new(:stream4, & &1) + transformer4 = Transformer.new(:stream, & &1) + sink1 = Sink.new(:stream, %{__struct__: Sink}) + sink2 = Sink.new(:stream4, %{__struct__: Sink}) + + composite = + [ + source, + transformer1, + splitter, + transformer5, + transformer2, + mixer, + transformer3, + transformer4, + sink1, + sink2 + ] + |> Composite.new() + + Topology.draw(composite) + end + + test "draw mixer tree" do + mixer_tree = MixerTree.new([:s1, :s2, :s3, :s4, :s5, :s6], :stream, parts: 2) + transformer = Transformer.new(:stream, & &1) + + composite = + [mixer_tree, transformer] + |> Composite.new() + + Topology.draw(composite) + end + + test "draw example 2" do + mixer1 = Mixer.new([:s1, :s2], :stream, chunk: 1) + mixer2 = Mixer.new([:s3, :s4], :stream, chunk: 1) + transformer = Transformer.new(:stream, & &1) + + composite = + [mixer1, mixer2, transformer] + |> Composite.new() + + Topology.draw(composite) + end +end diff --git a/test/mixer_test.exs b/test/mixer_test.exs index f21848d..1b4be7c 100644 --- a/test/mixer_test.exs +++ b/test/mixer_test.exs @@ -2,7 +2,7 @@ defmodule Strom.MixerTest do use ExUnit.Case, async: true doctest Strom.Mixer - alias Strom.{Composite, Mixer, Source} + alias Strom.{Composite, Mixer, Source, Transformer} alias Strom.Source.ReadLines setup do @@ -145,4 +145,19 @@ defmodule Strom.MixerTest do Composite.stop(composite) end end + + test "two mixers mix into the same stream" do + mixer1 = Mixer.new([:s1, :s2], :stream, chunk: 1) + mixer2 = Mixer.new([:s3, :s4], :stream, chunk: 1) + transformer = Transformer.new(:stream, & &1) + + composite = + [mixer1, mixer2, transformer] + |> Composite.new() + |> Composite.start() + + %{stream: stream} = Composite.call(%{s1: [1], s2: [2], s3: [3], s4: [4]}, composite) + + assert Enum.sort(Enum.to_list(stream)) == [1, 2, 3, 4] + end end From deab9ec805c1233ae35735491d688b3dbbf85418 Mon Sep 17 00:00:00 2001 From: Anton Mishchuk Date: Tue, 20 May 2025 23:11:18 +0200 Subject: [PATCH 2/5] Fix spec --- lib/composite.ex | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/lib/composite.ex b/lib/composite.ex index ff9aa8d..89b7c4e 100644 --- a/lib/composite.ex +++ b/lib/composite.ex @@ -36,7 +36,7 @@ defmodule Strom.Composite do @type t :: %__MODULE__{} - @spec new([struct()]) :: __MODULE__.t() + @spec new([struct()], atom()) :: __MODULE__.t() def new(components, name \\ nil) when is_list(components) do components = components From 37ca63cce011a3931a00abaadeb0d95685c7ee56 Mon Sep 17 00:00:00 2001 From: Anton Mishchuk Date: Tue, 20 May 2025 23:46:18 +0200 Subject: [PATCH 3/5] Insert on the fly --- lib/composite/manipulations.ex | 2 +- lib/composite/topology.ex | 11 ++++- .../manipulations/insert_components_test.exs | 44 +++++++++---------- 3 files changed, 33 insertions(+), 24 deletions(-) diff --git a/lib/composite/manipulations.ex b/lib/composite/manipulations.ex index 8951aa7..89ed4a8 100644 --- a/lib/composite/manipulations.ex +++ b/lib/composite/manipulations.ex @@ -6,7 +6,7 @@ defmodule Strom.Composite.Manipulations do alias Strom.GenMix @spec insert(list(Strom.component()), integer(), list(Strom.component()), atom()) :: - {list(Strom.component()), list(Strom.component()), Strom.flow()} + {list(Strom.component()), list(Strom.component()), Strom.flow()} | {:error, any()} def insert(components, index, new_components, name) when is_list(new_components) do case valid_insert_indices?(components, index) do true -> diff --git a/lib/composite/topology.ex b/lib/composite/topology.ex index e10810d..774574a 100644 --- a/lib/composite/topology.ex +++ b/lib/composite/topology.ex @@ -5,7 +5,8 @@ defmodule Strom.Composite.Topology do alias Strom.{Composite, Mixer, Splitter, Source, Transformer, Sink} @info_width 50 - def draw(%Composite{components: components}) do + def draw(%Composite{} = composite) do + components = refresh_components(composite) Enum.reduce(components, {[], 0}, fn %{inputs: inputs, outputs: outputs} = component, {streams, index} -> streams = @@ -15,6 +16,14 @@ defmodule Strom.Composite.Topology do end) end + defp refresh_components(composite) do + if is_pid(composite.pid) do + Composite.components(composite) + else + composite.components + end + end + defp draw_line(index, component, streams_after_inputs, inputs, outputs) do draw_stream_names(streams_after_inputs) input_positions = draw_streams(streams_after_inputs, inputs) diff --git a/test/composite/manipulations/insert_components_test.exs b/test/composite/manipulations/insert_components_test.exs index 5b8f26b..566c00e 100644 --- a/test/composite/manipulations/insert_components_test.exs +++ b/test/composite/manipulations/insert_components_test.exs @@ -91,36 +91,36 @@ defmodule Strom.Composite.Manipulations.InsertComponentsTest do assert length(list) == 20 end - def func(event) do - IO.puts("event: #{event}") - - if event < 10 do + def func(event, position) do + if event < 3 do composite = %Composite{name: :composite} - Composite.insert(composite, event, [ - # transformer2 = Transformer.new(:stream, & &1, nil, chunk: 1), - Transformer.new(:stream, &func/1, nil, chunk: 1) - ]) - |> IO.inspect() + {_, _} = + Composite.insert(composite, position + 2, [ + Transformer.new(:stream, &func/2, position + 2, chunk: 1), + Transformer.new(:stream, & &1, nil, chunk: 1) + ]) - event + 1 + {[event + 1, event + 1], position} else - event |> IO.inspect(label: "after") + {[event], position} end end - # TODO - # test "insert on the fly" do - # transformer1 = Transformer.new(:stream, &func/1, nil, chunk: 1) - # transformer2 = Transformer.new(:stream, & &1, nil, chunk: 1) + test "insert on the fly" do + transformer1 = Transformer.new(:stream, &func/2, 0, chunk: 1) + transformer2 = Transformer.new(:stream, & &1, nil, chunk: 1) + transformer3 = Transformer.new(:stream, & &1, nil, chunk: 1) - # composite = - # [transformer1, transformer2] - # |> Composite.new(:composite) - # |> Composite.start() + composite = + [transformer1, transformer2, transformer3] + |> Composite.new(:composite) + |> Composite.start() - # %{stream: stream} = Composite.call(%{stream: [1, 11, 2, 12, 3, 13]}, composite) + stream = build_stream(Enum.to_list(1..1), 1) + %{stream: stream} = Composite.call(%{stream: stream}, composite) - # Enum.to_list(stream) |> IO.inspect() - # end + assert Enum.to_list(stream) == [2, 3, 3] + assert length(Composite.components(composite)) == 7 + end end From 7004cfe51bdc37cde2b2dafef1fb0b1f04a8f688 Mon Sep 17 00:00:00 2001 From: Anton Mishchuk Date: Wed, 21 May 2025 12:47:36 +0200 Subject: [PATCH 4/5] Simplify average position calculation --- lib/composite/topology.ex | 75 ++++++++++++++------------- test/composite/topology/draw_test.exs | 2 +- 2 files changed, 41 insertions(+), 36 deletions(-) diff --git a/lib/composite/topology.ex b/lib/composite/topology.ex index 774574a..fc4a821 100644 --- a/lib/composite/topology.ex +++ b/lib/composite/topology.ex @@ -4,16 +4,26 @@ defmodule Strom.Composite.Topology do """ alias Strom.{Composite, Mixer, Splitter, Source, Transformer, Sink} - @info_width 50 + @info_width 70 + + @spec draw(Composite.t()) :: :ok def draw(%Composite{} = composite) do components = refresh_components(composite) - Enum.reduce(components, {[], 0}, fn %{inputs: inputs, outputs: outputs} = component, + {streams, _} = Enum.reduce(components, {[], 0}, fn %{inputs: inputs, outputs: outputs} = component, {streams, index} -> - streams = - draw_line(index, component, Enum.uniq(streams ++ inputs), inputs, Map.keys(outputs)) + {inputs, outputs} = case component do + %Source{} -> {[], Map.keys(outputs)} + %Sink{} -> {inputs, []} + _ -> {inputs, Map.keys(outputs)} + end + started = inputs -- streams + streams = draw_line(index, component, streams ++ started, inputs, outputs) {streams, index + 1} end) + draw_stream_names(streams) + draw_streams(streams, []) + :ok end defp refresh_components(composite) do @@ -27,19 +37,26 @@ defmodule Strom.Composite.Topology do defp draw_line(index, component, streams_after_inputs, inputs, outputs) do draw_stream_names(streams_after_inputs) input_positions = draw_streams(streams_after_inputs, inputs) - - {streams_after_outputs, output_positions} = - find_place_for_outputs(streams_after_inputs, inputs, outputs) - - average_position = average_position(input_positions, output_positions) - + average_position = average_position(input_positions) + streams_after_outputs = + find_place_for_outputs(streams_after_inputs, inputs, outputs, average_position) draw_component_description(index, component) draw_component(component, streams_after_outputs, outputs, average_position) - streams_after_outputs + streams_after_outputs + end + + defp average_position(input_positions) do + case input_positions do + [] -> + 0 + + positions when is_list(positions) -> + round(Enum.sum(positions) / length(positions)) + end end - defp find_place_for_outputs(streams, inputs, outputs) do + defp find_place_for_outputs(streams, inputs, outputs, average_position) do ended = inputs -- outputs streams = @@ -54,10 +71,10 @@ defmodule Strom.Composite.Topology do end) |> Enum.reverse() - Enum.reduce(outputs, {streams, []}, fn output, {acc, positions} -> + Enum.reduce(outputs, streams, fn output, acc -> case Enum.member?(streams, output) do true -> - {acc, positions} + acc false -> nils = @@ -66,28 +83,16 @@ defmodule Strom.Composite.Topology do case nils do [] -> - {[output | acc], [length(acc) | positions]} + [output | acc] nils when is_list(nils) -> - index = round(Enum.sum_by(nils, fn {_, index} -> index end) / length(nils)) - {List.replace_at(acc, index, output), [index | positions]} + {nil, closest_to_average} = Enum.min_by(nils, fn {_, index} -> abs(index - average_position) end) + List.replace_at(acc, closest_to_average, output) end end end) end - defp average_position(input_positions, output_positions) do - case {input_positions, output_positions} do - {[], positions} when is_list(positions) -> - round(Enum.sum(positions) / length(positions)) - - {positions, []} when is_list(positions) -> - round(Enum.sum(positions) / length(positions)) - - {in_pos, out_pos} when is_list(in_pos) and is_list(out_pos) -> - round((Enum.sum(in_pos) + Enum.sum(out_pos)) / (length(in_pos) + length(out_pos))) - end - end defp draw_stream_names(streams) do string = @@ -96,7 +101,7 @@ defmodule Strom.Composite.Topology do |> Enum.map(&to_string/1) |> Enum.join(" ") - IO.write(format_to_width(string, @info_width)) + IO.write(format_to_width("\e[3m#{string}\e[0m", @info_width)) end defp draw_streams(streams, inputs) do @@ -124,19 +129,19 @@ defmodule Strom.Composite.Topology do defp draw_component_description(index, component) do case component do %Mixer{} -> - IO.write(format_to_width("Mixer (#{index})", @info_width)) + IO.write(format_to_width("\e[1mMixer (#{index})\e[0m", @info_width)) %Splitter{} -> - IO.write(format_to_width("Splitter (#{index})", @info_width)) + IO.write(format_to_width("\e[1mSplitter (#{index})\e[0m", @info_width)) %Transformer{} -> - IO.write(format_to_width("Transformer (#{index})", @info_width)) + IO.write(format_to_width("\e[1mTransformer (#{index})\e[0m", @info_width)) %Source{} -> - IO.write(format_to_width("Source (#{index})", @info_width)) + IO.write(format_to_width("\e[1mSource (#{index})\e[0m", @info_width)) %Sink{} -> - IO.write(format_to_width("Sink (#{index})", @info_width)) + IO.write(format_to_width("\e[1mSink (#{index})\e[0m", @info_width)) end end diff --git a/test/composite/topology/draw_test.exs b/test/composite/topology/draw_test.exs index bd852c7..b2d99d2 100644 --- a/test/composite/topology/draw_test.exs +++ b/test/composite/topology/draw_test.exs @@ -35,7 +35,7 @@ defmodule Strom.Composite.Topology.DrawTest do end test "draw mixer tree" do - mixer_tree = MixerTree.new([:s1, :s2, :s3, :s4, :s5, :s6], :stream, parts: 2) + mixer_tree = MixerTree.new([:s1, :s2, :s3, :s4, :s5, :s6, :s7], :stream, parts: 2) transformer = Transformer.new(:stream, & &1) composite = From 5d1740d541081973e4175fe03a9d07175e0815af Mon Sep 17 00:00:00 2001 From: Anton Mishchuk Date: Wed, 21 May 2025 12:59:36 +0200 Subject: [PATCH 5/5] Draw labels if provided --- lib/composite/topology.ex | 65 ++++++++++++++++----------- test/composite/topology/draw_test.exs | 8 ++-- 2 files changed, 44 insertions(+), 29 deletions(-) diff --git a/lib/composite/topology.ex b/lib/composite/topology.ex index fc4a821..879d0a6 100644 --- a/lib/composite/topology.ex +++ b/lib/composite/topology.ex @@ -8,19 +8,25 @@ defmodule Strom.Composite.Topology do @spec draw(Composite.t()) :: :ok def draw(%Composite{} = composite) do + IO.puts("") components = refresh_components(composite) - {streams, _} = Enum.reduce(components, {[], 0}, fn %{inputs: inputs, outputs: outputs} = component, - {streams, index} -> - {inputs, outputs} = case component do - %Source{} -> {[], Map.keys(outputs)} - %Sink{} -> {inputs, []} - _ -> {inputs, Map.keys(outputs)} - end - started = inputs -- streams - streams = draw_line(index, component, streams ++ started, inputs, outputs) - {streams, index + 1} - end) + {streams, _} = + Enum.reduce(components, {[], 0}, fn %{inputs: inputs, outputs: outputs} = component, + {streams, index} -> + {inputs, outputs} = + case component do + %Source{} -> {[], Map.keys(outputs)} + %Sink{} -> {inputs, []} + _ -> {inputs, Map.keys(outputs)} + end + + started = inputs -- streams + streams = draw_line(index, component, streams ++ started, inputs, outputs) + + {streams, index + 1} + end) + draw_stream_names(streams) draw_streams(streams, []) :ok @@ -38,12 +44,14 @@ defmodule Strom.Composite.Topology do draw_stream_names(streams_after_inputs) input_positions = draw_streams(streams_after_inputs, inputs) average_position = average_position(input_positions) + streams_after_outputs = find_place_for_outputs(streams_after_inputs, inputs, outputs, average_position) + draw_component_description(index, component) draw_component(component, streams_after_outputs, outputs, average_position) - streams_after_outputs + streams_after_outputs end defp average_position(input_positions) do @@ -86,14 +94,15 @@ defmodule Strom.Composite.Topology do [output | acc] nils when is_list(nils) -> - {nil, closest_to_average} = Enum.min_by(nils, fn {_, index} -> abs(index - average_position) end) + {nil, closest_to_average} = + Enum.min_by(nils, fn {_, index} -> abs(index - average_position) end) + List.replace_at(acc, closest_to_average, output) end end end) end - defp draw_stream_names(streams) do string = streams @@ -127,21 +136,27 @@ defmodule Strom.Composite.Topology do end defp draw_component_description(index, component) do - case component do - %Mixer{} -> - IO.write(format_to_width("\e[1mMixer (#{index})\e[0m", @info_width)) + case Keyword.get(component.opts, :label, nil) do + nil -> + case component do + %Mixer{} -> + IO.write(format_to_width("\e[1mMixer (#{index})\e[0m", @info_width)) + + %Splitter{} -> + IO.write(format_to_width("\e[1mSplitter (#{index})\e[0m", @info_width)) - %Splitter{} -> - IO.write(format_to_width("\e[1mSplitter (#{index})\e[0m", @info_width)) + %Transformer{} -> + IO.write(format_to_width("\e[1mTransformer (#{index})\e[0m", @info_width)) - %Transformer{} -> - IO.write(format_to_width("\e[1mTransformer (#{index})\e[0m", @info_width)) + %Source{} -> + IO.write(format_to_width("\e[1mSource (#{index})\e[0m", @info_width)) - %Source{} -> - IO.write(format_to_width("\e[1mSource (#{index})\e[0m", @info_width)) + %Sink{} -> + IO.write(format_to_width("\e[1mSink (#{index})\e[0m", @info_width)) + end - %Sink{} -> - IO.write(format_to_width("\e[1mSink (#{index})\e[0m", @info_width)) + label -> + IO.write(format_to_width("\e[1m#{label} (#{index})\e[0m", @info_width)) end end diff --git a/test/composite/topology/draw_test.exs b/test/composite/topology/draw_test.exs index b2d99d2..a608799 100644 --- a/test/composite/topology/draw_test.exs +++ b/test/composite/topology/draw_test.exs @@ -4,10 +4,10 @@ defmodule Strom.Composite.Topology.DrawTest do alias Strom.Composite.Topology test "draw example 1" do - source = Source.new(:stream1, []) - transformer1 = Transformer.new(:stream1, & &1) - splitter = Splitter.new(:stream1, [:stream4, :stream5]) - transformer5 = Transformer.new(:stream5, & &1) + source = Source.new(:stream1, [], label: "Source of stream1") + transformer1 = Transformer.new(:stream1, & &1, nil, label: "Transformer 1") + splitter = Splitter.new(:stream1, [:stream4, :stream5], label: "Splitter") + transformer5 = Transformer.new(:stream5, & &1, nil, label: "Transformer 5") transformer2 = Transformer.new(:stream2, & &1) mixer = Mixer.new([:stream1, :stream2, :stream3], :stream)