Skip to content

Commit

Permalink
support batch certificate v2
Browse files Browse the repository at this point in the history
  • Loading branch information
HarukaMa committed Nov 3, 2023
1 parent f6d0fff commit 831d1f2
Show file tree
Hide file tree
Showing 2 changed files with 114 additions and 43 deletions.
33 changes: 30 additions & 3 deletions aleo_types/vm_block.py
Original file line number Diff line number Diff line change
Expand Up @@ -3251,7 +3251,20 @@ def load(cls, data: BytesIO):
transmission_ids=transmission_ids, previous_certificate_ids=previous_certificate_ids,
signature=signature)


class BatchCertificate(Serializable):
    """Version-dispatching base class for batch certificates.

    Reads the leading version byte and delegates deserialization to the
    matching concrete subclass (v1 or v2).  Subclasses are expected to share
    the `batch_header` attribute declared here.
    """

    # Header of the batch this certificate attests to; set by subclasses.
    batch_header: BatchHeader

    @classmethod
    def load(cls, data: BytesIO) -> Self:
        """Deserialize a certificate, dispatching on the version byte.

        Raises:
            ValueError: if the version byte matches no known certificate version.
        """
        # NOTE: the version byte is consumed here, so the subclass loaders
        # receive `data` positioned just past it.
        version = u8.load(data)
        if version == BatchCertificate1.version:
            return BatchCertificate1.load(data)
        elif version == BatchCertificate2.version:
            return BatchCertificate2.load(data)
        # Previously fell through and returned None, violating the -> Self
        # contract and deferring the failure to an obscure AttributeError later.
        raise ValueError("invalid batch certificate version")

class BatchCertificate1(BatchCertificate):
version = u8(1)

def __init__(self, *, certificate_id: Field, batch_header: BatchHeader, signatures: Vec[Tuple[Signature, i64], u32]):
Expand All @@ -3264,15 +3277,29 @@ def dump(self) -> bytes:

@classmethod
def load(cls, data: BytesIO):
version = u8.load(data)
if version != cls.version:
raise ValueError("invalid batch certificate version")
certificate_id = Field.load(data)
batch_header = BatchHeader.load(data)
signatures = Vec[Tuple[Signature, i64], u32].load(data)
return cls(certificate_id=certificate_id, batch_header=batch_header, signatures=signatures)


class BatchCertificate2(BatchCertificate):
    """Version-2 batch certificate.

    Unlike v1 it carries no explicit certificate id and its signatures are a
    plain vector without per-signature timestamps.
    """

    version = u8(2)

    def __init__(self, *, batch_header: BatchHeader, signatures: Vec[Signature, u16]):
        # Header of the certified batch.
        self.batch_header = batch_header
        # Validator signatures over the batch (no timestamps in v2).
        self.signatures = signatures

    def dump(self) -> bytes:
        # Wire layout: version byte, then the header, then the signature vector.
        parts = (
            self.version.dump(),
            self.batch_header.dump(),
            self.signatures.dump(),
        )
        return b"".join(parts)

    @classmethod
    def load(cls, data: BytesIO):
        # The version byte is expected to have been consumed already by the
        # dispatching BatchCertificate.load.
        return cls(
            batch_header=BatchHeader.load(data),
            signatures=Vec[Signature, u16].load(data),
        )


class Subdag(Serializable):
version = u8(1)

