-
Notifications
You must be signed in to change notification settings - Fork 11
/
Copy pathhc_mail.py
executable file
·136 lines (120 loc) · 4.35 KB
/
hc_mail.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
#!/usr/bin/python3 -E
# health check mail filter: sends signed beacon mails and verifies them on delivery
#
# SPDX-FileCopyrightText: © 2020 Georg Sauthoff <[email protected]>
# SPDX-License-Identifier: GPL-3.0-or-later
import argparse
import email.message
import email.policy
# provided by Fedora package: python3-pynacl
import nacl.encoding
import nacl.signing
import nacl.exceptions
import os
import re
import smtplib
import sys
import time
# Names of the mail headers this script reads and writes.
result_header = 'x-hc-result'        # written by process() when a signature verifies
signature_header = 'x-hc-signature'  # carries the hex-encoded signed timestamp
loop_header = 'x-hc-loop'            # counter header rewritten by process()

# Case-insensitive matchers for the start of each header line ('<name>:').
hc_result_re, hc_sig_re, hc_loop_re = (
    re.compile(f'{name}:', re.IGNORECASE)
    for name in (result_header, signature_header, loop_header)
)
def verify(vkey, delta, s):
    """Check a hex-encoded signed timestamp against a key and a time window.

    Args:
        vkey: verification key; its ``verify()`` method is called with the
            hex encoder (main() passes a ``nacl.signing.VerifyKey``).
        delta: maximum accepted age of the signed timestamp, in seconds.
        s: hex-encoded signed message whose payload is a decimal Unix
            timestamp, as produced by ``sign()``.

    Returns:
        True if the signature is valid and the timestamp lies at most one
        second in the future and at most ``delta`` seconds in the past.
        False otherwise, including for malformed input.
    """
    try:
        m = vkey.verify(s, encoder=nacl.encoding.HexEncoder)
        ts = int(m)
    except (nacl.exceptions.BadSignatureError, ValueError):
        # ValueError covers input that is not valid hex (binascii.Error is a
        # ValueError subclass) and a signed payload that is not an integer.
        # The value comes from an untrusted mail header, so malformed input
        # must count as "not verified" instead of aborting the filter.
        return False
    d = int(time.time()) - ts
    # tolerate one second of clock skew into the future
    return -1 <= d <= delta
def process(f, vkey, delta):
    """Filter one mail message from ``f`` to stdout and tag the verification result.

    Lines before the first empty line are treated as headers:

    * existing result headers are dropped,
    * existing loop headers are dropped and counted,
    * a signature header is verified and passed through,
    * every other line is copied unchanged.

    At the end of the header section a fresh loop header is written with the
    count of dropped loop headers plus one, followed by
    ``<result_header>: ok`` if the last signature header seen was verified.
    All remaining lines (the body) are copied unchanged.

    Args:
        f: iterable of text lines, each including its trailing newline.
        vkey: verification key handed to ``verify()``.
        delta: accepted timestamp age in seconds, handed to ``verify()``.
    """
    in_headers = True
    verified = False
    loop = 1
    for line in f:
        if not in_headers:
            print(line, end='')
            continue
        if line == '\n':
            # end of headers: append our own headers before the separator
            in_headers = False
            print(f'{loop_header}: {loop}')
            if verified:
                print(f'{result_header}: ok')
            print()
        elif hc_result_re.match(line):
            # drop pre-existing result headers so that only the verdict of
            # this run can appear in the output
            continue
        elif hc_loop_re.match(line):
            loop += 1
        else:
            if hc_sig_re.match(line):
                # The header comes from an untrusted mail: it may lack the
                # space after the colon, be empty, or carry extra tokens.
                # Anything but exactly one token counts as not verified
                # instead of raising on tuple unpacking.
                fields = line.partition(':')[2].split()
                verified = len(fields) == 1 and verify(vkey, delta, fields[0])
            print(line, end='')
