From 785b21f2acecd4ed2a74d96c0a0ad4f15446d23c Mon Sep 17 00:00:00 2001 From: joaop21 Date: Mon, 4 Aug 2025 23:47:30 +0100 Subject: [PATCH 1/5] enrich pool opts --- lib/exth/transport/ipc/connection_pool.ex | 19 +++++++++++++++---- 1 file changed, 15 insertions(+), 4 deletions(-) diff --git a/lib/exth/transport/ipc/connection_pool.ex b/lib/exth/transport/ipc/connection_pool.ex index d84435b..8637ec9 100644 --- a/lib/exth/transport/ipc/connection_pool.ex +++ b/lib/exth/transport/ipc/connection_pool.ex @@ -14,14 +14,25 @@ defmodule Exth.Transport.Ipc.ConnectionPool do path = Keyword.fetch!(opts, :path) socket_opts = Keyword.fetch!(opts, :socket_opts) + # pool opts + {pool_size, opts} = Keyword.pop(opts, :pool_size, 10) + {pool_lazy_workers, opts} = Keyword.pop(opts, :pool_lazy_workers, true) + {pool_worker_idle_timeout, opts} = Keyword.pop(opts, :pool_worker_idle_timeout, nil) + {pool_max_idle_pings, _opts} = Keyword.pop(opts, :pool_max_idle_pings, -1) pool_name = via_tuple(path) - pool_spec = - {NimblePool, worker: {__MODULE__, {path, socket_opts}}, name: pool_name} + pool_opts = [ + lazy: pool_lazy_workers, + max_idle_pings: pool_max_idle_pings, + name: pool_name, + pool_size: pool_size, + worker: {__MODULE__, {path, socket_opts}}, + worker_idle_timeout: pool_worker_idle_timeout + ] - {:ok, _pid} = Ipc.DynamicSupervisor.start_pool(pool_spec) + {:ok, _pid} = Ipc.DynamicSupervisor.start_pool({NimblePool, pool_opts}) - {:ok, pool_name} + {:ok, pool_opts} end def call(pool, request, timeout) do From aa5dda479674c3fe9053e4196d07e4b30339e94b Mon Sep 17 00:00:00 2001 From: joaop21 Date: Tue, 5 Aug 2025 00:01:13 +0100 Subject: [PATCH 2/5] retrieve custom connection pool --- lib/exth/transport/ipc/connection_pool.ex | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/lib/exth/transport/ipc/connection_pool.ex b/lib/exth/transport/ipc/connection_pool.ex index 8637ec9..233bd95 100644 --- a/lib/exth/transport/ipc/connection_pool.ex +++ b/lib/exth/transport/ipc/connection_pool.ex @@ -4,6 +4,8 @@ defmodule Exth.Transport.Ipc.ConnectionPool do alias Exth.Transport alias Exth.Transport.Ipc + defstruct [:lazy, :max_idle_pings, :name, :pool_size, :worker, :worker_idle_timeout] + @pool_timeout 30_000 ### @@ -32,12 +34,12 @@ defmodule Exth.Transport.Ipc.ConnectionPool do {:ok, _pid} = Ipc.DynamicSupervisor.start_pool({NimblePool, pool_opts}) - {:ok, pool_opts} + {:ok, struct(__MODULE__, pool_opts)} end - def call(pool, request, timeout) do + def call(%__MODULE__{} = pool, request, timeout) do NimblePool.checkout!( - pool, + pool.name, :checkout, fn _from, socket -> result = send_request(socket, request, timeout) From 48612b4dd2e38124d9cc1902da881c14eeee4792 Mon Sep 17 00:00:00 2001 From: joaop21 Date: Tue, 5 Aug 2025 00:01:25 +0100 Subject: [PATCH 3/5] add socket_opts to opts --- lib/exth/transport/ipc.ex | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/lib/exth/transport/ipc.ex b/lib/exth/transport/ipc.ex index cf80867..39cbb32 100644 --- a/lib/exth/transport/ipc.ex +++ b/lib/exth/transport/ipc.ex @@ -14,11 +14,11 @@ defmodule Exth.Transport.Ipc do timeout = opts[:timeout] || @default_timeout socket_opts = opts[:socket_opts] || @default_socket_opts - {:ok, pool_name} = ConnectionPool.start(path: path, socket_opts: socket_opts) + {:ok, %ConnectionPool{} = pool} = ConnectionPool.start(opts ++ [socket_opts: socket_opts]) %__MODULE__{ path: path, - pool: pool_name, + pool: pool, socket_opts: socket_opts, timeout: timeout } From a8e0ac2945b91615188a63b0daf5b5d309a47399 Mon Sep 17 00:00:00 2001 From: joaop21 Date: Tue, 5 Aug 2025 00:01:30 +0100 Subject: [PATCH 4/5] update tests --- test/exth/transport/ipc_test.exs | 24 ++++++++++++++---------- 1 file changed, 14 insertions(+), 10 deletions(-) diff --git a/test/exth/transport/ipc_test.exs b/test/exth/transport/ipc_test.exs index 4aff4c5..36c6571 100644 --- a/test/exth/transport/ipc_test.exs +++ b/test/exth/transport/ipc_test.exs @@ -6,20 +6,22 @@ defmodule Exth.Transport.IpcTest do describe "new/1" do test "creates a new IPC transport with valid path" do - expect(Ipc.ConnectionPool, :start, fn _opts -> {:ok, "pool_name"} end) + pool = %Ipc.ConnectionPool{name: "pool_name"} + expect(Ipc.ConnectionPool, :start, fn _opts -> {:ok, pool} end) transport = Ipc.new(path: "/tmp/test.sock") assert %Ipc{ - pool: "pool_name", path: "/tmp/test.sock", + pool: ^pool, socket_opts: [:binary, active: false, reuseaddr: true], timeout: 30_000 } = transport end test "creates a new IPC transport with custom options" do - expect(Ipc.ConnectionPool, :start, fn _opts -> {:ok, "pool_name"} end) + pool = %Ipc.ConnectionPool{name: "pool_name"} + expect(Ipc.ConnectionPool, :start, fn _opts -> {:ok, pool} end) transport = Ipc.new( @@ -30,6 +32,7 @@ defmodule Exth.Transport.IpcTest do assert %Ipc{ path: "/tmp/custom.sock", + pool: ^pool, socket_opts: [:binary, active: false], timeout: 15_000 } = transport @@ -50,17 +53,18 @@ defmodule Exth.Transport.IpcTest do describe "call/2" do setup do - pool_name = "pool_name" - expect(Ipc.ConnectionPool, :start, fn _opts -> {:ok, pool_name} end) + path = "/tmp/test.sock" + pool = %Ipc.ConnectionPool{name: "pool_name"} + expect(Ipc.ConnectionPool, :start, fn _opts -> {:ok, pool} end) - {:ok, transport: Ipc.new(path: "/tmp/test.sock"), pool_name: pool_name} + {:ok, transport: Ipc.new(path: path), pool: pool} end - test "sends request through socket", %{transport: transport, pool_name: pool_name} do + test "sends request through socket", %{transport: transport, pool: pool} do request = Jason.encode!(%{jsonrpc: "2.0", id: 1, method: "eth_blockNumber", params: []}) response = Jason.encode!(%{jsonrpc: "2.0", id: 1, result: "0x1"}) - expect(Ipc.ConnectionPool, :call, fn ^pool_name, ^request, 30_000 -> {:ok, response} end) + expect(Ipc.ConnectionPool, :call, fn ^pool, ^request, 30_000 -> {:ok, response} end) result = Ipc.call(transport, request) @@ -69,11 +73,11 @@ defmodule Exth.Transport.IpcTest do test "returns error when something is wrong with the socket", %{ transport: transport, - pool_name: pool_name + pool: pool } do request = Jason.encode!(%{jsonrpc: "2.0", id: 1, method: "eth_blockNumber", params: []}) - expect(Ipc.ConnectionPool, :call, fn ^pool_name, ^request, 30_000 -> + expect(Ipc.ConnectionPool, :call, fn ^pool, ^request, 30_000 -> {:error, {:socket_error, :bad_data}} end) From ea302526b5c6ce2b1a8509928d331912d8ed03bb Mon Sep 17 00:00:00 2001 From: joaop21 Date: Tue, 5 Aug 2025 00:08:18 +0100 Subject: [PATCH 5/5] add docs to ipc and connection pool --- lib/exth/transport/ipc.ex | 73 ++++++++++++++++++++++- lib/exth/transport/ipc/connection_pool.ex | 55 +++++++++++++++++ 2 files changed, 125 insertions(+), 3 deletions(-) diff --git a/lib/exth/transport/ipc.ex b/lib/exth/transport/ipc.ex index 39cbb32..9361e26 100644 --- a/lib/exth/transport/ipc.ex +++ b/lib/exth/transport/ipc.ex @@ -1,20 +1,86 @@ defmodule Exth.Transport.Ipc do - @moduledoc false + @moduledoc """ + IPC (Inter-Process Communication) transport implementation for JSON-RPC requests using Unix domain sockets. + + Implements the `Exth.Transport.Transportable` protocol for making IPC connections to JSON-RPC + endpoints via Unix domain sockets. Uses NimblePool for connection pooling and efficient + resource management. + + ## Features + + * Unix domain socket communication + * Connection pooling with NimblePool + * Automatic connection management + * Configurable pool size and timeouts + * Efficient resource utilization + * Process registration with via-tuples + + ## Usage + + transport = Transportable.new( + %Exth.Transport.Ipc{}, + path: "/tmp/ethereum.ipc" + ) + + {:ok, response} = Transportable.call(transport, request) + + ## Configuration + + Required options: + * `:path` - The Unix domain socket path (e.g., "/tmp/ethereum.ipc") + + Optional options: + * `:timeout` - Request timeout in milliseconds (defaults to 30000) + * `:socket_opts` - TCP socket options (defaults to [:binary, active: false, reuseaddr: true]) + * `:pool_size` - Number of connections in the pool (defaults to 10) + * `:pool_lazy_workers` - Whether to create workers lazily (defaults to true) + * `:pool_worker_idle_timeout` - Worker idle timeout (defaults to nil) + * `:pool_max_idle_pings` - Maximum idle pings before worker termination (defaults to -1) + + ## Connection Pooling + + The IPC transport uses NimblePool to manage a pool of Unix domain socket connections. + This provides several benefits: + + * Efficient resource utilization + * Automatic connection lifecycle management + * Configurable pool size for different workloads + * Connection reuse for better performance + + ## Error Handling + + The transport handles several error cases: + * Invalid socket path format + * Missing required options + * Connection failures + * Socket communication errors + * Pool exhaustion + + See `Exth.Transport.Transportable` for protocol details. + """ alias __MODULE__.ConnectionPool - # alias __MODULE__.Socket + + @typedoc "IPC transport configuration" + @type t :: %__MODULE__{ + path: String.t(), + pool: struct(), + socket_opts: list(), + timeout: non_neg_integer() + } defstruct [:path, :pool, :socket_opts, :timeout] @default_timeout 30_000 @default_socket_opts [:binary, active: false, reuseaddr: true] + @spec new(keyword()) :: t() def new(opts) do with {:ok, path} <- validate_required_path(opts[:path]) do timeout = opts[:timeout] || @default_timeout socket_opts = opts[:socket_opts] || @default_socket_opts - {:ok, %ConnectionPool{} = pool} = ConnectionPool.start(opts ++ [socket_opts: socket_opts]) + {:ok, pool} = ConnectionPool.start(opts ++ [socket_opts: socket_opts]) %__MODULE__{ path: path, @@ -25,6 +91,7 @@ defmodule Exth.Transport.Ipc do end end + @spec call(t(), String.t()) :: {:ok, String.t()} | {:error, term()} def call(%__MODULE__{} = transport, request) do ConnectionPool.call(transport.pool, request, transport.timeout) end diff --git a/lib/exth/transport/ipc/connection_pool.ex b/lib/exth/transport/ipc/connection_pool.ex index 233bd95..e77deaa 100644 --- a/lib/exth/transport/ipc/connection_pool.ex +++ b/lib/exth/transport/ipc/connection_pool.ex @@ -4,6 +4,16 @@ defmodule Exth.Transport.Ipc.ConnectionPool do alias Exth.Transport alias Exth.Transport.Ipc + @typedoc "Connection pool configuration" + @type t :: %__MODULE__{ + lazy: boolean(), + max_idle_pings: integer(), + name: atom(), + pool_size: non_neg_integer(), + worker: {module(), term()}, + worker_idle_timeout: non_neg_integer() | nil + } + defstruct [:lazy, :max_idle_pings, :name, :pool_size, :worker, :worker_idle_timeout] @pool_timeout 30_000 @@ -12,6 +22,30 @@ defmodule Exth.Transport.Ipc.ConnectionPool do ### Public API ### + @doc """ + Starts a new connection pool for the given socket path. + + ## Options + * `:path` - (required) The Unix domain socket path + * `:socket_opts` - (required) TCP socket options + * `:pool_size` - Number of connections in the pool (defaults to 10) + * `:pool_lazy_workers` - Whether to create workers lazily (defaults to true) + * `:pool_worker_idle_timeout` - Worker idle timeout (defaults to nil) + * `:pool_max_idle_pings` - Maximum idle pings before worker termination (defaults to -1) + + ## Returns + * `{:ok, pool}` - Successfully started pool + * `{:error, reason}` - Failed to start pool + + ## Examples + + {:ok, pool} = ConnectionPool.start( + path: "/tmp/ethereum.ipc", + socket_opts: [:binary, active: false], + pool_size: 5 + ) + """ + @spec start(keyword()) :: {:ok, t()} | {:error, term()} def start(opts) do path = Keyword.fetch!(opts, :path) socket_opts = Keyword.fetch!(opts, :socket_opts) @@ -37,6 +71,27 @@ defmodule Exth.Transport.Ipc.ConnectionPool do {:ok, struct(__MODULE__, pool_opts)} end + @doc """ + Makes a request through the connection pool. + + Checks out a connection from the pool, sends the request, and returns the response. + The connection is automatically returned to the pool after use. + + ## Arguments + * `pool` - The connection pool instance + * `request` - The JSON-RPC request as a string + * `timeout` - Request timeout in milliseconds + + ## Returns + * `{:ok, response}` - Successful request with encoded response + * `{:error, {:socket_error, reason}}` - Socket communication error + * `{:error, reason}` - Other errors (timeout, pool exhaustion, etc) + + ## Examples + + {:ok, response} = ConnectionPool.call(pool, request, 30_000) + """ + @spec call(t(), String.t(), non_neg_integer()) :: {:ok, String.t()} | {:error, term()} def call(%__MODULE__{} = pool, request, timeout) do NimblePool.checkout!( pool.name,