forked from pynamodb/PynamoDB
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathpagination.py
223 lines (175 loc) · 7.34 KB
/
pagination.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
import time
from typing import Any, Callable, Dict, Iterable, Iterator, TypeVar, Optional
from pynamodb.constants import (CAMEL_COUNT, ITEMS, LAST_EVALUATED_KEY, SCANNED_COUNT,
CONSUMED_CAPACITY, TOTAL, CAPACITY_UNITS)
# Type variable used to parametrize the iterator classes defined in this module.
_T = TypeVar('_T')
class RateLimiter:
    """
    RateLimiter limits operations to a pre-set rate of units/seconds

    Example:
        Initialize a RateLimiter with the desired rate
            rate_limiter = RateLimiter(rate_limit)

        Now, every time before calling an operation, call acquire()
            rate_limiter.acquire()

        And after an operation, update the number of units consumed
            rate_limiter.consume(units)
    """

    def __init__(self, rate_limit: float, time_module: Optional[Any] = None) -> None:
        """
        Initializes a RateLimiter object

        :param rate_limit: The desired rate, in units per second. Must be greater than zero.
        :param time_module: Optional: the module responsible for calculating time; it must
            provide ``time()`` and ``sleep()``. Intended to be used for testing purposes.
        :raises ValueError: if ``rate_limit`` is not greater than zero.
        """
        # Assign through the property setter so the validation lives in one place.
        self.rate_limit = rate_limit
        # Units recorded via consume() since the last acquire(); may be fractional.
        self._consumed = 0.0
        # 0.0 makes the very first acquire() see a huge elapsed time, so it never sleeps.
        self._time_of_last_acquire = 0.0
        self._time_module: Any = time_module or time

    def consume(self, units: float) -> None:
        """
        Records the amount of units consumed.

        :param units: Number of units consumed. May be fractional: DynamoDB reports
            consumed capacity units as floating point values.
        :return: None
        """
        self._consumed += units

    def acquire(self) -> None:
        """
        Sleeps the appropriate amount of time to follow the rate limit restriction.

        The units consumed since the previous acquire() "cost" ``consumed / rate_limit``
        seconds at the configured rate; whatever part of that budget has not already
        elapsed since the previous acquire() is slept. The consumed counter is then reset.

        :return: None
        """
        elapsed = self._time_module.time() - self._time_of_last_acquire
        required = self._consumed / float(self.rate_limit)
        self._time_module.sleep(max(0, required - elapsed))
        self._consumed = 0.0
        # Stamp after sleeping so the next accounting window starts now.
        self._time_of_last_acquire = self._time_module.time()

    @property
    def rate_limit(self) -> float:
        """
        A limit of units per seconds
        """
        return self._rate_limit

    @rate_limit.setter
    def rate_limit(self, rate_limit: float) -> None:
        if rate_limit <= 0:
            raise ValueError("rate_limit must be greater than zero")
        self._rate_limit = rate_limit
class PageIterator(Iterator[_T]):
    """
    PageIterator handles Query and Scan result pagination.

    Each call to ``next()`` performs one ``operation(*args, **kwargs)`` request and returns
    the raw page it produced. The page's LastEvaluatedKey is passed back as
    ``exclusive_start_key`` on the following request, and iteration stops once a page
    comes back without a LastEvaluatedKey.

    http://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Query.html#Query.Pagination
    http://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Scan.html#Scan.Pagination
    """

    def __init__(
        self,
        operation: Callable,
        args: Any,
        kwargs: Dict[str, Any],
        rate_limit: Optional[float] = None,
    ) -> None:
        """
        :param operation: Callable performing a single Query/Scan request. ``key_names``
            expects it to be a bound method whose owner provides ``get_meta_table()``.
        :param args: Positional arguments passed to every ``operation`` call.
        :param kwargs: Keyword arguments passed to every ``operation`` call. NOTE: this dict
            is stored and mutated in place (``exclusive_start_key``, ``limit`` and, when rate
            limited, ``return_consumed_capacity``).
        :param rate_limit: Optional units-per-second limit; a falsy value disables limiting.
        """
        self._operation = operation
        self._args = args
        self._kwargs = kwargs
        self._first_iteration = True
        # Seeded from the caller's exclusive_start_key so iteration can resume mid-table.
        self._last_evaluated_key = kwargs.get('exclusive_start_key')
        self._total_scanned_count = 0
        self._rate_limiter: Optional[RateLimiter] = None
        if rate_limit:
            self._rate_limiter = RateLimiter(rate_limit)

    def __iter__(self) -> Iterator[_T]:
        return self

    def __next__(self) -> _T:
        """
        Performs the next request and returns the resulting page.

        :raises StopIteration: once a previously returned page had no LastEvaluatedKey.
        """
        if self._last_evaluated_key is None and not self._first_iteration:
            raise StopIteration()

        self._kwargs['exclusive_start_key'] = self._last_evaluated_key

        if self._rate_limiter:
            self._rate_limiter.acquire()
            # Ask for consumed capacity so the limiter can account for this request.
            self._kwargs['return_consumed_capacity'] = TOTAL

        page = self._operation(*self._args, **self._kwargs)
        # Cleared only after a successful request: if the first request raises, a retried
        # next() must re-issue it rather than report exhaustion. Later pages already retry
        # because _last_evaluated_key is only advanced on success.
        self._first_iteration = False
        self._last_evaluated_key = page.get(LAST_EVALUATED_KEY)
        self._total_scanned_count += page[SCANNED_COUNT]

        if self._rate_limiter:
            consumed_capacity = page.get(CONSUMED_CAPACITY, {}).get(CAPACITY_UNITS, 0)
            self._rate_limiter.consume(consumed_capacity)

        return page

    def next(self) -> _T:
        """Alias for ``__next__()``."""
        return self.__next__()

    @property
    def key_names(self) -> Iterable[str]:
        """
        Names of the key attributes for the table/index being paginated.
        """
        # If the current page has a last_evaluated_key, use it to determine key attributes
        if self._last_evaluated_key:
            return self._last_evaluated_key.keys()

        # Use the table meta data to determine the key attributes
        table_meta = self._operation.__self__.get_meta_table()  # type: ignore
        return table_meta.get_key_names(self._kwargs.get('index_name'))

    @property
    def page_size(self) -> Optional[int]:
        """The ``limit`` sent with each request, or None if unset."""
        return self._kwargs.get('limit')

    @page_size.setter
    def page_size(self, page_size: int) -> None:
        self._kwargs['limit'] = page_size

    @property
    def last_evaluated_key(self) -> Optional[Dict[str, Dict[str, Any]]]:
        """LastEvaluatedKey of the most recent page (or the initial exclusive_start_key)."""
        return self._last_evaluated_key

    @property
    def total_scanned_count(self) -> int:
        """Sum of ScannedCount over all pages fetched so far."""
        return self._total_scanned_count