def parse_args():
    """Parse the command line and resolve the key argument.

    If the positional key is the literal ``env``, it is replaced by the value
    of the ``hc_mail_key`` environment variable.

    Returns:
        The ``argparse.Namespace`` with the attributes ``input``, ``sign``,
        ``mail``, ``to``, ``frm``, ``subject``, ``mta``, ``key`` and ``delta``.

    Raises:
        RuntimeError: the key is ``env`` but ``hc_mail_key`` is unset or empty.
        SystemExit: on usage errors, including ``--mail`` without both
            ``--to`` and ``--from``.
    """
    p = argparse.ArgumentParser(description='Mail filter and verify signature')
    p.add_argument('--input', '-i', default='-',
            help='input file (default: - for stdin)')
    p.add_argument('--sign', action='store_true', help='sign current time of day')
    p.add_argument('--mail', action='store_true', help='sign and mail')
    p.add_argument('--to', help='mail recipient')
    p.add_argument('--from', dest='frm', help='mail sender')
    p.add_argument('--subject', default='1337 health check beacon', help='mail subject (default: %(default)s)')
    p.add_argument('--mta', default='localhost', help='mail server to use with --mail (default: %(default)s)')
    p.add_argument('key', metavar='HEXKEY', help='signature verification key (or gen-key/genkey for key generation - or env for reading key from hc_mail_key environment variable)')
    p.add_argument('--delta', '-d', type=int, default=10,
            help='timestamp delta window in seconds during verification (default: %(default)d)')
    args = p.parse_args()
    # Without both addresses the mail headers would be built from None;
    # reject that up front with a usage error.
    if args.mail and not (args.to and args.frm):
        p.error('--mail requires --to and --from')
    if args.key == 'env':
        args.key = os.getenv('hc_mail_key')
        if not args.key:
            raise RuntimeError('no key in hc_mail_key environment variable')
    return args
def gen_key():
    """Generate a fresh key pair and print both keys hex-encoded.

    The signing key is printed on the first line, the matching verification
    key on the second.
    """
    signing_key = nacl.signing.SigningKey.generate()
    hex_encoder = nacl.encoding.HexEncoder
    for key in (signing_key, signing_key.verify_key):
        print(key.encode(hex_encoder).decode())
def sign(key):
    """Sign the current Unix time (whole seconds) with a hex-encoded signing key.

    Args:
        key: hex-encoded signing key.

    Returns:
        The signed message, hex-encoded, as a ``str``.
    """
    signer = nacl.signing.SigningKey(key, encoder=nacl.encoding.HexEncoder)
    timestamp = str(int(time.time())).encode()
    signed = signer.sign(timestamp, encoder=nacl.encoding.HexEncoder)
    return signed.decode()
def mail(frm, to, subject, signature, host):
    """Send a health check beacon mail that carries ``signature`` in a header.

    Args:
        frm: sender address for the ``from`` header.
        to: recipient address for the ``to`` header.
        subject: subject line.
        signature: value stored in the signature header.
        host: SMTP server the message is submitted to.
    """
    # Raise the line length limit so that the signature header is not folded
    # across several lines, which keeps handling it simple.
    policy = email.policy.EmailPolicy(max_line_length=200)
    msg = email.message.EmailMessage(policy=policy)
    msg.set_content('health check beacon')
    headers = (
        ('from', frm),
        ('to', to),
        ('subject', subject),
        (signature_header, signature),
    )
    for name, value in headers:
        msg[name] = value
    with smtplib.SMTP(host) as mta:
        mta.send_message(msg)
def main():
    """Run the action selected on the command line.

    The first matching action wins, in this order: key generation (the key
    argument is ``gen-key`` or ``genkey``), ``--sign``, ``--mail``, and
    otherwise filtering the input mail to stdout with signature verification.

    Returns:
        None, which the caller passes to ``sys.exit()`` (exit status 0).
    """
    args = parse_args()
    if args.key in ('gen-key', 'genkey'):
        gen_key()
        return
    if args.sign:
        print(sign(args.key))
        return
    if args.mail:
        mail(args.frm, args.to, args.subject, sign(args.key), args.mta)
        return
    vkey = nacl.signing.VerifyKey(args.key, encoder=nacl.encoding.HexEncoder)
    if args.input == '-':
        process(sys.stdin, vkey, args.delta)
    else:
        # close the input file deterministically, also when process() raises
        with open(args.input) as f:
            process(f, vkey, args.delta)
# Script entry point: run main() and use its return value as the exit status.
if __name__ == '__main__':
    raise SystemExit(main())