Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Additional testing for tracker and tx modules - mostly around serialization/deserialization #30

Merged
merged 70 commits into from
Mar 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
70 commits
Select commit Hold shift + click to select a range
73e3e0e
AsyncTxStrategy should do one of the following during execute: raise …
derekpierre Mar 1, 2024
013279c
Adjust strategies to accomodate latest paradigm for execute i.e. retu…
derekpierre Mar 1, 2024
9c2ec6e
Adjust existing strategy tests since None can now be returned from ex…
derekpierre Mar 1, 2024
39b4b36
Update timeout strategy logging.
derekpierre Mar 3, 2024
cf88810
Update speedup strategy calcs to use math.ceil instead of round
derekpierre Mar 3, 2024
1c01980
Move from a fixed max tip to a max tip factor based on the current su…
derekpierre Mar 4, 2024
352f74a
Simplify legacy transaction speedup logic.
derekpierre Mar 4, 2024
0be4f52
Add tests for constructor parameters and legacy tx speed up functiona…
derekpierre Mar 4, 2024
c023dea
Update logging of gas conditions.
derekpierre Mar 4, 2024
979f3f9
Better annotations about what the strategy is doing.
derekpierre Mar 4, 2024
9589151
Don't modify tx nonce as part of strategy.
derekpierre Mar 4, 2024
b66f5af
Fix outdated comment about value of warn factor.
derekpierre Mar 4, 2024
bf13194
Clean up logging logic since old_tip, old_max_fee may or may not be p…
derekpierre Mar 4, 2024
f7bd316
Add tests for eip1559 transaction speedup.
derekpierre Mar 4, 2024
699d38f
Rename speed up strategy; while the rate value is fixed the updates a…
derekpierre Mar 4, 2024
3755667
Use constant instead of "maxPriorityFeePerGas".
derekpierre Mar 4, 2024
16c2ed3
Use constant for minimum required speedup increase percentage.
derekpierre Mar 4, 2024
1c67055
Use a max between a low default value and warn factor calc in case ti…
derekpierre Mar 4, 2024
f8cf36b
Add todo about best way of setting a cap on the speedup strategy.
derekpierre Mar 4, 2024
1f1fb28
Use reactor to determine when hook is called instead of repeatedly ca…
derekpierre Mar 4, 2024
abb8555
Add TODO to determine whether strategies can be overriden.
derekpierre Mar 4, 2024
5491a6e
Update logging category for monitor when no longer tracking tx.
derekpierre Mar 5, 2024
2a9c0b5
Use enable/disabling of auto mining for better testing.
derekpierre Mar 5, 2024
436ce64
Fix nonce logging message for strategies.
derekpierre Mar 5, 2024
c7851ae
Add the ability to update the tracker's active pending tx after a retry.
derekpierre Mar 5, 2024
7e647c0
Ensure that "from" value in tx params matches/equals signer.
derekpierre Mar 5, 2024
283ba70
Make __fire reusable by both broadcast and strategize - it is now lim…
derekpierre Mar 5, 2024
bd27cbf
Add tests to ensure that strategies are employed when a tx hasn't alr…
derekpierre Mar 5, 2024
25ad0b5
Add testing for "from" tx parameter handling when a tx is being queued.
derekpierre Mar 6, 2024
adac77d
Add test for strategies which do not make updates to parameters.
derekpierre Mar 6, 2024
dc6cda3
Add testing statements to ensure that the broadcast hook is also call…
derekpierre Mar 6, 2024
28f67f4
txhash is not optionally returned from __fire - either it is or an ex…
derekpierre Mar 6, 2024
a7eff18
Potential specialized handling of error cases when firing a tx; this …
derekpierre Mar 6, 2024
9ea695c
Commonize check for recoverable type of error after calling sendRawTr…
derekpierre Mar 6, 2024
f42264d
Insufficient funds is really a special case and its own exception, no…
derekpierre Mar 6, 2024
9013c0d
Fix firing of hook with only ordered args.
derekpierre Mar 6, 2024
f5ecc5a
Add test for unrecoverable errors encoutered when trying to broadcast…
derekpierre Mar 6, 2024
4905e07
Add test for recoverable errors encoutered when trying to broadcast a…
derekpierre Mar 7, 2024
d695078
Add test for unrecoverable errors encoutered when trying to retry a tx.
derekpierre Mar 7, 2024
e000b64
Retry failures should be treated differently than broadcast failures.…
derekpierre Mar 7, 2024
b744a93
Make methods on TxTracker public instead of protected.
derekpierre Mar 7, 2024
431462e
Use a counter for tracking number of requeue attempts for failed txs …
derekpierre Mar 7, 2024
290fd65
Use a counter for tracking number of failed retry attempts when using…
derekpierre Mar 7, 2024
5c42f80
Update existing tests given limits on requeue/retry (redo) attempts.
derekpierre Mar 7, 2024
189b2dd
Add test for exceeding retries when encountering a broadcast error.
derekpierre Mar 7, 2024
f4670ac
Add additional assertions for the requeue counter in existing tests.
derekpierre Mar 7, 2024
c370dea
Existing retry test cleanup.
derekpierre Mar 7, 2024
f59aa74
Add test for exceeding retries when encountering retry errors.
derekpierre Mar 7, 2024
86734c3
Ensure that retry_failure_counter is cleared for all exit cases fault…
derekpierre Mar 7, 2024
190ed94
Rename constant to _MAX_REDO_ATTEMPTS.
derekpierre Mar 7, 2024
f820474
Add TODO.
derekpierre Mar 7, 2024
a2ea01c
Track num requeues and num retries directly on FutureTx and PendingTx…
derekpierre Mar 7, 2024
6f0e150
Strategies must be explicitly provided by the user, if not, no strate…
derekpierre Mar 7, 2024
008c308
There will always be one TimeoutStrategy used by the machine. The tim…
derekpierre Mar 7, 2024
b76111f
Update error messages when non-active tx provided for retry update/fa…
derekpierre Mar 8, 2024
3f52121
Get hook after ensuring active tx is set.
derekpierre Mar 8, 2024
7d5b060
Inital tests for tracker.
derekpierre Mar 8, 2024
b017820
Set retries value of copy instead of incrementing it - just safer.
derekpierre Mar 8, 2024
153161f
Rename handle retry failure parameter for clarity.
derekpierre Mar 8, 2024
e8e704c
Ensure that hook is given time to be called by using a deferred with …
derekpierre Mar 8, 2024
f36fb9d
Don't forget to pop the tx before morphing in tests.
derekpierre Mar 8, 2024
018b0e2
Remove the storage of txdata from PendingTx; it isn't used at all.
derekpierre Mar 8, 2024
a0f2c74
Fix bug in restoring queue values for _TxTracker.
derekpierre Mar 8, 2024
1914e6b
Move params out of AsyncTx and into FutureTx and PendingTx, the other…
derekpierre Mar 8, 2024
7185531
TxParams which has Union data types can take different forms so to en…
derekpierre Mar 8, 2024
faa51ed
Fix tests that used hook to wait using reactor, and enable inlineCall…
derekpierre Mar 8, 2024
084e1e2
Add tests for commit/restore functionality for _TxTracker.
derekpierre Mar 8, 2024
2b52a48
Ensure that hash comparisons of txs are also made as part of tests.
derekpierre Mar 8, 2024
e1f312e
Add serialization/deserialization test for FaultedTx. May/may not rea…
derekpierre Mar 8, 2024
3245c0f
Add test about trying to perform finalize when there isn't an existin…
derekpierre Mar 8, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 2 additions & 12 deletions atxm/exceptions.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from enum import Enum

