From 8b8a347f6c4a35a93c0804499e7b89b5143f6a2c Mon Sep 17 00:00:00 2001 From: Matteo Trentin Date: Sun, 4 Aug 2024 11:59:51 +0200 Subject: [PATCH 1/9] feat(core): add invocation params to scheduling input --- core/lib/core/domain/invoker.ex | 2 +- core/lib/core/domain/policies/app.ex | 17 +++++++++++------ core/lib/core/domain/policies/default.ex | 9 ++++++--- .../core/domain/policies/scheduling_policy.ex | 4 ++-- core/lib/core/domain/scheduler.ex | 5 +++-- 5 files changed, 23 insertions(+), 14 deletions(-) diff --git a/core/lib/core/domain/invoker.ex b/core/lib/core/domain/invoker.ex index b501ca00..06b480a8 100644 --- a/core/lib/core/domain/invoker.ex +++ b/core/lib/core/domain/invoker.ex @@ -68,7 +68,7 @@ defmodule Core.Domain.Invoker do metadata: struct(FunctionMetadata, %{tag: metadata.tag, capacity: metadata.capacity}) }) - with {:ok, worker} <- Nodes.worker_nodes() |> Scheduler.select(func, ivk.config) do + with {:ok, worker} <- Nodes.worker_nodes() |> Scheduler.select(func, ivk.config, ivk.args) do update_concurrent(worker, +1) out = diff --git a/core/lib/core/domain/policies/app.ex b/core/lib/core/domain/policies/app.ex index e2e5daea..7c7ab05b 100644 --- a/core/lib/core/domain/policies/app.ex +++ b/core/lib/core/domain/policies/app.ex @@ -34,6 +34,7 @@ defimpl Core.Domain.Policies.SchedulingPolicy, for: Data.Configurations.APP do - configuration: an APP script (Data.Configurations.APP), generally obtained through parsing using the Core.Domain.Policies.Parsers.APP module. - workers: a list of Data.Worker structs, each with relevant worker metrics. - function: a Data.FunctionStruct struct, with the necessary function information. It must contain function metadata, specifically a tag and a capacity. + - args: not used ## Returns - {:ok, wrk} if a suitable worker was found, with `wrk` being the worker. @@ -43,12 +44,15 @@ defimpl Core.Domain.Policies.SchedulingPolicy, for: Data.Configurations.APP do - {:error, :no_function_metadata} if the given FunctionStruct does not include the necessary metadata (i.e. tag, capacity). - {:error, :invalid_input} if the given input was invalid in any other way (e.g. wrong types). """ - @spec select(APP.t(), [Data.Worker.t()], Data.FunctionStruct.t()) :: + @spec select(APP.t(), [Data.Worker.t()], Data.FunctionStruct.t(), map()) :: {:ok, Data.Worker.t()} | select_errors() + def select(config, workers, function, args \\ %{}) + def select( %APP{tags: tags} = _configuration, [_ | _] = workers, - %Data.FunctionStruct{metadata: %{tag: tag_name, capacity: _function_capacity}} = function + %Data.FunctionStruct{metadata: %{tag: tag_name, capacity: _function_capacity}} = function, + _ ) do default = tags |> Map.get("default") tag = tags |> Map.get(tag_name, default) @@ -75,15 +79,15 @@ defimpl Core.Domain.Policies.SchedulingPolicy, for: Data.Configurations.APP do end end - def select(%APP{tags: _}, [], _) do + def select(%APP{tags: _}, [], _, _) do {:error, :no_workers} end - def select(%APP{tags: _}, _, %Data.FunctionStruct{metadata: nil}) do + def select(%APP{tags: _}, _, %Data.FunctionStruct{metadata: nil}, _) do {:error, :no_function_metadata} end - def select(_, _, _) do + def select(_, _, _, _) do {:error, :invalid_input} end @@ -158,7 +162,8 @@ defimpl Core.Domain.Policies.SchedulingPolicy, for: Data.Configurations.APP do Core.Domain.Policies.SchedulingPolicy.select( %Data.Configurations.Empty{}, wrk, - function + function, + %{} ) end end diff --git a/core/lib/core/domain/policies/default.ex b/core/lib/core/domain/policies/default.ex index 7ccc8973..5719dfb5 100644 --- a/core/lib/core/domain/policies/default.ex +++ b/core/lib/core/domain/policies/default.ex @@ -20,10 +20,13 @@ defimpl Core.Domain.Policies.SchedulingPolicy, for: Data.Configurations.Empty do # since otherwise in situations where e.g. Prometheus is down, we would always have no workers @spec select(Empty.t(), [Data.Worker.t()], Data.FunctionStruct.t()) :: {:ok, Data.Worker.t()} | {:error, :no_workers} | {:error, :no_valid_workers} + def select(config, workers, function, args \\ %{}) + def select( %Empty{}, [_ | _] = workers, - %Data.FunctionStruct{metadata: %Data.FunctionMetadata{capacity: c}} + %Data.FunctionStruct{metadata: %Data.FunctionMetadata{capacity: c}}, + _ ) do selected_worker = workers @@ -45,11 +48,11 @@ defimpl Core.Domain.Policies.SchedulingPolicy, for: Data.Configurations.Empty do end end - def select(%Empty{}, _, %Data.FunctionStruct{metadata: nil}) do + def select(%Empty{}, _, %Data.FunctionStruct{metadata: nil}, _) do {:error, :no_function_metadata} end - def select(%Empty{}, [], _) do + def select(%Empty{}, [], _, _) do {:error, :no_workers} end end diff --git a/core/lib/core/domain/policies/scheduling_policy.ex b/core/lib/core/domain/policies/scheduling_policy.ex index 175dd6b7..c6233425 100644 --- a/core/lib/core/domain/policies/scheduling_policy.ex +++ b/core/lib/core/domain/policies/scheduling_policy.ex @@ -21,7 +21,7 @@ defprotocol Core.Domain.Policies.SchedulingPolicy do @doc """ Should select a worker from a list of workers, given a specific configuration. """ - @spec select(t, [Data.Worker.t()], Data.FunctionStruct.t()) :: + @spec select(t, [Data.Worker.t()], Data.FunctionStruct.t(), map()) :: {:ok, Data.Worker.t()} | {:error, any} - def select(configuration, workers, function) + def select(configuration, workers, function, args) end diff --git a/core/lib/core/domain/scheduler.ex b/core/lib/core/domain/scheduler.ex index 483a7cf3..24ae5c25 100644 --- a/core/lib/core/domain/scheduler.ex +++ b/core/lib/core/domain/scheduler.ex @@ -36,7 +36,7 @@ defmodule Core.Domain.Scheduler do # NOTE: if we move this to a NIF, we should only pass # configuration information (to avoid serialising all parameters) - def select(workers, function, config) do + def select(workers, function, config, args \\ %{}) do Logger.info("Scheduler: selection with #{length(workers)} workers") # Get the resources @@ -54,7 +54,8 @@ defmodule Core.Domain.Scheduler do case SchedulingPolicy.select( config, resources, - function + function, + args ) do {:ok, wrk} -> {:ok, wrk.name} {:error, err} -> {:error, err} From 0a38d635d7f12d01fcdc5aa965879aa0430dac90 Mon Sep 17 00:00:00 2001 From: Giuseppe De Palma Date: Tue, 6 Aug 2024 16:05:42 +0200 Subject: [PATCH 2/9] fix: 401 render error --- core/lib/core_web/auth.ex | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/lib/core_web/auth.ex b/core/lib/core_web/auth.ex index 5ca68a26..69909a37 100644 --- a/core/lib/core_web/auth.ex +++ b/core/lib/core_web/auth.ex @@ -39,7 +39,7 @@ defmodule CoreWeb.Plug.Authenticate do _error -> conn |> put_status(:unauthorized) - |> Phoenix.Controller.put_view(CoreWeb.ErrorView) + |> Phoenix.Controller.put_view(CoreWeb.ErrorJSON) |> Phoenix.Controller.render(:"401") |> halt() end From 02a7b91d689b95eb64afa6eff2ef5634022ac5c7 Mon Sep 17 00:00:00 2001 From: Giuseppe De Palma Date: Mon, 12 Aug 2024 10:07:02 +0200 Subject: [PATCH 3/9] fix: makefile build tasks --- Makefile | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/Makefile b/Makefile index a46f3993..0264ed5f 100644 --- a/Makefile +++ b/Makefile @@ -19,16 +19,15 @@ export SHELLOPTS:=$(if $(SHELLOPTS),$(SHELLOPTS):)pipefail:errexit ## Compile core docker image build-core-image: - cd core docker build \ + -f core/Dockerfile \ --build-arg SECRET_KEY_BASE=local-make-secret \ - --build-arg MIX_ENV="prod" \ + --build-arg MIX_ENV="dev" \ -t core . ## Compile worker docker image build-worker-image: - cd worker - docker build --build-arg MIX_ENV="prod" -t worker . + docker build -f worker/Dockerfile --build-arg MIX_ENV="dev" -t worker . ## Run credo --strict credo-core: From a005b987889ec5db4dbfc35f8eb398be7c8a575c Mon Sep 17 00:00:00 2001 From: Giuseppe De Palma Date: Mon, 12 Aug 2024 16:17:54 +0200 Subject: [PATCH 4/9] fix(Makefile): revert MIX_ENV to prod --- Makefile | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Makefile b/Makefile index 0264ed5f..b820c3a6 100644 --- a/Makefile +++ b/Makefile @@ -22,12 +22,12 @@ build-core-image: docker build \ -f core/Dockerfile \ --build-arg SECRET_KEY_BASE=local-make-secret \ - --build-arg MIX_ENV="dev" \ + --build-arg MIX_ENV="prod" \ -t core . ## Compile worker docker image build-worker-image: - docker build -f worker/Dockerfile --build-arg MIX_ENV="dev" -t worker . + docker build -f worker/Dockerfile --build-arg MIX_ENV="prod" -t worker . ## Run credo --strict credo-core: From 425a40533e10f7ffe1e810ca2be8d67ae7ca1ad8 Mon Sep 17 00:00:00 2001 From: Matteo Trentin Date: Tue, 3 Sep 2024 13:44:22 +0200 Subject: [PATCH 5/9] fix(core): convert metric values to float --- core/lib/core/adapters/telemetry/collector.ex | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/core/lib/core/adapters/telemetry/collector.ex b/core/lib/core/adapters/telemetry/collector.ex index af20dd25..0dca8793 100644 --- a/core/lib/core/adapters/telemetry/collector.ex +++ b/core/lib/core/adapters/telemetry/collector.ex @@ -129,7 +129,8 @@ defmodule Core.Adapters.Telemetry.Collector do Enum.reduce(result_list, %{}, fn result, acc -> case result do %{"metric" => %{"__name__" => name}, "value" => [_, value]} -> - Map.put(acc, name, value) + {float_val, _} = Float.parse(value) + Map.put(acc, name, float_val) _ -> acc From 29487156d321f10aca21fb6065aa2d6d92725161 Mon Sep 17 00:00:00 2001 From: Matteo Trentin Date: Tue, 3 Sep 2024 17:22:59 +0200 Subject: [PATCH 6/9] fix(core): add function metadata in invocation --- core/lib/core/adapters/commands/test.ex | 2 +- core/lib/core/adapters/commands/worker.ex | 4 ++-- core/lib/core/domain/invoker.ex | 9 +++++---- core/lib/core/domain/ports/commands.ex | 7 ++++--- 4 files changed, 12 insertions(+), 10 deletions(-) diff --git a/core/lib/core/adapters/commands/test.ex b/core/lib/core/adapters/commands/test.ex index f939e806..a5945534 100644 --- a/core/lib/core/adapters/commands/test.ex +++ b/core/lib/core/adapters/commands/test.ex @@ -19,7 +19,7 @@ defmodule Core.Adapters.Commands.Test do alias Data.InvokeResult @impl true - def send_invoke(_worker, name, _ns, _hash, _args) do + def send_invoke(_worker, name, _ns, _hash, _args, _metadata \\ {}) do {:ok, %InvokeResult{result: name}} end diff --git a/core/lib/core/adapters/commands/worker.ex b/core/lib/core/adapters/commands/worker.ex index 5c09b170..f8965908 100644 --- a/core/lib/core/adapters/commands/worker.ex +++ b/core/lib/core/adapters/commands/worker.ex @@ -32,9 +32,9 @@ defmodule Core.Adapters.Commands.Worker do # Only the send_invoke call should return this. @impl true - def send_invoke(worker, name, mod, hash, args) do + def send_invoke(worker, name, mod, hash, args, metadata \\ {}) do worker_addr = {:worker, worker} - cmd = {:invoke, %{name: name, module: mod, hash: hash}, args} + cmd = {:invoke, %{name: name, module: mod, hash: hash, metadata: metadata}, args} Logger.info("Sending invoke for #{mod}/#{name} to #{inspect(worker_addr)}") case GenServer.call(worker_addr, cmd, 60_000) do diff --git a/core/lib/core/domain/invoker.ex b/core/lib/core/domain/invoker.ex index 06b480a8..b1a9c2f9 100644 --- a/core/lib/core/domain/invoker.ex +++ b/core/lib/core/domain/invoker.ex @@ -30,6 +30,7 @@ defmodule Core.Domain.Invoker do Scheduler } + alias Data.FunctionMetadata alias Data.FunctionStruct alias Data.InvokeParams alias Data.InvokeResult @@ -72,7 +73,7 @@ defmodule Core.Domain.Invoker do update_concurrent(worker, +1) out = - case invoke_without_code(worker, ivk, f.hash) do + case invoke_without_code(worker, ivk, f.hash, func.metadata) do {:error, :code_not_found, handler} -> [%{code: code}] = Functions.get_code_by_name_in_mod!(ivk.function, ivk.module) @@ -121,12 +122,12 @@ defmodule Core.Domain.Invoker do end end - @spec invoke_without_code(atom(), InvokeParams.t(), binary()) :: + @spec invoke_without_code(atom(), InvokeParams.t(), binary(), FunctionMetadata.t()) :: {:ok, InvokeResult.t()} | {:error, :code_not_found, pid()} | invoke_errors() - def invoke_without_code(worker, ivk, hash) do + def invoke_without_code(worker, ivk, hash, metadata \\ %FunctionMetadata{}) do Logger.debug("Invoker: invoking #{ivk.module}/#{ivk.function} without code") # send invocation without code - Commands.send_invoke(worker, ivk.function, ivk.module, hash, ivk.args) + Commands.send_invoke(worker, ivk.function, ivk.module, hash, ivk.args, metadata) end @spec invoke_with_code(atom(), pid(), InvokeParams.t(), FunctionStruct.t()) :: diff --git a/core/lib/core/domain/ports/commands.ex b/core/lib/core/domain/ports/commands.ex index 718bdecf..2e5a524b 100644 --- a/core/lib/core/domain/ports/commands.ex +++ b/core/lib/core/domain/ports/commands.ex @@ -17,12 +17,13 @@ defmodule Core.Domain.Ports.Commands do Port for sending commands to workers. """ + alias Data.FunctionMetadata alias Data.FunctionStruct alias Data.InvokeResult @adapter :core |> Application.compile_env!(__MODULE__) |> Keyword.fetch!(:adapter) - @callback send_invoke(atom(), String.t(), String.t(), binary(), map()) :: + @callback send_invoke(atom(), String.t(), String.t(), binary(), map(), FunctionMetadata.t()) :: {:ok, InvokeResult.t()} | {:error, :code_not_found, pid()} | {:error, any()} @callback send_invoke_with_code(atom(), pid(), FunctionStruct.t()) :: {:ok, InvokeResult.t()} | {:error, any()} @@ -36,9 +37,9 @@ defmodule Core.Domain.Ports.Commands do Sends an invoke command to a worker passing the function name, module, hash and args. It requires a worker (a fully qualified name of another node with the :worker actor on) and function arguments can be empty. """ - @spec send_invoke(atom(), String.t(), String.t(), binary(), map()) :: + @spec send_invoke(atom(), String.t(), String.t(), binary(), map(), FunctionMetadata.t()) :: {:ok, InvokeResult.t()} | {:error, :code_not_found, pid()} | {:error, any()} - defdelegate send_invoke(worker, f_name, ns, hash, args), to: @adapter + defdelegate send_invoke(worker, f_name, ns, hash, args, metadata), to: @adapter @doc """ Sends an invoke command to a worker passing a struct with the function name, module and the code (wasm file binary). From 3c1ffdc67224cf3c5b25b53524111aaa1b708883 Mon Sep 17 00:00:00 2001 From: Matteo Trentin Date: Wed, 29 May 2024 21:15:21 +0200 Subject: [PATCH 7/9] fix(core): correctly handle empty worker list in scheduler --- core/lib/core/domain/scheduler.ex | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/core/lib/core/domain/scheduler.ex b/core/lib/core/domain/scheduler.ex index 24ae5c25..650c79de 100644 --- a/core/lib/core/domain/scheduler.ex +++ b/core/lib/core/domain/scheduler.ex @@ -29,14 +29,16 @@ defmodule Core.Domain.Scheduler do """ @spec select([worker_atom()], Data.FunctionStruct.t(), configuration()) :: {:ok, worker_atom()} | {:error, :no_workers} | {:error, :no_valid_workers} - def select([], _, _) do + def select(workers, function, config, args \\ %{}) + + def select([], _, _, _args) do Logger.warn("Scheduler: tried selection with NO workers") {:error, :no_workers} end # NOTE: if we move this to a NIF, we should only pass # configuration information (to avoid serialising all parameters) - def select(workers, function, config, args \\ %{}) do + def select(workers, function, config, args) do Logger.info("Scheduler: selection with #{length(workers)} workers") # Get the resources From c2819a74280d4cfe14912a599bb084760e88b32b Mon Sep 17 00:00:00 2001 From: Matteo Trentin Date: Tue, 26 Nov 2024 10:16:59 +0100 Subject: [PATCH 8/9] test(core): fix invoke and function_controller tests --- core/test/core/integration/invoke_test.exs | 8 ++++---- .../controllers/function_controller_test.exs | 10 ++++++---- 2 files changed, 10 insertions(+), 8 deletions(-) diff --git a/core/test/core/integration/invoke_test.exs b/core/test/core/integration/invoke_test.exs index 3e960d42..db37e787 100644 --- a/core/test/core/integration/invoke_test.exs +++ b/core/test/core/integration/invoke_test.exs @@ -85,7 +85,7 @@ defmodule Core.InvokeTest do Core.Cluster.Mock |> Mox.expect(:all_nodes, fn -> [:worker@localhost] end) Core.Commands.Mock - |> Mox.expect(:send_invoke, fn _, _, _, _, _ -> {:error, {:exec_error, "some error"}} end) + |> Mox.expect(:send_invoke, fn _, _, _, _, _, _ -> {:error, {:exec_error, "some error"}} end) pars = %InvokeParams{function: function.name, module: module.name} assert Invoker.invoke(pars) == {:error, {:exec_error, "some error"}} @@ -99,7 +99,7 @@ defmodule Core.InvokeTest do Core.Cluster.Mock |> Mox.expect(:all_nodes, fn -> [:worker@localhost] end) Core.Commands.Mock - |> Mox.expect(:send_invoke, fn _, _, _, _, _ -> {:error, {:exec_error, "some error"}} end) + |> Mox.expect(:send_invoke, fn _, _, _, _, _, _ -> {:error, {:exec_error, "some error"}} end) {:ok, resources} = Core.Telemetry.Metrics.Mock.resources(:worker@localhost) concurrent = resources |> Map.get(:concurrent_functions, 0) @@ -133,7 +133,7 @@ defmodule Core.InvokeTest do Core.Cluster.Mock |> Mox.expect(:all_nodes, fn -> [:core@somewhere, :worker@localhost] end) Core.Commands.Mock - |> Mox.expect(:send_invoke, fn worker, _, _, _, _ -> + |> Mox.expect(:send_invoke, fn worker, _, _, _, _, _ -> {:ok, %InvokeResult{result: worker}} end) @@ -168,7 +168,7 @@ defmodule Core.InvokeTest do Core.Cluster.Mock |> Mox.expect(:all_nodes, fn -> [:worker@localhost] end) Core.Commands.Mock - |> Mox.expect(:send_invoke, fn _, _, _, _, _ -> {:error, :code_not_found, self()} end) + |> Mox.expect(:send_invoke, fn _, _, _, _, _, _ -> {:error, :code_not_found, self()} end) Core.Commands.Mock |> Mox.expect(:send_invoke_with_code, fn _worker, _handler, function -> diff --git a/core/test/core_web/integration/controllers/function_controller_test.exs b/core/test/core_web/integration/controllers/function_controller_test.exs index ac2e6996..5ade5e8b 100644 --- a/core/test/core_web/integration/controllers/function_controller_test.exs +++ b/core/test/core_web/integration/controllers/function_controller_test.exs @@ -453,7 +453,7 @@ defmodule CoreWeb.FunctionControllerTest do Core.Cluster.Mock |> Mox.expect(:all_nodes, fn -> [:worker@localhost] end) Core.Commands.Mock - |> Mox.expect(:send_invoke, fn _, _, _, _, _ -> {:ok, %{result: "Hello, World!"}} end) + |> Mox.expect(:send_invoke, fn _, _, _, _, _, _ -> {:ok, %{result: "Hello, World!"}} end) conn = post(conn, ~p"/v1/fn/#{module_name}/#{function_name}") assert response(conn, 200) @@ -467,7 +467,7 @@ defmodule CoreWeb.FunctionControllerTest do Core.Cluster.Mock |> Mox.expect(:all_nodes, fn -> [:worker@localhost] end) Core.Commands.Mock - |> Mox.expect(:send_invoke, fn _, _, _, _, _ -> {:ok, %{result: "Hello, World!"}} end) + |> Mox.expect(:send_invoke, fn _, _, _, _, _, _ -> {:ok, %{result: "Hello, World!"}} end) conn = post(conn, ~p"/v1/fn/#{module_name}/#{function_name}", args: %{name: "World"}) @@ -482,7 +482,7 @@ defmodule CoreWeb.FunctionControllerTest do Core.Cluster.Mock |> Mox.expect(:all_nodes, fn -> [:worker@localhost] end) Core.Commands.Mock - |> Mox.expect(:send_invoke, fn _, _, _, _, _ -> {:error, :code_not_found, self()} end) + |> Mox.expect(:send_invoke, fn _, _, _, _, _, _ -> {:error, :code_not_found, self()} end) conn = post(conn, ~p"/v1/fn/#{module_name}/#{function_name}") assert response(conn, 200) @@ -519,7 +519,9 @@ defmodule CoreWeb.FunctionControllerTest do Core.Cluster.Mock |> Mox.expect(:all_nodes, fn -> [:worker@localhost] end) Core.Commands.Mock - |> Mox.expect(:send_invoke, fn _, _, _, _, _ -> {:error, {:exec_error, "some reason"}} end) + |> Mox.expect(:send_invoke, fn _, _, _, _, _, _ -> + {:error, {:exec_error, "some reason"}} + end) conn = post(conn, ~p"/v1/fn/#{module_name}/#{function_name}") assert response(conn, 422) From 5fc4a11446eb13e31edf8397fe7e757493f3af06 Mon Sep 17 00:00:00 2001 From: Matteo Trentin Date: Thu, 23 Jan 2025 14:00:26 +0100 Subject: [PATCH 9/9] fix(core): handle dialyzer error --- core/lib/core/domain/invoker.ex | 1 - data/lib/function_metadata.ex | 2 +- 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/core/lib/core/domain/invoker.ex b/core/lib/core/domain/invoker.ex index b1a9c2f9..0e001f76 100644 --- a/core/lib/core/domain/invoker.ex +++ b/core/lib/core/domain/invoker.ex @@ -19,7 +19,6 @@ defmodule Core.Domain.Invoker do require Logger alias Core.FunctionsMetadata - alias Data.FunctionMetadata alias Core.Domain.{ Functions, diff --git a/data/lib/function_metadata.ex b/data/lib/function_metadata.ex index 4e9cb0e3..68505ecb 100644 --- a/data/lib/function_metadata.ex +++ b/data/lib/function_metadata.ex @@ -24,5 +24,5 @@ defmodule Data.FunctionMetadata do tag: String.t(), capacity: integer() } - defstruct tag: nil, capacity: -1 + defstruct tag: "", capacity: -1 end