Expand Down
124 changes: 84 additions & 40 deletions db.py
Original file line number Diff line number Diff line change
Expand Up @@ -902,24 +902,42 @@ async def _save_block(self, block: Block):
for index, certificate in enumerate(certificates):
if round_ != certificate.batch_header.round:
raise ValueError("invalid subdag round")
await cur.execute(
"INSERT INTO dag_vertex (authority_id, round, batch_certificate_id, batch_id, "
"author, timestamp, author_signature, index) "
"VALUES (%s, %s, %s, %s, %s, %s, %s, %s) RETURNING id",
(authority_db_id, round_, str(certificate.certificate_id), str(certificate.batch_header.batch_id),
str(certificate.batch_header.author), certificate.batch_header.timestamp,
str(certificate.batch_header.signature), index)
)
if isinstance(certificate, BatchCertificate1):
await cur.execute(
"INSERT INTO dag_vertex (authority_id, round, batch_certificate_id, batch_id, "
"author, timestamp, author_signature, index) "
"VALUES (%s, %s, %s, %s, %s, %s, %s, %s) RETURNING id",
(authority_db_id, round_, str(certificate.certificate_id), str(certificate.batch_header.batch_id),
str(certificate.batch_header.author), certificate.batch_header.timestamp,
str(certificate.batch_header.signature), index)
)
elif isinstance(certificate, BatchCertificate2):
await cur.execute(
"INSERT INTO dag_vertex (authority_id, round, batch_id, "
"author, timestamp, author_signature, index) "
"VALUES (%s, %s, %s, %s, %s, %s, %s) RETURNING id",
(authority_db_id, round_, str(certificate.batch_header.batch_id),
str(certificate.batch_header.author), certificate.batch_header.timestamp,
str(certificate.batch_header.signature), index)
)
if (res := await cur.fetchone()) is None:
raise RuntimeError("failed to insert row into database")
vertex_db_id = res["id"]

for sig_index, (signature, timestamp) in enumerate(certificate.signatures):
await cur.execute(
"INSERT INTO dag_vertex_signature (vertex_id, signature, timestamp, index) "
"VALUES (%s, %s, %s, %s)",
(vertex_db_id, str(signature), timestamp, sig_index)
)
if isinstance(certificate, BatchCertificate1):
for sig_index, (signature, timestamp) in enumerate(certificate.signatures):
await cur.execute(
"INSERT INTO dag_vertex_signature (vertex_id, signature, timestamp, index) "
"VALUES (%s, %s, %s, %s)",
(vertex_db_id, str(signature), timestamp, sig_index)
)
elif isinstance(certificate, BatchCertificate2):
for sig_index, signature in enumerate(certificate.signatures):
await cur.execute(
"INSERT INTO dag_vertex_signature (vertex_id, signature, index) "
"VALUES (%s, %s, %s)",
(vertex_db_id, str(signature), sig_index)
)

