diff --git a/Makefile b/Makefile index a46f399..b820c3a 100644 --- a/Makefile +++ b/Makefile @@ -19,16 +19,15 @@ export SHELLOPTS:=$(if $(SHELLOPTS),$(SHELLOPTS):)pipefail:errexit ## Compile core docker image build-core-image: - cd core docker build \ + -f core/Dockerfile \ --build-arg SECRET_KEY_BASE=local-make-secret \ --build-arg MIX_ENV="prod" \ -t core . ## Compile worker docker image build-worker-image: - cd worker - docker build --build-arg MIX_ENV="prod" -t worker . + docker build -f worker/Dockerfile --build-arg MIX_ENV="prod" -t worker . ## Run credo --strict credo-core: diff --git a/core/lib/core/adapters/commands/test.ex b/core/lib/core/adapters/commands/test.ex index f939e80..a594553 100644 --- a/core/lib/core/adapters/commands/test.ex +++ b/core/lib/core/adapters/commands/test.ex @@ -19,7 +19,7 @@ defmodule Core.Adapters.Commands.Test do alias Data.InvokeResult @impl true - def send_invoke(_worker, name, _ns, _hash, _args) do + def send_invoke(_worker, name, _ns, _hash, _args, _metadata \\ {}) do {:ok, %InvokeResult{result: name}} end diff --git a/core/lib/core/adapters/commands/worker.ex b/core/lib/core/adapters/commands/worker.ex index 5c09b17..f896590 100644 --- a/core/lib/core/adapters/commands/worker.ex +++ b/core/lib/core/adapters/commands/worker.ex @@ -32,9 +32,9 @@ defmodule Core.Adapters.Commands.Worker do # Only the send_invoke call should return this. @impl true - def send_invoke(worker, name, mod, hash, args) do + def send_invoke(worker, name, mod, hash, args, metadata \\ {}) do worker_addr = {:worker, worker} - cmd = {:invoke, %{name: name, module: mod, hash: hash}, args} + cmd = {:invoke, %{name: name, module: mod, hash: hash, metadata: metadata}, args} Logger.info("Sending invoke for #{mod}/#{name} to #{inspect(worker_addr)}") case GenServer.call(worker_addr, cmd, 60_000) do diff --git a/core/lib/core/adapters/telemetry/collector.ex b/core/lib/core/adapters/telemetry/collector.ex index af20dd2..0dca879 100644 --- a/core/lib/core/adapters/telemetry/collector.ex +++ b/core/lib/core/adapters/telemetry/collector.ex @@ -129,7 +129,8 @@ defmodule Core.Adapters.Telemetry.Collector do Enum.reduce(result_list, %{}, fn result, acc -> case result do %{"metric" => %{"__name__" => name}, "value" => [_, value]} -> - Map.put(acc, name, value) + {float_val, _} = Float.parse(value) + Map.put(acc, name, float_val) _ -> acc diff --git a/core/lib/core/domain/invoker.ex b/core/lib/core/domain/invoker.ex index b501ca0..0e001f7 100644 --- a/core/lib/core/domain/invoker.ex +++ b/core/lib/core/domain/invoker.ex @@ -19,7 +19,6 @@ defmodule Core.Domain.Invoker do require Logger alias Core.FunctionsMetadata - alias Data.FunctionMetadata alias Core.Domain.{ Functions, @@ -30,6 +29,7 @@ defmodule Core.Domain.Invoker do Scheduler } + alias Data.FunctionMetadata alias Data.FunctionStruct alias Data.InvokeParams alias Data.InvokeResult @@ -68,11 +68,11 @@ defmodule Core.Domain.Invoker do metadata: struct(FunctionMetadata, %{tag: metadata.tag, capacity: metadata.capacity}) }) - with {:ok, worker} <- Nodes.worker_nodes() |> Scheduler.select(func, ivk.config) do + with {:ok, worker} <- Nodes.worker_nodes() |> Scheduler.select(func, ivk.config, ivk.args) do update_concurrent(worker, +1) out = - case invoke_without_code(worker, ivk, f.hash) do + case invoke_without_code(worker, ivk, f.hash, func.metadata) do {:error, :code_not_found, handler} -> [%{code: code}] = Functions.get_code_by_name_in_mod!(ivk.function, ivk.module) @@ -121,12 +121,12 @@ defmodule Core.Domain.Invoker do end end - @spec invoke_without_code(atom(), InvokeParams.t(), binary()) :: + @spec invoke_without_code(atom(), InvokeParams.t(), binary(), FunctionMetadata.t()) :: {:ok, InvokeResult.t()} | {:error, :code_not_found, pid()} | invoke_errors() - def invoke_without_code(worker, ivk, hash) do + def invoke_without_code(worker, ivk, hash, metadata \\ %FunctionMetadata{}) do Logger.debug("Invoker: invoking #{ivk.module}/#{ivk.function} without code") # send invocation without code - Commands.send_invoke(worker, ivk.function, ivk.module, hash, ivk.args) + Commands.send_invoke(worker, ivk.function, ivk.module, hash, ivk.args, metadata) end @spec invoke_with_code(atom(), pid(), InvokeParams.t(), FunctionStruct.t()) :: diff --git a/core/lib/core/domain/policies/app.ex b/core/lib/core/domain/policies/app.ex index e2e5dae..7c7ab05 100644 --- a/core/lib/core/domain/policies/app.ex +++ b/core/lib/core/domain/policies/app.ex @@ -34,6 +34,7 @@ defimpl Core.Domain.Policies.SchedulingPolicy, for: Data.Configurations.APP do - configuration: an APP script (Data.Configurations.APP), generally obtained through parsing using the Core.Domain.Policies.Parsers.APP module. - workers: a list of Data.Worker structs, each with relevant worker metrics. - function: a Data.FunctionStruct struct, with the necessary function information. It must contain function metadata, specifically a tag and a capacity. + - args: not used ## Returns - {:ok, wrk} if a suitable worker was found, with `wrk` being the worker. @@ -43,12 +44,15 @@ defimpl Core.Domain.Policies.SchedulingPolicy, for: Data.Configurations.APP do - {:error, :no_function_metadata} if the given FunctionStruct does not include the necessary metadata (i.e. tag, capacity). - {:error, :invalid_input} if the given input was invalid in any other way (e.g. wrong types). """ - @spec select(APP.t(), [Data.Worker.t()], Data.FunctionStruct.t()) :: + @spec select(APP.t(), [Data.Worker.t()], Data.FunctionStruct.t(), map()) :: {:ok, Data.Worker.t()} | select_errors() + def select(config, workers, function, args \\ %{}) + def select( %APP{tags: tags} = _configuration, [_ | _] = workers, - %Data.FunctionStruct{metadata: %{tag: tag_name, capacity: _function_capacity}} = function + %Data.FunctionStruct{metadata: %{tag: tag_name, capacity: _function_capacity}} = function, + _ ) do default = tags |> Map.get("default") tag = tags |> Map.get(tag_name, default) @@ -75,15 +79,15 @@ defimpl Core.Domain.Policies.SchedulingPolicy, for: Data.Configurations.APP do end end - def select(%APP{tags: _}, [], _) do + def select(%APP{tags: _}, [], _, _) do {:error, :no_workers} end - def select(%APP{tags: _}, _, %Data.FunctionStruct{metadata: nil}) do + def select(%APP{tags: _}, _, %Data.FunctionStruct{metadata: nil}, _) do {:error, :no_function_metadata} end - def select(_, _, _) do + def select(_, _, _, _) do {:error, :invalid_input} end @@ -158,7 +162,8 @@ defimpl Core.Domain.Policies.SchedulingPolicy, for: Data.Configurations.APP do Core.Domain.Policies.SchedulingPolicy.select( %Data.Configurations.Empty{}, wrk, - function + function, + %{} ) end end diff --git a/core/lib/core/domain/policies/default.ex b/core/lib/core/domain/policies/default.ex index 7ccc897..5719dfb 100644 --- a/core/lib/core/domain/policies/default.ex +++ b/core/lib/core/domain/policies/default.ex @@ -20,10 +20,13 @@ defimpl Core.Domain.Policies.SchedulingPolicy, for: Data.Configurations.Empty do # since otherwise in situations where e.g. Prometheus is down, we would always have no workers @spec select(Empty.t(), [Data.Worker.t()], Data.FunctionStruct.t()) :: {:ok, Data.Worker.t()} | {:error, :no_workers} | {:error, :no_valid_workers} + def select(config, workers, function, args \\ %{}) + def select( %Empty{}, [_ | _] = workers, - %Data.FunctionStruct{metadata: %Data.FunctionMetadata{capacity: c}} + %Data.FunctionStruct{metadata: %Data.FunctionMetadata{capacity: c}}, + _ ) do selected_worker = workers @@ -45,11 +48,11 @@ defimpl Core.Domain.Policies.SchedulingPolicy, for: Data.Configurations.Empty do end end - def select(%Empty{}, _, %Data.FunctionStruct{metadata: nil}) do + def select(%Empty{}, _, %Data.FunctionStruct{metadata: nil}, _) do {:error, :no_function_metadata} end - def select(%Empty{}, [], _) do + def select(%Empty{}, [], _, _) do {:error, :no_workers} end end diff --git a/core/lib/core/domain/policies/scheduling_policy.ex b/core/lib/core/domain/policies/scheduling_policy.ex index 175dd6b..c623342 100644 --- a/core/lib/core/domain/policies/scheduling_policy.ex +++ b/core/lib/core/domain/policies/scheduling_policy.ex @@ -21,7 +21,7 @@ defprotocol Core.Domain.Policies.SchedulingPolicy do @doc """ Should select a worker from a list of workers, given a specific configuration. """ - @spec select(t, [Data.Worker.t()], Data.FunctionStruct.t()) :: + @spec select(t, [Data.Worker.t()], Data.FunctionStruct.t(), map()) :: {:ok, Data.Worker.t()} | {:error, any} - def select(configuration, workers, function) + def select(configuration, workers, function, args) end diff --git a/core/lib/core/domain/ports/commands.ex b/core/lib/core/domain/ports/commands.ex index 718bdec..2e5a524 100644 --- a/core/lib/core/domain/ports/commands.ex +++ b/core/lib/core/domain/ports/commands.ex @@ -17,12 +17,13 @@ defmodule Core.Domain.Ports.Commands do Port for sending commands to workers. """ + alias Data.FunctionMetadata alias Data.FunctionStruct alias Data.InvokeResult @adapter :core |> Application.compile_env!(__MODULE__) |> Keyword.fetch!(:adapter) - @callback send_invoke(atom(), String.t(), String.t(), binary(), map()) :: + @callback send_invoke(atom(), String.t(), String.t(), binary(), map(), FunctionMetadata.t()) :: {:ok, InvokeResult.t()} | {:error, :code_not_found, pid()} | {:error, any()} @callback send_invoke_with_code(atom(), pid(), FunctionStruct.t()) :: {:ok, InvokeResult.t()} | {:error, any()} @@ -36,9 +37,9 @@ defmodule Core.Domain.Ports.Commands do Sends an invoke command to a worker passing the function name, module, hash and args. It requires a worker (a fully qualified name of another node with the :worker actor on) and function arguments can be empty. """ - @spec send_invoke(atom(), String.t(), String.t(), binary(), map()) :: + @spec send_invoke(atom(), String.t(), String.t(), binary(), map(), FunctionMetadata.t()) :: {:ok, InvokeResult.t()} | {:error, :code_not_found, pid()} | {:error, any()} - defdelegate send_invoke(worker, f_name, ns, hash, args), to: @adapter + defdelegate send_invoke(worker, f_name, ns, hash, args, metadata), to: @adapter @doc """ Sends an invoke command to a worker passing a struct with the function name, module and the code (wasm file binary). diff --git a/core/lib/core/domain/scheduler.ex b/core/lib/core/domain/scheduler.ex index 483a7cf..650c79d 100644 --- a/core/lib/core/domain/scheduler.ex +++ b/core/lib/core/domain/scheduler.ex @@ -29,14 +29,16 @@ defmodule Core.Domain.Scheduler do """ @spec select([worker_atom()], Data.FunctionStruct.t(), configuration()) :: {:ok, worker_atom()} | {:error, :no_workers} | {:error, :no_valid_workers} - def select([], _, _) do + def select(workers, function, config, args \\ %{}) + + def select([], _, _, _args) do Logger.warn("Scheduler: tried selection with NO workers") {:error, :no_workers} end # NOTE: if we move this to a NIF, we should only pass # configuration information (to avoid serialising all parameters) - def select(workers, function, config) do + def select(workers, function, config, args) do Logger.info("Scheduler: selection with #{length(workers)} workers") # Get the resources @@ -54,7 +56,8 @@ defmodule Core.Domain.Scheduler do case SchedulingPolicy.select( config, resources, - function + function, + args ) do {:ok, wrk} -> {:ok, wrk.name} {:error, err} -> {:error, err} diff --git a/core/lib/core_web/auth.ex b/core/lib/core_web/auth.ex index 5ca68a2..69909a3 100644 --- a/core/lib/core_web/auth.ex +++ b/core/lib/core_web/auth.ex @@ -39,7 +39,7 @@ defmodule CoreWeb.Plug.Authenticate do _error -> conn |> put_status(:unauthorized) - |> Phoenix.Controller.put_view(CoreWeb.ErrorView) + |> Phoenix.Controller.put_view(CoreWeb.ErrorJSON) |> Phoenix.Controller.render(:"401") |> halt() end diff --git a/core/test/core/integration/invoke_test.exs b/core/test/core/integration/invoke_test.exs index 3e960d4..db37e78 100644 --- a/core/test/core/integration/invoke_test.exs +++ b/core/test/core/integration/invoke_test.exs @@ -85,7 +85,7 @@ defmodule Core.InvokeTest do Core.Cluster.Mock |> Mox.expect(:all_nodes, fn -> [:worker@localhost] end) Core.Commands.Mock - |> Mox.expect(:send_invoke, fn _, _, _, _, _ -> {:error, {:exec_error, "some error"}} end) + |> Mox.expect(:send_invoke, fn _, _, _, _, _, _ -> {:error, {:exec_error, "some error"}} end) pars = %InvokeParams{function: function.name, module: module.name} assert Invoker.invoke(pars) == {:error, {:exec_error, "some error"}} @@ -99,7 +99,7 @@ defmodule Core.InvokeTest do Core.Cluster.Mock |> Mox.expect(:all_nodes, fn -> [:worker@localhost] end) Core.Commands.Mock - |> Mox.expect(:send_invoke, fn _, _, _, _, _ -> {:error, {:exec_error, "some error"}} end) + |> Mox.expect(:send_invoke, fn _, _, _, _, _, _ -> {:error, {:exec_error, "some error"}} end) {:ok, resources} = Core.Telemetry.Metrics.Mock.resources(:worker@localhost) concurrent = resources |> Map.get(:concurrent_functions, 0) @@ -133,7 +133,7 @@ defmodule Core.InvokeTest do Core.Cluster.Mock |> Mox.expect(:all_nodes, fn -> [:core@somewhere, :worker@localhost] end) Core.Commands.Mock - |> Mox.expect(:send_invoke, fn worker, _, _, _, _ -> + |> Mox.expect(:send_invoke, fn worker, _, _, _, _, _ -> {:ok, %InvokeResult{result: worker}} end) @@ -168,7 +168,7 @@ defmodule Core.InvokeTest do Core.Cluster.Mock |> Mox.expect(:all_nodes, fn -> [:worker@localhost] end) Core.Commands.Mock - |> Mox.expect(:send_invoke, fn _, _, _, _, _ -> {:error, :code_not_found, self()} end) + |> Mox.expect(:send_invoke, fn _, _, _, _, _, _ -> {:error, :code_not_found, self()} end) Core.Commands.Mock |> Mox.expect(:send_invoke_with_code, fn _worker, _handler, function -> diff --git a/core/test/core_web/integration/controllers/function_controller_test.exs b/core/test/core_web/integration/controllers/function_controller_test.exs index ac2e699..5ade5e8 100644 --- a/core/test/core_web/integration/controllers/function_controller_test.exs +++ b/core/test/core_web/integration/controllers/function_controller_test.exs @@ -453,7 +453,7 @@ defmodule CoreWeb.FunctionControllerTest do Core.Cluster.Mock |> Mox.expect(:all_nodes, fn -> [:worker@localhost] end) Core.Commands.Mock - |> Mox.expect(:send_invoke, fn _, _, _, _, _ -> {:ok, %{result: "Hello, World!"}} end) + |> Mox.expect(:send_invoke, fn _, _, _, _, _, _ -> {:ok, %{result: "Hello, World!"}} end) conn = post(conn, ~p"/v1/fn/#{module_name}/#{function_name}") assert response(conn, 200) @@ -467,7 +467,7 @@ defmodule CoreWeb.FunctionControllerTest do Core.Cluster.Mock |> Mox.expect(:all_nodes, fn -> [:worker@localhost] end) Core.Commands.Mock - |> Mox.expect(:send_invoke, fn _, _, _, _, _ -> {:ok, %{result: "Hello, World!"}} end) + |> Mox.expect(:send_invoke, fn _, _, _, _, _, _ -> {:ok, %{result: "Hello, World!"}} end) conn = post(conn, ~p"/v1/fn/#{module_name}/#{function_name}", args: %{name: "World"}) @@ -482,7 +482,7 @@ defmodule CoreWeb.FunctionControllerTest do Core.Cluster.Mock |> Mox.expect(:all_nodes, fn -> [:worker@localhost] end) Core.Commands.Mock - |> Mox.expect(:send_invoke, fn _, _, _, _, _ -> {:error, :code_not_found, self()} end) + |> Mox.expect(:send_invoke, fn _, _, _, _, _, _ -> {:error, :code_not_found, self()} end) conn = post(conn, ~p"/v1/fn/#{module_name}/#{function_name}") assert response(conn, 200) @@ -519,7 +519,9 @@ defmodule CoreWeb.FunctionControllerTest do Core.Cluster.Mock |> Mox.expect(:all_nodes, fn -> [:worker@localhost] end) Core.Commands.Mock - |> Mox.expect(:send_invoke, fn _, _, _, _, _ -> {:error, {:exec_error, "some reason"}} end) + |> Mox.expect(:send_invoke, fn _, _, _, _, _, _ -> + {:error, {:exec_error, "some reason"}} + end) conn = post(conn, ~p"/v1/fn/#{module_name}/#{function_name}") assert response(conn, 422) diff --git a/data/lib/function_metadata.ex b/data/lib/function_metadata.ex index 4e9cb0e..68505ec 100644 --- a/data/lib/function_metadata.ex +++ b/data/lib/function_metadata.ex @@ -24,5 +24,5 @@ defmodule Data.FunctionMetadata do tag: String.t(), capacity: integer() } - defstruct tag: nil, capacity: -1 + defstruct tag: "", capacity: -1 end