forked from astronomer/2-9-example-dags
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathcustom_operators.py
82 lines (75 loc) · 2.9 KB
/
custom_operators.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
from airflow.models.baseoperator import BaseOperator
class MyBasicMathOperator(BaseOperator):
    """
    Example Operator that does basic arithmetic.

    ``first_number`` and ``second_number`` are template fields. Airflow renders
    Jinja templates to strings by default, so operands that arrive as strings
    are converted to numbers before the calculation; operands that are already
    numeric are used unchanged.

    :param first_number: first number to put into an equation
    :param second_number: second number to put into an equation
    :param operation: mathematical operation to perform, one of
        ``valid_operations``
    :raises ValueError: if ``operation`` is not one of ``valid_operations``
    """

    # provide a list of valid operations
    valid_operations = ("+", "-", "*", "/")
    # define which fields can use Jinja templating
    template_fields = ("first_number", "second_number")

    def __init__(
        self,
        first_number: float,
        second_number: float,
        operation: str,
        *args,
        **kwargs,
    ):
        super().__init__(*args, **kwargs)
        self.first_number = first_number
        self.second_number = second_number
        self.operation = operation
        # raise a ValueError if the operation provided is not valid
        if self.operation not in self.valid_operations:
            raise ValueError(
                f"{self.operation} is not a valid operation. Choose one of {self.valid_operations}"
            )

    @staticmethod
    def _to_number(value):
        """Convert a string operand to ``int`` or ``float``; return any other value unchanged."""
        if not isinstance(value, str):
            return value
        try:
            return int(value)
        except ValueError:
            # not an integer literal (e.g. "1.5" or "1e3"); float() raises
            # ValueError itself if the text is not numeric at all
            return float(value)

    def _calculate(self, left, right):
        """Apply ``self.operation`` to the two operands and return the result."""
        if self.operation == "+":
            return left + right
        if self.operation == "-":
            return left - right
        if self.operation == "*":
            return left * right
        if self.operation == "/":
            try:
                return left / right
            except ZeroDivisionError:
                self.log.critical(
                    "If you have set up an equation where you are trying to divide by zero, you have done something WRONG. - Randall Munroe, 2006"
                )
                # bare raise keeps the original message and traceback
                raise
        # only reachable if ``operation`` was changed after __init__ validated it,
        # or ``valid_operations`` was extended without adding a branch here
        raise ValueError(
            f"{self.operation} is not a valid operation. Choose one of {self.valid_operations}"
        )

    def execute(self, context):
        """
        Evaluate ``first_number <operation> second_number`` and log the result.

        :param context: task context passed in by the caller; not used here
        :return: dict with the keys ``res``, ``first_number``,
            ``second_number`` and ``operation``
        :raises ZeroDivisionError: if the operation is ``/`` and the second
            number is zero
        :raises ValueError: if a string operand is not numeric, or the
            operation is not one of the supported symbols
        """
        left = self._to_number(self.first_number)
        right = self._to_number(self.second_number)
        self.log.info("Equation: %s %s %s", left, self.operation, right)
        res = self._calculate(left, right)
        self.log.info("Result: %s", res)
        return {
            "res": res,
            "first_number": left,
            "second_number": right,
            "operation": self.operation,
        }