From 4568d45879b1c522cad08b57ef5f957cd823da79 Mon Sep 17 00:00:00 2001 From: Miles Zhang Date: Mon, 1 Jul 2024 10:53:52 +0800 Subject: [PATCH 1/4] test: update udt info worker needs to run (#2015) Signed-off-by: Miles Zhang --- test/models/ckb_sync/node_data_processor_test.rb | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/test/models/ckb_sync/node_data_processor_test.rb b/test/models/ckb_sync/node_data_processor_test.rb index 7b815977a..683ec346a 100644 --- a/test/models/ckb_sync/node_data_processor_test.rb +++ b/test/models/ckb_sync/node_data_processor_test.rb @@ -3758,11 +3758,11 @@ class NodeDataProcessorTest < ActiveSupport::TestCase ] node_block = CKB::Types::Block.new(uncles: [], proposals: [], transactions:, header:) - block = node_data_processor.process_block(node_block) - udt1 = Udt.find_by(args: udt_script1.args) - udt2 = Udt.find_by(args: udt_script2.args) - CkbSync::Api.any_instance.stubs(:get_tip_block_number).returns(block.number + 1) Sidekiq::Testing.inline! do + block = node_data_processor.process_block(node_block) + udt1 = Udt.find_by(args: udt_script1.args) + udt2 = Udt.find_by(args: udt_script2.args) + CkbSync::Api.any_instance.stubs(:get_tip_block_number).returns(block.number + 1) VCR.use_cassette("blocks/#{DEFAULT_NODE_BLOCK_NUMBER}", record: :new_episodes) do node_data_processor.call From 64084d14cd9e74c996a7ee6bde1127c13ed12ab3 Mon Sep 17 00:00:00 2001 From: Miles Zhang Date: Mon, 1 Jul 2024 16:49:51 +0800 Subject: [PATCH 2/4] feat: adjust tags filter logic (#2017) * feat: adjust tags filter logic Signed-off-by: Miles Zhang * test: fix test Signed-off-by: Miles Zhang --------- Signed-off-by: Miles Zhang --- app/controllers/api/v1/xudts_controller.rb | 2 +- app/controllers/api/v2/nft/collections_controller.rb | 2 +- test/controllers/api/v1/xudts_controller_test.rb | 2 +- .../api/v2/nft/collections_controller_test.rb | 9 --------- 4 files changed, 3 insertions(+), 12 deletions(-) diff --git a/app/controllers/api/v1/xudts_controller.rb b/app/controllers/api/v1/xudts_controller.rb index ef8b7d643..0600c8a04 100644 --- a/app/controllers/api/v1/xudts_controller.rb +++ b/app/controllers/api/v1/xudts_controller.rb @@ -15,7 +15,7 @@ def index if params[:tags].present? tags = parse_tags - scope = scope.joins(:xudt_tag).where("xudt_tags.tags @> array[?]::varchar[]", tags).select("udts.*") unless tags.empty? + scope = scope.joins(:xudt_tag).where("xudt_tags.tags && ARRAY[?]::varchar[]", tags).select("udts.*") unless tags.empty? end if stale?(scope) diff --git a/app/controllers/api/v2/nft/collections_controller.rb b/app/controllers/api/v2/nft/collections_controller.rb index b74b12dd0..c007a7cff 100644 --- a/app/controllers/api/v2/nft/collections_controller.rb +++ b/app/controllers/api/v2/nft/collections_controller.rb @@ -9,7 +9,7 @@ def index end if params[:tags].present? tags = parse_tags - scope = scope.where("tags @> array[?]::varchar[]", tags) unless tags.empty? + scope = scope.where("tags && ARRAY[?]::varchar[]", tags) unless tags.empty? end pagy, collections = pagy(sort_collections(scope)) diff --git a/test/controllers/api/v1/xudts_controller_test.rb b/test/controllers/api/v1/xudts_controller_test.rb index 1e25bd7eb..7c2af0c92 100644 --- a/test/controllers/api/v1/xudts_controller_test.rb +++ b/test/controllers/api/v1/xudts_controller_test.rb @@ -34,7 +34,7 @@ class XudtsControllerTest < ActionDispatch::IntegrationTest create(:xudt_tag, udt:, tags: ["duplicate", "layer-1-asset", "supply-limited"]) udt2 = create(:udt, :xudt, symbol: "RPGG") create(:xudt_tag, udt: udt2, tags: ["duplicate", "layer-2-asset", "supply-limited"]) - valid_get api_v1_xudts_url, params: { symbol: "CKBB", "tags": "layer-1-asset,supply-limited,NOT EXIST" } + valid_get api_v1_xudts_url, params: { symbol: "CKBB", "tags": "duplicate,supply-limited" } assert_response :success assert_equal "CKBB", json["data"].first["attributes"]["symbol"] assert_equal ["duplicate", "layer-1-asset", "supply-limited"], json["data"].first["attributes"]["xudt_tags"] diff --git a/test/controllers/api/v2/nft/collections_controller_test.rb b/test/controllers/api/v2/nft/collections_controller_test.rb index 4b36d49e6..15def51e2 100644 --- a/test/controllers/api/v2/nft/collections_controller_test.rb +++ b/test/controllers/api/v2/nft/collections_controller_test.rb @@ -22,15 +22,6 @@ class NFT::CollectionsControllerTest < ActionDispatch::IntegrationTest assert_equal JSON.parse(response.body)["data"].size, 2 end - test "should filter by tags but not match" do - create :token_collection, name: "token1", tags: ["layer-1-asset", "rgbpp-compatible"] - create :token_collection, name: "token2", tags: ["layer-1-asset", "rgbpp-compatible"] - - get api_v2_nft_collections_url, params: { tags: "layer-1-asset,invalid" } - assert_response :success - assert_equal JSON.parse(response.body)["data"].size, 0 - end - test "sort by block_timestamp asc" do block1 = create(:block, :with_block_hash, timestamp: 10.days.ago.to_i * 1000) block3 = create(:block, :with_block_hash, timestamp: 1.day.ago.to_i * 1000) From 79c14afc6ca29a58cf31e891af8b21d49f332b9b Mon Sep 17 00:00:00 2001 From: Miles Zhang Date: Wed, 3 Jul 2024 14:38:08 +0800 Subject: [PATCH 3/4] feat: destroy pending tx when exist samed commited tx (#2018) Signed-off-by: Miles Zhang --- app/workers/clean_up_worker.rb | 8 -------- app/workers/pool_transaction_check_worker.rb | 4 ++++ 2 files changed, 4 insertions(+), 8 deletions(-) delete mode 100644 app/workers/clean_up_worker.rb diff --git a/app/workers/clean_up_worker.rb b/app/workers/clean_up_worker.rb deleted file mode 100644 index e126aab01..000000000 --- a/app/workers/clean_up_worker.rb +++ /dev/null @@ -1,8 +0,0 @@ -class CleanUpWorker - include Sidekiq::Worker - - def perform - CkbTransaction.tx_pending.where("created_at < ?", 1.day.ago).destroy_all - CkbTransaction.tx_rejected.where("created_at < ?", 3.months.ago).destroy_all - end -end diff --git a/app/workers/pool_transaction_check_worker.rb b/app/workers/pool_transaction_check_worker.rb index 885c17163..d23eeef0d 100644 --- a/app/workers/pool_transaction_check_worker.rb +++ b/app/workers/pool_transaction_check_worker.rb @@ -10,6 +10,10 @@ def perform response_string = CkbSync::Api.instance.directly_single_call_rpc method: "get_transaction", params: [tx.tx_hash] reason = response_string["result"]["tx_status"] + if reason["status"] == "committed" && CkbTransaction.where(tx_hash: tx.tx_hash, tx_status: :committed).exists? + tx.destroy + end + if reason["status"] == "rejected" ApplicationRecord.transaction do tx.update! tx_status: "rejected" From 57bd1c1a599e919bc9fd5313a8d531e7fac11708 Mon Sep 17 00:00:00 2001 From: Rabbit Date: Wed, 3 Jul 2024 16:52:38 +0800 Subject: [PATCH 4/4] refactor: contract statistic (#2020) --- app/workers/contract_statistic_worker.rb | 32 +++++++++++++++++-- .../generate_udt_holder_allocation_worker.rb | 28 ++++++++++++---- lib/scheduler.rb | 2 +- 3 files changed, 51 insertions(+), 11 deletions(-) diff --git a/app/workers/contract_statistic_worker.rb b/app/workers/contract_statistic_worker.rb index 942c22254..e7993ee39 100644 --- a/app/workers/contract_statistic_worker.rb +++ b/app/workers/contract_statistic_worker.rb @@ -4,16 +4,42 @@ class ContractStatisticWorker def perform h24_tx_ids = CkbTransaction.h24.pluck(:id) + pool_size = 10 + pool = Concurrent::FixedThreadPool.new(pool_size) + Contract.find_each do |contract| + # ckb_address_ids = fetch_ckb_address_ids(contract, pool) contract.update( ckb_transactions_count: contract.cell_dependencies.count, h24_ckb_transactions_count: contract.cell_dependencies.where(ckb_transaction_id: h24_tx_ids).count, deployed_cells_count: contract.deployed_cell_outputs&.live&.size, - referring_cells_count: contract.referring_cell_outputs&.live&.size, + referring_cells_count: contract.referring_cell_outputs.live.size, total_deployed_cells_capacity: contract.deployed_cell_outputs&.live&.sum(:capacity), - total_referring_cells_capacity: contract.referring_cell_outputs&.live&.sum(:capacity), - addresses_count: contract.referring_cell_outputs&.live&.select(:address_id)&.distinct&.count, + total_referring_cells_capacity: contract.referring_cell_outputs.live.sum(:capacity), + # addresses_count: ckb_address_ids.count, ) end + + # 关闭线程池 + pool.shutdown + pool.wait_for_termination + end + + private + + def fetch_ckb_address_ids(contract, pool) + ckb_address_ids = Concurrent::Set.new + + futures = [] + + contract.referring_cell_outputs.live.find_in_batches(batch_size: 10_000) do |batch| + futures << Concurrent::Promises.future_on(pool) do + batch.each { |cell_output| ckb_address_ids.add(cell_output.address_id) } + end + end + + Concurrent::Promises.zip(*futures).value! + + ckb_address_ids end end diff --git a/app/workers/generate_udt_holder_allocation_worker.rb b/app/workers/generate_udt_holder_allocation_worker.rb index 133e6fcfa..6417fa624 100644 --- a/app/workers/generate_udt_holder_allocation_worker.rb +++ b/app/workers/generate_udt_holder_allocation_worker.rb @@ -14,14 +14,8 @@ def update_udt_holder_allocation(udt) type_script = TypeScript.find_by(udt.type_script) return unless type_script + btc_address_ids = fetch_btc_address_ids(type_script) holder_allocation = UdtHolderAllocation.find_or_initialize_by(udt:, contract_id: nil) - ckb_address_ids = CellOutput.live.where(type_script:).distinct.pluck(:address_id) - btc_address_ids = [] - ckb_address_ids.each_slice(1000) do |address_ids| - ids = BitcoinAddressMapping.where(ckb_address_id: address_ids).pluck(:bitcoin_address_id) - btc_address_ids.concat(ids).uniq! - end - holder_allocation.update!(btc_holder_count: btc_address_ids.count) end @@ -58,4 +52,24 @@ def update_contract_holder_allocation(udt) allocation.update!(ckb_holder_count: count) end end + + private + + def fetch_btc_address_ids(type_script) + btc_address_ids = Concurrent::Set.new + futures = [] + + CellOutput.live.where(type_script:).find_in_batches(batch_size: 1000) do |batch| + futures << Concurrent::Promises.future do + batch_ckb_address_ids = batch.pluck(:address_id) + ids = BitcoinAddressMapping.where(ckb_address_id: batch_ckb_address_ids).pluck(:bitcoin_address_id) + btc_address_ids.merge(ids) + end + end + + # 等待所有的 Future 完成 + Concurrent::Promises.zip(*futures).value! + + btc_address_ids + end end diff --git a/lib/scheduler.rb b/lib/scheduler.rb index a6f355404..6f86bcece 100644 --- a/lib/scheduler.rb +++ b/lib/scheduler.rb @@ -117,7 +117,7 @@ def call_worker(clz) call_worker CleanAddressBlockSnapshotWorker end -s.every "1h", overlap: false do +s.every "6h", overlap: false do call_worker ContractStatisticWorker end