Skip to content

Commit

Permalink
feat: list keys (#164)
Browse files Browse the repository at this point in the history
* feat: list keys

* remove dead code

* adjust parameter changes + e2e tests

* fix e2e test

* fix comments
  • Loading branch information
moritzraho authored Jun 17, 2024
1 parent f41cdeb commit fe75614
Show file tree
Hide file tree
Showing 5 changed files with 384 additions and 34 deletions.
125 changes: 124 additions & 1 deletion e2e/e2e.js
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ describe('e2e tests using OpenWhisk credentials (as env vars)', () => {
}))
})

test('key-value basic test on one key with string value: put, get, delete, any, deleteAll', async () => {
test('key-value basic test on one key with string value: put, get, delete, any, stats, deleteAll', async () => {
const state = await initStateEnv()

const testValue = 'a string'
Expand Down Expand Up @@ -126,6 +126,129 @@ describe('e2e tests using OpenWhisk credentials (as env vars)', () => {
expect(await state.get(testKey)).toEqual(undefined)
})

test('listKeys test: few < 128 keys, many, and expired entries', async () => {
  const state = await initStateEnv()
  await state.deleteAll() // cleanup

  // builds n keys shaped like list-[a-z]-[0-(N-1)], the letter cycles every 26 keys
  const genKeyStrings = (n) => Array.from(
    { length: n },
    (_, idx) => `list-${String.fromCharCode(97 + idx % 26)}-${idx}`
  )

  // stores every key with the given ttl, issuing at most 20 puts concurrently
  const putKeys = async (keys, ttl) => {
    const batchSize = 20
    for (let start = 0; start < keys.length; start += batchSize) {
      const batch = keys.slice(start, start + batchSize)
      // values are numbered relative to the batch; this test never reads them back
      await Promise.all(batch.map((k, idx) => state.put(k, `value-${idx}`, { ttl })))
    }
  }

  // pulls one batch, hands its keys to `check`, then requires the iterator to be exhausted
  const expectSingleBatch = async (iterator, check) => {
    const first = await iterator.next()
    check(first.value.keys)
    expect(await iterator.next()).toEqual({ done: true, value: undefined })
  }

  // consumes the whole iterable, counting the batches and gathering all keys
  const drain = async (iterable) => {
    let iterations = 0
    const collected = []
    for await (const { keys } of iterable) {
      iterations++
      collected.push(...keys)
    }
    return { iterations, collected }
  }

  // reusable checks for expectSingleBatch
  const sortedEquals = (expected) => (keys) => expect(keys.sort()).toEqual(expected)
  const hasLength = (n) => (keys) => expect(keys.length).toEqual(n)

  // 1. test with not many elements, one iteration should return all
  const keys90 = genKeyStrings(90).sort()
  await putKeys(keys90, 60)

  await expectSingleBatch(state.list(), sortedEquals(keys90))
  await expectSingleBatch(state.list({ match: 'list-*' }), sortedEquals(keys90))
  await expectSingleBatch(
    state.list({ match: 'list-a*' }),
    sortedEquals(['list-a-0', 'list-a-26', 'list-a-52', 'list-a-78'])
  )
  await expectSingleBatch(state.list({ match: 'list-*-1' }), sortedEquals(['list-b-1']))

  // 2. test with many elements and large countHint
  // the first 90 generated keys are the same ones stored in step 1
  const keys900 = genKeyStrings(900)
  await putKeys(keys900, 60)

  await expectSingleBatch(state.list({ countHint: 1000 }), hasLength(900))
  await expectSingleBatch(state.list({ countHint: 1000, match: 'list-*' }), hasLength(900))
  await expectSingleBatch(state.list({ countHint: 1000, match: 'list-z*' }), hasLength(34))
  await expectSingleBatch(state.list({ match: 'list-*-1' }), sortedEquals(['list-b-1']))

  // 3. test with many elements while iterating
  let scan = await drain(state.list())
  expect(scan.iterations).toBeGreaterThan(5) // should be around 9-10
  expect(scan.collected.length).toEqual(900)

  scan = await drain(state.list({ match: 'list-*' }))
  expect(scan.iterations).toBeGreaterThan(5) // should be around 9-10
  expect(scan.collected.length).toEqual(900)

  scan = await drain(state.list({ match: 'list-z*' }))
  expect(scan.iterations).toEqual(1)
  expect(scan.collected.length).toEqual(34)

  scan = await drain(state.list({ match: 'list-*-1' }))
  expect(scan.iterations).toEqual(1)
  expect(scan.collected.length).toEqual(1)

  // 4. make sure expired keys aren't listed
  // overwrite the 90 keys from step 1 with a short ttl and wait for them to expire
  await putKeys(keys90, 1)
  await waitFor(2000)

  await expectSingleBatch(state.list({ countHint: 1000 }), hasLength(810)) // 900 - 90

  await state.deleteAll()
})

test('throw error when get/put with invalid keys', async () => {
const invalidKey = 'some/invalid:key'
const state = await initStateEnv()
Expand Down
133 changes: 111 additions & 22 deletions lib/AdobeState.js
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
/* eslint-disable jsdoc/no-undefined-types */
/*
Copyright 2024 Adobe. All rights reserved.
This file is licensed to you under the Apache License, Version 2.0 (the "License");
Expand All @@ -9,16 +10,27 @@ the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR REPRESENTA
OF ANY KIND, either express or implied. See the License for the specific language
governing permissions and limitations under the License.
*/
const { codes, logAndThrow } = require('./StateError')
const utils = require('./utils')
const cloneDeep = require('lodash.clonedeep')
const logger = require('@adobe/aio-lib-core-logging')('@adobe/aio-lib-state', { provider: 'debug' })
const { HttpExponentialBackoff } = require('@adobe/aio-lib-core-networking')
const url = require('node:url')
const { getCliEnv } = require('@adobe/aio-lib-env')
const { REGEX_PATTERN_STORE_KEY, HEADER_KEY_EXPIRES, CUSTOM_ENDPOINT, ENDPOINTS, ALLOWED_REGIONS } = require('./constants')
const Ajv = require('ajv')

const { codes, logAndThrow } = require('./StateError')
const utils = require('./utils')
const {
REGEX_PATTERN_STORE_KEY,
HEADER_KEY_EXPIRES,
CUSTOM_ENDPOINT,
ENDPOINTS,
ALLOWED_REGIONS,
MAX_LIST_COUNT_HINT,
REQUEST_ID_HEADER,
MIN_LIST_COUNT_HINT,
REGEX_PATTERN_LIST_KEY_MATCH
} = require('./constants')

/* *********************************** typedefs *********************************** */

/**
Expand Down Expand Up @@ -92,20 +104,21 @@ async function handleResponse (response, params) {
}

const copyParams = cloneDeep(params)
copyParams.requestId = response.headers.get(REQUEST_ID_HEADER)

switch (response.status) {
case 404:
// no exception on 404
return response
case 401:
return logAndThrow(new codes.ERROR_UNAUTHORIZED({ messageValues: ['underlying DB provider'], sdkDetails: copyParams }))
return logAndThrow(new codes.ERROR_UNAUTHORIZED({ messageValues: ['State service'], sdkDetails: copyParams }))
case 403:
return logAndThrow(new codes.ERROR_BAD_CREDENTIALS({ messageValues: ['underlying DB provider'], sdkDetails: copyParams }))
return logAndThrow(new codes.ERROR_BAD_CREDENTIALS({ messageValues: ['State service'], sdkDetails: copyParams }))
case 413:
return logAndThrow(new codes.ERROR_PAYLOAD_TOO_LARGE({ messageValues: ['underlying DB provider'], sdkDetails: copyParams }))
return logAndThrow(new codes.ERROR_PAYLOAD_TOO_LARGE({ messageValues: ['State service'], sdkDetails: copyParams }))
case 429:
return logAndThrow(new codes.ERROR_REQUEST_RATE_TOO_HIGH({ sdkDetails: copyParams }))
default: // 500 errors
return logAndThrow(new codes.ERROR_INTERNAL({ messageValues: [`unexpected response from provider with status: ${response.status} body: ${await response.text()}`], sdkDetails: copyParams }))
return logAndThrow(new codes.ERROR_INTERNAL({ messageValues: [`unexpected response from State service with status: ${response.status} body: ${await response.text()}`], sdkDetails: copyParams }))
}
}

Expand Down Expand Up @@ -159,18 +172,13 @@ class AdobeState {
* Creates a request url.
*
* @private
* @param {string} key the key of the state store
* @param {string} containerURLPath defaults to '' to hit the container
* endpoint, add /data/key to hit the key endpoint
* @param {object} queryObject the query variables to send
* @returns {string} the constructed request url
*/
createRequestUrl (key, queryObject = {}) {
let urlString

if (key) {
urlString = `${this.endpoint}/containers/${this.namespace}/data/${key}`
} else {
urlString = `${this.endpoint}/containers/${this.namespace}`
}
createRequestUrl (containerURLPath = '', queryObject = {}) {
const urlString = `${this.endpoint}/containers/${this.namespace}${containerURLPath}`

logger.debug('requestUrl string', urlString)
const requestUrl = new url.URL(urlString)
Expand Down Expand Up @@ -277,7 +285,7 @@ class AdobeState {
}
}
logger.debug('get', requestOptions)
const promise = this.fetchRetry.exponentialBackoff(this.createRequestUrl(key), requestOptions)
const promise = this.fetchRetry.exponentialBackoff(this.createRequestUrl(`/data/${key}`), requestOptions)
const response = await _wrap(promise, { key })
if (response.ok) {
// we only expect string values
Expand Down Expand Up @@ -334,8 +342,11 @@ class AdobeState {

logger.debug('put', requestOptions)

const promise = this.fetchRetry.exponentialBackoff(this.createRequestUrl(key, queryParams), requestOptions)
await _wrap(promise, { key, value, ...options })
const promise = this.fetchRetry.exponentialBackoff(
this.createRequestUrl(`/data/${key}`, queryParams),
requestOptions
)
await _wrap(promise, { key, value, ...options }, true)
return key
}

Expand All @@ -358,7 +369,7 @@ class AdobeState {

logger.debug('delete', requestOptions)

const promise = this.fetchRetry.exponentialBackoff(this.createRequestUrl(key), requestOptions)
const promise = this.fetchRetry.exponentialBackoff(this.createRequestUrl(`/data/${key}`), requestOptions)
const response = await _wrap(promise, { key })
if (response.status === 404) {
return null
Expand Down Expand Up @@ -423,7 +434,7 @@ class AdobeState {
}
}

logger.debug('any', requestOptions)
logger.debug('stats', requestOptions)

const promise = this.fetchRetry.exponentialBackoff(this.createRequestUrl(), requestOptions)
const response = await _wrap(promise, {})
Expand All @@ -433,6 +444,84 @@ class AdobeState {
return response.json()
}
}

/**
* List keys, returns an iterator. Every iteration returns a batch of
* approximately `countHint` keys.
* @example
* for await (const { keys } of state.list({ match: 'abc*' })) {
* console.log(keys)
* }
* @param {object} options list options
* @param {string} options.match a glob pattern that supports '*' to filter
* keys.
* @param {number} options.countHint an approximate number on how many items
* to return per iteration. Default: 100, min: 10, max: 1000.
* @returns {AsyncGenerator<{ keys: [] }>} an async generator which yields a
* { keys } object at every iteration.
* @memberof AdobeState
*/
list (options = {}) {
logger.debug('list', options)
const requestOptions = {
method: 'GET',
headers: {
...this.getAuthorizationHeaders()
}
}
logger.debug('list', requestOptions)

const queryParams = {}
if (options.match) {
queryParams.match = options.match
}
if (options.countHint) {
queryParams.countHint = options.countHint
}

if (queryParams.countHint < MIN_LIST_COUNT_HINT || queryParams.countHint > MAX_LIST_COUNT_HINT) {
logAndThrow(new codes.ERROR_BAD_ARGUMENT({
messageValues: `'countHint' must be in the [${MIN_LIST_COUNT_HINT}, ${MAX_LIST_COUNT_HINT}] range`,
sdkDetails: { queryParams }
}))
}
const schema = {
type: 'object',
properties: {
match: { type: 'string', pattern: REGEX_PATTERN_LIST_KEY_MATCH }, // this is an important check
countHint: { type: 'integer' }
}
}

const { valid, errors } = validate(schema, queryParams)
if (!valid) {
logAndThrow(new codes.ERROR_BAD_ARGUMENT({
messageValues: utils.formatAjvErrors(errors),
sdkDetails: { queryParams, errors }
}))
}

const stateInstance = this
return (async function * iter () {
let cursor = 0

do {
const promise = stateInstance.fetchRetry.exponentialBackoff(
stateInstance.createRequestUrl('/data', { ...queryParams, cursor }),
requestOptions
)
const response = await _wrap(promise, { ...queryParams, cursor })
if (response.status === 404) {
yield { keys: [] }
return
}
const res = await response.json()
cursor = res.cursor

yield { keys: res.keys }
} while (cursor !== 0)
}())
}
}

module.exports = { AdobeState }
2 changes: 1 addition & 1 deletion lib/StateError.js
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,12 @@ const E = ErrorWrapper(
E('ERROR_INTERNAL', '%s')
E('ERROR_BAD_REQUEST', '%s')
E('ERROR_BAD_ARGUMENT', '%s')
E('ERROR_UNEXPECTED_NOT_FOUND', '%s')
E('ERROR_UNKNOWN_PROVIDER', '%s')
E('ERROR_UNAUTHORIZED', 'you are not authorized to access %s')
E('ERROR_BAD_CREDENTIALS', 'cannot access %s, make sure your credentials are valid')
E('ERROR_PAYLOAD_TOO_LARGE', 'key, value or request payload is too large')
E('ERROR_REQUEST_RATE_TOO_HIGH', 'Request rate too high. Please retry after sometime.')
E('ERROR_FIREWALL', 'cannot access %s because your IP is blocked by a firewall, please make sure to run in an Adobe I/O Runtime action')

// eslint-disable-next-line jsdoc/require-jsdoc
function logAndThrow (e) {
Expand Down
10 changes: 10 additions & 0 deletions lib/constants.js
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,12 @@ const HEADER_KEY_EXPIRES = 'x-key-expires-ms'
const REGEX_PATTERN_STORE_NAMESPACE = '^(development-)?([0-9]{3,10})-([a-z0-9]{1,20})(-([a-z0-9]{1,20}))?$'
// The regex for keys, allowed chars are alphanumerical with _ - .
const REGEX_PATTERN_STORE_KEY = `^[a-zA-Z0-9-_.]{1,${MAX_KEY_SIZE}}$`
// The regex for list key pattern, allowed chars are alphanumerical with _ - . and * for glob matching
const REGEX_PATTERN_LIST_KEY_MATCH = `^[a-zA-Z0-9-_.*]{1,${MAX_KEY_SIZE}}$`
// Inclusive bounds for the `countHint` option of AdobeState#list; values outside
// of [MIN_LIST_COUNT_HINT, MAX_LIST_COUNT_HINT] are rejected before any request is sent
const MAX_LIST_COUNT_HINT = 1000
const MIN_LIST_COUNT_HINT = 100

// Name of the response header whose value is attached to error details as `requestId`
const REQUEST_ID_HEADER = 'x-request-id'

module.exports = {
ALLOWED_REGIONS,
Expand All @@ -56,6 +62,10 @@ module.exports = {
REGEX_PATTERN_STORE_NAMESPACE,
REGEX_PATTERN_STORE_KEY,
HEADER_KEY_EXPIRES,
REGEX_PATTERN_LIST_KEY_MATCH,
MAX_LIST_COUNT_HINT,
MIN_LIST_COUNT_HINT,
REQUEST_ID_HEADER,
// for testing only
ENDPOINT_PROD,
ENDPOINT_PROD_INTERNAL,
Expand Down
Loading

0 comments on commit fe75614

Please sign in to comment.