diff --git a/lib/composite.ex b/lib/composite.ex index ff9aa8d..89b7c4e 100644 --- a/lib/composite.ex +++ b/lib/composite.ex @@ -36,7 +36,7 @@ defmodule Strom.Composite do @type t :: %__MODULE__{} - @spec new([struct()]) :: __MODULE__.t() + @spec new([struct()], atom()) :: __MODULE__.t() def new(components, name \\ nil) when is_list(components) do components = components diff --git a/lib/composite/manipulations.ex b/lib/composite/manipulations.ex index 8951aa7..89ed4a8 100644 --- a/lib/composite/manipulations.ex +++ b/lib/composite/manipulations.ex @@ -6,7 +6,7 @@ defmodule Strom.Composite.Manipulations do alias Strom.GenMix @spec insert(list(Strom.component()), integer(), list(Strom.component()), atom()) :: - {list(Strom.component()), list(Strom.component()), Strom.flow()} + {list(Strom.component()), list(Strom.component()), Strom.flow()} | {:error, any()} def insert(components, index, new_components, name) when is_list(new_components) do case valid_insert_indices?(components, index) do true -> diff --git a/lib/composite/topology.ex b/lib/composite/topology.ex new file mode 100644 index 0000000..879d0a6 --- /dev/null +++ b/lib/composite/topology.ex @@ -0,0 +1,193 @@ +defmodule Strom.Composite.Topology do + @moduledoc """ + Draws a topology of a composite + """ + alias Strom.{Composite, Mixer, Splitter, Source, Transformer, Sink} + + @info_width 70 + + @spec draw(Composite.t()) :: :ok + def draw(%Composite{} = composite) do + IO.puts("") + components = refresh_components(composite) + + {streams, _} = + Enum.reduce(components, {[], 0}, fn %{inputs: inputs, outputs: outputs} = component, + {streams, index} -> + {inputs, outputs} = + case component do + %Source{} -> {[], Map.keys(outputs)} + %Sink{} -> {inputs, []} + _ -> {inputs, Map.keys(outputs)} + end + + started = inputs -- streams + streams = draw_line(index, component, streams ++ started, inputs, outputs) + + {streams, index + 1} + end) + + draw_stream_names(streams) + draw_streams(streams, []) + :ok + end + + defp refresh_components(composite) do + if is_pid(composite.pid) do + Composite.components(composite) + else + composite.components + end + end + + defp draw_line(index, component, streams_after_inputs, inputs, outputs) do + draw_stream_names(streams_after_inputs) + input_positions = draw_streams(streams_after_inputs, inputs) + average_position = average_position(input_positions) + + streams_after_outputs = + find_place_for_outputs(streams_after_inputs, inputs, outputs, average_position) + + draw_component_description(index, component) + draw_component(component, streams_after_outputs, outputs, average_position) + + streams_after_outputs + end + + defp average_position(input_positions) do + case input_positions do + [] -> + 0 + + positions when is_list(positions) -> + round(Enum.sum(positions) / length(positions)) + end + end + + defp find_place_for_outputs(streams, inputs, outputs, average_position) do + ended = inputs -- outputs + + streams = + Enum.reduce(streams, [], fn name, acc -> + case Enum.member?(ended, name) do + true -> + [nil | acc] + + false -> + [name | acc] + end + end) + |> Enum.reverse() + + Enum.reduce(outputs, streams, fn output, acc -> + case Enum.member?(streams, output) do + true -> + acc + + false -> + nils = + Enum.zip(acc, 0..length(acc)) + |> Enum.filter(fn {name, _} -> is_nil(name) end) + + case nils do + [] -> + [output | acc] + + nils when is_list(nils) -> + {nil, closest_to_average} = + Enum.min_by(nils, fn {_, index} -> abs(index - average_position) end) + + List.replace_at(acc, closest_to_average, output) + end + end + end) + end + + defp draw_stream_names(streams) do + string = + streams + |> Enum.filter(& &1) + |> Enum.map(&to_string/1) + |> Enum.join(" ") + + IO.write(format_to_width("\e[3m#{string}\e[0m", @info_width)) + end + + defp draw_streams(streams, inputs) do + {_, input_positions} = + Enum.reduce(streams, {0, []}, fn name, {counter, acc} -> + cond do + Enum.member?(inputs, name) -> + IO.write("\u275A ") + {counter + 1, [counter | acc]} + + is_nil(name) -> + IO.write(" ") + {counter + 1, acc} + + true -> + IO.write("| ") + {counter + 1, acc} + end + end) + + IO.puts("") + input_positions + end + + defp draw_component_description(index, component) do + case Keyword.get(component.opts, :label, nil) do + nil -> + case component do + %Mixer{} -> + IO.write(format_to_width("\e[1mMixer (#{index})\e[0m", @info_width)) + + %Splitter{} -> + IO.write(format_to_width("\e[1mSplitter (#{index})\e[0m", @info_width)) + + %Transformer{} -> + IO.write(format_to_width("\e[1mTransformer (#{index})\e[0m", @info_width)) + + %Source{} -> + IO.write(format_to_width("\e[1mSource (#{index})\e[0m", @info_width)) + + %Sink{} -> + IO.write(format_to_width("\e[1mSink (#{index})\e[0m", @info_width)) + end + + label -> + IO.write(format_to_width("\e[1m#{label} (#{index})\e[0m", @info_width)) + end + end + + defp draw_component(component, streams_after_outputs, outputs, average_position) do + Enum.with_index(streams_after_outputs, fn name, index -> + if index == average_position do + component_character(component) + else + if name do + if Enum.member?(outputs, name) do + IO.write(". ") + else + IO.write("| ") + end + else + IO.write(" ") + end + end + end) + + IO.puts("") + end + + defp component_character(%Mixer{}), do: IO.write("Y ") + defp component_character(%Splitter{}), do: IO.write("\u039B ") + defp component_character(%Transformer{}), do: IO.write("\u23FA ") + defp component_character(%Source{}), do: IO.write("\u25BC ") + defp component_character(%Sink{}), do: IO.write("\u25B2 ") + + defp format_to_width(string, width) do + string = Enum.join(List.duplicate(" ", width), "") <> string + String.slice(string, -(width - 2), width - 2) <> " " + end +end diff --git a/lib/gen_mix/streams.ex b/lib/gen_mix/streams.ex index d27190c..8521da7 100644 --- a/lib/gen_mix/streams.ex +++ b/lib/gen_mix/streams.ex @@ -13,9 +13,14 @@ defmodule Strom.GenMix.Streams do sub_flow = build_sub_flow(gm.outputs, gm_pid) - flow - |> Map.drop(gm.inputs) - |> Map.merge(sub_flow) + flow = Map.drop(flow, gm.inputs) + + Enum.reduce(sub_flow, flow, fn {name, stream}, flow -> + case Map.get(flow, name) do + nil -> Map.put(flow, name, stream) + existing_stream -> Map.put(flow, name, Stream.concat(existing_stream, stream)) + end + end) end defp build_sub_flow(outputs, gm_pid) do diff --git a/lib/mixer_tree.ex b/lib/mixer_tree.ex index 31b4106..6823023 100644 --- a/lib/mixer_tree.ex +++ b/lib/mixer_tree.ex @@ -30,6 +30,8 @@ defmodule Strom.MixerTree do {[mixer | acc], [output | outputs], counter + 1} end) + mixers = Enum.reverse(mixers) + if count > parts do mixers ++ build_mixers(outputs, level + 1, parts, final_output, opts) else diff --git a/test/manipulations/delete_components_test.exs b/test/composite/manipulations/delete_components_test.exs similarity index 99% rename from test/manipulations/delete_components_test.exs rename to test/composite/manipulations/delete_components_test.exs index abbdead..feb0bbf 100644 --- a/test/manipulations/delete_components_test.exs +++ b/test/composite/manipulations/delete_components_test.exs @@ -1,4 +1,4 @@ -defmodule Strom.DeleteComponentsTest do +defmodule Strom.Composite.Manipulations.DeleteComponentsTest do use ExUnit.Case, async: false import Strom.TestHelper alias Strom.{Composite, Mixer, Transformer} diff --git a/test/manipulations/insert_components_test.exs b/test/composite/manipulations/insert_components_test.exs similarity index 74% rename from test/manipulations/insert_components_test.exs rename to test/composite/manipulations/insert_components_test.exs index 75e0543..566c00e 100644 --- a/test/manipulations/insert_components_test.exs +++ b/test/composite/manipulations/insert_components_test.exs @@ -1,4 +1,4 @@ -defmodule Strom.InsertComponentsTest do +defmodule Strom.Composite.Manipulations.InsertComponentsTest do use ExUnit.Case, async: false import Strom.TestHelper alias Strom.{Splitter, Transformer} @@ -90,4 +90,37 @@ defmodule Strom.InsertComponentsTest do list = Task.await(task) assert length(list) == 20 end + + def func(event, position) do + if event < 3 do + composite = %Composite{name: :composite} + + {_, _} = + Composite.insert(composite, position + 2, [ + Transformer.new(:stream, &func/2, position + 2, chunk: 1), + Transformer.new(:stream, & &1, nil, chunk: 1) + ]) + + {[event + 1, event + 1], position} + else + {[event], position} + end + end + + test "insert on the fly" do + transformer1 = Transformer.new(:stream, &func/2, 0, chunk: 1) + transformer2 = Transformer.new(:stream, & &1, nil, chunk: 1) + transformer3 = Transformer.new(:stream, & &1, nil, chunk: 1) + + composite = + [transformer1, transformer2, transformer3] + |> Composite.new(:composite) + |> Composite.start() + + stream = build_stream(Enum.to_list(1..1), 1) + %{stream: stream} = Composite.call(%{stream: stream}, composite) + + assert Enum.to_list(stream) == [2, 3, 3] + assert length(Composite.components(composite)) == 7 + end end diff --git a/test/manipulations/replace_components_test.exs b/test/composite/manipulations/replace_components_test.exs similarity index 97% rename from test/manipulations/replace_components_test.exs rename to test/composite/manipulations/replace_components_test.exs index 06a2ba0..0f35887 100644 --- a/test/manipulations/replace_components_test.exs +++ b/test/composite/manipulations/replace_components_test.exs @@ -1,4 +1,4 @@ -defmodule Strom.ReplaceComponentsTest do +defmodule Strom.Composite.Manipulations.ReplaceComponentsTest do use ExUnit.Case, async: false import Strom.TestHelper alias Strom.{Mixer, Splitter, Transformer} diff --git a/test/composite/topology/draw_test.exs b/test/composite/topology/draw_test.exs new file mode 100644 index 0000000..a608799 --- /dev/null +++ b/test/composite/topology/draw_test.exs @@ -0,0 +1,59 @@ +defmodule Strom.Composite.Topology.DrawTest do + use ExUnit.Case, async: false + alias Strom.{Composite, Mixer, MixerTree, Transformer, Sink, Source, Splitter} + alias Strom.Composite.Topology + + test "draw example 1" do + source = Source.new(:stream1, [], label: "Source of stream1") + transformer1 = Transformer.new(:stream1, & &1, nil, label: "Transformer 1") + splitter = Splitter.new(:stream1, [:stream4, :stream5], label: "Splitter") + transformer5 = Transformer.new(:stream5, & &1, nil, label: "Transformer 5") + + transformer2 = Transformer.new(:stream2, & &1) + mixer = Mixer.new([:stream1, :stream2, :stream3], :stream) + transformer3 = Transformer.new(:stream4, & &1) + transformer4 = Transformer.new(:stream, & &1) + sink1 = Sink.new(:stream, %{__struct__: Sink}) + sink2 = Sink.new(:stream4, %{__struct__: Sink}) + + composite = + [ + source, + transformer1, + splitter, + transformer5, + transformer2, + mixer, + transformer3, + transformer4, + sink1, + sink2 + ] + |> Composite.new() + + Topology.draw(composite) + end + + test "draw mixer tree" do + mixer_tree = MixerTree.new([:s1, :s2, :s3, :s4, :s5, :s6, :s7], :stream, parts: 2) + transformer = Transformer.new(:stream, & &1) + + composite = + [mixer_tree, transformer] + |> Composite.new() + + Topology.draw(composite) + end + + test "draw example 2" do + mixer1 = Mixer.new([:s1, :s2], :stream, chunk: 1) + mixer2 = Mixer.new([:s3, :s4], :stream, chunk: 1) + transformer = Transformer.new(:stream, & &1) + + composite = + [mixer1, mixer2, transformer] + |> Composite.new() + + Topology.draw(composite) + end +end diff --git a/test/mixer_test.exs b/test/mixer_test.exs index f21848d..1b4be7c 100644 --- a/test/mixer_test.exs +++ b/test/mixer_test.exs @@ -2,7 +2,7 @@ defmodule Strom.MixerTest do use ExUnit.Case, async: true doctest Strom.Mixer - alias Strom.{Composite, Mixer, Source} + alias Strom.{Composite, Mixer, Source, Transformer} alias Strom.Source.ReadLines setup do @@ -145,4 +145,19 @@ defmodule Strom.MixerTest do Composite.stop(composite) end end + + test "two mixers mix into the same stream" do + mixer1 = Mixer.new([:s1, :s2], :stream, chunk: 1) + mixer2 = Mixer.new([:s3, :s4], :stream, chunk: 1) + transformer = Transformer.new(:stream, & &1) + + composite = + [mixer1, mixer2, transformer] + |> Composite.new() + |> Composite.start() + + %{stream: stream} = Composite.call(%{s1: [1], s2: [2], s3: [3], s4: [4]}, composite) + + assert Enum.sort(Enum.to_list(stream)) == [1, 2, 3, 4] + end end