from web3.types import PendingTx, RPCError, TxReceipt
from web3.types import PendingTx, TxReceipt


class Fault(Enum):
Expand All @@ -13,9 +13,6 @@ class Fault(Enum):
# Strategy has been running for too long
TIMEOUT = "timeout"

# Transaction has been capped and subsequently timed out
PAUSE = "pause"

# Transaction reverted
REVERT = "revert"

Expand All @@ -26,17 +23,10 @@ class Fault(Enum):
INSUFFICIENT_FUNDS = "insufficient_funds"


class InsufficientFunds(RPCError):
class InsufficientFunds(Exception):
"""raised when a transaction exceeds the spending cap"""


class Wait(Exception):
"""
Raised when a strategy exceeds a limitation.
Used to mark a pending transaction as "wait, don't retry".
"""


class TransactionFaulted(Exception):
"""Raised when a transaction has been faulted."""

Expand Down
193 changes: 144 additions & 49 deletions atxm/machine.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,26 @@
from copy import deepcopy
from typing import List, Optional, Type
from copy import copy, deepcopy
from typing import List, Optional

from eth_account.signers.local import LocalAccount
from eth_utils import ValidationError
from statemachine import State, StateMachine
from twisted.internet import reactor
from twisted.internet.defer import Deferred
from twisted.internet.task import LoopingCall
from web3 import Web3
from web3.exceptions import Web3Exception
from web3.types import TxParams

