Skip to content
This repository has been archived by the owner on Mar 8, 2020. It is now read-only.

Commit

Permalink
[Master] Experimental Pluggable Query Handler (#4537)
Browse files Browse the repository at this point in the history
This is an experimental feature for composer and as such is not documented.

Signed-off-by: Dave Kelsey <[email protected]>
  • Loading branch information
Dave Kelsey authored Dec 6, 2018
1 parent c20b2b7 commit 3ed1794
Show file tree
Hide file tree
Showing 4 changed files with 352 additions and 314 deletions.
111 changes: 105 additions & 6 deletions packages/composer-connector-hlfv1/lib/hlfconnection.js
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ const fs = require('fs-extra');
const HLFSecurityContext = require('./hlfsecuritycontext');
const HLFUtil = require('./hlfutil');
const HLFTxEventHandler = require('./hlftxeventhandler');
const HLFQueryHandler = require('./hlfqueryhandler');
const Logger = require('composer-common').Logger;
const path = require('path');
const temp = require('temp').track();
Expand All @@ -44,6 +43,8 @@ const installDependencies = {

const chaincodePathSection = 'businessnetwork';

let HLFQueryHandler;

/**
* Class representing a connection to a business network running on Hyperledger
* Fabric, using the hfc module.
Expand All @@ -67,10 +68,18 @@ class HLFConnection extends Connection {
/**
* create a new Query Handler.
* @param {HLFConnection} connection The connection to be used by the query handler.
* @return {HLFQueryManager} A new query manager.
* @param {string} queryHandlerImpl The query handler to require
* @return {HLFQueryHandler} A new query handler.
*/
static createQueryHandler(connection) {
return new HLFQueryHandler(connection);
static createQueryHandler(connection, queryHandlerImpl) {
const method = 'createQueryHandler';
if (typeof queryHandlerImpl === 'string') {
LOG.info(method, `attemping to load query handler module ${queryHandlerImpl}`);
HLFQueryHandler = require(queryHandlerImpl);
return new HLFQueryHandler(connection);
} else {
return new queryHandlerImpl(connection);
}
}

/**
Expand Down Expand Up @@ -128,10 +137,18 @@ class HLFConnection extends Connection {
this.caClient = caClient;
this.initialized = false;
this.commitTimeout = connectOptions['x-commitTimeout'] ? connectOptions['x-commitTimeout'] * 1000 : 300 * 1000;
LOG.debug(method, `commit timeout set to ${this.commitTimeout}`);

this.requiredEventHubs = isNaN(connectOptions['x-requiredEventHubs'] * 1) ? 1 : connectOptions['x-requiredEventHubs'] * 1;
LOG.debug(method, `required event hubs set to ${this.requiredEventHubs}`);
this.queryHandler = HLFConnection.createQueryHandler(this);
LOG.debug(method, `commit timeout set to ${this.commitTimeout}`);

let queryHandlerImpl = './hlfqueryhandler';
if (process.env.COMPOSER_QUERY_HANDLER && process.env.COMPOSER_QUERY_HANDLER.length !== 0) {
queryHandlerImpl = process.env.COMPOSER_QUERY_HANDLER;
} else if (connectOptions.queryHandler && connectOptions.queryHandler.length !== 0) {
queryHandlerImpl = connectOptions.queryHandler;
}
this.queryHandler = HLFConnection.createQueryHandler(this, queryHandlerImpl);

// We create promisified versions of these APIs.
this.fs = thenifyAll(fs);
Expand Down Expand Up @@ -1337,6 +1354,88 @@ class HLFConnection extends Connection {
return txId;
}

/**
* Send a query
* @param {Peer} peer The peer to query
* @param {TransactionID} txId the transaction id to use
* @param {string} functionName the function name of the query
* @param {array} args the arguments to ass
* @returns {Buffer} asynchronous response to query
*/
async querySinglePeer(peer, txId, functionName, args) {
const method = 'querySinglePeer';
LOG.entry(method, peer.getName(), txId, functionName, args);
const request = {
targets: [peer],
chaincodeId: this.businessNetworkIdentifier,
txId: txId,
fcn: functionName,
args: args
};

const t0 = Date.now();
let payloads = await this.queryByChaincode(request);
LOG.perf(method, `Total duration for queryByChaincode to ${functionName}: `, txId, t0);
LOG.debug(method, `Received ${payloads.length} payloads(s) from querying the composer runtime chaincode`);
if (!payloads.length) {
LOG.error(method, 'No payloads were returned from the query request:' + functionName);
throw new Error('No payloads were returned from the query request:' + functionName);
}
const payload = payloads[0];

//
// need to also handle the grpc error codes as before, but now need to handle the change in the
// node-sdk with a horrible match a string error, but would need a fix to node-sdk to resolve.
// A Fix is in 1.3
if (payload instanceof Error && (
(payload.code && (payload.code === 14 || payload.code === 1 || payload.code === 4)) ||
(payload.message.match(/Failed to connect before the deadline/))
)) {
throw payload;
}

LOG.exit(method, payload);
return payload;

}

/**
* Perform a chaincode query and parse the responses.
* @param {object} request the proposal for a query
* @return {array} the responses
*/
async queryByChaincode(request) {
const method = 'queryByChaincode';
LOG.entry(method, request);
try {
const results = await this.channel.sendTransactionProposal(request);
const responses = results[0];
if (responses && Array.isArray(responses)) {
let results = [];
for (let i = 0; i < responses.length; i++) {
let response = responses[i];
if (response instanceof Error) {
results.push(response);
}
else if (response.response && response.response.payload) {
results.push(response.response.payload);
}
else {
results.push(new Error(response));
}
}
LOG.exit(method);
return results;
}
const err = new Error('Payload results are missing from the chaincode query');
LOG.error(method, err);
throw err;
} catch(err) {
LOG.error(method, err);
throw err;
}
}

}

module.exports = HLFConnection;
86 changes: 2 additions & 84 deletions packages/composer-connector-hlfv1/lib/hlfqueryhandler.js
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ class HLFQueryHandler {
if (this.queryPeerIndex !== -1) {
let peer = this.allQueryPeers[this.queryPeerIndex];
try {
payload = await this.querySinglePeer(peer, txId, functionName, args);
payload = await this.connection.querySinglePeer(peer, txId, functionName, args);
success = true;
} catch (error) {
allErrors.push(error);
Expand All @@ -93,7 +93,7 @@ class HLFQueryHandler {
}
let peer = this.allQueryPeers[i];
try {
payload = await this.querySinglePeer(peer, txId, functionName, args);
payload = await this.connection.querySinglePeer(peer, txId, functionName, args);
this.queryPeerIndex = i;
success = true;
break;
Expand All @@ -119,88 +119,6 @@ class HLFQueryHandler {
return payload;

}

/**
* Send a query
* @param {Peer} peer The peer to query
* @param {TransactionID} txId the transaction id to use
* @param {string} functionName the function name of the query
* @param {array} args the arguments to ass
* @returns {Buffer} asynchronous response to query
*/
async querySinglePeer(peer, txId, functionName, args) {
const method = 'querySinglePeer';
LOG.entry(method, peer.getName(), txId, functionName, args);
const request = {
targets: [peer],
chaincodeId: this.connection.businessNetworkIdentifier,
txId: txId,
fcn: functionName,
args: args
};

const t0 = Date.now();
let payloads = await this.queryByChaincode(request);
LOG.perf(method, `Total duration for queryByChaincode to ${functionName}: `, txId, t0);
LOG.debug(method, `Received ${payloads.length} payloads(s) from querying the composer runtime chaincode`);
if (!payloads.length) {
LOG.error(method, 'No payloads were returned from the query request:' + functionName);
throw new Error('No payloads were returned from the query request:' + functionName);
}
const payload = payloads[0];

//
// need to also handle the grpc error codes as before, but now need to handle the change in the
// node-sdk with a horrible match a string error, but would need a fix to node-sdk to resolve.
// A Fix is in 1.3
if (payload instanceof Error && (
(payload.code && (payload.code === 14 || payload.code === 1 || payload.code === 4)) ||
(payload.message.match(/Failed to connect before the deadline/))
)) {
throw payload;
}

LOG.exit(method, payload);
return payload;

}

/**
* Perform a chaincode query and parse the responses.
* @param {object} request the proposal for a query
* @return {array} the responses
*/
async queryByChaincode(request) {
const method = 'queryByChaincode';
LOG.entry(method, request);
try {
const results = await this.connection.channel.sendTransactionProposal(request);
const responses = results[0];
if (responses && Array.isArray(responses)) {
let results = [];
for (let i = 0; i < responses.length; i++) {
let response = responses[i];
if (response instanceof Error) {
results.push(response);
}
else if (response.response && response.response.payload) {
results.push(response.response.payload);
}
else {
results.push(new Error(response));
}
}
LOG.exit(method);
return results;
}
const err = new Error('Payload results are missing from the chaincode query');
LOG.error(method, err);
throw err;
} catch(err) {
LOG.error(method, err);
throw err;
}
}
}

module.exports = HLFQueryHandler;
Expand Down
Loading

0 comments on commit 3ed1794

Please sign in to comment.