From 64795e464797248ab463ec18deb49108b9286fa1 Mon Sep 17 00:00:00 2001 From: Klaus Trainer Date: Fri, 7 Oct 2016 15:02:34 +0200 Subject: [PATCH] fix: synchronous table creation and deletion --- index.js | 75 +++++++++++-------- iterator.js | 7 +- .../apis/dynamodb-2012-08-10.waiters2.json | 35 +++++++++ 3 files changed, 84 insertions(+), 33 deletions(-) create mode 100644 test/node_modules/aws-sdk/apis/dynamodb-2012-08-10.waiters2.json diff --git a/index.js b/index.js index 8b9504b..9cae30d 100644 --- a/index.js +++ b/index.js @@ -7,6 +7,8 @@ const serialize = require('./serialize') const deserialize = require('./deserialize') const DynamoDBIterator = require('./iterator') +const MAX_BATCH_SIZE = 25 + const globalStore = {} AWS.config.apiVersions = {dynamodb: '2012-08-10'} @@ -40,7 +42,8 @@ function hexEncodeTableName (str) { DynamoDBDOWN.prototype._open = function (options, cb) { if (!options.dynamodb) { - return cb(new Error('`open` requires `options` argument with "dynamodb" key')) + cb(new Error('`open` requires `options` argument with "dynamodb" key')) + return } if (typeof options.prefix === 'string') { @@ -57,17 +60,17 @@ DynamoDBDOWN.prototype._open = function (options, cb) { this.dynamoDb = new AWS.DynamoDB(dynamodbOptions) - if (options.createIfMissing) { + if (options.createIfMissing !== false) { this.createTable({ ProvisionedThroughput: dynamodbOptions.ProvisionedThroughput }, (err, data) => { const exists = err && (err.code === 'ResourceInUseException') if (options.errorIfExists && exists || err && !exists) { - return cb(err) + cb(err) + } else { + cb(null, this) } - - return cb(null, this) }) } else { cb(null, this) @@ -102,14 +105,12 @@ DynamoDBDOWN.prototype._get = function (key, options, cb) { this.dynamoDb.getItem(params, function (err, data) { if (err) { - return cb(err) - } - - if (!(data && data.Item && data.Item.value)) { - return cb(new Error('NotFound')) + cb(err) + } else if (!(data && data.Item && data.Item.value)) { + cb(new Error('NotFound')) + } else { + cb(null, deserialize(data.Item.value, options.asBuffer)) } - - return cb(null, deserialize(data.Item.value, options.asBuffer)) }) } @@ -122,13 +123,7 @@ DynamoDBDOWN.prototype._del = function (key, options, cb) { } } - this.dynamoDb.deleteItem(params, function (err, data) { - if (err) { - return cb(err) - } - - cb(null, data) - }) + this.dynamoDb.deleteItem(params, cb) } DynamoDBDOWN.prototype._batch = function (array, options, cb) { @@ -183,7 +178,8 @@ DynamoDBDOWN.prototype._batch = function (array, options, cb) { const loop = (err, data) => { if (err) { - return cb(err) + cb(err) + return } const reqs = [] @@ -192,14 +188,14 @@ DynamoDBDOWN.prototype._batch = function (array, options, cb) { reqs.push.apply(reqs, data.UnprocessedItems[this.encodedTableName]) } - reqs.push.apply(reqs, ops.splice(0, 25 - reqs.length)) + reqs.push.apply(reqs, ops.splice(0, MAX_BATCH_SIZE - reqs.length)) if (reqs.length === 0) { - return cb() + cb() + } else { + params.RequestItems[this.encodedTableName] = reqs + this.dynamoDb.batchWriteItem(params, loop) } - - params.RequestItems[this.encodedTableName] = reqs - this.dynamoDb.batchWriteItem(params, loop) } loop() @@ -227,23 +223,38 @@ DynamoDBDOWN.prototype.createTable = function (opts, cb) { WriteCapacityUnits: 1 } - this.dynamoDb.createTable(params, cb) + this.dynamoDb.createTable(params, (err, data) => { + if (err) { + cb(err) + } else { + this.dynamoDb.waitFor('tableExists', {TableName: this.encodedTableName}, cb) + } + }) } -DynamoDBDOWN.destroy = function (name, callback) { +DynamoDBDOWN.destroy = function (name, cb) { const store = globalStore[name] if (store) { store.dynamoDb.deleteTable({TableName: store.encodedTableName}, (err, data) => { - if (err) { - callback() - } else { + if (err && err.code === 'ResourceNotFoundException') { delete globalStore[name] - callback() + cb() + } else if (err) { + cb(err) + } else { + store.dynamoDb.waitFor('tableNotExists', {TableName: store.encodedTableName}, (err, data) => { + if (err) { + cb(err) + } else { + delete globalStore[name] + cb() + } + }) } }) } else { - callback(new Error('NotFound')) + cb(new Error('NotFound')) } } diff --git a/iterator.js b/iterator.js index af5bddf..6eba902 100644 --- a/iterator.js +++ b/iterator.js @@ -80,7 +80,12 @@ DynamoDBIterator.prototype.createReadStream = function (opts) { const onData = (err, data) => { if (err) { - return stream.emit('error', err) + if (err.code === 'ResourceNotFoundException') { + stream.end() + } else { + stream.emit('error', err) + } + return stream } data.Items.forEach((item) => { diff --git a/test/node_modules/aws-sdk/apis/dynamodb-2012-08-10.waiters2.json b/test/node_modules/aws-sdk/apis/dynamodb-2012-08-10.waiters2.json new file mode 100644 index 0000000..af8a5cd --- /dev/null +++ b/test/node_modules/aws-sdk/apis/dynamodb-2012-08-10.waiters2.json @@ -0,0 +1,35 @@ +{ + "version": 2, + "waiters": { + "TableExists": { + "delay": 0.002, + "operation": "DescribeTable", + "maxAttempts": 25, + "acceptors": [ + { + "expected": "ACTIVE", + "matcher": "path", + "state": "success", + "argument": "Table.TableStatus" + }, + { + "expected": "ResourceNotFoundException", + "matcher": "error", + "state": "retry" + } + ] + }, + "TableNotExists": { + "delay": 0.002, + "operation": "DescribeTable", + "maxAttempts": 25, + "acceptors": [ + { + "expected": "ResourceNotFoundException", + "matcher": "error", + "state": "success" + } + ] + } + } +}