From 5f7daf09326e5f4f1e6e5d6e199bbbcddc74726b Mon Sep 17 00:00:00 2001 From: ffranr Date: Mon, 13 Nov 2023 14:31:00 +0000 Subject: [PATCH 01/16] universe: fix comment spelling --- universe/auto_syncer.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/universe/auto_syncer.go b/universe/auto_syncer.go index 4bed26bae..ac43307cc 100644 --- a/universe/auto_syncer.go +++ b/universe/auto_syncer.go @@ -287,7 +287,7 @@ func (f *FederationEnvoy) syncer() { // to synchronize state with all the active universe servers in // the federation. case <-syncTicker.C: - // Error propogation is handled in tryFetchServers, we + // Error propagation is handled in tryFetchServers, we // only need to exit here. fedServers, err := f.tryFetchServers() if err != nil { From a284ce0a0a627cc907468c8addc359e4a3be55dc Mon Sep 17 00:00:00 2001 From: ffranr Date: Mon, 13 Nov 2023 14:20:27 +0000 Subject: [PATCH 02/16] universe: add doc to fields --- universe/auto_syncer.go | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/universe/auto_syncer.go b/universe/auto_syncer.go index ac43307cc..3a32df1ab 100644 --- a/universe/auto_syncer.go +++ b/universe/auto_syncer.go @@ -71,8 +71,12 @@ type FederationPushReq struct { // Leaf is the new leaf to add. Leaf *Leaf + // resp is a channel that will be sent the asset issuance/transfer + // proof and corresponding universe/multiverse inclusion proofs if the + // federation proof push was successful. resp chan *Proof - err chan error + + err chan error } // FederationProofBatchPushReq is used to push out a batch of universe proof @@ -97,8 +101,12 @@ type FederationEnvoy struct { stopOnce sync.Once + // pushRequests is a channel that will be sent new requests to push out + // proof leaves to the federation. pushRequests chan *FederationPushReq + // batchPushRequests is a channel that will be sent new requests to push + // out batch proof leaves to the federation. batchPushRequests chan *FederationProofBatchPushReq } From 46f692da1cd6d2ae8bebbd1ecee0053ea1db13d1 Mon Sep 17 00:00:00 2001 From: ffranr Date: Mon, 27 Nov 2023 11:56:17 +0000 Subject: [PATCH 03/16] tapdb: rename ListUniverseServers into QueryUniverseServers This commit renames `ListUniverseServers` into `QueryUniverseServers`. It also adds a `WHERE` clause to the SQL statement to allow us to filter on server host or row ID. --- tapdb/sqlc/querier.go | 2 +- tapdb/sqlc/queries/universe.sql | 7 ++-- tapdb/sqlc/universe.sql.go | 62 +++++++++++++++++++-------------- tapdb/universe_federation.go | 13 +++++-- 4 files changed, 51 insertions(+), 33 deletions(-) diff --git a/tapdb/sqlc/querier.go b/tapdb/sqlc/querier.go index ee001f911..1a64f4291 100644 --- a/tapdb/sqlc/querier.go +++ b/tapdb/sqlc/querier.go @@ -94,7 +94,6 @@ type Querier interface { InsertPassiveAsset(ctx context.Context, arg InsertPassiveAssetParams) error InsertRootKey(ctx context.Context, arg InsertRootKeyParams) error InsertUniverseServer(ctx context.Context, arg InsertUniverseServerParams) error - ListUniverseServers(ctx context.Context) ([]UniverseServer, error) LogProofTransferAttempt(ctx context.Context, arg LogProofTransferAttemptParams) error LogServerSync(ctx context.Context, arg LogServerSyncParams) error NewMintingBatch(ctx context.Context, arg NewMintingBatchParams) error @@ -129,6 +128,7 @@ type Querier interface { // root, simplifies queries QueryUniverseAssetStats(ctx context.Context, arg QueryUniverseAssetStatsParams) ([]QueryUniverseAssetStatsRow, error) QueryUniverseLeaves(ctx context.Context, arg QueryUniverseLeavesParams) ([]QueryUniverseLeavesRow, error) + QueryUniverseServers(ctx context.Context, arg QueryUniverseServersParams) ([]UniverseServer, error) QueryUniverseStats(ctx context.Context) (QueryUniverseStatsRow, error) ReAnchorPassiveAssets(ctx context.Context, arg ReAnchorPassiveAssetsParams) error SetAddrManaged(ctx context.Context, arg SetAddrManagedParams) error diff --git a/tapdb/sqlc/queries/universe.sql b/tapdb/sqlc/queries/universe.sql index cab652573..3376cd0bf 100644 --- a/tapdb/sqlc/queries/universe.sql +++ b/tapdb/sqlc/queries/universe.sql @@ -115,8 +115,11 @@ UPDATE universe_servers SET last_sync_time = @new_sync_time WHERE server_host = @target_server; --- name: ListUniverseServers :many -SELECT * FROM universe_servers; +-- name: QueryUniverseServers :many +SELECT * FROM universe_servers +WHERE (id = sqlc.narg('id') OR sqlc.narg('id') IS NULL) AND + (server_host = sqlc.narg('server_host') + OR sqlc.narg('server_host') IS NULL); -- name: InsertNewSyncEvent :exec WITH group_key_root_id AS ( diff --git a/tapdb/sqlc/universe.sql.go b/tapdb/sqlc/universe.sql.go index 53452cab5..a0e7167af 100644 --- a/tapdb/sqlc/universe.sql.go +++ b/tapdb/sqlc/universe.sql.go @@ -265,33 +265,6 @@ func (q *Queries) InsertUniverseServer(ctx context.Context, arg InsertUniverseSe return err } -const listUniverseServers = `-- name: ListUniverseServers :many -SELECT id, server_host, last_sync_time FROM universe_servers -` - -func (q *Queries) ListUniverseServers(ctx context.Context) ([]UniverseServer, error) { - rows, err := q.db.QueryContext(ctx, listUniverseServers) - if err != nil { - return nil, err - } - defer rows.Close() - var items []UniverseServer - for rows.Next() { - var i UniverseServer - if err := rows.Scan(&i.ID, &i.ServerHost, &i.LastSyncTime); err != nil { - return nil, err - } - items = append(items, i) - } - if err := rows.Close(); err != nil { - return nil, err - } - if err := rows.Err(); err != nil { - return nil, err - } - return items, nil -} - const logServerSync = `-- name: LogServerSync :exec UPDATE universe_servers SET last_sync_time = $1 @@ -688,6 +661,41 @@ func (q *Queries) QueryUniverseLeaves(ctx context.Context, arg QueryUniverseLeav return items, nil } +const queryUniverseServers = `-- name: QueryUniverseServers :many +SELECT id, server_host, last_sync_time FROM universe_servers +WHERE (id = $1 OR $1 IS NULL) AND + (server_host = $2 + OR $2 IS NULL) +` + +type QueryUniverseServersParams struct { + ID sql.NullInt64 + ServerHost sql.NullString +} + +func (q *Queries) QueryUniverseServers(ctx context.Context, arg QueryUniverseServersParams) ([]UniverseServer, error) { + rows, err := q.db.QueryContext(ctx, queryUniverseServers, arg.ID, arg.ServerHost) + if err != nil { + return nil, err + } + defer rows.Close() + var items []UniverseServer + for rows.Next() { + var i UniverseServer + if err := rows.Scan(&i.ID, &i.ServerHost, &i.LastSyncTime); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Close(); err != nil { + return nil, err + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + const queryUniverseStats = `-- name: QueryUniverseStats :one WITH stats AS ( SELECT total_asset_syncs, total_asset_proofs diff --git a/tapdb/universe_federation.go b/tapdb/universe_federation.go index 5052ac1a0..c306baf30 100644 --- a/tapdb/universe_federation.go +++ b/tapdb/universe_federation.go @@ -40,6 +40,9 @@ type ( // FedUniSyncConfigs is the universe specific federation sync config // returned from a query. FedUniSyncConfigs = sqlc.FederationUniSyncConfig + + // QueryUniServersParams is used to query for universe servers. + QueryUniServersParams = sqlc.QueryUniverseServersParams ) var ( @@ -97,8 +100,10 @@ type UniverseServerStore interface { // LogServerSync marks that a server was just synced in the DB. LogServerSync(ctx context.Context, arg sqlc.LogServerSyncParams) error - // ListUniverseServers returns the total set of all universe servers. - ListUniverseServers(ctx context.Context) ([]sqlc.UniverseServer, error) + // QueryUniverseServers returns a set of universe servers. + QueryUniverseServers(ctx context.Context, + arg sqlc.QueryUniverseServersParams) ([]sqlc.UniverseServer, + error) } // UniverseFederationOptions is the database tx object for the universe server store. @@ -174,7 +179,9 @@ func (u *UniverseFederationDB) UniverseServers( readTx := NewUniverseFederationReadTx() dbErr := u.db.ExecTx(ctx, &readTx, func(db UniverseServerStore) error { - servers, err := db.ListUniverseServers(ctx) + servers, err := db.QueryUniverseServers( + ctx, QueryUniServersParams{}, + ) if err != nil { return err } From b5ea17d195145781e16616bcbe8bc32d2695e663 Mon Sep 17 00:00:00 2001 From: ffranr Date: Mon, 27 Nov 2023 12:52:59 +0000 Subject: [PATCH 04/16] universe: add function to derive universe ID from bytes This commit adds a new function called `NewUniIDFromRawArgs`. The function can be used to derive a universe identifier from raw asset ID/group key bytes. We will use this function to derive a universe ID from data stored in the database. --- universe/interface.go | 60 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 60 insertions(+) diff --git a/universe/interface.go b/universe/interface.go index 5fdf19adb..ffd5db9cf 100644 --- a/universe/interface.go +++ b/universe/interface.go @@ -84,6 +84,66 @@ func (i *Identifier) StringForLog() string { i.String(), i.AssetID[:], groupKey, i.ProofType) } +// NewUniIDFromRawArgs creates a new universe ID from the raw arguments. The +// asset ID bytes and group key bytes are mutually exclusive. If the group key +// bytes are set, then the asset ID bytes will be ignored. +// This function is useful in deriving a universe ID from the data stored in the +// database. +func NewUniIDFromRawArgs(assetIDBytes []byte, groupKeyBytes []byte, + proofTypeStr string) (Identifier, error) { + + proofType, err := ParseStrProofType(proofTypeStr) + if err != nil { + return Identifier{}, err + } + + // If the group key bytes are set, then we'll preferentially populate + // the universe ID with that and not the asset ID. + if len(groupKeyBytes) != 0 { + groupKey, err := parseGroupKey(groupKeyBytes) + if err != nil { + return Identifier{}, fmt.Errorf("unable to parse "+ + "group key: %w", err) + } + return Identifier{ + GroupKey: groupKey, + ProofType: proofType, + }, nil + } + + // At this point we know that the group key bytes are nil, so we'll + // attempt to parse the asset ID bytes. + if len(assetIDBytes) == 0 { + return Identifier{}, fmt.Errorf("asset ID bytes and group " + + "key bytes are both nil") + } + + var assetID asset.ID + copy(assetID[:], assetIDBytes) + + return Identifier{ + AssetID: assetID, + ProofType: proofType, + }, nil +} + +// parseGroupKey parses a group key from bytes, which can be in either the +// Schnorr or Compressed format. +func parseGroupKey(scriptKey []byte) (*btcec.PublicKey, error) { + switch len(scriptKey) { + case schnorr.PubKeyBytesLen: + return schnorr.ParsePubKey(scriptKey) + + // Truncate the key and then parse as a Schnorr key. + case btcec.PubKeyBytesLenCompressed: + return schnorr.ParsePubKey(scriptKey[1:]) + + default: + return nil, fmt.Errorf("unknown script key length: %v", + len(scriptKey)) + } +} + // ValidateProofUniverseType validates that the proof type matches the universe // identifier proof type. func ValidateProofUniverseType(a *asset.Asset, uniID Identifier) error { From 721e064d13eb611861b85f762dad3e290ddbe021 Mon Sep 17 00:00:00 2001 From: ffranr Date: Mon, 27 Nov 2023 12:03:44 +0000 Subject: [PATCH 05/16] universe+tapdb: add universe federation proof sync log table This commit also adds log query/upserts SQL statements. --- ...00013_universe_fed_proof_sync_log.down.sql | 2 + .../000013_universe_fed_proof_sync_log.up.sql | 36 +++ tapdb/sqlc/models.go | 11 + tapdb/sqlc/querier.go | 4 + tapdb/sqlc/queries/universe.sql | 98 +++++- tapdb/sqlc/universe.sql.go | 198 ++++++++++++ tapdb/universe_federation.go | 297 ++++++++++++++++++ universe/interface.go | 103 +++++- 8 files changed, 746 insertions(+), 3 deletions(-) create mode 100644 tapdb/sqlc/migrations/000013_universe_fed_proof_sync_log.down.sql create mode 100644 tapdb/sqlc/migrations/000013_universe_fed_proof_sync_log.up.sql diff --git a/tapdb/sqlc/migrations/000013_universe_fed_proof_sync_log.down.sql b/tapdb/sqlc/migrations/000013_universe_fed_proof_sync_log.down.sql new file mode 100644 index 000000000..42bdbfbb8 --- /dev/null +++ b/tapdb/sqlc/migrations/000013_universe_fed_proof_sync_log.down.sql @@ -0,0 +1,2 @@ +DROP INDEX IF EXISTS federation_proof_sync_log_unique_index_proof_leaf_id_servers_id; +DROP TABLE IF EXISTS federation_proof_sync_log; \ No newline at end of file diff --git a/tapdb/sqlc/migrations/000013_universe_fed_proof_sync_log.up.sql b/tapdb/sqlc/migrations/000013_universe_fed_proof_sync_log.up.sql new file mode 100644 index 000000000..ba7e7c100 --- /dev/null +++ b/tapdb/sqlc/migrations/000013_universe_fed_proof_sync_log.up.sql @@ -0,0 +1,36 @@ +-- This table stores the log of federation universe proof sync attempts. Rows +-- in this table are specific to a given proof leaf, server, and sync direction. +CREATE TABLE IF NOT EXISTS federation_proof_sync_log ( + id BIGINT PRIMARY KEY, + + -- The status of the proof sync attempt. + status TEXT NOT NULL CHECK(status IN ('pending', 'complete')), + + -- The timestamp of when the log entry for the associated proof was last + -- updated. + timestamp TIMESTAMP NOT NULL, + + -- The number of attempts that have been made to sync the proof. + attempt_counter BIGINT NOT NULL DEFAULT 0, + + -- The direction of the proof sync attempt. + sync_direction TEXT NOT NULL CHECK(sync_direction IN ('push', 'pull')), + + -- The ID of the subject proof leaf. + proof_leaf_id BIGINT NOT NULL REFERENCES universe_leaves(id), + + -- The ID of the universe that the proof leaf belongs to. + universe_root_id BIGINT NOT NULL REFERENCES universe_roots(id), + + -- The ID of the server that the proof will be/was synced to. + servers_id BIGINT NOT NULL REFERENCES universe_servers(id) +); + +-- Create a unique index on table federation_proof_sync_log +CREATE UNIQUE INDEX federation_proof_sync_log_unique_index_proof_leaf_id_servers_id +ON federation_proof_sync_log ( + sync_direction, + proof_leaf_id, + universe_root_id, + servers_id +); \ No newline at end of file diff --git a/tapdb/sqlc/models.go b/tapdb/sqlc/models.go index b89980ab2..26eef9b5d 100644 --- a/tapdb/sqlc/models.go +++ b/tapdb/sqlc/models.go @@ -164,6 +164,17 @@ type FederationGlobalSyncConfig struct { AllowSyncExport bool } +type FederationProofSyncLog struct { + ID int64 + Status string + Timestamp time.Time + AttemptCounter int64 + SyncDirection string + ProofLeafID int64 + UniverseRootID int64 + ServersID int64 +} + type FederationUniSyncConfig struct { Namespace string AssetID []byte diff --git a/tapdb/sqlc/querier.go b/tapdb/sqlc/querier.go index 1a64f4291..c20633c10 100644 --- a/tapdb/sqlc/querier.go +++ b/tapdb/sqlc/querier.go @@ -121,6 +121,9 @@ type Querier interface { QueryAssets(ctx context.Context, arg QueryAssetsParams) ([]QueryAssetsRow, error) QueryEventIDs(ctx context.Context, arg QueryEventIDsParams) ([]QueryEventIDsRow, error) QueryFederationGlobalSyncConfigs(ctx context.Context) ([]FederationGlobalSyncConfig, error) + // Join on mssmt_nodes to get leaf related fields. + // Join on genesis_info_view to get leaf related fields. + QueryFederationProofSyncLog(ctx context.Context, arg QueryFederationProofSyncLogParams) ([]QueryFederationProofSyncLogRow, error) QueryFederationUniSyncConfigs(ctx context.Context) ([]FederationUniSyncConfig, error) QueryPassiveAssets(ctx context.Context, transferID int64) ([]QueryPassiveAssetsRow, error) QueryProofTransferAttempts(ctx context.Context, arg QueryProofTransferAttemptsParams) ([]time.Time, error) @@ -145,6 +148,7 @@ type Querier interface { UpsertAssetProof(ctx context.Context, arg UpsertAssetProofParams) error UpsertChainTx(ctx context.Context, arg UpsertChainTxParams) (int64, error) UpsertFederationGlobalSyncConfig(ctx context.Context, arg UpsertFederationGlobalSyncConfigParams) error + UpsertFederationProofSyncLog(ctx context.Context, arg UpsertFederationProofSyncLogParams) (int64, error) UpsertFederationUniSyncConfig(ctx context.Context, arg UpsertFederationUniSyncConfigParams) error UpsertGenesisAsset(ctx context.Context, arg UpsertGenesisAssetParams) (int64, error) UpsertGenesisPoint(ctx context.Context, prevOut []byte) (int64, error) diff --git a/tapdb/sqlc/queries/universe.sql b/tapdb/sqlc/queries/universe.sql index 3376cd0bf..3fd9a75b2 100644 --- a/tapdb/sqlc/queries/universe.sql +++ b/tapdb/sqlc/queries/universe.sql @@ -364,4 +364,100 @@ ON CONFLICT(namespace) -- name: QueryFederationUniSyncConfigs :many SELECT namespace, asset_id, group_key, proof_type, allow_sync_insert, allow_sync_export FROM federation_uni_sync_config -ORDER BY group_key NULLS LAST, asset_id NULLS LAST, proof_type; \ No newline at end of file +ORDER BY group_key NULLS LAST, asset_id NULLS LAST, proof_type; + +-- name: UpsertFederationProofSyncLog :one +INSERT INTO federation_proof_sync_log as log ( + status, timestamp, sync_direction, proof_leaf_id, universe_root_id, + servers_id +) VALUES ( + @status, @timestamp, @sync_direction, + ( + -- Select the leaf id from the universe_leaves table. + SELECT id + FROM universe_leaves + WHERE leaf_node_namespace = @leaf_namespace + AND minting_point = @leaf_minting_point_bytes + AND script_key_bytes = @leaf_script_key_bytes + LIMIT 1 + ), + ( + -- Select the universe root id from the universe_roots table. + SELECT id + FROM universe_roots + WHERE namespace_root = @universe_id_namespace + LIMIT 1 + ), + ( + -- Select the server id from the universe_servers table. + SELECT id + FROM universe_servers + WHERE server_host = @server_host + LIMIT 1 + ) +) ON CONFLICT (sync_direction, proof_leaf_id, universe_root_id, servers_id) +DO UPDATE SET + status = EXCLUDED.status, + timestamp = EXCLUDED.timestamp, + -- Increment the attempt counter. + attempt_counter = CASE + WHEN @bump_sync_attempt_counter = true THEN log.attempt_counter + 1 + ELSE log.attempt_counter + END +RETURNING id; + +-- name: QueryFederationProofSyncLog :many +SELECT + log.id, status, timestamp, sync_direction, attempt_counter, + + -- Select fields from the universe_servers table. + server.id as server_id, + server.server_host, + + -- Select universe leaf related fields. + leaf.minting_point as leaf_minting_point_bytes, + leaf.script_key_bytes as leaf_script_key_bytes, + mssmt_node.value as leaf_genesis_proof, + genesis.gen_asset_id as leaf_gen_asset_id, + genesis.asset_id as leaf_asset_id, + + -- Select fields from the universe_roots table. + root.asset_id as uni_asset_id, + root.group_key as uni_group_key, + root.proof_type as uni_proof_type + +FROM federation_proof_sync_log as log + +JOIN universe_leaves as leaf + ON leaf.id = log.proof_leaf_id + +-- Join on mssmt_nodes to get leaf related fields. +JOIN mssmt_nodes mssmt_node + ON leaf.leaf_node_key = mssmt_node.key AND + leaf.leaf_node_namespace = mssmt_node.namespace + +-- Join on genesis_info_view to get leaf related fields. +JOIN genesis_info_view genesis + ON leaf.asset_genesis_id = genesis.gen_asset_id + +JOIN universe_servers as server + ON server.id = log.servers_id + +JOIN universe_roots as root + ON root.id = log.universe_root_id + +WHERE (log.sync_direction = sqlc.narg('sync_direction') + OR sqlc.narg('sync_direction') IS NULL) + AND + (log.status = sqlc.narg('status') OR sqlc.narg('status') IS NULL) + AND + + -- Universe leaves WHERE clauses. + (leaf.leaf_node_namespace = sqlc.narg('leaf_namespace') + OR sqlc.narg('leaf_namespace') IS NULL) + AND + (leaf.minting_point = sqlc.narg('leaf_minting_point_bytes') + OR sqlc.narg('leaf_minting_point_bytes') IS NULL) + AND + (leaf.script_key_bytes = sqlc.narg('leaf_script_key_bytes') + OR sqlc.narg('leaf_script_key_bytes') IS NULL); \ No newline at end of file diff --git a/tapdb/sqlc/universe.sql.go b/tapdb/sqlc/universe.sql.go index a0e7167af..9c5c21339 100644 --- a/tapdb/sqlc/universe.sql.go +++ b/tapdb/sqlc/universe.sql.go @@ -402,6 +402,134 @@ func (q *Queries) QueryFederationGlobalSyncConfigs(ctx context.Context) ([]Feder return items, nil } +const queryFederationProofSyncLog = `-- name: QueryFederationProofSyncLog :many +SELECT + log.id, status, timestamp, sync_direction, attempt_counter, + + -- Select fields from the universe_servers table. + server.id as server_id, + server.server_host, + + -- Select universe leaf related fields. + leaf.minting_point as leaf_minting_point_bytes, + leaf.script_key_bytes as leaf_script_key_bytes, + mssmt_node.value as leaf_genesis_proof, + genesis.gen_asset_id as leaf_gen_asset_id, + genesis.asset_id as leaf_asset_id, + + -- Select fields from the universe_roots table. + root.asset_id as uni_asset_id, + root.group_key as uni_group_key, + root.proof_type as uni_proof_type + +FROM federation_proof_sync_log as log + +JOIN universe_leaves as leaf + ON leaf.id = log.proof_leaf_id + +JOIN mssmt_nodes mssmt_node + ON leaf.leaf_node_key = mssmt_node.key AND + leaf.leaf_node_namespace = mssmt_node.namespace + +JOIN genesis_info_view genesis + ON leaf.asset_genesis_id = genesis.gen_asset_id + +JOIN universe_servers as server + ON server.id = log.servers_id + +JOIN universe_roots as root + ON root.id = log.universe_root_id + +WHERE (log.sync_direction = $1 + OR $1 IS NULL) + AND + (log.status = $2 OR $2 IS NULL) + AND + + -- Universe leaves WHERE clauses. + (leaf.leaf_node_namespace = $3 + OR $3 IS NULL) + AND + (leaf.minting_point = $4 + OR $4 IS NULL) + AND + (leaf.script_key_bytes = $5 + OR $5 IS NULL) +` + +type QueryFederationProofSyncLogParams struct { + SyncDirection sql.NullString + Status sql.NullString + LeafNamespace sql.NullString + LeafMintingPointBytes []byte + LeafScriptKeyBytes []byte +} + +type QueryFederationProofSyncLogRow struct { + ID int64 + Status string + Timestamp time.Time + SyncDirection string + AttemptCounter int64 + ServerID int64 + ServerHost string + LeafMintingPointBytes []byte + LeafScriptKeyBytes []byte + LeafGenesisProof []byte + LeafGenAssetID int64 + LeafAssetID []byte + UniAssetID []byte + UniGroupKey []byte + UniProofType string +} + +// Join on mssmt_nodes to get leaf related fields. +// Join on genesis_info_view to get leaf related fields. +func (q *Queries) QueryFederationProofSyncLog(ctx context.Context, arg QueryFederationProofSyncLogParams) ([]QueryFederationProofSyncLogRow, error) { + rows, err := q.db.QueryContext(ctx, queryFederationProofSyncLog, + arg.SyncDirection, + arg.Status, + arg.LeafNamespace, + arg.LeafMintingPointBytes, + arg.LeafScriptKeyBytes, + ) + if err != nil { + return nil, err + } + defer rows.Close() + var items []QueryFederationProofSyncLogRow + for rows.Next() { + var i QueryFederationProofSyncLogRow + if err := rows.Scan( + &i.ID, + &i.Status, + &i.Timestamp, + &i.SyncDirection, + &i.AttemptCounter, + &i.ServerID, + &i.ServerHost, + &i.LeafMintingPointBytes, + &i.LeafScriptKeyBytes, + &i.LeafGenesisProof, + &i.LeafGenAssetID, + &i.LeafAssetID, + &i.UniAssetID, + &i.UniGroupKey, + &i.UniProofType, + ); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Close(); err != nil { + return nil, err + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + const queryFederationUniSyncConfigs = `-- name: QueryFederationUniSyncConfigs :many SELECT namespace, asset_id, group_key, proof_type, allow_sync_insert, allow_sync_export FROM federation_uni_sync_config @@ -877,6 +1005,76 @@ func (q *Queries) UpsertFederationGlobalSyncConfig(ctx context.Context, arg Upse return err } +const upsertFederationProofSyncLog = `-- name: UpsertFederationProofSyncLog :one +INSERT INTO federation_proof_sync_log as log ( + status, timestamp, sync_direction, proof_leaf_id, universe_root_id, + servers_id +) VALUES ( + $1, $2, $3, + ( + -- Select the leaf id from the universe_leaves table. + SELECT id + FROM universe_leaves + WHERE leaf_node_namespace = $4 + AND minting_point = $5 + AND script_key_bytes = $6 + LIMIT 1 + ), + ( + -- Select the universe root id from the universe_roots table. + SELECT id + FROM universe_roots + WHERE namespace_root = $7 + LIMIT 1 + ), + ( + -- Select the server id from the universe_servers table. + SELECT id + FROM universe_servers + WHERE server_host = $8 + LIMIT 1 + ) +) ON CONFLICT (sync_direction, proof_leaf_id, universe_root_id, servers_id) +DO UPDATE SET + status = EXCLUDED.status, + timestamp = EXCLUDED.timestamp, + -- Increment the attempt counter. + attempt_counter = CASE + WHEN $9 = true THEN log.attempt_counter + 1 + ELSE log.attempt_counter + END +RETURNING id +` + +type UpsertFederationProofSyncLogParams struct { + Status string + Timestamp time.Time + SyncDirection string + LeafNamespace string + LeafMintingPointBytes []byte + LeafScriptKeyBytes []byte + UniverseIDNamespace string + ServerHost string + BumpSyncAttemptCounter interface{} +} + +func (q *Queries) UpsertFederationProofSyncLog(ctx context.Context, arg UpsertFederationProofSyncLogParams) (int64, error) { + row := q.db.QueryRowContext(ctx, upsertFederationProofSyncLog, + arg.Status, + arg.Timestamp, + arg.SyncDirection, + arg.LeafNamespace, + arg.LeafMintingPointBytes, + arg.LeafScriptKeyBytes, + arg.UniverseIDNamespace, + arg.ServerHost, + arg.BumpSyncAttemptCounter, + ) + var id int64 + err := row.Scan(&id) + return id, err +} + const upsertFederationUniSyncConfig = `-- name: UpsertFederationUniSyncConfig :exec INSERT INTO federation_uni_sync_config ( namespace, asset_id, group_key, proof_type, allow_sync_insert, allow_sync_export diff --git a/tapdb/universe_federation.go b/tapdb/universe_federation.go index c306baf30..a7b5d49f3 100644 --- a/tapdb/universe_federation.go +++ b/tapdb/universe_federation.go @@ -1,7 +1,9 @@ package tapdb import ( + "bytes" "context" + "database/sql" "errors" "fmt" "sort" @@ -9,8 +11,11 @@ import ( "time" "github.com/btcsuite/btcd/btcec/v2" + "github.com/btcsuite/btcd/btcec/v2/schnorr" + "github.com/btcsuite/btcd/wire" "github.com/lightninglabs/taproot-assets/asset" "github.com/lightninglabs/taproot-assets/fn" + "github.com/lightninglabs/taproot-assets/proof" "github.com/lightninglabs/taproot-assets/tapdb/sqlc" "github.com/lightninglabs/taproot-assets/universe" "github.com/lightningnetwork/lnd/clock" @@ -19,6 +24,17 @@ import ( ) type ( + // UpsertFedProofSyncLogParams is used to upsert federation proof sync + // logs. + UpsertFedProofSyncLogParams = sqlc.UpsertFederationProofSyncLogParams + + // QueryFedProofSyncLogParams is used to query for federation proof sync + // logs. + QueryFedProofSyncLogParams = sqlc.QueryFederationProofSyncLogParams + + // ProofSyncLogEntry is a single entry from the proof sync log. + ProofSyncLogEntry = sqlc.QueryFederationProofSyncLogRow + // NewUniverseServer is used to create a new universe server. NewUniverseServer = sqlc.InsertUniverseServerParams @@ -62,6 +78,22 @@ var ( } ) +// FederationProofSyncLogStore is used to log the sync status of individual +// universe proofs. +type FederationProofSyncLogStore interface { + BaseUniverseStore + + // UpsertFederationProofSyncLog upserts a proof sync log entry for a + // given proof leaf and server. + UpsertFederationProofSyncLog(ctx context.Context, + arg UpsertFedProofSyncLogParams) (int64, error) + + // QueryFederationProofSyncLog returns the set of proof sync logs for a + // given proof leaf. + QueryFederationProofSyncLog(ctx context.Context, + arg QueryFedProofSyncLogParams) ([]ProofSyncLogEntry, error) +} + // FederationSyncConfigStore is used to manage the set of Universe servers as // part of a federation. type FederationSyncConfigStore interface { @@ -90,6 +122,7 @@ type FederationSyncConfigStore interface { // of a federation. type UniverseServerStore interface { FederationSyncConfigStore + FederationProofSyncLogStore // InsertUniverseServer inserts a new universe server in to the DB. InsertUniverseServer(ctx context.Context, arg NewUniverseServer) error @@ -268,6 +301,270 @@ func (u *UniverseFederationDB) LogNewSyncs(ctx context.Context, }) } +// UpsertFederationProofSyncLog upserts a federation proof sync log entry for a +// given universe server and proof. +func (u *UniverseFederationDB) UpsertFederationProofSyncLog( + ctx context.Context, uniID universe.Identifier, + leafKey universe.LeafKey, addr universe.ServerAddr, + syncDirection universe.SyncDirection, + syncStatus universe.ProofSyncStatus, + bumpSyncAttemptCounter bool) (int64, error) { + + // Encode the leaf key outpoint as bytes. We'll use this to look up the + // leaf ID in the DB. + leafKeyOutpointBytes, err := encodeOutpoint(leafKey.OutPoint) + if err != nil { + return 0, err + } + + // Encode the leaf script key pub key as bytes. We'll use this to look + // up the leaf ID in the DB. + scriptKeyPubKeyBytes := schnorr.SerializePubKey( + leafKey.ScriptKey.PubKey, + ) + + var ( + writeTx UniverseFederationOptions + logID int64 + ) + + err = u.db.ExecTx(ctx, &writeTx, func(db UniverseServerStore) error { + params := UpsertFedProofSyncLogParams{ + Status: string(syncStatus), + Timestamp: time.Now().UTC(), + SyncDirection: string(syncDirection), + UniverseIDNamespace: uniID.String(), + LeafNamespace: uniID.String(), + LeafMintingPointBytes: leafKeyOutpointBytes, + LeafScriptKeyBytes: scriptKeyPubKeyBytes, + ServerHost: addr.HostStr(), + BumpSyncAttemptCounter: bumpSyncAttemptCounter, + } + logID, err = db.UpsertFederationProofSyncLog(ctx, params) + if err != nil { + return err + } + + return nil + }) + + return logID, err +} + +// QueryFederationProofSyncLog queries the federation proof sync log and returns +// the log entries which correspond to the given universe proof leaf. +func (u *UniverseFederationDB) QueryFederationProofSyncLog( + ctx context.Context, uniID universe.Identifier, + leafKey universe.LeafKey, + syncDirection universe.SyncDirection, + syncStatus universe.ProofSyncStatus) ([]*universe.ProofSyncLogEntry, + error) { + + // Encode the leaf key outpoint as bytes. We'll use this to look up the + // leaf ID in the DB. + leafKeyOutpointBytes, err := encodeOutpoint(leafKey.OutPoint) + if err != nil { + return nil, err + } + + // Encode the leaf script key pub key as bytes. We'll use this to look + // up the leaf ID in the DB. + scriptKeyPubKeyBytes := schnorr.SerializePubKey( + leafKey.ScriptKey.PubKey, + ) + + var ( + readTx = NewUniverseFederationReadTx() + proofSyncLogs []*universe.ProofSyncLogEntry + ) + + err = u.db.ExecTx(ctx, &readTx, func(db UniverseServerStore) error { + params := QueryFedProofSyncLogParams{ + SyncDirection: sqlStr(string(syncDirection)), + Status: sqlStr(string(syncStatus)), + LeafNamespace: sqlStr(uniID.String()), + LeafMintingPointBytes: leafKeyOutpointBytes, + LeafScriptKeyBytes: scriptKeyPubKeyBytes, + } + logEntries, err := db.QueryFederationProofSyncLog(ctx, params) + + // Parse database proof sync logs. Multiple log entries may + // exist for a given leaf because each log entry is unique to a + // server. + proofSyncLogs = make( + []*universe.ProofSyncLogEntry, 0, len(logEntries), + ) + for idx := range logEntries { + entry := logEntries[idx] + + parsedLogEntry, err := fetchProofSyncLogEntry( + ctx, entry, db, + ) + if err != nil { + return err + } + + proofSyncLogs = append(proofSyncLogs, parsedLogEntry) + } + + return err + }) + if err != nil { + return nil, err + } + + return proofSyncLogs, nil +} + +// FetchPendingProofsSyncLog queries the federation proof sync log and returns +// all log entries with sync status pending. +func (u *UniverseFederationDB) FetchPendingProofsSyncLog(ctx context.Context, + syncDirection *universe.SyncDirection) ([]*universe.ProofSyncLogEntry, + error) { + + var ( + readTx = NewUniverseFederationReadTx() + proofSyncLogs []*universe.ProofSyncLogEntry + ) + + err := u.db.ExecTx(ctx, &readTx, func(db UniverseServerStore) error { + // If the sync direction is not set, then we'll query for all + // pending proof sync log entries. + var sqlSyncDirection sql.NullString + if syncDirection != nil { + sqlSyncDirection = sqlStr(string(*syncDirection)) + } + + sqlProofSyncStatus := sqlStr( + string(universe.ProofSyncStatusPending), + ) + + params := QueryFedProofSyncLogParams{ + SyncDirection: sqlSyncDirection, + Status: sqlProofSyncStatus, + } + logEntries, err := db.QueryFederationProofSyncLog(ctx, params) + if err != nil { + return fmt.Errorf("unable to query proof sync log: %w", + err) + } + + // Parse log entries from database row. + proofSyncLogs = make( + []*universe.ProofSyncLogEntry, 0, len(logEntries), + ) + for idx := range logEntries { + entry := logEntries[idx] + + parsedLogEntry, err := fetchProofSyncLogEntry( + ctx, entry, db, + ) + if err != nil { + return err + } + + proofSyncLogs = append(proofSyncLogs, parsedLogEntry) + } + + return nil + }) + if err != nil { + return nil, err + } + + return proofSyncLogs, nil +} + +// fetchProofSyncLogEntry returns a proof sync log entry given a DB row. +func fetchProofSyncLogEntry(ctx context.Context, entry ProofSyncLogEntry, + dbTx UniverseServerStore) (*universe.ProofSyncLogEntry, error) { + + // Fetch asset genesis for the leaf. + leafAssetGen, err := fetchGenesis(ctx, dbTx, entry.LeafGenAssetID) + if err != nil { + return nil, err + } + + // We only need to obtain the asset at this point, so we'll do a sparse + // decode here to decode only the asset record. + var leafAsset asset.Asset + assetRecord := proof.AssetLeafRecord(&leafAsset) + err = proof.SparseDecode( + bytes.NewReader(entry.LeafGenesisProof), assetRecord, + ) + if err != nil { + return nil, fmt.Errorf("unable to decode proof: %w", err) + } + + leaf := &universe.Leaf{ + GenesisWithGroup: universe.GenesisWithGroup{ + Genesis: leafAssetGen, + GroupKey: leafAsset.GroupKey, + }, + RawProof: entry.LeafGenesisProof, + Asset: &leafAsset, + Amt: leafAsset.Amount, + } + + // Parse leaf key from leaf DB row. + scriptKeyPub, err := schnorr.ParsePubKey( + entry.LeafScriptKeyBytes, + ) + if err != nil { + return nil, err + } + scriptKey := asset.NewScriptKey(scriptKeyPub) + + var outPoint wire.OutPoint + err = readOutPoint( + bytes.NewReader(entry.LeafMintingPointBytes), 0, 0, + &outPoint, + ) + if err != nil { + return nil, err + } + + leafKey := universe.LeafKey{ + OutPoint: outPoint, + ScriptKey: &scriptKey, + } + + // Parse server address from DB row. + serverAddr := universe.NewServerAddr(entry.ServerID, entry.ServerHost) + + // Parse proof sync status directly from the DB row. + status, err := universe.ParseStrProofSyncStatus(entry.Status) + if err != nil { + return nil, err + } + + // Parse proof sync direction directly from the DB row. + direction, err := universe.ParseStrSyncDirection(entry.SyncDirection) + if err != nil { + return nil, err + } + + uniID, err := universe.NewUniIDFromRawArgs( + entry.UniAssetID, entry.UniGroupKey, + entry.UniProofType, + ) + if err != nil { + return nil, err + } + + return &universe.ProofSyncLogEntry{ + Timestamp: entry.Timestamp, + SyncStatus: status, + SyncDirection: direction, + AttemptCounter: entry.AttemptCounter, + ServerAddr: serverAddr, + + UniID: uniID, + LeafKey: leafKey, + Leaf: *leaf, + }, nil +} + // UpsertFederationSyncConfig upserts both the global and universe specific // federation sync configs. func (u *UniverseFederationDB) UpsertFederationSyncConfig( diff --git a/universe/interface.go b/universe/interface.go index ffd5db9cf..09e316994 100644 --- a/universe/interface.go +++ b/universe/interface.go @@ -757,10 +757,109 @@ type FederationSyncConfigDB interface { uniSyncConfigs []*FedUniSyncConfig) error } -// FederationDB is used for CRUD operations related to federation sync config -// and tracked servers. +// SyncDirection is the direction of a proof sync. +type SyncDirection string + +const ( + // SyncDirectionPush indicates that the sync is a push sync (from the local + // server to the remote server). + SyncDirectionPush SyncDirection = "push" + + // SyncDirectionPull indicates that the sync is a pull sync (from the remote + // server to the local server). + SyncDirectionPull SyncDirection = "pull" +) + +// ParseStrSyncDirection parses a string into a SyncDirection. +func ParseStrSyncDirection(s string) (SyncDirection, error) { + switch s { + case string(SyncDirectionPush): + return SyncDirectionPush, nil + case string(SyncDirectionPull): + return SyncDirectionPull, nil + default: + return "", fmt.Errorf("unknown sync direction: %v", s) + } +} + +// ProofSyncStatus is the status of a proof sync. +type ProofSyncStatus string + +const ( + // ProofSyncStatusPending indicates that the sync is pending. + ProofSyncStatusPending ProofSyncStatus = "pending" + + // ProofSyncStatusComplete indicates that the sync is complete. + ProofSyncStatusComplete ProofSyncStatus = "complete" +) + +// ParseStrProofSyncStatus parses a string into a ProofSyncStatus. +func ParseStrProofSyncStatus(s string) (ProofSyncStatus, error) { + switch s { + case string(ProofSyncStatusPending): + return ProofSyncStatusPending, nil + case string(ProofSyncStatusComplete): + return ProofSyncStatusComplete, nil + default: + return "", fmt.Errorf("unknown proof sync status: %v", s) + } +} + +// ProofSyncLogEntry is a log entry for a proof sync. +type ProofSyncLogEntry struct { + // Timestamp is the timestamp of the log entry. + Timestamp time.Time + + // SyncStatus is the status of the sync. + SyncStatus ProofSyncStatus + + // SyncDirection is the direction of the sync. + SyncDirection SyncDirection + + // AttemptCounter is the number of times the sync has been attempted. + AttemptCounter int64 + + // ServerAddr is the address of the sync counterparty server. + ServerAddr ServerAddr + + // UniID is the identifier of the universe associated with the sync event. + UniID Identifier + + // LeafKey is the leaf key associated with the sync event. + LeafKey LeafKey + + // Leaf is the leaf associated with the sync event. + Leaf Leaf +} + +// FederationProofSyncLog is used for CRUD operations relating to the federation +// proof sync log. +type FederationProofSyncLog interface { + // UpsertFederationProofSyncLog upserts a federation proof sync log + // entry for a given universe server and proof. + UpsertFederationProofSyncLog(ctx context.Context, uniID Identifier, + leafKey LeafKey, addr ServerAddr, syncDirection SyncDirection, + syncStatus ProofSyncStatus, + bumpSyncAttemptCounter bool) (int64, error) + + // QueryFederationProofSyncLog queries the federation proof sync log and + // returns the log entries which correspond to the given universe proof + // leaf. + QueryFederationProofSyncLog(ctx context.Context, uniID Identifier, + leafKey LeafKey, syncDirection SyncDirection, + syncStatus ProofSyncStatus) ([]*ProofSyncLogEntry, error) + + // FetchPendingProofsSyncLog queries the federation proof sync log and + // returns all log entries with sync status pending. + FetchPendingProofsSyncLog(ctx context.Context, + syncDirection *SyncDirection) ([]*ProofSyncLogEntry, error) +} + +// FederationDB is used for CRUD operations related to federation logs and +// configuration. type FederationDB interface { FederationLog + FederationProofSyncLog FederationSyncConfigDB } From 14644fff3b188b01eb159be8bd0a7484f70054a7 Mon Sep 17 00:00:00 2001 From: ffranr Date: Mon, 27 Nov 2023 14:33:48 +0000 Subject: [PATCH 06/16] universe: use proof log db table in federation envoy This commit adds a flag to the federation universe push request to indicate that the proof leaf sync attempt should be logged and actively managed to ensure that the federation push procedure is repeated in the event of a failure. --- tapgarden/caretaker.go | 5 + universe/auto_syncer.go | 319 +++++++++++++++++++++++++++++++++++----- universe/interface.go | 6 + 3 files changed, 290 insertions(+), 40 deletions(-) diff --git a/tapgarden/caretaker.go b/tapgarden/caretaker.go index fb1c1c65a..78b867174 100644 --- a/tapgarden/caretaker.go +++ b/tapgarden/caretaker.go @@ -1179,6 +1179,11 @@ func (b *BatchCaretaker) storeMintingProof(ctx context.Context, ID: uniID, Key: leafKey, Leaf: mintingLeaf, + + // We set this to true to indicate that we would like the syncer + // to log and reattempt (in the event of a failure) to push sync + // this proof leaf. + LogProofSync: true, }, nil } diff --git a/universe/auto_syncer.go b/universe/auto_syncer.go index 3a32df1ab..688b599b3 100644 --- a/universe/auto_syncer.go +++ b/universe/auto_syncer.go @@ -76,6 +76,12 @@ type FederationPushReq struct { // federation proof push was successful. resp chan *Proof + // LogProofSync is a boolean that indicates, if true, that the proof + // leaf sync attempt should be logged and actively managed to ensure + // that the federation push procedure is repeated in the event of a + // failure. + LogProofSync bool + err chan error } @@ -227,48 +233,105 @@ func (f *FederationEnvoy) syncServerState(ctx context.Context, return nil } -// pushProofToFederation attempts to push out a new proof to the current -// federation in parallel. -func (f *FederationEnvoy) pushProofToFederation(uniID Identifier, key LeafKey, - leaf *Leaf) { +// pushProofToServer attempts to push out a new proof to the target server. +func (f *FederationEnvoy) pushProofToServer(ctx context.Context, + uniID Identifier, key LeafKey, leaf *Leaf, addr ServerAddr) error { - // Fetch all universe servers in our federation. - fedServers, err := f.tryFetchServers() - if err != nil || len(fedServers) == 0 { - return + remoteUniverseServer, err := f.cfg.NewRemoteRegistrar(addr) + if err != nil { + return fmt.Errorf("cannot push proof unable to connect "+ + "to remote server(%v): %w", addr.HostStr(), err) } - log.Infof("Pushing new proof to %v federation members, proof_key=%v", - len(fedServers), spew.Sdump(key)) + _, err = remoteUniverseServer.UpsertProofLeaf( + ctx, uniID, key, leaf, + ) + if err != nil { + return fmt.Errorf("cannot push proof to remote "+ + "server(%v): %w", addr.HostStr(), err) + } - ctx, cancel := f.WithCtxQuitNoTimeout() - defer cancel() + return nil +} + +// pushProofToServerLogged attempts to push out a new proof to the target +// server, and logs the sync attempt. +func (f *FederationEnvoy) pushProofToServerLogged(ctx context.Context, + uniID Identifier, key LeafKey, leaf *Leaf, addr ServerAddr) error { + + // Ensure that we have a pending sync log entry for this + // leaf and server pair. This will allow us to handle all + // pending syncs in the event of a restart or at a different + // point in the envoy. + _, err := f.cfg.FederationDB.UpsertFederationProofSyncLog( + ctx, uniID, key, addr, SyncDirectionPush, + ProofSyncStatusPending, true, + ) + if err != nil { + return fmt.Errorf("unable to log proof sync as pending: %w", + err) + } + + // Push the proof to the remote server. + err = f.pushProofToServer(ctx, uniID, key, leaf, addr) + if err != nil { + return fmt.Errorf("cannot push proof to remote server(%v): %w", + addr.HostStr(), err) + } + + // We did not encounter an error in our proof push + // attempt. Log the proof sync attempt as complete. + _, err = f.cfg.FederationDB.UpsertFederationProofSyncLog( + ctx, uniID, key, addr, SyncDirectionPush, + ProofSyncStatusComplete, false, + ) + if err != nil { + return fmt.Errorf("unable to log proof sync attempt: %w", err) + } + + return nil +} + +// pushProofToFederation attempts to push out a new proof to the current +// federation in parallel. +func (f *FederationEnvoy) pushProofToFederation(ctx context.Context, + uniID Identifier, key LeafKey, leaf *Leaf, fedServers []ServerAddr, + logProofSync bool) { + + log.Infof("Pushing proof to %v federation members, proof_key=%v", + len(fedServers), spew.Sdump(key)) // To push a new proof out, we'll attempt to dial to the remote // registrar, then will attempt to push the new proof directly to the // register. pushNewProof := func(ctx context.Context, addr ServerAddr) error { - remoteUniverseServer, err := f.cfg.NewRemoteRegistrar(addr) - if err != nil { - log.Warnf("cannot push proof unable to connect "+ - "to remote server(%v): %v", addr.HostStr(), - err) + // If we are logging proof sync attempts, we will use the + // logged version of the push function. + if logProofSync { + err := f.pushProofToServerLogged( + ctx, uniID, key, leaf, addr, + ) + if err != nil { + log.Warnf("Cannot push proof via logged "+ + "server push: %v", err) + } + return nil } - _, err = remoteUniverseServer.UpsertProofLeaf( - ctx, uniID, key, leaf, - ) + // If we are not logging proof sync attempts, we will use the + // non-logged version of the push function. + err := f.pushProofToServer(ctx, uniID, key, leaf, addr) if err != nil { - log.Warnf("cannot push proof to remote "+ - "server(%v): %v", addr.HostStr(), err) + log.Warnf("Cannot push proof: %v", err) } + return nil } // To conclude, we'll attempt to push the new proof to all the universe // servers in parallel. - err = fn.ParSlice(ctx, fedServers, pushNewProof) + err := fn.ParSlice(ctx, fedServers, pushNewProof) if err != nil { // TODO(roasbeef): retry in the background until successful? log.Errorf("unable to push proof to federation: %v", err) @@ -276,6 +339,57 @@ func (f *FederationEnvoy) pushProofToFederation(uniID Identifier, key LeafKey, } } +// filterProofSyncPending filters out servers that have already been synced +// with for the given leaf. +func (f *FederationEnvoy) filterProofSyncPending(fedServers []ServerAddr, + uniID Identifier, key LeafKey) ([]ServerAddr, error) { + + // If there are no servers to filter, then we'll return early. This + // saves from querying the database unnecessarily. + if len(fedServers) == 0 { + return nil, nil + } + + ctx, cancel := f.WithCtxQuit() + defer cancel() + + // Select all sync push complete log entries for the given universe + // leaf. If there are any servers which are sync complete within this + // log set, we will filter them out of our target server set. + logs, err := f.cfg.FederationDB.QueryFederationProofSyncLog( + ctx, uniID, key, SyncDirectionPush, + ProofSyncStatusComplete, + ) + if err != nil { + return nil, fmt.Errorf("unable to query federation sync log: %w", + err) + } + + // Construct a map of servers that have already been synced with for the + // given leaf. + syncedServers := make(map[string]struct{}) + for idx := range logs { + logEntry := logs[idx] + syncedServers[logEntry.ServerAddr.HostStr()] = struct{}{} + } + + // Filter out servers that we've already pushed to. + filteredFedServers := fn.Filter(fedServers, func(a ServerAddr) bool { + // Filter out servers that have a log entry with sync status + // complete. + if _, ok := syncedServers[a.HostStr()]; ok { + return false + } + + // By this point we haven't found logs corresponding to the + // given server, we will therefore return true and include the + // server as a sync target for the given leaf. + return true + }) + + return filteredFedServers, nil +} + // syncer is the main goroutine that's responsible for interacting with the // federation envoy. It also accepts incoming requests to push out new updates // to the federation. @@ -289,12 +403,24 @@ func (f *FederationEnvoy) syncer() { syncTicker := time.NewTicker(f.cfg.SyncInterval) defer syncTicker.Stop() + // We'll use a timeout that's slightly less than the sync interval to + // help avoid ticking into a new sync event before the previous event + // has finished. + syncContextTimeout := f.cfg.SyncInterval - 1*time.Second + if syncContextTimeout < 0 { + // If the sync interval is less than a second, then we'll use + // the sync interval as the timeout. + syncContextTimeout = f.cfg.SyncInterval + } + for { select { // A new sync event has just been triggered, so we'll attempt // to synchronize state with all the active universe servers in // the federation. case <-syncTicker.C: + log.Debug("Federation envoy handling new tick event") + // Error propagation is handled in tryFetchServers, we // only need to exit here. fedServers, err := f.tryFetchServers() @@ -313,11 +439,60 @@ func (f *FederationEnvoy) syncer() { continue } + // After we've synced with the federation, we'll + // attempt to push out any pending proofs that we + // haven't yet completed. + ctxFetchLog, cancelFetchLog := f.WithCtxQuitNoTimeout() + syncDirection := SyncDirectionPush + db := f.cfg.FederationDB + logEntries, err := db.FetchPendingProofsSyncLog( + ctxFetchLog, &syncDirection, + ) + cancelFetchLog() + if err != nil { + log.Warnf("unable to query pending push "+ + "sync log: %w", err) + continue + } + + if len(logEntries) > 0 { + log.Debugf("Handling pending proof sync log "+ + "entries (entries_count=%d)", + len(logEntries)) + } + + // TODO(ffranr): Take account of any new servers that + // have been added since the last time we populated the + // log for a given proof leaf. Pending proof sync log + // entries are only relevant for the set of servers + // that existed at the time the log entry was created. + // If a new server is added, then we should create a + // new log entry for the new server. + + for idx := range logEntries { + entry := logEntries[idx] + + servers := []ServerAddr{ + entry.ServerAddr, + } + + ctxPush, cancelPush := + f.CtxBlockingCustomTimeout( + syncContextTimeout, + ) + f.pushProofToFederation( + ctxPush, entry.UniID, entry.LeafKey, + &entry.Leaf, servers, true, + ) + cancelPush() + } + // A new push request has just arrived. We'll perform a // asynchronous registration with the local Universe registrar, // then push it out in an async manner to the federation // members. case pushReq := <-f.pushRequests: + log.Debug("Federation envoy handling push request") ctx, cancel := f.WithCtxQuit() // First, we'll attempt to registrar the proof leaf with @@ -341,13 +516,53 @@ func (f *FederationEnvoy) syncer() { // proof out to the federation in the background. pushReq.resp <- newProof - // With the response sent above, we'll push this out to - // all the Universe servers in the background. - go f.pushProofToFederation( - pushReq.ID, pushReq.Key, pushReq.Leaf, + // Fetch all universe servers in our federation. + fedServers, err := f.tryFetchServers() + if err != nil { + err := fmt.Errorf("unable to fetch "+ + "federation servers: %w", err) + log.Warnf(err.Error()) + pushReq.err <- err + continue + } + + if len(fedServers) == 0 { + log.Warnf("could not find any federation " + + "servers") + continue + } + + if pushReq.LogProofSync { + // We are attempting to sync using the + // logged proof sync procedure. We will + // therefore narrow down the set of target + // servers based on the sync log. Only servers + // that are not yet push sync complete will be + // targeted. + fedServers, err = f.filterProofSyncPending( + fedServers, pushReq.ID, pushReq.Key, + ) + if err != nil { + log.Warnf("failed to filter " + + "federation servers") + continue + } + } + + // With the response sent above, we'll push this + // out to all the Universe servers in the + // background. + ctxPush, cancelPush := f.WithCtxQuitNoTimeout() + f.pushProofToFederation( + ctxPush, pushReq.ID, pushReq.Key, + pushReq.Leaf, fedServers, + pushReq.LogProofSync, ) + cancelPush() case pushReq := <-f.batchPushRequests: + log.Debug("Federation envoy handling batch push " + + "request") ctx, cancel := f.WithCtxQuitNoTimeout() // First, we'll attempt to registrar the proof leaf with @@ -370,16 +585,34 @@ func (f *FederationEnvoy) syncer() { // we'll return back to the caller. pushReq.resp <- struct{}{} + // Fetch all universe servers in our federation. + fedServers, err := f.tryFetchServers() + if err != nil { + err := fmt.Errorf("unable to fetch "+ + "federation servers: %w", err) + log.Warnf(err.Error()) + pushReq.err <- err + continue + } + + if len(fedServers) == 0 { + log.Warnf("could not find any federation " + + "servers") + continue + } + // With the response sent above, we'll push this out to // all the Universe servers in the background. - go func() { - for idx := range pushReq.Batch { - item := pushReq.Batch[idx] - f.pushProofToFederation( - item.ID, item.Key, item.Leaf, - ) - } - }() + ctxPush, cancelPush := f.WithCtxQuitNoTimeout() + for idx := range pushReq.Batch { + item := pushReq.Batch[idx] + + f.pushProofToFederation( + ctxPush, item.ID, item.Key, item.Leaf, + fedServers, item.LogProofSync, + ) + } + cancelPush() case <-f.Quit: return @@ -395,12 +628,18 @@ func (f *FederationEnvoy) syncer() { func (f *FederationEnvoy) UpsertProofLeaf(_ context.Context, id Identifier, key LeafKey, leaf *Leaf) (*Proof, error) { + // If we're attempting to push an issuance proof, then we'll ensure + // that we track the sync attempt to ensure that we retry in the event + // of a failure. + logProofSync := id.ProofType == ProofTypeIssuance + pushReq := &FederationPushReq{ - ID: id, - Key: key, - Leaf: leaf, - resp: make(chan *Proof, 1), - err: make(chan error, 1), + ID: id, + Key: key, + Leaf: leaf, + LogProofSync: logProofSync, + resp: make(chan *Proof, 1), + err: make(chan error, 1), } if !fn.SendOrQuit(f.pushRequests, pushReq, f.Quit) { diff --git a/universe/interface.go b/universe/interface.go index 09e316994..a6b8afb78 100644 --- a/universe/interface.go +++ b/universe/interface.go @@ -397,6 +397,12 @@ type Item struct { // MetaReveal is the meta reveal associated with the given proof leaf. MetaReveal *proof.MetaReveal + + // LogProofSync is a boolean that indicates, if true, that the proof + // leaf sync attempt should be logged and actively managed to ensure + // that the federation push procedure is repeated in the event of a + // failure. + LogProofSync bool } // BatchRegistrar is an interface that allows a caller to register a batch of From 14a653e8503779b6e090f9cd1a3df8e0a9f75faf Mon Sep 17 00:00:00 2001 From: ffranr Date: Wed, 29 Nov 2023 21:36:59 +0000 Subject: [PATCH 07/16] universe+tapdb: add SQL statement to delete proof sync log entries --- tapdb/sqlc/querier.go | 1 + tapdb/sqlc/queries/universe.sql | 22 ++++++++++++++++++- tapdb/sqlc/universe.sql.go | 38 +++++++++++++++++++++++++++++++++ tapdb/universe_federation.go | 38 +++++++++++++++++++++++++++++++++ universe/interface.go | 4 ++++ 5 files changed, 102 insertions(+), 1 deletion(-) diff --git a/tapdb/sqlc/querier.go b/tapdb/sqlc/querier.go index c20633c10..3a9a6b27a 100644 --- a/tapdb/sqlc/querier.go +++ b/tapdb/sqlc/querier.go @@ -25,6 +25,7 @@ type Querier interface { DeleteAllNodes(ctx context.Context, namespace string) (int64, error) DeleteAssetWitnesses(ctx context.Context, assetID int64) error DeleteExpiredUTXOLeases(ctx context.Context, now sql.NullTime) error + DeleteFederationProofSyncLog(ctx context.Context, arg DeleteFederationProofSyncLogParams) error DeleteManagedUTXO(ctx context.Context, outpoint []byte) error DeleteNode(ctx context.Context, arg DeleteNodeParams) (int64, error) DeleteRoot(ctx context.Context, namespace string) (int64, error) diff --git a/tapdb/sqlc/queries/universe.sql b/tapdb/sqlc/queries/universe.sql index 3fd9a75b2..767c50f25 100644 --- a/tapdb/sqlc/queries/universe.sql +++ b/tapdb/sqlc/queries/universe.sql @@ -460,4 +460,24 @@ WHERE (log.sync_direction = sqlc.narg('sync_direction') OR sqlc.narg('leaf_minting_point_bytes') IS NULL) AND (leaf.script_key_bytes = sqlc.narg('leaf_script_key_bytes') - OR sqlc.narg('leaf_script_key_bytes') IS NULL); \ No newline at end of file + OR sqlc.narg('leaf_script_key_bytes') IS NULL); + +-- name: DeleteFederationProofSyncLog :exec +WITH selected_server_id AS ( + -- Select the server ids from the universe_servers table for the specified + -- hosts. + SELECT id + FROM universe_servers + WHERE + (server_host = sqlc.narg('server_host') + OR sqlc.narg('server_host') IS NULL) +) +DELETE FROM federation_proof_sync_log +WHERE + servers_id IN (SELECT id FROM selected_server_id) AND + (status = sqlc.narg('status') + OR sqlc.narg('status') IS NULL) AND + (timestamp >= sqlc.narg('min_timestamp') + OR sqlc.narg('min_timestamp') IS NULL) AND + (attempt_counter >= sqlc.narg('min_attempt_counter') + OR sqlc.narg('min_attempt_counter') IS NULL); \ No newline at end of file diff --git a/tapdb/sqlc/universe.sql.go b/tapdb/sqlc/universe.sql.go index 9c5c21339..b1e7b33ec 100644 --- a/tapdb/sqlc/universe.sql.go +++ b/tapdb/sqlc/universe.sql.go @@ -11,6 +11,44 @@ import ( "time" ) +const deleteFederationProofSyncLog = `-- name: DeleteFederationProofSyncLog :exec +WITH selected_server_id AS ( + -- Select the server ids from the universe_servers table for the specified + -- hosts. + SELECT id + FROM universe_servers + WHERE + (server_host = $4 + OR $4 IS NULL) +) +DELETE FROM federation_proof_sync_log +WHERE + servers_id IN (SELECT id FROM selected_server_id) AND + (status = $1 + OR $1 IS NULL) AND + (timestamp >= $2 + OR $2 IS NULL) AND + (attempt_counter >= $3 + OR $3 IS NULL) +` + +type DeleteFederationProofSyncLogParams struct { + Status sql.NullString + MinTimestamp sql.NullTime + MinAttemptCounter sql.NullInt64 + ServerHost sql.NullString +} + +func (q *Queries) DeleteFederationProofSyncLog(ctx context.Context, arg DeleteFederationProofSyncLogParams) error { + _, err := q.db.ExecContext(ctx, deleteFederationProofSyncLog, + arg.Status, + arg.MinTimestamp, + arg.MinAttemptCounter, + arg.ServerHost, + ) + return err +} + const deleteUniverseEvents = `-- name: DeleteUniverseEvents :exec WITH root_id AS ( SELECT id diff --git a/tapdb/universe_federation.go b/tapdb/universe_federation.go index a7b5d49f3..c6bbea8a5 100644 --- a/tapdb/universe_federation.go +++ b/tapdb/universe_federation.go @@ -32,6 +32,9 @@ type ( // logs. QueryFedProofSyncLogParams = sqlc.QueryFederationProofSyncLogParams + // DeleteFedProofSyncLogParams is used to delete proof sync log entries. + DeleteFedProofSyncLogParams = sqlc.DeleteFederationProofSyncLogParams + // ProofSyncLogEntry is a single entry from the proof sync log. ProofSyncLogEntry = sqlc.QueryFederationProofSyncLogRow @@ -92,6 +95,10 @@ type FederationProofSyncLogStore interface { // given proof leaf. QueryFederationProofSyncLog(ctx context.Context, arg QueryFedProofSyncLogParams) ([]ProofSyncLogEntry, error) + + // DeleteFederationProofSyncLog deletes proof sync log entries. + DeleteFederationProofSyncLog(ctx context.Context, + arg DeleteFedProofSyncLogParams) error } // FederationSyncConfigStore is used to manage the set of Universe servers as @@ -565,6 +572,37 @@ func fetchProofSyncLogEntry(ctx context.Context, entry ProofSyncLogEntry, }, nil } +// DeleteProofsSyncLogEntries deletes a set of proof sync log entries. +func (u *UniverseFederationDB) DeleteProofsSyncLogEntries(ctx context.Context, + servers ...universe.ServerAddr) error { + + var writeTx UniverseFederationOptions + + err := u.db.ExecTx(ctx, &writeTx, func(db UniverseServerStore) error { + // Delete proof sync log entries which are associated with each + // server. + for i := range servers { + server := servers[i] + + err := db.DeleteFederationProofSyncLog( + ctx, DeleteFedProofSyncLogParams{ + ServerHost: sqlStr(server.HostStr()), + }, + ) + if err != nil { + return err + } + } + + return nil + }) + if err != nil { + return err + } + + return nil +} + // UpsertFederationSyncConfig upserts both the global and universe specific // federation sync configs. func (u *UniverseFederationDB) UpsertFederationSyncConfig( diff --git a/universe/interface.go b/universe/interface.go index a6b8afb78..469e6eaa6 100644 --- a/universe/interface.go +++ b/universe/interface.go @@ -859,6 +859,10 @@ type FederationProofSyncLog interface { // returns all log entries with sync status pending. FetchPendingProofsSyncLog(ctx context.Context, syncDirection *SyncDirection) ([]*ProofSyncLogEntry, error) + + // DeleteProofsSyncLogEntries deletes proof sync log entries. + DeleteProofsSyncLogEntries(ctx context.Context, + servers ...ServerAddr) error } // FederationDB is used for CRUD operations related to federation logs and From 499f45210ec338447eae1835906b566728989143 Mon Sep 17 00:00:00 2001 From: ffranr Date: Wed, 29 Nov 2023 21:37:44 +0000 Subject: [PATCH 08/16] rpcserver: when removing fed server also remove from proof sync log --- rpcserver.go | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/rpcserver.go b/rpcserver.go index 59f273c2f..992bd79d4 100644 --- a/rpcserver.go +++ b/rpcserver.go @@ -3997,7 +3997,17 @@ func (r *rpcServer) DeleteFederationServer(ctx context.Context, serversToDel := fn.Map(req.Servers, unmarshalUniverseServer) - err := r.cfg.FederationDB.RemoveServers(ctx, serversToDel...) + // Remove the servers from the proofs sync log. This is necessary before + // we can remove the servers from the database because of a foreign + // key constraint. + err := r.cfg.FederationDB.DeleteProofsSyncLogEntries( + ctx, serversToDel..., + ) + if err != nil { + return nil, err + } + + err = r.cfg.FederationDB.RemoveServers(ctx, serversToDel...) if err != nil { return nil, err } From 75f93b2303be4454d7dfea09b7118869fb966341 Mon Sep 17 00:00:00 2001 From: ffranr Date: Wed, 29 Nov 2023 16:45:22 +0000 Subject: [PATCH 09/16] itest: add tapd harness config param for federation sync ticker interval --- itest/tapd_harness.go | 8 ++++++++ itest/test_harness.go | 5 +++++ 2 files changed, 13 insertions(+) diff --git a/itest/tapd_harness.go b/itest/tapd_harness.go index fe81f4ef2..1c500875a 100644 --- a/itest/tapd_harness.go +++ b/itest/tapd_harness.go @@ -100,6 +100,10 @@ type harnessOpts struct { proofCourier proof.CourierHarness custodianProofRetrievalDelay *time.Duration addrAssetSyncerDisable bool + + // fedSyncTickerInterval is the interval at which the federation envoy + // sync ticker will fire. + fedSyncTickerInterval *time.Duration } type harnessOption func(*harnessOpts) @@ -242,6 +246,10 @@ func newTapdHarness(t *testing.T, ht *harnessTest, cfg tapdConfig, finalCfg.CustodianProofRetrievalDelay = *opts.custodianProofRetrievalDelay } + if opts.fedSyncTickerInterval != nil { + finalCfg.Universe.SyncInterval = *opts.fedSyncTickerInterval + } + return &tapdHarness{ cfg: &cfg, clientCfg: finalCfg, diff --git a/itest/test_harness.go b/itest/test_harness.go index 1550f3a7b..2614bbe82 100644 --- a/itest/test_harness.go +++ b/itest/test_harness.go @@ -366,6 +366,10 @@ type tapdHarnessParams struct { // synced from the above node. startupSyncNumAssets int + // fedSyncTickerInterval is the interval at which the federation envoy + // sync ticker will fire. + fedSyncTickerInterval *time.Duration + // noDefaultUniverseSync indicates whether the default universe server // should be added as a federation server or not. noDefaultUniverseSync bool @@ -402,6 +406,7 @@ func setupTapdHarness(t *testing.T, ht *harnessTest, ho.proofCourier = selectedProofCourier ho.custodianProofRetrievalDelay = params.custodianProofRetrievalDelay ho.addrAssetSyncerDisable = params.addrAssetSyncerDisable + ho.fedSyncTickerInterval = params.fedSyncTickerInterval } tapdHarness, err := newTapdHarness(t, ht, tapdConfig{ From 9fbf274d3f74534e15bdb8217ddab3a55325f5a4 Mon Sep 17 00:00:00 2001 From: ffranr Date: Wed, 29 Nov 2023 16:53:13 +0000 Subject: [PATCH 10/16] itest: test that the fed envoy reattempts mint proof push syncs This commit adds an integration test which helps to ensure that a minting node will retry pushing a minting proof to a federation server peer node, in the event that that peer node failed to receive the proof at the time of the initial sync attempt. --- itest/test_list_on_test.go | 4 ++ itest/universe_federation_test.go | 99 +++++++++++++++++++++++++++++++ 2 files changed, 103 insertions(+) create mode 100644 itest/universe_federation_test.go diff --git a/itest/test_list_on_test.go b/itest/test_list_on_test.go index 9f0297038..9d4d6a582 100644 --- a/itest/test_list_on_test.go +++ b/itest/test_list_on_test.go @@ -195,6 +195,10 @@ var testCases = []*testCase{ name: "universe pagination simple", test: testUniversePaginationSimple, }, + { + name: "mint proof repeat fed sync attempt", + test: testMintProofRepeatFedSyncAttempt, + }, } var optionalTestCases = []*testCase{ diff --git a/itest/universe_federation_test.go b/itest/universe_federation_test.go new file mode 100644 index 000000000..69d7dd712 --- /dev/null +++ b/itest/universe_federation_test.go @@ -0,0 +1,99 @@ +package itest + +import ( + "context" + "time" + + "github.com/lightninglabs/taproot-assets/taprpc/mintrpc" + unirpc "github.com/lightninglabs/taproot-assets/taprpc/universerpc" + "github.com/stretchr/testify/require" +) + +// testMintProofRepeatFedSyncAttempt tests that the minting node will retry +// pushing the minting proofs to the federation server peer node, if the peer +// node is offline at the time of the initial sync attempt. +func testMintProofRepeatFedSyncAttempt(t *harnessTest) { + // Create a new minting node, without hooking it up to any existing + // Universe server. We will also set the sync ticker to 4 second, so + // that we can test that the proof push sync is retried and eventually + // succeeds after the fed server peer node reappears online. + syncTickerInterval := 4 * time.Second + mintingNode := setupTapdHarness( + t.t, t, t.lndHarness.Bob, nil, + func(params *tapdHarnessParams) { + params.fedSyncTickerInterval = &syncTickerInterval + params.noDefaultUniverseSync = true + }, + ) + defer func() { + require.NoError(t.t, mintingNode.stop(!*noDelete)) + }() + + // We'll use the main node as our federation universe server + // counterparty. + fedServerNode := t.tapd + + // Keep a reference to the fed server node RPC host address, so that we + // can assert that it has not changed after the restart. This is + // important, because the minting node will be retrying the proof push + // to this address. + fedServerNodeRpcHost := fedServerNode.rpcHost() + + // Register the fedServerNode as a federation universe server with the + // minting node. + ctxb := context.Background() + ctxt, cancel := context.WithTimeout(ctxb, defaultWaitTimeout) + defer cancel() + + _, err := mintingNode.AddFederationServer( + ctxt, &unirpc.AddFederationServerRequest{ + Servers: []*unirpc.UniverseFederationServer{ + { + Host: fedServerNodeRpcHost, + }, + }, + }, + ) + require.NoError(t.t, err) + + // Assert that the fed server node has not seen any asset proofs. + AssertUniverseStats(t.t, fedServerNode, 0, 0, 0) + + // Stop the federation server peer node, so that it does not receive the + // newly minted asset proofs immediately upon minting. + t.Logf("Stopping fed server tapd node") + require.NoError(t.t, fedServerNode.stop(false)) + + // Now that federation peer node is inactive, we'll mint some assets. + t.Logf("Minting assets on minting node") + rpcAssets := MintAssetsConfirmBatch( + t.t, t.lndHarness.Miner.Client, mintingNode, + []*mintrpc.MintAssetRequest{ + simpleAssets[0], issuableAssets[0], + }, + ) + require.Len(t.t, rpcAssets, 2) + + t.lndHarness.MineBlocks(7) + + // Wait for the minting node to attempt (and fail) to push the minting + // proofs to the fed peer node. We wait some multiple of the sync ticker + // interval to ensure that the minting node has had time to retry the + // proof push sync. + time.Sleep(syncTickerInterval * 2) + + // Start the federation server peer node. The federation envoy component + // of our minting node should currently be retrying the proof push sync + // with the federation peer at each tick. + t.Logf("Start (previously stopped) fed server tapd node") + err = fedServerNode.start(false) + require.NoError(t.t, err) + + // Ensure that the federation server node RPC host address has not + // changed after the restart. If it has, then the minting node will be + // retrying the proof push to the wrong address. + require.Equal(t.t, fedServerNodeRpcHost, fedServerNode.rpcHost()) + + t.Logf("Assert that fed peer node has seen the asset minting proofs") + AssertUniverseStats(t.t, fedServerNode, 2, 2, 1) +} From 078cdb6c7b3e660a3ffbbb93420dfd0ef29eb5c1 Mon Sep 17 00:00:00 2001 From: ffranr Date: Fri, 1 Dec 2023 17:43:40 +0000 Subject: [PATCH 11/16] tapdb: add unit test multi db store handler This structure will allow us to pass around the same db store handler to different helper functions which will aid in setting up a unit test db. --- tapdb/sqlutils_test.go | 77 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 77 insertions(+) create mode 100644 tapdb/sqlutils_test.go diff --git a/tapdb/sqlutils_test.go b/tapdb/sqlutils_test.go new file mode 100644 index 000000000..492a479ee --- /dev/null +++ b/tapdb/sqlutils_test.go @@ -0,0 +1,77 @@ +package tapdb + +import ( + "database/sql" + "testing" + "time" + + "github.com/lightninglabs/taproot-assets/tapdb/sqlc" + "github.com/lightningnetwork/lnd/clock" +) + +// DbHandler is a helper struct that contains all the database stores. +type DbHandler struct { + // UniverseFederationStore is a handle to the universe federation store. + UniverseFederationStore *UniverseFederationDB + + // MultiverseStore is a handle to the multiverse store. + MultiverseStore *MultiverseStore + + // AssetMintingStore is a handle to the pending (minting) assets store. + AssetMintingStore *AssetMintingStore + + // AssetStore is a handle to the active assets store. + AssetStore *AssetStore + + // DirectQuery is a handle to the underlying database that can be used + // to query the database directly. + DirectQuery sqlc.Querier +} + +// NewDbHandle creates a new store and query handle to the test database. +func NewDbHandle(t *testing.T) *DbHandler { + // Create a new test database. + db := NewTestDB(t) + + testClock := clock.NewTestClock(time.Now()) + + // Gain a handle to the pending (minting) universe federation store. + universeServerTxCreator := NewTransactionExecutor( + db, func(tx *sql.Tx) UniverseServerStore { + return db.WithTx(tx) + }, + ) + fedStore := NewUniverseFederationDB(universeServerTxCreator, testClock) + + // Gain a handle to the multiverse store. + multiverseTxCreator := NewTransactionExecutor(db, + func(tx *sql.Tx) BaseMultiverseStore { + return db.WithTx(tx) + }, + ) + multiverseStore := NewMultiverseStore(multiverseTxCreator) + + // Gain a handle to the pending (minting) assets store. + assetMintingDB := NewTransactionExecutor( + db, func(tx *sql.Tx) PendingAssetStore { + return db.WithTx(tx) + }, + ) + assetMintingStore := NewAssetMintingStore(assetMintingDB) + + // Gain a handle to the active assets store. + assetsDB := NewTransactionExecutor( + db, func(tx *sql.Tx) ActiveAssetsStore { + return db.WithTx(tx) + }, + ) + activeAssetsStore := NewAssetStore(assetsDB, testClock) + + return &DbHandler{ + UniverseFederationStore: fedStore, + MultiverseStore: multiverseStore, + AssetMintingStore: assetMintingStore, + AssetStore: activeAssetsStore, + DirectQuery: db, + } +} From b76189f664aaf55a08bfe186a9c3568839c51cbf Mon Sep 17 00:00:00 2001 From: ffranr Date: Fri, 1 Dec 2023 17:45:36 +0000 Subject: [PATCH 12/16] tapdb: refactor randProof such that it optionally uses a provided asset --- tapdb/universe_test.go | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/tapdb/universe_test.go b/tapdb/universe_test.go index c5803e02b..b82223e40 100644 --- a/tapdb/universe_test.go +++ b/tapdb/universe_test.go @@ -129,7 +129,12 @@ func randLeafKey(t *testing.T) universe.LeafKey { } } -func randProof(t *testing.T) *proof.Proof { +func randProof(t *testing.T, argAsset *asset.Asset) *proof.Proof { + proofAsset := *asset.RandAsset(t, asset.Normal) + if argAsset != nil { + proofAsset = *argAsset + } + return &proof.Proof{ PrevOut: wire.OutPoint{}, BlockHeader: wire.BlockHeader{ @@ -142,7 +147,7 @@ func randProof(t *testing.T) *proof.Proof { }}, }, TxMerkleProof: proof.TxMerkleProof{}, - Asset: *asset.RandAsset(t, asset.Normal), + Asset: proofAsset, InclusionProof: proof.TaprootProof{ InternalKey: test.RandPubKey(t), }, @@ -152,7 +157,7 @@ func randProof(t *testing.T) *proof.Proof { func randMintingLeaf(t *testing.T, assetGen asset.Genesis, groupKey *btcec.PublicKey) universe.Leaf { - randProof := randProof(t) + randProof := randProof(t, nil) leaf := universe.Leaf{ GenesisWithGroup: universe.GenesisWithGroup{ @@ -320,7 +325,7 @@ func TestUniverseIssuanceProofs(t *testing.T) { testLeaf := &testLeaves[idx] var proofBuf bytes.Buffer - randProof := randProof(t) + randProof := randProof(t, nil) require.NoError(t, randProof.Encode(&proofBuf)) testLeaf.Leaf.RawProof = proofBuf.Bytes() From 473ae23be56a05fc60cf2ad94bf8115e232febec Mon Sep 17 00:00:00 2001 From: ffranr Date: Fri, 1 Dec 2023 17:58:11 +0000 Subject: [PATCH 13/16] tapdb: extract unit test func for populating asset and proof in db This commit adds a unit test db handler helper method which we will use in a new test (in a subsequent commit) to populate a unit test db with an asset and its corresponding proof. --- tapdb/assets_store_test.go | 108 ++++-------------------------- tapdb/sqlutils_test.go | 130 +++++++++++++++++++++++++++++++++++++ 2 files changed, 141 insertions(+), 97 deletions(-) diff --git a/tapdb/assets_store_test.go b/tapdb/assets_store_test.go index 85a914033..82c607300 100644 --- a/tapdb/assets_store_test.go +++ b/tapdb/assets_store_test.go @@ -246,105 +246,17 @@ func assertAssetEqual(t *testing.T, a, b *asset.Asset) { func TestImportAssetProof(t *testing.T) { t.Parallel() - // First, we'll create a new instance of the database. - _, assetStore, db := newAssetStore(t) - - // Next, we'll make a new random asset that also has a few inputs with - // dummy witness information. - testAsset := randAsset(t) - - assetRoot, err := commitment.NewAssetCommitment(testAsset) - require.NoError(t, err) - - taprootAssetRoot, err := commitment.NewTapCommitment(assetRoot) - require.NoError(t, err) - - // With our asset created, we can now create the AnnotatedProof we use - // to import assets into the database. - var blockHash chainhash.Hash - _, err = rand.Read(blockHash[:]) - require.NoError(t, err) + var ( + ctxb = context.Background() - anchorTx := wire.NewMsgTx(2) - anchorTx.AddTxIn(&wire.TxIn{}) - anchorTx.AddTxOut(&wire.TxOut{ - PkScript: bytes.Repeat([]byte{0x01}, 34), - Value: 10, - }) + dbHandle = NewDbHandle(t) + assetStore = dbHandle.AssetStore + ) + // Add a random asset and corresponding proof into the database. + testAsset, testProof := dbHandle.AddRandomAssetProof(t) assetID := testAsset.ID() - anchorPoint := wire.OutPoint{ - Hash: anchorTx.TxHash(), - Index: 0, - } - initialBlob := bytes.Repeat([]byte{0x0}, 100) - updatedBlob := bytes.Repeat([]byte{0x77}, 100) - testProof := &proof.AnnotatedProof{ - Locator: proof.Locator{ - AssetID: &assetID, - ScriptKey: *testAsset.ScriptKey.PubKey, - }, - Blob: initialBlob, - AssetSnapshot: &proof.AssetSnapshot{ - Asset: testAsset, - OutPoint: anchorPoint, - AnchorBlockHash: blockHash, - AnchorBlockHeight: test.RandInt[uint32](), - AnchorTxIndex: test.RandInt[uint32](), - AnchorTx: anchorTx, - OutputIndex: 0, - InternalKey: test.RandPubKey(t), - ScriptRoot: taprootAssetRoot, - }, - } - if testAsset.GroupKey != nil { - testProof.GroupKey = &testAsset.GroupKey.GroupPubKey - } - - // We'll now insert the internal key information as well as the script - // key ahead of time to reflect the address creation that happens - // elsewhere. - ctxb := context.Background() - _, err = db.UpsertInternalKey(ctxb, InternalKey{ - RawKey: testProof.InternalKey.SerializeCompressed(), - KeyFamily: test.RandInt[int32](), - KeyIndex: test.RandInt[int32](), - }) - require.NoError(t, err) - rawScriptKeyID, err := db.UpsertInternalKey(ctxb, InternalKey{ - RawKey: testAsset.ScriptKey.RawKey.PubKey.SerializeCompressed(), - KeyFamily: int32(testAsset.ScriptKey.RawKey.Family), - KeyIndex: int32(testAsset.ScriptKey.RawKey.Index), - }) - require.NoError(t, err) - _, err = db.UpsertScriptKey(ctxb, NewScriptKey{ - InternalKeyID: rawScriptKeyID, - TweakedScriptKey: testAsset.ScriptKey.PubKey.SerializeCompressed(), - Tweak: nil, - }) - require.NoError(t, err) - - // We'll add the chain transaction of the proof now to simulate a - // batched transfer on a higher layer. - var anchorTxBuf bytes.Buffer - err = testProof.AnchorTx.Serialize(&anchorTxBuf) - require.NoError(t, err) - anchorTXID := testProof.AnchorTx.TxHash() - _, err = db.UpsertChainTx(ctxb, ChainTxParams{ - Txid: anchorTXID[:], - RawTx: anchorTxBuf.Bytes(), - BlockHeight: sqlInt32(testProof.AnchorBlockHeight), - BlockHash: testProof.AnchorBlockHash[:], - TxIndex: sqlInt32(testProof.AnchorTxIndex), - }) - require.NoError(t, err, "unable to insert chain tx: %w", err) - - // With all our test data constructed, we'll now attempt to import the - // asset into the database. - require.NoError(t, assetStore.ImportProofs( - ctxb, proof.MockHeaderVerifier, proof.MockGroupVerifier, false, - testProof, - )) + initialBlob := testProof.Blob // We should now be able to retrieve the set of all assets inserted on // disk. @@ -371,7 +283,7 @@ func TestImportAssetProof(t *testing.T) { ScriptKey: *testAsset.ScriptKey.PubKey, }) require.NoError(t, err) - require.Equal(t, initialBlob, []byte(currentBlob)) + require.Equal(t, initialBlob, currentBlob) // We should also be able to fetch the created asset above based on // either the asset ID, or key group via the main coin selection @@ -391,6 +303,8 @@ func TestImportAssetProof(t *testing.T) { // We'll now attempt to overwrite the proof with one that has different // block information (simulating a re-org). + updatedBlob := bytes.Repeat([]byte{0x77}, 100) + testProof.AnchorBlockHash = chainhash.Hash{12, 34, 56} testProof.AnchorBlockHeight = 1234 testProof.AnchorTxIndex = 5678 diff --git a/tapdb/sqlutils_test.go b/tapdb/sqlutils_test.go index 492a479ee..5c3114ea4 100644 --- a/tapdb/sqlutils_test.go +++ b/tapdb/sqlutils_test.go @@ -1,12 +1,22 @@ package tapdb import ( + "bytes" + "context" "database/sql" + "math/rand" "testing" "time" + "github.com/btcsuite/btcd/chaincfg/chainhash" + "github.com/btcsuite/btcd/wire" + "github.com/lightninglabs/taproot-assets/asset" + "github.com/lightninglabs/taproot-assets/commitment" + "github.com/lightninglabs/taproot-assets/internal/test" + "github.com/lightninglabs/taproot-assets/proof" "github.com/lightninglabs/taproot-assets/tapdb/sqlc" "github.com/lightningnetwork/lnd/clock" + "github.com/stretchr/testify/require" ) // DbHandler is a helper struct that contains all the database stores. @@ -28,6 +38,126 @@ type DbHandler struct { DirectQuery sqlc.Querier } +// AddRandomAssetProof generates a random asset and corresponding proof and +// inserts them into the given test database. +func (d *DbHandler) AddRandomAssetProof(t *testing.T) (*asset.Asset, + *proof.AnnotatedProof) { + + var ( + ctx = context.Background() + + assetStore = d.AssetStore + db = d.DirectQuery + ) + + // Next, we'll make a new random asset that also has a few inputs with + // dummy witness information. + testAsset := randAsset(t) + + assetRoot, err := commitment.NewAssetCommitment(testAsset) + require.NoError(t, err) + + taprootAssetRoot, err := commitment.NewTapCommitment(assetRoot) + require.NoError(t, err) + + // With our asset created, we can now create the AnnotatedProof we use + // to import assets into the database. + var blockHash chainhash.Hash + _, err = rand.Read(blockHash[:]) + require.NoError(t, err) + + anchorTx := wire.NewMsgTx(2) + anchorTx.AddTxIn(&wire.TxIn{}) + anchorTx.AddTxOut(&wire.TxOut{ + PkScript: bytes.Repeat([]byte{0x01}, 34), + Value: 10, + }) + + assetID := testAsset.ID() + anchorPoint := wire.OutPoint{ + Hash: anchorTx.TxHash(), + Index: 0, + } + + // Generate a random proof and encode it into a proof blob. + testProof := randProof(t, testAsset) + + var proofBlobBuffer bytes.Buffer + err = testProof.Encode(&proofBlobBuffer) + require.NoError(t, err) + + proofBlob := proofBlobBuffer.Bytes() + scriptKey := testAsset.ScriptKey + + annotatedProof := &proof.AnnotatedProof{ + Locator: proof.Locator{ + AssetID: &assetID, + ScriptKey: *scriptKey.PubKey, + }, + Blob: proofBlob, + AssetSnapshot: &proof.AssetSnapshot{ + Asset: testAsset, + OutPoint: anchorPoint, + AnchorBlockHash: blockHash, + AnchorBlockHeight: test.RandInt[uint32](), + AnchorTxIndex: test.RandInt[uint32](), + AnchorTx: anchorTx, + OutputIndex: 0, + InternalKey: test.RandPubKey(t), + ScriptRoot: taprootAssetRoot, + }, + } + if testAsset.GroupKey != nil { + annotatedProof.GroupKey = &testAsset.GroupKey.GroupPubKey + } + + // We'll now insert the internal key information as well as the script + // key ahead of time to reflect the address creation that happens + // elsewhere. + _, err = db.UpsertInternalKey(ctx, InternalKey{ + RawKey: annotatedProof.InternalKey.SerializeCompressed(), + KeyFamily: test.RandInt[int32](), + KeyIndex: test.RandInt[int32](), + }) + require.NoError(t, err) + rawScriptKeyID, err := db.UpsertInternalKey(ctx, InternalKey{ + RawKey: scriptKey.RawKey.PubKey.SerializeCompressed(), + KeyFamily: int32(testAsset.ScriptKey.RawKey.Family), + KeyIndex: int32(testAsset.ScriptKey.RawKey.Index), + }) + require.NoError(t, err) + _, err = db.UpsertScriptKey(ctx, NewScriptKey{ + InternalKeyID: rawScriptKeyID, + TweakedScriptKey: scriptKey.PubKey.SerializeCompressed(), + Tweak: nil, + }) + require.NoError(t, err) + + // We'll add the chain transaction of the proof now to simulate a + // batched transfer on a higher layer. + var anchorTxBuf bytes.Buffer + err = annotatedProof.AnchorTx.Serialize(&anchorTxBuf) + require.NoError(t, err) + anchorTXID := annotatedProof.AnchorTx.TxHash() + _, err = db.UpsertChainTx(ctx, ChainTxParams{ + Txid: anchorTXID[:], + RawTx: anchorTxBuf.Bytes(), + BlockHeight: sqlInt32(annotatedProof.AnchorBlockHeight), + BlockHash: annotatedProof.AnchorBlockHash[:], + TxIndex: sqlInt32(annotatedProof.AnchorTxIndex), + }) + require.NoError(t, err, "unable to insert chain tx: %w", err) + + // With all our test data constructed, we'll now attempt to import the + // asset into the database. + require.NoError(t, assetStore.ImportProofs( + ctx, proof.MockHeaderVerifier, proof.MockGroupVerifier, false, + annotatedProof, + )) + + return testAsset, annotatedProof +} + // NewDbHandle creates a new store and query handle to the test database. func NewDbHandle(t *testing.T) *DbHandler { // Create a new test database. From fece1d1fc5a2de2ada8634e39f4c8800d47228b4 Mon Sep 17 00:00:00 2001 From: ffranr Date: Fri, 1 Dec 2023 18:21:37 +0000 Subject: [PATCH 14/16] tapdb: add test db helper func for server addresses and proof leaves This commit adds a helper method for inserting server addresses into a unit test db. It also adds a helper method for inserting a proof leaf into a unit test db. These helper methods will be used in a subsequent commit. --- tapdb/sqlutils_test.go | 63 +++++++++++++++++++++++++++++++ tapdb/universe_federation_test.go | 23 +++-------- universe/interface.go | 20 ++++++++++ 3 files changed, 89 insertions(+), 17 deletions(-) diff --git a/tapdb/sqlutils_test.go b/tapdb/sqlutils_test.go index 5c3114ea4..6f0786fb3 100644 --- a/tapdb/sqlutils_test.go +++ b/tapdb/sqlutils_test.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "database/sql" + "fmt" "math/rand" "testing" "time" @@ -15,6 +16,7 @@ import ( "github.com/lightninglabs/taproot-assets/internal/test" "github.com/lightninglabs/taproot-assets/proof" "github.com/lightninglabs/taproot-assets/tapdb/sqlc" + "github.com/lightninglabs/taproot-assets/universe" "github.com/lightningnetwork/lnd/clock" "github.com/stretchr/testify/require" ) @@ -158,6 +160,67 @@ func (d *DbHandler) AddRandomAssetProof(t *testing.T) (*asset.Asset, return testAsset, annotatedProof } +// AddUniProofLeaf generates a universe proof leaf and inserts it into the test +// database. +func (d *DbHandler) AddUniProofLeaf(t *testing.T, testAsset *asset.Asset, + annotatedProof *proof.AnnotatedProof) *universe.Proof { + + ctx := context.Background() + + // Insert proof into the multiverse/universe store. This step will + // populate the universe root and universe leaves tables. + uniId := universe.NewUniIDFromAsset(*testAsset) + + leafKey := universe.LeafKey{ + OutPoint: annotatedProof.AssetSnapshot.OutPoint, + ScriptKey: &testAsset.ScriptKey, + } + + leaf := universe.Leaf{ + GenesisWithGroup: universe.GenesisWithGroup{ + Genesis: testAsset.Genesis, + GroupKey: testAsset.GroupKey, + }, + RawProof: annotatedProof.Blob, + Asset: testAsset, + Amt: testAsset.Amount, + } + + uniProof, err := d.MultiverseStore.UpsertProofLeaf( + ctx, uniId, leafKey, &leaf, nil, + ) + require.NoError(t, err) + + return uniProof +} + +// AddRandomServerAddrs is a helper function that will create server addresses +// and add them to the database. +func (d *DbHandler) AddRandomServerAddrs(t *testing.T, + numServers int) []universe.ServerAddr { + + var ( + ctx = context.Background() + fedDB = d.UniverseFederationStore + ) + + addrs := make([]universe.ServerAddr, 0, numServers) + for i := 0; i < numServers; i++ { + portOffset := i + 10_000 + hostStr := fmt.Sprintf("localhost:%v", portOffset) + + addr := universe.NewServerAddr(int64(i+1), hostStr) + addrs = append(addrs, addr) + } + + // With the set of addrs created, we'll now insert them all into the + // database. + err := fedDB.AddServers(ctx, addrs...) + require.NoError(t, err) + + return addrs +} + // NewDbHandle creates a new store and query handle to the test database. func NewDbHandle(t *testing.T) *DbHandler { // Create a new test database. diff --git a/tapdb/universe_federation_test.go b/tapdb/universe_federation_test.go index b7fb80a21..3ffbf80a0 100644 --- a/tapdb/universe_federation_test.go +++ b/tapdb/universe_federation_test.go @@ -3,7 +3,6 @@ package tapdb import ( "context" "database/sql" - "fmt" "testing" "time" @@ -35,10 +34,12 @@ func newTestFederationDb(t *testing.T, func TestUniverseFederationCRUD(t *testing.T) { t.Parallel() - testClock := clock.NewTestClock(time.Now()) - fedDB, _ := newTestFederationDb(t, testClock) + var ( + ctx = context.Background() - ctx := context.Background() + db = NewDbHandle(t) + fedDB = db.UniverseFederationStore + ) // If we try to list the set of servers without any added, we should // get the error we expect. @@ -47,19 +48,7 @@ func TestUniverseFederationCRUD(t *testing.T) { require.Empty(t, dbServers) // Next, we'll try to add a new series of servers to the DB. - const numServers = 10 - addrs := make([]universe.ServerAddr, 0, numServers) - for i := int64(0); i < numServers; i++ { - portOffset := i + 10_000 - hostStr := fmt.Sprintf("localhost:%v", portOffset) - - addrs = append(addrs, universe.NewServerAddr(i+1, hostStr)) - } - - // With the set of addrs created, we'll now insert them all into the - // database. - err = fedDB.AddServers(ctx, addrs...) - require.NoError(t, err) + addrs := db.AddRandomServerAddrs(t, 10) // If we try to insert them all again, then we should get an error as // we ensure the host names are unique. diff --git a/universe/interface.go b/universe/interface.go index 469e6eaa6..29712cc4f 100644 --- a/universe/interface.go +++ b/universe/interface.go @@ -84,6 +84,26 @@ func (i *Identifier) StringForLog() string { i.String(), i.AssetID[:], groupKey, i.ProofType) } +// NewUniIDFromAsset creates a new universe ID from an asset. +func NewUniIDFromAsset(a asset.Asset) Identifier { + proofType := ProofTypeTransfer + if a.IsGenesisAsset() { + proofType = ProofTypeIssuance + } + + if a.GroupKey != nil { + return Identifier{ + GroupKey: &a.GroupKey.GroupPubKey, + ProofType: proofType, + } + } + + return Identifier{ + AssetID: a.ID(), + ProofType: proofType, + } +} + // NewUniIDFromRawArgs creates a new universe ID from the raw arguments. The // asset ID bytes and group key bytes are mutually exclusive. If the group key // bytes are set, then the asset ID bytes will be ignored. From b732d20831191158996c59342ddd68e8cee6f68e Mon Sep 17 00:00:00 2001 From: ffranr Date: Fri, 1 Dec 2023 18:22:18 +0000 Subject: [PATCH 15/16] tapdb: add proof sync log db queries unit test --- tapdb/universe_federation_test.go | 278 ++++++++++++++++++++++++++++++ 1 file changed, 278 insertions(+) diff --git a/tapdb/universe_federation_test.go b/tapdb/universe_federation_test.go index 3ffbf80a0..7492a2c61 100644 --- a/tapdb/universe_federation_test.go +++ b/tapdb/universe_federation_test.go @@ -87,6 +87,284 @@ func TestUniverseFederationCRUD(t *testing.T) { require.NoError(t, err) } +// TestFederationProofSyncLogCRUD tests that we can add, modify, and remove +// proof sync log entries from the Universe DB. +func TestFederationProofSyncLogCRUD(t *testing.T) { + t.Parallel() + + var ( + ctx = context.Background() + dbHandle = NewDbHandle(t) + fedStore = dbHandle.UniverseFederationStore + ) + + // Populate the database with a random asset, its associated proof, and + // a set of servers. + testAsset, testAnnotatedProof := dbHandle.AddRandomAssetProof(t) + uniProof := dbHandle.AddUniProofLeaf(t, testAsset, testAnnotatedProof) + uniId := universe.NewUniIDFromAsset(*testAsset) + + servers := dbHandle.AddRandomServerAddrs(t, 3) + + // Designate pending sync status for all servers except the first. + // Make a map set of pending sync servers. + pendingSyncServers := make(map[universe.ServerAddr]struct{}) + for i := range servers { + server := servers[i] + if i == 0 { + continue + } + pendingSyncServers[server] = struct{}{} + } + + // Add log entries for the first server. + syncServer := servers[0] + + // Add push log entry. + _, err := fedStore.UpsertFederationProofSyncLog( + ctx, uniId, uniProof.LeafKey, syncServer, + universe.SyncDirectionPush, universe.ProofSyncStatusComplete, + true, + ) + require.NoError(t, err) + + // Add pull log entry. + _, err = fedStore.UpsertFederationProofSyncLog( + ctx, uniId, uniProof.LeafKey, syncServer, + universe.SyncDirectionPull, universe.ProofSyncStatusComplete, + true, + ) + require.NoError(t, err) + + // We've already added log entries for the first server. We will now + // insert new proof sync log entries for the remaining servers. + for _, server := range servers[1:] { + _, err := fedStore.UpsertFederationProofSyncLog( + ctx, uniId, uniProof.LeafKey, server, + universe.SyncDirectionPush, + universe.ProofSyncStatusPending, false, + ) + require.NoError(t, err) + } + + // Retrieve all sync status pending log entries. + syncDirectionPush := universe.SyncDirectionPush + pendingLogEntries, err := fedStore.FetchPendingProofsSyncLog( + ctx, &syncDirectionPush, + ) + require.NoError(t, err) + require.Len(t, pendingLogEntries, 2) + + for i := range pendingLogEntries { + entry := pendingLogEntries[i] + require.Equal( + t, universe.ProofSyncStatusPending, entry.SyncStatus, + ) + require.Equal( + t, universe.SyncDirectionPush, entry.SyncDirection, + ) + require.Equal(t, uniId.String(), entry.UniID.String()) + require.Equal(t, int64(0), entry.AttemptCounter) + + assertProofSyncLogLeafKey(t, uniProof.LeafKey, entry.LeafKey) + assertProofSyncLogLeaf(t, *uniProof.Leaf, entry.Leaf) + + // Check for server address in pending sync server set. + _, ok := pendingSyncServers[entry.ServerAddr] + require.True(t, ok) + } + + // Retrieve all push sync status complete log entries. + completePushLogEntries, err := fedStore.QueryFederationProofSyncLog( + ctx, uniId, uniProof.LeafKey, universe.SyncDirectionPush, + universe.ProofSyncStatusComplete, + ) + require.NoError(t, err) + + // There should only be one complete push log entry. + require.Len(t, completePushLogEntries, 1) + + // Check that the complete log entry is as expected. + completePushEntry := completePushLogEntries[0] + + require.Equal(t, servers[0], completePushEntry.ServerAddr) + require.Equal( + t, universe.ProofSyncStatusComplete, + completePushEntry.SyncStatus, + ) + require.Equal( + t, universe.SyncDirectionPush, completePushEntry.SyncDirection, + ) + require.Equal(t, uniId.String(), completePushEntry.UniID.String()) + require.Equal(t, int64(0), completePushEntry.AttemptCounter) + + assertProofSyncLogLeafKey( + t, uniProof.LeafKey, completePushEntry.LeafKey, + ) + assertProofSyncLogLeaf(t, *uniProof.Leaf, completePushEntry.Leaf) + + // Retrieve all pull sync status complete log entries. + completePullLogEntries, err := fedStore.QueryFederationProofSyncLog( + ctx, uniId, uniProof.LeafKey, universe.SyncDirectionPull, + universe.ProofSyncStatusComplete, + ) + require.NoError(t, err) + + // There should only be one complete push log entry. + require.Len(t, completePullLogEntries, 1) + + // Check that the complete log entry is as expected. + completePullEntry := completePullLogEntries[0] + + require.Equal(t, servers[0], completePullEntry.ServerAddr) + require.Equal( + t, universe.ProofSyncStatusComplete, + completePullEntry.SyncStatus, + ) + require.Equal( + t, universe.SyncDirectionPull, completePullEntry.SyncDirection, + ) + require.Equal(t, uniId.String(), completePullEntry.UniID.String()) + require.Equal(t, int64(0), completePullEntry.AttemptCounter) + + assertProofSyncLogLeafKey( + t, uniProof.LeafKey, completePullEntry.LeafKey, + ) + assertProofSyncLogLeaf(t, *uniProof.Leaf, completePullEntry.Leaf) + + // Increment the attempt counter for one of the pending log entries. + _, err = fedStore.UpsertFederationProofSyncLog( + ctx, uniId, uniProof.LeafKey, servers[1], + universe.SyncDirectionPush, universe.ProofSyncStatusPending, + true, + ) + require.NoError(t, err) + + // Check that the attempt counter was incremented as expected. + pendingLogEntries, err = fedStore.QueryFederationProofSyncLog( + ctx, uniId, uniProof.LeafKey, universe.SyncDirectionPush, + universe.ProofSyncStatusPending, + ) + require.NoError(t, err) + require.Len(t, pendingLogEntries, 2) + + for i := range pendingLogEntries { + entry := pendingLogEntries[i] + if entry.ServerAddr == servers[1] { + require.Equal(t, int64(1), entry.AttemptCounter) + } else { + require.Equal(t, int64(0), entry.AttemptCounter) + } + } + + // Upsert without incrementing the attempt counter for one of the + // pending log entries. + _, err = fedStore.UpsertFederationProofSyncLog( + ctx, uniId, uniProof.LeafKey, servers[1], + universe.SyncDirectionPush, universe.ProofSyncStatusPending, + false, + ) + require.NoError(t, err) + + // Check that the attempt counter was not changed as expected. + pendingLogEntries, err = fedStore.QueryFederationProofSyncLog( + ctx, uniId, uniProof.LeafKey, universe.SyncDirectionPush, + universe.ProofSyncStatusPending, + ) + require.NoError(t, err) + require.Len(t, pendingLogEntries, 2) + + for i := range pendingLogEntries { + entry := pendingLogEntries[i] + if entry.ServerAddr == servers[1] { + require.Equal(t, int64(1), entry.AttemptCounter) + } else { + require.Equal(t, int64(0), entry.AttemptCounter) + } + } + + // Set the sync status to complete for one of the pending log entries. + _, err = fedStore.UpsertFederationProofSyncLog( + ctx, uniId, uniProof.LeafKey, servers[1], + universe.SyncDirectionPush, universe.ProofSyncStatusComplete, + false, + ) + require.NoError(t, err) + + // Check that the sync status was updated as expected. + pendingLogEntries, err = fedStore.QueryFederationProofSyncLog( + ctx, uniId, uniProof.LeafKey, universe.SyncDirectionPush, + universe.ProofSyncStatusPending, + ) + require.NoError(t, err) + require.Len(t, pendingLogEntries, 1) + + completePushLogEntries, err = fedStore.QueryFederationProofSyncLog( + ctx, uniId, uniProof.LeafKey, universe.SyncDirectionPush, + universe.ProofSyncStatusComplete, + ) + require.NoError(t, err) + require.Len(t, completePushLogEntries, 2) + + // Delete log entries for one of the servers. + err = fedStore.DeleteProofsSyncLogEntries(ctx, servers[0], servers[1]) + require.NoError(t, err) + + // Only one log entry should remain and it should have sync status + // pending. + pendingLogEntries, err = fedStore.QueryFederationProofSyncLog( + ctx, uniId, uniProof.LeafKey, universe.SyncDirectionPush, + universe.ProofSyncStatusPending, + ) + require.NoError(t, err) + require.Len(t, pendingLogEntries, 1) + + // Check that the remaining log entry is as expected. + pendingEntry := pendingLogEntries[0] + require.Equal(t, servers[2], pendingEntry.ServerAddr) +} + +// assertProofSyncLogLeafKey asserts that a leaf key derived from a proof sync +// log entry is equal to a given leaf key. +func assertProofSyncLogLeafKey(t *testing.T, actualLeafKey universe.LeafKey, + logLeafKey universe.LeafKey) { + + // We can safely ignore the tweaked script key as it is the derivation + // information for the script key. It is only ever known to the owner of + // the asset and is never serialized in a proof + actualLeafKey.ScriptKey.TweakedScriptKey = nil + require.Equal(t, actualLeafKey, logLeafKey) +} + +// assertProofSyncLogLeaf asserts that a leaf derived from a proof sync log +// entry is equal to a given universe leaf. +func assertProofSyncLogLeaf(t *testing.T, actualLeaf universe.Leaf, + logLeaf universe.Leaf) { + + if actualLeaf.GenesisWithGroup.GroupKey != nil { + // We can safely ignore the group key witness as it is the + // basically just extracted from the asset and won't be relevant + // when parsing the proof. + actualLeaf.GenesisWithGroup.GroupKey.Witness = nil + + // We can safely ignore the pre-tweaked group key + // (GroupKey.RawKey) as it is the derivation information for the + // group key. It is only ever known to the owner of the asset + // and is never serialized in a proof. + actualLeaf.GenesisWithGroup.GroupKey.RawKey.PubKey = nil + } + + require.Equal(t, actualLeaf.Amt, logLeaf.Amt) + require.Equal(t, actualLeaf.RawProof, logLeaf.RawProof) + require.Equal(t, actualLeaf.GenesisWithGroup, logLeaf.GenesisWithGroup) + + // We compare the assets with our custom asset quality function as the + // SplitCommitmentRoot field MS-SMT node types will differ. A computed + // node is derived from the database data whereas the generated asset + // may have a MS-SMT branch node type. + actualLeaf.Asset.DeepEqual(logLeaf.Asset) +} + // TestFederationConfigDefault tests that we're able to fetch the default // federation config. func TestFederationConfigDefault(t *testing.T) { From a34ec3032b90fabdb495caf4b18fe7e8a83d8ff3 Mon Sep 17 00:00:00 2001 From: ffranr Date: Wed, 6 Dec 2023 14:27:17 +0000 Subject: [PATCH 16/16] tapdb: log path to sqlite test db --- tapdb/sqlite.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tapdb/sqlite.go b/tapdb/sqlite.go index 500980fcb..2a3b2d064 100644 --- a/tapdb/sqlite.go +++ b/tapdb/sqlite.go @@ -160,11 +160,11 @@ func NewSqliteStore(cfg *SqliteConfig) (*SqliteStore, error) { func NewTestSqliteDB(t *testing.T) *SqliteStore { t.Helper() - t.Logf("Creating new SQLite DB for testing") + dbFileName := filepath.Join(t.TempDir(), "tmp.db") + t.Logf("Creating new SQLite DB for testing: %s", dbFileName) // TODO(roasbeef): if we pass :memory: for the file name, then we get // an in mem version to speed up tests - dbFileName := filepath.Join(t.TempDir(), "tmp.db") sqlDB, err := NewSqliteStore(&SqliteConfig{ DatabaseFileName: dbFileName, SkipMigrations: false,