Skip to content

wavezync/durable

Folders and files

NameName
Last commit message
Last commit date

Latest commit

ย 

History

25 Commits
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 
ย 

Repository files navigation

Durable

Build Status Hex.pm

A durable, resumable workflow engine for Elixir, similar to Temporal/Inngest.

โœจ Features

  • ๐Ÿ“ Declarative DSL - Clean macro-based workflow definitions
  • โธ๏ธ Resumability - Sleep, wait for events, wait for human input
  • ๐Ÿ”€ Conditional Branching - Intuitive branch construct for flow control
  • โšก Parallel Execution - Run steps concurrently with parallel
  • ๐Ÿ”„ Reliability - Automatic retries with configurable backoff strategies
  • ๐Ÿ” Observability - Built-in log capture per step
  • ๐Ÿ’พ Persistence - PostgreSQL-backed execution state

๐Ÿ“ฆ Installation

Add durable to your list of dependencies in mix.exs:

def deps do
  [
    {:durable, "~> 0.0.0-alpha"}
  ]
end

๐Ÿš€ Quick Start

1. Create the Migration

Durable stores all data in a dedicated PostgreSQL schema called durable. Create a migration:

mix ecto.gen.migration add_durable
# priv/repo/migrations/XXXXXX_add_durable.exs
defmodule MyApp.Repo.Migrations.AddDurable do
  use Ecto.Migration

  def up, do: Durable.Migration.up()
  def down, do: Durable.Migration.down()
end

Run the migration:

mix ecto.migrate

2. Add to Supervision Tree

Add Durable to your application's supervision tree:

# lib/my_app/application.ex
def start(_type, _args) do
  children = [
    MyApp.Repo,
    {Durable,
      repo: MyApp.Repo,
      queues: %{
        default: [concurrency: 10, poll_interval: 1000]
      }}
  ]

  opts = [strategy: :one_for_one, name: MyApp.Supervisor]
  Supervisor.start_link(children, opts)
end

Configuration Options

Option Type Default Description
:repo atom required Your Ecto repo module
:name atom Durable Instance name (for multiple instances)
:prefix string "durable" PostgreSQL schema name
:queues map %{default: [...]} Queue configurations
:queue_enabled boolean true Enable/disable queue processing
:stale_lock_timeout integer 300 Seconds before a lock is considered stale
:heartbeat_interval integer 30_000 Milliseconds between worker heartbeats

3. Define a Workflow

defmodule MyApp.OrderWorkflow do
  use Durable
  use Durable.Context

  workflow "process_order", timeout: hours(2) do
    step :validate do
      order = input().order
      put_context(:order_id, order.id)
      put_context(:items, order.items)
    end

    step :calculate_total do
      items = get_context(:items)
      total = Enum.sum(Enum.map(items, & &1.price))
      put_context(:total, total)
    end

    step :charge_payment, retry: [max_attempts: 3, backoff: :exponential] do
      total = get_context(:total)
      {:ok, charge} = PaymentService.charge(get_context(:order_id), total)
      put_context(:charge_id, charge.id)
    end

    step :send_confirmation do
      EmailService.send_confirmation(get_context(:order_id))
    end
  end
end

4. Start a Workflow

{:ok, workflow_id} = Durable.start(MyApp.OrderWorkflow, %{order: order})

5. Query Execution Status

{:ok, execution} = Durable.get_execution(workflow_id)
execution.status  # => :completed
execution.context # => %{order_id: 123, total: 99.99, charge_id: "ch_xxx"}

๐Ÿ“„ Document Processing Example

Durable shines for multi-step pipelines that need reliability and clear flow control:

defmodule MyApp.DocumentProcessor do
  use Durable
  use Durable.Context

  workflow "process_document" do
    step :fetch do
      doc = DocumentStore.get(input()["doc_id"])
      put_context(:doc, doc)
      put_context(:doc_type, doc.type)
    end

    # Conditional branching - only ONE path executes
    branch on: get_context(:doc_type) do
      :invoice ->
        step :process_invoice, retry: [max_attempts: 3] do
          invoice = InvoiceParser.parse(get_context(:doc))
          put_context(:result, invoice)
        end

        step :validate_invoice do
          invoice = get_context(:result)
          put_context(:valid, invoice.total == Enum.sum(invoice.line_items))
        end

      :contract ->
        step :process_contract do
          contract = ContractParser.parse(get_context(:doc))
          put_context(:result, contract)
        end

      _ ->
        step :flag_for_review do
          put_context(:needs_review, true)
        end
    end

    # Runs after any branch completes
    step :store do
      DocumentStore.update(get_context(:doc).id, %{
        doc_type: get_context(:doc_type),
        processed_data: get_context(:result, %{}),
        needs_review: get_context(:needs_review, false)
      })
    end
  end
end

Key benefits:

  • ๐Ÿ”„ Automatic Retries - Failed steps retry with configurable backoff
  • ๐Ÿ’พ State Persistence - Workflow resumes from the last step after crashes
  • ๐Ÿ”€ Clear Flow Control - The branch construct makes conditional logic readable
  • ๐Ÿ” Observability - Each step's logs are captured for debugging

๐Ÿ“– DSL Reference

Workflow Definition

workflow "name", timeout: hours(2), max_retries: 3 do
  # steps...
end

Step Definition

step :name do
  # step logic
end

step :name, retry: [max_attempts: 3, backoff: :exponential] do
  # step with retry
end

step :name, timeout: minutes(5) do
  # step with timeout
end

