Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 71 additions & 4 deletions lib/exth/transport/ipc.ex
Original file line number Diff line number Diff line change
@@ -1,30 +1,97 @@
defmodule Exth.Transport.Ipc do
@moduledoc false
@moduledoc """
IPC (Inter-Process Communication) transport implementation for JSON-RPC requests using Unix domain sockets.

Implements the `Exth.Transport.Transportable` protocol for making IPC connections to JSON-RPC
endpoints via Unix domain sockets. Uses NimblePool for connection pooling and efficient
resource management.

## Features

* Unix domain socket communication
* Connection pooling with NimblePool
* Automatic connection management
* Configurable pool size and timeouts
* Efficient resource utilization
* Process registration with via-tuples

## Usage

transport = Transportable.new(
%Exth.Transport.Ipc{},
path: "/tmp/ethereum.ipc"
)

{:ok, response} = Transportable.call(transport, request)

## Configuration

Required options:
* `:path` - The Unix domain socket path (e.g., "/tmp/ethereum.ipc")

Optional options:
* `:timeout` - Request timeout in milliseconds (defaults to 30000)
* `:socket_opts` - TCP socket options (defaults to [:binary, active: false, reuseaddr: true])
* `:pool_size` - Number of connections in the pool (defaults to 10)
* `:pool_lazy_workers` - Whether to create workers lazily (defaults to true)
* `:pool_worker_idle_timeout` - Worker idle timeout (defaults to nil)
* `:pool_max_idle_pings` - Maximum idle pings before worker termination (defaults to -1)

## Connection Pooling

The IPC transport uses NimblePool to manage a pool of Unix domain socket connections.
This provides several benefits:

* Efficient resource utilization
* Automatic connection lifecycle management
* Configurable pool size for different workloads
* Connection reuse for better performance

## Error Handling

The transport handles several error cases:
* Invalid socket path format
* Missing required options
* Connection failures
* Socket communication errors
* Pool exhaustion

See `Exth.Transport.Transportable` for protocol details.
"""

alias __MODULE__.ConnectionPool
# alias __MODULE__.Socket

@typedoc "IPC transport configuration"
@type t :: %__MODULE__{
path: String.t(),
pool: struct(),
socket_opts: list(),
timeout: non_neg_integer()
}

defstruct [:path, :pool, :socket_opts, :timeout]

@default_timeout 30_000
@default_socket_opts [:binary, active: false, reuseaddr: true]

@spec new(keyword()) :: t()
def new(opts) do
with {:ok, path} <- validate_required_path(opts[:path]) do
timeout = opts[:timeout] || @default_timeout
socket_opts = opts[:socket_opts] || @default_socket_opts

{:ok, pool_name} = ConnectionPool.start(path: path, socket_opts: socket_opts)
{:ok, pool} = ConnectionPool.start(opts ++ [socket_opts: socket_opts])

%__MODULE__{
path: path,
pool: pool_name,
pool: pool,
socket_opts: socket_opts,
timeout: timeout
}
end
end

@spec call(t(), String.t()) :: {:ok, String.t()} | {:error, term()}
def call(%__MODULE__{} = transport, request) do
ConnectionPool.call(transport.pool, request, transport.timeout)
end
Expand Down
80 changes: 74 additions & 6 deletions lib/exth/transport/ipc/connection_pool.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,29 +4,97 @@ defmodule Exth.Transport.Ipc.ConnectionPool do
alias Exth.Transport
alias Exth.Transport.Ipc

@typedoc "Connection pool configuration"
@type t :: %__MODULE__{
lazy: boolean(),
max_idle_pings: integer(),
name: atom(),
pool_size: non_neg_integer(),
worker: {module(), term()},
worker_idle_timeout: non_neg_integer() | nil
}

defstruct [:lazy, :max_idle_pings, :name, :pool_size, :worker, :worker_idle_timeout]

@pool_timeout 30_000

###
### Public API
###

@doc """
Starts a new connection pool for the given socket path.

## Options
* `:path` - (required) The Unix domain socket path
* `:socket_opts` - (required) TCP socket options
* `:pool_size` - Number of connections in the pool (defaults to 10)
* `:pool_lazy_workers` - Whether to create workers lazily (defaults to true)
* `:pool_worker_idle_timeout` - Worker idle timeout (defaults to nil)
* `:pool_max_idle_pings` - Maximum idle pings before worker termination (defaults to -1)

## Returns
* `{:ok, pool}` - Successfully started pool
* `{:error, reason}` - Failed to start pool

## Examples

{:ok, pool} = ConnectionPool.start(
path: "/tmp/ethereum.ipc",
socket_opts: [:binary, active: false],
pool_size: 5
)
"""
@spec start(keyword()) :: {:ok, t()} | {:error, term()}
def start(opts) do
path = Keyword.fetch!(opts, :path)
socket_opts = Keyword.fetch!(opts, :socket_opts)

