Skip to content

Commit

Permalink
Reindex chain handler. (#336)
Browse files Browse the repository at this point in the history
* Reindex chain handler.

* created tests for adminOperations file.

* sign on ganache

* debug

* added checks for addresses mismatch.

* fix + debug logs

* saved file

* update logs

* added more logs

* remove file

* added check for supported chain id.

* change signature approach.

* remove bytes format

* added more logs

* fix.

* fix command

* add log in tests.

* more logs + waiting for indexing.

* fix key

* enhancements.

* remove promise syntax.

* revert type for handle function

* added debug.

* add debug log.

* fix test.

* add logs in file infor handler.

* fix log

* logs to file info handler

* json stringify.

* added another debug log

* changed encrypt method

* added ecies method back + logs.

* added workaround for decrypting bytes.

* convert to string

* modified ddo

* commented tests.

* renamed test file.

* renamed file with a valid name.
  • Loading branch information
mariacarmina authored Mar 21, 2024
1 parent 2d38c67 commit 8a172ae
Show file tree
Hide file tree
Showing 9 changed files with 314 additions and 28 deletions.
4 changes: 4 additions & 0 deletions src/@types/commands.ts
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,10 @@ export interface AdminReindexTxCommand extends AdminCommand {
txId: string
}

export interface AdminReindexChainCommand extends AdminCommand {
chainId: number
}

export interface ICommandHandler {
handle(command: Command): Promise<P2PCommandResponse>
validate(command: Command): ValidateParams
Expand Down
108 changes: 90 additions & 18 deletions src/components/core/adminOperations.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
import { Handler } from './handler.js'
import { P2PCommandResponse } from '../../@types/OceanNode.js'
import { AdminReindexTxCommand, AdminStopNodeCommand } from '../../@types/commands.js'
import {
AdminReindexTxCommand,
AdminStopNodeCommand,
AdminReindexChainCommand
} from '../../@types/commands.js'
import { CORE_LOGGER } from '../../utils/logging/common.js'
import { LOG_LEVELS_STR } from '../../utils/logging/Logger.js'
import { ReadableString } from '../P2P/handleProtocolCommands.js'
Expand All @@ -11,8 +15,12 @@ import {
buildInvalidParametersResponse
} from '../httpRoutes/validateCommands.js'
import { validateSignature } from '../../utils/auth.js'
import { processChunkLogs } from '../Indexer/utils.js'
import { Blockchain, getConfiguration } from '../../utils/index.js'
import {
processBlocks,
processChunkLogs,
getDeployedContractBlock
} from '../Indexer/utils.js'
import { Blockchain, checkSupportedChainId } from '../../utils/index.js'

export class StopNodeHandler extends Handler {
validate(command: AdminStopNodeCommand): ValidateParams {
Expand Down Expand Up @@ -73,20 +81,15 @@ export class ReindexTxHandler extends Handler {
async handle(task: AdminReindexTxCommand): Promise<P2PCommandResponse> {
const validation = this.validate(task)
if (!validation.valid) {
return new Promise<P2PCommandResponse>((resolve, reject) => {
resolve(buildInvalidParametersResponse(validation))
})
return buildInvalidParametersResponse(validation)
}
CORE_LOGGER.logMessage(`Reindexing tx...`)
const config = await getConfiguration()
if (!(`${task.chainId}` in config.supportedNetworks)) {
const checkChainId = await checkSupportedChainId(task.chainId)
if (!checkChainId[0]) {
CORE_LOGGER.error(`Chain ID ${task.chainId} is not supported in config.`)
return
}
const blockchain = new Blockchain(
config.supportedNetworks[task.chainId.toString()].rpc,
task.chainId
)
const blockchain = new Blockchain(checkChainId[1], task.chainId)
const provider = blockchain.getProvider()
const signer = blockchain.getSigner()
try {
Expand All @@ -104,14 +107,83 @@ export class ReindexTxHandler extends Handler {
return
}

return new Promise<P2PCommandResponse>((resolve, reject) => {
resolve({
status: { httpStatus: 200 },
stream: new ReadableString('REINDEX TX OK')
})
})
return {
status: { httpStatus: 200 },
stream: new ReadableString('REINDEX TX OK')
}
} catch (error) {
CORE_LOGGER.log(LOG_LEVELS_STR.LEVEL_ERROR, `REINDEX tx: ${error.message} `, true)
}
}
}

export class ReindexChainHandler extends Handler {
validate(command: AdminReindexChainCommand): ValidateParams {
const commandValidation = validateCommandParameters(command, [
'expiryTimestamp',
'signature',
'chainId'
])
if (!commandValidation.valid) {
const errorMsg = `Command validation failed: ${JSON.stringify(commandValidation)}`
CORE_LOGGER.logMessage(errorMsg)
return buildInvalidRequestMessage(errorMsg)
}
if (!validateSignature(command.expiryTimestamp, command.signature)) {
const errorMsg = 'Expired authentication or invalid signature'
CORE_LOGGER.logMessage(errorMsg)
return buildInvalidRequestMessage(errorMsg)
}
return commandValidation
}

async handle(task: AdminReindexChainCommand): Promise<P2PCommandResponse> {
const validation = this.validate(task)
if (!validation.valid) {
return buildInvalidParametersResponse(validation)
}
CORE_LOGGER.logMessage(`Reindexing chain command called`)
const checkChainId = await checkSupportedChainId(task.chainId)
if (!checkChainId[0]) {
CORE_LOGGER.error(`Chain ID ${task.chainId} is not supported in config.`)
return
}
const blockchain = new Blockchain(checkChainId[1], task.chainId)
const provider = blockchain.getProvider()
const signer = blockchain.getSigner()
const deployedBlock = getDeployedContractBlock(task.chainId)
try {
await this.getOceanNode().getDatabase().ddo.deleteAllAssetsFromChain(task.chainId)
CORE_LOGGER.logMessage(
`Assets from chain ${task.chainId} were deleted from db, now starting to reindex...`
)
const latestBlock = await provider.getBlockNumber()
const ret = await processBlocks(
signer,
provider,
task.chainId,
deployedBlock,
latestBlock - deployedBlock + 1
)
if (!ret) {
CORE_LOGGER.log(
LOG_LEVELS_STR.LEVEL_ERROR,
`Reindex chain failed on chain ${task.chainId}.`,
true
)
return
}

return {
status: { httpStatus: 200 },
stream: new ReadableString('REINDEX CHAIN OK')
}
} catch (error) {
CORE_LOGGER.log(
LOG_LEVELS_STR.LEVEL_ERROR,
`REINDEX chain: ${error.message} `,
true
)
}
}
}
10 changes: 9 additions & 1 deletion src/components/core/coreHandlersRegistry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,11 @@ import {
ComputeGetResultHandler,
ComputeInitializeHandler
} from './compute/index.js'
import { ReindexTxHandler, StopNodeHandler } from './adminOperations.js'
import {
ReindexChainHandler,
ReindexTxHandler,
StopNodeHandler
} from './adminOperations.js'

export type HandlerRegistry = {
handlerName: string // name of the handler
Expand Down Expand Up @@ -101,6 +105,10 @@ export class CoreHandlersRegistry {
)
this.registerCoreHandler(PROTOCOL_COMMANDS.STOP_NODE, new StopNodeHandler(node))
this.registerCoreHandler(PROTOCOL_COMMANDS.REINDEX_TX, new ReindexTxHandler(node))
this.registerCoreHandler(
PROTOCOL_COMMANDS.REINDEX_CHAIN,
new ReindexChainHandler(node)
)
}

public static getInstance(node: OceanNode): CoreHandlersRegistry {
Expand Down
10 changes: 10 additions & 0 deletions src/components/core/fileInfoHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,24 @@ async function getFile(
throw new Error(msg)
}
// 3. Decrypt the url
CORE_LOGGER.logMessage(
`Uint8Array.from(Buffer.from(service.files, 'hex')): ${Uint8Array.from(
Buffer.from(service.files, 'hex')
)}`
)
CORE_LOGGER.logMessage(`service.files: ${service.files}`)
const decryptedUrlBytes = await decrypt(
Uint8Array.from(Buffer.from(service.files, 'hex')),
EncryptMethod.ECIES
)
CORE_LOGGER.logMessage(`URL decrypted for Service ID: ${serviceId}`)
CORE_LOGGER.logMessage(`decryptedUrlBytes: ${decryptedUrlBytes}`)

// Convert the decrypted bytes back to a string
const decryptedFilesString = Buffer.from(decryptedUrlBytes).toString()
CORE_LOGGER.logMessage(`decryptedFilesString: ${decryptedFilesString}`)
const decryptedFileArray = JSON.parse(decryptedFilesString)
CORE_LOGGER.logMessage(`decryptedFileArray: ${JSON.stringify(decryptedFileArray)}`)
return decryptedFileArray.files
} catch (error) {
const msg = 'Error occured while requesting the files: ' + error.message
Expand Down Expand Up @@ -128,6 +137,7 @@ export class FileInfoHandler extends Handler {
})
} else if (task.did && task.serviceId) {
const fileArray = await getFile(task.did, task.serviceId, oceanNode)
CORE_LOGGER.logMessage(`fileArray: ${fileArray}`)

if (task.fileIndex) {
const fileMetadata = await formatMetadata(fileArray[task.fileIndex])
Expand Down
15 changes: 15 additions & 0 deletions src/components/database/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -378,6 +378,21 @@ export class DdoDatabase {
)
}
}

async deleteAllAssetsFromChain(chainId: number) {
const searchParameters = {
q: '*',
filter_by: `chainId:=${chainId}`
}
const results = await this.search(searchParameters)
for (const res of results) {
if (res && res.hits) {
for (const h of res.hits) {
await this.delete(h.document.id)
}
}
}
}
}

export class NonceDatabase {
Expand Down
5 changes: 3 additions & 2 deletions src/test/integration/download.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -163,11 +163,12 @@ describe('Should run a complete node flow.', () => {
it('should get file info with did', async () => {
const fileInfoTask = {
command: PROTOCOL_COMMANDS.FILE_INFO,
did: publishedDataset.ddo.id,
serviceId: publishedDataset.ddo.services[0].id
did: actualDDO.id,
serviceId: actualDDO.services[0].id
}

const response = await new FileInfoHandler(oceanNode).handle(fileInfoTask)
console.log(`response file info: ${JSON.stringify(response)}`)

assert(response)
assert(response.stream, 'stream not present')
Expand Down
Loading

0 comments on commit 8a172ae

Please sign in to comment.