Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,15 @@ export SHELLOPTS:=$(if $(SHELLOPTS),$(SHELLOPTS):)pipefail:errexit

## Compile core docker image
build-core-image:
cd core
docker build \
-f core/Dockerfile \
--build-arg SECRET_KEY_BASE=local-make-secret \
--build-arg MIX_ENV="prod" \
-t core .

## Compile worker docker image
build-worker-image:
cd worker
docker build --build-arg MIX_ENV="prod" -t worker .
docker build -f worker/Dockerfile --build-arg MIX_ENV="prod" -t worker .

## Run credo --strict
credo-core:
Expand Down
2 changes: 1 addition & 1 deletion core/lib/core/adapters/commands/test.ex
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ defmodule Core.Adapters.Commands.Test do
alias Data.InvokeResult

@impl true
def send_invoke(_worker, name, _ns, _hash, _args) do
def send_invoke(_worker, name, _ns, _hash, _args, _metadata \\ {}) do
{:ok, %InvokeResult{result: name}}
end

Expand Down
4 changes: 2 additions & 2 deletions core/lib/core/adapters/commands/worker.ex
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,9 @@ defmodule Core.Adapters.Commands.Worker do
# Only the send_invoke call should return this.

@impl true
def send_invoke(worker, name, mod, hash, args) do
def send_invoke(worker, name, mod, hash, args, metadata \\ {}) do
worker_addr = {:worker, worker}
cmd = {:invoke, %{name: name, module: mod, hash: hash}, args}
cmd = {:invoke, %{name: name, module: mod, hash: hash, metadata: metadata}, args}
Logger.info("Sending invoke for #{mod}/#{name} to #{inspect(worker_addr)}")

case GenServer.call(worker_addr, cmd, 60_000) do
Expand Down
3 changes: 2 additions & 1 deletion core/lib/core/adapters/telemetry/collector.ex
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,8 @@ defmodule Core.Adapters.Telemetry.Collector do
Enum.reduce(result_list, %{}, fn result, acc ->
case result do
%{"metric" => %{"__name__" => name}, "value" => [_, value]} ->
Map.put(acc, name, value)
{float_val, _} = Float.parse(value)
Map.put(acc, name, float_val)

_ ->
acc
Expand Down
12 changes: 6 additions & 6 deletions core/lib/core/domain/invoker.ex
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ defmodule Core.Domain.Invoker do
require Logger

alias Core.FunctionsMetadata
alias Data.FunctionMetadata

alias Core.Domain.{
Functions,
Expand All @@ -30,6 +29,7 @@ defmodule Core.Domain.Invoker do
Scheduler
}

alias Data.FunctionMetadata
alias Data.FunctionStruct
alias Data.InvokeParams
alias Data.InvokeResult
Expand Down Expand Up @@ -68,11 +68,11 @@ defmodule Core.Domain.Invoker do
metadata: struct(FunctionMetadata, %{tag: metadata.tag, capacity: metadata.capacity})
})

with {:ok, worker} <- Nodes.worker_nodes() |> Scheduler.select(func, ivk.config) do
with {:ok, worker} <- Nodes.worker_nodes() |> Scheduler.select(func, ivk.config, ivk.args) do
update_concurrent(worker, +1)

out =
case invoke_without_code(worker, ivk, f.hash) do
case invoke_without_code(worker, ivk, f.hash, func.metadata) do
{:error, :code_not_found, handler} ->
[%{code: code}] = Functions.get_code_by_name_in_mod!(ivk.function, ivk.module)

Expand Down Expand Up @@ -121,12 +121,12 @@ defmodule Core.Domain.Invoker do
end
end

@spec invoke_without_code(atom(), InvokeParams.t(), binary()) ::
@spec invoke_without_code(atom(), InvokeParams.t(), binary(), FunctionMetadata.t()) ::
{:ok, InvokeResult.t()} | {:error, :code_not_found, pid()} | invoke_errors()
def invoke_without_code(worker, ivk, hash) do
def invoke_without_code(worker, ivk, hash, metadata \\ %FunctionMetadata{}) do
Logger.debug("Invoker: invoking #{ivk.module}/#{ivk.function} without code")
# send invocation without code
Commands.send_invoke(worker, ivk.function, ivk.module, hash, ivk.args)
Commands.send_invoke(worker, ivk.function, ivk.module, hash, ivk.args, metadata)
end

