diff --git a/packages/graphql/database/1.sql b/packages/graphql/database/1.sql new file mode 100644 index 000000000..0729e1cfc --- /dev/null +++ b/packages/graphql/database/1.sql @@ -0,0 +1,14 @@ +CREATE TABLE indexer.accounts ( + _id SERIAL PRIMARY KEY, + account_id character varying(66) NOT NULL UNIQUE, + balance BIGINT NOT NULL DEFAULT 0, + transaction_count INTEGER NOT NULL DEFAULT 0, + data jsonb NOT NULL DEFAULT '{}', + first_transaction_timestamp timestamp without time zone NOT NULL, + recent_transaction_timestamp timestamp without time zone NOT NULL +); +CREATE UNIQUE INDEX ON indexer.accounts(_id); +CREATE UNIQUE INDEX ON indexer.accounts(account_id); +CREATE INDEX ON indexer.accounts(transaction_count); +CREATE INDEX ON indexer.accounts(recent_transaction_timestamp); +CREATE INDEX ON indexer.accounts(first_transaction_timestamp); \ No newline at end of file diff --git a/packages/graphql/src/application/uc/NewAddBlockRange.ts b/packages/graphql/src/application/uc/NewAddBlockRange.ts index 93f7bd5cb..98f2e602a 100644 --- a/packages/graphql/src/application/uc/NewAddBlockRange.ts +++ b/packages/graphql/src/application/uc/NewAddBlockRange.ts @@ -12,8 +12,11 @@ import { import Block from '~/infra/dao/Block'; import Transaction from '~/infra/dao/Transaction'; import { DatabaseConnection } from '~/infra/database/DatabaseConnection'; +import { AccountEntity } from '../../domain/Account/AccountEntity'; +import { AccountDAO } from '../../infra/dao/AccountDAO'; export default class NewAddBlockRange { + private accountDAO = new AccountDAO(); async execute(input: Input) { const { from, to } = input; logger.info(`🔗 Syncing blocks: #${from} - #${to}`); @@ -27,6 +30,7 @@ export default class NewAddBlockRange { for (const blockData of blocksData) { const queries: { statement: string; params: any }[] = []; const block = new Block({ data: blockData }); + const blockTransactionTime = block.timestamp; queries.push({ statement: 'insert into indexer.blocks (_id, id, timestamp, data, gas_used, producer) values ($1, $2, $3, $4, $5, $6) on conflict do nothing', @@ -104,6 +108,52 @@ export default class NewAddBlockRange { } } } + // New code starts here: Fetch and save account data + const owners = this.extractUniqueOwners(blockData.transactions); + for (const owner of owners) { + // Fetch existing account if present + const existingAccount = await this.accountDAO.getAccountById(owner); + const transactionCountIncrement = blockData.transactions.filter((tx) => + tx.inputs?.some( + (input) => + input.__typename === 'InputCoin' && input.owner === owner, + ), + ).length; + + let newData: any; + let newBalance: bigint; + + if (existingAccount) { + // Increment transaction count by the number of transactions found in the current range + await this.accountDAO.incrementTransactionCount( + owner, + blockTransactionTime, + transactionCountIncrement, + ); + + newData = await this.fetchAccountDataFromGraphQL(owner); + newBalance = await this.fetchBalance(owner); + + await this.accountDAO.updateAccountBalance(owner, newBalance); + await this.accountDAO.updateAccountData( + owner, + newData, + blockTransactionTime, + ); + } else { + newBalance = await this.fetchBalance(owner); + newData = await this.fetchAccountDataFromGraphQL(owner); + + const newAccount = AccountEntity.create({ + account_id: owner, + balance: newBalance, + transactionCount: transactionCountIncrement, + data: newData, + first_transaction_timestamp: blockTransactionTime, + }); + await this.accountDAO.save(newAccount, blockTransactionTime); + } + } await connection.executeTransaction(queries); } const end = performance.now(); @@ -178,6 +228,60 @@ export default class NewAddBlockRange { } return accounts; } + + private async fetchBalance(owner: string): Promise { + const response = await client.sdk.balance({ + owner, + assetId: + '0xf8f8b6283d7fa5b672b530cbb84fcccb4ff8dc40f8176ef4544ddb1f1952ad07', + }); + return BigInt(response.data.balance.amount); + } + + // New method to extract unique owners + extractUniqueOwners(transactions: GQLTransaction[]): string[] { + const owners = new Set(); + for (const tx of transactions) { + if (tx.inputs) { + for (const input of tx.inputs) { + if (input.__typename === 'InputCoin' && input.owner) { + owners.add(input.owner); + } + } + } + } + return Array.from(owners); + } + + // New method to fetch account data from GraphQL + async fetchAccountDataFromGraphQL(owner: string): Promise { + const allBalances: any[] = []; + let hasNextPage = true; + let after: string | null = null; + + while (hasNextPage) { + const response = await client.sdk.balances({ + filter: { owner }, + first: 1000, // Fetch 1000 records at a time + after, // Use the 'after' cursor for pagination + }); + + if (response.data?.balances?.nodes) { + // Map the nodes to the desired structure and append to allBalances + const nodes = response.data.balances.nodes.map((node: any) => ({ + amount: BigInt(node.amount), + assetId: node.assetId, + })); + allBalances.push(...nodes); + } + + // Check if there is a next page and update the 'after' cursor + hasNextPage = response.data?.balances?.pageInfo?.hasNextPage || false; + after = response.data?.balances?.pageInfo?.endCursor || null; + } + + return allBalances; + } } type Input = { diff --git a/packages/graphql/src/domain/Account/AccountEntity.ts b/packages/graphql/src/domain/Account/AccountEntity.ts new file mode 100644 index 000000000..4c58d7136 --- /dev/null +++ b/packages/graphql/src/domain/Account/AccountEntity.ts @@ -0,0 +1,73 @@ +import { Hash256 } from '../../application/vo/Hash256'; +import { Entity } from '../../core/Entity'; +import { AccountBalance } from './vo/AccountBalance'; +import { AccountData } from './vo/AccountData'; +import { AccountModelID } from './vo/AccountModelID'; + +type AccountInputProps = { + account_id: Hash256; + balance: AccountBalance; + data: AccountData; + transactionCount: number; +}; + +export class AccountEntity extends Entity { + // Adjust the constructor to not require an ID initially + static create(account: any) { + const account_id = Hash256.create(account.account_id); + const balance = AccountBalance.create(account.balance); + const data = AccountData.create(account.data); + const transactionCount = account.transactionCount || 0; + + const props: AccountInputProps = { + account_id, + balance, + data, + transactionCount, + }; + + // If _id is not provided, set it as undefined + const id = account._id ? AccountModelID.create(account._id) : undefined; + + return new AccountEntity(props, id); + } + + static toDBItem(account: AccountEntity): any { + return { + account_id: account.props.account_id.value(), + balance: account.props.balance.value().toString(), + data: AccountEntity.serializeData(account.props.data.value()), + transaction_count: account.props.transactionCount, + }; + } + + static serializeData(data: any): string { + return JSON.stringify(data, (_, value) => + typeof value === 'bigint' ? value.toString() : value, + ); + } + + get cursor() { + return this.id ? this.id.value() : null; + } + + get id() { + return this._id; + } + + get account_id() { + return this.props.account_id.value(); + } + + get balance() { + return this.props.balance.value(); + } + + get transactionCount() { + return this.props.transactionCount; + } + + get data() { + return this.props.data.value(); + } +} diff --git a/packages/graphql/src/domain/Account/vo/AccountBalance.ts b/packages/graphql/src/domain/Account/vo/AccountBalance.ts new file mode 100644 index 000000000..be538e838 --- /dev/null +++ b/packages/graphql/src/domain/Account/vo/AccountBalance.ts @@ -0,0 +1,27 @@ +import { bigint as DrizzleBigint } from 'drizzle-orm/pg-core'; +import { ValueObject } from '../../../core/ValueObject'; +interface Props { + value: bigint; +} + +export class AccountBalance extends ValueObject { + private constructor(props: Props) { + super(props); + } + + static type() { + return DrizzleBigint('balance', { mode: 'bigint' }).notNull(); + } + + static create(value: bigint) { + return new AccountBalance({ value }); + } + + value() { + return this.props.value; + } + + add(amount: bigint): AccountBalance { + return new AccountBalance({ value: this.value() + amount }); + } +} diff --git a/packages/graphql/src/domain/Account/vo/AccountData.ts b/packages/graphql/src/domain/Account/vo/AccountData.ts new file mode 100644 index 000000000..080d74b73 --- /dev/null +++ b/packages/graphql/src/domain/Account/vo/AccountData.ts @@ -0,0 +1,24 @@ +import { jsonb } from 'drizzle-orm/pg-core'; +import { ValueObject } from '../../../core/ValueObject'; + +interface Props { + value: any; +} + +export class AccountData extends ValueObject { + private constructor(props: Props) { + super(props); + } + + static type() { + return jsonb('data').notNull(); + } + + static create(value: any) { + return new AccountData({ value }); + } + + value() { + return this.props.value; + } +} diff --git a/packages/graphql/src/domain/Account/vo/AccountModelID.ts b/packages/graphql/src/domain/Account/vo/AccountModelID.ts new file mode 100644 index 000000000..e5d6513f4 --- /dev/null +++ b/packages/graphql/src/domain/Account/vo/AccountModelID.ts @@ -0,0 +1,20 @@ +import { integer } from 'drizzle-orm/pg-core'; +import { Identifier } from '../../../core/Identifier'; + +export class AccountModelID extends Identifier { + private constructor(id: number) { + super(id); + } + + static type() { + return integer('_id').primaryKey(); + } + + static create(id: number): AccountModelID { + if (typeof id !== 'number' || Number.isNaN(id)) { + throw new Error('Invalid ID: ID must be a valid number.'); + } + + return new AccountModelID(id); + } +} diff --git a/packages/graphql/src/domain/Account/vo/AccountRef.ts b/packages/graphql/src/domain/Account/vo/AccountRef.ts new file mode 100644 index 000000000..ca4f4474e --- /dev/null +++ b/packages/graphql/src/domain/Account/vo/AccountRef.ts @@ -0,0 +1,18 @@ +import { ValueObject } from '../../../core/ValueObject'; +interface Props { + value: number; +} + +export class AccountRef extends ValueObject { + private constructor(props: Props) { + super(props); + } + + static create(id: number) { + return new AccountRef({ value: id }); + } + + value() { + return this.props.value; + } +} diff --git a/packages/graphql/src/infra/dao/AccountDAO.ts b/packages/graphql/src/infra/dao/AccountDAO.ts new file mode 100644 index 000000000..e04509955 --- /dev/null +++ b/packages/graphql/src/infra/dao/AccountDAO.ts @@ -0,0 +1,132 @@ +import { AccountEntity } from '../../domain/Account/AccountEntity'; +import { DatabaseConnection } from '../database/DatabaseConnection'; + +export class AccountDAO { + private databaseConnection: DatabaseConnection; + + constructor() { + this.databaseConnection = DatabaseConnection.getInstance(); + } + + // Custom function to stringify BigInt values + private stringifyBigInt(data: any): string { + return JSON.stringify(data, (_key, value) => + typeof value === 'bigint' ? value.toString() : value, + ); + } + + async save(account: AccountEntity, blockTransactionTime: Date) { + const accountData = AccountEntity.toDBItem(account); + + const balance = accountData.balance.toString(); + const data = this.stringifyBigInt(accountData.data); + + // Use raw SQL query to insert or update the account record + await this.databaseConnection.query( + ` + INSERT INTO indexer.accounts (account_id, balance, transaction_count, data, first_transaction_timestamp, recent_transaction_timestamp) + VALUES ($1, $2, $3, $4, $5, $6) + ON CONFLICT (account_id) + DO UPDATE SET + balance = EXCLUDED.balance, + transaction_count = EXCLUDED.transaction_count, + data = EXCLUDED.data, + recent_transaction_timestamp = CASE + WHEN accounts.transaction_count <> EXCLUDED.transaction_count THEN EXCLUDED.recent_transaction_timestamp + ELSE accounts.recent_transaction_timestamp + END + `, + [ + accountData.account_id, + balance, + accountData.transaction_count, + data, + blockTransactionTime.toISOString(), + blockTransactionTime.toISOString(), + ], + ); + } + + async getAccountById(id: string): Promise { + const result = await this.databaseConnection.query( + ` + SELECT * FROM indexer.accounts WHERE account_id = $1 + `, + [id], + ); + + return result.length ? AccountEntity.create(result[0]) : null; + } + + async incrementTransactionCount( + account_id: string, + blockTransactionTime: Date, + incrementBy = 1, + ) { + await this.databaseConnection.query( + ` + UPDATE indexer.accounts + SET transaction_count = transaction_count + $1, + recent_transaction_timestamp = $2 + WHERE account_id = $3 + `, + [incrementBy, blockTransactionTime.toISOString(), account_id], + ); + } + + // Updated method to update account data with BigInt handling + async updateAccountData( + account_id: string, + newData: any, + blockTransactionTime: Date, + ) { + const data = this.stringifyBigInt(newData); // Use custom function for BigInt serialization + + await this.databaseConnection.query( + ` + UPDATE indexer.accounts + SET data = $1, + recent_transaction_timestamp = $2 + WHERE account_id = $3 + `, + [data, blockTransactionTime.toISOString(), account_id], + ); + } + + async updateAccountTransactionCount( + account_id: string, + newTransactionCount: number, + blockTransactionTime: Date, + ) { + await this.databaseConnection.query( + ` + UPDATE indexer.accounts + SET transaction_count = $1, + recent_transaction_timestamp = $2 + WHERE account_id = $3 + `, + [newTransactionCount, blockTransactionTime.toISOString(), account_id], + ); + } + + async updateAccountBalance(account_id: string, newBalance: bigint) { + await this.databaseConnection.query( + ` + UPDATE indexer.accounts SET balance = $1 WHERE account_id = $2 + `, + [newBalance.toString(), account_id], // Convert BigInt to string + ); + } + + // New method to get account data content + async getAccountDataContent(account_id: string): Promise { + const result = await this.databaseConnection.query( + ` + SELECT data FROM indexer.accounts WHERE account_id = $1 + `, + [account_id], + ); + + return result.length ? result[0].data : null; + } +}