From b938b0146020e507089631c70192304d10d7e998 Mon Sep 17 00:00:00 2001 From: PatStiles Date: Thu, 14 Nov 2024 11:26:57 -0300 Subject: [PATCH 1/6] progress --- core/retry.go | 1 + core/utils/eth_client_utils.go | 1 + 2 files changed, 2 insertions(+) diff --git a/core/retry.go b/core/retry.go index fb39e8d8dc..cfbe5b034d 100644 --- a/core/retry.go +++ b/core/retry.go @@ -159,6 +159,7 @@ request retry_interval (12 sec) randomized_interval (0.5) randomized_int Reference: https://github.com/cenkalti/backoff/blob/v4/exponential.go#L9 */ +// TODO: Make config optional by using default but passing nil. // Same as Retry only that the functionToRetry can return a value upon correct execution func RetryWithData[T any](functionToRetry func() (T, error), config *RetryParams) (T, error) { f := func() (T, error) { diff --git a/core/utils/eth_client_utils.go b/core/utils/eth_client_utils.go index b7bc5c66f1..5d087c467b 100644 --- a/core/utils/eth_client_utils.go +++ b/core/utils/eth_client_utils.go @@ -86,6 +86,7 @@ func CalculateGasPriceBumpBasedOnRetry(currentGasPrice *big.Int, baseBumpPercent return bumpedGasPrice } +//TODO: move to retryable function file /* GetGasPriceRetryable Get the gas price from the client with retry logic. From ae215eaa852b0be225b171b8df82bf18f1ed50b8 Mon Sep 17 00:00:00 2001 From: PatStiles Date: Thu, 14 Nov 2024 14:11:57 -0300 Subject: [PATCH 2/6] separate retryable logic from functions --- core/chainio/retryable.go | 23 ++++++++++++++++------- 1 file changed, 16 insertions(+), 7 deletions(-) diff --git a/core/chainio/retryable.go b/core/chainio/retryable.go index bf4724e2f2..769600f820 100644 --- a/core/chainio/retryable.go +++ b/core/chainio/retryable.go @@ -37,17 +37,17 @@ func (w *AvsWriter) RespondToTaskV2Retryable(opts *bind.TransactOpts, batchMerkl } /* -BatchesStateRetryable -Get the state of a batch from the AVS contract. +RespondToTaskV2Retryable +Send a transaction to the AVS contract to respond to a task. - All errors are considered Transient Errors -- Retry times (3 retries): 1 sec, 2 sec, 4 sec +- Retry times (3 retries): 12 sec (1 Blocks), 24 sec (2 Blocks), 48 sec (4 Blocks) +- NOTE: Contract call reverts are not considered `PermanentError`'s as block reorg's may lead to contract call revert in which case the aggregator should retry. */ func (w *AvsWriter) BatchesStateRetryable(opts *bind.CallOpts, arg0 [32]byte, config *retry.RetryParams) (struct { TaskCreatedBlock uint32 Responded bool RespondToTaskFeeLimit *big.Int }, error) { - batchesState_func := func() (struct { TaskCreatedBlock uint32 Responded bool @@ -65,8 +65,8 @@ func (w *AvsWriter) BatchesStateRetryable(opts *bind.CallOpts, arg0 [32]byte, co } /* -BatcherBalancesRetryable -Get the balance of a batcher from the AVS contract. +BatchesStateRetryable +Get the state of a batch from the AVS contract. - All errors are considered Transient Errors - Retry times (3 retries): 1 sec, 2 sec, 4 sec */ @@ -205,8 +205,17 @@ func SubscribeToNewTasksV2Retryable( batchMerkleRoot [][32]byte, config *retry.RetryParams, ) (event.Subscription, error) { + return retry.RetryWithData(SubscribeToNewTasksV2(opts, serviceManager, newTaskCreatedChan, batchMerkleRoot), retry.DefaultRetryConfig()) +} + +func SubscribeToNewTasksV3( + opts *bind.WatchOpts, + serviceManager *servicemanager.ContractAlignedLayerServiceManager, + newTaskCreatedChan chan *servicemanager.ContractAlignedLayerServiceManagerNewBatchV3, + batchMerkleRoot [][32]byte, +) func() (event.Subscription, error) { subscribe_func := func() (event.Subscription, error) { - return serviceManager.WatchNewBatchV2(opts, newTaskCreatedChan, batchMerkleRoot) + return serviceManager.WatchNewBatchV3(opts, newTaskCreatedChan, batchMerkleRoot) } return retry.RetryWithData(subscribe_func, config) } From 1ecbb9c225d4432afb155b77104ecffb1a15d6eb Mon Sep 17 00:00:00 2001 From: PatStiles Date: Thu, 14 Nov 2024 15:08:11 -0300 Subject: [PATCH 3/6] refactor tests --- core/utils/eth_client_utils.go | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/core/utils/eth_client_utils.go b/core/utils/eth_client_utils.go index 5d087c467b..ee943487ff 100644 --- a/core/utils/eth_client_utils.go +++ b/core/utils/eth_client_utils.go @@ -31,7 +31,19 @@ func WaitForTransactionReceiptRetryable(client eth.InstrumentedClient, fallbackC } return receipt, nil } - return retry.RetryWithData(receipt_func, config) + return receipt_func +} + +// WaitForTransactionReceiptRetryable repeatedly attempts to fetch the transaction receipt for a given transaction hash. +// If the receipt is not found, the function will retry with exponential backoff until the specified `waitTimeout` duration is reached. +// If the receipt is still unavailable after `waitTimeout`, it will return an error. +// +// Note: The `time.Second * 2` is set as the max interval in the retry mechanism because we can't reliably measure the specific time the tx will be included in a block. +// Setting a higher value will imply doing less retries across the waitTimeout, and so we might lose the receipt +// All errors are considered Transient Errors +// - Retry times: 0.5s, 1s, 2s, 2s, 2s, ... until it reaches waitTimeout +func WaitForTransactionReceiptRetryable(client eth.InstrumentedClient, fallbackClient eth.InstrumentedClient, txHash gethcommon.Hash, config *retry.RetryConfig) (*types.Receipt, error) { + return retry.RetryWithData(WaitForTransactionReceipt(client, fallbackClient, txHash, config), config) } func BytesToQuorumNumbers(quorumNumbersBytes []byte) eigentypes.QuorumNums { From f667829ce0b2c2c66832e442fef1ba3b112bfc53 Mon Sep 17 00:00:00 2001 From: PatStiles Date: Thu, 14 Nov 2024 15:18:31 -0300 Subject: [PATCH 4/6] rm cmts --- core/retry.go | 1 - 1 file changed, 1 deletion(-) diff --git a/core/retry.go b/core/retry.go index cfbe5b034d..fb39e8d8dc 100644 --- a/core/retry.go +++ b/core/retry.go @@ -159,7 +159,6 @@ request retry_interval (12 sec) randomized_interval (0.5) randomized_int Reference: https://github.com/cenkalti/backoff/blob/v4/exponential.go#L9 */ -// TODO: Make config optional by using default but passing nil. // Same as Retry only that the functionToRetry can return a value upon correct execution func RetryWithData[T any](functionToRetry func() (T, error), config *RetryParams) (T, error) { f := func() (T, error) { From 83773b2a2c478acfc2bac2a9e6b7a88b5756bba4 Mon Sep 17 00:00:00 2001 From: PatStiles Date: Fri, 15 Nov 2024 17:09:42 -0300 Subject: [PATCH 5/6] refactor retry logic for SendSignedTaskResponseToAggregator + Local testnet works w/ telemetry --- config-files/config-operator-test.yaml | 41 ++++++++++++++++++++++++++ operator/pkg/operator.go | 4 +-- operator/pkg/rpc_client.go | 38 ++++++++++-------------- 3 files changed, 59 insertions(+), 24 deletions(-) create mode 100644 config-files/config-operator-test.yaml diff --git a/config-files/config-operator-test.yaml b/config-files/config-operator-test.yaml new file mode 100644 index 0000000000..8f50e9621f --- /dev/null +++ b/config-files/config-operator-test.yaml @@ -0,0 +1,41 @@ +# Common variables for all the services +# 'production' only prints info and above. 'development' also prints debug +environment: 'development' +aligned_layer_deployment_config_file_path: '../contracts/script/output/devnet/alignedlayer_deployment_output.json' +eigen_layer_deployment_config_file_path: '../contracts/script/output/devnet/eigenlayer_deployment_output.json' +eth_rpc_url: 'http://localhost:8545' +eth_rpc_url_fallback: 'http://localhost:8545' +eth_ws_url: 'ws://localhost:8545' +eth_ws_url_fallback: 'ws://localhost:8545' +eigen_metrics_ip_port_address: 'localhost:9090' + +## ECDSA Configurations +ecdsa: + private_key_store_path: '../config-files/devnet/keys/operator-1.ecdsa.key.json' + private_key_store_password: '' + +## BLS Configurations +bls: + private_key_store_path: '../config-files/devnet/keys/operator-1.bls.key.json' + private_key_store_password: '' + +## Operator Configurations +operator: + aggregator_rpc_server_ip_port_address: localhost:8090 + operator_tracker_ip_port_address: http://localhost:4001 + address: 0x70997970C51812dc3A010C7d01b50e0d17dc79C8 + earnings_receiver_address: 0x70997970C51812dc3A010C7d01b50e0d17dc79C8 + delegation_approver_address: '0x0000000000000000000000000000000000000000' + staker_opt_out_window_blocks: 0 + metadata_url: 'https://yetanotherco.github.io/operator_metadata/metadata.json' + enable_metrics: true + metrics_ip_port_address: localhost:9092 + max_batch_size: 268435456 # 256 MiB + last_processed_batch_filepath: '../config-files/operator-1.last_processed_batch.json' + +# Operators variables needed for register it in EigenLayer +el_delegation_manager_address: '0xCf7Ed3AccA5a467e9e704C703E8D87F634fB0Fc9' +private_key_store_path: config-files/devnet/keys/operator-1.ecdsa.key.json +bls_private_key_store_path: config-files/devnet/keys/operator-1.bls.key.json +signer_type: local_keystore +chain_id: 31337 diff --git a/operator/pkg/operator.go b/operator/pkg/operator.go index d4c36355cb..9f3ca7e965 100644 --- a/operator/pkg/operator.go +++ b/operator/pkg/operator.go @@ -334,7 +334,7 @@ func (o *Operator) handleNewBatchLogV2(newBatchLog *servicemanager.ContractAlign hex.EncodeToString(signedTaskResponse.SenderAddress[:]), ) - o.aggRpcClient.SendSignedTaskResponseToAggregator(&signedTaskResponse) + o.aggRpcClient.SendSignedTaskResponseToAggregatorRetryable(&signedTaskResponse) } func (o *Operator) ProcessNewBatchLogV2(newBatchLog *servicemanager.ContractAlignedLayerServiceManagerNewBatchV2) error { @@ -415,7 +415,7 @@ func (o *Operator) handleNewBatchLogV3(newBatchLog *servicemanager.ContractAlign hex.EncodeToString(signedTaskResponse.SenderAddress[:]), ) - o.aggRpcClient.SendSignedTaskResponseToAggregator(&signedTaskResponse) + o.aggRpcClient.SendSignedTaskResponseToAggregatorRetryable(&signedTaskResponse) } func (o *Operator) ProcessNewBatchLogV3(newBatchLog *servicemanager.ContractAlignedLayerServiceManagerNewBatchV3) error { diff --git a/operator/pkg/rpc_client.go b/operator/pkg/rpc_client.go index 5726f14dfb..68390ab600 100644 --- a/operator/pkg/rpc_client.go +++ b/operator/pkg/rpc_client.go @@ -6,6 +6,7 @@ import ( "time" "github.com/Layr-Labs/eigensdk-go/logging" + retry "github.com/yetanotherco/aligned_layer/core" "github.com/yetanotherco/aligned_layer/core/types" ) @@ -16,11 +17,6 @@ type AggregatorRpcClient struct { logger logging.Logger } -const ( - MaxRetries = 10 - RetryInterval = 10 * time.Second -) - func NewAggregatorRpcClient(aggregatorIpPortAddr string, logger logging.Logger) (*AggregatorRpcClient, error) { client, err := rpc.DialHTTP("tcp", aggregatorIpPortAddr) if err != nil { @@ -34,31 +30,29 @@ func NewAggregatorRpcClient(aggregatorIpPortAddr string, logger logging.Logger) }, nil } -// SendSignedTaskResponseToAggregator is the method called by operators via RPC to send -// their signed task response. -func (c *AggregatorRpcClient) SendSignedTaskResponseToAggregator(signedTaskResponse *types.SignedTaskResponse) { - var reply uint8 - for retries := 0; retries < MaxRetries; retries++ { +func SendSignedTaskResponse(c *AggregatorRpcClient, signedTaskResponse *types.SignedTaskResponse) func() (uint8, error) { + send_task_func := func() (uint8, error) { + var reply uint8 err := c.rpcClient.Call("Aggregator.ProcessOperatorSignedTaskResponseV2", signedTaskResponse, &reply) if err != nil { c.logger.Error("Received error from aggregator", "err", err) if errors.Is(err, rpc.ErrShutdown) { c.logger.Error("Aggregator is shutdown. Reconnecting...") - client, err := rpc.DialHTTP("tcp", c.aggregatorIpPortAddr) - if err != nil { - c.logger.Error("Could not reconnect to aggregator", "err", err) - time.Sleep(RetryInterval) - } else { - c.rpcClient = client - c.logger.Info("Reconnected to aggregator") - } - } else { - c.logger.Infof("Received error from aggregator: %s. Retrying ProcessOperatorSignedTaskResponseV2 RPC call...", err) - time.Sleep(RetryInterval) } } else { c.logger.Info("Signed task response header accepted by aggregator.", "reply", reply) - return } + return reply, err } + return send_task_func +} + +// SendSignedTaskResponseToAggregator is the method called by operators via RPC to send +// their signed task response. +func (c *AggregatorRpcClient) SendSignedTaskResponseToAggregatorRetryable(signedTaskResponse *types.SignedTaskResponse) (uint8, error) { + config := retry.DefaultRetryConfig() + config.NumRetries = 10 + config.Multiplier = 1 // Constant retry interval + config.InitialInterval = 10 * time.Second + return retry.RetryWithData(SendSignedTaskResponse(c, signedTaskResponse), config) } From 9bd182df7f3607e1c665d7a8a9346caab7912138 Mon Sep 17 00:00:00 2001 From: PatStiles Date: Thu, 21 Nov 2024 10:51:47 -0300 Subject: [PATCH 6/6] lint --- operator/pkg/operator.go | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/operator/pkg/operator.go b/operator/pkg/operator.go index 9f3ca7e965..1e962a4e3d 100644 --- a/operator/pkg/operator.go +++ b/operator/pkg/operator.go @@ -334,7 +334,10 @@ func (o *Operator) handleNewBatchLogV2(newBatchLog *servicemanager.ContractAlign hex.EncodeToString(signedTaskResponse.SenderAddress[:]), ) - o.aggRpcClient.SendSignedTaskResponseToAggregatorRetryable(&signedTaskResponse) + _, err = o.aggRpcClient.SendSignedTaskResponseToAggregatorRetryable(&signedTaskResponse) + if err != nil { + o.Logger.Infof("Failed to send signed task response %x to Aggregator. Err: %v", signedTaskResponse.BatchMerkleRoot, err) + } } func (o *Operator) ProcessNewBatchLogV2(newBatchLog *servicemanager.ContractAlignedLayerServiceManagerNewBatchV2) error { @@ -415,7 +418,10 @@ func (o *Operator) handleNewBatchLogV3(newBatchLog *servicemanager.ContractAlign hex.EncodeToString(signedTaskResponse.SenderAddress[:]), ) - o.aggRpcClient.SendSignedTaskResponseToAggregatorRetryable(&signedTaskResponse) + _, err = o.aggRpcClient.SendSignedTaskResponseToAggregatorRetryable(&signedTaskResponse) + if err != nil { + o.Logger.Infof("Failed to send signed task response %x to Aggregator. Err: %v", signedTaskResponse.BatchMerkleRoot, err) + } } func (o *Operator) ProcessNewBatchLogV3(newBatchLog *servicemanager.ContractAlignedLayerServiceManagerNewBatchV3) error {