@spec invoke_with_code(atom(), pid(), InvokeParams.t(), FunctionStruct.t()) ::
Expand Down
17 changes: 11 additions & 6 deletions core/lib/core/domain/policies/app.ex
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ defimpl Core.Domain.Policies.SchedulingPolicy, for: Data.Configurations.APP do
- configuration: an APP script (Data.Configurations.APP), generally obtained through parsing using the Core.Domain.Policies.Parsers.APP module.
- workers: a list of Data.Worker structs, each with relevant worker metrics.
- function: a Data.FunctionStruct struct, with the necessary function information. It must contain function metadata, specifically a tag and a capacity.
- args: not used

## Returns
- {:ok, wrk} if a suitable worker was found, with `wrk` being the worker.
Expand All @@ -43,12 +44,15 @@ defimpl Core.Domain.Policies.SchedulingPolicy, for: Data.Configurations.APP do
- {:error, :no_function_metadata} if the given FunctionStruct does not include the necessary metadata (i.e. tag, capacity).
- {:error, :invalid_input} if the given input was invalid in any other way (e.g. wrong types).
"""
@spec select(APP.t(), [Data.Worker.t()], Data.FunctionStruct.t()) ::
@spec select(APP.t(), [Data.Worker.t()], Data.FunctionStruct.t(), map()) ::
{:ok, Data.Worker.t()} | select_errors()
def select(config, workers, function, args \\ %{})

def select(
%APP{tags: tags} = _configuration,
[_ | _] = workers,
%Data.FunctionStruct{metadata: %{tag: tag_name, capacity: _function_capacity}} = function
%Data.FunctionStruct{metadata: %{tag: tag_name, capacity: _function_capacity}} = function,
_
) do
default = tags |> Map.get("default")
tag = tags |> Map.get(tag_name, default)
Expand All @@ -75,15 +79,15 @@ defimpl Core.Domain.Policies.SchedulingPolicy, for: Data.Configurations.APP do
end
end

def select(%APP{tags: _}, [], _) do
def select(%APP{tags: _}, [], _, _) do
{:error, :no_workers}
end

def select(%APP{tags: _}, _, %Data.FunctionStruct{metadata: nil}) do
def select(%APP{tags: _}, _, %Data.FunctionStruct{metadata: nil}, _) do
{:error, :no_function_metadata}
end

def select(_, _, _) do
def select(_, _, _, _) do
{:error, :invalid_input}
end

Expand Down Expand Up @@ -158,7 +162,8 @@ defimpl Core.Domain.Policies.SchedulingPolicy, for: Data.Configurations.APP do
Core.Domain.Policies.SchedulingPolicy.select(
%Data.Configurations.Empty{},
wrk,
function
function,
%{}
)
end
end
Expand Down
9 changes: 6 additions & 3 deletions core/lib/core/domain/policies/default.ex
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,13 @@ defimpl Core.Domain.Policies.SchedulingPolicy, for: Data.Configurations.Empty do
# since otherwise in situations where e.g. Prometheus is down, we would always have no workers
@spec select(Empty.t(), [Data.Worker.t()], Data.FunctionStruct.t()) ::
{:ok, Data.Worker.t()} | {:error, :no_workers} | {:error, :no_valid_workers}
def select(config, workers, function, args \\ %{})

def select(
%Empty{},
[_ | _] = workers,
%Data.FunctionStruct{metadata: %Data.FunctionMetadata{capacity: c}}
%Data.FunctionStruct{metadata: %Data.FunctionMetadata{capacity: c}},
_
) do
selected_worker =
workers
Expand All @@ -45,11 +48,11 @@ defimpl Core.Domain.Policies.SchedulingPolicy, for: Data.Configurations.Empty do
end
end

def select(%Empty{}, _, %Data.FunctionStruct{metadata: nil}) do
def select(%Empty{}, _, %Data.FunctionStruct{metadata: nil}, _) do
{:error, :no_function_metadata}
end

def select(%Empty{}, [], _) do
def select(%Empty{}, [], _, _) do
{:error, :no_workers}
end
end
4 changes: 2 additions & 2 deletions core/lib/core/domain/policies/scheduling_policy.ex
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ defprotocol Core.Domain.Policies.SchedulingPolicy do
@doc """
Should select a worker from a list of workers, given a specific configuration.
"""
@spec select(t, [Data.Worker.t()], Data.FunctionStruct.t()) ::
@spec select(t, [Data.Worker.t()], Data.FunctionStruct.t(), map()) ::
{:ok, Data.Worker.t()} | {:error, any}
def select(configuration, workers, function)
def select(configuration, workers, function, args)
end
7 changes: 4 additions & 3 deletions core/lib/core/domain/ports/commands.ex
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,13 @@ defmodule Core.Domain.Ports.Commands do
Port for sending commands to workers.
"""

