diff --git a/.gitignore b/.gitignore index c065b7a..e637850 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,8 @@ data/ logs/ configs/config.yaml + +# Binaries +/indexer +/indexer.exe +*.exe diff --git a/configs/config.example.yaml b/configs/config.example.yaml index f772d64..7cb4ac7 100644 --- a/configs/config.example.yaml +++ b/configs/config.example.yaml @@ -61,6 +61,27 @@ chains: - url: "https://bsc.blockrazor.xyz" - url: "https://bnb.rpc.subquery.network/public" + cardano_mainnet: + internal_code: "CARDANO_MAINNET" + network_id: "cardano" + type: "cardano" + start_block: 12768402 # Cardano mainnet block height + poll_interval: "15s" # Cardano block time is ~20 seconds + nodes: + - url: "https://cardano-mainnet.blockfrost.io/api/v0" + auth: + type: "header" + key: "project_id" + value: "BLOCKFROST_API_KEY" # Get from https://blockfrost.io/ + client: + timeout: "30s" + max_retries: 3 + retry_delay: "10s" + throttle: + rps: 10 # Blockfrost free tier allows 10 req/s + burst: 20 + concurrency: 4 # With a free plan from providers like Blockfrost, it's recommended to keep this value low (e.g., 2-4) + # Infrastructure services services: port: 8080 # Health check and monitoring server port diff --git a/internal/indexer/cardano.go b/internal/indexer/cardano.go new file mode 100644 index 0000000..5698d69 --- /dev/null +++ b/internal/indexer/cardano.go @@ -0,0 +1,301 @@ +package indexer + +import ( + "context" + "fmt" + "strings" + "time" + "sync" + + + "github.com/fystack/multichain-indexer/internal/rpc" + "github.com/fystack/multichain-indexer/internal/rpc/cardano" + "github.com/fystack/multichain-indexer/pkg/common/config" + "github.com/fystack/multichain-indexer/pkg/common/constant" + "github.com/fystack/multichain-indexer/pkg/common/enum" + "github.com/fystack/multichain-indexer/pkg/common/logger" + "github.com/fystack/multichain-indexer/pkg/common/types" + "github.com/shopspring/decimal" +) + + +type CardanoIndexer struct { + chainName string + config config.ChainConfig + failover *rpc.Failover[cardano.CardanoAPI] +} + +func NewCardanoIndexer( + chainName string, + cfg config.ChainConfig, + failover *rpc.Failover[cardano.CardanoAPI], +) *CardanoIndexer { + return &CardanoIndexer{ + chainName: chainName, + config: cfg, + failover: failover, + } +} + +func (c *CardanoIndexer) GetName() string { return strings.ToUpper(c.chainName) } +func (c *CardanoIndexer) GetNetworkType() enum.NetworkType { return enum.NetworkTypeCardano } +func (c *CardanoIndexer) GetNetworkInternalCode() string { + return c.config.InternalCode +} +func (c *CardanoIndexer) GetNetworkId() string { + return c.config.NetworkId +} + +// GetLatestBlockNumber fetches the latest block number +func (c *CardanoIndexer) GetLatestBlockNumber(ctx context.Context) (uint64, error) { + var latest uint64 + err := c.failover.ExecuteWithRetry(ctx, func(api cardano.CardanoAPI) error { + n, err := api.GetLatestBlockNumber(ctx) + latest = n + return err + }) + return latest, err +} + +// GetBlock fetches a single block (header + txs fetched in parallel with quota) +func (c *CardanoIndexer) GetBlock(ctx context.Context, blockNumber uint64) (*types.Block, error) { + var ( + header *cardano.BlockResponse + txHashes []string + txs []cardano.Transaction + ) + + err := c.failover.ExecuteWithRetry(ctx, func(api cardano.CardanoAPI) error { + var err error + // Fetch block header first + header, err = api.GetBlockHeaderByNumber(ctx, blockNumber) + if err != nil { + return err + } + // Use block hash to fetch transactions (avoids duplicate GetBlockHeaderByNumber call) + txHashes, err = api.GetTransactionsByBlockHash(ctx, header.Hash) + if err != nil { + return err + } + concurrency := c.config.Throttle.Concurrency + if concurrency <= 0 { + concurrency = cardano.DefaultTxFetchConcurrency + } + // Clamp concurrency to the number of transactions to avoid creating useless goroutines + if numTxs := len(txHashes); numTxs > 0 && numTxs < concurrency { + concurrency = numTxs + } + txs, err = api.FetchTransactionsParallel(ctx, txHashes, concurrency) + return err + }) + if err != nil { + return nil, err + } + + block := &cardano.Block{ + Hash: header.Hash, + Height: header.Height, + Slot: header.Slot, + Time: header.Time, + ParentHash: header.ParentHash, + } + // attach txs + for i := range txs { + block.Txs = append(block.Txs, txs[i]) + } + + return c.convertBlock(block), nil +} + +// GetBlocks fetches a range of blocks +func (c *CardanoIndexer) GetBlocks( + ctx context.Context, + from, to uint64, + isParallel bool, +) ([]BlockResult, error) { + if to < from { + return nil, fmt.Errorf("invalid range: from=%d, to=%d", from, to) + } + + blockNums := make([]uint64, 0, to-from+1) + for n := from; n <= to; n++ { + blockNums = append(blockNums, n) + } + + return c.fetchBlocks(ctx, blockNums, isParallel) +} + +// GetBlocksByNumbers fetches blocks by their numbers +func (c *CardanoIndexer) GetBlocksByNumbers( + ctx context.Context, + blockNumbers []uint64, +) ([]BlockResult, error) { + return c.fetchBlocks(ctx, blockNumbers, false) +} + +// fetchBlocks is the internal method to fetch blocks +func (c *CardanoIndexer) fetchBlocks( + ctx context.Context, + blockNums []uint64, + isParallel bool, +) ([]BlockResult, error) { + if len(blockNums) == 0 { + return nil, nil + } + + // For Cardano, we should fetch blocks sequentially to avoid rate limiting + // because each block fetch involves multiple API calls (header + txs + utxos for each tx) + // With Blockfrost free tier (10 RPS), parallel block fetching can easily exceed limits + workers := 1 // Always use 1 worker for block fetching to be safe + + // Only use configured concurrency if explicitly parallel and concurrency > 1 + if isParallel && c.config.Throttle.Concurrency > 1 { + workers = c.config.Throttle.Concurrency + if workers > len(blockNums) { + workers = len(blockNums) + } + } + + type job struct { + num uint64 + index int + } + + jobs := make(chan job, len(blockNums)) + results := make([]BlockResult, len(blockNums)) + + var wg sync.WaitGroup + for i := 0; i < workers; i++ { + wg.Add(1) + go func(workerID int) { + defer wg.Done() + blockCount := 0 + for j := range jobs { + // Early exit if context is canceled + select { + case <-ctx.Done(): + return + default: + } + + // Add delay every 5 blocks to avoid rate limiting + // This is critical for Cardano/Blockfrost to prevent burst traffic + if blockCount > 0 && blockCount%5 == 0 { + logger.Debug("Rate limit protection: pausing between blocks", + "worker", workerID, "blocks_processed", blockCount) + select { + case <-ctx.Done(): + return + case <-time.After(2 * time.Second): + } + } + + block, err := c.GetBlock(ctx, j.num) + if err != nil { + logger.Warn("failed to fetch block", "block", j.num, "error", err) + results[j.index] = BlockResult{ + Number: j.num, + Error: &Error{ErrorType: ErrorTypeBlockNotFound, Message: err.Error()}, + } + } else { + results[j.index] = BlockResult{Number: j.num, Block: block} + } + blockCount++ + } + }(i) + } + + // Feed jobs to workers and close channel when done + go func() { + defer close(jobs) + for i, num := range blockNums { + select { + case jobs <- job{num: num, index: i}: + case <-ctx.Done(): + return + } + } + }() + + wg.Wait() + + // Check if the context was canceled during the operation + if ctx.Err() != nil { + return nil, ctx.Err() + } + + return results, nil +} + +// convertBlock converts a Cardano block to the common Block type +func (c *CardanoIndexer) convertBlock(block *cardano.Block) *types.Block { + // Pre-allocate slice with a reasonable capacity to reduce re-allocations + estimatedSize := len(block.Txs) * 2 + transactions := make([]types.Transaction, 0, estimatedSize) + + for _, tx := range block.Txs { + // Skip failed transactions (e.g., script validation failed) + // valid when: no script (nil) OR smart contract executed successfully (true) + if tx.ValidContract != nil && !*tx.ValidContract { + continue + } + // Find a representative from address from non-reference, non-collateral inputs + fromAddr := "" + for _, inp := range tx.Inputs { + if !inp.Reference && !inp.Collateral && inp.Address != "" { + fromAddr = inp.Address + break + } + } + + // Convert fee (lovelace -> ADA) and assign to the first transfer produced by this tx + feeAda := decimal.NewFromInt(int64(tx.Fee)).Div(decimal.NewFromInt(1_000_000)) + feeAssigned := false + + for _, out := range tx.Outputs { + // Skip collateral outputs as they are not considered transfers to the recipient + if out.Collateral { + continue + } + for _, amt := range out.Amounts { + if amt.Quantity == "" || amt.Quantity == "0" { + continue + } + tr := types.Transaction{ + TxHash: tx.Hash, + NetworkId: c.GetNetworkId(), + BlockNumber: block.Height, + FromAddress: fromAddr, + ToAddress: out.Address, + Amount: amt.Quantity, + Type: constant.TxnTypeTransfer, + Timestamp: block.Time, + } + if amt.Unit != "lovelace" { + tr.AssetAddress = amt.Unit + } + if !feeAssigned { + tr.TxFee = feeAda + feeAssigned = true + } + transactions = append(transactions, tr) + } + } + } + + return &types.Block{ + Number: block.Height, + Hash: block.Hash, + ParentHash: block.ParentHash, + Timestamp: block.Time, + Transactions: transactions, + } +} + +// IsHealthy checks if the indexer is healthy +func (c *CardanoIndexer) IsHealthy() bool { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + _, err := c.GetLatestBlockNumber(ctx) + return err == nil +} \ No newline at end of file diff --git a/internal/rpc/cardano/api.go b/internal/rpc/cardano/api.go new file mode 100644 index 0000000..4530982 --- /dev/null +++ b/internal/rpc/cardano/api.go @@ -0,0 +1,22 @@ +package cardano + +import ( + "context" + + "github.com/fystack/multichain-indexer/internal/rpc" +) + +// CardanoAPI defines the interface for Cardano RPC operations +type CardanoAPI interface { + rpc.NetworkClient + GetLatestBlockNumber(ctx context.Context) (uint64, error) + GetBlockHeaderByNumber(ctx context.Context, blockNumber uint64) (*BlockResponse, error) + GetBlockByNumber(ctx context.Context, blockNumber uint64) (*Block, error) + GetBlockHash(ctx context.Context, blockNumber uint64) (string, error) + GetTransactionsByBlock(ctx context.Context, blockNumber uint64) ([]string, error) + GetTransactionsByBlockHash(ctx context.Context, blockHash string) ([]string, error) + GetTransaction(ctx context.Context, txHash string) (*Transaction, error) + FetchTransactionsParallel(ctx context.Context, txHashes []string, concurrency int) ([]Transaction, error) + GetBlockByHash(ctx context.Context, blockHash string) (*Block, error) +} + diff --git a/internal/rpc/cardano/cardano_test.go b/internal/rpc/cardano/cardano_test.go new file mode 100644 index 0000000..06f5f20 --- /dev/null +++ b/internal/rpc/cardano/cardano_test.go @@ -0,0 +1,123 @@ +package cardano + +import ( + "context" + "os" + "testing" + "time" + + rpclib "github.com/fystack/multichain-indexer/internal/rpc" +) + +// newClient creates a Cardano client using Blockfrost and the env API key. +func newClient(t *testing.T) *CardanoClient { + t.Helper() + apiKey := os.Getenv("BLOCKFROST_API_KEY") + if apiKey == "" { + t.Skip("skipping: BLOCKFROST_API_KEY not set (export your Blockfrost project_id)") + } + return NewCardanoClient( + "https://cardano-mainnet.blockfrost.io/api/v0", + &rpclib.AuthConfig{Type: rpclib.AuthTypeHeader, Key: "project_id", Value: apiKey}, + 10*time.Second, + nil, + ) +} + +func TestCardanoGetLatestBlockNumber(t *testing.T) { + if testing.Short() { + t.Skip("skipping integration test in short mode") + } + client := newClient(t) + ctx := context.Background() + bn, err := client.GetLatestBlockNumber(ctx) + if err != nil { + t.Fatalf("GetLatestBlockNumber failed: %v", err) + } + if bn == 0 { + t.Fatal("expected non-zero latest block number") + } + t.Logf("latest: %d", bn) +} + +func TestCardanoGetBlockByNumber(t *testing.T) { + if testing.Short() { + t.Skip("skipping integration test in short mode") + } + client := newClient(t) + ctx := context.Background() + + latest, err := client.GetLatestBlockNumber(ctx) + if err != nil { + t.Fatalf("GetLatestBlockNumber failed: %v", err) + } + // pick a recent block to avoid head reorgs + target := latest + if latest > 5 { + target = latest - 5 + } + blk, err := client.GetBlockByNumber(ctx, target) + if err != nil { + t.Fatalf("GetBlockByNumber(%d) failed: %v", target, err) + } + if blk == nil || blk.Hash == "" { + t.Fatalf("invalid block returned: %+v", blk) + } + t.Logf("block %d hash=%s txs=%d", blk.Height, blk.Hash, len(blk.Txs)) +} + +func TestCardanoGetBlockByHash(t *testing.T) { + if testing.Short() { + t.Skip("skipping integration test in short mode") + } + client := newClient(t) + ctx := context.Background() + + latest, err := client.GetLatestBlockNumber(ctx) + if err != nil { + t.Fatalf("GetLatestBlockNumber failed: %v", err) + } + hash, err := client.GetBlockHash(ctx, latest) + if err != nil { + t.Fatalf("GetBlockHash failed: %v", err) + } + blk, err := client.GetBlockByHash(ctx, hash) + if err != nil { + t.Fatalf("GetBlockByHash failed: %v", err) + } + if blk == nil || blk.Hash == "" { + t.Fatalf("invalid block returned: %+v", blk) + } + t.Logf("block by hash %s -> height=%d txs=%d", hash, blk.Height, len(blk.Txs)) +} + +func TestCardanoFetchTransactionsParallel(t *testing.T) { + if testing.Short() { + t.Skip("skipping integration test in short mode") + } + client := newClient(t) + ctx := context.Background() + + latest, err := client.GetLatestBlockNumber(ctx) + if err != nil { + t.Fatalf("GetLatestBlockNumber failed: %v", err) + } + hashes, err := client.GetTransactionsByBlock(ctx, latest) + if err != nil { + t.Fatalf("GetTransactionsByBlock failed: %v", err) + } + if len(hashes) == 0 { + t.Skip("no txs in latest block") + } + if len(hashes) > 5 { + hashes = hashes[:5] // limit to avoid quota + } + txs, err := client.FetchTransactionsParallel(ctx, hashes, 3) + if err != nil { + t.Fatalf("FetchTransactionsParallel failed: %v", err) + } + if len(txs) == 0 { + t.Fatal("expected some transactions") + } +} + diff --git a/internal/rpc/cardano/client.go b/internal/rpc/cardano/client.go new file mode 100644 index 0000000..b1f8c91 --- /dev/null +++ b/internal/rpc/cardano/client.go @@ -0,0 +1,378 @@ +package cardano + +import ( + "context" + "encoding/json" + "fmt" + "strconv" + "strings" + "sync" + "time" + + "github.com/fystack/multichain-indexer/internal/rpc" + "github.com/fystack/multichain-indexer/pkg/common/logger" + "github.com/fystack/multichain-indexer/pkg/ratelimiter" + "golang.org/x/sync/errgroup" +) + +const DefaultTxFetchConcurrency = 4 + +type CardanoClient struct { + *rpc.BaseClient +} + +// NewCardanoClient creates a new Cardano RPC client +// Uses Blockfrost API (https://blockfrost.io/) or compatible Cardano REST API +func NewCardanoClient( + baseURL string, + auth *rpc.AuthConfig, + timeout time.Duration, + rl *ratelimiter.PooledRateLimiter, +) *CardanoClient { + return &CardanoClient{ + BaseClient: rpc.NewBaseClient( + baseURL, + "cardano", + rpc.ClientTypeREST, + auth, + timeout, + rl, + ), + } +} + +// GetBlockHeaderByNumber fetches only block header by height +func (c *CardanoClient) GetBlockHeaderByNumber(ctx context.Context, blockNumber uint64) (*BlockResponse, error) { + endpoint := fmt.Sprintf("/blocks/%d", blockNumber) + data, err := c.Do(ctx, "GET", endpoint, nil, nil) + if err != nil { + return nil, fmt.Errorf("failed to get block header %d: %w", blockNumber, err) + } + var br BlockResponse + if err := json.Unmarshal(data, &br); err != nil { + return nil, fmt.Errorf("failed to unmarshal block header: %w", err) + } + + return &br, nil +} + + +// GetLatestBlockNumber fetches the latest block number from Cardano +func (c *CardanoClient) GetLatestBlockNumber(ctx context.Context) (uint64, error) { + // Using Blockfrost API: GET /blocks/latest + data, err := c.Do(ctx, "GET", "/blocks/latest", nil, nil) + if err != nil { + return 0, fmt.Errorf("failed to get latest block: %w", err) + } + + var block BlockResponse + if err := json.Unmarshal(data, &block); err != nil { + return 0, fmt.Errorf("failed to unmarshal block response: %w", err) + } + + return block.Height, nil +} + +// GetBlockByNumber fetches a block by its height +func (c *CardanoClient) GetBlockByNumber(ctx context.Context, blockNumber uint64) (*Block, error) { + br, err := c.GetBlockHeaderByNumber(ctx, blockNumber) + if err != nil { + return nil, fmt.Errorf("failed to get block %d: %w", blockNumber, err) + } + + // Fetch transactions for this block + txHashes, err := c.GetTransactionsByBlock(ctx, blockNumber) + if err != nil { + logger.Warn("failed to fetch transactions for block", "block", blockNumber, "error", err) + txHashes = []string{} + } + + // Fetch transaction details (parallel-safe) + txs, _ := c.FetchTransactionsParallel(ctx, txHashes, DefaultTxFetchConcurrency) + + return &Block{ + Hash: br.Hash, + Height: br.Height, + Slot: br.Slot, + Time: br.Time, + ParentHash: br.ParentHash, + Txs: txs, + }, nil +} + +// GetBlockHash fetches the hash of a block by its height +func (c *CardanoClient) GetBlockHash(ctx context.Context, blockNumber uint64) (string, error) { + br, err := c.GetBlockHeaderByNumber(ctx, blockNumber) + if err != nil { + return "", fmt.Errorf("failed to get block hash: %w", err) + } + return br.Hash, nil +} + +// GetBlockByHash fetches a block by its hash +func (c *CardanoClient) GetBlockByHash(ctx context.Context, blockHash string) (*Block, error) { + endpoint := fmt.Sprintf("/blocks/%s", blockHash) + data, err := c.Do(ctx, "GET", endpoint, nil, nil) + if err != nil { + return nil, fmt.Errorf("failed to get block by hash: %w", err) + } + + var blockResp BlockResponse + if err := json.Unmarshal(data, &blockResp); err != nil { + return nil, fmt.Errorf("failed to unmarshal block response: %w", err) + } + + // Fetch transactions for this block + txHashes, err := c.GetTransactionsByBlock(ctx, blockResp.Height) + if err != nil { + logger.Warn("failed to fetch transactions for block", "block", blockResp.Height, "error", err) + txHashes = []string{} + } + + // Use parallel fetch with concurrency=1 to respect rate limits + // This is more efficient than sequential fetching and respects throttle settings + txs, _ := c.FetchTransactionsParallel(ctx, txHashes, 1) + + return &Block{ + Hash: blockResp.Hash, + Height: blockResp.Height, + Slot: blockResp.Slot, + Time: blockResp.Time, + ParentHash: blockResp.ParentHash, + Txs: txs, + }, nil +} + +// GetTransactionsByBlock fetches all transaction hashes in a block by block number +// Makes 2 API calls: GetBlockHash (to resolve hash) + GetTransactionsByBlockHash +func (c *CardanoClient) GetTransactionsByBlock(ctx context.Context, blockNumber uint64) ([]string, error) { + hash, err := c.GetBlockHash(ctx, blockNumber) + if err != nil { + return nil, fmt.Errorf("failed to resolve block hash: %w", err) + } + + // Delay between API calls to prevent burst rate limiting + select { + case <-ctx.Done(): + return nil, ctx.Err() + case <-time.After(100 * time.Millisecond): + } + + return c.GetTransactionsByBlockHash(ctx, hash) +} + +// GetTransactionsByBlockHash fetches all transaction hashes in a block by block hash +// Makes 1 API call: GET /blocks/{hash}/txs +func (c *CardanoClient) GetTransactionsByBlockHash(ctx context.Context, blockHash string) ([]string, error) { + endpoint := fmt.Sprintf("/blocks/%s/txs", blockHash) + data, err := c.Do(ctx, "GET", endpoint, nil, nil) + if err != nil { + return nil, fmt.Errorf("failed to get transactions for block hash %s: %w", blockHash, err) + } + + var txHashes []string + if err := json.Unmarshal(data, &txHashes); err != nil { + return nil, fmt.Errorf("failed to unmarshal transactions response: %w", err) + } + + return txHashes, nil +} + +// GetTransaction fetches a transaction by its hash +// Makes 2 API calls: GET /txs/{hash} + GET /txs/{hash}/utxos +func (c *CardanoClient) GetTransaction(ctx context.Context, txHash string) (*Transaction, error) { + endpoint := fmt.Sprintf("/txs/%s", txHash) + data, err := c.Do(ctx, "GET", endpoint, nil, nil) + if err != nil { + return nil, fmt.Errorf("failed to get transaction %s: %w", txHash, err) + } + + var txResp TransactionResponse + if err := json.Unmarshal(data, &txResp); err != nil { + return nil, fmt.Errorf("failed to unmarshal transaction response: %w", err) + } + + // Validate transaction is finalized (in a block) + if txResp.Height == 0 { + return nil, fmt.Errorf("transaction %s not finalized: block_height is 0", txHash) + } + + // Validate TTL (Time To Live) - validity interval + // Note: If transaction is in a block, TTL should already be valid (validated by ledger) + // This is defensive programming to catch any edge cases + if txResp.InvalidBefore != nil && *txResp.InvalidBefore != "" { + invalidBefore, err := strconv.ParseUint(*txResp.InvalidBefore, 10, 64) + if err != nil { + logger.Warn("Failed to parse invalid_before", + "tx_hash", txHash, + "invalid_before", *txResp.InvalidBefore) + } else if txResp.Slot < invalidBefore { + logger.Warn("Transaction slot before invalid_before (should not happen)", + "tx_hash", txHash, + "slot", txResp.Slot, + "invalid_before", invalidBefore) + } + } + if txResp.InvalidHereafter != nil && *txResp.InvalidHereafter != "" { + invalidHereafter, err := strconv.ParseUint(*txResp.InvalidHereafter, 10, 64) + if err != nil { + logger.Warn("Failed to parse invalid_hereafter", + "tx_hash", txHash, + "invalid_hereafter", *txResp.InvalidHereafter) + } else if txResp.Slot > invalidHereafter { + logger.Warn("Transaction slot after invalid_hereafter (should not happen)", + "tx_hash", txHash, + "slot", txResp.Slot, + "invalid_hereafter", invalidHereafter) + } + } + + // Validate fees + // Note: Failed smart contracts have fees = "0" but lose collateral instead + // Normal transactions always have fees > 0 + fees, err := strconv.ParseUint(txResp.Fees, 10, 64) + if err != nil { + logger.Warn("Failed to parse transaction fees", + "tx_hash", txHash, + "fees", txResp.Fees, + "error", err) + } + if fees == 0 { + logger.Debug("Transaction with zero fees (likely failed smart contract)", + "tx_hash", txHash, + "block_height", txResp.Height) + } + + // Delay between requests to prevent burst rate limiting (critical for Blockfrost free tier) + select { + case <-ctx.Done(): + return nil, ctx.Err() + case <-time.After(150 * time.Millisecond): + } + + // Fetch UTXOs (inputs/outputs) + utxoEndpoint := fmt.Sprintf("/txs/%s/utxos", txHash) + utxoData, err := c.Do(ctx, "GET", utxoEndpoint, nil, nil) + if err != nil { + return nil, fmt.Errorf("failed to get transaction utxos %s: %w", txHash, err) + } + + var utxos TxUTxOsResponse + if err := json.Unmarshal(utxoData, &utxos); err != nil { + return nil, fmt.Errorf("failed to unmarshal tx utxos: %w", err) + } + + // Convert inputs (multi-asset) + inputs := make([]Input, 0, len(utxos.Inputs)) + for _, inp := range utxos.Inputs { + inputs = append(inputs, Input{ + Address: inp.Address, + Amounts: inp.Amount, + TxHash: inp.TxHash, + Index: inp.OutputIndex, + Collateral: inp.Collateral, + Reference: inp.Reference, + }) + } + + // Convert outputs (multi-asset) + outputs := make([]Output, 0, len(utxos.Outputs)) + for _, out := range utxos.Outputs { + outputs = append(outputs, Output{ + Address: out.Address, + Amounts: out.Amount, + Index: out.OutputIndex, + Collateral: out.Collateral, + }) + } + + fees, _ = strconv.ParseUint(txResp.Fees, 10, 64) + + return &Transaction{ + Hash: txResp.Hash, + Slot: txResp.Slot, + BlockNum: txResp.Height, + Inputs: inputs, + Outputs: outputs, + Fee: fees, + ValidContract: txResp.ValidContract, + }, nil +} + +// FetchTransactionsParallel fetches transactions concurrently with bounded concurrency +// Each transaction requires 2 API calls (tx info + utxos), so actual RPS = 2 × concurrency +func (c *CardanoClient) FetchTransactionsParallel( + ctx context.Context, + txHashes []string, + concurrency int, +) ([]Transaction, error) { + if concurrency <= 0 { + concurrency = DefaultTxFetchConcurrency + } + if len(txHashes) == 0 { + return nil, nil + } + + var ( + mu sync.Mutex + results = make([]Transaction, 0, len(txHashes)) + g, gctx = errgroup.WithContext(ctx) + sem = make(chan struct{}, concurrency) + ) + + for i, h := range txHashes { + h := h + idx := i + sem <- struct{}{} + g.Go(func() error { + defer func() { <-sem }() + + // Delay between batches to prevent burst rate limiting + if idx > 0 && idx%concurrency == 0 { + select { + case <-gctx.Done(): + return gctx.Err() + case <-time.After(200 * time.Millisecond): + } + } + + tx, err := c.GetTransaction(gctx, h) + if err != nil { + // Detect rate-limit style errors (Blockfrost cancels context on quota) + msg := strings.ToLower(err.Error()) + if strings.Contains(msg, "rate limit") || strings.Contains(msg, "too many requests") || + strings.Contains(msg, "429") || + (strings.Contains(msg, "http request failed") && strings.Contains(msg, "context canceled")) { + logger.Warn("Rate limit detected in parallel fetch", "tx_hash", h, "error", err) + return err + } + // If group context is already canceled due to prior error, suppress noise + if gctx.Err() != nil { + return nil + } + logger.Warn("parallel tx fetch failed", "tx_hash", h, "error", err) + return nil // continue other txs + } + if tx != nil { + mu.Lock() + results = append(results, *tx) + mu.Unlock() + } + return nil + }) + } + + err := g.Wait() + if err != nil { + // Propagate rate-limit style errors upward to trigger failover. + msg := strings.ToLower(err.Error()) + if strings.Contains(msg, "rate limit") || strings.Contains(msg, "too many requests") || + strings.Contains(msg, "429") || + (strings.Contains(msg, "http request failed") && strings.Contains(msg, "context canceled")) { + return nil, err + } + // Otherwise, keep partial results and continue. + logger.Warn("fetch transactions parallel completed with error", "error", err) + } + return results, nil +} \ No newline at end of file diff --git a/internal/rpc/cardano/types.go b/internal/rpc/cardano/types.go new file mode 100644 index 0000000..ddcf027 --- /dev/null +++ b/internal/rpc/cardano/types.go @@ -0,0 +1,87 @@ +package cardano + +// Block represents a Cardano block +type Block struct { + Hash string `json:"hash"` + Height uint64 `json:"height"` + Slot uint64 `json:"slot"` + Time uint64 `json:"time"` + ParentHash string `json:"previous_block"` + Txs []Transaction `json:"-"` +} + +// Transaction represents a Cardano transaction +type Transaction struct { + Hash string `json:"hash"` + Slot uint64 `json:"slot"` + BlockNum uint64 `json:"block_height"` + Inputs []Input + Outputs []Output + Fee uint64 `json:"fees"` + ValidContract *bool `json:"valid_contract"` +} + +// Input represents a transaction input +type Input struct { + Address string `json:"address"` + Amounts []Amount `json:"amounts"` + TxHash string `json:"tx_hash"` + Index uint32 `json:"output_index"` + Collateral bool `json:"collateral"` + Reference bool `json:"reference"` +} + +// Output represents a transaction output +type Output struct { + Address string `json:"address"` + Amounts []Amount `json:"amounts"` + Index uint32 `json:"output_index"` + Collateral bool `json:"collateral"` +} + +// BlockResponse is the response from block query +// Blockfrost API returns "previous_block" not "parent_hash" +type BlockResponse struct { + Hash string `json:"hash"` + Height uint64 `json:"height"` + Slot uint64 `json:"slot"` + Time uint64 `json:"time"` + ParentHash string `json:"previous_block"` // Blockfrost uses "previous_block" field name +} + +// TransactionResponse is the response from transaction query +type TransactionResponse struct { + Hash string `json:"hash"` + Fees string `json:"fees"` + Height uint64 `json:"block_height"` + Time uint64 `json:"block_time"` + Slot uint64 `json:"slot"` + ValidContract *bool `json:"valid_contract"` + InvalidBefore *string `json:"invalid_before"` // TTL lower bound (optional, string from API) + InvalidHereafter *string `json:"invalid_hereafter"` // TTL upper bound (optional, string from API) +} + +type Amount struct { + Unit string `json:"unit"` + Quantity string `json:"quantity"` +} + +type UTxO struct { + Address string `json:"address"` + Amount []Amount `json:"amount"` + TxHash string `json:"tx_hash"` + OutputIndex uint32 `json:"output_index"` + Collateral bool `json:"collateral"` + Reference bool `json:"reference"` +} + +type TxUTxOsResponse struct { + Hash string `json:"hash"` + Inputs []UTxO `json:"inputs"` + Outputs []UTxO `json:"outputs"` +} + +// BlockTxsResponse is the response for block transactions +type BlockTxsResponse struct { + Transactions []string `json:"transactions"` +} \ No newline at end of file diff --git a/internal/worker/base.go b/internal/worker/base.go index 79b7db2..d9e2fd2 100644 --- a/internal/worker/base.go +++ b/internal/worker/base.go @@ -162,6 +162,7 @@ func (bw *BaseWorker) emitBlock(block *types.Block) { } addressType := bw.chain.GetNetworkType() + for _, tx := range block.Transactions { if bw.pubkeyStore.Exist(addressType, tx.ToAddress) { bw.logger.Info("Emitting matched transaction", @@ -169,7 +170,7 @@ func (bw *BaseWorker) emitBlock(block *types.Block) { "to", tx.ToAddress, "chain", bw.chain.GetName(), "addressType", addressType, - "txhash", tx.Hash, + "txhash", tx.TxHash, "tx", tx, ) _ = bw.emitter.EmitTransaction(bw.chain.GetName(), &tx) diff --git a/internal/worker/factory.go b/internal/worker/factory.go index f2edfe7..2f04d01 100644 --- a/internal/worker/factory.go +++ b/internal/worker/factory.go @@ -6,6 +6,7 @@ import ( "github.com/fystack/multichain-indexer/internal/indexer" "github.com/fystack/multichain-indexer/internal/rpc" + "github.com/fystack/multichain-indexer/internal/rpc/cardano" "github.com/fystack/multichain-indexer/internal/rpc/evm" "github.com/fystack/multichain-indexer/internal/rpc/tron" "github.com/fystack/multichain-indexer/pkg/addressbloomfilter" @@ -175,6 +176,40 @@ func buildTronIndexer(chainName string, chainCfg config.ChainConfig, mode Worker return indexer.NewTronIndexer(chainName, chainCfg, failover) } +// buildCardanoIndexer constructs a Cardano indexer with failover and providers. +func buildCardanoIndexer(chainName string, chainCfg config.ChainConfig, mode WorkerMode) indexer.Indexer { + failover := rpc.NewFailover[cardano.CardanoAPI](nil) + + // Shared rate limiter for all workers of this chain (global across regular, catchup, etc.) + rl := ratelimiter.GetOrCreateSharedPooledRateLimiter( + chainName, chainCfg.Throttle.RPS, chainCfg.Throttle.Burst, + ) + + for i, node := range chainCfg.Nodes { + client := cardano.NewCardanoClient( + node.URL, + &rpc.AuthConfig{ + Type: rpc.AuthType(node.Auth.Type), + Key: node.Auth.Key, + Value: node.Auth.Value, + }, + chainCfg.Client.Timeout, + rl, + ) + + failover.AddProvider(&rpc.Provider{ + Name: chainName + "-" + strconv.Itoa(i+1), + URL: node.URL, + Network: chainName, + ClientType: "rest", + Client: client, + State: rpc.StateHealthy, // Initialize as healthy + }) + } + + return indexer.NewCardanoIndexer(chainName, chainCfg, failover) +} + // CreateManagerWithWorkers initializes manager and all workers for configured chains. func CreateManagerWithWorkers( ctx context.Context, @@ -208,6 +243,8 @@ func CreateManagerWithWorkers( idxr = buildEVMIndexer(chainName, chainCfg, ModeRegular, pubkeyStore) case enum.NetworkTypeTron: idxr = buildTronIndexer(chainName, chainCfg, ModeRegular) + case enum.NetworkTypeCardano: + idxr = buildCardanoIndexer(chainName, chainCfg, ModeRegular) default: logger.Fatal("Unsupported network type", "chain", chainName, "type", chainCfg.Type) } diff --git a/internal/worker/regular.go b/internal/worker/regular.go index 8a62f0d..c7ff17d 100644 --- a/internal/worker/regular.go +++ b/internal/worker/regular.go @@ -256,7 +256,8 @@ func (rw *RegularWorker) detectAndHandleReorg(res *indexer.BlockResult) (bool, e } func (rw *RegularWorker) isReorgCheckRequired() bool { - return rw.chain.GetNetworkType() == enum.NetworkTypeEVM + networkType := rw.chain.GetNetworkType() + return networkType == enum.NetworkTypeEVM || networkType == enum.NetworkTypeCardano } // addBlockHash adds a block hash to the array, maintaining max size diff --git a/pkg/common/config/types.go b/pkg/common/config/types.go index 95de1a2..ed4cd72 100644 --- a/pkg/common/config/types.go +++ b/pkg/common/config/types.go @@ -37,7 +37,7 @@ type ChainConfig struct { Name string `yaml:"-"` NetworkId string `yaml:"network_id"` InternalCode string `yaml:"internal_code"` - Type enum.NetworkType `yaml:"type" validate:"required,oneof=tron evm"` + Type enum.NetworkType `yaml:"type" validate:"required,oneof=tron evm cardano"` FromLatest bool `yaml:"from_latest"` StartBlock int `yaml:"start_block" validate:"min=0"` PollInterval time.Duration `yaml:"poll_interval"` diff --git a/pkg/common/enum/enum.go b/pkg/common/enum/enum.go index 7683c47..fc02e21 100644 --- a/pkg/common/enum/enum.go +++ b/pkg/common/enum/enum.go @@ -19,11 +19,12 @@ const ( ) const ( - NetworkTypeEVM NetworkType = "evm" - NetworkTypeTron NetworkType = "tron" - NetworkTypeBtc NetworkType = "btc" - NetworkTypeSol NetworkType = "sol" - NetworkTypeApt NetworkType = "apt" + NetworkTypeEVM NetworkType = "evm" + NetworkTypeTron NetworkType = "tron" + NetworkTypeBtc NetworkType = "btc" + NetworkTypeSol NetworkType = "sol" + NetworkTypeApt NetworkType = "apt" + NetworkTypeCardano NetworkType = "cardano" ) const (