From 522975de8341b6c5b3ffd8d6115223eb187cdeb1 Mon Sep 17 00:00:00 2001 From: Haruka Date: Fri, 27 Oct 2023 01:07:41 +0900 Subject: [PATCH] add various address stats --- aleo_types/vm_block.py | 2 +- db.py | 225 ++++++++++++++---- pg_dump.sql | 39 ++- .../{base_style_v4.css => base_style_v5.css} | 2 +- webui/templates/base.jinja2 | 2 +- webui/templates/htmx/block.jinja2 | 8 +- webui/templates/htmx/transaction.jinja2 | 90 ++++--- 7 files changed, 273 insertions(+), 95 deletions(-) rename webui/static/{base_style_v4.css => base_style_v5.css} (98%) diff --git a/aleo_types/vm_block.py b/aleo_types/vm_block.py index 0fe5fb60..75ae0f97 100644 --- a/aleo_types/vm_block.py +++ b/aleo_types/vm_block.py @@ -2395,7 +2395,7 @@ def storage_cost(self) -> int: return len(self.dump()) async def finalize_costs(self, db: "Database"): - finalize_costs = [] + finalize_costs: list[int] = [] for transition in self.transitions: from util.global_cache import get_program program = await get_program(db, str(transition.program_id)) diff --git a/db.py b/db.py index 4df92684..08436dc2 100644 --- a/db.py +++ b/db.py @@ -3,7 +3,7 @@ import os import time from collections import defaultdict -from typing import Awaitable, ParamSpec +from typing import Awaitable, ParamSpec, cast import psycopg import psycopg.sql @@ -64,9 +64,19 @@ async def connect(self): return await self.message_callback(ExplorerMessage(ExplorerMessage.Type.DatabaseConnected, None)) + @staticmethod + def _get_addresses_from_struct(plaintext: StructPlaintext): + addresses: set[str] = set() + for _, p in plaintext.members: + if isinstance(p, LiteralPlaintext) and p.literal.type == Literal.Type.Address: + addresses.add(str(p.literal.primitive)) + elif isinstance(p, StructPlaintext): + addresses.update(Database._get_addresses_from_struct(p)) + return addresses + @staticmethod async def _insert_future(conn: psycopg.AsyncConnection[dict[str, Any]], future: Future, - transition_output_future_db_id: Optional[int] = None, argument_db_id: Optional[int] = None): + transition_output_future_db_id: Optional[int] = None, argument_db_id: Optional[int] = None,): async with conn.cursor() as cur: if transition_output_future_db_id: await cur.execute( @@ -74,23 +84,74 @@ async def _insert_future(conn: psycopg.AsyncConnection[dict[str, Any]], future: "VALUES ('Output', %s, %s, %s) RETURNING id", (transition_output_future_db_id, str(future.program_id), str(future.function_name)) ) + if (res := await cur.fetchone()) is None: + raise RuntimeError("failed to insert row into database") + future_db_id = res["id"] + await cur.execute( + "SELECT t.id FROM transition t " + "JOIN transition_output o on t.id = o.transition_id " + "JOIN transition_output_future tof on o.id = tof.transition_output_id " + "WHERE tof.id = %s", + (transition_output_future_db_id,) + ) + if (res := await cur.fetchone()) is None: + raise RuntimeError("database inconsistent") + transition_db_id = res["id"] elif argument_db_id: await cur.execute( "INSERT INTO future (type, future_argument_id, program_id, function_name) " "VALUES ('Argument', %s, %s, %s) RETURNING id", (argument_db_id, str(future.program_id), str(future.function_name)) ) + if (res := await cur.fetchone()) is None: + raise RuntimeError("failed to insert row into database") + future_db_id = res["id"] + while True: + await cur.execute( + "SELECT f.id, f.transition_output_future_id, f.future_argument_id FROM future f " + "JOIN future_argument a on f.id = a.future_id " + "WHERE a.id = %s", + (argument_db_id,) + ) + if (res := await cur.fetchone()) is None: + raise RuntimeError("database inconsistent") + if res["transition_output_future_id"]: + transition_output_future_db_id = res["transition_output_future_id"] + break + argument_db_id = res["future_argument_id"] + await cur.execute( + "SELECT t.id FROM transition t " + "JOIN transition_output o on t.id = o.transition_id " + "JOIN transition_output_future tof on o.id = tof.transition_output_id " + "WHERE tof.id = %s", + (transition_output_future_db_id,) + ) + if (res := await cur.fetchone()) is None: + raise RuntimeError("database inconsistent") + transition_db_id = res["id"] else: raise ValueError("transition_output_db_id or argument_db_id must be set") - if (res := await cur.fetchone()) is None: - raise RuntimeError("failed to insert row into database") - future_db_id = res["id"] for argument in future.arguments: if isinstance(argument, PlaintextArgument): + plaintext = argument.plaintext await cur.execute( "INSERT INTO future_argument (future_id, type, plaintext) VALUES (%s, %s, %s)", - (future_db_id, argument.type.name, argument.plaintext.dump()) + (future_db_id, argument.type.name, plaintext.dump()) ) + if isinstance(plaintext, LiteralPlaintext) and plaintext.literal.type == Literal.Type.Address: + address = str(plaintext.literal.primitive) + await cur.execute( + "INSERT INTO address_transition (address, transition_id) VALUES (%s, %s)", + (address, transition_db_id) + ) + elif isinstance(plaintext, StructPlaintext): + addresses = Database._get_addresses_from_struct(plaintext) + for address in addresses: + await cur.execute( + "INSERT INTO address_transition (address, transition_id) VALUES (%s, %s)", + (address, transition_db_id) + ) + elif isinstance(argument, FutureArgument): await cur.execute( "INSERT INTO future_argument (future_id, type) VALUES (%s, %s) RETURNING id", @@ -151,7 +212,14 @@ async def _load_future(conn: psycopg.AsyncConnection[dict[str, Any]], transition ) @staticmethod - async def _insert_transition(conn: psycopg.AsyncConnection[dict[str, Any]], exe_tx_db_id: Optional[int], fee_db_id: Optional[int], + def _get_primitive_from_argument_unchecked(argument: Argument): + plaintext = cast(PlaintextArgument, cast(PlaintextArgument, argument).plaintext) + literal = cast(LiteralPlaintext, plaintext).literal + return literal.primitive + + @staticmethod + async def _insert_transition(conn: psycopg.AsyncConnection[dict[str, Any]], redis_conn: Redis[str], + exe_tx_db_id: Optional[int], fee_db_id: Optional[int], transition: Transition, ts_index: int): async with conn.cursor() as cur: await cur.execute( @@ -266,6 +334,43 @@ async def _insert_transition(conn: psycopg.AsyncConnection[dict[str, Any]], exe_ (program_db_id, str(transition.function_name)) ) + if transition.program_id == "credits.aleo": + transfer_from = None + transfer_to = None + fee_from = None + if transition.function_name == "transfer_public": + output = cast(FutureTransitionOutput, transition.outputs[0]) + future = cast(Future, output.future.value) + transfer_from = str(Database._get_primitive_from_argument_unchecked(future.arguments[0])) + transfer_to = str(Database._get_primitive_from_argument_unchecked(future.arguments[1])) + amount = int(cast(int, Database._get_primitive_from_argument_unchecked(future.arguments[2]))) + elif transition.function_name == "transfer_private_to_public": + output = cast(FutureTransitionOutput, transition.outputs[1]) + future = cast(Future, output.future.value) + transfer_to = str(Database._get_primitive_from_argument_unchecked(future.arguments[0])) + amount = int(cast(int, Database._get_primitive_from_argument_unchecked(future.arguments[1]))) + elif transition.function_name == "transfer_public_to_private": + output = cast(FutureTransitionOutput, transition.outputs[1]) + future = cast(Future, output.future.value) + transfer_from = str(Database._get_primitive_from_argument_unchecked(future.arguments[0])) + amount = int(cast(int, Database._get_primitive_from_argument_unchecked(future.arguments[1]))) + elif transition.function_name == "fee_public": + output = cast(FutureTransitionOutput, transition.outputs[0]) + future = cast(Future, output.future.value) + fee_from = str(Database._get_primitive_from_argument_unchecked(future.arguments[0])) + amount = int(cast(int, Database._get_primitive_from_argument_unchecked(future.arguments[1]))) + + if transfer_from != transfer_to: + if transfer_from is not None: + await redis_conn.hincrby("address_transfer_out", transfer_from, amount) # type: ignore + if transfer_to is not None: + await redis_conn.hincrby("address_transfer_in", transfer_to, amount) # type: ignore + + if fee_from is not None: + await redis_conn.hincrby("address_fee", fee_from, amount) # type: ignore + + + async def save_builtin_program(self, program: Program): async with self.pool.connection() as conn: async with conn.cursor() as cur: @@ -284,20 +389,22 @@ async def _save_program(self, cur: psycopg.AsyncCursor[dict[str, Any]], program: await cur.execute( "INSERT INTO program " "(transaction_deploy_id, program_id, import, mapping, interface, record, " - "closure, function, raw_data, is_helloworld, feature_hash, owner, signature) " - "VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) RETURNING id", + "closure, function, raw_data, is_helloworld, feature_hash, owner, signature, address) " + "VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) RETURNING id", (deploy_transaction_db_id, str(program.id), imports, mappings, interfaces, records, closures, functions, program.dump(), program.is_helloworld(), program.feature_hash(), - str(transaction.owner.address), str(transaction.owner.signature)) + str(transaction.owner.address), str(transaction.owner.signature), + aleo.program_id_to_address(str(program.id))) ) else: await cur.execute( "INSERT INTO program " "(program_id, import, mapping, interface, record, " - "closure, function, raw_data, is_helloworld, feature_hash) " - "VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s) RETURNING id", + "closure, function, raw_data, is_helloworld, feature_hash, address) " + "VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) RETURNING id", (str(program.id), imports, mappings, interfaces, records, - closures, functions, program.dump(), program.is_helloworld(), program.feature_hash()) + closures, functions, program.dump(), program.is_helloworld(), program.feature_hash(), + aleo.program_id_to_address(str(program.id))) ) if (res := await cur.fetchone()) is None: raise Exception("failed to insert row into database") @@ -554,8 +661,9 @@ def _check_committee_staker_match(committee_members: dict[Address, tuple[u64, bo def _stake_rewards(committee_members: dict[Address, tuple[u64, bool_]], stakers: dict[Address, tuple[Address, u64]], block_reward: u64): total_stake = sum(x[0] for x in committee_members.values()) + stake_rewards: dict[Address, int] = {} if not stakers or total_stake == 0 or block_reward == 0: - return stakers + return stakers, stake_rewards new_stakers: dict[Address, tuple[Address, u64]] = {} @@ -567,10 +675,13 @@ def _stake_rewards(committee_members: dict[Address, tuple[u64, bool_]], new_stakers[staker] = validator, stake continue - new_stake = int(block_reward) * stake // total_stake + stake + reward = int(block_reward) * stake // total_stake + stake_rewards[staker] = reward + + new_stake = stake + reward new_stakers[staker] = validator, u64(new_stake) - return new_stakers + return new_stakers, stake_rewards @staticmethod def _next_committee_members(committee_members: dict[Address, tuple[u64, bool_]], @@ -593,9 +704,12 @@ async def _post_ratify(self, cur: psycopg.AsyncCursor[dict[str, Any]], redis_con Database._check_committee_staker_match(committee_members, stakers) - stakers = Database._stake_rewards(committee_members, stakers, ratification.amount) + stakers, stake_rewards = Database._stake_rewards(committee_members, stakers, ratification.amount) committee_members = Database._next_committee_members(committee_members, stakers) + for address, amount in stake_rewards.items(): + await self.redis.hincrby("address_stake_reward", str(address), amount) + await Database._update_committee_bonded_map(cur, self.redis, committee_members, stakers, height) await Database._save_committee_history(cur, height, Committee( starting_round=u64(round_), @@ -667,32 +781,50 @@ async def _post_ratify(self, cur: psycopg.AsyncCursor[dict[str, Any]], redis_con from interpreter.interpreter import execute_operations await execute_operations(self, cur, operations) + @staticmethod + async def _backup_redis_hash_key(redis_conn: Redis[str], keys: list[str], height: int): + if height != 0: + for key in keys: + backup_key = f"{key}:rollback_backup:{height}" + if await redis_conn.exists(backup_key) == 0: + backup_data = await redis_conn.hgetall(key) + if backup_data: + await redis_conn.hset(backup_key, mapping=backup_data) + else: + backup_data = await redis_conn.hgetall(backup_key) + await redis_conn.delete(key) + await redis_conn.hset(key, mapping=backup_data) + + @staticmethod + async def _redis_cleanup(redis_conn: Redis[str], keys: list[str], height: int, rollback: bool): + if height != 0: + for key in keys: + backup_key = f"{key}:rollback_backup:{height}" + if rollback: + backup_data = await redis_conn.hgetall(backup_key) + if backup_data: + await redis_conn.delete(key) + await redis_conn.hset(key, mapping=backup_data) + else: + await redis_conn.delete(backup_key) + + @profile async def _save_block(self, block: Block): async with self.pool.connection() as conn: async with conn.transaction(): async with conn.cursor() as cur: - if block.height != 0: - # redis is not protected by transaction so manually saving here - bonded_save = await self.redis.hgetall("credits.aleo:bonded") - committee_save = await self.redis.hgetall("credits.aleo:committee") - - bonded_backup_key = f"credits.aleo:bonded:rollback_backup:{block.header.metadata.height}" - committee_backup_key = f"credits.aleo:committee:rollback_backup:{block.header.metadata.height}" - - if await self.redis.exists(bonded_backup_key) == 0: - await self.redis.hset(bonded_backup_key, mapping=bonded_save) - else: - bonded_save = await self.redis.hgetall(bonded_backup_key) - await self.redis.delete("credits.aleo:bonded") - await self.redis.hset("credits.aleo:bonded", mapping=bonded_save) - - if await self.redis.exists(committee_backup_key) == 0: - await self.redis.hset(committee_backup_key, mapping=committee_save) - else: - committee_save = await self.redis.hgetall(committee_backup_key) - await self.redis.delete("credits.aleo:committee") - await self.redis.hset("credits.aleo:committee", mapping=committee_save) + height = block.height + # redis is not protected by transaction so manually saving here + redis_keys = [ + "credits.aleo:bonded", + "credits.aleo:committee", + "address_stake_reward", + "address_transfer_in", + "address_transfer_out", + "address_fee", + ] + await self._backup_redis_hash_key(self.redis, redis_keys, height) try: if block.height != 0: @@ -860,7 +992,7 @@ async def _save_block(self, block: Block): if (res := await cur.fetchone()) is None: raise RuntimeError("failed to insert row into database") fee_db_id = res["id"] - await self._insert_transition(conn, None, fee_db_id, transaction.fee.transition, 0) + await self._insert_transition(conn, self.redis, None, fee_db_id, transaction.fee.transition, 0) elif isinstance(confirmed_transaction, AcceptedExecute): if reject_reasons[ct_index] is not None: @@ -879,7 +1011,7 @@ async def _save_block(self, block: Block): execute_transaction_db_id = res["id"] for ts_index, transition in enumerate(transaction.execution.transitions): - await self._insert_transition(conn, execute_transaction_db_id, None, transition, ts_index) + await self._insert_transition(conn, self.redis, execute_transaction_db_id, None, transition, ts_index) if transaction.additional_fee.value is not None: fee = transaction.additional_fee.value @@ -891,7 +1023,7 @@ async def _save_block(self, block: Block): if (res := await cur.fetchone()) is None: raise RuntimeError("failed to insert row into database") fee_db_id = res["id"] - await self._insert_transition(conn, None, fee_db_id, fee.transition, 0) + await self._insert_transition(conn, self.redis, None, fee_db_id, fee.transition, 0) elif isinstance(confirmed_transaction, RejectedDeploy): raise ValueError("transaction type not implemented") @@ -913,7 +1045,7 @@ async def _save_block(self, block: Block): if (res := await cur.fetchone()) is None: raise RuntimeError("failed to insert row into database") fee_db_id = res["id"] - await self._insert_transition(conn, None, fee_db_id, fee.transition, 0) + await self._insert_transition(conn, self.redis, None, fee_db_id, fee.transition, 0) rejected = confirmed_transaction.rejected if not isinstance(rejected, RejectedExecution): @@ -928,7 +1060,7 @@ async def _save_block(self, block: Block): raise RuntimeError("failed to insert row into database") execute_transaction_db_id = res["id"] for ts_index, transition in enumerate(rejected.execution.transitions): - await self._insert_transition(conn, execute_transaction_db_id, None, transition, ts_index) + await self._insert_transition(conn, self.redis, execute_transaction_db_id, None, transition, ts_index) update_copy_data: list[tuple[int, str, str, str]] = [] for index, finalize_operation in enumerate(confirmed_transaction.finalize): @@ -1064,14 +1196,11 @@ async def _save_block(self, block: Block): await self._post_ratify(cur, self.redis, block.height, block.round, block.ratifications.ratifications, address_puzzle_rewards) - if block.height != 0: - await self.redis.delete(bonded_backup_key) - await self.redis.delete(committee_backup_key) + await self._redis_cleanup(self.redis, redis_keys, block.height, False) await self.message_callback(ExplorerMessage(ExplorerMessage.Type.DatabaseBlockAdded, block.header.metadata.height)) except Exception as e: - await self.redis.hset("credits.aleo:bonded", mapping=bonded_save) - await self.redis.hset("credits.aleo:committee", mapping=committee_save) + await self._redis_cleanup(self.redis, redis_keys, block.height, True) await self.message_callback(ExplorerMessage(ExplorerMessage.Type.DatabaseError, e)) raise diff --git a/pg_dump.sql b/pg_dump.sql index a1712f86..2d4fa8d6 100644 --- a/pg_dump.sql +++ b/pg_dump.sql @@ -2,8 +2,8 @@ -- PostgreSQL database dump -- --- Dumped from database version 15.1 --- Dumped by pg_dump version 15.3 +-- Dumped from database version 15.4 (Debian 15.4-3) +-- Dumped by pg_dump version 15.4 SET statement_timeout = 0; SET lock_timeout = 0; @@ -152,6 +152,16 @@ CREATE TABLE explorer._migration ( ); +-- +-- Name: address_transition; Type: TABLE; Schema: explorer; Owner: - +-- + +CREATE TABLE explorer.address_transition ( + address text NOT NULL, + transition_id integer NOT NULL +); + + -- -- Name: authority; Type: TABLE; Schema: explorer; Owner: - -- @@ -1092,7 +1102,8 @@ CREATE TABLE explorer.program ( feature_hash bytea NOT NULL, owner text, signature text, - leo_source text + leo_source text, + address text NOT NULL ); @@ -2373,6 +2384,20 @@ ALTER TABLE ONLY explorer.transition ADD CONSTRAINT transition_pk PRIMARY KEY (id); +-- +-- Name: address_transition_address_index; Type: INDEX; Schema: explorer; Owner: - +-- + +CREATE INDEX address_transition_address_index ON explorer.address_transition USING btree (address); + + +-- +-- Name: address_transition_transition_id_index; Type: INDEX; Schema: explorer; Owner: - +-- + +CREATE INDEX address_transition_transition_id_index ON explorer.address_transition USING btree (transition_id); + + -- -- Name: authority_block_id_index; Type: INDEX; Schema: explorer; Owner: - -- @@ -2954,6 +2979,14 @@ CREATE INDEX transition_transaction_execute_id_index ON explorer.transition USIN CREATE UNIQUE INDEX transition_transition_id_uindex ON explorer.transition USING btree (transition_id text_pattern_ops); +-- +-- Name: address_transition address_stats_transition_transition_id_fk; Type: FK CONSTRAINT; Schema: explorer; Owner: - +-- + +ALTER TABLE ONLY explorer.address_transition + ADD CONSTRAINT address_stats_transition_transition_id_fk FOREIGN KEY (transition_id) REFERENCES explorer.transition(id); + + -- -- Name: authority authority_block_id_fk; Type: FK CONSTRAINT; Schema: explorer; Owner: - -- diff --git a/webui/static/base_style_v4.css b/webui/static/base_style_v5.css similarity index 98% rename from webui/static/base_style_v4.css rename to webui/static/base_style_v5.css index 71056594..fbaafdc1 100644 --- a/webui/static/base_style_v4.css +++ b/webui/static/base_style_v5.css @@ -229,7 +229,7 @@ pre { font-family: monospace !important; } -#transitions, #rejected-transitions { +#rejected-transitions { margin-top: 20px; } diff --git a/webui/templates/base.jinja2 b/webui/templates/base.jinja2 index be89c987..47d2811f 100644 --- a/webui/templates/base.jinja2 +++ b/webui/templates/base.jinja2 @@ -22,7 +22,7 @@ {% endif %} } - + {% block head %}{% endblock %} diff --git a/webui/templates/htmx/block.jinja2 b/webui/templates/htmx/block.jinja2 index 9292b15e..dbf3123f 100644 --- a/webui/templates/htmx/block.jinja2 +++ b/webui/templates/htmx/block.jinja2 @@ -45,13 +45,13 @@
-
+
{% call data_line("Cumulative weight") %} {{ block.header.metadata.cumulative_weight | format_number | safe }} {% endcall %} @@ -79,7 +79,7 @@ {% endcall %}
-
+
diff --git a/webui/templates/htmx/transaction.jinja2 b/webui/templates/htmx/transaction.jinja2 index 4be26782..40601dfe 100644 --- a/webui/templates/htmx/transaction.jinja2 +++ b/webui/templates/htmx/transaction.jinja2 @@ -65,42 +65,15 @@ {% endif %} -
-

{% if type == "Deploy" %}Fee transition{% else %}Transitions{% endif %}

-
- - - - - - - - - {% for transition in transitions %} - - - - - - {% endfor %} - {% if fee_transition %} - - - - - - {% endif %} - -
IndexTransition IDProgram / Function call
{{ loop.index0 }} - {{ transition.transition_id }} - {{ transition.action }}
Fee - {{ fee_transition.transition_id }} - {{ fee_transition.action }}
-
+ - {% if state == "Rejected" %} -
-

Rejected Transitions

+
+
+ {% if type == "Deploy" %}

Fee transition

{% endif %} @@ -110,7 +83,7 @@ - {% for transition in rejected_transitions %} + {% for transition in transitions %} {% endfor %} + {% if fee_transition %} + + + + + + {% endif %}
{{ loop.index0 }} @@ -119,11 +92,54 @@ {{ transition.action }}
Fee + {{ fee_transition.transition_id }} + {{ fee_transition.action }}
+ + {% if state == "Rejected" %} +
+

Rejected Transitions

+ + + + + + + + + + {% for transition in rejected_transitions %} + + + + + + {% endfor %} + +
IndexTransition IDProgram / Function call
{{ loop.index0 }} + {{ transition.transition_id }} + {{ transition.action }}
+
+ {% endif %}
- {% endif %} +
+ Coming soon +
+ +
+ Coming soon +
+ +