๐Ÿ”€ Branch (Conditional Flow)

The branch macro provides intuitive conditional execution. Only ONE branch executes based on the condition, then execution continues after the branch block.

branch on: get_context(:status) do
  :approved ->
    step :process_approved do
      # Handle approved case
    end

  :rejected ->
    step :process_rejected do
      # Handle rejected case
    end

  _ ->
    step :process_default do
      # Default case
    end
end

Features:

  • Pattern matching on atoms, strings, integers, and booleans
  • Default clause with _ wildcard
  • Multiple steps per branch
  • Execution continues after the branch block

โšก Parallel Execution

Run multiple steps concurrently and wait for all to complete:

parallel do
  step :fetch_user do
    put_context(:user, UserService.get(input().user_id))
  end

  step :fetch_orders do
    put_context(:orders, OrderService.list(input().user_id))
  end

  step :fetch_preferences do
    put_context(:prefs, PreferenceService.get(input().user_id))
  end
end

# Continues after all parallel steps complete
step :build_dashboard do
  Dashboard.build(
    get_context(:user),
    get_context(:orders),
    get_context(:prefs)
  )
end

๐Ÿš€ Advanced: Parallel Workflows

Nest parallel inside branch to run different concurrent tasks based on conditions. This example processes documents differently by type - contracts run 3 extractions in parallel, while invoices use a single step:

workflow "process_document" do
  step :fetch do
    doc = DocumentStore.get(input()["doc_id"])
    put_context(:doc, doc)
  end

  step :classify, retry: [max_attempts: 3] do
    result = AI.classify(get_context(:doc).content)
    put_context(:doc_type, result.type)
  end

  branch on: get_context(:doc_type) do
    :contract ->
      # Run multiple AI extractions in parallel
      parallel do
        step :extract_parties do
          put_context(:parties, AI.extract_parties(get_context(:doc)))
        end

        step :extract_terms do
          put_context(:terms, AI.extract_terms(get_context(:doc)))
        end

        step :check_signatures do
          put_context(:signatures, AI.detect_signatures(get_context(:doc)))
        end
      end

      step :merge_results do
        put_context(:extracted, %{
          parties: get_context(:parties),
          terms: get_context(:terms),
          signatures: get_context(:signatures)
        })
      end

    :invoice ->
      step :extract_invoice do
        put_context(:extracted, AI.extract_invoice(get_context(:doc)))
      end

    _ ->
      step :flag_review do
        put_context(:needs_review, true)
      end
  end

  step :store do
    DocumentStore.save(get_context(:doc).id, get_context(:extracted, %{}))
  end
end

๐Ÿ“ฆ Context Management

use Durable.Context

# Read
context()                    # Get entire context
get_context(:key)            # Get specific key
get_context(:key, default)   # Get with default
input()                      # Get initial input
workflow_id()                # Get current workflow ID

# Write
put_context(:key, value)     # Set single key
put_context(%{k1: v1})       # Merge map
update_context(:key, &(&1 + 1))
delete_context(:key)

# Accumulators
append_context(:list, value)
increment_context(:counter, 1)

โฐ Time Helpers

seconds(30)   # 30,000 ms
minutes(5)    # 300,000 ms
hours(2)      # 7,200,000 ms
days(7)       # 604,800,000 ms

โธ๏ธ Wait Primitives

use Durable.Wait

# Sleep for duration
sleep_for(seconds: 30)
sleep_for(minutes: 5)
sleep_for(hours: 24)

# Sleep until specific time
sleep_until(~U[2025-12-25 00:00:00Z])

# Wait for external event
wait_for_event("payment_confirmed", timeout: minutes(5))

# Wait for human input
wait_for_input("manager_decision", timeout: days(3))

๐Ÿ”„ Retry Strategies

  • :exponential - Delay = base^attempt * 1000ms (default)
  • :linear - Delay = attempt * base * 1000ms
  • :constant - Fixed delay between retries
step :api_call, retry: [
  max_attempts: 5,
  backoff: :exponential,
  base: 2,
  max_backoff: 60_000  # Cap at 1 minute
] do
  ExternalAPI.call()
end

๐Ÿ”Œ API Reference

Starting Workflows

Durable.start(Module, input)
Durable.start(Module, input,
  workflow: "name",
  queue: :high_priority,
  priority: 10,
  scheduled_at: ~U[2025-01-01 00:00:00Z]
)

Querying Executions

Durable.get_execution(workflow_id)
Durable.get_execution(workflow_id, include_steps: true)

Durable.list_executions(
  workflow: MyApp.OrderWorkflow,
  status: :running,
  limit: 100
)

Controlling Workflows

Durable.cancel(workflow_id)
Durable.cancel(workflow_id, "reason")

Providing Input/Events

# Resume a waiting workflow with input
Durable.provide_input(workflow_id, "manager_decision", %{approved: true})

# Send an event to a waiting workflow
Durable.send_event(workflow_id, "payment_confirmed", %{payment_id: "pay_123"})

๐Ÿ”ฎ Coming Soon

  • ๐Ÿ” Collection iteration (each items, as: :item do ... end)
  • ๐Ÿ”— Workflow orchestration - Call child workflows from steps
  • ๐Ÿ”ง Pipe-based API - Functional workflow composition
  • โ†ฉ๏ธ Compensation/Saga patterns
  • ๐Ÿ“… Cron scheduling
  • ๐Ÿ“Š Graph visualization
  • ๐Ÿ–ฅ๏ธ Phoenix LiveView dashboard

๐Ÿ“„ License

MIT