from atxm.exceptions import TransactionFaulted, TransactionReverted, Wait
from atxm.strategies import (
AsyncTxStrategy,
FixedRateSpeedUp,
InsufficientFundsPause,
TimeoutStrategy,
from atxm.exceptions import (
Fault,
InsufficientFunds,
TransactionFaulted,
TransactionReverted,
)
from atxm.strategies import AsyncTxStrategy, TimeoutStrategy
from atxm.tracker import _TxTracker
from atxm.tx import (
AsyncTx,
FutureTx,
PendingTx,
TxHash,
Expand All @@ -26,7 +29,7 @@
_get_average_blocktime,
_get_confirmations,
_get_receipt,
_handle_rpc_error,
_is_recoverable_send_tx_error,
_make_tx_params,
fire_hook,
)
Expand Down Expand Up @@ -89,11 +92,8 @@ class _Machine(StateMachine):
_BLOCK_INTERVAL = 20 # ~20 blocks
_BLOCK_SAMPLE_SIZE = 10_000 # blocks

STRATEGIES: List[Type[AsyncTxStrategy]] = [
InsufficientFundsPause,
TimeoutStrategy,
FixedRateSpeedUp,
]
# max requeues/retries
_MAX_REDO_ATTEMPTS = 3

class LogObserver:
"""StateMachine observer for logging information about state/transitions."""
Expand All @@ -108,14 +108,16 @@ def on_transition(self, source, target):
def __init__(
self,
w3: Web3,
tx_exec_timeout: int = TimeoutStrategy.TIMEOUT,
strategies: Optional[List[AsyncTxStrategy]] = None,
disk_cache: bool = False,
):
# public
self.w3 = w3
self.signers = {}
self.log = log
self._strategies = [s(w3) for s in self.STRATEGIES]
# default TimeoutStrategy using provided timeout - guardrail for users
self._strategies = [TimeoutStrategy(w3, timeout=tx_exec_timeout)]
if strategies:
self._strategies.extend(list(strategies))

Expand Down Expand Up @@ -240,7 +242,7 @@ def _sleep(self) -> None:
# Lifecycle
#

def __handle_active_transaction(self) -> bool:
def __handle_active_transaction(self) -> None:
"""
Handles the currently tracked pending transaction.

Expand All @@ -249,7 +251,7 @@ def __handle_active_transaction(self) -> bool:
1. paused
2. reverted (fault)
3. finalized
4. strategize: retry, wait, or fault
4. strategize: retry, do nothing and wait, or fault

Returns True if the next queued transaction can be broadcasted right now.
"""
Expand All @@ -262,7 +264,7 @@ def __handle_active_transaction(self) -> bool:
# Outcome 2: the pending transaction was reverted (final error)
except TransactionReverted as e:
self._tx_tracker.fault(fault_error=e)
return True
return

# Outcome 3: pending transaction is finalized (final success)
if receipt:
Expand All @@ -273,14 +275,13 @@ def __handle_active_transaction(self) -> bool:
f"with {confirmations} confirmation(s) txhash: {final_txhash.hex()}"
)
self._tx_tracker.finalize_active_tx(receipt=receipt)
return True
return

# Outcome 4: re-strategize the pending transaction
pending_tx = self.__strategize()
return pending_tx is not None
self.__strategize()