class ResultIterator(Iterator[_T]):
    """
    ResultIterator handles Query and Scan item pagination.

    Wraps a PageIterator and yields the individual items of each page, optionally
    transformed by ``map_fn`` and capped at ``limit`` items in total.

    http://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Query.html#Query.Pagination
    http://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Scan.html#Scan.Pagination
    """

    def __init__(
        self,
        operation: Callable,
        args: Any,
        kwargs: Dict[str, Any],
        map_fn: Optional[Callable] = None,
        limit: Optional[int] = None,
        rate_limit: Optional[float] = None,
    ) -> None:
        """
        :param operation: Forwarded to PageIterator.
        :param args: Forwarded to PageIterator.
        :param kwargs: Forwarded to PageIterator.
        :param map_fn: Optional callable applied to each raw item before it is returned.
        :param limit: Maximum number of items to return in total; None means no cap.
        :param rate_limit: Forwarded to PageIterator.
        """
        self.page_iter: PageIterator = PageIterator(operation, args, kwargs, rate_limit)
        self._first_iteration = True
        self._map_fn = map_fn
        self._limit = limit
        self._total_count = 0
        # Per-page cursor, filled in by _get_next_page(). Initialised here so the object
        # stays consistent even if the very first page request raises.
        self._items = None
        self._index = 0
        self._count = 0

    def _get_next_page(self) -> None:
        """Fetches the next page and resets the per-page cursor; StopIteration when done."""
        page = next(self.page_iter)
        self._count = page[CAMEL_COUNT]
        self._items = page.get(ITEMS)  # not returned if 'Select' is set to 'COUNT'
        # Without items there is nothing to yield: mark the page as fully consumed.
        self._index = 0 if self._items else self._count
        self._total_count += self._count

    def __iter__(self) -> Iterator[_T]:
        return self

    def __next__(self) -> _T:
        """
        Returns the next item, fetching further pages as needed.

        :raises StopIteration: when ``limit`` items were returned or the pages are exhausted.
        """
        if self._limit == 0:
            raise StopIteration

        if self._first_iteration:
            self._get_next_page()
            # Cleared only once the first page has arrived, so a retry after a failed
            # first request fetches it again instead of reading an empty cursor.
            self._first_iteration = False

        # Skip pages that yielded no items (empty pages or COUNT-only pages).
        while self._index == self._count:
            self._get_next_page()

        item = self._items[self._index]
        self._index += 1
        if self._limit is not None:
            self._limit -= 1
        if self._map_fn:
            item = self._map_fn(item)
        return item

    def next(self) -> _T:
        """Alias for ``__next__()``."""
        return self.__next__()

    @property
    def last_evaluated_key(self) -> Optional[Dict[str, Dict[str, Any]]]:
        """Key from which a new operation should resume to continue after the last item."""
        if self._first_iteration or self._index == self._count:
            # Not started iterating yet: return `exclusive_start_key` if set, otherwise expect None; or,
            # Entire page has been consumed: last_evaluated_key is whatever DynamoDB returned
            # It may correspond to the current item, or it may correspond to an item evaluated but not returned.
            return self.page_iter.last_evaluated_key

        # In the middle of a page of results: reconstruct a last_evaluated_key from the current item
        # The operation should be resumed starting at the last item returned, not the last item evaluated.
        # This can occur if the 'limit' is reached in the middle of a page.
        item = self._items[self._index - 1]
        return {key: item[key] for key in self.page_iter.key_names}

    @property
    def total_count(self) -> int:
        """Sum of the Count values of all pages fetched so far."""
        return self._total_count