Skip to content

Commit

Permalink
Potential specialized handling of error cases when firing a tx; this …
Browse files Browse the repository at this point in the history
…can happen during broadcast or retry.

Broadcast failures are handled differently than retry failures so added a new callback.
InsufficientFunds is raised separately - but not currently handed; punting this decision to a different issue/pr.
  • Loading branch information
derekpierre committed Mar 6, 2024
1 parent 28f67f4 commit a7eff18
Show file tree
Hide file tree
Showing 4 changed files with 72 additions and 31 deletions.
72 changes: 64 additions & 8 deletions atxm/machine.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,26 @@
from typing import List, Optional, Type

from eth_account.signers.local import LocalAccount
from eth_utils import ValidationError
from statemachine import State, StateMachine
from twisted.internet import reactor
from twisted.internet.defer import Deferred
from twisted.internet.task import LoopingCall
from web3 import Web3
from web3.exceptions import (
ProviderConnectionError,
TimeExhausted,
TooManyRequests,
Web3Exception,
)
from web3.types import TxParams

from atxm.exceptions import TransactionFaulted, TransactionReverted
from atxm.exceptions import (
Fault,
InsufficientFunds,
TransactionFaulted,
TransactionReverted,
)
from atxm.strategies import (
AsyncTxStrategy,
ExponentialSpeedupStrategy,
Expand All @@ -26,7 +38,6 @@
_get_average_blocktime,
_get_confirmations,
_get_receipt,
_handle_rpc_error,
_make_tx_params,
fire_hook,
)
Expand Down Expand Up @@ -301,9 +312,18 @@ def __fire(self, tx: AsyncTx, msg: str) -> TxHash:
into the active transaction slot if broadcast is successful.
"""
signer: LocalAccount = self.__get_signer(tx.params["from"])
txhash = self.w3.eth.send_raw_transaction(
signer.sign_transaction(tx.params).rawTransaction
)
try:
txhash = self.w3.eth.send_raw_transaction(
signer.sign_transaction(tx.params).rawTransaction
)
except ValidationError as e:
# special case for insufficient funds
if "Sender does not have enough" in str(e):
# TODO raised exception should be handled in some way #13.
raise InsufficientFunds

raise e

self.log.info(
f"[{msg}] fired transaction #atx-{tx.id}|{tx.params['nonce']}|{txhash.hex()}"
)
Expand Down Expand Up @@ -338,7 +358,29 @@ def __strategize(self) -> None:
_names = " -> ".join(s.name for s in self._strategies)

# TODO try-except needed here (similar to broadcast) #14, #18, #20
txhash = self.__fire(tx=_active_copy, msg=_names)
try:
txhash = self.__fire(tx=_active_copy, msg=_names)
except (TooManyRequests, ProviderConnectionError, TimeExhausted) as e:
# recoverable
# TODO don't retry forever #12, #20
log.warn(
f"[retry] transaction #atx-{_active_copy.id}|{_active_copy.params['nonce']} "
f"failed with updated params - {str(e)}; retry next round"
)
return
except (ValidationError, Web3Exception, ValueError) as e:
# non-recoverable
log.error(
f"[retry] transaction #atx-{_active_copy.id}|{_active_copy.params['nonce']} "
f"faulted with critical error - {str(e)}; tx will not be retried"
)
fault_error = TransactionFaulted(
tx=_active_copy,
fault=Fault.ERROR,
message=str(e),
)
self._tx_tracker.fault(fault_error=fault_error)
return

_active_copy.txhash = txhash
self._tx_tracker.update_after_retry(_active_copy)
Expand Down Expand Up @@ -368,11 +410,25 @@ def __broadcast(self):

try:
txhash = self.__fire(tx=future_tx, msg="broadcast")
except ValueError as e:
_handle_rpc_error(e, tx=future_tx)
except (TooManyRequests, ProviderConnectionError, TimeExhausted) as e:
# recoverable - try again another time
# TODO don't requeue forever #12, #20
log.warn(
f"[broadcast] transaction #atx-{future_tx.id}|{future_tx.params['nonce']} "
f"failed - {str(e)}; requeueing tx"
)
self._tx_tracker._requeue(future_tx)
return
except (ValidationError, Web3Exception, ValueError) as e:
# non-recoverable
log.error(
f"[broadcast] transaction #atx-{future_tx.id}|{future_tx.params['nonce']} "
f"has non-recoverable failure - {str(e)}; tx will not be requeued"
)
hook = future_tx.on_broadcast_failure
if hook:
fire_hook(hook=hook, tx=future_tx, error=e)
return

self._tx_tracker.morph(tx=future_tx, txhash=txhash)
pending_tx = self._tx_tracker.pending
Expand Down
2 changes: 2 additions & 0 deletions atxm/tracker.py
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,7 @@ def _queue(
params: TxParams,
info: Dict[str, str] = None,
on_broadcast: Optional[Callable[[PendingTx], None]] = None,
on_broadcast_failure: Optional[Callable[[FutureTx, Exception], None]] = None,
on_finalized: Optional[Callable[[FinalizedTx], None]] = None,
on_fault: Optional[Callable[[FaultedTx], None]] = None,
) -> FutureTx:
Expand All @@ -220,6 +221,7 @@ def _queue(

# configure hooks
tx.on_broadcast = on_broadcast
tx.on_broadcast_failure = on_broadcast_failure
tx.on_finalized = on_finalized
tx.on_fault = on_fault

Expand Down
7 changes: 5 additions & 2 deletions atxm/tx.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from eth_typing import ChecksumAddress
from eth_utils import encode_hex
from hexbytes import HexBytes
from web3.types import PendingTx, TxData, TxParams, TxReceipt
from web3.types import TxData, TxParams, TxReceipt

from atxm.exceptions import Fault

Expand All @@ -18,7 +18,10 @@ class AsyncTx(ABC):
params: TxParams
final: bool = field(default=None, init=False)
fault: Optional[Fault] = field(default=None, init=False)
on_broadcast: Optional[Callable[[PendingTx], None]] = field(
on_broadcast: Optional[Callable[["PendingTx"], None]] = field(
default=None, init=False
)
on_broadcast_failure: Optional[Callable[["FutureTx", Exception], None]] = field(
default=None, init=False
)
on_finalized: Optional[Callable[["FinalizedTx"], None]] = field(
Expand Down
22 changes: 1 addition & 21 deletions atxm/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,9 @@
from web3 import Web3
from web3.exceptions import TransactionNotFound
from web3.types import TxData, TxParams
from web3.types import RPCError, TxReceipt, Wei
from web3.types import TxReceipt, Wei

from atxm.exceptions import (
InsufficientFunds,
TransactionReverted,
)
from atxm.logging import log
Expand Down Expand Up @@ -116,25 +115,6 @@ def _hook() -> None:
log.info(f"[hook] fired hook {hook} for transaction #atx-{tx.id}")


def _handle_rpc_error(e: Exception, tx: AsyncTx) -> None:
try:
error = RPCError(**e.args[0])
except TypeError:
log.critical(
f"[error] transaction #atx-{tx.id}|{tx.params['nonce']} failed with {e}"
)
else:
log.critical(
f"[error] transaction #atx-{tx.id}|{tx.params['nonce']} failed with {error['code']} | {error['message']}"
)
if error["code"] == -32000:
if "insufficient funds" in error["message"]:
raise InsufficientFunds
hook = tx.on_fault
if hook:
fire_hook(hook=hook, tx=tx, error=e)


def _make_tx_params(data: TxData) -> TxParams:
"""
TxData -> TxParams: Creates a transaction parameters
Expand Down

0 comments on commit a7eff18

Please sign in to comment.