#
# Broadcast
# Broadcast tx
#

def __get_signer(self, address: str) -> LocalAccount:
Expand All @@ -290,7 +291,7 @@ def __get_signer(self, address: str) -> LocalAccount:
raise ValueError(f"Signer {address} not found")
return signer

def __fire(self, tx: FutureTx, msg: str) -> Optional[PendingTx]:
def __fire(self, tx: AsyncTx, msg: str) -> TxHash:
"""
Signs and broadcasts a transaction, handling RPC errors
and internal state changes.
Expand All @@ -301,70 +302,156 @@ def __fire(self, tx: FutureTx, msg: str) -> Optional[PendingTx]:
Morphs a `FutureTx` into a `PendingTx` and advances it
into the active transaction slot if broadcast is successful.
"""
signer: LocalAccount = self.__get_signer(tx._from)
signer: LocalAccount = self.__get_signer(tx.params["from"])
try:
txhash = self.w3.eth.send_raw_transaction(
signer.sign_transaction(tx.params).rawTransaction
)
except ValueError as e:
_handle_rpc_error(e, tx=tx)
return
except ValidationError as e:
# special case for insufficient funds
if "Sender does not have enough" in str(e):
# TODO raised exception should be handled in some way #13.
raise InsufficientFunds

raise e

self.log.info(
f"[{msg}] fired transaction #atx-{tx.id}|{tx.params['nonce']}|{txhash.hex()}"
)
pending_tx = self._tx_tracker.morph(tx=tx, txhash=txhash)
if tx.on_broadcast:
fire_hook(hook=tx.on_broadcast, tx=pending_tx)
return pending_tx
return txhash

def __strategize(self) -> Optional[PendingTx]:
def __strategize(self) -> None:
"""Retry the currently tracked pending transaction with the configured strategy."""
if not self._tx_tracker.pending:
raise RuntimeError("No active transaction to strategize")

_active_copy = deepcopy(self._tx_tracker.pending)
params_updated = False
for strategy in self._strategies:
try:
params = strategy.execute(pending=_active_copy)
except Wait as e:
log.info(f"[wait] strategy {strategy.__class__} signalled wait: {e}")
return
except TransactionFaulted as e:
self._tx_tracker.fault(fault_error=e)
return
if params:
# in case the strategy accidentally returns None
# keep the parameters as they are.
_active_copy.params.update(params)
params_updated = True

if not params_updated:
# mandatory default timeout strategy prevents this from being a forever wait
log.info(
f"[wait] strategies made no suggested updates to "
f"pending tx #{_active_copy.id} - skipping retry round"
)
return

# (!) retry the transaction with the new parameters
retry_params = TxParams(_active_copy.params)
_names = " -> ".join(s.name for s in self._strategies)
pending_tx = self.__fire(tx=retry_params, msg=_names)

try:
txhash = self.__fire(tx=_active_copy, msg=_names)
except InsufficientFunds:
# special case re-raise insufficient funds (for now)
# TODO #13
# TODO should the following also be done?
# self._tx_tracker.update_failed_retry_attempt(_active_copy)
raise
except (ValidationError, Web3Exception, ValueError) as e:
self._tx_tracker.update_failed_retry_attempt(_active_copy)
self.__handle_retry_failure(_active_copy, e)
return

_active_copy.txhash = txhash
self._tx_tracker.update_after_retry(_active_copy)

pending_tx = self._tx_tracker.pending
self.log.info(f"[retry] transaction #{pending_tx.id} has been re-broadcasted")
if pending_tx.on_broadcast:
fire_hook(hook=pending_tx.on_broadcast, tx=pending_tx)

return pending_tx
def __handle_retry_failure(self, attempted_tx: PendingTx, e: Exception):
log.warn(
f"[retry] transaction #atx-{attempted_tx.id}|{attempted_tx.params['nonce']} "
f"failed with updated params - {str(e)}; retry again next round"
)

def __broadcast(self) -> Optional[TxHash]:
if attempted_tx.retries >= self._MAX_REDO_ATTEMPTS:
log.error(
f"[retry] transaction #atx-{attempted_tx.id}|{attempted_tx.params['nonce']} "
f"failed for { attempted_tx.retries} attempts; tx will no longer be retried"
)

