Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion lib/composite.ex
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ defmodule Strom.Composite do

@type t :: %__MODULE__{}

@spec new([struct()]) :: __MODULE__.t()
@spec new([struct()], atom()) :: __MODULE__.t()
def new(components, name \\ nil) when is_list(components) do
components =
components
Expand Down
2 changes: 1 addition & 1 deletion lib/composite/manipulations.ex
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ defmodule Strom.Composite.Manipulations do
alias Strom.GenMix

@spec insert(list(Strom.component()), integer(), list(Strom.component()), atom()) ::
{list(Strom.component()), list(Strom.component()), Strom.flow()}
{list(Strom.component()), list(Strom.component()), Strom.flow()} | {:error, any()}
def insert(components, index, new_components, name) when is_list(new_components) do
case valid_insert_indices?(components, index) do
true ->
Expand Down
193 changes: 193 additions & 0 deletions lib/composite/topology.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
defmodule Strom.Composite.Topology do
@moduledoc """
Draws a topology of a composite
"""
alias Strom.{Composite, Mixer, Splitter, Source, Transformer, Sink}

@info_width 70

@spec draw(Composite.t()) :: :ok
def draw(%Composite{} = composite) do
IO.puts("")
components = refresh_components(composite)

{streams, _} =
Enum.reduce(components, {[], 0}, fn %{inputs: inputs, outputs: outputs} = component,
{streams, index} ->
{inputs, outputs} =
case component do
%Source{} -> {[], Map.keys(outputs)}
%Sink{} -> {inputs, []}
_ -> {inputs, Map.keys(outputs)}
end

started = inputs -- streams
streams = draw_line(index, component, streams ++ started, inputs, outputs)

{streams, index + 1}
end)

draw_stream_names(streams)
draw_streams(streams, [])
:ok
end

defp refresh_components(composite) do
if is_pid(composite.pid) do
Composite.components(composite)
else
composite.components
end
end

defp draw_line(index, component, streams_after_inputs, inputs, outputs) do
draw_stream_names(streams_after_inputs)
input_positions = draw_streams(streams_after_inputs, inputs)
average_position = average_position(input_positions)

streams_after_outputs =
find_place_for_outputs(streams_after_inputs, inputs, outputs, average_position)

draw_component_description(index, component)
draw_component(component, streams_after_outputs, outputs, average_position)

streams_after_outputs
end

defp average_position(input_positions) do
case input_positions do
[] ->
0

positions when is_list(positions) ->
round(Enum.sum(positions) / length(positions))
end
end

defp find_place_for_outputs(streams, inputs, outputs, average_position) do
ended = inputs -- outputs

streams =
Enum.reduce(streams, [], fn name, acc ->
case Enum.member?(ended, name) do
true ->
[nil | acc]

false ->
[name | acc]
end
end)
|> Enum.reverse()

Enum.reduce(outputs, streams, fn output, acc ->
case Enum.member?(streams, output) do
true ->
acc

false ->
nils =
Enum.zip(acc, 0..length(acc))
|> Enum.filter(fn {name, _} -> is_nil(name) end)

case nils do
[] ->
[output | acc]

nils when is_list(nils) ->
{nil, closest_to_average} =
Enum.min_by(nils, fn {_, index} -> abs(index - average_position) end)

List.replace_at(acc, closest_to_average, output)
end
end
end)
end

defp draw_stream_names(streams) do
string =
streams
|> Enum.filter(& &1)
|> Enum.map(&to_string/1)
|> Enum.join(" ")

IO.write(format_to_width("\e[3m#{string}\e[0m", @info_width))
end

defp draw_streams(streams, inputs) do
{_, input_positions} =
Enum.reduce(streams, {0, []}, fn name, {counter, acc} ->
cond do
Enum.member?(inputs, name) ->
IO.write("\u275A ")
{counter + 1, [counter | acc]}

is_nil(name) ->
IO.write(" ")
{counter + 1, acc}

true ->
IO.write("| ")
{counter + 1, acc}
end
end)

IO.puts("")
input_positions
end

defp draw_component_description(index, component) do
case Keyword.get(component.opts, :label, nil) do
nil ->
case component do
%Mixer{} ->
IO.write(format_to_width("\e[1mMixer (#{index})\e[0m", @info_width))

%Splitter{} ->
IO.write(format_to_width("\e[1mSplitter (#{index})\e[0m", @info_width))

%Transformer{} ->
IO.write(format_to_width("\e[1mTransformer (#{index})\e[0m", @info_width))

%Source{} ->
IO.write(format_to_width("\e[1mSource (#{index})\e[0m", @info_width))

%Sink{} ->
IO.write(format_to_width("\e[1mSink (#{index})\e[0m", @info_width))
end

label ->
IO.write(format_to_width("\e[1m#{label} (#{index})\e[0m", @info_width))
end
end

defp draw_component(component, streams_after_outputs, outputs, average_position) do
Enum.with_index(streams_after_outputs, fn name, index ->
if index == average_position do
component_character(component)
else
if name do
if Enum.member?(outputs, name) do
IO.write(". ")
else
IO.write("| ")
end
else
IO.write(" ")
end
end
end)

IO.puts("")
end

defp component_character(%Mixer{}), do: IO.write("Y ")
defp component_character(%Splitter{}), do: IO.write("\u039B ")
defp component_character(%Transformer{}), do: IO.write("\u23FA ")
defp component_character(%Source{}), do: IO.write("\u25BC ")
defp component_character(%Sink{}), do: IO.write("\u25B2 ")

defp format_to_width(string, width) do
string = Enum.join(List.duplicate(" ", width), "") <> string
String.slice(string, -(width - 2), width - 2) <> " "
end
end
11 changes: 8 additions & 3 deletions lib/gen_mix/streams.ex
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,14 @@ defmodule Strom.GenMix.Streams do

