diff --git a/app/jobs/csv_exportable/export_dao_depositors_job.rb b/app/jobs/csv_exportable/export_dao_depositors_job.rb index 94d1ae77f..ac74ef08a 100644 --- a/app/jobs/csv_exportable/export_dao_depositors_job.rb +++ b/app/jobs/csv_exportable/export_dao_depositors_job.rb @@ -3,27 +3,46 @@ module CsvExportable class ExportDaoDepositorsJob < BaseExporter def perform(args) - sql = "".dup + start_date, end_date = extract_dates(args) - if args[:start_date].present? - sql << "ckb_transactions.block_timestamp >= #{BigDecimal(args[:start_date])}" - end + rows = fetch_deposit_rows(start_date, end_date) + fetch_withdrawing_rows(start_date, end_date) - if args[:end_date].present? - sql << " AND ckb_transactions.block_timestamp <= #{BigDecimal(args[:end_date])}" - end + header = ["Address", "Capacity", "Txn hash", "Previous Txn hash", "UnixTimestamp", "date(UTC)"] + generate_csv(header, rows) + end + + private + + def extract_dates(args) + start_date = args[:start_date].present? ? BigDecimal(args[:start_date]) : nil + end_date = args[:end_date].present? ? BigDecimal(args[:end_date]) : nil + start_number = args[:start_number].presence + end_number = args[:end_number].presence - if args[:start_number].present? - sql << "ckb_transactions.block_number >= #{args[:start_number]}" + if start_number.present? + start_date = Block.find_by(number: start_number)&.timestamp end - if args[:end_number].present? - sql << " AND ckb_transactions.block_number <= #{args[:end_number]}" + if end_number.present? + end_date = Block.find_by(number: end_number)&.timestamp end + [start_date, end_date] + end + + def build_sql_query(start_date, end_date) + sql = "".dup + sql << "block_timestamp >= #{start_date}" if start_date.present? + sql << " AND " if start_date.present? && end_date.present? + sql << "block_timestamp <= #{end_date}" if end_date.present? + sql + end + + def fetch_deposit_rows(start_date, end_date) + sql = build_sql_query(start_date, end_date) rows = [] - CellOutput.left_joins(:ckb_transaction, :address).live.nervos_dao_deposit.where(sql). - select("cell_outputs.*, ckb_transactions.block_number, ckb_transactions.block_timestamp").find_in_batches(batch_size: 1000) do |cells| + + CellOutput.includes(:address).live.nervos_dao_deposit.where(sql).find_in_batches(batch_size: 500) do |cells| cells.each do |cell| amount = CkbUtils.shannon_to_byte(BigDecimal(cell.capacity)) datetime = datetime_utc(cell.block_timestamp) @@ -31,8 +50,14 @@ def perform(args) end end - CellOutput.left_joins(:ckb_transaction, :address).live.nervos_dao_withdrawing.where(sql). - select("cell_outputs.*, ckb_transactions.block_number, ckb_transactions.block_timestamp").find_in_batches(batch_size: 1000) do |cells| + rows + end + + def fetch_withdrawing_rows(start_date, end_date) + sql = build_sql_query(start_date, end_date) + rows = [] + + CellOutput.includes(:address).live.nervos_dao_withdrawing.where(sql).find_in_batches(batch_size: 500) do |cells| cells.each do |cell| cell_input = cell.ckb_transaction.cell_inputs.nervos_dao_deposit.first previous_cell_output = cell_input.previous_cell_output @@ -43,8 +68,7 @@ def perform(args) end end - header = ["Address", "Capacity", "Txn hash", "Previous Txn hash", "UnixTimestamp", "date(UTC)"] - generate_csv(header, rows) + rows end end end diff --git a/app/jobs/import_btc_time_cell_job.rb b/app/jobs/import_btc_time_cell_job.rb deleted file mode 100644 index ffbc49533..000000000 --- a/app/jobs/import_btc_time_cell_job.rb +++ /dev/null @@ -1,60 +0,0 @@ -class ImportBtcTimeCellJob < ApplicationJob - queue_as :bitcoin - - def perform(cell_id) - ApplicationRecord.transaction do - cell_output = CellOutput.find_by(id: cell_id) - return unless cell_output - - lock_script = cell_output.lock_script - return unless CkbUtils.is_btc_time_lock_cell?(lock_script) - - parsed_args = CkbUtils.parse_btc_time_lock_cell(lock_script.args) - txid = parsed_args.txid - Rails.logger.info("Importing btc time cell #{cell_id} txid #{txid}") - - # build bitcoin transaction - raw_tx = fetch_raw_transaction(txid) - return unless raw_tx - - tx = build_transaction!(raw_tx, cell_output.ckb_transaction) - # build transfer - BitcoinTransfer.create_with( - bitcoin_transaction_id: tx.id, - ckb_transaction_id: cell_output.ckb_transaction_id, - lock_type: "btc_time", - ).find_or_create_by!( - cell_output_id: cell_id, - ) - end - end - - def build_transaction!(raw_tx, ckb_tx) - tx = BitcoinTransaction.find_by(txid: raw_tx["txid"]) - return tx if tx - - created_at = Time.at((ckb_tx.block_timestamp / 1000).to_i).in_time_zone - BitcoinTransaction.create!( - txid: raw_tx["txid"], - tx_hash: raw_tx["hash"], - time: raw_tx["time"], - block_hash: raw_tx["blockhash"], - block_height: 0, - created_at:, - ) - end - - def fetch_raw_transaction(txid) - data = Rails.cache.read(txid) - data ||= rpc.getrawtransaction(txid, 2) - Rails.cache.write(txid, data, expires_in: 10.minutes) unless Rails.cache.exist?(txid) - data["result"] - rescue StandardError => e - Rails.logger.error "get bitcoin raw transaction #{txid} failed: #{e}" - nil - end - - def rpc - @rpc ||= Bitcoin::Rpc.instance - end -end diff --git a/app/jobs/import_rgbpp_cell_job.rb b/app/jobs/import_rgbpp_cell_job.rb deleted file mode 100644 index 546b05f6a..000000000 --- a/app/jobs/import_rgbpp_cell_job.rb +++ /dev/null @@ -1,159 +0,0 @@ -class ImportRgbppCellJob < ApplicationJob - class MissingVoutError < StandardError; end - class MissingAddressError < StandardError; end - - queue_as :bitcoin - - def perform(cell_id) - ApplicationRecord.transaction do - cell_output = CellOutput.find_by(id: cell_id) - return unless cell_output - - lock_script = cell_output.lock_script - return unless CkbUtils.is_rgbpp_lock_cell?(lock_script) - - txid, out_index = CkbUtils.parse_rgbpp_args(lock_script.args) - Rails.logger.info("Importing rgbpp cell #{cell_id} txid #{txid} out_index #{out_index}") - - # build bitcoin transaction - raw_tx = fetch_raw_transaction(txid) - return unless raw_tx - - tx = build_transaction!(raw_tx, cell_output.ckb_transaction) - # build op_returns - vout_attributes = [] - op_returns = build_op_returns!(raw_tx, tx, cell_output.ckb_transaction) - vout_attributes.concat(op_returns) if op_returns.present? - # build vouts - vout_attributes << build_vout!(raw_tx, tx, out_index, cell_output) - if vout_attributes.present? - BitcoinVout.upsert_all( - vout_attributes, - unique_by: %i[bitcoin_transaction_id index cell_output_id], - ) - end - # build vin - build_vin!(cell_id, tx) - # build transfer - BitcoinTransfer.create_with( - bitcoin_transaction_id: tx.id, - ckb_transaction_id: cell_output.ckb_transaction_id, - lock_type: "rgbpp", - ).find_or_create_by!( - cell_output_id: cell_id, - ) - end - rescue StandardError => e - Rails.logger.error(e.message) - end - - def build_transaction!(raw_tx, ckb_tx) - tx = BitcoinTransaction.find_by(txid: raw_tx["txid"]) - return tx if tx - - created_at = Time.at((ckb_tx.block_timestamp / 1000).to_i).in_time_zone - BitcoinTransaction.create!( - txid: raw_tx["txid"], - tx_hash: raw_tx["hash"], - time: raw_tx["time"], - block_hash: raw_tx["blockhash"], - block_height: 0, - created_at:, - ) - end - - def build_op_returns!(raw_tx, tx, ckb_tx) - op_returns = [] - - raw_tx["vout"].each do |vout| - data = vout.dig("scriptPubKey", "hex") - script_pubkey = Bitcoin::Script.parse_from_payload(data.htb) - next unless script_pubkey.op_return? - - op_return = { - bitcoin_transaction_id: tx.id, - bitcoin_address_id: nil, - data:, - index: vout.dig("n"), - asm: vout.dig("scriptPubKey", "asm"), - op_return: true, - ckb_transaction_id: ckb_tx.id, - cell_output_id: nil, - address_id: nil, - } - - next if BitcoinVout.exists?( - bitcoin_transaction_id: op_return[:bitcoin_transaction_id], - index: op_return[:index], - ) - - op_returns << op_return - end - - op_returns - end - - def build_vout!(raw_tx, tx, out_index, cell_output) - vout = raw_tx["vout"].find { _1["n"] == out_index } - raise MissingVoutError, "Missing vout txid: #{raw_tx['txid']} index: #{out_index}" unless vout - - address_hash = vout.dig("scriptPubKey", "address") - raise MissingAddressError, "Missing vout address: #{raw_tx['txid']} index: #{out_index}" unless address_hash - - address = build_address!(address_hash, cell_output) - { - bitcoin_transaction_id: tx.id, - bitcoin_address_id: address.id, - data: vout.dig("scriptPubKey", "hex"), - index: vout.dig("n"), - asm: vout.dig("scriptPubKey", "asm"), - op_return: false, - ckb_transaction_id: cell_output.ckb_transaction_id, - cell_output_id: cell_output.id, - address_id: cell_output.address_id, - } - end - - def build_vin!(cell_id, tx) - cell_input = CellInput.find_by(previous_cell_output_id: cell_id) - previous_vout = BitcoinVout.find_by(cell_output_id: cell_id) - if cell_input && previous_vout - BitcoinVin.create_with( - previous_bitcoin_vout_id: previous_vout.id, - ).find_or_create_by!( - ckb_transaction_id: cell_input.ckb_transaction_id, - cell_input_id: cell_input.id, - ) - - previous_cell_output = cell_input.output - # check whether previous_cell_output utxo consumed - if previous_cell_output.dead? && previous_vout.binding? - previous_vout.update!(status: "normal", consumed_by_id: tx.id) - end - end - end - - def build_address!(address_hash, cell_output) - created_at = Time.at((cell_output.block_timestamp / 1000).to_i).in_time_zone - bitcoin_address = BitcoinAddress.create_with(created_at:).find_or_create_by!(address_hash:) - BitcoinAddressMapping. - create_with(bitcoin_address_id: bitcoin_address.id). - find_or_create_by!(ckb_address_id: cell_output.address_id) - - bitcoin_address - end - - def fetch_raw_transaction(txid) - data = Rails.cache.read(txid) - data ||= rpc.getrawtransaction(txid, 2) - Rails.cache.write(txid, data, expires_in: 10.minutes) unless Rails.cache.exist?(txid) - data["result"] - rescue StandardError => e - Rails.logger.error "get bitcoin raw transaction #{txid} failed: #{e}" - nil - end - - def rpc - @rpc ||= Bitcoin::Rpc.instance - end -end diff --git a/app/workers/generate_udt_holder_allocation_worker.rb b/app/workers/generate_udt_holder_allocation_worker.rb index 422056776..3442308bf 100644 --- a/app/workers/generate_udt_holder_allocation_worker.rb +++ b/app/workers/generate_udt_holder_allocation_worker.rb @@ -1,6 +1,5 @@ class GenerateUdtHolderAllocationWorker - include Sidekiq::Worker - sidekiq_options retry: 3 + include Sidekiq::Job def perform(type_hashes = nil) type_hashes ||= $redis.smembers("udt_holder_allocation") diff --git a/lib/scheduler.rb b/lib/scheduler.rb index 52b44683b..1f4465e57 100644 --- a/lib/scheduler.rb +++ b/lib/scheduler.rb @@ -129,7 +129,7 @@ def call_worker(clz) call_worker TokenCollectionTagWorker end -s.every "1h", overlap: false do +s.every "10m", overlap: false do call_worker GenerateUdtHolderAllocationWorker end