fault_error = TransactionFaulted(
tx=attempted_tx,
fault=Fault.ERROR,
message=str(e),
)
self._tx_tracker.fault(fault_error=fault_error)

def __broadcast(self):
"""
Attempts to broadcast the next `FutureTx` in the queue.
If the broadcast is not successful, it is re-queued.
"""
future_tx = self._tx_tracker._pop() # popleft
future_tx = self._tx_tracker.pop() # popleft
future_tx.params = _make_tx_params(future_tx.params)

# update nonce as necessary
signer = self.__get_signer(future_tx._from)
nonce = self.w3.eth.get_transaction_count(signer.address, "latest")
if nonce > future_tx.params["nonce"]:
self.log.warn(
f"[broadcast] nonce {future_tx.params['nonce']} has been front-run "
f"by another transaction. Updating queued tx nonce {future_tx.params['nonce']} -> {nonce}"
f"by another transaction. Updating queued tx "
f"nonce {future_tx.params['nonce']} -> {nonce}"
)
future_tx.params["nonce"] = nonce
pending_tx = self.__fire(tx=future_tx, msg="broadcast")
if not pending_tx:
self._tx_tracker._requeue(future_tx)

try:
txhash = self.__fire(tx=future_tx, msg="broadcast")
except InsufficientFunds:
# special case re-raise insufficient funds (for now)
# TODO #13
raise
except (ValidationError, Web3Exception, ValueError) as e:
# either requeue OR fail and move on to subsequent txs
self.__handle_broadcast_failure(future_tx, e)
return
return pending_tx.txhash

self._tx_tracker.morph(tx=future_tx, txhash=txhash)
pending_tx = self._tx_tracker.pending
if pending_tx.on_broadcast:
fire_hook(hook=pending_tx.on_broadcast, tx=pending_tx)

def __handle_broadcast_failure(self, future_tx: FutureTx, e: Exception):
is_broadcast_failure = False
if _is_recoverable_send_tx_error(e):
if future_tx.requeues >= self._MAX_REDO_ATTEMPTS:
is_broadcast_failure = True
log.error(
f"[broadcast] transaction #atx-{future_tx.id}|{future_tx.params['nonce']} "
f"failed for {future_tx.requeues} attempts; tx will not be requeued"
)
else:
log.warn(
f"[broadcast] transaction #atx-{future_tx.id}|{future_tx.params['nonce']} "
f"failed - {str(e)}; requeueing tx"
)
self._tx_tracker.requeue(future_tx)
else:
# non-recoverable
is_broadcast_failure = True
log.error(
f"[broadcast] transaction #atx-{future_tx.id}|{future_tx.params['nonce']} "
f"has non-recoverable failure - {str(e)}; tx will not be requeued"
)

if is_broadcast_failure:
hook = future_tx.on_broadcast_failure
if hook:
fire_hook(hook, future_tx, e)

#
# Monitoring
Expand All @@ -379,7 +466,7 @@ def __monitor_finalized(self) -> None:
if tx in self._tx_tracker.finalized:
self._tx_tracker.finalized.remove(tx)
self.log.info(
f"[clear] stopped tracking {tx.txhash.hex()} after {confirmations} confirmations"
f"[monitor] stopped tracking {tx.txhash.hex()} after {confirmations} confirmations"
)
continue
self.log.info(
Expand Down Expand Up @@ -418,9 +505,17 @@ def queue_transaction(
if signer.address not in self.signers:
self.signers[signer.address] = signer

tx = self._tx_tracker._queue(
_from=signer.address, params=params, *args, **kwargs
)
params_copy = copy(params)

from_param = params_copy.get("from")
if from_param is None:
params_copy["from"] = signer.address
if from_param and from_param != signer.address:
raise ValueError(
f"Mismatched 'from' value ({from_param}) and 'signer' account ({signer.address})"
)

tx = self._tx_tracker.queue_tx(params=params_copy, *args, **kwargs)
if not previously_busy_or_paused:
self._wake()

Expand Down
Loading
Loading