diff --git a/atxm/machine.py b/atxm/machine.py index 63191ae..19f4fff 100644 --- a/atxm/machine.py +++ b/atxm/machine.py @@ -2,14 +2,26 @@ from typing import List, Optional, Type from eth_account.signers.local import LocalAccount +from eth_utils import ValidationError from statemachine import State, StateMachine from twisted.internet import reactor from twisted.internet.defer import Deferred from twisted.internet.task import LoopingCall from web3 import Web3 +from web3.exceptions import ( + ProviderConnectionError, + TimeExhausted, + TooManyRequests, + Web3Exception, +) from web3.types import TxParams -from atxm.exceptions import TransactionFaulted, TransactionReverted +from atxm.exceptions import ( + Fault, + InsufficientFunds, + TransactionFaulted, + TransactionReverted, +) from atxm.strategies import ( AsyncTxStrategy, ExponentialSpeedupStrategy, @@ -26,7 +38,6 @@ _get_average_blocktime, _get_confirmations, _get_receipt, - _handle_rpc_error, _make_tx_params, fire_hook, ) @@ -301,9 +312,18 @@ def __fire(self, tx: AsyncTx, msg: str) -> TxHash: into the active transaction slot if broadcast is successful. """ signer: LocalAccount = self.__get_signer(tx.params["from"]) - txhash = self.w3.eth.send_raw_transaction( - signer.sign_transaction(tx.params).rawTransaction - ) + try: + txhash = self.w3.eth.send_raw_transaction( + signer.sign_transaction(tx.params).rawTransaction + ) + except ValidationError as e: + # special case for insufficient funds + if "Sender does not have enough" in str(e): + # TODO raised exception should be handled in some way #13. + raise InsufficientFunds + + raise e + self.log.info( f"[{msg}] fired transaction #atx-{tx.id}|{tx.params['nonce']}|{txhash.hex()}" ) @@ -338,7 +358,29 @@ def __strategize(self) -> None: _names = " -> ".join(s.name for s in self._strategies) # TODO try-except needed here (similar to broadcast) #14, #18, #20 - txhash = self.__fire(tx=_active_copy, msg=_names) + try: + txhash = self.__fire(tx=_active_copy, msg=_names) + except (TooManyRequests, ProviderConnectionError, TimeExhausted) as e: + # recoverable + # TODO don't retry forever #12, #20 + log.warn( + f"[retry] transaction #atx-{_active_copy.id}|{_active_copy.params['nonce']} " + f"failed with updated params - {str(e)}; retry next round" + ) + return + except (ValidationError, Web3Exception, ValueError) as e: + # non-recoverable + log.error( + f"[retry] transaction #atx-{_active_copy.id}|{_active_copy.params['nonce']} " + f"faulted with critical error - {str(e)}; tx will not be retried" + ) + fault_error = TransactionFaulted( + tx=_active_copy, + fault=Fault.ERROR, + message=str(e), + ) + self._tx_tracker.fault(fault_error=fault_error) + return _active_copy.txhash = txhash self._tx_tracker.update_after_retry(_active_copy) @@ -368,11 +410,25 @@ def __broadcast(self): try: txhash = self.__fire(tx=future_tx, msg="broadcast") - except ValueError as e: - _handle_rpc_error(e, tx=future_tx) + except (TooManyRequests, ProviderConnectionError, TimeExhausted) as e: + # recoverable - try again another time # TODO don't requeue forever #12, #20 + log.warn( + f"[broadcast] transaction #atx-{future_tx.id}|{future_tx.params['nonce']} " + f"failed - {str(e)}; requeueing tx" + ) self._tx_tracker._requeue(future_tx) return + except (ValidationError, Web3Exception, ValueError) as e: + # non-recoverable + log.error( + f"[broadcast] transaction #atx-{future_tx.id}|{future_tx.params['nonce']} " + f"has non-recoverable failure - {str(e)}; tx will not be requeued" + ) + hook = future_tx.on_broadcast_failure + if hook: + fire_hook(hook=hook, tx=future_tx, error=e) + return self._tx_tracker.morph(tx=future_tx, txhash=txhash) pending_tx = self._tx_tracker.pending diff --git a/atxm/tracker.py b/atxm/tracker.py index 93496d6..9a989ac 100644 --- a/atxm/tracker.py +++ b/atxm/tracker.py @@ -208,6 +208,7 @@ def _queue( params: TxParams, info: Dict[str, str] = None, on_broadcast: Optional[Callable[[PendingTx], None]] = None, + on_broadcast_failure: Optional[Callable[[FutureTx, Exception], None]] = None, on_finalized: Optional[Callable[[FinalizedTx], None]] = None, on_fault: Optional[Callable[[FaultedTx], None]] = None, ) -> FutureTx: @@ -220,6 +221,7 @@ def _queue( # configure hooks tx.on_broadcast = on_broadcast + tx.on_broadcast_failure = on_broadcast_failure tx.on_finalized = on_finalized tx.on_fault = on_fault diff --git a/atxm/tx.py b/atxm/tx.py index 573d2f5..0aa1779 100644 --- a/atxm/tx.py +++ b/atxm/tx.py @@ -5,7 +5,7 @@ from eth_typing import ChecksumAddress from eth_utils import encode_hex from hexbytes import HexBytes -from web3.types import PendingTx, TxData, TxParams, TxReceipt +from web3.types import TxData, TxParams, TxReceipt from atxm.exceptions import Fault @@ -18,7 +18,10 @@ class AsyncTx(ABC): params: TxParams final: bool = field(default=None, init=False) fault: Optional[Fault] = field(default=None, init=False) - on_broadcast: Optional[Callable[[PendingTx], None]] = field( + on_broadcast: Optional[Callable[["PendingTx"], None]] = field( + default=None, init=False + ) + on_broadcast_failure: Optional[Callable[["FutureTx", Exception], None]] = field( default=None, init=False ) on_finalized: Optional[Callable[["FinalizedTx"], None]] = field( diff --git a/atxm/utils.py b/atxm/utils.py index 825e459..8ba1540 100644 --- a/atxm/utils.py +++ b/atxm/utils.py @@ -6,10 +6,9 @@ from web3 import Web3 from web3.exceptions import TransactionNotFound from web3.types import TxData, TxParams -from web3.types import RPCError, TxReceipt, Wei +from web3.types import TxReceipt, Wei from atxm.exceptions import ( - InsufficientFunds, TransactionReverted, ) from atxm.logging import log @@ -116,25 +115,6 @@ def _hook() -> None: log.info(f"[hook] fired hook {hook} for transaction #atx-{tx.id}") -def _handle_rpc_error(e: Exception, tx: AsyncTx) -> None: - try: - error = RPCError(**e.args[0]) - except TypeError: - log.critical( - f"[error] transaction #atx-{tx.id}|{tx.params['nonce']} failed with {e}" - ) - else: - log.critical( - f"[error] transaction #atx-{tx.id}|{tx.params['nonce']} failed with {error['code']} | {error['message']}" - ) - if error["code"] == -32000: - if "insufficient funds" in error["message"]: - raise InsufficientFunds - hook = tx.on_fault - if hook: - fire_hook(hook=hook, tx=tx, error=e) - - def _make_tx_params(data: TxData) -> TxParams: """ TxData -> TxParams: Creates a transaction parameters