sub_flow = build_sub_flow(gm.outputs, gm_pid)

flow
|> Map.drop(gm.inputs)
|> Map.merge(sub_flow)
flow = Map.drop(flow, gm.inputs)

Enum.reduce(sub_flow, flow, fn {name, stream}, flow ->
case Map.get(flow, name) do
nil -> Map.put(flow, name, stream)
existing_stream -> Map.put(flow, name, Stream.concat(existing_stream, stream))
end
end)
end

defp build_sub_flow(outputs, gm_pid) do
Expand Down
2 changes: 2 additions & 0 deletions lib/mixer_tree.ex
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ defmodule Strom.MixerTree do
{[mixer | acc], [output | outputs], counter + 1}
end)

mixers = Enum.reverse(mixers)

if count > parts do
mixers ++ build_mixers(outputs, level + 1, parts, final_output, opts)
else
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
defmodule Strom.DeleteComponentsTest do
defmodule Strom.Composite.Manipulations.DeleteComponentsTest do
use ExUnit.Case, async: false
import Strom.TestHelper
alias Strom.{Composite, Mixer, Transformer}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
defmodule Strom.InsertComponentsTest do
defmodule Strom.Composite.Manipulations.InsertComponentsTest do
use ExUnit.Case, async: false
import Strom.TestHelper
alias Strom.{Splitter, Transformer}
Expand Down Expand Up @@ -90,4 +90,37 @@ defmodule Strom.InsertComponentsTest do
list = Task.await(task)
assert length(list) == 20
end

def func(event, position) do
if event < 3 do
composite = %Composite{name: :composite}

{_, _} =
Composite.insert(composite, position + 2, [
Transformer.new(:stream, &func/2, position + 2, chunk: 1),
Transformer.new(:stream, & &1, nil, chunk: 1)
])

{[event + 1, event + 1], position}
else
{[event], position}
end
end

test "insert on the fly" do
transformer1 = Transformer.new(:stream, &func/2, 0, chunk: 1)
transformer2 = Transformer.new(:stream, & &1, nil, chunk: 1)
transformer3 = Transformer.new(:stream, & &1, nil, chunk: 1)

composite =
[transformer1, transformer2, transformer3]
|> Composite.new(:composite)
|> Composite.start()

stream = build_stream(Enum.to_list(1..1), 1)
%{stream: stream} = Composite.call(%{stream: stream}, composite)

assert Enum.to_list(stream) == [2, 3, 3]
assert length(Composite.components(composite)) == 7
end
end
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
defmodule Strom.ReplaceComponentsTest do
defmodule Strom.Composite.Manipulations.ReplaceComponentsTest do
use ExUnit.Case, async: false
import Strom.TestHelper
alias Strom.{Mixer, Splitter, Transformer}
Expand Down
59 changes: 59 additions & 0 deletions test/composite/topology/draw_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
defmodule Strom.Composite.Topology.DrawTest do
use ExUnit.Case, async: false
alias Strom.{Composite, Mixer, MixerTree, Transformer, Sink, Source, Splitter}
alias Strom.Composite.Topology

test "draw example 1" do
source = Source.new(:stream1, [], label: "Source of stream1")
transformer1 = Transformer.new(:stream1, & &1, nil, label: "Transformer 1")
splitter = Splitter.new(:stream1, [:stream4, :stream5], label: "Splitter")
transformer5 = Transformer.new(:stream5, & &1, nil, label: "Transformer 5")

transformer2 = Transformer.new(:stream2, & &1)
mixer = Mixer.new([:stream1, :stream2, :stream3], :stream)
transformer3 = Transformer.new(:stream4, & &1)
transformer4 = Transformer.new(:stream, & &1)
sink1 = Sink.new(:stream, %{__struct__: Sink})
sink2 = Sink.new(:stream4, %{__struct__: Sink})

composite =
[
source,
transformer1,
splitter,
transformer5,
transformer2,
mixer,
transformer3,
transformer4,
sink1,
sink2
]
|> Composite.new()

Topology.draw(composite)
end

test "draw mixer tree" do
mixer_tree = MixerTree.new([:s1, :s2, :s3, :s4, :s5, :s6, :s7], :stream, parts: 2)
transformer = Transformer.new(:stream, & &1)

composite =
[mixer_tree, transformer]
|> Composite.new()

Topology.draw(composite)
end

test "draw example 2" do
mixer1 = Mixer.new([:s1, :s2], :stream, chunk: 1)
mixer2 = Mixer.new([:s3, :s4], :stream, chunk: 1)
transformer = Transformer.new(:stream, & &1)

composite =
[mixer1, mixer2, transformer]
|> Composite.new()

Topology.draw(composite)
end
end
17 changes: 16 additions & 1 deletion test/mixer_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ defmodule Strom.MixerTest do
use ExUnit.Case, async: true
doctest Strom.Mixer

alias Strom.{Composite, Mixer, Source}
alias Strom.{Composite, Mixer, Source, Transformer}
alias Strom.Source.ReadLines

setup do
Expand Down Expand Up @@ -145,4 +145,19 @@ defmodule Strom.MixerTest do
Composite.stop(composite)
end
end

test "two mixers mix into the same stream" do
mixer1 = Mixer.new([:s1, :s2], :stream, chunk: 1)
mixer2 = Mixer.new([:s3, :s4], :stream, chunk: 1)
transformer = Transformer.new(:stream, & &1)

composite =
[mixer1, mixer2, transformer]
|> Composite.new()
|> Composite.start()

%{stream: stream} = Composite.call(%{s1: [1], s2: [2], s3: [3], s4: [4]}, composite)

assert Enum.sort(Enum.to_list(stream)) == [1, 2, 3, 4]
end
end