alias Data.FunctionMetadata
alias Data.FunctionStruct
alias Data.InvokeResult

@adapter :core |> Application.compile_env!(__MODULE__) |> Keyword.fetch!(:adapter)

@callback send_invoke(atom(), String.t(), String.t(), binary(), map()) ::
@callback send_invoke(atom(), String.t(), String.t(), binary(), map(), FunctionMetadata.t()) ::
{:ok, InvokeResult.t()} | {:error, :code_not_found, pid()} | {:error, any()}
@callback send_invoke_with_code(atom(), pid(), FunctionStruct.t()) ::
{:ok, InvokeResult.t()} | {:error, any()}
Expand All @@ -36,9 +37,9 @@ defmodule Core.Domain.Ports.Commands do
Sends an invoke command to a worker passing the function name, module, hash and args.
It requires a worker (a fully qualified name of another node with the :worker actor on) and function arguments can be empty.
"""
@spec send_invoke(atom(), String.t(), String.t(), binary(), map()) ::
@spec send_invoke(atom(), String.t(), String.t(), binary(), map(), FunctionMetadata.t()) ::
{:ok, InvokeResult.t()} | {:error, :code_not_found, pid()} | {:error, any()}
defdelegate send_invoke(worker, f_name, ns, hash, args), to: @adapter
defdelegate send_invoke(worker, f_name, ns, hash, args, metadata), to: @adapter

@doc """
Sends an invoke command to a worker passing a struct with the function name, module and the code (wasm file binary).
Expand Down
9 changes: 6 additions & 3 deletions core/lib/core/domain/scheduler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -29,14 +29,16 @@ defmodule Core.Domain.Scheduler do
"""
@spec select([worker_atom()], Data.FunctionStruct.t(), configuration()) ::
{:ok, worker_atom()} | {:error, :no_workers} | {:error, :no_valid_workers}
def select([], _, _) do
def select(workers, function, config, args \\ %{})

def select([], _, _, _args) do
Logger.warn("Scheduler: tried selection with NO workers")
{:error, :no_workers}
end

# NOTE: if we move this to a NIF, we should only pass
# configuration information (to avoid serialising all parameters)
def select(workers, function, config) do
def select(workers, function, config, args) do
Logger.info("Scheduler: selection with #{length(workers)} workers")

# Get the resources
Expand All @@ -54,7 +56,8 @@ defmodule Core.Domain.Scheduler do
case SchedulingPolicy.select(
config,
resources,
function
function,
args
) do
{:ok, wrk} -> {:ok, wrk.name}
{:error, err} -> {:error, err}
Expand Down
2 changes: 1 addition & 1 deletion core/lib/core_web/auth.ex
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ defmodule CoreWeb.Plug.Authenticate do
_error ->
conn
|> put_status(:unauthorized)
|> Phoenix.Controller.put_view(CoreWeb.ErrorView)
|> Phoenix.Controller.put_view(CoreWeb.ErrorJSON)
|> Phoenix.Controller.render(:"401")
|> halt()
end
Expand Down
8 changes: 4 additions & 4 deletions core/test/core/integration/invoke_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ defmodule Core.InvokeTest do
Core.Cluster.Mock |> Mox.expect(:all_nodes, fn -> [:worker@localhost] end)

Core.Commands.Mock
|> Mox.expect(:send_invoke, fn _, _, _, _, _ -> {:error, {:exec_error, "some error"}} end)
|> Mox.expect(:send_invoke, fn _, _, _, _, _, _ -> {:error, {:exec_error, "some error"}} end)

pars = %InvokeParams{function: function.name, module: module.name}
assert Invoker.invoke(pars) == {:error, {:exec_error, "some error"}}
Expand All @@ -99,7 +99,7 @@ defmodule Core.InvokeTest do
Core.Cluster.Mock |> Mox.expect(:all_nodes, fn -> [:worker@localhost] end)

Core.Commands.Mock
|> Mox.expect(:send_invoke, fn _, _, _, _, _ -> {:error, {:exec_error, "some error"}} end)
|> Mox.expect(:send_invoke, fn _, _, _, _, _, _ -> {:error, {:exec_error, "some error"}} end)

{:ok, resources} = Core.Telemetry.Metrics.Mock.resources(:worker@localhost)
concurrent = resources |> Map.get(:concurrent_functions, 0)
Expand Down Expand Up @@ -133,7 +133,7 @@ defmodule Core.InvokeTest do
Core.Cluster.Mock |> Mox.expect(:all_nodes, fn -> [:core@somewhere, :worker@localhost] end)

Core.Commands.Mock
|> Mox.expect(:send_invoke, fn worker, _, _, _, _ ->
|> Mox.expect(:send_invoke, fn worker, _, _, _, _, _ ->
{:ok, %InvokeResult{result: worker}}
end)

Expand Down Expand Up @@ -168,7 +168,7 @@ defmodule Core.InvokeTest do
Core.Cluster.Mock |> Mox.expect(:all_nodes, fn -> [:worker@localhost] end)

Core.Commands.Mock
|> Mox.expect(:send_invoke, fn _, _, _, _, _ -> {:error, :code_not_found, self()} end)
|> Mox.expect(:send_invoke, fn _, _, _, _, _, _ -> {:error, :code_not_found, self()} end)

Core.Commands.Mock
|> Mox.expect(:send_invoke_with_code, fn _worker, _handler, function ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -453,7 +453,7 @@ defmodule CoreWeb.FunctionControllerTest do
Core.Cluster.Mock |> Mox.expect(:all_nodes, fn -> [:worker@localhost] end)

Core.Commands.Mock
|> Mox.expect(:send_invoke, fn _, _, _, _, _ -> {:ok, %{result: "Hello, World!"}} end)
|> Mox.expect(:send_invoke, fn _, _, _, _, _, _ -> {:ok, %{result: "Hello, World!"}} end)

conn = post(conn, ~p"/v1/fn/#{module_name}/#{function_name}")
assert response(conn, 200)
Expand All @@ -467,7 +467,7 @@ defmodule CoreWeb.FunctionControllerTest do
Core.Cluster.Mock |> Mox.expect(:all_nodes, fn -> [:worker@localhost] end)

Core.Commands.Mock
|> Mox.expect(:send_invoke, fn _, _, _, _, _ -> {:ok, %{result: "Hello, World!"}} end)
|> Mox.expect(:send_invoke, fn _, _, _, _, _, _ -> {:ok, %{result: "Hello, World!"}} end)

conn = post(conn, ~p"/v1/fn/#{module_name}/#{function_name}", args: %{name: "World"})

Expand All @@ -482,7 +482,7 @@ defmodule CoreWeb.FunctionControllerTest do
Core.Cluster.Mock |> Mox.expect(:all_nodes, fn -> [:worker@localhost] end)

Core.Commands.Mock
|> Mox.expect(:send_invoke, fn _, _, _, _, _ -> {:error, :code_not_found, self()} end)
|> Mox.expect(:send_invoke, fn _, _, _, _, _, _ -> {:error, :code_not_found, self()} end)

conn = post(conn, ~p"/v1/fn/#{module_name}/#{function_name}")
assert response(conn, 200)
Expand Down Expand Up @@ -519,7 +519,9 @@ defmodule CoreWeb.FunctionControllerTest do
Core.Cluster.Mock |> Mox.expect(:all_nodes, fn -> [:worker@localhost] end)

Core.Commands.Mock
|> Mox.expect(:send_invoke, fn _, _, _, _, _ -> {:error, {:exec_error, "some reason"}} end)
|> Mox.expect(:send_invoke, fn _, _, _, _, _, _ ->
{:error, {:exec_error, "some reason"}}
end)

conn = post(conn, ~p"/v1/fn/#{module_name}/#{function_name}")
assert response(conn, 422)
Expand Down
2 changes: 1 addition & 1 deletion data/lib/function_metadata.ex
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,5 @@ defmodule Data.FunctionMetadata do
tag: String.t(),
capacity: integer()
}
defstruct tag: nil, capacity: -1
defstruct tag: "", capacity: -1
end
Loading