# pool opts
{pool_size, opts} = Keyword.pop(opts, :pool_size, 10)
{pool_lazy_workers, opts} = Keyword.pop(opts, :pool_lazy_workers, true)
{pool_worker_idle_timeout, opts} = Keyword.pop(opts, :pool_worker_idle_timeout, nil)
{pool_max_idle_pings, _opts} = Keyword.pop(opts, :pool_max_idle_pings, -1)
pool_name = via_tuple(path)

pool_spec =
{NimblePool, worker: {__MODULE__, {path, socket_opts}}, name: pool_name}
pool_opts = [
lazy: pool_lazy_workers,
max_idle_pings: pool_max_idle_pings,
name: pool_name,
pool_size: pool_size,
worker: {__MODULE__, {path, socket_opts}},
worker_idle_timeout: pool_worker_idle_timeout
]

{:ok, _pid} = Ipc.DynamicSupervisor.start_pool(pool_spec)
{:ok, _pid} = Ipc.DynamicSupervisor.start_pool({NimblePool, pool_opts})

{:ok, pool_name}
{:ok, struct(__MODULE__, pool_opts)}
end

def call(pool, request, timeout) do
@doc """
Makes a request through the connection pool.

Checks out a connection from the pool, sends the request, and returns the response.
The connection is automatically returned to the pool after use.

## Arguments
* `pool` - The connection pool instance
* `request` - The JSON-RPC request as a string
* `timeout` - Request timeout in milliseconds

## Returns
* `{:ok, response}` - Successful request with encoded response
* `{:error, {:socket_error, reason}}` - Socket communication error
* `{:error, reason}` - Other errors (timeout, pool exhaustion, etc)

## Examples

{:ok, response} = ConnectionPool.call(pool, request, 30_000)
"""
@spec call(t(), String.t(), non_neg_integer()) :: {:ok, String.t()} | {:error, term()}
def call(%__MODULE__{} = pool, request, timeout) do
NimblePool.checkout!(
pool,
pool.name,
:checkout,
fn _from, socket ->
result = send_request(socket, request, timeout)
Expand Down
24 changes: 14 additions & 10 deletions test/exth/transport/ipc_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,22 @@ defmodule Exth.Transport.IpcTest do

describe "new/1" do
test "creates a new IPC transport with valid path" do
expect(Ipc.ConnectionPool, :start, fn _opts -> {:ok, "pool_name"} end)
pool = %Ipc.ConnectionPool{name: "pool_name"}
expect(Ipc.ConnectionPool, :start, fn _opts -> {:ok, pool} end)

transport = Ipc.new(path: "/tmp/test.sock")

assert %Ipc{
pool: "pool_name",
path: "/tmp/test.sock",
pool: ^pool,
socket_opts: [:binary, active: false, reuseaddr: true],
timeout: 30_000
} = transport
end

test "creates a new IPC transport with custom options" do
expect(Ipc.ConnectionPool, :start, fn _opts -> {:ok, "pool_name"} end)
pool = %Ipc.ConnectionPool{name: "pool_name"}
expect(Ipc.ConnectionPool, :start, fn _opts -> {:ok, pool} end)

transport =
Ipc.new(
Expand All @@ -30,6 +32,7 @@ defmodule Exth.Transport.IpcTest do

assert %Ipc{
path: "/tmp/custom.sock",
pool: ^pool,
socket_opts: [:binary, active: false],
timeout: 15_000
} = transport
Expand All @@ -50,17 +53,18 @@ defmodule Exth.Transport.IpcTest do

describe "call/2" do
setup do
pool_name = "pool_name"
expect(Ipc.ConnectionPool, :start, fn _opts -> {:ok, pool_name} end)
path = "/tmp/test.sock"
pool = %Ipc.ConnectionPool{name: "pool_name"}
expect(Ipc.ConnectionPool, :start, fn _opts -> {:ok, pool} end)

{:ok, transport: Ipc.new(path: "/tmp/test.sock"), pool_name: pool_name}
{:ok, transport: Ipc.new(path: path), pool: pool}
end

test "sends request through socket", %{transport: transport, pool_name: pool_name} do
test "sends request through socket", %{transport: transport, pool: pool} do
request = Jason.encode!(%{jsonrpc: "2.0", id: 1, method: "eth_blockNumber", params: []})
response = Jason.encode!(%{jsonrpc: "2.0", id: 1, result: "0x1"})

expect(Ipc.ConnectionPool, :call, fn ^pool_name, ^request, 30_000 -> {:ok, response} end)
expect(Ipc.ConnectionPool, :call, fn ^pool, ^request, 30_000 -> {:ok, response} end)

result = Ipc.call(transport, request)

Expand All @@ -69,11 +73,11 @@ defmodule Exth.Transport.IpcTest do

test "returns error when something is wrong with the socket", %{
transport: transport,
pool_name: pool_name
pool: pool
} do
request = Jason.encode!(%{jsonrpc: "2.0", id: 1, method: "eth_blockNumber", params: []})

expect(Ipc.ConnectionPool, :call, fn ^pool_name, ^request, 30_000 ->
expect(Ipc.ConnectionPool, :call, fn ^pool, ^request, 30_000 ->
{:error, {:socket_error, :bad_data}}
end)

Expand Down