diff --git a/ansible/config.yaml b/ansible/config.yaml index 9fb12b9..0d06ba8 100644 --- a/ansible/config.yaml +++ b/ansible/config.yaml @@ -4,6 +4,7 @@ application_name: ton-index-go # ADJUST. For additional args call build/ton-index-go -help bind_addr: ":4100" pg_dsn: postgresql://localhost:5432/mainnet +pg_master_dsn: postgresql://localhost:5432/mainnet additional_args: # deploy config @@ -11,8 +12,13 @@ remote_hosts: - ubuntu-01 - ubuntu-02 - ubuntu-03 +master_nodes: + - master-01 service_user: root service_group: root go_binary: ton-index-go go_binary_path: /usr/local/bin systemd_service_path: /etc/systemd/system + +metadata_fetcher_binary: metadata-fetcher +metadata_fetcher_additional_args: diff --git a/ansible/service.yaml.j2 b/ansible/index_api.yaml.j2 similarity index 89% rename from ansible/service.yaml.j2 rename to ansible/index_api.yaml.j2 index 629872e..1f1d343 100644 --- a/ansible/service.yaml.j2 +++ b/ansible/index_api.yaml.j2 @@ -6,7 +6,7 @@ After = network.target Type = simple Restart = always RestartSec = 1 -ExecStart={{ go_binary_path }}/{{ go_binary }} -bind "{{ bind_addr }}" -pg {{ pg_dsn }}?application_name={{ application_name }} {{ additional_args }} +ExecStart={{ go_binary_path }}/{{ go_binary }} -bind "{{ bind_addr }}" -pg {{ pg_dsn }}?application_name={{ application_name }} -pg-master "{{ pg_master_dsn }}" {{ additional_args }} ExecStopPost = /bin/echo "{{ application_name }} service down" User = {{ service_user }} Group = {{ service_group }} diff --git a/ansible/inventory.yaml b/ansible/inventory.yaml index 113cc2f..7be1059 100644 --- a/ansible/inventory.yaml +++ b/ansible/inventory.yaml @@ -9,9 +9,5 @@ all: ansible_host: 192.168.66.2 ubuntu-03: ansible_host: 192.168.66.3 - -api-nodes: - hosts: - ubuntu-01: - ubuntu-02: - ubuntu-03: + master-01: + ansible_host: 192.168.69.1 diff --git a/ansible/metadata_fetcher.yaml.j2 b/ansible/metadata_fetcher.yaml.j2 new file mode 100644 index 0000000..99699e6 --- /dev/null +++ b/ansible/metadata_fetcher.yaml.j2 @@ -0,0 +1,18 @@ +[Unit] +Description = "{{ application_name }}-metadata-fetcher service" +After = network.target + +[Service] +Type = simple +Restart = always +RestartSec = 1 +ExecStart={{ go_binary_path }}/{{ metadata_fetcher_binary }} -pg {{ pg_master_dsn or pg_dsn }}?application_name={{ application_name }}-metadata-fetcher {{ metadata_fetcher_additional_args }} +ExecStopPost = /bin/echo "{{ application_name }}-metadata-fetcher service down" +User = {{ service_user }} +Group = {{ service_group }} +LimitNOFILE = infinity +LimitNPROC = infinity +LimitMEMLOCK = infinity + +[Install] +WantedBy = multi-user.target diff --git a/docs/docs.go b/docs/docs.go index 2c5690e..cf6eaa0 100644 --- a/docs/docs.go +++ b/docs/docs.go @@ -1342,6 +1342,57 @@ const docTemplate = `{ } } }, + "/api/v3/metadata": { + "get": { + "security": [ + { + "APIKeyHeader": [] + }, + { + "APIKeyQuery": [] + } + ], + "description": "Query address metadata", + "consumes": [ + "application/json" + ], + "produces": [ + "application/json" + ], + "tags": [ + "accounts" + ], + "summary": "Metadata", + "operationId": "api_v3_get_metadata", + "parameters": [ + { + "type": "array", + "items": { + "type": "string" + }, + "collectionFormat": "multi", + "description": "List of addresses in any form to get address metadata. Max: 1024.", + "name": "address", + "in": "query", + "required": true + } + ], + "responses": { + "200": { + "description": "OK", + "schema": { + "$ref": "#/definitions/Metadata" + } + }, + "400": { + "description": "Bad Request", + "schema": { + "$ref": "#/definitions/RequestError" + } + } + } + } + }, "/api/v3/nft/collections": { "get": { "security": [ @@ -2262,6 +2313,9 @@ const docTemplate = `{ }, "address_book": { "$ref": "#/definitions/AddressBook" + }, + "metadata": { + "$ref": "#/definitions/Metadata" } } }, @@ -2366,6 +2420,9 @@ const docTemplate = `{ }, "address_book": { "$ref": "#/definitions/AddressBook" + }, + "metadata": { + "$ref": "#/definitions/Metadata" } } }, @@ -2383,6 +2440,20 @@ const docTemplate = `{ } } }, + "AddressMetadata": { + "type": "object", + "properties": { + "is_indexed": { + "type": "boolean" + }, + "token_info": { + "type": "array", + "items": { + "$ref": "#/definitions/TokenInfo" + } + } + } + }, "Block": { "type": "object", "properties": { @@ -2701,6 +2772,9 @@ const docTemplate = `{ "items": { "$ref": "#/definitions/Event" } + }, + "metadata": { + "$ref": "#/definitions/Metadata" } } }, @@ -2757,6 +2831,9 @@ const docTemplate = `{ "items": { "$ref": "#/definitions/JettonBurn" } + }, + "metadata": { + "$ref": "#/definitions/Metadata" } } }, @@ -2805,6 +2882,9 @@ const docTemplate = `{ "items": { "$ref": "#/definitions/JettonMaster" } + }, + "metadata": { + "$ref": "#/definitions/Metadata" } } }, @@ -2870,6 +2950,9 @@ const docTemplate = `{ "items": { "$ref": "#/definitions/JettonTransfer" } + }, + "metadata": { + "$ref": "#/definitions/Metadata" } } }, @@ -2911,6 +2994,9 @@ const docTemplate = `{ "items": { "$ref": "#/definitions/JettonWallet" } + }, + "metadata": { + "$ref": "#/definitions/Metadata" } } }, @@ -3006,9 +3092,18 @@ const docTemplate = `{ "items": { "$ref": "#/definitions/Message" } + }, + "metadata": { + "$ref": "#/definitions/Metadata" } } }, + "Metadata": { + "type": "object", + "additionalProperties": { + "$ref": "#/definitions/AddressMetadata" + } + }, "MsgSize": { "type": "object", "properties": { @@ -3056,6 +3151,9 @@ const docTemplate = `{ "address_book": { "$ref": "#/definitions/AddressBook" }, + "metadata": { + "$ref": "#/definitions/Metadata" + }, "nft_collections": { "type": "array", "items": { @@ -3107,6 +3205,9 @@ const docTemplate = `{ "address_book": { "$ref": "#/definitions/AddressBook" }, + "metadata": { + "$ref": "#/definitions/Metadata" + }, "nft_items": { "type": "array", "items": { @@ -3169,6 +3270,9 @@ const docTemplate = `{ "address_book": { "$ref": "#/definitions/AddressBook" }, + "metadata": { + "$ref": "#/definitions/Metadata" + }, "nft_transfers": { "type": "array", "items": { @@ -3403,6 +3507,30 @@ const docTemplate = `{ } } }, + "TokenInfo": { + "type": "object", + "properties": { + "description": { + "type": "string" + }, + "extra": { + "type": "object", + "additionalProperties": true + }, + "image": { + "type": "string" + }, + "name": { + "type": "string" + }, + "symbol": { + "type": "string" + }, + "type": { + "type": "string" + } + } + }, "TraceNode": { "type": "object", "properties": { @@ -3733,6 +3861,9 @@ const docTemplate = `{ "address_book": { "$ref": "#/definitions/AddressBook" }, + "metadata": { + "$ref": "#/definitions/Metadata" + }, "wallets": { "type": "array", "items": { diff --git a/docs/swagger.json b/docs/swagger.json index 604c675..5777287 100644 --- a/docs/swagger.json +++ b/docs/swagger.json @@ -1334,6 +1334,57 @@ } } }, + "/api/v3/metadata": { + "get": { + "security": [ + { + "APIKeyHeader": [] + }, + { + "APIKeyQuery": [] + } + ], + "description": "Query address metadata", + "consumes": [ + "application/json" + ], + "produces": [ + "application/json" + ], + "tags": [ + "accounts" + ], + "summary": "Metadata", + "operationId": "api_v3_get_metadata", + "parameters": [ + { + "type": "array", + "items": { + "type": "string" + }, + "collectionFormat": "multi", + "description": "List of addresses in any form to get address metadata. Max: 1024.", + "name": "address", + "in": "query", + "required": true + } + ], + "responses": { + "200": { + "description": "OK", + "schema": { + "$ref": "#/definitions/Metadata" + } + }, + "400": { + "description": "Bad Request", + "schema": { + "$ref": "#/definitions/RequestError" + } + } + } + } + }, "/api/v3/nft/collections": { "get": { "security": [ @@ -2254,6 +2305,9 @@ }, "address_book": { "$ref": "#/definitions/AddressBook" + }, + "metadata": { + "$ref": "#/definitions/Metadata" } } }, @@ -2358,6 +2412,9 @@ }, "address_book": { "$ref": "#/definitions/AddressBook" + }, + "metadata": { + "$ref": "#/definitions/Metadata" } } }, @@ -2375,6 +2432,20 @@ } } }, + "AddressMetadata": { + "type": "object", + "properties": { + "is_indexed": { + "type": "boolean" + }, + "token_info": { + "type": "array", + "items": { + "$ref": "#/definitions/TokenInfo" + } + } + } + }, "Block": { "type": "object", "properties": { @@ -2693,6 +2764,9 @@ "items": { "$ref": "#/definitions/Event" } + }, + "metadata": { + "$ref": "#/definitions/Metadata" } } }, @@ -2749,6 +2823,9 @@ "items": { "$ref": "#/definitions/JettonBurn" } + }, + "metadata": { + "$ref": "#/definitions/Metadata" } } }, @@ -2797,6 +2874,9 @@ "items": { "$ref": "#/definitions/JettonMaster" } + }, + "metadata": { + "$ref": "#/definitions/Metadata" } } }, @@ -2862,6 +2942,9 @@ "items": { "$ref": "#/definitions/JettonTransfer" } + }, + "metadata": { + "$ref": "#/definitions/Metadata" } } }, @@ -2903,6 +2986,9 @@ "items": { "$ref": "#/definitions/JettonWallet" } + }, + "metadata": { + "$ref": "#/definitions/Metadata" } } }, @@ -2998,9 +3084,18 @@ "items": { "$ref": "#/definitions/Message" } + }, + "metadata": { + "$ref": "#/definitions/Metadata" } } }, + "Metadata": { + "type": "object", + "additionalProperties": { + "$ref": "#/definitions/AddressMetadata" + } + }, "MsgSize": { "type": "object", "properties": { @@ -3048,6 +3143,9 @@ "address_book": { "$ref": "#/definitions/AddressBook" }, + "metadata": { + "$ref": "#/definitions/Metadata" + }, "nft_collections": { "type": "array", "items": { @@ -3099,6 +3197,9 @@ "address_book": { "$ref": "#/definitions/AddressBook" }, + "metadata": { + "$ref": "#/definitions/Metadata" + }, "nft_items": { "type": "array", "items": { @@ -3161,6 +3262,9 @@ "address_book": { "$ref": "#/definitions/AddressBook" }, + "metadata": { + "$ref": "#/definitions/Metadata" + }, "nft_transfers": { "type": "array", "items": { @@ -3395,6 +3499,30 @@ } } }, + "TokenInfo": { + "type": "object", + "properties": { + "description": { + "type": "string" + }, + "extra": { + "type": "object", + "additionalProperties": true + }, + "image": { + "type": "string" + }, + "name": { + "type": "string" + }, + "symbol": { + "type": "string" + }, + "type": { + "type": "string" + } + } + }, "TraceNode": { "type": "object", "properties": { @@ -3725,6 +3853,9 @@ "address_book": { "$ref": "#/definitions/AddressBook" }, + "metadata": { + "$ref": "#/definitions/Metadata" + }, "wallets": { "type": "array", "items": { diff --git a/docs/swagger.yaml b/docs/swagger.yaml index 17e088f..445cadc 100644 --- a/docs/swagger.yaml +++ b/docs/swagger.yaml @@ -59,6 +59,8 @@ definitions: type: array address_book: $ref: '#/definitions/AddressBook' + metadata: + $ref: '#/definitions/Metadata' type: object Action: properties: @@ -129,6 +131,8 @@ definitions: type: array address_book: $ref: '#/definitions/AddressBook' + metadata: + $ref: '#/definitions/Metadata' type: object AddressBook: additionalProperties: @@ -139,6 +143,15 @@ definitions: user_friendly: type: string type: object + AddressMetadata: + properties: + is_indexed: + type: boolean + token_info: + items: + $ref: '#/definitions/TokenInfo' + type: array + type: object Block: properties: after_merge: @@ -354,6 +367,8 @@ definitions: items: $ref: '#/definitions/Event' type: array + metadata: + $ref: '#/definitions/Metadata' type: object JettonBurn: properties: @@ -391,6 +406,8 @@ definitions: items: $ref: '#/definitions/JettonBurn' type: array + metadata: + $ref: '#/definitions/Metadata' type: object JettonMaster: properties: @@ -423,6 +440,8 @@ definitions: items: $ref: '#/definitions/JettonMaster' type: array + metadata: + $ref: '#/definitions/Metadata' type: object JettonTransfer: properties: @@ -466,6 +485,8 @@ definitions: items: $ref: '#/definitions/JettonTransfer' type: array + metadata: + $ref: '#/definitions/Metadata' type: object JettonWallet: properties: @@ -493,6 +514,8 @@ definitions: items: $ref: '#/definitions/JettonWallet' type: array + metadata: + $ref: '#/definitions/Metadata' type: object MasterchainInfo: properties: @@ -557,6 +580,12 @@ definitions: items: $ref: '#/definitions/Message' type: array + metadata: + $ref: '#/definitions/Metadata' + type: object + Metadata: + additionalProperties: + $ref: '#/definitions/AddressMetadata' type: object MsgSize: properties: @@ -590,6 +619,8 @@ definitions: properties: address_book: $ref: '#/definitions/AddressBook' + metadata: + $ref: '#/definitions/Metadata' nft_collections: items: $ref: '#/definitions/NFTCollection' @@ -624,6 +655,8 @@ definitions: properties: address_book: $ref: '#/definitions/AddressBook' + metadata: + $ref: '#/definitions/Metadata' nft_items: items: $ref: '#/definitions/NFTItem' @@ -665,6 +698,8 @@ definitions: properties: address_book: $ref: '#/definitions/AddressBook' + metadata: + $ref: '#/definitions/Metadata' nft_transfers: items: $ref: '#/definitions/NFTTransfer' @@ -820,6 +855,22 @@ definitions: example: "0" type: string type: object + TokenInfo: + properties: + description: + type: string + extra: + additionalProperties: true + type: object + image: + type: string + name: + type: string + symbol: + type: string + type: + type: string + type: object TraceNode: properties: children: @@ -1037,6 +1088,8 @@ definitions: properties: address_book: $ref: '#/definitions/AddressBook' + metadata: + $ref: '#/definitions/Metadata' wallets: items: $ref: '#/definitions/WalletState' @@ -1929,6 +1982,39 @@ paths: summary: Get messages tags: - blockchain + /api/v3/metadata: + get: + consumes: + - application/json + description: Query address metadata + operationId: api_v3_get_metadata + parameters: + - collectionFormat: multi + description: 'List of addresses in any form to get address metadata. Max: + 1024.' + in: query + items: + type: string + name: address + required: true + type: array + produces: + - application/json + responses: + "200": + description: OK + schema: + $ref: '#/definitions/Metadata' + "400": + description: Bad Request + schema: + $ref: '#/definitions/RequestError' + security: + - APIKeyHeader: [] + - APIKeyQuery: [] + summary: Metadata + tags: + - accounts /api/v3/nft/collections: get: consumes: diff --git a/go.mod b/go.mod index 3f28cb2..4d8fcac 100644 --- a/go.mod +++ b/go.mod @@ -6,6 +6,7 @@ require ( github.com/gofiber/fiber/v2 v2.52.5 github.com/gofiber/swagger v1.1.0 github.com/jackc/pgx/v5 v5.6.0 + github.com/lib/pq v1.10.9 github.com/swaggo/swag v1.16.3 github.com/xssnick/tonutils-go v1.9.9 ) diff --git a/go.sum b/go.sum index 048a1b5..d9d28e7 100644 --- a/go.sum +++ b/go.sum @@ -70,6 +70,8 @@ github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORN github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= +github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw= +github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= github.com/mailru/easyjson v0.0.0-20190614124828-94de47d64c63/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc= github.com/mailru/easyjson v0.0.0-20190626092158-b2ccc519800e/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc= github.com/mailru/easyjson v0.7.6 h1:8yTIVnZgCoiM1TgqoeTl+LfU5Jg6/xL3QhGQnimLYnA= diff --git a/index/crud.go b/index/crud.go index 368de7e..3291504 100644 --- a/index/crud.go +++ b/index/crud.go @@ -3,6 +3,7 @@ package index import ( "context" "fmt" + "github.com/lib/pq" "log" "reflect" "sort" @@ -1223,6 +1224,71 @@ func queryAdjacentTransactionsImpl(req AdjacentTransactionRequest, conn *pgxpool return txs, nil } +func queryMetadataImpl(addr_list []string, conn *pgxpool.Conn, settings RequestSettings) (Metadata, error) { + query := "select n.address, m.valid, 'nft_items' as type, m.name, m.symbol, m.description, m.image, m.extra from nft_items n left join address_metadata m on n.address = m.address and m.type = 'nft_items' where n.address = ANY($1)" + + " union all " + + "select c.address, m.valid, 'nft_collections' as type, m.name, m.symbol, m.description, m.image, m.extra from nft_collections c left join address_metadata m on c.address = m.address and m.type = 'nft_collections' where c.address = ANY($1)" + + " union all " + + "select j.address, m.valid, 'jetton_masters' as type, m.name, m.symbol, m.description, m.image, m.extra from jetton_masters j left join address_metadata m on j.address = m.address and m.type = 'jetton_masters' where j.address = ANY($1)" + + ctx, cancel_ctx := context.WithTimeout(context.Background(), settings.Timeout) + defer cancel_ctx() + rows, err := conn.Query(ctx, query, pq.Array(addr_list)) + if err != nil { + return nil, IndexError{Code: 500, Message: err.Error()} + } + + defer rows.Close() + + tasks := []BackgroundTask{} + token_info_map := map[string][]TokenInfo{} + + for rows.Next() { + var row TokenInfo + err := rows.Scan(&row.Address, &row.Valid, &row.Type, &row.Name, &row.Symbol, &row.Description, &row.Image, &row.Extra) + if err != nil { + return nil, IndexError{Code: 500, Message: err.Error()} + } + if row.Valid == nil { + data := map[string]interface{}{ + "address": row.Address, + "type": row.Type, + } + tasks = append(tasks, BackgroundTask{Type: "fetch_metadata", Data: data}) + token_info_map[row.Address] = append(token_info_map[row.Address], TokenInfo{ + Address: row.Address, + Type: row.Type, + Indexed: false, + }) + } else if *row.Valid { + row.Indexed = true + + if _, ok := token_info_map[*row.Type]; !ok { + token_info_map[row.Address] = []TokenInfo{} + } + token_info_map[row.Address] = append(token_info_map[row.Address], row) + } else { + token_info_map[row.Address] = []TokenInfo{} + } + } + metadata := Metadata{} + for addr, infos := range token_info_map { + indexed := true + for _, info := range infos { + indexed = indexed && info.Indexed + } + metadata[addr] = AddressMetadata{ + TokenInfo: infos, + IsIndexed: indexed, + } + } + + if len(tasks) > 0 && BackgroundTaskManager != nil { + BackgroundTaskManager.EnqueueTasksIfPossible(tasks) + } + return metadata, nil +} + func queryAddressBookImpl(addr_list []string, conn *pgxpool.Conn, settings RequestSettings) (AddressBook, error) { book := AddressBook{} quote_addr_list := []string{} @@ -1892,6 +1958,17 @@ func (db *DbClient) QueryShards( return queryBlocksImpl(query, conn, settings) } +func (db *DbClient) QueryMetadata( + addr_list []string, + settings RequestSettings, +) (Metadata, error) { + conn, err := db.Pool.Acquire(context.Background()) + if err != nil { + return nil, IndexError{Code: 500, Message: err.Error()} + } + return queryMetadataImpl(addr_list, conn, settings) +} + func (db *DbClient) QueryAddressBook( addr_list []string, settings RequestSettings, @@ -2054,28 +2131,29 @@ func (db *DbClient) QueryMessages( lt_req LtRequest, lim_req LimitRequest, settings RequestSettings, -) ([]Message, AddressBook, error) { +) ([]Message, AddressBook, Metadata, error) { query, err := buildMessagesQuery(msg_req, utime_req, lt_req, lim_req, settings) if settings.DebugRequest { log.Println("Debug query:", query) } if err != nil { - return nil, nil, IndexError{Code: 500, Message: err.Error()} + return nil, nil, nil, IndexError{Code: 500, Message: err.Error()} } // read data conn, err := db.Pool.Acquire(context.Background()) if err != nil { - return nil, nil, IndexError{Code: 500, Message: err.Error()} + return nil, nil, nil, IndexError{Code: 500, Message: err.Error()} } defer conn.Release() msgs, err := queryMessagesImpl(query, conn, settings) if err != nil { - return nil, nil, IndexError{Code: 500, Message: err.Error()} + return nil, nil, nil, IndexError{Code: 500, Message: err.Error()} } book := AddressBook{} + metadata := Metadata{} addr_list := []string{} for _, m := range msgs { if m.Source != nil { @@ -2088,38 +2166,44 @@ func (db *DbClient) QueryMessages( if len(addr_list) > 0 { book, err = queryAddressBookImpl(addr_list, conn, settings) if err != nil { - return nil, nil, IndexError{Code: 500, Message: err.Error()} + return nil, nil, nil, IndexError{Code: 500, Message: err.Error()} + } + + metadata, err = queryMetadataImpl(addr_list, conn, settings) + if err != nil { + return nil, nil, nil, IndexError{Code: 500, Message: err.Error()} } } - return msgs, book, nil + return msgs, book, metadata, nil } func (db *DbClient) QueryNFTCollections( nft_req NFTCollectionRequest, lim_req LimitRequest, settings RequestSettings, -) ([]NFTCollection, AddressBook, error) { +) ([]NFTCollection, AddressBook, Metadata, error) { query, err := buildNFTCollectionsQuery(nft_req, lim_req, settings) if settings.DebugRequest { log.Println("Debug query:", query) } if err != nil { - return nil, nil, IndexError{Code: 500, Message: err.Error()} + return nil, nil, nil, IndexError{Code: 500, Message: err.Error()} } // read data conn, err := db.Pool.Acquire(context.Background()) if err != nil { - return nil, nil, IndexError{Code: 500, Message: err.Error()} + return nil, nil, nil, IndexError{Code: 500, Message: err.Error()} } defer conn.Release() res, err := queryNFTCollectionsImpl(query, conn, settings) if err != nil { - return nil, nil, IndexError{Code: 500, Message: err.Error()} + return nil, nil, nil, IndexError{Code: 500, Message: err.Error()} } book := AddressBook{} + metadata := Metadata{} addr_list := []string{} for _, t := range res { addr_list = append(addr_list, string(t.Address)) @@ -2130,38 +2214,43 @@ func (db *DbClient) QueryNFTCollections( if len(addr_list) > 0 { book, err = queryAddressBookImpl(addr_list, conn, settings) if err != nil { - return nil, nil, IndexError{Code: 500, Message: err.Error()} + return nil, nil, nil, IndexError{Code: 500, Message: err.Error()} + } + metadata, err = queryMetadataImpl(addr_list, conn, settings) + if err != nil { + return nil, nil, nil, IndexError{Code: 500, Message: err.Error()} } } - return res, book, nil + return res, book, metadata, nil } func (db *DbClient) QueryNFTItems( nft_req NFTItemRequest, lim_req LimitRequest, settings RequestSettings, -) ([]NFTItem, AddressBook, error) { +) ([]NFTItem, AddressBook, Metadata, error) { query, err := buildNFTItemsQuery(nft_req, lim_req, settings) if settings.DebugRequest { log.Println("Debug query:", query) } if err != nil { - return nil, nil, IndexError{Code: 500, Message: err.Error()} + return nil, nil, nil, IndexError{Code: 500, Message: err.Error()} } // read data conn, err := db.Pool.Acquire(context.Background()) if err != nil { - return nil, nil, IndexError{Code: 500, Message: err.Error()} + return nil, nil, nil, IndexError{Code: 500, Message: err.Error()} } defer conn.Release() res, err := queryNFTItemsWithCollectionsImpl(query, conn, settings) if err != nil { - return nil, nil, IndexError{Code: 500, Message: err.Error()} + return nil, nil, nil, IndexError{Code: 500, Message: err.Error()} } book := AddressBook{} + metadata := Metadata{} addr_list := []string{} for _, t := range res { addr_list = append(addr_list, string(t.Address)) @@ -2170,10 +2259,14 @@ func (db *DbClient) QueryNFTItems( if len(addr_list) > 0 { book, err = queryAddressBookImpl(addr_list, conn, settings) if err != nil { - return nil, nil, IndexError{Code: 500, Message: err.Error()} + return nil, nil, nil, IndexError{Code: 500, Message: err.Error()} + } + metadata, err = queryMetadataImpl(addr_list, conn, settings) + if err != nil { + return nil, nil, nil, IndexError{Code: 500, Message: err.Error()} } } - return res, book, nil + return res, book, metadata, nil } func (db *DbClient) QueryNFTTransfers( @@ -2182,28 +2275,29 @@ func (db *DbClient) QueryNFTTransfers( lt_req LtRequest, lim_req LimitRequest, settings RequestSettings, -) ([]NFTTransfer, AddressBook, error) { +) ([]NFTTransfer, AddressBook, Metadata, error) { query, err := buildNFTTransfersQuery(transfer_req, utime_req, lt_req, lim_req, settings) if settings.DebugRequest { log.Println("Debug query:", query) } if err != nil { - return nil, nil, IndexError{Code: 500, Message: err.Error()} + return nil, nil, nil, IndexError{Code: 500, Message: err.Error()} } // read data conn, err := db.Pool.Acquire(context.Background()) if err != nil { - return nil, nil, IndexError{Code: 500, Message: err.Error()} + return nil, nil, nil, IndexError{Code: 500, Message: err.Error()} } defer conn.Release() res, err := queryNFTTransfersImpl(query, conn, settings) if err != nil { - return nil, nil, IndexError{Code: 500, Message: err.Error()} + return nil, nil, nil, IndexError{Code: 500, Message: err.Error()} } book := AddressBook{} + metadata := Metadata{} addr_list := []string{} for _, t := range res { addr_list = append(addr_list, string(t.NftItemAddress)) @@ -2217,38 +2311,43 @@ func (db *DbClient) QueryNFTTransfers( if len(addr_list) > 0 { book, err = queryAddressBookImpl(addr_list, conn, settings) if err != nil { - return nil, nil, IndexError{Code: 500, Message: err.Error()} + return nil, nil, nil, IndexError{Code: 500, Message: err.Error()} + } + metadata, err = queryMetadataImpl(addr_list, conn, settings) + if err != nil { + return nil, nil, nil, IndexError{Code: 500, Message: err.Error()} } } - return res, book, nil + return res, book, metadata, nil } func (db *DbClient) QueryJettonMasters( jetton_req JettonMasterRequest, lim_req LimitRequest, settings RequestSettings, -) ([]JettonMaster, AddressBook, error) { +) ([]JettonMaster, AddressBook, Metadata, error) { query, err := buildJettonMastersQuery(jetton_req, lim_req, settings) if settings.DebugRequest { log.Println("Debug query:", query) } if err != nil { - return nil, nil, IndexError{Code: 500, Message: err.Error()} + return nil, nil, nil, IndexError{Code: 500, Message: err.Error()} } // read data conn, err := db.Pool.Acquire(context.Background()) if err != nil { - return nil, nil, IndexError{Code: 500, Message: err.Error()} + return nil, nil, nil, IndexError{Code: 500, Message: err.Error()} } defer conn.Release() res, err := queryJettonMastersImpl(query, conn, settings) if err != nil { - return nil, nil, IndexError{Code: 500, Message: err.Error()} + return nil, nil, nil, IndexError{Code: 500, Message: err.Error()} } book := AddressBook{} + metadata := Metadata{} addr_list := []string{} for _, t := range res { addr_list = append(addr_list, string(t.Address)) @@ -2257,39 +2356,44 @@ func (db *DbClient) QueryJettonMasters( if len(addr_list) > 0 { book, err = queryAddressBookImpl(addr_list, conn, settings) if err != nil { - return nil, nil, IndexError{Code: 500, Message: err.Error()} + return nil, nil, nil, IndexError{Code: 500, Message: err.Error()} + } + metadata, err = queryMetadataImpl(addr_list, conn, settings) + if err != nil { + return nil, nil, nil, IndexError{Code: 500, Message: err.Error()} } } - return res, book, nil + return res, book, metadata, nil } func (db *DbClient) QueryJettonWallets( jetton_req JettonWalletRequest, lim_req LimitRequest, settings RequestSettings, -) ([]JettonWallet, AddressBook, error) { +) ([]JettonWallet, AddressBook, Metadata, error) { query, err := buildJettonWalletsQuery(jetton_req, lim_req, settings) if settings.DebugRequest { log.Println("Debug query:", query) } if err != nil { - return nil, nil, IndexError{Code: 500, Message: err.Error()} + return nil, nil, nil, IndexError{Code: 500, Message: err.Error()} } // read data conn, err := db.Pool.Acquire(context.Background()) if err != nil { - return nil, nil, IndexError{Code: 500, Message: err.Error()} + return nil, nil, nil, IndexError{Code: 500, Message: err.Error()} } defer conn.Release() res, err := queryJettonWalletsImpl(query, conn, settings) if err != nil { log.Println(query) - return nil, nil, IndexError{Code: 500, Message: err.Error()} + return nil, nil, nil, IndexError{Code: 500, Message: err.Error()} } book := AddressBook{} + metadata := Metadata{} addr_list := []string{} for _, t := range res { addr_list = append(addr_list, string(t.Address)) @@ -2299,10 +2403,14 @@ func (db *DbClient) QueryJettonWallets( if len(addr_list) > 0 { book, err = queryAddressBookImpl(addr_list, conn, settings) if err != nil { - return nil, nil, IndexError{Code: 500, Message: err.Error()} + return nil, nil, nil, IndexError{Code: 500, Message: err.Error()} + } + metadata, err = queryMetadataImpl(addr_list, conn, settings) + if err != nil { + return nil, nil, nil, IndexError{Code: 500, Message: err.Error()} } } - return res, book, nil + return res, book, metadata, nil } func (db *DbClient) QueryJettonTransfers( @@ -2311,28 +2419,29 @@ func (db *DbClient) QueryJettonTransfers( lt_req LtRequest, lim_req LimitRequest, settings RequestSettings, -) ([]JettonTransfer, AddressBook, error) { +) ([]JettonTransfer, AddressBook, Metadata, error) { query, err := buildJettonTransfersQuery(transfer_req, utime_req, lt_req, lim_req, settings) if settings.DebugRequest { log.Println("Debug query:", query) } if err != nil { - return nil, nil, IndexError{Code: 500, Message: err.Error()} + return nil, nil, nil, IndexError{Code: 500, Message: err.Error()} } // read data conn, err := db.Pool.Acquire(context.Background()) if err != nil { - return nil, nil, IndexError{Code: 500, Message: err.Error()} + return nil, nil, nil, IndexError{Code: 500, Message: err.Error()} } defer conn.Release() res, err := queryJettonTransfersImpl(query, conn, settings) if err != nil { - return nil, nil, IndexError{Code: 500, Message: err.Error()} + return nil, nil, nil, IndexError{Code: 500, Message: err.Error()} } book := AddressBook{} + metadata := Metadata{} addr_list := []string{} for _, t := range res { addr_list = append(addr_list, string(t.Source)) @@ -2346,10 +2455,14 @@ func (db *DbClient) QueryJettonTransfers( if len(addr_list) > 0 { book, err = queryAddressBookImpl(addr_list, conn, settings) if err != nil { - return nil, nil, IndexError{Code: 500, Message: err.Error()} + return nil, nil, nil, IndexError{Code: 500, Message: err.Error()} + } + metadata, err = queryMetadataImpl(addr_list, conn, settings) + if err != nil { + return nil, nil, nil, IndexError{Code: 500, Message: err.Error()} } } - return res, book, nil + return res, book, metadata, nil } func (db *DbClient) QueryJettonBurns( @@ -2358,28 +2471,29 @@ func (db *DbClient) QueryJettonBurns( lt_req LtRequest, lim_req LimitRequest, settings RequestSettings, -) ([]JettonBurn, AddressBook, error) { +) ([]JettonBurn, AddressBook, Metadata, error) { query, err := buildJettonBurnsQuery(transfer_req, utime_req, lt_req, lim_req, settings) if settings.DebugRequest { log.Println("Debug query:", query) } if err != nil { - return nil, nil, IndexError{Code: 500, Message: err.Error()} + return nil, nil, nil, IndexError{Code: 500, Message: err.Error()} } // read data conn, err := db.Pool.Acquire(context.Background()) if err != nil { - return nil, nil, IndexError{Code: 500, Message: err.Error()} + return nil, nil, nil, IndexError{Code: 500, Message: err.Error()} } defer conn.Release() res, err := queryJettonBurnsImpl(query, conn, settings) if err != nil { - return nil, nil, IndexError{Code: 500, Message: err.Error()} + return nil, nil, nil, IndexError{Code: 500, Message: err.Error()} } book := AddressBook{} + metadata := Metadata{} addr_list := []string{} for _, t := range res { addr_list = append(addr_list, string(t.Owner)) @@ -2392,38 +2506,43 @@ func (db *DbClient) QueryJettonBurns( if len(addr_list) > 0 { book, err = queryAddressBookImpl(addr_list, conn, settings) if err != nil { - return nil, nil, IndexError{Code: 500, Message: err.Error()} + return nil, nil, nil, IndexError{Code: 500, Message: err.Error()} + } + metadata, err = queryMetadataImpl(addr_list, conn, settings) + if err != nil { + return nil, nil, nil, IndexError{Code: 500, Message: err.Error()} } } - return res, book, nil + return res, book, metadata, nil } func (db *DbClient) QueryAccountStates( account_req AccountRequest, lim_req LimitRequest, settings RequestSettings, -) ([]AccountStateFull, AddressBook, error) { +) ([]AccountStateFull, AddressBook, Metadata, error) { query, err := buildAccountStatesQuery(account_req, lim_req, settings) if settings.DebugRequest { log.Println("Debug query:", query) } if err != nil { - return nil, nil, IndexError{Code: 500, Message: err.Error()} + return nil, nil, nil, IndexError{Code: 500, Message: err.Error()} } // read data conn, err := db.Pool.Acquire(context.Background()) if err != nil { - return nil, nil, IndexError{Code: 500, Message: err.Error()} + return nil, nil, nil, IndexError{Code: 500, Message: err.Error()} } defer conn.Release() res, err := queryAccountStateFullImpl(query, conn, settings) if err != nil { - return nil, nil, IndexError{Code: 500, Message: err.Error()} + return nil, nil, nil, IndexError{Code: 500, Message: err.Error()} } book := AddressBook{} + metadata := Metadata{} addr_list := []string{} for _, t := range res { if t.AccountAddress != nil { @@ -2433,30 +2552,35 @@ func (db *DbClient) QueryAccountStates( if len(addr_list) > 0 { book, err = queryAddressBookImpl(addr_list, conn, settings) if err != nil { - return nil, nil, IndexError{Code: 500, Message: err.Error()} + return nil, nil, nil, IndexError{Code: 500, Message: err.Error()} + } + metadata, err = queryMetadataImpl(addr_list, conn, settings) + if err != nil { + return nil, nil, nil, IndexError{Code: 500, Message: err.Error()} } } - return res, book, nil + + return res, book, metadata, nil } func (db *DbClient) QueryWalletStates( account_req AccountRequest, lim_req LimitRequest, settings RequestSettings, -) ([]WalletState, AddressBook, error) { - states, book, err := db.QueryAccountStates(account_req, lim_req, settings) +) ([]WalletState, AddressBook, Metadata, error) { + states, book, metadata, err := db.QueryAccountStates(account_req, lim_req, settings) if err != nil { - return nil, nil, IndexError{Code: 500, Message: err.Error()} + return nil, nil, nil, IndexError{Code: 500, Message: err.Error()} } res := []WalletState{} for _, state := range states { loc, err := ParseWalletState(state) if err != nil { - return nil, nil, IndexError{Code: 500, Message: err.Error()} + return nil, nil, nil, IndexError{Code: 500, Message: err.Error()} } res = append(res, *loc) } - return res, book, nil + return res, book, metadata, nil } func (db *DbClient) QueryTopAccountBalances(lim_req LimitRequest, settings RequestSettings) ([]AccountBalance, error) { @@ -2485,34 +2609,35 @@ func (db *DbClient) QueryActions( act_req ActionRequest, lim_req LimitRequest, settings RequestSettings, -) ([]Action, AddressBook, error) { +) ([]Action, AddressBook, Metadata, error) { query, err := buildActionsQuery(act_req, lim_req, settings) if settings.DebugRequest { log.Println("Debug query:", query) } if err != nil { - return nil, nil, IndexError{Code: 500, Message: err.Error()} + return nil, nil, nil, IndexError{Code: 500, Message: err.Error()} } // read data conn, err := db.Pool.Acquire(context.Background()) if err != nil { - return nil, nil, IndexError{Code: 500, Message: err.Error()} + return nil, nil, nil, IndexError{Code: 500, Message: err.Error()} } defer conn.Release() raw_actions, err := queryRawActionsImpl(query, conn, settings) if err != nil { - return nil, nil, IndexError{Code: 500, Message: err.Error()} + return nil, nil, nil, IndexError{Code: 500, Message: err.Error()} } actions := []Action{} book := AddressBook{} + metadata := Metadata{} addr_map := map[string]bool{} for idx := range raw_actions { collectAddressesFromAction(&addr_map, &raw_actions[idx]) action, err := ParseRawAction(&raw_actions[idx]) if err != nil { - return nil, nil, IndexError{Code: 500, Message: err.Error()} + return nil, nil, nil, IndexError{Code: 500, Message: err.Error()} } actions = append(actions, *action) } @@ -2523,10 +2648,15 @@ func (db *DbClient) QueryActions( } book, err = queryAddressBookImpl(addr_list, conn, settings) if err != nil { - return nil, nil, IndexError{Code: 500, Message: err.Error()} + return nil, nil, nil, IndexError{Code: 500, Message: err.Error()} + } + + metadata, err = queryMetadataImpl(addr_list, conn, settings) + if err != nil { + return nil, nil, nil, IndexError{Code: 500, Message: err.Error()} } } - return actions, book, nil + return actions, book, metadata, nil } func (db *DbClient) QueryEvents( @@ -2535,36 +2665,41 @@ func (db *DbClient) QueryEvents( lt_req LtRequest, lim_req LimitRequest, settings RequestSettings, -) ([]Event, AddressBook, error) { +) ([]Event, AddressBook, Metadata, error) { query, err := buildEventsQuery(event_req, utime_req, lt_req, lim_req, settings) if settings.DebugRequest { log.Println("Debug query:", query) } // log.Println(query) if err != nil { - return nil, nil, IndexError{Code: 500, Message: err.Error()} + return nil, nil, nil, IndexError{Code: 500, Message: err.Error()} } // read data conn, err := db.Pool.Acquire(context.Background()) if err != nil { - return nil, nil, IndexError{Code: 500, Message: err.Error()} + return nil, nil, nil, IndexError{Code: 500, Message: err.Error()} } defer conn.Release() res, addr_list, err := queryEventsImpl(query, conn, settings) if err != nil { log.Println(query) - return nil, nil, IndexError{Code: 500, Message: err.Error()} + return nil, nil, nil, IndexError{Code: 500, Message: err.Error()} } book := AddressBook{} + metadata := Metadata{} if len(addr_list) > 0 { book, err = queryAddressBookImpl(addr_list, conn, settings) if err != nil { - return nil, nil, IndexError{Code: 500, Message: err.Error()} + return nil, nil, nil, IndexError{Code: 500, Message: err.Error()} + } + metadata, err = queryMetadataImpl(addr_list, conn, settings) + if err != nil { + return nil, nil, nil, IndexError{Code: 500, Message: err.Error()} } } - return res, book, nil + return res, book, metadata, nil } diff --git a/index/models.go b/index/models.go index b2da343..7a6b587 100644 --- a/index/models.go +++ b/index/models.go @@ -39,6 +39,30 @@ type AddressBookRow struct { } // @name AddressBookRow type AddressBook map[string]AddressBookRow // @name AddressBook +type Metadata map[string]AddressMetadata // @name Metadata + +type BackgroundTask struct { + Type string + Retry int + Data map[string]interface{} +} + +type AddressMetadata struct { + IsIndexed bool `json:"is_indexed"` + TokenInfo []TokenInfo `json:"token_info"` +} // @name AddressMetadata + +type TokenInfo struct { + Address string `json:"-"` + Valid *bool `json:"-"` + Indexed bool `json:"-"` + Type *string `json:"type,omitempty"` + Name *string `json:"name,omitempty"` + Symbol *string `json:"symbol,omitempty"` + Description *string `json:"description,omitempty"` + Image *string `json:"image,omitempty"` + Extra map[string]interface{} `json:"extra,omitempty"` +} // @name TokenInfo type BlockId struct { Workchain int32 `json:"workchain"` diff --git a/index/response.go b/index/response.go index cb8c04a..0aa76ad 100644 --- a/index/response.go +++ b/index/response.go @@ -20,63 +20,74 @@ type TransactionsResponse struct { type MessagesResponse struct { Messages []Message `json:"messages"` AddressBook AddressBook `json:"address_book"` + Metadata Metadata `json:"metadata"` } // @name MessagesResponse type AccountStatesResponse struct { Accounts []AccountStateFull `json:"accounts"` AddressBook AddressBook `json:"address_book"` + Metadata Metadata `json:"metadata"` } // @name AccountStatesResponse type WalletStatesResponse struct { Wallets []WalletState `json:"wallets"` AddressBook AddressBook `json:"address_book"` + Metadata Metadata `json:"metadata"` } // @name WalletStatesResponse type NFTCollectionsResponse struct { Collections []NFTCollection `json:"nft_collections"` AddressBook AddressBook `json:"address_book"` + Metadata Metadata `json:"metadata"` } // @name NFTCollectionsResponse type NFTItemsResponse struct { Items []NFTItem `json:"nft_items"` AddressBook AddressBook `json:"address_book"` + Metadata Metadata `json:"metadata"` } // @name NFTItemsResponse type NFTTransfersResponse struct { Transfers []NFTTransfer `json:"nft_transfers"` AddressBook AddressBook `json:"address_book"` + Metadata Metadata `json:"metadata"` } // @name NFTTransfersResponse type JettonMastersResponse struct { Masters []JettonMaster `json:"jetton_masters"` AddressBook AddressBook `json:"address_book"` + Metadata Metadata `json:"metadata"` } // @name JettonMastersResponse type JettonWalletsResponse struct { Wallets []JettonWallet `json:"jetton_wallets"` AddressBook AddressBook `json:"address_book"` + Metadata Metadata `json:"metadata"` } // @name JettonWalletsResponse type JettonTransfersResponse struct { Transfers []JettonTransfer `json:"jetton_transfers"` AddressBook AddressBook `json:"address_book"` + Metadata Metadata `json:"metadata"` } // @name JettonTransfersResponse type JettonBurnsResponse struct { Burns []JettonBurn `json:"jetton_burns"` AddressBook AddressBook `json:"address_book"` + Metadata Metadata `json:"metadata"` } // @name JettonBurnsResponse type EventsResponse struct { Events []Event `json:"events"` AddressBook AddressBook `json:"address_book"` + Metadata Metadata `json:"metadata"` } // @name EventsResponse type ActionsResponse struct { Actions []Action `json:"actions"` AddressBook AddressBook `json:"address_book"` + Metadata Metadata `json:"metadata"` } // @name ActionsResponse - // errors type RequestError struct { Message string `json:"error"` diff --git a/index/tasks.go b/index/tasks.go new file mode 100644 index 0000000..dc858a3 --- /dev/null +++ b/index/tasks.go @@ -0,0 +1,88 @@ +package index + +import ( + "context" + "errors" + "log" +) + +var BackgroundTaskManager *TaskManager + +type TaskManager struct { + dbClient *DbClient + taskChannel chan []BackgroundTask +} + +func NewBackgroundTaskManager(pg_dsn string, channel_size int, min_conns int, max_conns int) (*TaskManager, error) { + client, err := NewDbClient(pg_dsn, min_conns, max_conns) + if err != nil { + return nil, err + } + + conn, err := client.Pool.Acquire(context.Background()) + if err != nil { + return nil, err + } + user := client.Pool.Config().ConnConfig.User + + row := conn.QueryRow(context.Background(), "SELECT has_table_privilege($1, 'fetch_metadata_tasks', 'INSERT, UPDATE, DELETE')", user) + var has_privilege bool + err = row.Scan(&has_privilege) + if err != nil { + return nil, err + } + if !has_privilege { + return nil, errors.New("user does not have required privileges on fetch_metadata_tasks table") + } + return &TaskManager{ + dbClient: client, + taskChannel: make(chan []BackgroundTask, channel_size), + }, nil +} +func (manager *TaskManager) Start(ctx context.Context) { + go manager.run(ctx) +} + +func (manager *TaskManager) run(ctx context.Context) { + for { + tasks := <-manager.taskChannel + conn, err := manager.dbClient.Pool.Acquire(ctx) + if err != nil { + log.Printf("Error acquiring connection to create tasks: %v", err) + manager.taskChannel <- tasks + } + tx, err := conn.Begin(ctx) + if err != nil { + log.Printf("Error beginning transaction to create tasks: %v", err) + manager.taskChannel <- tasks + } + tx_failed := false + for _, task := range tasks { + _, err := tx.Exec(ctx, "INSERT INTO background_tasks (type, data, status) "+ + "VALUES ($1, $2, 'ready') ON CONFLICT DO NOTHING", task.Type, task.Data) + if err != nil { + log.Printf("Error inserting task: %v", err) + tx.Rollback(ctx) + manager.taskChannel <- tasks + tx_failed = true + break + } + } + if !tx_failed { + err = tx.Commit(ctx) + if err != nil { + log.Printf("Error committing transaction to create tasks: %v", err) + manager.taskChannel <- tasks + } + } + } +} + +func (manager *TaskManager) EnqueueTasksIfPossible(tasks []BackgroundTask) bool { + select { + case manager.taskChannel <- tasks: + return true + default: + return false + } +} diff --git a/main.go b/main.go index 39caeed..7ae5c9f 100644 --- a/main.go +++ b/main.go @@ -1,6 +1,7 @@ package main import ( + "context" "flag" "fmt" "log" @@ -18,15 +19,18 @@ import ( ) type Settings struct { - PgDsn string - MaxConns int - MinConns int - Bind string - InstanceName string - Prefork bool - MaxThreads int - Debug bool - Request index.RequestSettings + PgDsn string + PgMasterDsn string + TaskChannelSize int + MaxConns int + MinConns int + MasterMaxConns int + Bind string + InstanceName string + Prefork bool + MaxThreads int + Debug bool + Request index.RequestSettings } func onlyOneOf(flags ...bool) bool { @@ -40,6 +44,7 @@ func onlyOneOf(flags ...bool) bool { } var pool *index.DbClient +var masterPool *index.DbClient var settings Settings // @title TON Index (Go) @@ -430,7 +435,7 @@ func GetMessages(c *fiber.Ctx) error { return index.IndexError{Code: 422, Message: err.Error()} } - msgs, book, err := pool.QueryMessages(msg_req, utime_req, lt_req, lim_req, request_settings) + msgs, book, metadata, err := pool.QueryMessages(msg_req, utime_req, lt_req, lim_req, request_settings) if err != nil { return err } @@ -438,7 +443,7 @@ func GetMessages(c *fiber.Ctx) error { // return index.IndexError{Code: 404, Message: "messages not found"} // } - msgs_resp := index.MessagesResponse{Messages: msgs, AddressBook: book} + msgs_resp := index.MessagesResponse{Messages: msgs, AddressBook: book, Metadata: metadata} return c.JSON(msgs_resp) } @@ -472,6 +477,36 @@ func GetAddressBook(c *fiber.Ctx) error { return c.JSON(book) } +// @summary Metadata +// +// @description Query address metadata +// +// @id api_v3_get_metadata +// @tags accounts +// @Accept json +// @Produce json +// @success 200 {object} index.Metadata +// @failure 400 {object} index.RequestError +// @param address query []string true "List of addresses in any form to get address metadata. Max: 1024." collectionFormat(multi) +// @router /api/v3/metadata [get] +// @security APIKeyHeader +// @security APIKeyQuery +func GetMetadata(c *fiber.Ctx) error { + request_settings := GetRequestSettings(c, &settings) + var addr_book_req index.AddressBookRequest + if err := c.QueryParser(&addr_book_req); err != nil { + return index.IndexError{Code: 422, Message: err.Error()} + } + if len(addr_book_req.Address) == 0 { + return index.IndexError{Code: 422, Message: "at least 1 address required"} + } + metadata, err := pool.QueryMetadata(addr_book_req.Address, request_settings) + if err != nil { + return err + } + return c.JSON(metadata) +} + // @summary Get Account States // // @description Query account states @@ -509,7 +544,7 @@ func GetAccountStates(c *fiber.Ctx) error { *account_req.IncludeBOC = true } - res, book, err := pool.QueryAccountStates(account_req, lim_req, request_settings) + res, book, metadata, err := pool.QueryAccountStates(account_req, lim_req, request_settings) if err != nil { return index.IndexError{Code: 422, Message: err.Error()} } @@ -517,7 +552,7 @@ func GetAccountStates(c *fiber.Ctx) error { // return index.IndexError{Code: 404, Message: "account states not found"} // } - resp := index.AccountStatesResponse{Accounts: res, AddressBook: book} + resp := index.AccountStatesResponse{Accounts: res, AddressBook: book, Metadata: metadata} return c.JSON(resp) } @@ -550,12 +585,12 @@ func GetWalletStates(c *fiber.Ctx) error { return index.IndexError{Code: 422, Message: "address of account is required"} } - res, book, err := pool.QueryWalletStates(account_req, lim_req, request_settings) + res, book, metadata, err := pool.QueryWalletStates(account_req, lim_req, request_settings) if err != nil { return index.IndexError{Code: 422, Message: err.Error()} } - resp := index.WalletStatesResponse{Wallets: res, AddressBook: book} + resp := index.WalletStatesResponse{Wallets: res, AddressBook: book, Metadata: metadata} return c.JSON(resp) } @@ -588,14 +623,14 @@ func GetNFTCollections(c *fiber.Ctx) error { return index.IndexError{Code: 422, Message: err.Error()} } - res, book, err := pool.QueryNFTCollections(nft_req, lim_req, request_settings) + res, book, metadata, err := pool.QueryNFTCollections(nft_req, lim_req, request_settings) if err != nil { return err } // if len(res) == 0 { // return index.IndexError{Code: 404, Message: "nft collections not found"} // } - resp := index.NFTCollectionsResponse{Collections: res, AddressBook: book} + resp := index.NFTCollectionsResponse{Collections: res, AddressBook: book, Metadata: metadata} return c.JSON(resp) } @@ -630,14 +665,14 @@ func GetNFTItems(c *fiber.Ctx) error { return index.IndexError{Code: 422, Message: err.Error()} } - res, book, err := pool.QueryNFTItems(nft_req, lim_req, request_settings) + res, book, metadata, err := pool.QueryNFTItems(nft_req, lim_req, request_settings) if err != nil { return err } // if len(res) == 0 { // return index.IndexError{Code: 404, Message: "nft items not found"} // } - resp := index.NFTItemsResponse{Items: res, AddressBook: book} + resp := index.NFTItemsResponse{Items: res, AddressBook: book, Metadata: metadata} return c.JSON(resp) } @@ -696,7 +731,7 @@ func GetNFTTransfers(c *fiber.Ctx) error { return index.IndexError{Code: 422, Message: err.Error()} } - res, book, err := pool.QueryNFTTransfers(transfer_req, utime_req, lt_req, lim_req, request_settings) + res, book, metadata, err := pool.QueryNFTTransfers(transfer_req, utime_req, lt_req, lim_req, request_settings) if err != nil { return err } @@ -704,7 +739,7 @@ func GetNFTTransfers(c *fiber.Ctx) error { // return index.IndexError{Code: 404, Message: "nft transfers not found"} // } - resp := index.NFTTransfersResponse{Transfers: res, AddressBook: book} + resp := index.NFTTransfersResponse{Transfers: res, AddressBook: book, Metadata: metadata} return c.JSON(resp) } @@ -765,14 +800,14 @@ func GetJettonMasters(c *fiber.Ctx) error { return index.IndexError{Code: 422, Message: err.Error()} } - res, book, err := pool.QueryJettonMasters(jetton_req, lim_req, request_settings) + res, book, metadata, err := pool.QueryJettonMasters(jetton_req, lim_req, request_settings) if err != nil { return err } // if len(res) == 0 { // return index.IndexError{Code: 404, Message: "jetton masters not found"} // } - resp := index.JettonMastersResponse{Masters: res, AddressBook: book} + resp := index.JettonMastersResponse{Masters: res, AddressBook: book, Metadata: metadata} return c.JSON(resp) } @@ -808,14 +843,14 @@ func GetJettonWallets(c *fiber.Ctx) error { return index.IndexError{Code: 422, Message: err.Error()} } - res, book, err := pool.QueryJettonWallets(jetton_req, lim_req, request_settings) + res, book, metadata, err := pool.QueryJettonWallets(jetton_req, lim_req, request_settings) if err != nil { return err } // if len(res) == 0 { // return index.IndexError{Code: 404, Message: "jetton wallets not found"} // } - resp := index.JettonWalletsResponse{Wallets: res, AddressBook: book} + resp := index.JettonWalletsResponse{Wallets: res, AddressBook: book, Metadata: metadata} return c.JSON(resp) } @@ -874,7 +909,7 @@ func GetJettonTransfers(c *fiber.Ctx) error { return index.IndexError{Code: 422, Message: err.Error()} } - res, book, err := pool.QueryJettonTransfers(transfer_req, utime_req, lt_req, lim_req, request_settings) + res, book, metadata, err := pool.QueryJettonTransfers(transfer_req, utime_req, lt_req, lim_req, request_settings) if err != nil { return err } @@ -882,7 +917,7 @@ func GetJettonTransfers(c *fiber.Ctx) error { // return index.IndexError{Code: 404, Message: "jetton transfers not found"} // } - resp := index.JettonTransfersResponse{Transfers: res, AddressBook: book} + resp := index.JettonTransfersResponse{Transfers: res, AddressBook: book, Metadata: metadata} return c.JSON(resp) } @@ -937,7 +972,7 @@ func GetJettonBurns(c *fiber.Ctx) error { return index.IndexError{Code: 422, Message: err.Error()} } - res, book, err := pool.QueryJettonBurns(burn_req, utime_req, lt_req, lim_req, request_settings) + res, book, metadata, err := pool.QueryJettonBurns(burn_req, utime_req, lt_req, lim_req, request_settings) if err != nil { return err } @@ -945,7 +980,7 @@ func GetJettonBurns(c *fiber.Ctx) error { // return index.IndexError{Code: 404, Message: "jetton burns not found"} // } - resp := index.JettonBurnsResponse{Burns: res, AddressBook: book} + resp := index.JettonBurnsResponse{Burns: res, AddressBook: book, Metadata: metadata} return c.JSON(resp) } @@ -995,7 +1030,7 @@ func GetEvents(c *fiber.Ctx) error { return index.IndexError{Code: 422, Message: "only one of account, trace_id, tx_hash, msg_hash should be specified"} } - res, book, err := pool.QueryEvents(event_req, utime_req, lt_req, lim_req, request_settings) + res, book, metadata, err := pool.QueryEvents(event_req, utime_req, lt_req, lim_req, request_settings) if err != nil { return err } @@ -1003,7 +1038,7 @@ func GetEvents(c *fiber.Ctx) error { // return index.IndexError{Code: 404, Message: "transactions not found"} // } - txs_resp := index.EventsResponse{Events: res, AddressBook: book} + txs_resp := index.EventsResponse{Events: res, AddressBook: book, Metadata: metadata} return c.JSON(txs_resp) } @@ -1033,7 +1068,7 @@ func GetActions(c *fiber.Ctx) error { return index.IndexError{Code: 422, Message: err.Error()} } - res, book, err := pool.QueryActions(act_req, lim_req, request_settings) + res, book, metadata, err := pool.QueryActions(act_req, lim_req, request_settings) if err != nil { return err } @@ -1041,7 +1076,7 @@ func GetActions(c *fiber.Ctx) error { // return index.IndexError{Code: 404, Message: "actions not found"} // } - resp := index.ActionsResponse{Actions: res, AddressBook: book} + resp := index.ActionsResponse{Actions: res, AddressBook: book, Metadata: metadata} return c.Status(200).JSON(resp) } @@ -1083,7 +1118,7 @@ func GetV2WalletInformation(c *fiber.Ctx) error { var res *index.V2WalletInformation if !use_v2 { account_req := index.AccountRequest{AccountAddress: []index.AccountAddress{acc_req.AccountAddress}} - loc, _, err := pool.QueryWalletStates(account_req, index.LimitRequest{}, request_settings) + loc, _, _, err := pool.QueryWalletStates(account_req, index.LimitRequest{}, request_settings) if err != nil { return err } @@ -1156,7 +1191,7 @@ func GetV2AddressInformation(c *fiber.Ctx) error { res = loc } else { account_req := index.AccountRequest{AccountAddress: []index.AccountAddress{acc_req.AccountAddress}} - loc, _, err := pool.QueryAccountStates(account_req, index.LimitRequest{}, request_settings) + loc, _, _, err := pool.QueryAccountStates(account_req, index.LimitRequest{}, request_settings) if err != nil { return err } @@ -1380,10 +1415,13 @@ func main() { var timeout_ms int flag.StringVar(&settings.PgDsn, "pg", "postgresql://localhost:5432", "PostgreSQL connection string") + flag.StringVar(&settings.PgMasterDsn, "pg-master", "", "PostgreSQL connection string with write access") flag.StringVar(&settings.Request.V2Endpoint, "v2", "", "TON HTTP API endpoint for proxied methods") flag.StringVar(&settings.Request.V2ApiKey, "v2-apikey", "", "API key for TON HTTP API endpoint") flag.IntVar(&settings.MaxConns, "maxconns", 100, "PostgreSQL max connections") flag.IntVar(&settings.MinConns, "minconns", 0, "PostgreSQL min connections") + flag.IntVar(&settings.MasterMaxConns, "master-maxconns", 16, "PostgreSQL master max connections") + flag.IntVar(&settings.TaskChannelSize, "task-channel-size", 5000, "Task channel size") flag.StringVar(&settings.Bind, "bind", ":8000", "Bind address") flag.StringVar(&settings.InstanceName, "name", "Go", "Instance name to show in Swagger UI") flag.BoolVar(&settings.Prefork, "prefork", false, "Prefork workers") @@ -1469,6 +1507,7 @@ func main() { // account methods app.Get("/api/v3/addressBook", GetAddressBook) + app.Get("/api/v3/metadata", GetMetadata) app.Get("/api/v3/accountStates", GetAccountStates) app.Get("/api/v3/walletStates", GetWalletStates) @@ -1508,6 +1547,22 @@ func main() { } app.Get("/api/v3/*", swagger.New(swagger_config)) app.Static("/", "./static") + index.BackgroundTaskManager, err = index.NewBackgroundTaskManager(settings.PgDsn, settings.TaskChannelSize, + 0, settings.MasterMaxConns) + if err != nil { + if len(settings.PgMasterDsn) == 0 { + log.Printf("Error creating background task manager: %s", err.Error()) + } else { + index.BackgroundTaskManager, err = index.NewBackgroundTaskManager(settings.PgMasterDsn, + settings.TaskChannelSize, 0, settings.MasterMaxConns) + if err != nil { + log.Printf("Error creating background task manager: %s", err.Error()) + } + } + } + if index.BackgroundTaskManager != nil { + index.BackgroundTaskManager.Start(context.Background()) + } err = app.Listen(settings.Bind) log.Fatal(err) } diff --git a/systemd-deploy.yaml b/systemd-deploy.yaml index 1f0d3ab..c638094 100644 --- a/systemd-deploy.yaml +++ b/systemd-deploy.yaml @@ -3,6 +3,7 @@ hosts: localhost vars: systemd_service_name: "{{application_name}}.service" + metadata_fetcher_systemd_service_name: "{{application_name}}-metadata-fetcher.service" tasks: - name: Build the Go application ansible.builtin.shell: | @@ -10,6 +11,7 @@ mkdir -p build swag init go build -o build/{{ go_binary }} + go build -C utils/metadata-fetcher -o ../../build/{{ metadata_fetcher_binary }} changed_when: true failed_when: false register: go_build_result @@ -29,15 +31,35 @@ delegate_to: "{{ item }}" with_items: "{{ remote_hosts }}" + - name: Copy the Go binary to remote hosts + ansible.builtin.copy: + src: "build/{{ metadata_fetcher_binary }}" + dest: "{{ go_binary_path }}/{{ metadata_fetcher_binary }}" + mode: "0777" + force: true + become: true + delegate_to: "{{ item }}" + with_items: "{{ master_nodes }}" + - name: Create systemd service from template if it doesn't exist ansible.builtin.template: - src: "ansible/service.yaml.j2" + src: "ansible/index_api.yaml.j2" dest: "{{ systemd_service_path }}/{{ systemd_service_name }}" mode: "0644" force: true become: true delegate_to: "{{ item }}" with_items: "{{ remote_hosts }}" + + - name: Create metadata fetcher systemd service from template if it doesn't exist + ansible.builtin.template: + src: "ansible/metadata_fetcher.yaml.j2" + dest: "{{ systemd_service_path }}/{{ metadata_fetcher_systemd_service_name }}" + mode: "0644" + force: true + become: true + delegate_to: "{{ item }}" + with_items: "{{ master_nodes }}" - name: Restart and enable the Go application service ansible.builtin.systemd: @@ -49,3 +71,14 @@ throttle: 1 delegate_to: "{{ item }}" with_items: "{{ remote_hosts }}" + + - name: Restart and enable the Go application service + ansible.builtin.systemd: + name: "{{ metadata_fetcher_systemd_service_name }}" + state: restarted + enabled: true + daemon_reload: true + become: true + throttle: 1 + delegate_to: "{{ item }}" + with_items: "{{ master_nodes }}" diff --git a/utils/metadata-fetcher/go.mod b/utils/metadata-fetcher/go.mod new file mode 100644 index 0000000..364b8c6 --- /dev/null +++ b/utils/metadata-fetcher/go.mod @@ -0,0 +1,16 @@ +module metadata-fetcher + +go 1.22.1 + +require ( + github.com/jackc/pgx/v5 v5.7.1 + golang.org/x/sync v0.8.0 +) + +require ( + github.com/jackc/pgpassfile v1.0.0 // indirect + github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 // indirect + github.com/jackc/puddle/v2 v2.2.2 // indirect + golang.org/x/crypto v0.27.0 // indirect + golang.org/x/text v0.18.0 // indirect +) diff --git a/utils/metadata-fetcher/go.sum b/utils/metadata-fetcher/go.sum new file mode 100644 index 0000000..345521e --- /dev/null +++ b/utils/metadata-fetcher/go.sum @@ -0,0 +1,28 @@ +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/jackc/pgpassfile v1.0.0 h1:/6Hmqy13Ss2zCq62VdNG8tM1wchn8zjSGOBJ6icpsIM= +github.com/jackc/pgpassfile v1.0.0/go.mod h1:CEx0iS5ambNFdcRtxPj5JhEz+xB6uRky5eyVu/W2HEg= +github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761 h1:iCEnooe7UlwOQYpKFhBabPMi4aNAfoODPEFNiAnClxo= +github.com/jackc/pgservicefile v0.0.0-20240606120523-5a60cdf6a761/go.mod h1:5TJZWKEWniPve33vlWYSoGYefn3gLQRzjfDlhSJ9ZKM= +github.com/jackc/pgx/v5 v5.7.1 h1:x7SYsPBYDkHDksogeSmZZ5xzThcTgRz++I5E+ePFUcs= +github.com/jackc/pgx/v5 v5.7.1/go.mod h1:e7O26IywZZ+naJtWWos6i6fvWK+29etgITqrqHLfoZA= +github.com/jackc/puddle/v2 v2.2.2 h1:PR8nw+E/1w0GLuRFSmiioY6UooMp6KJv0/61nB7icHo= +github.com/jackc/puddle/v2 v2.2.2/go.mod h1:vriiEXHvEE654aYKXXjOvZM39qJ0q+azkZFrfEOc3H4= +github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= +github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= +golang.org/x/crypto v0.27.0 h1:GXm2NjJrPaiv/h1tb2UH8QfgC/hOf/+z0p6PT8o1w7A= +golang.org/x/crypto v0.27.0/go.mod h1:1Xngt8kV6Dvbssa53Ziq6Eqn0HqbZi5Z6R0ZpwQzt70= +golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ= +golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/text v0.18.0 h1:XvMDiNzPAl0jr17s6W9lcaIhGUfUORdGCNsuLmPG224= +golang.org/x/text v0.18.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/utils/metadata-fetcher/main.go b/utils/metadata-fetcher/main.go new file mode 100644 index 0000000..3a1ff9c --- /dev/null +++ b/utils/metadata-fetcher/main.go @@ -0,0 +1,402 @@ +package main + +import ( + "context" + "encoding/json" + "flag" + "fmt" + "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgxpool" + "golang.org/x/sync/semaphore" + "io" + "log" + "math" + "net/http" + "time" +) + +var gate *semaphore.Weighted +var client *http.Client + +var max_retries int +var initial_backoff time.Duration +var backoff_multiplier float64 +var max_backoff time.Duration +var stalled_task_interval time.Duration + +type BackgroundTask struct { + Id int64 + Type string + Retry int + Data map[string]interface{} +} + +type FetchTask struct { + Address string + Type string + Retry int + TaskId int64 +} + +type AddressMetadata struct { + Address *string + Type *string + Name *string + Symbol *string + Description *string + Image *string + Extra map[string]interface{} +} + +func (receiver AddressMetadata) hasAnyData() bool { + return receiver.Name != nil || receiver.Description != nil || receiver.Image != nil +} + +func fetchTasks(ctx context.Context, pool *pgxpool.Pool) ([]FetchTask, error) { + // Acquire a connection from the pool + conn, err := pool.Acquire(ctx) + if err != nil { + return nil, fmt.Errorf("failed to acquire connection: %v", err) + } + defer conn.Release() + + rows, err := conn.Query(ctx, ` + SELECT id, type, data, retries FROM background_tasks + WHERE status = 'ready' + AND type = 'fetch_metadata' AND retries <= $1 + AND (retry_at <= EXTRACT(EPOCH FROM NOW())::bigint OR retry_at is NULL) + LIMIT 100 + `, max_retries) + if err != nil { + return nil, fmt.Errorf("failed to fetch tasks: %v", err) + } + defer rows.Close() + + var tasks []FetchTask + for rows.Next() { + var task BackgroundTask + if err := rows.Scan(&task.Id, &task.Type, &task.Data, &task.Retry); err != nil { + return nil, fmt.Errorf("failed to scan task: %v", err) + } + tasks = append(tasks, FetchTask{ + TaskId: task.Id, + Type: task.Data["type"].(string), + Address: task.Data["address"].(string), + Retry: task.Retry, + }) + } + return tasks, nil +} + +func getMetadata(ctx context.Context, tx pgx.Tx, task FetchTask) (map[string]interface{}, error) { + var metadata_bytes []byte + var field_name string + switch task.Type { + case "nft_collections": + field_name = "collection_content" + case "nft_items": + field_name = "content" + case "jetton_masters": + field_name = "jetton_content" + } + query := fmt.Sprintf("SELECT %s as metadata FROM %s WHERE address = $1", field_name, task.Type) + err := tx.QueryRow(ctx, query, task.Address).Scan(&metadata_bytes) + if err != nil { + return nil, fmt.Errorf("failed to fetch metadata: %v", err) + } + + var metadata map[string]interface{} + if err := json.Unmarshal(metadata_bytes, &metadata); err != nil { + return nil, fmt.Errorf("failed to unmarshal metadata: %v", err) + } + return metadata, nil +} + +// extractURL extracts the 'url' or 'uri' from the metadata. +func extractURL(metadata map[string]interface{}) (string, error) { + if url, ok := metadata["url"].(string); ok { + return url, nil + } + if uri, ok := metadata["uri"].(string); ok { + return uri, nil + } + return "", fmt.Errorf("no 'url' or 'uri' found in metadata") +} + +// completeTask removes the task from the tasks table. +func completeTask(ctx context.Context, tx pgx.Tx, task FetchTask) error { + query := "DELETE FROM background_tasks WHERE id = $1" + _, err := tx.Exec(ctx, query, task.TaskId) + if err != nil { + return fmt.Errorf("failed to delete task: %v", err) + } + return nil +} + +func getMetadataFromJson(metadata map[string]interface{}) AddressMetadata { + var result AddressMetadata + for key := range metadata { + if value, ok := metadata[key].(string); ok { + switch key { + case "name": + if result.Name == nil { + result.Name = new(string) + } + *result.Name = value + case "description": + if result.Description == nil { + result.Description = new(string) + } + *result.Description = value + case "image": + if result.Image == nil { + result.Image = new(string) + } + *result.Image = value + case "symbol": + if result.Symbol == nil { + result.Symbol = new(string) + } + *result.Symbol = value + default: + if result.Extra == nil { + result.Extra = make(map[string]interface{}) + } + result.Extra[key] = value + } + } + } + + return result +} + +func fetchContent(metadata map[string]interface{}) (AddressMetadata, error) { + url, err := extractURL(metadata) + if err != nil { + metadata_from_db := getMetadataFromJson(metadata) + if metadata_from_db.hasAnyData() { + return metadata_from_db, nil + } else { + return AddressMetadata{}, fmt.Errorf("failed to extract URL or required data: %v", err) + } + } + + resp, err := client.Get(url) + if err != nil { + return AddressMetadata{}, fmt.Errorf("failed to fetch content from URL: %v", err) + } + defer resp.Body.Close() + + if resp.StatusCode < 200 || resp.StatusCode >= 300 { + return AddressMetadata{}, fmt.Errorf("non-OK HTTP status: %s", resp.Status) + } + + body_bytes, err := io.ReadAll(resp.Body) + if err != nil { + return AddressMetadata{}, fmt.Errorf("failed to read response body: %v", err) + } + + var content map[string]interface{} + if err := json.Unmarshal(body_bytes, &content); err != nil { + return AddressMetadata{}, fmt.Errorf("failed to unmarshal response body: %v", err) + } + return getMetadataFromJson(content), nil +} + +func processTask(ctx context.Context, pool *pgxpool.Pool, task FetchTask) (taskError error) { + defer gate.Release(1) + conn, err := pool.Acquire(ctx) + if err != nil { + return fmt.Errorf("failed to acquire connection: %v", err) + } + defer conn.Release() + + tx, err := conn.Begin(ctx) + if err != nil { + return fmt.Errorf("failed to start transaction: %v", err) + } + defer func() { + if taskError != nil { + _ = tx.Rollback(ctx) + } else { + _ = tx.Commit(ctx) + } + }() + + // Process the task within the transaction + metadata, err := getMetadata(ctx, tx, task) + if err != nil { + log.Printf("Error getting metadata for task %v: %v", task, err) + return handleTaskFailure(ctx, tx, task, err) + } + + content, err := fetchContent(metadata) + if err != nil { + log.Printf("Error fetching content for task %v: %v", task, err) + return handleTaskFailure(ctx, tx, task, err) + } + + _, err = tx.Exec(ctx, `INSERT INTO address_metadata (address, type, valid, name, description, image, symbol, extra, updated_at) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) ON CONFLICT (address, type) DO UPDATE SET + valid = $3, name = $4, description = $5, image = $6, symbol = $7, extra = $8, updated_at = $9`, + task.Address, task.Type, true, content.Name, content.Description, content.Image, content.Symbol, content.Extra, time.Now().Unix()) + if err != nil { + log.Printf("Error inserting metadata for task %v: %v", task, err) + return handleTaskFailure(ctx, tx, task, err) + } + if err := completeTask(ctx, tx, task); err != nil { + log.Printf("Error completing task %v: %v", task, err) + return handleTaskFailure(ctx, tx, task, err) + } + return nil +} + +func handleTaskFailure(ctx context.Context, tx pgx.Tx, task FetchTask, taskErr error) error { + delay := calculateBackoffDelay(task.Retry) + + if task.Retry < max_retries { + _, err := tx.Exec(ctx, ` + UPDATE background_tasks + SET status = 'ready', + retry_at = EXTRACT(EPOCH FROM NOW())::bigint + $1, + retries = retries + 1, + error = $3 + WHERE id = $2 + `, int64(delay.Seconds()), task.TaskId, taskErr.Error()) + if err != nil { + log.Printf("Error updating retry_at for task %v: %v", task, err) + return err + } + } else { + _, err := tx.Exec(ctx, ` + UPDATE background_tasks + SET status = 'failed', error = $2, retries = retries + 1 + WHERE id = $1`, task.TaskId, taskErr.Error()) + if err != nil { + log.Printf("Error updating status to failed for task %v: %v", task, err) + return err + } + } + extra := map[string]interface{}{ + "error": taskErr.Error(), + } + extra_json, _ := json.Marshal(extra) + + _, err := tx.Exec(ctx, `INSERT INTO address_metadata (address, type, valid, name, description, image, symbol, extra, updated_at) + VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) ON CONFLICT (address, type) DO UPDATE SET valid = $3`, + task.Address, task.Type, false, nil, nil, nil, nil, extra_json, time.Now().Unix()) + if err != nil { + log.Printf("Error inserting metadata for failed task %v: %v", task, err) + return err + } + return nil +} + +func calculateBackoffDelay(retry int) time.Duration { + delay := initial_backoff * time.Duration(math.Pow(backoff_multiplier, float64(retry-1))) + if delay > max_backoff { + delay = max_backoff + } + return delay +} + +func initializeDb(ctx context.Context, pgDsn string, processes int) (*pgxpool.Pool, error) { + config, err := pgxpool.ParseConfig(pgDsn) + if err != nil { + return nil, fmt.Errorf("failed to parse connection string: %v", err) + } + // Set maximum connections in the pool + config.MaxConns = max(int32(processes)*2, 4) + + pool, err := pgxpool.NewWithConfig(ctx, config) + if err != nil { + return nil, fmt.Errorf("failed to create connection pool: %v", err) + } + return pool, nil +} + +func updateStalledTasks(ctx context.Context, pool *pgxpool.Pool) { + for { + conn, err := pool.Acquire(ctx) + if err != nil { + log.Printf("failed to acquire connection: %v", err) + } + + _, err = conn.Exec(ctx, ` + UPDATE background_tasks + SET status = 'ready', + retry_at = EXTRACT(EPOCH FROM NOW())::bigint + LEAST( + $1 * POWER($2, retries - 1), + $3 + ) + WHERE type = 'fetch_metadata' AND status = 'in_progress' AND started_at < EXTRACT(EPOCH FROM NOW())::bigint - $4`, + int64(initial_backoff.Seconds()), backoff_multiplier, int64(max_backoff.Seconds()), stalled_task_interval) + if err != nil { + log.Print("failed to update stalled tasks: ", err) + } + conn.Release() + time.Sleep(time.Minute) + } +} + +func main() { + var pg_dsn string + var processes int + flag.StringVar(&pg_dsn, "pg", "postgresql://localhost:5432", "PostgreSQL connection string") + flag.IntVar(&processes, "processes", 32, "Set number of parallel queries") + flag.DurationVar(&initial_backoff, "initial-backoff", 5*time.Second, "Initial backoff duration") + flag.Float64Var(&backoff_multiplier, "backoff-multiplier", 2, "Backoff multiplier") + flag.DurationVar(&max_backoff, "max-backoff", 5*time.Minute, "Maximum backoff duration") + flag.IntVar(&max_retries, "max-retries", 5, "Maximum number of retries") + flag.DurationVar(&stalled_task_interval, "stalled-task-interval", 5*time.Minute, + "Interval to update stalled tasks") + flag.Parse() + + gate = semaphore.NewWeighted(int64(processes)) + client = &http.Client{ + Timeout: 30 * time.Second, + } + ctx := context.Background() + pool, err := initializeDb(ctx, pg_dsn, processes) + + if err != nil { + log.Fatal("Error initializing database connection: ", err) + } + defer pool.Close() + go updateStalledTasks(ctx, pool) + conn, err := pool.Acquire(ctx) + if err != nil { + log.Fatal("failed to acquire connection: ", err) + } + defer conn.Release() + for { + tasks, err := fetchTasks(ctx, pool) + if err != nil { + log.Println("Error fetching tasks: ", err) + time.Sleep(time.Second) + continue + } + + for _, task := range tasks { + err = gate.Acquire(ctx, 1) + if err != nil { + log.Printf("failed to acquire worker: %s\n", err.Error()) + continue + } + _, err := conn.Exec(ctx, `UPDATE background_tasks + SET status = 'in_progress', + started_at = EXTRACT(EPOCH FROM NOW())::bigint + WHERE id = $1`, + task.TaskId) + if err != nil { + log.Println("Error updating task status: ", err) + gate.Release(1) + continue + } + + go processTask(ctx, pool, task) + } + time.Sleep(time.Second) + } + +}