Skip to content

Commit

Permalink
Add Pagination to DynamoDB Query Calls (#697)
Browse files Browse the repository at this point in the history
* Add Pagination to DynamoDB Query Calls

Also setting explicit pagination limit.

* Remove Unnecessary newlines

* Refactor pagination loops
  • Loading branch information
kmg-stripe authored Jul 30, 2024
1 parent d265bfb commit ef3ad82
Show file tree
Hide file tree
Showing 2 changed files with 115 additions and 32 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,17 @@

@Slf4j
public class DynamoDBStore implements IKeyValueStore {
// Helper class to track pagination with DynamoDB queries
// lastEventuatedKey is used as a pagination cursor
private class DynamoPaginationResult<T> {
public T result;
public Map<String, AttributeValue> lastEvaluatedKey;

DynamoPaginationResult(T result, Map<String, AttributeValue> lastEvaluatedKey) {
this.result = result;
this.lastEvaluatedKey = lastEvaluatedKey;
}
}

public static final String PK = "PK";
public static final String SK = "SK";
Expand All @@ -48,6 +59,7 @@ public class DynamoDBStore implements IKeyValueStore {
private static final String MPK_E = "#MPK";

private static final int MAX_ITEMS = 25;
public static final int QUERY_LIMIT = 100;

private final String mantisTable;
private final DynamoDbClient client;
Expand All @@ -65,6 +77,32 @@ public DynamoDBStore(DynamoDbClient client, String tableName ) {
this.client = client;
this.mantisTable = tableName;
}

private DynamoPaginationResult<List<String>> _getAllPartitionKeys(String tableName, Map<String, AttributeValue> lastEvaluatedKey) {
Map<String, String> expressionAttributesNames = new HashMap<>();
expressionAttributesNames.put(PK_E, PK);
expressionAttributesNames.put(MPK_E, PARTITION_KEY);
Map<String, AttributeValue> expressionAttributeValues = new HashMap<>();
expressionAttributeValues.put(PK_V, AttributeValue.builder().s(tableName).build());

QueryRequest.Builder builder = QueryRequest.builder()
.tableName(this.mantisTable)
.keyConditionExpression(String.format("%s = %s", PK_E, PK_V))
.expressionAttributeNames(expressionAttributesNames)
.expressionAttributeValues(expressionAttributeValues)
.projectionExpression(MPK_E).limit(QUERY_LIMIT);

if(lastEvaluatedKey != null) {
builder = builder.exclusiveStartKey(lastEvaluatedKey);
}
final QueryRequest request = builder.build();

final QueryResponse response = this.client.query(request);
final Map<String, String> pks = new HashMap<>();
response.items().forEach(v -> pks.put(v.get(PARTITION_KEY).s(), ""));
return new DynamoPaginationResult<>(new ArrayList<>(pks.keySet()), response.lastEvaluatedKey());
}

/**
* Gets all partition keys from the table.
* This could be beneficial to call instead of getAllRows
Expand All @@ -78,26 +116,45 @@ public DynamoDBStore(DynamoDbClient client, String tableName ) {
* @return list of all partition keys
*/
@Override
public List<String> getAllPartitionKeys(String tableName) throws IOException {
public List<String> getAllPartitionKeys(String tableName) {
final List<String> results = new ArrayList<>();
Map<String, AttributeValue> lastEvaluatedKey = null;

while(true) {
DynamoPaginationResult<List<String>> result = this._getAllPartitionKeys(tableName, lastEvaluatedKey);
results.addAll(result.result);
if (!result.lastEvaluatedKey.isEmpty()) {
log.info("partial result for all partition keys query, left off at partitionKey={} of table={}", result.lastEvaluatedKey.get("SK").s(), tableName);
lastEvaluatedKey = result.lastEvaluatedKey;
} else {
break;
}
}
log.info("found {} items when querying for all partition keys in table={}", results.size(), tableName);
return results;
}

private DynamoPaginationResult<List<Map<String, AttributeValue>>> _getAll(String tableName, String partitionKey, Map<String, AttributeValue> lastEvaluatedKey) {
Map<String, String> expressionAttributesNames = new HashMap<>();
expressionAttributesNames.put(PK_E, PK);
expressionAttributesNames.put(MPK_E, PARTITION_KEY);
expressionAttributesNames.put(SK_E, SK);
Map<String, AttributeValue> expressionAttributeValues = new HashMap<>();
expressionAttributeValues.put(PK_V, AttributeValue.builder().s(tableName).build());
expressionAttributeValues.put(SK_V, AttributeValue.builder().s(String.format("%s#", partitionKey)).build());

final QueryRequest request = QueryRequest.builder()
.tableName(this.mantisTable)
.keyConditionExpression(String.format("%s = %s", PK_E, PK_V))
.expressionAttributeNames(expressionAttributesNames)
.expressionAttributeValues(expressionAttributeValues)
.projectionExpression(MPK_E)
.build();
QueryRequest.Builder builder = QueryRequest.builder()
.tableName(this.mantisTable)
.keyConditionExpression(String.format("%s = %s and begins_with(%s, %s)", PK_E, PK_V, SK_E, SK_V))
.expressionAttributeNames(expressionAttributesNames)
.expressionAttributeValues(expressionAttributeValues).limit(QUERY_LIMIT);

if(lastEvaluatedKey != null) {
builder = builder.exclusiveStartKey(lastEvaluatedKey);
}
final QueryRequest request = builder.build();

log.info("querying for all partition keys in table {}", tableName);
final QueryResponse response = this.client.query(request);
final Map<String, String> pks = new HashMap<>();
response.items().forEach(v -> pks.put(v.get(PARTITION_KEY).s(), ""));
return new ArrayList<>(pks.keySet());
return new DynamoPaginationResult<>(response.items(), response.lastEvaluatedKey());
}

/**
Expand All @@ -109,26 +166,20 @@ public List<String> getAllPartitionKeys(String tableName) throws IOException {
*/
@Override
public Map<String, String> getAll(String tableName, String partitionKey) throws IOException {

Map<String, String> expressionAttributesNames = new HashMap<>();
expressionAttributesNames.put(PK_E, PK);
expressionAttributesNames.put(SK_E, SK);
Map<String, AttributeValue> expressionAttributeValues = new HashMap<>();
expressionAttributeValues.put(PK_V, AttributeValue.builder().s(tableName).build());
expressionAttributeValues.put(SK_V, AttributeValue.builder().s(String.format("%s#", partitionKey)).build());

final QueryRequest request = QueryRequest.builder()
.tableName(this.mantisTable)
.keyConditionExpression(String.format("%s = %s and begins_with(%s, %s)", PK_E, PK_V, SK_E, SK_V))
.expressionAttributeNames(expressionAttributesNames)
.expressionAttributeValues(expressionAttributeValues)
.build();

log.info("querying for all items in partition {} in table {}", partitionKey, tableName);
final QueryResponse response = this.client.query(request);
final Map<String, String> items = new HashMap<>();
response.items()
.forEach(v -> items.put(v.get(SECONDARY_KEY).s(), v.get(DATA_KEY).s()));
Map<String, AttributeValue> lastEvaluatedKey = null;
while(true) {
DynamoPaginationResult<List<Map<String, AttributeValue>>> result = this._getAll(tableName, partitionKey, lastEvaluatedKey);
result.result.forEach(v -> items.put(v.get(SECONDARY_KEY).s(), v.get(DATA_KEY).s()));
if (!result.lastEvaluatedKey.isEmpty()) {
log.info("partial result for get all query, left off at SK={} of table={}", result.lastEvaluatedKey.get("SK").s(), tableName);
lastEvaluatedKey = result.lastEvaluatedKey;
} else {
break;
}
}

log.info("found {} items when querying for all items in partition {} in table {}", items.size(), partitionKey, tableName);
return items;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,8 +159,40 @@ public void testInsertAndGetAllMoreThan25() throws Exception {
assertEquals(skData1.size(), itemsPK1.size());
final Map<String, String> itemsPK3 = store.getAll(table, pks.get(2));
assertEquals(skData3.size(), itemsPK3.size());
}

@Test
public void testInsertAndGetAllMoreThanLimit() throws Exception {
IKeyValueStore store = new DynamoDBStore(client, table);
int numRows = DynamoDBStore.QUERY_LIMIT * 2 + 1;
final List<String> pks = makePKs(1);
final Map<String, String> skData1 = new HashMap<>();
for (int i = 0; i < numRows; i++) {
skData1.put(String.valueOf(i), V1);
}
assertTrue(store.upsertAll(table, pks.get(0), skData1));
final Map<String, String> itemsPK1 = store.getAll(table, pks.get(0));
assertEquals(skData1.size(), itemsPK1.size());
}

@Test
public void testUpsertAndGetAllPkMoreThanLimit() throws Exception {
IKeyValueStore store = new DynamoDBStore(client, table);
int numRows = DynamoDBStore.QUERY_LIMIT * 2 + 1;
final List<String> pks = new ArrayList<>();
for (int i = 0; i < numRows; i++) {
pks.add(UUID.randomUUID().toString());
}
Collections.sort(pks);
final Map<String, String> skData1 = new HashMap<>();
for(int i=0; i< numRows; i++) {
store.upsert(table, pks.get(i), String.valueOf(i), V1);
}
final List<String> allPKs = store.getAllPartitionKeys(table);
Collections.sort(allPKs);
assertEquals(pks,allPKs);
}

private List<String> makePKs(int num) {
final List<String> pks = new ArrayList<>();
for(int i = 0; i<3; i++) {
Expand Down

0 comments on commit ef3ad82

Please sign in to comment.