diff --git a/lib/exth/transport/ipc.ex b/lib/exth/transport/ipc.ex index cf80867..9361e26 100644 --- a/lib/exth/transport/ipc.ex +++ b/lib/exth/transport/ipc.ex @@ -1,30 +1,97 @@ defmodule Exth.Transport.Ipc do - @moduledoc false + @moduledoc """ + IPC (Inter-Process Communication) transport implementation for JSON-RPC requests using Unix domain sockets. + + Implements the `Exth.Transport.Transportable` protocol for making IPC connections to JSON-RPC + endpoints via Unix domain sockets. Uses NimblePool for connection pooling and efficient + resource management. + + ## Features + + * Unix domain socket communication + * Connection pooling with NimblePool + * Automatic connection management + * Configurable pool size and timeouts + * Efficient resource utilization + * Process registration with via-tuples + + ## Usage + + transport = Transportable.new( + %Exth.Transport.Ipc{}, + path: "/tmp/ethereum.ipc" + ) + + {:ok, response} = Transportable.call(transport, request) + + ## Configuration + + Required options: + * `:path` - The Unix domain socket path (e.g., "/tmp/ethereum.ipc") + + Optional options: + * `:timeout` - Request timeout in milliseconds (defaults to 30000) + * `:socket_opts` - TCP socket options (defaults to [:binary, active: false, reuseaddr: true]) + * `:pool_size` - Number of connections in the pool (defaults to 10) + * `:pool_lazy_workers` - Whether to create workers lazily (defaults to true) + * `:pool_worker_idle_timeout` - Worker idle timeout (defaults to nil) + * `:pool_max_idle_pings` - Maximum idle pings before worker termination (defaults to -1) + + ## Connection Pooling + + The IPC transport uses NimblePool to manage a pool of Unix domain socket connections. + This provides several benefits: + + * Efficient resource utilization + * Automatic connection lifecycle management + * Configurable pool size for different workloads + * Connection reuse for better performance + + ## Error Handling + + The transport handles several error cases: + * Invalid socket path format + * Missing required options + * Connection failures + * Socket communication errors + * Pool exhaustion + + See `Exth.Transport.Transportable` for protocol details. + """ alias __MODULE__.ConnectionPool - # alias __MODULE__.Socket + + @typedoc "IPC transport configuration" + @type t :: %__MODULE__{ + path: String.t(), + pool: struct(), + socket_opts: list(), + timeout: non_neg_integer() + } defstruct [:path, :pool, :socket_opts, :timeout] @default_timeout 30_000 @default_socket_opts [:binary, active: false, reuseaddr: true] + @spec new(keyword()) :: t() def new(opts) do with {:ok, path} <- validate_required_path(opts[:path]) do timeout = opts[:timeout] || @default_timeout socket_opts = opts[:socket_opts] || @default_socket_opts - {:ok, pool_name} = ConnectionPool.start(path: path, socket_opts: socket_opts) + {:ok, pool} = ConnectionPool.start(opts ++ [socket_opts: socket_opts]) %__MODULE__{ path: path, - pool: pool_name, + pool: pool, socket_opts: socket_opts, timeout: timeout } end end + @spec call(t(), String.t()) :: {:ok, String.t()} | {:error, term()} def call(%__MODULE__{} = transport, request) do ConnectionPool.call(transport.pool, request, transport.timeout) end diff --git a/lib/exth/transport/ipc/connection_pool.ex b/lib/exth/transport/ipc/connection_pool.ex index d84435b..e77deaa 100644 --- a/lib/exth/transport/ipc/connection_pool.ex +++ b/lib/exth/transport/ipc/connection_pool.ex @@ -4,29 +4,97 @@ defmodule Exth.Transport.Ipc.ConnectionPool do alias Exth.Transport alias Exth.Transport.Ipc + @typedoc "Connection pool configuration" + @type t :: %__MODULE__{ + lazy: boolean(), + max_idle_pings: integer(), + name: atom(), + pool_size: non_neg_integer(), + worker: {module(), term()}, + worker_idle_timeout: non_neg_integer() | nil + } + + defstruct [:lazy, :max_idle_pings, :name, :pool_size, :worker, :worker_idle_timeout] + @pool_timeout 30_000 ### ### Public API ### + @doc """ + Starts a new connection pool for the given socket path. + + ## Options + * `:path` - (required) The Unix domain socket path + * `:socket_opts` - (required) TCP socket options + * `:pool_size` - Number of connections in the pool (defaults to 10) + * `:pool_lazy_workers` - Whether to create workers lazily (defaults to true) + * `:pool_worker_idle_timeout` - Worker idle timeout (defaults to nil) + * `:pool_max_idle_pings` - Maximum idle pings before worker termination (defaults to -1) + + ## Returns + * `{:ok, pool}` - Successfully started pool + * `{:error, reason}` - Failed to start pool + + ## Examples + + {:ok, pool} = ConnectionPool.start( + path: "/tmp/ethereum.ipc", + socket_opts: [:binary, active: false], + pool_size: 5 + ) + """ + @spec start(keyword()) :: {:ok, t()} | {:error, term()} def start(opts) do path = Keyword.fetch!(opts, :path) socket_opts = Keyword.fetch!(opts, :socket_opts) + # pool opts + {pool_size, opts} = Keyword.pop(opts, :pool_size, 10) + {pool_lazy_workers, opts} = Keyword.pop(opts, :pool_lazy_workers, true) + {pool_worker_idle_timeout, opts} = Keyword.pop(opts, :pool_worker_idle_timeout, nil) + {pool_max_idle_pings, _opts} = Keyword.pop(opts, :pool_max_idle_pings, -1) pool_name = via_tuple(path) - pool_spec = - {NimblePool, worker: {__MODULE__, {path, socket_opts}}, name: pool_name} + pool_opts = [ + lazy: pool_lazy_workers, + max_idle_pings: pool_max_idle_pings, + name: pool_name, + pool_size: pool_size, + worker: {__MODULE__, {path, socket_opts}}, + worker_idle_timeout: pool_worker_idle_timeout + ] - {:ok, _pid} = Ipc.DynamicSupervisor.start_pool(pool_spec) + {:ok, _pid} = Ipc.DynamicSupervisor.start_pool({NimblePool, pool_opts}) - {:ok, pool_name} + {:ok, struct(__MODULE__, pool_opts)} end - def call(pool, request, timeout) do + @doc """ + Makes a request through the connection pool. + + Checks out a connection from the pool, sends the request, and returns the response. + The connection is automatically returned to the pool after use. + + ## Arguments + * `pool` - The connection pool instance + * `request` - The JSON-RPC request as a string + * `timeout` - Request timeout in milliseconds + + ## Returns + * `{:ok, response}` - Successful request with encoded response + * `{:error, {:socket_error, reason}}` - Socket communication error + * `{:error, reason}` - Other errors (timeout, pool exhaustion, etc) + + ## Examples + + {:ok, response} = ConnectionPool.call(pool, request, 30_000) + """ + @spec call(t(), String.t(), non_neg_integer()) :: {:ok, String.t()} | {:error, term()} + def call(%__MODULE__{} = pool, request, timeout) do NimblePool.checkout!( - pool, + pool.name, :checkout, fn _from, socket -> result = send_request(socket, request, timeout) diff --git a/test/exth/transport/ipc_test.exs b/test/exth/transport/ipc_test.exs index 4aff4c5..36c6571 100644 --- a/test/exth/transport/ipc_test.exs +++ b/test/exth/transport/ipc_test.exs @@ -6,20 +6,22 @@ defmodule Exth.Transport.IpcTest do describe "new/1" do test "creates a new IPC transport with valid path" do - expect(Ipc.ConnectionPool, :start, fn _opts -> {:ok, "pool_name"} end) + pool = %Ipc.ConnectionPool{name: "pool_name"} + expect(Ipc.ConnectionPool, :start, fn _opts -> {:ok, pool} end) transport = Ipc.new(path: "/tmp/test.sock") assert %Ipc{ - pool: "pool_name", path: "/tmp/test.sock", + pool: ^pool, socket_opts: [:binary, active: false, reuseaddr: true], timeout: 30_000 } = transport end test "creates a new IPC transport with custom options" do - expect(Ipc.ConnectionPool, :start, fn _opts -> {:ok, "pool_name"} end) + pool = %Ipc.ConnectionPool{name: "pool_name"} + expect(Ipc.ConnectionPool, :start, fn _opts -> {:ok, pool} end) transport = Ipc.new( @@ -30,6 +32,7 @@ defmodule Exth.Transport.IpcTest do assert %Ipc{ path: "/tmp/custom.sock", + pool: ^pool, socket_opts: [:binary, active: false], timeout: 15_000 } = transport @@ -50,17 +53,18 @@ defmodule Exth.Transport.IpcTest do describe "call/2" do setup do - pool_name = "pool_name" - expect(Ipc.ConnectionPool, :start, fn _opts -> {:ok, pool_name} end) + path = "/tmp/test.sock" + pool = %Ipc.ConnectionPool{name: "pool_name"} + expect(Ipc.ConnectionPool, :start, fn _opts -> {:ok, pool} end) - {:ok, transport: Ipc.new(path: "/tmp/test.sock"), pool_name: pool_name} + {:ok, transport: Ipc.new(path: path), pool: pool} end - test "sends request through socket", %{transport: transport, pool_name: pool_name} do + test "sends request through socket", %{transport: transport, pool: pool} do request = Jason.encode!(%{jsonrpc: "2.0", id: 1, method: "eth_blockNumber", params: []}) response = Jason.encode!(%{jsonrpc: "2.0", id: 1, result: "0x1"}) - expect(Ipc.ConnectionPool, :call, fn ^pool_name, ^request, 30_000 -> {:ok, response} end) + expect(Ipc.ConnectionPool, :call, fn ^pool, ^request, 30_000 -> {:ok, response} end) result = Ipc.call(transport, request) @@ -69,11 +73,11 @@ defmodule Exth.Transport.IpcTest do test "returns error when something is wrong with the socket", %{ transport: transport, - pool_name: pool_name + pool: pool } do request = Jason.encode!(%{jsonrpc: "2.0", id: 1, method: "eth_blockNumber", params: []}) - expect(Ipc.ConnectionPool, :call, fn ^pool_name, ^request, 30_000 -> + expect(Ipc.ConnectionPool, :call, fn ^pool, ^request, 30_000 -> {:error, {:socket_error, :bad_data}} end)