forked from pynamodb/PynamoDB
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathexceptions.py
133 lines (97 loc) · 3.29 KB
/
exceptions.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
"""
PynamoDB exceptions
"""
from typing import Any, Optional
import botocore.exceptions
class PynamoDBException(Exception):
    """
    Base class for PynamoDB exceptions.

    Subclasses may declare a class-level ``msg``; it is used as the message
    whenever the constructor is called without an explicit ``msg``.
    """

    # Fallback message; subclasses override this at class level.
    msg = None  # type: Optional[str]

    def __init__(self, msg: Optional[str] = None, cause: Optional[Exception] = None) -> None:
        """
        :param msg: Message for this error. When ``None``, the class-level
            ``msg`` of the concrete exception class is used instead.
        :param cause: The underlying exception, if any; stored as ``self.cause``.
        """
        # Assigning ``msg`` unconditionally would shadow a subclass's
        # class-level default with ``None``, so only override when given.
        self.msg = msg if msg is not None else type(self).msg
        self.cause = cause
        super(PynamoDBException, self).__init__(self.msg)

    def _cause_error_field(self, field: str) -> Optional[str]:
        """
        Look up ``field`` under the ``Error`` entry of ``self.cause.response``.

        Returns ``None`` when there is no cause, the cause has no ``response``
        attribute, or any level of the lookup is missing or ``None``.
        """
        response = getattr(self.cause, 'response', None) or {}
        error = response.get('Error') or {}
        return error.get(field)

    @property
    def cause_response_code(self) -> Optional[str]:
        """
        The ``Code`` value from the ``Error`` entry of the cause's ``response``,
        or ``None`` if it is not available.
        """
        return self._cause_error_field('Code')

    @property
    def cause_response_message(self) -> Optional[str]:
        """
        The ``Message`` value from the ``Error`` entry of the cause's
        ``response``, or ``None`` if it is not available.
        """
        return self._cause_error_field('Message')
class PynamoDBConnectionError(PynamoDBException):
    """
    Common parent for the connection-related errors.
    """

    # Class-level message text for this error type.
    msg = "Connection Error"
class DeleteError(PynamoDBConnectionError):
    """
    Signals that deleting an item failed.
    """

    # Class-level message text for this error type.
    msg = "Error deleting item"
class QueryError(PynamoDBConnectionError):
    """
    Signals that a query failed.
    """

    # Class-level message text for this error type.
    msg = "Error performing query"
class ScanError(PynamoDBConnectionError):
    """
    Signals that a scan operation failed.
    """

    # Class-level message text for this error type.
    msg = "Error performing scan"
class PutError(PynamoDBConnectionError):
    """
    Signals that creating an item failed.
    """

    # Class-level message text for this error type.
    msg = "Error putting item"
class UpdateError(PynamoDBConnectionError):
    """
    Signals that updating an item failed.
    """

    # Class-level message text for this error type.
    msg = "Error updating item"
class GetError(PynamoDBConnectionError):
    """
    Signals that retrieving an item failed.
    """

    # Class-level message text for this error type.
    msg = "Error getting item"
class TableError(PynamoDBConnectionError):
    """
    Signals a failure in a DynamoDB table operation.
    """

    # Class-level message text for this error type.
    msg = "Error performing a table operation"
class DoesNotExist(PynamoDBException):
    """
    Signals that a queried item does not exist.
    """

    # Class-level message text for this error type.
    msg = "Item does not exist"
class TableDoesNotExist(PynamoDBException):
    """
    Signals that an operation targeted a table that doesn't exist.
    """

    def __init__(self, table_name: str) -> None:
        """
        :param table_name: Name of the missing table; embedded in the message
            passed to the base-class constructor.
        """
        # Build the message inline and hand it straight to the parent.
        super().__init__("Table does not exist: `{}`".format(table_name))
class TransactWriteError(PynamoDBException):
    """
    Signals that a TransactWrite operation failed.

    Adds nothing beyond its base class; it exists as a distinct type to catch.
    """
class TransactGetError(PynamoDBException):
    """
    Signals that a TransactGet operation failed.

    Adds nothing beyond its base class; it exists as a distinct type to catch.
    """
class InvalidStateError(PynamoDBException):
    """
    Signals that the internal state of an operation context is invalid.
    """

    # Class-level message text for this error type.
    msg = "Operation in invalid state"
class VerboseClientError(botocore.exceptions.ClientError):
    """
    A ``ClientError`` whose message template also carries a request id and a
    table name.
    """

    def __init__(self, error_response: Any, operation_name: str, verbose_properties: Optional[Any] = None):
        """
        Modify the message template to include the desired verbose properties.

        :param error_response: Forwarded unchanged to the parent constructor.
        :param operation_name: Forwarded unchanged to the parent constructor.
        :param verbose_properties: Optional mapping read for the keys
            ``request_id`` and ``table_name``; a falsy value is treated as an
            empty mapping, so both render as ``None``.
        """
        props = verbose_properties or {}
        request_id = props.get('request_id')
        table_name = props.get('table_name')

        # Doubled braces come out of this first format() pass as single-brace
        # placeholders ({error_code}, {operation_name}, {error_message}) --
        # presumably filled in by the parent constructor; confirm against botocore.
        template = (
            'An error occurred ({{error_code}}) on request ({request_id}) '
            'on table ({table_name}) when calling the {{operation_name}} '
            'operation: {{error_message}}'
        )
        # Set on the instance before delegating so the parent sees this template.
        self.MSG_TEMPLATE = template.format(request_id=request_id, table_name=table_name)
        super().__init__(error_response, operation_name)