diff --git a/itest/tapd_harness.go b/itest/tapd_harness.go
index fe81f4ef2..1c500875a 100644
--- a/itest/tapd_harness.go
+++ b/itest/tapd_harness.go
@@ -100,6 +100,10 @@ type harnessOpts struct {
 	proofCourier                 proof.CourierHarness
 	custodianProofRetrievalDelay *time.Duration
 	addrAssetSyncerDisable       bool
+
+	// fedSyncTickerInterval is the interval at which the federation envoy
+	// sync ticker will fire.
+	fedSyncTickerInterval *time.Duration
 }
 
 type harnessOption func(*harnessOpts)
@@ -242,6 +246,10 @@ func newTapdHarness(t *testing.T, ht *harnessTest, cfg tapdConfig,
 		finalCfg.CustodianProofRetrievalDelay = *opts.custodianProofRetrievalDelay
 	}
 
+	if opts.fedSyncTickerInterval != nil {
+		finalCfg.Universe.SyncInterval = *opts.fedSyncTickerInterval
+	}
+
 	return &tapdHarness{
 		cfg:       &cfg,
 		clientCfg: finalCfg,
diff --git a/itest/test_harness.go b/itest/test_harness.go
index 1550f3a7b..2614bbe82 100644
--- a/itest/test_harness.go
+++ b/itest/test_harness.go
@@ -366,6 +366,10 @@ type tapdHarnessParams struct {
 	// synced from the above node.
 	startupSyncNumAssets int
 
+	// fedSyncTickerInterval is the interval at which the federation envoy
+	// sync ticker will fire.
+	fedSyncTickerInterval *time.Duration
+
 	// noDefaultUniverseSync indicates whether the default universe server
 	// should be added as a federation server or not.
 	noDefaultUniverseSync bool
@@ -402,6 +406,7 @@ func setupTapdHarness(t *testing.T, ht *harnessTest,
 		ho.proofCourier = selectedProofCourier
 		ho.custodianProofRetrievalDelay = params.custodianProofRetrievalDelay
 		ho.addrAssetSyncerDisable = params.addrAssetSyncerDisable
+		ho.fedSyncTickerInterval = params.fedSyncTickerInterval
 	}
 
 	tapdHarness, err := newTapdHarness(t, ht, tapdConfig{
diff --git a/itest/test_list_on_test.go b/itest/test_list_on_test.go
index 9f0297038..9d4d6a582 100644
--- a/itest/test_list_on_test.go
+++ b/itest/test_list_on_test.go
@@ -195,6 +195,10 @@ var testCases = []*testCase{
 		name: "universe pagination simple",
 		test: testUniversePaginationSimple,
 	},
+	{
+		name: "mint proof repeat fed sync attempt",
+		test: testMintProofRepeatFedSyncAttempt,
+	},
 }
 
 var optionalTestCases = []*testCase{
diff --git a/itest/universe_federation_test.go b/itest/universe_federation_test.go
new file mode 100644
index 000000000..69d7dd712
--- /dev/null
+++ b/itest/universe_federation_test.go
@@ -0,0 +1,99 @@
+package itest
+
+import (
+	"context"
+	"time"
+
+	"github.com/lightninglabs/taproot-assets/taprpc/mintrpc"
+	unirpc "github.com/lightninglabs/taproot-assets/taprpc/universerpc"
+	"github.com/stretchr/testify/require"
+)
+
+// testMintProofRepeatFedSyncAttempt tests that the minting node will retry
+// pushing the minting proofs to the federation server peer node, if the peer
+// node is offline at the time of the initial sync attempt.
+func testMintProofRepeatFedSyncAttempt(t *harnessTest) {
+	// Create a new minting node, without hooking it up to any existing
+	// Universe server. We will also set the sync ticker to 4 seconds, so
+	// that we can test that the proof push sync is retried and eventually
+	// succeeds after the fed server peer node reappears online.
+	syncTickerInterval := 4 * time.Second
+	mintingNode := setupTapdHarness(
+		t.t, t, t.lndHarness.Bob, nil,
+		func(params *tapdHarnessParams) {
+			params.fedSyncTickerInterval = &syncTickerInterval
+			params.noDefaultUniverseSync = true
+		},
+	)
+	defer func() {
+		require.NoError(t.t, mintingNode.stop(!*noDelete))
+	}()
+
+	// We'll use the main node as our federation universe server
+	// counterparty.
+	fedServerNode := t.tapd
+
+	// Keep a reference to the fed server node RPC host address, so that we
+	// can assert that it has not changed after the restart. This is
+	// important, because the minting node will be retrying the proof push
+	// to this address.
+	fedServerNodeRpcHost := fedServerNode.rpcHost()
+
+	// Register the fedServerNode as a federation universe server with the
+	// minting node.
+	ctxb := context.Background()
+	ctxt, cancel := context.WithTimeout(ctxb, defaultWaitTimeout)
+	defer cancel()
+
+	_, err := mintingNode.AddFederationServer(
+		ctxt, &unirpc.AddFederationServerRequest{
+			Servers: []*unirpc.UniverseFederationServer{
+				{
+					Host: fedServerNodeRpcHost,
+				},
+			},
+		},
+	)
+	require.NoError(t.t, err)
+
+	// Assert that the fed server node has not seen any asset proofs.
+	AssertUniverseStats(t.t, fedServerNode, 0, 0, 0)
+
+	// Stop the federation server peer node, so that it does not receive
+	// the newly minted asset proofs immediately upon minting.
+	t.Logf("Stopping fed server tapd node")
+	require.NoError(t.t, fedServerNode.stop(false))
+
+	// Now that the federation peer node is inactive, we'll mint some assets.
+	t.Logf("Minting assets on minting node")
+	rpcAssets := MintAssetsConfirmBatch(
+		t.t, t.lndHarness.Miner.Client, mintingNode,
+		[]*mintrpc.MintAssetRequest{
+			simpleAssets[0], issuableAssets[0],
+		},
+	)
+	require.Len(t.t, rpcAssets, 2)
+
+	t.lndHarness.MineBlocks(7)
+
+	// Wait for the minting node to attempt (and fail) to push the minting
+	// proofs to the fed peer node. We wait some multiple of the sync
+	// ticker interval to ensure that the minting node has had time to
+	// retry the proof push sync.
+	time.Sleep(syncTickerInterval * 2)
+
+	// Start the federation server peer node. The federation envoy
+	// component of our minting node should currently be retrying the
+	// proof push sync with the federation peer at each tick.
+	t.Logf("Start (previously stopped) fed server tapd node")
+	err = fedServerNode.start(false)
+	require.NoError(t.t, err)
+
+	// Ensure that the federation server node RPC host address has not
+	// changed after the restart. If it has, then the minting node will be
+	// retrying the proof push to the wrong address.
+	require.Equal(t.t, fedServerNodeRpcHost, fedServerNode.rpcHost())
+
+	t.Logf("Assert that fed peer node has seen the asset minting proofs")
+	AssertUniverseStats(t.t, fedServerNode, 2, 2, 1)
+}
diff --git a/rpcserver.go b/rpcserver.go
index 59f273c2f..992bd79d4 100644
--- a/rpcserver.go
+++ b/rpcserver.go
@@ -3997,7 +3997,17 @@ func (r *rpcServer) DeleteFederationServer(ctx context.Context,
 
 	serversToDel := fn.Map(req.Servers, unmarshalUniverseServer)
 
-	err := r.cfg.FederationDB.RemoveServers(ctx, serversToDel...)
+	// Remove the servers from the proofs sync log. This is necessary
+	// before we can remove the servers from the database because of a
+	// foreign key constraint.
+	err := r.cfg.FederationDB.DeleteProofsSyncLogEntries(
+		ctx, serversToDel...,
+	)
+	if err != nil {
+		return nil, err
+	}
+
+	err = r.cfg.FederationDB.RemoveServers(ctx, serversToDel...)
 	if err != nil {
 		return nil, err
 	}
diff --git a/tapdb/assets_store_test.go b/tapdb/assets_store_test.go
index 85a914033..82c607300 100644
--- a/tapdb/assets_store_test.go
+++ b/tapdb/assets_store_test.go
@@ -246,105 +246,17 @@ func assertAssetEqual(t *testing.T, a, b *asset.Asset) {
 func TestImportAssetProof(t *testing.T) {
 	t.Parallel()
 
-	// First, we'll create a new instance of the database.
- _, assetStore, db := newAssetStore(t) - - // Next, we'll make a new random asset that also has a few inputs with - // dummy witness information. - testAsset := randAsset(t) - - assetRoot, err := commitment.NewAssetCommitment(testAsset) - require.NoError(t, err) - - taprootAssetRoot, err := commitment.NewTapCommitment(assetRoot) - require.NoError(t, err) - - // With our asset created, we can now create the AnnotatedProof we use - // to import assets into the database. - var blockHash chainhash.Hash - _, err = rand.Read(blockHash[:]) - require.NoError(t, err) + var ( + ctxb = context.Background() - anchorTx := wire.NewMsgTx(2) - anchorTx.AddTxIn(&wire.TxIn{}) - anchorTx.AddTxOut(&wire.TxOut{ - PkScript: bytes.Repeat([]byte{0x01}, 34), - Value: 10, - }) + dbHandle = NewDbHandle(t) + assetStore = dbHandle.AssetStore + ) + // Add a random asset and corresponding proof into the database. + testAsset, testProof := dbHandle.AddRandomAssetProof(t) assetID := testAsset.ID() - anchorPoint := wire.OutPoint{ - Hash: anchorTx.TxHash(), - Index: 0, - } - initialBlob := bytes.Repeat([]byte{0x0}, 100) - updatedBlob := bytes.Repeat([]byte{0x77}, 100) - testProof := &proof.AnnotatedProof{ - Locator: proof.Locator{ - AssetID: &assetID, - ScriptKey: *testAsset.ScriptKey.PubKey, - }, - Blob: initialBlob, - AssetSnapshot: &proof.AssetSnapshot{ - Asset: testAsset, - OutPoint: anchorPoint, - AnchorBlockHash: blockHash, - AnchorBlockHeight: test.RandInt[uint32](), - AnchorTxIndex: test.RandInt[uint32](), - AnchorTx: anchorTx, - OutputIndex: 0, - InternalKey: test.RandPubKey(t), - ScriptRoot: taprootAssetRoot, - }, - } - if testAsset.GroupKey != nil { - testProof.GroupKey = &testAsset.GroupKey.GroupPubKey - } - - // We'll now insert the internal key information as well as the script - // key ahead of time to reflect the address creation that happens - // elsewhere. - ctxb := context.Background() - _, err = db.UpsertInternalKey(ctxb, InternalKey{ - RawKey: testProof.InternalKey.SerializeCompressed(), - KeyFamily: test.RandInt[int32](), - KeyIndex: test.RandInt[int32](), - }) - require.NoError(t, err) - rawScriptKeyID, err := db.UpsertInternalKey(ctxb, InternalKey{ - RawKey: testAsset.ScriptKey.RawKey.PubKey.SerializeCompressed(), - KeyFamily: int32(testAsset.ScriptKey.RawKey.Family), - KeyIndex: int32(testAsset.ScriptKey.RawKey.Index), - }) - require.NoError(t, err) - _, err = db.UpsertScriptKey(ctxb, NewScriptKey{ - InternalKeyID: rawScriptKeyID, - TweakedScriptKey: testAsset.ScriptKey.PubKey.SerializeCompressed(), - Tweak: nil, - }) - require.NoError(t, err) - - // We'll add the chain transaction of the proof now to simulate a - // batched transfer on a higher layer. - var anchorTxBuf bytes.Buffer - err = testProof.AnchorTx.Serialize(&anchorTxBuf) - require.NoError(t, err) - anchorTXID := testProof.AnchorTx.TxHash() - _, err = db.UpsertChainTx(ctxb, ChainTxParams{ - Txid: anchorTXID[:], - RawTx: anchorTxBuf.Bytes(), - BlockHeight: sqlInt32(testProof.AnchorBlockHeight), - BlockHash: testProof.AnchorBlockHash[:], - TxIndex: sqlInt32(testProof.AnchorTxIndex), - }) - require.NoError(t, err, "unable to insert chain tx: %w", err) - - // With all our test data constructed, we'll now attempt to import the - // asset into the database. - require.NoError(t, assetStore.ImportProofs( - ctxb, proof.MockHeaderVerifier, proof.MockGroupVerifier, false, - testProof, - )) + initialBlob := testProof.Blob // We should now be able to retrieve the set of all assets inserted on // disk. 
@@ -371,7 +283,7 @@ func TestImportAssetProof(t *testing.T) { ScriptKey: *testAsset.ScriptKey.PubKey, }) require.NoError(t, err) - require.Equal(t, initialBlob, []byte(currentBlob)) + require.Equal(t, initialBlob, currentBlob) // We should also be able to fetch the created asset above based on // either the asset ID, or key group via the main coin selection @@ -391,6 +303,8 @@ func TestImportAssetProof(t *testing.T) { // We'll now attempt to overwrite the proof with one that has different // block information (simulating a re-org). + updatedBlob := bytes.Repeat([]byte{0x77}, 100) + testProof.AnchorBlockHash = chainhash.Hash{12, 34, 56} testProof.AnchorBlockHeight = 1234 testProof.AnchorTxIndex = 5678 diff --git a/tapdb/sqlc/migrations/000013_universe_fed_proof_sync_log.down.sql b/tapdb/sqlc/migrations/000013_universe_fed_proof_sync_log.down.sql new file mode 100644 index 000000000..42bdbfbb8 --- /dev/null +++ b/tapdb/sqlc/migrations/000013_universe_fed_proof_sync_log.down.sql @@ -0,0 +1,2 @@ +DROP INDEX IF EXISTS federation_proof_sync_log_unique_index_proof_leaf_id_servers_id; +DROP TABLE IF EXISTS federation_proof_sync_log; \ No newline at end of file diff --git a/tapdb/sqlc/migrations/000013_universe_fed_proof_sync_log.up.sql b/tapdb/sqlc/migrations/000013_universe_fed_proof_sync_log.up.sql new file mode 100644 index 000000000..ba7e7c100 --- /dev/null +++ b/tapdb/sqlc/migrations/000013_universe_fed_proof_sync_log.up.sql @@ -0,0 +1,36 @@ +-- This table stores the log of federation universe proof sync attempts. Rows +-- in this table are specific to a given proof leaf, server, and sync direction. +CREATE TABLE IF NOT EXISTS federation_proof_sync_log ( + id BIGINT PRIMARY KEY, + + -- The status of the proof sync attempt. + status TEXT NOT NULL CHECK(status IN ('pending', 'complete')), + + -- The timestamp of when the log entry for the associated proof was last + -- updated. + timestamp TIMESTAMP NOT NULL, + + -- The number of attempts that have been made to sync the proof. + attempt_counter BIGINT NOT NULL DEFAULT 0, + + -- The direction of the proof sync attempt. + sync_direction TEXT NOT NULL CHECK(sync_direction IN ('push', 'pull')), + + -- The ID of the subject proof leaf. + proof_leaf_id BIGINT NOT NULL REFERENCES universe_leaves(id), + + -- The ID of the universe that the proof leaf belongs to. + universe_root_id BIGINT NOT NULL REFERENCES universe_roots(id), + + -- The ID of the server that the proof will be/was synced to. 
+ servers_id BIGINT NOT NULL REFERENCES universe_servers(id) +); + +-- Create a unique index on table federation_proof_sync_log +CREATE UNIQUE INDEX federation_proof_sync_log_unique_index_proof_leaf_id_servers_id +ON federation_proof_sync_log ( + sync_direction, + proof_leaf_id, + universe_root_id, + servers_id +); \ No newline at end of file diff --git a/tapdb/sqlc/models.go b/tapdb/sqlc/models.go index b89980ab2..26eef9b5d 100644 --- a/tapdb/sqlc/models.go +++ b/tapdb/sqlc/models.go @@ -164,6 +164,17 @@ type FederationGlobalSyncConfig struct { AllowSyncExport bool } +type FederationProofSyncLog struct { + ID int64 + Status string + Timestamp time.Time + AttemptCounter int64 + SyncDirection string + ProofLeafID int64 + UniverseRootID int64 + ServersID int64 +} + type FederationUniSyncConfig struct { Namespace string AssetID []byte diff --git a/tapdb/sqlc/querier.go b/tapdb/sqlc/querier.go index ee001f911..3a9a6b27a 100644 --- a/tapdb/sqlc/querier.go +++ b/tapdb/sqlc/querier.go @@ -25,6 +25,7 @@ type Querier interface { DeleteAllNodes(ctx context.Context, namespace string) (int64, error) DeleteAssetWitnesses(ctx context.Context, assetID int64) error DeleteExpiredUTXOLeases(ctx context.Context, now sql.NullTime) error + DeleteFederationProofSyncLog(ctx context.Context, arg DeleteFederationProofSyncLogParams) error DeleteManagedUTXO(ctx context.Context, outpoint []byte) error DeleteNode(ctx context.Context, arg DeleteNodeParams) (int64, error) DeleteRoot(ctx context.Context, namespace string) (int64, error) @@ -94,7 +95,6 @@ type Querier interface { InsertPassiveAsset(ctx context.Context, arg InsertPassiveAssetParams) error InsertRootKey(ctx context.Context, arg InsertRootKeyParams) error InsertUniverseServer(ctx context.Context, arg InsertUniverseServerParams) error - ListUniverseServers(ctx context.Context) ([]UniverseServer, error) LogProofTransferAttempt(ctx context.Context, arg LogProofTransferAttemptParams) error LogServerSync(ctx context.Context, arg LogServerSyncParams) error NewMintingBatch(ctx context.Context, arg NewMintingBatchParams) error @@ -122,6 +122,9 @@ type Querier interface { QueryAssets(ctx context.Context, arg QueryAssetsParams) ([]QueryAssetsRow, error) QueryEventIDs(ctx context.Context, arg QueryEventIDsParams) ([]QueryEventIDsRow, error) QueryFederationGlobalSyncConfigs(ctx context.Context) ([]FederationGlobalSyncConfig, error) + // Join on mssmt_nodes to get leaf related fields. + // Join on genesis_info_view to get leaf related fields. 
+ QueryFederationProofSyncLog(ctx context.Context, arg QueryFederationProofSyncLogParams) ([]QueryFederationProofSyncLogRow, error) QueryFederationUniSyncConfigs(ctx context.Context) ([]FederationUniSyncConfig, error) QueryPassiveAssets(ctx context.Context, transferID int64) ([]QueryPassiveAssetsRow, error) QueryProofTransferAttempts(ctx context.Context, arg QueryProofTransferAttemptsParams) ([]time.Time, error) @@ -129,6 +132,7 @@ type Querier interface { // root, simplifies queries QueryUniverseAssetStats(ctx context.Context, arg QueryUniverseAssetStatsParams) ([]QueryUniverseAssetStatsRow, error) QueryUniverseLeaves(ctx context.Context, arg QueryUniverseLeavesParams) ([]QueryUniverseLeavesRow, error) + QueryUniverseServers(ctx context.Context, arg QueryUniverseServersParams) ([]UniverseServer, error) QueryUniverseStats(ctx context.Context) (QueryUniverseStatsRow, error) ReAnchorPassiveAssets(ctx context.Context, arg ReAnchorPassiveAssetsParams) error SetAddrManaged(ctx context.Context, arg SetAddrManagedParams) error @@ -145,6 +149,7 @@ type Querier interface { UpsertAssetProof(ctx context.Context, arg UpsertAssetProofParams) error UpsertChainTx(ctx context.Context, arg UpsertChainTxParams) (int64, error) UpsertFederationGlobalSyncConfig(ctx context.Context, arg UpsertFederationGlobalSyncConfigParams) error + UpsertFederationProofSyncLog(ctx context.Context, arg UpsertFederationProofSyncLogParams) (int64, error) UpsertFederationUniSyncConfig(ctx context.Context, arg UpsertFederationUniSyncConfigParams) error UpsertGenesisAsset(ctx context.Context, arg UpsertGenesisAssetParams) (int64, error) UpsertGenesisPoint(ctx context.Context, prevOut []byte) (int64, error) diff --git a/tapdb/sqlc/queries/universe.sql b/tapdb/sqlc/queries/universe.sql index cab652573..767c50f25 100644 --- a/tapdb/sqlc/queries/universe.sql +++ b/tapdb/sqlc/queries/universe.sql @@ -115,8 +115,11 @@ UPDATE universe_servers SET last_sync_time = @new_sync_time WHERE server_host = @target_server; --- name: ListUniverseServers :many -SELECT * FROM universe_servers; +-- name: QueryUniverseServers :many +SELECT * FROM universe_servers +WHERE (id = sqlc.narg('id') OR sqlc.narg('id') IS NULL) AND + (server_host = sqlc.narg('server_host') + OR sqlc.narg('server_host') IS NULL); -- name: InsertNewSyncEvent :exec WITH group_key_root_id AS ( @@ -361,4 +364,120 @@ ON CONFLICT(namespace) -- name: QueryFederationUniSyncConfigs :many SELECT namespace, asset_id, group_key, proof_type, allow_sync_insert, allow_sync_export FROM federation_uni_sync_config -ORDER BY group_key NULLS LAST, asset_id NULLS LAST, proof_type; \ No newline at end of file +ORDER BY group_key NULLS LAST, asset_id NULLS LAST, proof_type; + +-- name: UpsertFederationProofSyncLog :one +INSERT INTO federation_proof_sync_log as log ( + status, timestamp, sync_direction, proof_leaf_id, universe_root_id, + servers_id +) VALUES ( + @status, @timestamp, @sync_direction, + ( + -- Select the leaf id from the universe_leaves table. + SELECT id + FROM universe_leaves + WHERE leaf_node_namespace = @leaf_namespace + AND minting_point = @leaf_minting_point_bytes + AND script_key_bytes = @leaf_script_key_bytes + LIMIT 1 + ), + ( + -- Select the universe root id from the universe_roots table. + SELECT id + FROM universe_roots + WHERE namespace_root = @universe_id_namespace + LIMIT 1 + ), + ( + -- Select the server id from the universe_servers table. 
+ SELECT id + FROM universe_servers + WHERE server_host = @server_host + LIMIT 1 + ) +) ON CONFLICT (sync_direction, proof_leaf_id, universe_root_id, servers_id) +DO UPDATE SET + status = EXCLUDED.status, + timestamp = EXCLUDED.timestamp, + -- Increment the attempt counter. + attempt_counter = CASE + WHEN @bump_sync_attempt_counter = true THEN log.attempt_counter + 1 + ELSE log.attempt_counter + END +RETURNING id; + +-- name: QueryFederationProofSyncLog :many +SELECT + log.id, status, timestamp, sync_direction, attempt_counter, + + -- Select fields from the universe_servers table. + server.id as server_id, + server.server_host, + + -- Select universe leaf related fields. + leaf.minting_point as leaf_minting_point_bytes, + leaf.script_key_bytes as leaf_script_key_bytes, + mssmt_node.value as leaf_genesis_proof, + genesis.gen_asset_id as leaf_gen_asset_id, + genesis.asset_id as leaf_asset_id, + + -- Select fields from the universe_roots table. + root.asset_id as uni_asset_id, + root.group_key as uni_group_key, + root.proof_type as uni_proof_type + +FROM federation_proof_sync_log as log + +JOIN universe_leaves as leaf + ON leaf.id = log.proof_leaf_id + +-- Join on mssmt_nodes to get leaf related fields. +JOIN mssmt_nodes mssmt_node + ON leaf.leaf_node_key = mssmt_node.key AND + leaf.leaf_node_namespace = mssmt_node.namespace + +-- Join on genesis_info_view to get leaf related fields. +JOIN genesis_info_view genesis + ON leaf.asset_genesis_id = genesis.gen_asset_id + +JOIN universe_servers as server + ON server.id = log.servers_id + +JOIN universe_roots as root + ON root.id = log.universe_root_id + +WHERE (log.sync_direction = sqlc.narg('sync_direction') + OR sqlc.narg('sync_direction') IS NULL) + AND + (log.status = sqlc.narg('status') OR sqlc.narg('status') IS NULL) + AND + + -- Universe leaves WHERE clauses. + (leaf.leaf_node_namespace = sqlc.narg('leaf_namespace') + OR sqlc.narg('leaf_namespace') IS NULL) + AND + (leaf.minting_point = sqlc.narg('leaf_minting_point_bytes') + OR sqlc.narg('leaf_minting_point_bytes') IS NULL) + AND + (leaf.script_key_bytes = sqlc.narg('leaf_script_key_bytes') + OR sqlc.narg('leaf_script_key_bytes') IS NULL); + +-- name: DeleteFederationProofSyncLog :exec +WITH selected_server_id AS ( + -- Select the server ids from the universe_servers table for the specified + -- hosts. + SELECT id + FROM universe_servers + WHERE + (server_host = sqlc.narg('server_host') + OR sqlc.narg('server_host') IS NULL) +) +DELETE FROM federation_proof_sync_log +WHERE + servers_id IN (SELECT id FROM selected_server_id) AND + (status = sqlc.narg('status') + OR sqlc.narg('status') IS NULL) AND + (timestamp >= sqlc.narg('min_timestamp') + OR sqlc.narg('min_timestamp') IS NULL) AND + (attempt_counter >= sqlc.narg('min_attempt_counter') + OR sqlc.narg('min_attempt_counter') IS NULL); \ No newline at end of file diff --git a/tapdb/sqlc/universe.sql.go b/tapdb/sqlc/universe.sql.go index 53452cab5..b1e7b33ec 100644 --- a/tapdb/sqlc/universe.sql.go +++ b/tapdb/sqlc/universe.sql.go @@ -11,6 +11,44 @@ import ( "time" ) +const deleteFederationProofSyncLog = `-- name: DeleteFederationProofSyncLog :exec +WITH selected_server_id AS ( + -- Select the server ids from the universe_servers table for the specified + -- hosts. 
+ SELECT id + FROM universe_servers + WHERE + (server_host = $4 + OR $4 IS NULL) +) +DELETE FROM federation_proof_sync_log +WHERE + servers_id IN (SELECT id FROM selected_server_id) AND + (status = $1 + OR $1 IS NULL) AND + (timestamp >= $2 + OR $2 IS NULL) AND + (attempt_counter >= $3 + OR $3 IS NULL) +` + +type DeleteFederationProofSyncLogParams struct { + Status sql.NullString + MinTimestamp sql.NullTime + MinAttemptCounter sql.NullInt64 + ServerHost sql.NullString +} + +func (q *Queries) DeleteFederationProofSyncLog(ctx context.Context, arg DeleteFederationProofSyncLogParams) error { + _, err := q.db.ExecContext(ctx, deleteFederationProofSyncLog, + arg.Status, + arg.MinTimestamp, + arg.MinAttemptCounter, + arg.ServerHost, + ) + return err +} + const deleteUniverseEvents = `-- name: DeleteUniverseEvents :exec WITH root_id AS ( SELECT id @@ -265,33 +303,6 @@ func (q *Queries) InsertUniverseServer(ctx context.Context, arg InsertUniverseSe return err } -const listUniverseServers = `-- name: ListUniverseServers :many -SELECT id, server_host, last_sync_time FROM universe_servers -` - -func (q *Queries) ListUniverseServers(ctx context.Context) ([]UniverseServer, error) { - rows, err := q.db.QueryContext(ctx, listUniverseServers) - if err != nil { - return nil, err - } - defer rows.Close() - var items []UniverseServer - for rows.Next() { - var i UniverseServer - if err := rows.Scan(&i.ID, &i.ServerHost, &i.LastSyncTime); err != nil { - return nil, err - } - items = append(items, i) - } - if err := rows.Close(); err != nil { - return nil, err - } - if err := rows.Err(); err != nil { - return nil, err - } - return items, nil -} - const logServerSync = `-- name: LogServerSync :exec UPDATE universe_servers SET last_sync_time = $1 @@ -429,6 +440,134 @@ func (q *Queries) QueryFederationGlobalSyncConfigs(ctx context.Context) ([]Feder return items, nil } +const queryFederationProofSyncLog = `-- name: QueryFederationProofSyncLog :many +SELECT + log.id, status, timestamp, sync_direction, attempt_counter, + + -- Select fields from the universe_servers table. + server.id as server_id, + server.server_host, + + -- Select universe leaf related fields. + leaf.minting_point as leaf_minting_point_bytes, + leaf.script_key_bytes as leaf_script_key_bytes, + mssmt_node.value as leaf_genesis_proof, + genesis.gen_asset_id as leaf_gen_asset_id, + genesis.asset_id as leaf_asset_id, + + -- Select fields from the universe_roots table. + root.asset_id as uni_asset_id, + root.group_key as uni_group_key, + root.proof_type as uni_proof_type + +FROM federation_proof_sync_log as log + +JOIN universe_leaves as leaf + ON leaf.id = log.proof_leaf_id + +JOIN mssmt_nodes mssmt_node + ON leaf.leaf_node_key = mssmt_node.key AND + leaf.leaf_node_namespace = mssmt_node.namespace + +JOIN genesis_info_view genesis + ON leaf.asset_genesis_id = genesis.gen_asset_id + +JOIN universe_servers as server + ON server.id = log.servers_id + +JOIN universe_roots as root + ON root.id = log.universe_root_id + +WHERE (log.sync_direction = $1 + OR $1 IS NULL) + AND + (log.status = $2 OR $2 IS NULL) + AND + + -- Universe leaves WHERE clauses. 
+ (leaf.leaf_node_namespace = $3 + OR $3 IS NULL) + AND + (leaf.minting_point = $4 + OR $4 IS NULL) + AND + (leaf.script_key_bytes = $5 + OR $5 IS NULL) +` + +type QueryFederationProofSyncLogParams struct { + SyncDirection sql.NullString + Status sql.NullString + LeafNamespace sql.NullString + LeafMintingPointBytes []byte + LeafScriptKeyBytes []byte +} + +type QueryFederationProofSyncLogRow struct { + ID int64 + Status string + Timestamp time.Time + SyncDirection string + AttemptCounter int64 + ServerID int64 + ServerHost string + LeafMintingPointBytes []byte + LeafScriptKeyBytes []byte + LeafGenesisProof []byte + LeafGenAssetID int64 + LeafAssetID []byte + UniAssetID []byte + UniGroupKey []byte + UniProofType string +} + +// Join on mssmt_nodes to get leaf related fields. +// Join on genesis_info_view to get leaf related fields. +func (q *Queries) QueryFederationProofSyncLog(ctx context.Context, arg QueryFederationProofSyncLogParams) ([]QueryFederationProofSyncLogRow, error) { + rows, err := q.db.QueryContext(ctx, queryFederationProofSyncLog, + arg.SyncDirection, + arg.Status, + arg.LeafNamespace, + arg.LeafMintingPointBytes, + arg.LeafScriptKeyBytes, + ) + if err != nil { + return nil, err + } + defer rows.Close() + var items []QueryFederationProofSyncLogRow + for rows.Next() { + var i QueryFederationProofSyncLogRow + if err := rows.Scan( + &i.ID, + &i.Status, + &i.Timestamp, + &i.SyncDirection, + &i.AttemptCounter, + &i.ServerID, + &i.ServerHost, + &i.LeafMintingPointBytes, + &i.LeafScriptKeyBytes, + &i.LeafGenesisProof, + &i.LeafGenAssetID, + &i.LeafAssetID, + &i.UniAssetID, + &i.UniGroupKey, + &i.UniProofType, + ); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Close(); err != nil { + return nil, err + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + const queryFederationUniSyncConfigs = `-- name: QueryFederationUniSyncConfigs :many SELECT namespace, asset_id, group_key, proof_type, allow_sync_insert, allow_sync_export FROM federation_uni_sync_config @@ -688,6 +827,41 @@ func (q *Queries) QueryUniverseLeaves(ctx context.Context, arg QueryUniverseLeav return items, nil } +const queryUniverseServers = `-- name: QueryUniverseServers :many +SELECT id, server_host, last_sync_time FROM universe_servers +WHERE (id = $1 OR $1 IS NULL) AND + (server_host = $2 + OR $2 IS NULL) +` + +type QueryUniverseServersParams struct { + ID sql.NullInt64 + ServerHost sql.NullString +} + +func (q *Queries) QueryUniverseServers(ctx context.Context, arg QueryUniverseServersParams) ([]UniverseServer, error) { + rows, err := q.db.QueryContext(ctx, queryUniverseServers, arg.ID, arg.ServerHost) + if err != nil { + return nil, err + } + defer rows.Close() + var items []UniverseServer + for rows.Next() { + var i UniverseServer + if err := rows.Scan(&i.ID, &i.ServerHost, &i.LastSyncTime); err != nil { + return nil, err + } + items = append(items, i) + } + if err := rows.Close(); err != nil { + return nil, err + } + if err := rows.Err(); err != nil { + return nil, err + } + return items, nil +} + const queryUniverseStats = `-- name: QueryUniverseStats :one WITH stats AS ( SELECT total_asset_syncs, total_asset_proofs @@ -869,6 +1043,76 @@ func (q *Queries) UpsertFederationGlobalSyncConfig(ctx context.Context, arg Upse return err } +const upsertFederationProofSyncLog = `-- name: UpsertFederationProofSyncLog :one +INSERT INTO federation_proof_sync_log as log ( + status, timestamp, sync_direction, proof_leaf_id, universe_root_id, + servers_id +) 
VALUES ( + $1, $2, $3, + ( + -- Select the leaf id from the universe_leaves table. + SELECT id + FROM universe_leaves + WHERE leaf_node_namespace = $4 + AND minting_point = $5 + AND script_key_bytes = $6 + LIMIT 1 + ), + ( + -- Select the universe root id from the universe_roots table. + SELECT id + FROM universe_roots + WHERE namespace_root = $7 + LIMIT 1 + ), + ( + -- Select the server id from the universe_servers table. + SELECT id + FROM universe_servers + WHERE server_host = $8 + LIMIT 1 + ) +) ON CONFLICT (sync_direction, proof_leaf_id, universe_root_id, servers_id) +DO UPDATE SET + status = EXCLUDED.status, + timestamp = EXCLUDED.timestamp, + -- Increment the attempt counter. + attempt_counter = CASE + WHEN $9 = true THEN log.attempt_counter + 1 + ELSE log.attempt_counter + END +RETURNING id +` + +type UpsertFederationProofSyncLogParams struct { + Status string + Timestamp time.Time + SyncDirection string + LeafNamespace string + LeafMintingPointBytes []byte + LeafScriptKeyBytes []byte + UniverseIDNamespace string + ServerHost string + BumpSyncAttemptCounter interface{} +} + +func (q *Queries) UpsertFederationProofSyncLog(ctx context.Context, arg UpsertFederationProofSyncLogParams) (int64, error) { + row := q.db.QueryRowContext(ctx, upsertFederationProofSyncLog, + arg.Status, + arg.Timestamp, + arg.SyncDirection, + arg.LeafNamespace, + arg.LeafMintingPointBytes, + arg.LeafScriptKeyBytes, + arg.UniverseIDNamespace, + arg.ServerHost, + arg.BumpSyncAttemptCounter, + ) + var id int64 + err := row.Scan(&id) + return id, err +} + const upsertFederationUniSyncConfig = `-- name: UpsertFederationUniSyncConfig :exec INSERT INTO federation_uni_sync_config ( namespace, asset_id, group_key, proof_type, allow_sync_insert, allow_sync_export diff --git a/tapdb/sqlite.go b/tapdb/sqlite.go index 500980fcb..2a3b2d064 100644 --- a/tapdb/sqlite.go +++ b/tapdb/sqlite.go @@ -160,11 +160,11 @@ func NewSqliteStore(cfg *SqliteConfig) (*SqliteStore, error) { func NewTestSqliteDB(t *testing.T) *SqliteStore { t.Helper() - t.Logf("Creating new SQLite DB for testing") + dbFileName := filepath.Join(t.TempDir(), "tmp.db") + t.Logf("Creating new SQLite DB for testing: %s", dbFileName) // TODO(roasbeef): if we pass :memory: for the file name, then we get // an in mem version to speed up tests - dbFileName := filepath.Join(t.TempDir(), "tmp.db") sqlDB, err := NewSqliteStore(&SqliteConfig{ DatabaseFileName: dbFileName, SkipMigrations: false, diff --git a/tapdb/sqlutils_test.go b/tapdb/sqlutils_test.go new file mode 100644 index 000000000..6f0786fb3 --- /dev/null +++ b/tapdb/sqlutils_test.go @@ -0,0 +1,270 @@ +package tapdb + +import ( + "bytes" + "context" + "database/sql" + "fmt" + "math/rand" + "testing" + "time" + + "github.com/btcsuite/btcd/chaincfg/chainhash" + "github.com/btcsuite/btcd/wire" + "github.com/lightninglabs/taproot-assets/asset" + "github.com/lightninglabs/taproot-assets/commitment" + "github.com/lightninglabs/taproot-assets/internal/test" + "github.com/lightninglabs/taproot-assets/proof" + "github.com/lightninglabs/taproot-assets/tapdb/sqlc" + "github.com/lightninglabs/taproot-assets/universe" + "github.com/lightningnetwork/lnd/clock" + "github.com/stretchr/testify/require" +) + +// DbHandler is a helper struct that contains all the database stores. +type DbHandler struct { + // UniverseFederationStore is a handle to the universe federation store. + UniverseFederationStore *UniverseFederationDB + + // MultiverseStore is a handle to the multiverse store. 
+ MultiverseStore *MultiverseStore + + // AssetMintingStore is a handle to the pending (minting) assets store. + AssetMintingStore *AssetMintingStore + + // AssetStore is a handle to the active assets store. + AssetStore *AssetStore + + // DirectQuery is a handle to the underlying database that can be used + // to query the database directly. + DirectQuery sqlc.Querier +} + +// AddRandomAssetProof generates a random asset and corresponding proof and +// inserts them into the given test database. +func (d *DbHandler) AddRandomAssetProof(t *testing.T) (*asset.Asset, + *proof.AnnotatedProof) { + + var ( + ctx = context.Background() + + assetStore = d.AssetStore + db = d.DirectQuery + ) + + // Next, we'll make a new random asset that also has a few inputs with + // dummy witness information. + testAsset := randAsset(t) + + assetRoot, err := commitment.NewAssetCommitment(testAsset) + require.NoError(t, err) + + taprootAssetRoot, err := commitment.NewTapCommitment(assetRoot) + require.NoError(t, err) + + // With our asset created, we can now create the AnnotatedProof we use + // to import assets into the database. + var blockHash chainhash.Hash + _, err = rand.Read(blockHash[:]) + require.NoError(t, err) + + anchorTx := wire.NewMsgTx(2) + anchorTx.AddTxIn(&wire.TxIn{}) + anchorTx.AddTxOut(&wire.TxOut{ + PkScript: bytes.Repeat([]byte{0x01}, 34), + Value: 10, + }) + + assetID := testAsset.ID() + anchorPoint := wire.OutPoint{ + Hash: anchorTx.TxHash(), + Index: 0, + } + + // Generate a random proof and encode it into a proof blob. + testProof := randProof(t, testAsset) + + var proofBlobBuffer bytes.Buffer + err = testProof.Encode(&proofBlobBuffer) + require.NoError(t, err) + + proofBlob := proofBlobBuffer.Bytes() + scriptKey := testAsset.ScriptKey + + annotatedProof := &proof.AnnotatedProof{ + Locator: proof.Locator{ + AssetID: &assetID, + ScriptKey: *scriptKey.PubKey, + }, + Blob: proofBlob, + AssetSnapshot: &proof.AssetSnapshot{ + Asset: testAsset, + OutPoint: anchorPoint, + AnchorBlockHash: blockHash, + AnchorBlockHeight: test.RandInt[uint32](), + AnchorTxIndex: test.RandInt[uint32](), + AnchorTx: anchorTx, + OutputIndex: 0, + InternalKey: test.RandPubKey(t), + ScriptRoot: taprootAssetRoot, + }, + } + if testAsset.GroupKey != nil { + annotatedProof.GroupKey = &testAsset.GroupKey.GroupPubKey + } + + // We'll now insert the internal key information as well as the script + // key ahead of time to reflect the address creation that happens + // elsewhere. + _, err = db.UpsertInternalKey(ctx, InternalKey{ + RawKey: annotatedProof.InternalKey.SerializeCompressed(), + KeyFamily: test.RandInt[int32](), + KeyIndex: test.RandInt[int32](), + }) + require.NoError(t, err) + rawScriptKeyID, err := db.UpsertInternalKey(ctx, InternalKey{ + RawKey: scriptKey.RawKey.PubKey.SerializeCompressed(), + KeyFamily: int32(testAsset.ScriptKey.RawKey.Family), + KeyIndex: int32(testAsset.ScriptKey.RawKey.Index), + }) + require.NoError(t, err) + _, err = db.UpsertScriptKey(ctx, NewScriptKey{ + InternalKeyID: rawScriptKeyID, + TweakedScriptKey: scriptKey.PubKey.SerializeCompressed(), + Tweak: nil, + }) + require.NoError(t, err) + + // We'll add the chain transaction of the proof now to simulate a + // batched transfer on a higher layer. 
+ var anchorTxBuf bytes.Buffer + err = annotatedProof.AnchorTx.Serialize(&anchorTxBuf) + require.NoError(t, err) + anchorTXID := annotatedProof.AnchorTx.TxHash() + _, err = db.UpsertChainTx(ctx, ChainTxParams{ + Txid: anchorTXID[:], + RawTx: anchorTxBuf.Bytes(), + BlockHeight: sqlInt32(annotatedProof.AnchorBlockHeight), + BlockHash: annotatedProof.AnchorBlockHash[:], + TxIndex: sqlInt32(annotatedProof.AnchorTxIndex), + }) + require.NoError(t, err, "unable to insert chain tx: %w", err) + + // With all our test data constructed, we'll now attempt to import the + // asset into the database. + require.NoError(t, assetStore.ImportProofs( + ctx, proof.MockHeaderVerifier, proof.MockGroupVerifier, false, + annotatedProof, + )) + + return testAsset, annotatedProof +} + +// AddUniProofLeaf generates a universe proof leaf and inserts it into the test +// database. +func (d *DbHandler) AddUniProofLeaf(t *testing.T, testAsset *asset.Asset, + annotatedProof *proof.AnnotatedProof) *universe.Proof { + + ctx := context.Background() + + // Insert proof into the multiverse/universe store. This step will + // populate the universe root and universe leaves tables. + uniId := universe.NewUniIDFromAsset(*testAsset) + + leafKey := universe.LeafKey{ + OutPoint: annotatedProof.AssetSnapshot.OutPoint, + ScriptKey: &testAsset.ScriptKey, + } + + leaf := universe.Leaf{ + GenesisWithGroup: universe.GenesisWithGroup{ + Genesis: testAsset.Genesis, + GroupKey: testAsset.GroupKey, + }, + RawProof: annotatedProof.Blob, + Asset: testAsset, + Amt: testAsset.Amount, + } + + uniProof, err := d.MultiverseStore.UpsertProofLeaf( + ctx, uniId, leafKey, &leaf, nil, + ) + require.NoError(t, err) + + return uniProof +} + +// AddRandomServerAddrs is a helper function that will create server addresses +// and add them to the database. +func (d *DbHandler) AddRandomServerAddrs(t *testing.T, + numServers int) []universe.ServerAddr { + + var ( + ctx = context.Background() + fedDB = d.UniverseFederationStore + ) + + addrs := make([]universe.ServerAddr, 0, numServers) + for i := 0; i < numServers; i++ { + portOffset := i + 10_000 + hostStr := fmt.Sprintf("localhost:%v", portOffset) + + addr := universe.NewServerAddr(int64(i+1), hostStr) + addrs = append(addrs, addr) + } + + // With the set of addrs created, we'll now insert them all into the + // database. + err := fedDB.AddServers(ctx, addrs...) + require.NoError(t, err) + + return addrs +} + +// NewDbHandle creates a new store and query handle to the test database. +func NewDbHandle(t *testing.T) *DbHandler { + // Create a new test database. + db := NewTestDB(t) + + testClock := clock.NewTestClock(time.Now()) + + // Gain a handle to the pending (minting) universe federation store. + universeServerTxCreator := NewTransactionExecutor( + db, func(tx *sql.Tx) UniverseServerStore { + return db.WithTx(tx) + }, + ) + fedStore := NewUniverseFederationDB(universeServerTxCreator, testClock) + + // Gain a handle to the multiverse store. + multiverseTxCreator := NewTransactionExecutor(db, + func(tx *sql.Tx) BaseMultiverseStore { + return db.WithTx(tx) + }, + ) + multiverseStore := NewMultiverseStore(multiverseTxCreator) + + // Gain a handle to the pending (minting) assets store. + assetMintingDB := NewTransactionExecutor( + db, func(tx *sql.Tx) PendingAssetStore { + return db.WithTx(tx) + }, + ) + assetMintingStore := NewAssetMintingStore(assetMintingDB) + + // Gain a handle to the active assets store. 
+ assetsDB := NewTransactionExecutor( + db, func(tx *sql.Tx) ActiveAssetsStore { + return db.WithTx(tx) + }, + ) + activeAssetsStore := NewAssetStore(assetsDB, testClock) + + return &DbHandler{ + UniverseFederationStore: fedStore, + MultiverseStore: multiverseStore, + AssetMintingStore: assetMintingStore, + AssetStore: activeAssetsStore, + DirectQuery: db, + } +} diff --git a/tapdb/universe_federation.go b/tapdb/universe_federation.go index 5052ac1a0..c6bbea8a5 100644 --- a/tapdb/universe_federation.go +++ b/tapdb/universe_federation.go @@ -1,7 +1,9 @@ package tapdb import ( + "bytes" "context" + "database/sql" "errors" "fmt" "sort" @@ -9,8 +11,11 @@ import ( "time" "github.com/btcsuite/btcd/btcec/v2" + "github.com/btcsuite/btcd/btcec/v2/schnorr" + "github.com/btcsuite/btcd/wire" "github.com/lightninglabs/taproot-assets/asset" "github.com/lightninglabs/taproot-assets/fn" + "github.com/lightninglabs/taproot-assets/proof" "github.com/lightninglabs/taproot-assets/tapdb/sqlc" "github.com/lightninglabs/taproot-assets/universe" "github.com/lightningnetwork/lnd/clock" @@ -19,6 +24,20 @@ import ( ) type ( + // UpsertFedProofSyncLogParams is used to upsert federation proof sync + // logs. + UpsertFedProofSyncLogParams = sqlc.UpsertFederationProofSyncLogParams + + // QueryFedProofSyncLogParams is used to query for federation proof sync + // logs. + QueryFedProofSyncLogParams = sqlc.QueryFederationProofSyncLogParams + + // DeleteFedProofSyncLogParams is used to delete proof sync log entries. + DeleteFedProofSyncLogParams = sqlc.DeleteFederationProofSyncLogParams + + // ProofSyncLogEntry is a single entry from the proof sync log. + ProofSyncLogEntry = sqlc.QueryFederationProofSyncLogRow + // NewUniverseServer is used to create a new universe server. NewUniverseServer = sqlc.InsertUniverseServerParams @@ -40,6 +59,9 @@ type ( // FedUniSyncConfigs is the universe specific federation sync config // returned from a query. FedUniSyncConfigs = sqlc.FederationUniSyncConfig + + // QueryUniServersParams is used to query for universe servers. + QueryUniServersParams = sqlc.QueryUniverseServersParams ) var ( @@ -59,6 +81,26 @@ var ( } ) +// FederationProofSyncLogStore is used to log the sync status of individual +// universe proofs. +type FederationProofSyncLogStore interface { + BaseUniverseStore + + // UpsertFederationProofSyncLog upserts a proof sync log entry for a + // given proof leaf and server. + UpsertFederationProofSyncLog(ctx context.Context, + arg UpsertFedProofSyncLogParams) (int64, error) + + // QueryFederationProofSyncLog returns the set of proof sync logs for a + // given proof leaf. + QueryFederationProofSyncLog(ctx context.Context, + arg QueryFedProofSyncLogParams) ([]ProofSyncLogEntry, error) + + // DeleteFederationProofSyncLog deletes proof sync log entries. + DeleteFederationProofSyncLog(ctx context.Context, + arg DeleteFedProofSyncLogParams) error +} + // FederationSyncConfigStore is used to manage the set of Universe servers as // part of a federation. type FederationSyncConfigStore interface { @@ -87,6 +129,7 @@ type FederationSyncConfigStore interface { // of a federation. type UniverseServerStore interface { FederationSyncConfigStore + FederationProofSyncLogStore // InsertUniverseServer inserts a new universe server in to the DB. InsertUniverseServer(ctx context.Context, arg NewUniverseServer) error @@ -97,8 +140,10 @@ type UniverseServerStore interface { // LogServerSync marks that a server was just synced in the DB. 
LogServerSync(ctx context.Context, arg sqlc.LogServerSyncParams) error - // ListUniverseServers returns the total set of all universe servers. - ListUniverseServers(ctx context.Context) ([]sqlc.UniverseServer, error) + // QueryUniverseServers returns a set of universe servers. + QueryUniverseServers(ctx context.Context, + arg sqlc.QueryUniverseServersParams) ([]sqlc.UniverseServer, + error) } // UniverseFederationOptions is the database tx object for the universe server store. @@ -174,7 +219,9 @@ func (u *UniverseFederationDB) UniverseServers( readTx := NewUniverseFederationReadTx() dbErr := u.db.ExecTx(ctx, &readTx, func(db UniverseServerStore) error { - servers, err := db.ListUniverseServers(ctx) + servers, err := db.QueryUniverseServers( + ctx, QueryUniServersParams{}, + ) if err != nil { return err } @@ -261,6 +308,301 @@ func (u *UniverseFederationDB) LogNewSyncs(ctx context.Context, }) } +// UpsertFederationProofSyncLog upserts a federation proof sync log entry for a +// given universe server and proof. +func (u *UniverseFederationDB) UpsertFederationProofSyncLog( + ctx context.Context, uniID universe.Identifier, + leafKey universe.LeafKey, addr universe.ServerAddr, + syncDirection universe.SyncDirection, + syncStatus universe.ProofSyncStatus, + bumpSyncAttemptCounter bool) (int64, error) { + + // Encode the leaf key outpoint as bytes. We'll use this to look up the + // leaf ID in the DB. + leafKeyOutpointBytes, err := encodeOutpoint(leafKey.OutPoint) + if err != nil { + return 0, err + } + + // Encode the leaf script key pub key as bytes. We'll use this to look + // up the leaf ID in the DB. + scriptKeyPubKeyBytes := schnorr.SerializePubKey( + leafKey.ScriptKey.PubKey, + ) + + var ( + writeTx UniverseFederationOptions + logID int64 + ) + + err = u.db.ExecTx(ctx, &writeTx, func(db UniverseServerStore) error { + params := UpsertFedProofSyncLogParams{ + Status: string(syncStatus), + Timestamp: time.Now().UTC(), + SyncDirection: string(syncDirection), + UniverseIDNamespace: uniID.String(), + LeafNamespace: uniID.String(), + LeafMintingPointBytes: leafKeyOutpointBytes, + LeafScriptKeyBytes: scriptKeyPubKeyBytes, + ServerHost: addr.HostStr(), + BumpSyncAttemptCounter: bumpSyncAttemptCounter, + } + logID, err = db.UpsertFederationProofSyncLog(ctx, params) + if err != nil { + return err + } + + return nil + }) + + return logID, err +} + +// QueryFederationProofSyncLog queries the federation proof sync log and returns +// the log entries which correspond to the given universe proof leaf. +func (u *UniverseFederationDB) QueryFederationProofSyncLog( + ctx context.Context, uniID universe.Identifier, + leafKey universe.LeafKey, + syncDirection universe.SyncDirection, + syncStatus universe.ProofSyncStatus) ([]*universe.ProofSyncLogEntry, + error) { + + // Encode the leaf key outpoint as bytes. We'll use this to look up the + // leaf ID in the DB. + leafKeyOutpointBytes, err := encodeOutpoint(leafKey.OutPoint) + if err != nil { + return nil, err + } + + // Encode the leaf script key pub key as bytes. We'll use this to look + // up the leaf ID in the DB. 
+ scriptKeyPubKeyBytes := schnorr.SerializePubKey( + leafKey.ScriptKey.PubKey, + ) + + var ( + readTx = NewUniverseFederationReadTx() + proofSyncLogs []*universe.ProofSyncLogEntry + ) + + err = u.db.ExecTx(ctx, &readTx, func(db UniverseServerStore) error { + params := QueryFedProofSyncLogParams{ + SyncDirection: sqlStr(string(syncDirection)), + Status: sqlStr(string(syncStatus)), + LeafNamespace: sqlStr(uniID.String()), + LeafMintingPointBytes: leafKeyOutpointBytes, + LeafScriptKeyBytes: scriptKeyPubKeyBytes, + } + logEntries, err := db.QueryFederationProofSyncLog(ctx, params) + + // Parse database proof sync logs. Multiple log entries may + // exist for a given leaf because each log entry is unique to a + // server. + proofSyncLogs = make( + []*universe.ProofSyncLogEntry, 0, len(logEntries), + ) + for idx := range logEntries { + entry := logEntries[idx] + + parsedLogEntry, err := fetchProofSyncLogEntry( + ctx, entry, db, + ) + if err != nil { + return err + } + + proofSyncLogs = append(proofSyncLogs, parsedLogEntry) + } + + return err + }) + if err != nil { + return nil, err + } + + return proofSyncLogs, nil +} + +// FetchPendingProofsSyncLog queries the federation proof sync log and returns +// all log entries with sync status pending. +func (u *UniverseFederationDB) FetchPendingProofsSyncLog(ctx context.Context, + syncDirection *universe.SyncDirection) ([]*universe.ProofSyncLogEntry, + error) { + + var ( + readTx = NewUniverseFederationReadTx() + proofSyncLogs []*universe.ProofSyncLogEntry + ) + + err := u.db.ExecTx(ctx, &readTx, func(db UniverseServerStore) error { + // If the sync direction is not set, then we'll query for all + // pending proof sync log entries. + var sqlSyncDirection sql.NullString + if syncDirection != nil { + sqlSyncDirection = sqlStr(string(*syncDirection)) + } + + sqlProofSyncStatus := sqlStr( + string(universe.ProofSyncStatusPending), + ) + + params := QueryFedProofSyncLogParams{ + SyncDirection: sqlSyncDirection, + Status: sqlProofSyncStatus, + } + logEntries, err := db.QueryFederationProofSyncLog(ctx, params) + if err != nil { + return fmt.Errorf("unable to query proof sync log: %w", + err) + } + + // Parse log entries from database row. + proofSyncLogs = make( + []*universe.ProofSyncLogEntry, 0, len(logEntries), + ) + for idx := range logEntries { + entry := logEntries[idx] + + parsedLogEntry, err := fetchProofSyncLogEntry( + ctx, entry, db, + ) + if err != nil { + return err + } + + proofSyncLogs = append(proofSyncLogs, parsedLogEntry) + } + + return nil + }) + if err != nil { + return nil, err + } + + return proofSyncLogs, nil +} + +// fetchProofSyncLogEntry returns a proof sync log entry given a DB row. +func fetchProofSyncLogEntry(ctx context.Context, entry ProofSyncLogEntry, + dbTx UniverseServerStore) (*universe.ProofSyncLogEntry, error) { + + // Fetch asset genesis for the leaf. + leafAssetGen, err := fetchGenesis(ctx, dbTx, entry.LeafGenAssetID) + if err != nil { + return nil, err + } + + // We only need to obtain the asset at this point, so we'll do a sparse + // decode here to decode only the asset record. 
+ var leafAsset asset.Asset + assetRecord := proof.AssetLeafRecord(&leafAsset) + err = proof.SparseDecode( + bytes.NewReader(entry.LeafGenesisProof), assetRecord, + ) + if err != nil { + return nil, fmt.Errorf("unable to decode proof: %w", err) + } + + leaf := &universe.Leaf{ + GenesisWithGroup: universe.GenesisWithGroup{ + Genesis: leafAssetGen, + GroupKey: leafAsset.GroupKey, + }, + RawProof: entry.LeafGenesisProof, + Asset: &leafAsset, + Amt: leafAsset.Amount, + } + + // Parse leaf key from leaf DB row. + scriptKeyPub, err := schnorr.ParsePubKey( + entry.LeafScriptKeyBytes, + ) + if err != nil { + return nil, err + } + scriptKey := asset.NewScriptKey(scriptKeyPub) + + var outPoint wire.OutPoint + err = readOutPoint( + bytes.NewReader(entry.LeafMintingPointBytes), 0, 0, + &outPoint, + ) + if err != nil { + return nil, err + } + + leafKey := universe.LeafKey{ + OutPoint: outPoint, + ScriptKey: &scriptKey, + } + + // Parse server address from DB row. + serverAddr := universe.NewServerAddr(entry.ServerID, entry.ServerHost) + + // Parse proof sync status directly from the DB row. + status, err := universe.ParseStrProofSyncStatus(entry.Status) + if err != nil { + return nil, err + } + + // Parse proof sync direction directly from the DB row. + direction, err := universe.ParseStrSyncDirection(entry.SyncDirection) + if err != nil { + return nil, err + } + + uniID, err := universe.NewUniIDFromRawArgs( + entry.UniAssetID, entry.UniGroupKey, + entry.UniProofType, + ) + if err != nil { + return nil, err + } + + return &universe.ProofSyncLogEntry{ + Timestamp: entry.Timestamp, + SyncStatus: status, + SyncDirection: direction, + AttemptCounter: entry.AttemptCounter, + ServerAddr: serverAddr, + + UniID: uniID, + LeafKey: leafKey, + Leaf: *leaf, + }, nil +} + +// DeleteProofsSyncLogEntries deletes a set of proof sync log entries. +func (u *UniverseFederationDB) DeleteProofsSyncLogEntries(ctx context.Context, + servers ...universe.ServerAddr) error { + + var writeTx UniverseFederationOptions + + err := u.db.ExecTx(ctx, &writeTx, func(db UniverseServerStore) error { + // Delete proof sync log entries which are associated with each + // server. + for i := range servers { + server := servers[i] + + err := db.DeleteFederationProofSyncLog( + ctx, DeleteFedProofSyncLogParams{ + ServerHost: sqlStr(server.HostStr()), + }, + ) + if err != nil { + return err + } + } + + return nil + }) + if err != nil { + return err + } + + return nil +} + // UpsertFederationSyncConfig upserts both the global and universe specific // federation sync configs. func (u *UniverseFederationDB) UpsertFederationSyncConfig( diff --git a/tapdb/universe_federation_test.go b/tapdb/universe_federation_test.go index b7fb80a21..7492a2c61 100644 --- a/tapdb/universe_federation_test.go +++ b/tapdb/universe_federation_test.go @@ -3,7 +3,6 @@ package tapdb import ( "context" "database/sql" - "fmt" "testing" "time" @@ -35,10 +34,12 @@ func newTestFederationDb(t *testing.T, func TestUniverseFederationCRUD(t *testing.T) { t.Parallel() - testClock := clock.NewTestClock(time.Now()) - fedDB, _ := newTestFederationDb(t, testClock) + var ( + ctx = context.Background() - ctx := context.Background() + db = NewDbHandle(t) + fedDB = db.UniverseFederationStore + ) // If we try to list the set of servers without any added, we should // get the error we expect. @@ -47,19 +48,7 @@ func TestUniverseFederationCRUD(t *testing.T) { require.Empty(t, dbServers) // Next, we'll try to add a new series of servers to the DB. 
- const numServers = 10 - addrs := make([]universe.ServerAddr, 0, numServers) - for i := int64(0); i < numServers; i++ { - portOffset := i + 10_000 - hostStr := fmt.Sprintf("localhost:%v", portOffset) - - addrs = append(addrs, universe.NewServerAddr(i+1, hostStr)) - } - - // With the set of addrs created, we'll now insert them all into the - // database. - err = fedDB.AddServers(ctx, addrs...) - require.NoError(t, err) + addrs := db.AddRandomServerAddrs(t, 10) // If we try to insert them all again, then we should get an error as // we ensure the host names are unique. @@ -98,6 +87,284 @@ func TestUniverseFederationCRUD(t *testing.T) { require.NoError(t, err) } +// TestFederationProofSyncLogCRUD tests that we can add, modify, and remove +// proof sync log entries from the Universe DB. +func TestFederationProofSyncLogCRUD(t *testing.T) { + t.Parallel() + + var ( + ctx = context.Background() + dbHandle = NewDbHandle(t) + fedStore = dbHandle.UniverseFederationStore + ) + + // Populate the database with a random asset, its associated proof, and + // a set of servers. + testAsset, testAnnotatedProof := dbHandle.AddRandomAssetProof(t) + uniProof := dbHandle.AddUniProofLeaf(t, testAsset, testAnnotatedProof) + uniId := universe.NewUniIDFromAsset(*testAsset) + + servers := dbHandle.AddRandomServerAddrs(t, 3) + + // Designate pending sync status for all servers except the first. + // Make a map set of pending sync servers. + pendingSyncServers := make(map[universe.ServerAddr]struct{}) + for i := range servers { + server := servers[i] + if i == 0 { + continue + } + pendingSyncServers[server] = struct{}{} + } + + // Add log entries for the first server. + syncServer := servers[0] + + // Add push log entry. + _, err := fedStore.UpsertFederationProofSyncLog( + ctx, uniId, uniProof.LeafKey, syncServer, + universe.SyncDirectionPush, universe.ProofSyncStatusComplete, + true, + ) + require.NoError(t, err) + + // Add pull log entry. + _, err = fedStore.UpsertFederationProofSyncLog( + ctx, uniId, uniProof.LeafKey, syncServer, + universe.SyncDirectionPull, universe.ProofSyncStatusComplete, + true, + ) + require.NoError(t, err) + + // We've already added log entries for the first server. We will now + // insert new proof sync log entries for the remaining servers. + for _, server := range servers[1:] { + _, err := fedStore.UpsertFederationProofSyncLog( + ctx, uniId, uniProof.LeafKey, server, + universe.SyncDirectionPush, + universe.ProofSyncStatusPending, false, + ) + require.NoError(t, err) + } + + // Retrieve all sync status pending log entries. + syncDirectionPush := universe.SyncDirectionPush + pendingLogEntries, err := fedStore.FetchPendingProofsSyncLog( + ctx, &syncDirectionPush, + ) + require.NoError(t, err) + require.Len(t, pendingLogEntries, 2) + + for i := range pendingLogEntries { + entry := pendingLogEntries[i] + require.Equal( + t, universe.ProofSyncStatusPending, entry.SyncStatus, + ) + require.Equal( + t, universe.SyncDirectionPush, entry.SyncDirection, + ) + require.Equal(t, uniId.String(), entry.UniID.String()) + require.Equal(t, int64(0), entry.AttemptCounter) + + assertProofSyncLogLeafKey(t, uniProof.LeafKey, entry.LeafKey) + assertProofSyncLogLeaf(t, *uniProof.Leaf, entry.Leaf) + + // Check for server address in pending sync server set. + _, ok := pendingSyncServers[entry.ServerAddr] + require.True(t, ok) + } + + // Retrieve all push sync status complete log entries. 
+ completePushLogEntries, err := fedStore.QueryFederationProofSyncLog( + ctx, uniId, uniProof.LeafKey, universe.SyncDirectionPush, + universe.ProofSyncStatusComplete, + ) + require.NoError(t, err) + + // There should only be one complete push log entry. + require.Len(t, completePushLogEntries, 1) + + // Check that the complete log entry is as expected. + completePushEntry := completePushLogEntries[0] + + require.Equal(t, servers[0], completePushEntry.ServerAddr) + require.Equal( + t, universe.ProofSyncStatusComplete, + completePushEntry.SyncStatus, + ) + require.Equal( + t, universe.SyncDirectionPush, completePushEntry.SyncDirection, + ) + require.Equal(t, uniId.String(), completePushEntry.UniID.String()) + require.Equal(t, int64(0), completePushEntry.AttemptCounter) + + assertProofSyncLogLeafKey( + t, uniProof.LeafKey, completePushEntry.LeafKey, + ) + assertProofSyncLogLeaf(t, *uniProof.Leaf, completePushEntry.Leaf) + + // Retrieve all pull sync status complete log entries. + completePullLogEntries, err := fedStore.QueryFederationProofSyncLog( + ctx, uniId, uniProof.LeafKey, universe.SyncDirectionPull, + universe.ProofSyncStatusComplete, + ) + require.NoError(t, err) + + // There should only be one complete push log entry. + require.Len(t, completePullLogEntries, 1) + + // Check that the complete log entry is as expected. + completePullEntry := completePullLogEntries[0] + + require.Equal(t, servers[0], completePullEntry.ServerAddr) + require.Equal( + t, universe.ProofSyncStatusComplete, + completePullEntry.SyncStatus, + ) + require.Equal( + t, universe.SyncDirectionPull, completePullEntry.SyncDirection, + ) + require.Equal(t, uniId.String(), completePullEntry.UniID.String()) + require.Equal(t, int64(0), completePullEntry.AttemptCounter) + + assertProofSyncLogLeafKey( + t, uniProof.LeafKey, completePullEntry.LeafKey, + ) + assertProofSyncLogLeaf(t, *uniProof.Leaf, completePullEntry.Leaf) + + // Increment the attempt counter for one of the pending log entries. + _, err = fedStore.UpsertFederationProofSyncLog( + ctx, uniId, uniProof.LeafKey, servers[1], + universe.SyncDirectionPush, universe.ProofSyncStatusPending, + true, + ) + require.NoError(t, err) + + // Check that the attempt counter was incremented as expected. + pendingLogEntries, err = fedStore.QueryFederationProofSyncLog( + ctx, uniId, uniProof.LeafKey, universe.SyncDirectionPush, + universe.ProofSyncStatusPending, + ) + require.NoError(t, err) + require.Len(t, pendingLogEntries, 2) + + for i := range pendingLogEntries { + entry := pendingLogEntries[i] + if entry.ServerAddr == servers[1] { + require.Equal(t, int64(1), entry.AttemptCounter) + } else { + require.Equal(t, int64(0), entry.AttemptCounter) + } + } + + // Upsert without incrementing the attempt counter for one of the + // pending log entries. + _, err = fedStore.UpsertFederationProofSyncLog( + ctx, uniId, uniProof.LeafKey, servers[1], + universe.SyncDirectionPush, universe.ProofSyncStatusPending, + false, + ) + require.NoError(t, err) + + // Check that the attempt counter was not changed as expected. 
+	pendingLogEntries, err = fedStore.QueryFederationProofSyncLog(
+		ctx, uniId, uniProof.LeafKey, universe.SyncDirectionPush,
+		universe.ProofSyncStatusPending,
+	)
+	require.NoError(t, err)
+	require.Len(t, pendingLogEntries, 2)
+
+	for i := range pendingLogEntries {
+		entry := pendingLogEntries[i]
+		if entry.ServerAddr == servers[1] {
+			require.Equal(t, int64(1), entry.AttemptCounter)
+		} else {
+			require.Equal(t, int64(0), entry.AttemptCounter)
+		}
+	}
+
+	// Set the sync status to complete for one of the pending log entries.
+	_, err = fedStore.UpsertFederationProofSyncLog(
+		ctx, uniId, uniProof.LeafKey, servers[1],
+		universe.SyncDirectionPush, universe.ProofSyncStatusComplete,
+		false,
+	)
+	require.NoError(t, err)
+
+	// Check that the sync status was updated as expected.
+	pendingLogEntries, err = fedStore.QueryFederationProofSyncLog(
+		ctx, uniId, uniProof.LeafKey, universe.SyncDirectionPush,
+		universe.ProofSyncStatusPending,
+	)
+	require.NoError(t, err)
+	require.Len(t, pendingLogEntries, 1)
+
+	completePushLogEntries, err = fedStore.QueryFederationProofSyncLog(
+		ctx, uniId, uniProof.LeafKey, universe.SyncDirectionPush,
+		universe.ProofSyncStatusComplete,
+	)
+	require.NoError(t, err)
+	require.Len(t, completePushLogEntries, 2)
+
+	// Delete log entries for one of the servers.
+	err = fedStore.DeleteProofsSyncLogEntries(ctx, servers[0], servers[1])
+	require.NoError(t, err)
+
+	// Only one log entry should remain and it should have sync status
+	// pending.
+	pendingLogEntries, err = fedStore.QueryFederationProofSyncLog(
+		ctx, uniId, uniProof.LeafKey, universe.SyncDirectionPush,
+		universe.ProofSyncStatusPending,
+	)
+	require.NoError(t, err)
+	require.Len(t, pendingLogEntries, 1)
+
+	// Check that the remaining log entry is as expected.
+	pendingEntry := pendingLogEntries[0]
+	require.Equal(t, servers[2], pendingEntry.ServerAddr)
+}
+
+// assertProofSyncLogLeafKey asserts that a leaf key derived from a proof sync
+// log entry is equal to a given leaf key.
+func assertProofSyncLogLeafKey(t *testing.T, actualLeafKey universe.LeafKey,
+	logLeafKey universe.LeafKey) {
+
+	// We can safely ignore the tweaked script key as it is the derivation
+	// information for the script key. It is only ever known to the owner of
+	// the asset and is never serialized in a proof.
+	actualLeafKey.ScriptKey.TweakedScriptKey = nil
+	require.Equal(t, actualLeafKey, logLeafKey)
+}
+
+// assertProofSyncLogLeaf asserts that a leaf derived from a proof sync log
+// entry is equal to a given universe leaf.
+func assertProofSyncLogLeaf(t *testing.T, actualLeaf universe.Leaf,
+	logLeaf universe.Leaf) {
+
+	if actualLeaf.GenesisWithGroup.GroupKey != nil {
+		// We can safely ignore the group key witness as it is
+		// basically just extracted from the asset and won't be
+		// relevant when parsing the proof.
+		actualLeaf.GenesisWithGroup.GroupKey.Witness = nil
+
+		// We can safely ignore the pre-tweaked group key
+		// (GroupKey.RawKey) as it is the derivation information for the
+		// group key. It is only ever known to the owner of the asset
+		// and is never serialized in a proof.
+		actualLeaf.GenesisWithGroup.GroupKey.RawKey.PubKey = nil
+	}
+
+	require.Equal(t, actualLeaf.Amt, logLeaf.Amt)
+	require.Equal(t, actualLeaf.RawProof, logLeaf.RawProof)
+	require.Equal(t, actualLeaf.GenesisWithGroup, logLeaf.GenesisWithGroup)
+
+	// We compare the assets with our custom asset equality function as the
+	// SplitCommitmentRoot field MS-SMT node types will differ. A computed
+	// node is derived from the database data whereas the generated asset
+	// may have a MS-SMT branch node type.
+	require.True(t, actualLeaf.Asset.DeepEqual(logLeaf.Asset))
+}
+
 // TestFederationConfigDefault tests that we're able to fetch the default
 // federation config.
 func TestFederationConfigDefault(t *testing.T) {
diff --git a/tapdb/universe_test.go b/tapdb/universe_test.go
index c5803e02b..b82223e40 100644
--- a/tapdb/universe_test.go
+++ b/tapdb/universe_test.go
@@ -129,7 +129,12 @@ func randLeafKey(t *testing.T) universe.LeafKey {
 	}
 }
 
-func randProof(t *testing.T) *proof.Proof {
+func randProof(t *testing.T, argAsset *asset.Asset) *proof.Proof {
+	proofAsset := *asset.RandAsset(t, asset.Normal)
+	if argAsset != nil {
+		proofAsset = *argAsset
+	}
+
 	return &proof.Proof{
 		PrevOut: wire.OutPoint{},
 		BlockHeader: wire.BlockHeader{
@@ -142,7 +147,7 @@ func randProof(t *testing.T) *proof.Proof {
 			}},
 		},
 		TxMerkleProof: proof.TxMerkleProof{},
-		Asset:         *asset.RandAsset(t, asset.Normal),
+		Asset:         proofAsset,
 		InclusionProof: proof.TaprootProof{
 			InternalKey: test.RandPubKey(t),
 		},
@@ -152,7 +157,7 @@ func randProof(t *testing.T) *proof.Proof {
 func randMintingLeaf(t *testing.T, assetGen asset.Genesis,
 	groupKey *btcec.PublicKey) universe.Leaf {
 
-	randProof := randProof(t)
+	randProof := randProof(t, nil)
 
 	leaf := universe.Leaf{
 		GenesisWithGroup: universe.GenesisWithGroup{
@@ -320,7 +325,7 @@ func TestUniverseIssuanceProofs(t *testing.T) {
 		testLeaf := &testLeaves[idx]
 
 		var proofBuf bytes.Buffer
-		randProof := randProof(t)
+		randProof := randProof(t, nil)
 
 		require.NoError(t, randProof.Encode(&proofBuf))
 		testLeaf.Leaf.RawProof = proofBuf.Bytes()
diff --git a/tapgarden/caretaker.go b/tapgarden/caretaker.go
index fb1c1c65a..78b867174 100644
--- a/tapgarden/caretaker.go
+++ b/tapgarden/caretaker.go
@@ -1179,6 +1179,11 @@ func (b *BatchCaretaker) storeMintingProof(ctx context.Context,
 		ID:   uniID,
 		Key:  leafKey,
 		Leaf: mintingLeaf,
+
+		// We set this to true to indicate that we would like the syncer
+		// to log and reattempt (in the event of a failure) to push sync
+		// this proof leaf.
+		LogProofSync: true,
 	}, nil
 }
 
diff --git a/universe/auto_syncer.go b/universe/auto_syncer.go
index 4bed26bae..688b599b3 100644
--- a/universe/auto_syncer.go
+++ b/universe/auto_syncer.go
@@ -71,8 +71,18 @@ type FederationPushReq struct {
 	// Leaf is the new leaf to add.
 	Leaf *Leaf
 
+	// resp is a channel that will be sent the asset issuance/transfer
+	// proof and corresponding universe/multiverse inclusion proofs if the
+	// federation proof push was successful.
 	resp chan *Proof
-	err  chan error
+
+	// LogProofSync is a boolean that indicates, if true, that the proof
+	// leaf sync attempt should be logged and actively managed to ensure
+	// that the federation push procedure is repeated in the event of a
+	// failure.
+	LogProofSync bool
+
+	err chan error
 }
 
 // FederationProofBatchPushReq is used to push out a batch of universe proof
@@ -97,8 +107,12 @@ type FederationEnvoy struct {
 
 	stopOnce sync.Once
 
+	// pushRequests is a channel that will be sent new requests to push out
+	// proof leaves to the federation.
 	pushRequests chan *FederationPushReq
 
+	// batchPushRequests is a channel that will be sent new requests to push
+	// out batch proof leaves to the federation.
 	batchPushRequests chan *FederationProofBatchPushReq
 }
 
@@ -219,48 +233,105 @@ func (f *FederationEnvoy) syncServerState(ctx context.Context,
 	return nil
 }
 
-// pushProofToFederation attempts to push out a new proof to the current
-// federation in parallel.
-func (f *FederationEnvoy) pushProofToFederation(uniID Identifier, key LeafKey, - leaf *Leaf) { +// pushProofToServer attempts to push out a new proof to the target server. +func (f *FederationEnvoy) pushProofToServer(ctx context.Context, + uniID Identifier, key LeafKey, leaf *Leaf, addr ServerAddr) error { - // Fetch all universe servers in our federation. - fedServers, err := f.tryFetchServers() - if err != nil || len(fedServers) == 0 { - return + remoteUniverseServer, err := f.cfg.NewRemoteRegistrar(addr) + if err != nil { + return fmt.Errorf("cannot push proof unable to connect "+ + "to remote server(%v): %w", addr.HostStr(), err) } - log.Infof("Pushing new proof to %v federation members, proof_key=%v", - len(fedServers), spew.Sdump(key)) + _, err = remoteUniverseServer.UpsertProofLeaf( + ctx, uniID, key, leaf, + ) + if err != nil { + return fmt.Errorf("cannot push proof to remote "+ + "server(%v): %w", addr.HostStr(), err) + } - ctx, cancel := f.WithCtxQuitNoTimeout() - defer cancel() + return nil +} + +// pushProofToServerLogged attempts to push out a new proof to the target +// server, and logs the sync attempt. +func (f *FederationEnvoy) pushProofToServerLogged(ctx context.Context, + uniID Identifier, key LeafKey, leaf *Leaf, addr ServerAddr) error { + + // Ensure that we have a pending sync log entry for this + // leaf and server pair. This will allow us to handle all + // pending syncs in the event of a restart or at a different + // point in the envoy. + _, err := f.cfg.FederationDB.UpsertFederationProofSyncLog( + ctx, uniID, key, addr, SyncDirectionPush, + ProofSyncStatusPending, true, + ) + if err != nil { + return fmt.Errorf("unable to log proof sync as pending: %w", + err) + } + + // Push the proof to the remote server. + err = f.pushProofToServer(ctx, uniID, key, leaf, addr) + if err != nil { + return fmt.Errorf("cannot push proof to remote server(%v): %w", + addr.HostStr(), err) + } + + // We did not encounter an error in our proof push + // attempt. Log the proof sync attempt as complete. + _, err = f.cfg.FederationDB.UpsertFederationProofSyncLog( + ctx, uniID, key, addr, SyncDirectionPush, + ProofSyncStatusComplete, false, + ) + if err != nil { + return fmt.Errorf("unable to log proof sync attempt: %w", err) + } + + return nil +} + +// pushProofToFederation attempts to push out a new proof to the current +// federation in parallel. +func (f *FederationEnvoy) pushProofToFederation(ctx context.Context, + uniID Identifier, key LeafKey, leaf *Leaf, fedServers []ServerAddr, + logProofSync bool) { + + log.Infof("Pushing proof to %v federation members, proof_key=%v", + len(fedServers), spew.Sdump(key)) // To push a new proof out, we'll attempt to dial to the remote // registrar, then will attempt to push the new proof directly to the // register. pushNewProof := func(ctx context.Context, addr ServerAddr) error { - remoteUniverseServer, err := f.cfg.NewRemoteRegistrar(addr) - if err != nil { - log.Warnf("cannot push proof unable to connect "+ - "to remote server(%v): %v", addr.HostStr(), - err) + // If we are logging proof sync attempts, we will use the + // logged version of the push function. + if logProofSync { + err := f.pushProofToServerLogged( + ctx, uniID, key, leaf, addr, + ) + if err != nil { + log.Warnf("Cannot push proof via logged "+ + "server push: %v", err) + } + return nil } - _, err = remoteUniverseServer.UpsertProofLeaf( - ctx, uniID, key, leaf, - ) + // If we are not logging proof sync attempts, we will use the + // non-logged version of the push function. 
+ err := f.pushProofToServer(ctx, uniID, key, leaf, addr) if err != nil { - log.Warnf("cannot push proof to remote "+ - "server(%v): %v", addr.HostStr(), err) + log.Warnf("Cannot push proof: %v", err) } + return nil } // To conclude, we'll attempt to push the new proof to all the universe // servers in parallel. - err = fn.ParSlice(ctx, fedServers, pushNewProof) + err := fn.ParSlice(ctx, fedServers, pushNewProof) if err != nil { // TODO(roasbeef): retry in the background until successful? log.Errorf("unable to push proof to federation: %v", err) @@ -268,6 +339,57 @@ func (f *FederationEnvoy) pushProofToFederation(uniID Identifier, key LeafKey, } } +// filterProofSyncPending filters out servers that have already been synced +// with for the given leaf. +func (f *FederationEnvoy) filterProofSyncPending(fedServers []ServerAddr, + uniID Identifier, key LeafKey) ([]ServerAddr, error) { + + // If there are no servers to filter, then we'll return early. This + // saves from querying the database unnecessarily. + if len(fedServers) == 0 { + return nil, nil + } + + ctx, cancel := f.WithCtxQuit() + defer cancel() + + // Select all sync push complete log entries for the given universe + // leaf. If there are any servers which are sync complete within this + // log set, we will filter them out of our target server set. + logs, err := f.cfg.FederationDB.QueryFederationProofSyncLog( + ctx, uniID, key, SyncDirectionPush, + ProofSyncStatusComplete, + ) + if err != nil { + return nil, fmt.Errorf("unable to query federation sync log: %w", + err) + } + + // Construct a map of servers that have already been synced with for the + // given leaf. + syncedServers := make(map[string]struct{}) + for idx := range logs { + logEntry := logs[idx] + syncedServers[logEntry.ServerAddr.HostStr()] = struct{}{} + } + + // Filter out servers that we've already pushed to. + filteredFedServers := fn.Filter(fedServers, func(a ServerAddr) bool { + // Filter out servers that have a log entry with sync status + // complete. + if _, ok := syncedServers[a.HostStr()]; ok { + return false + } + + // By this point we haven't found logs corresponding to the + // given server, we will therefore return true and include the + // server as a sync target for the given leaf. + return true + }) + + return filteredFedServers, nil +} + // syncer is the main goroutine that's responsible for interacting with the // federation envoy. It also accepts incoming requests to push out new updates // to the federation. @@ -281,13 +403,25 @@ func (f *FederationEnvoy) syncer() { syncTicker := time.NewTicker(f.cfg.SyncInterval) defer syncTicker.Stop() + // We'll use a timeout that's slightly less than the sync interval to + // help avoid ticking into a new sync event before the previous event + // has finished. + syncContextTimeout := f.cfg.SyncInterval - 1*time.Second + if syncContextTimeout < 0 { + // If the sync interval is less than a second, then we'll use + // the sync interval as the timeout. + syncContextTimeout = f.cfg.SyncInterval + } + for { select { // A new sync event has just been triggered, so we'll attempt // to synchronize state with all the active universe servers in // the federation. case <-syncTicker.C: - // Error propogation is handled in tryFetchServers, we + log.Debug("Federation envoy handling new tick event") + + // Error propagation is handled in tryFetchServers, we // only need to exit here. 
fedServers, err := f.tryFetchServers()
 			if err != nil {
@@ -305,11 +439,60 @@ func (f *FederationEnvoy) syncer() {
 				continue
 			}
 
+			// After we've synced with the federation, we'll
+			// attempt to push out any pending proofs that we
+			// haven't yet completed.
+			ctxFetchLog, cancelFetchLog := f.WithCtxQuitNoTimeout()
+			syncDirection := SyncDirectionPush
+			db := f.cfg.FederationDB
+			logEntries, err := db.FetchPendingProofsSyncLog(
+				ctxFetchLog, &syncDirection,
+			)
+			cancelFetchLog()
+			if err != nil {
+				log.Warnf("unable to query pending push "+
+					"sync log: %v", err)
+				continue
+			}
+
+			if len(logEntries) > 0 {
+				log.Debugf("Handling pending proof sync log "+
+					"entries (entries_count=%d)",
+					len(logEntries))
+			}
+
+			// TODO(ffranr): Take account of any new servers that
+			// have been added since the last time we populated the
+			// log for a given proof leaf. Pending proof sync log
+			// entries are only relevant for the set of servers
+			// that existed at the time the log entry was created.
+			// If a new server is added, then we should create a
+			// new log entry for the new server.
+
+			for idx := range logEntries {
+				entry := logEntries[idx]
+
+				servers := []ServerAddr{
+					entry.ServerAddr,
+				}
+
+				ctxPush, cancelPush :=
+					f.CtxBlockingCustomTimeout(
+						syncContextTimeout,
+					)
+				f.pushProofToFederation(
+					ctxPush, entry.UniID, entry.LeafKey,
+					&entry.Leaf, servers, true,
+				)
+				cancelPush()
+			}
+
 		// A new push request has just arrived. We'll perform a
 		// asynchronous registration with the local Universe registrar,
 		// then push it out in an async manner to the federation
 		// members.
 		case pushReq := <-f.pushRequests:
+			log.Debug("Federation envoy handling push request")
 			ctx, cancel := f.WithCtxQuit()
 
 			// First, we'll attempt to registrar the proof leaf with
@@ -333,13 +516,53 @@ func (f *FederationEnvoy) syncer() {
 			// proof out to the federation in the background.
 			pushReq.resp <- newProof
 
-			// With the response sent above, we'll push this out to
-			// all the Universe servers in the background.
-			go f.pushProofToFederation(
-				pushReq.ID, pushReq.Key, pushReq.Leaf,
+			// Fetch all universe servers in our federation.
+			fedServers, err := f.tryFetchServers()
+			if err != nil {
+				err := fmt.Errorf("unable to fetch "+
+					"federation servers: %w", err)
+				log.Warnf(err.Error())
+				pushReq.err <- err
+				continue
+			}
+
+			if len(fedServers) == 0 {
+				log.Warnf("could not find any federation " +
+					"servers")
+				continue
+			}
+
+			if pushReq.LogProofSync {
+				// We are attempting to sync using the
+				// logged proof sync procedure. We will
+				// therefore narrow down the set of target
+				// servers based on the sync log. Only servers
+				// that are not yet push sync complete will be
+				// targeted.
+				fedServers, err = f.filterProofSyncPending(
+					fedServers, pushReq.ID, pushReq.Key,
+				)
+				if err != nil {
+					log.Warnf("failed to filter " +
+						"federation servers")
+					continue
+				}
+			}
+
+			// With the response sent above, we'll push this
+			// out to all the Universe servers in the
+			// background.
+			ctxPush, cancelPush := f.WithCtxQuitNoTimeout()
+			f.pushProofToFederation(
+				ctxPush, pushReq.ID, pushReq.Key,
+				pushReq.Leaf, fedServers,
+				pushReq.LogProofSync,
 			)
+			cancelPush()
 
 		case pushReq := <-f.batchPushRequests:
+			log.Debug("Federation envoy handling batch push " +
+				"request")
 			ctx, cancel := f.WithCtxQuitNoTimeout()
 
 			// First, we'll attempt to registrar the proof leaf with
@@ -362,16 +585,34 @@ func (f *FederationEnvoy) syncer() {
 			// we'll return back to the caller.
pushReq.resp <- struct{}{} + // Fetch all universe servers in our federation. + fedServers, err := f.tryFetchServers() + if err != nil { + err := fmt.Errorf("unable to fetch "+ + "federation servers: %w", err) + log.Warnf(err.Error()) + pushReq.err <- err + continue + } + + if len(fedServers) == 0 { + log.Warnf("could not find any federation " + + "servers") + continue + } + // With the response sent above, we'll push this out to // all the Universe servers in the background. - go func() { - for idx := range pushReq.Batch { - item := pushReq.Batch[idx] - f.pushProofToFederation( - item.ID, item.Key, item.Leaf, - ) - } - }() + ctxPush, cancelPush := f.WithCtxQuitNoTimeout() + for idx := range pushReq.Batch { + item := pushReq.Batch[idx] + + f.pushProofToFederation( + ctxPush, item.ID, item.Key, item.Leaf, + fedServers, item.LogProofSync, + ) + } + cancelPush() case <-f.Quit: return @@ -387,12 +628,18 @@ func (f *FederationEnvoy) syncer() { func (f *FederationEnvoy) UpsertProofLeaf(_ context.Context, id Identifier, key LeafKey, leaf *Leaf) (*Proof, error) { + // If we're attempting to push an issuance proof, then we'll ensure + // that we track the sync attempt to ensure that we retry in the event + // of a failure. + logProofSync := id.ProofType == ProofTypeIssuance + pushReq := &FederationPushReq{ - ID: id, - Key: key, - Leaf: leaf, - resp: make(chan *Proof, 1), - err: make(chan error, 1), + ID: id, + Key: key, + Leaf: leaf, + LogProofSync: logProofSync, + resp: make(chan *Proof, 1), + err: make(chan error, 1), } if !fn.SendOrQuit(f.pushRequests, pushReq, f.Quit) { diff --git a/universe/interface.go b/universe/interface.go index 5fdf19adb..29712cc4f 100644 --- a/universe/interface.go +++ b/universe/interface.go @@ -84,6 +84,86 @@ func (i *Identifier) StringForLog() string { i.String(), i.AssetID[:], groupKey, i.ProofType) } +// NewUniIDFromAsset creates a new universe ID from an asset. +func NewUniIDFromAsset(a asset.Asset) Identifier { + proofType := ProofTypeTransfer + if a.IsGenesisAsset() { + proofType = ProofTypeIssuance + } + + if a.GroupKey != nil { + return Identifier{ + GroupKey: &a.GroupKey.GroupPubKey, + ProofType: proofType, + } + } + + return Identifier{ + AssetID: a.ID(), + ProofType: proofType, + } +} + +// NewUniIDFromRawArgs creates a new universe ID from the raw arguments. The +// asset ID bytes and group key bytes are mutually exclusive. If the group key +// bytes are set, then the asset ID bytes will be ignored. +// This function is useful in deriving a universe ID from the data stored in the +// database. +func NewUniIDFromRawArgs(assetIDBytes []byte, groupKeyBytes []byte, + proofTypeStr string) (Identifier, error) { + + proofType, err := ParseStrProofType(proofTypeStr) + if err != nil { + return Identifier{}, err + } + + // If the group key bytes are set, then we'll preferentially populate + // the universe ID with that and not the asset ID. + if len(groupKeyBytes) != 0 { + groupKey, err := parseGroupKey(groupKeyBytes) + if err != nil { + return Identifier{}, fmt.Errorf("unable to parse "+ + "group key: %w", err) + } + return Identifier{ + GroupKey: groupKey, + ProofType: proofType, + }, nil + } + + // At this point we know that the group key bytes are nil, so we'll + // attempt to parse the asset ID bytes. 
+ if len(assetIDBytes) == 0 { + return Identifier{}, fmt.Errorf("asset ID bytes and group " + + "key bytes are both nil") + } + + var assetID asset.ID + copy(assetID[:], assetIDBytes) + + return Identifier{ + AssetID: assetID, + ProofType: proofType, + }, nil +} + +// parseGroupKey parses a group key from bytes, which can be in either the +// Schnorr or Compressed format. +func parseGroupKey(scriptKey []byte) (*btcec.PublicKey, error) { + switch len(scriptKey) { + case schnorr.PubKeyBytesLen: + return schnorr.ParsePubKey(scriptKey) + + // Truncate the key and then parse as a Schnorr key. + case btcec.PubKeyBytesLenCompressed: + return schnorr.ParsePubKey(scriptKey[1:]) + + default: + return nil, fmt.Errorf("unknown script key length: %v", + len(scriptKey)) + } +} + // ValidateProofUniverseType validates that the proof type matches the universe // identifier proof type. func ValidateProofUniverseType(a *asset.Asset, uniID Identifier) error { @@ -337,6 +417,12 @@ type Item struct { // MetaReveal is the meta reveal associated with the given proof leaf. MetaReveal *proof.MetaReveal + + // LogProofSync is a boolean that indicates, if true, that the proof + // leaf sync attempt should be logged and actively managed to ensure + // that the federation push procedure is repeated in the event of a + // failure. + LogProofSync bool } // BatchRegistrar is an interface that allows a caller to register a batch of @@ -697,10 +783,113 @@ type FederationSyncConfigDB interface { uniSyncConfigs []*FedUniSyncConfig) error } -// FederationDB is used for CRUD operations related to federation sync config -// and tracked servers. +// SyncDirection is the direction of a proof sync. +type SyncDirection string + +const ( + // SyncDirectionPush indicates that the sync is a push sync (from the local + // server to the remote server). + SyncDirectionPush SyncDirection = "push" + + // SyncDirectionPull indicates that the sync is a pull sync (from the remote + // server to the local server). + SyncDirectionPull SyncDirection = "pull" +) + +// ParseStrSyncDirection parses a string into a SyncDirection. +func ParseStrSyncDirection(s string) (SyncDirection, error) { + switch s { + case string(SyncDirectionPush): + return SyncDirectionPush, nil + case string(SyncDirectionPull): + return SyncDirectionPull, nil + default: + return "", fmt.Errorf("unknown sync direction: %v", s) + } +} + +// ProofSyncStatus is the status of a proof sync. +type ProofSyncStatus string + +const ( + // ProofSyncStatusPending indicates that the sync is pending. + ProofSyncStatusPending ProofSyncStatus = "pending" + + // ProofSyncStatusComplete indicates that the sync is complete. + ProofSyncStatusComplete ProofSyncStatus = "complete" +) + +// ParseStrProofSyncStatus parses a string into a ProofSyncStatus. +func ParseStrProofSyncStatus(s string) (ProofSyncStatus, error) { + switch s { + case string(ProofSyncStatusPending): + return ProofSyncStatusPending, nil + case string(ProofSyncStatusComplete): + return ProofSyncStatusComplete, nil + default: + return "", fmt.Errorf("unknown proof sync status: %v", s) + } +} + +// ProofSyncLogEntry is a log entry for a proof sync. +type ProofSyncLogEntry struct { + // Timestamp is the timestamp of the log entry. + Timestamp time.Time + + // SyncStatus is the status of the sync. + SyncStatus ProofSyncStatus + + // SyncDirection is the direction of the sync. + SyncDirection SyncDirection + + // AttemptCounter is the number of times the sync has been attempted. 
+ AttemptCounter int64 + + // ServerAddr is the address of the sync counterparty server. + ServerAddr ServerAddr + + // UniID is the identifier of the universe associated with the sync event. + UniID Identifier + + // LeafKey is the leaf key associated with the sync event. + LeafKey LeafKey + + // Leaf is the leaf associated with the sync event. + Leaf Leaf +} + +// FederationProofSyncLog is used for CRUD operations relating to the federation +// proof sync log. +type FederationProofSyncLog interface { + // UpsertFederationProofSyncLog upserts a federation proof sync log + // entry for a given universe server and proof. + UpsertFederationProofSyncLog(ctx context.Context, uniID Identifier, + leafKey LeafKey, addr ServerAddr, syncDirection SyncDirection, + syncStatus ProofSyncStatus, + bumpSyncAttemptCounter bool) (int64, error) + + // QueryFederationProofSyncLog queries the federation proof sync log and + // returns the log entries which correspond to the given universe proof + // leaf. + QueryFederationProofSyncLog(ctx context.Context, uniID Identifier, + leafKey LeafKey, syncDirection SyncDirection, + syncStatus ProofSyncStatus) ([]*ProofSyncLogEntry, error) + + // FetchPendingProofsSyncLog queries the federation proof sync log and + // returns all log entries with sync status pending. + FetchPendingProofsSyncLog(ctx context.Context, + syncDirection *SyncDirection) ([]*ProofSyncLogEntry, error) + + // DeleteProofsSyncLogEntries deletes proof sync log entries. + DeleteProofsSyncLogEntries(ctx context.Context, + servers ...ServerAddr) error +} + +// FederationDB is used for CRUD operations related to federation logs and +// configuration. type FederationDB interface { FederationLog + FederationProofSyncLog FederationSyncConfigDB }
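
To summarize how the new FederationProofSyncLog methods are meant to be driven together, below is a minimal sketch of a pending-push retry pass built only on the interface methods and the SyncDirection/ProofSyncStatus values defined above. The helper name retryPendingPushes and its push callback are hypothetical illustrations; the patch's actual retry path lives in syncer() and pushProofToServerLogged in universe/auto_syncer.go.

package sketch

import (
	"context"
	"fmt"

	"github.com/lightninglabs/taproot-assets/universe"
)

// retryPendingPushes re-attempts every pending push-direction sync log entry
// using the supplied push callback, bumping the attempt counter before each
// try and marking the entry complete only after a successful push.
func retryPendingPushes(ctx context.Context, db universe.FederationDB,
	push func(context.Context, universe.Identifier, universe.LeafKey,
		*universe.Leaf, universe.ServerAddr) error) error {

	// Only push-direction entries are retried here; pull entries are
	// recorded for bookkeeping but not re-attempted.
	dir := universe.SyncDirectionPush
	entries, err := db.FetchPendingProofsSyncLog(ctx, &dir)
	if err != nil {
		return fmt.Errorf("unable to fetch pending sync log: %w", err)
	}

	for _, entry := range entries {
		// Bump the attempt counter so the log reflects every retry,
		// even if the push below fails.
		_, err := db.UpsertFederationProofSyncLog(
			ctx, entry.UniID, entry.LeafKey, entry.ServerAddr,
			universe.SyncDirectionPush,
			universe.ProofSyncStatusPending, true,
		)
		if err != nil {
			return err
		}

		err = push(
			ctx, entry.UniID, entry.LeafKey, &entry.Leaf,
			entry.ServerAddr,
		)
		if err != nil {
			// Leave the entry pending; it will be picked up again
			// on the next sync tick.
			continue
		}

		// Mark the entry complete so future pending-sync filtering
		// skips this server for this leaf.
		_, err = db.UpsertFederationProofSyncLog(
			ctx, entry.UniID, entry.LeafKey, entry.ServerAddr,
			universe.SyncDirectionPush,
			universe.ProofSyncStatusComplete, false,
		)
		if err != nil {
			return err
		}
	}

	return nil
}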