prev_cert_ids = certificate.batch_header.previous_certificate_ids
await cur.execute(
Expand Down Expand Up @@ -1591,14 +1609,21 @@ async def _get_full_block(block: dict[str, Any], conn: psycopg.AsyncConnection[d
(dag_vertex["id"],)
)
dag_vertex_signatures = await cur.fetchall()
signatures: list[Tuple[Signature, i64]] = []
for signature in dag_vertex_signatures:
signatures.append(
Tuple[Signature, i64]((
Signature.loads(signature["signature"]),
i64(signature["timestamp"]),
))
)

if dag_vertex["batch_certificate_id"] is not None:
signatures: list[Tuple[Signature, i64]] = []
for signature in dag_vertex_signatures:
signatures.append(
Tuple[Signature, i64]((
Signature.loads(signature["signature"]),
i64(signature["timestamp"]),
))
)
else:
signatures: list[Signature] = []
for signature in dag_vertex_signatures:
signatures.append(Signature.loads(signature["signature"]))

await cur.execute(
"SELECT previous_vertex_id FROM dag_vertex_adjacency WHERE vertex_id = %s ORDER BY index",
(dag_vertex["id"],)
Expand Down Expand Up @@ -1626,21 +1651,37 @@ async def _get_full_block(block: dict[str, Any], conn: psycopg.AsyncConnection[d
elif tid["type"] == TransmissionID.Type.Transaction:
tids.append(TransactionTransmissionID(id_=TransactionID.loads(tid["transaction_id"])))

certificates.append(
BatchCertificate(
certificate_id=Field.loads(dag_vertex["batch_certificate_id"]),
batch_header=BatchHeader(
batch_id=Field.loads(dag_vertex["batch_id"]),
author=Address.loads(dag_vertex["author"]),
round_=u64(dag_vertex["round"]),
timestamp=i64(dag_vertex["timestamp"]),
transmission_ids=Vec[TransmissionID, u32](tids),
previous_certificate_ids=Vec[Field, u32]([Field.loads(x) for x in previous_cert_ids]),
signature=Signature.loads(dag_vertex["author_signature"]),
),
signatures=Vec[Tuple[Signature, i64], u32](signatures),
if dag_vertex["batch_certificate_id"] is not None:
certificates.append(
BatchCertificate1(
certificate_id=Field.loads(dag_vertex["batch_certificate_id"]),
batch_header=BatchHeader(
batch_id=Field.loads(dag_vertex["batch_id"]),
author=Address.loads(dag_vertex["author"]),
round_=u64(dag_vertex["round"]),
timestamp=i64(dag_vertex["timestamp"]),
transmission_ids=Vec[TransmissionID, u32](tids),
previous_certificate_ids=Vec[Field, u32]([Field.loads(x) for x in previous_cert_ids]),
signature=Signature.loads(dag_vertex["author_signature"]),
),
signatures=Vec[Tuple[Signature, i64], u32](signatures),
)
)
else:
certificates.append(
BatchCertificate2(
batch_header=BatchHeader(
batch_id=Field.loads(dag_vertex["batch_id"]),
author=Address.loads(dag_vertex["author"]),
round_=u64(dag_vertex["round"]),
timestamp=i64(dag_vertex["timestamp"]),
transmission_ids=Vec[TransmissionID, u32](tids),
previous_certificate_ids=Vec[Field, u32]([Field.loads(x) for x in previous_cert_ids]),
signature=Signature.loads(dag_vertex["author_signature"]),
),
signatures=Vec[Signature, u16](signatures),
)
)
)
subdags: dict[u64, Vec[BatchCertificate, u32]] = defaultdict(lambda: Vec[BatchCertificate, u32]([]))
for certificate in certificates:
subdags[certificate.batch_header.round].append(certificate)
Expand Down Expand Up @@ -2853,6 +2894,7 @@ async def migrate(self):
(1, self.migrate_1_add_dag_vertex_adjacency_index),
(2, self.migrate_2_add_helper_functions),
(3, self.migrate_3_set_mapping_history_key_not_null),
(4, self.migrate_4_support_batch_certificate_v2),
]
async with self.pool.connection() as conn:
async with conn.cursor() as cur:
Expand All @@ -2871,10 +2913,7 @@ async def migrate(self):

@staticmethod
async def migrate_1_add_dag_vertex_adjacency_index(conn: psycopg.AsyncConnection[dict[str, Any]]):
await conn.execute("""
create index dag_vertex_adjacency_vertex_id_index
on explorer.dag_vertex_adjacency (vertex_id);
""")
await conn.execute("create index dag_vertex_adjacency_vertex_id_index on explorer.dag_vertex_adjacency (vertex_id)")

@staticmethod
async def migrate_2_add_helper_functions(conn: psycopg.AsyncConnection[dict[str, Any]]):
Expand All @@ -2884,6 +2923,11 @@ async def migrate_2_add_helper_functions(conn: psycopg.AsyncConnection[dict[str,
async def migrate_3_set_mapping_history_key_not_null(conn: psycopg.AsyncConnection[dict[str, Any]]):
    """Migration 3: enforce that every mapping_history row has a key."""
    sql = "alter table explorer.mapping_history alter column key set not null"
    await conn.execute(sql)

@staticmethod
async def migrate_4_support_batch_certificate_v2(conn: psycopg.AsyncConnection[dict[str, Any]]):
    """Migration 4: relax NOT NULL constraints that v2 batch certificates
    cannot satisfy (they carry no certificate id and no per-signature
    timestamps)."""
    statements = (
        "alter table explorer.dag_vertex alter column batch_certificate_id drop not null",
        "alter table explorer.dag_vertex_signature alter column timestamp drop not null",
    )
    # Run in order; each statement is an independent DDL change.
    for statement in statements:
        await conn.execute(statement)

# debug method
async def clear_database(self):
async with self.pool.connection() as conn:
Expand Down

0 comments on commit 831d1f2

Please sign in to comment.