Skip to content

Commit

Permalink
light: add testcase for transport(auto)
Browse files Browse the repository at this point in the history
Signed-off-by: Balazs Scheidler <[email protected]>
  • Loading branch information
bazsi committed Nov 11, 2024
1 parent 57e8313 commit dff7494
Showing 1 changed file with 66 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,49 +25,107 @@

from src.common.blocking import wait_until_true
from src.common.file import File
from src.common.file import copy_shared_file
from src.common.random_id import get_unique_id


def _test_auto_detect(config, syslog_ng, syslog_ng_ctl, port_allocator, loggen, testcase_parameters, transport, input_messages, number_of_messages, expected_messages):
def _write_auto_config(config, syslog_ng, port_allocator, transport, testcase_parameters):
server_key_path = copy_shared_file(testcase_parameters, "server.key")
server_cert_path = copy_shared_file(testcase_parameters, "server.crt")

output_file = "output.log"

syslog_source = config.create_syslog_source(
ip="localhost",
port=port_allocator(),
keep_hostname="yes",
transport=transport,
tls={
"key-file": server_key_path,
"cert-file": server_cert_path,
"peer-verify": '"optional-untrusted"',
},
)
file_destination = config.create_file_destination(file_name=output_file)
config.create_logpath(statements=[syslog_source, file_destination])

syslog_ng.start(config)
return (syslog_source, file_destination)

def _test_auto_detect(syslog_source, file_destination, loggen, input_messages, number_of_messages, expected_messages, use_ssl=False, proxied=False, proxied_tls_passthrough=False):

loggen_input_file_path = Path("loggen_input_{}.txt".format(get_unique_id()))
loggen_input_file = File(loggen_input_file_path)
loggen_input_file.write_content_and_close(input_messages)
loggen.start(
syslog_source.options["ip"], syslog_source.options["port"],
number=number_of_messages,
number=number_of_messages+(1 if proxied else 0),
dont_parse=True,
read_file=str(loggen_input_file_path),
syslog_proto=True,
inet=True,
inet=None if use_ssl else True,
use_ssl=use_ssl,
proxied=1 if proxied else None,
proxied_tls_passthrough=proxied_tls_passthrough,
proxy_src_ip="1.1.1.1", proxy_dst_ip="2.2.2.2", proxy_src_port="3333", proxy_dst_port="4444",
)

wait_until_true(lambda: loggen.get_sent_message_count() == number_of_messages)

assert file_destination.read_log() == expected_messages

loggen.stop()


def test_auto_framing(config, syslog_ng, syslog_ng_ctl, port_allocator, loggen, testcase_parameters):
INPUT_MESSAGES = "53 <2>Oct 11 22:14:15 myhostname sshd[1234]: message 0\r\n" * 10
NUMBER_OF_MESSAGES = 10
INPUT_MESSAGES = "52 <2>Oct 11 22:14:15 myhostname sshd[1234]: message 0\n" * NUMBER_OF_MESSAGES
EXPECTED_MESSAGES = "Oct 11 22:14:15 myhostname sshd[1234]: message 0\n"
(syslog_source, file_destination) = _write_auto_config(config, syslog_ng, port_allocator, '"auto"', testcase_parameters)

_test_auto_detect(syslog_source, file_destination, loggen,
INPUT_MESSAGES, NUMBER_OF_MESSAGES, EXPECTED_MESSAGES)

def test_auto_framing_tls(config, syslog_ng, syslog_ng_ctl, port_allocator, loggen, testcase_parameters):
NUMBER_OF_MESSAGES = 10
_test_auto_detect(config, syslog_ng, syslog_ng_ctl, port_allocator, loggen, testcase_parameters, '"auto"', INPUT_MESSAGES, NUMBER_OF_MESSAGES, EXPECTED_MESSAGES)
INPUT_MESSAGES = "52 <2>Oct 11 22:14:15 myhostname sshd[1234]: message 0\n" * NUMBER_OF_MESSAGES
EXPECTED_MESSAGES = "Oct 11 22:14:15 myhostname sshd[1234]: message 0\n"
(syslog_source, file_destination) = _write_auto_config(config, syslog_ng, port_allocator, '"auto"', testcase_parameters)

_test_auto_detect(syslog_source, file_destination, loggen,
INPUT_MESSAGES, NUMBER_OF_MESSAGES, EXPECTED_MESSAGES,
use_ssl=True)

def test_auto_no_framing(config, syslog_ng, syslog_ng_ctl, port_allocator, loggen, testcase_parameters):
INPUT_MESSAGES = "<2>Oct 11 22:14:15 myhostname sshd[1234]: message 0\r\n" * 10
def test_auto_framing_tls_proxied(config, syslog_ng, syslog_ng_ctl, port_allocator, loggen, testcase_parameters):
NUMBER_OF_MESSAGES = 10
INPUT_MESSAGES = "52 <2>Oct 11 22:14:15 myhostname sshd[1234]: message 0\n" * NUMBER_OF_MESSAGES
EXPECTED_MESSAGES = "Oct 11 22:14:15 myhostname sshd[1234]: message 0\n"
(syslog_source, file_destination) = _write_auto_config(config, syslog_ng, port_allocator, '"auto"', testcase_parameters)

_test_auto_detect(syslog_source, file_destination, loggen,
INPUT_MESSAGES, NUMBER_OF_MESSAGES, EXPECTED_MESSAGES,
use_ssl=True, proxied=True)

def test_auto_framing_tls_proxied_passthrough(config, syslog_ng, syslog_ng_ctl, port_allocator, loggen, testcase_parameters):
NUMBER_OF_MESSAGES = 10
INPUT_MESSAGES = "52 <2>Oct 11 22:14:15 myhostname sshd[1234]: message 0\n" * NUMBER_OF_MESSAGES
EXPECTED_MESSAGES = "Oct 11 22:14:15 myhostname sshd[1234]: message 0\n"
(syslog_source, file_destination) = _write_auto_config(config, syslog_ng, port_allocator, '"auto"', testcase_parameters)

_test_auto_detect(syslog_source, file_destination, loggen,
INPUT_MESSAGES, NUMBER_OF_MESSAGES, EXPECTED_MESSAGES,
use_ssl=True, proxied=True, proxied_tls_passthrough=True)


def test_auto_no_framing(config, syslog_ng, syslog_ng_ctl, port_allocator, loggen, testcase_parameters):
NUMBER_OF_MESSAGES = 10
_test_auto_detect(config, syslog_ng, syslog_ng_ctl, port_allocator, loggen, testcase_parameters, '"auto"', INPUT_MESSAGES, NUMBER_OF_MESSAGES, EXPECTED_MESSAGES)
INPUT_MESSAGES = "<2>Oct 11 22:14:15 myhostname sshd[1234]: message 0\n" * NUMBER_OF_MESSAGES
EXPECTED_MESSAGES = "Oct 11 22:14:15 myhostname sshd[1234]: message 0\n"

(syslog_source, file_destination) = _write_auto_config(config, syslog_ng, port_allocator, '"auto"', testcase_parameters)
_test_auto_detect(syslog_source, file_destination, loggen,
INPUT_MESSAGES, NUMBER_OF_MESSAGES, EXPECTED_MESSAGES)

_test_auto_detect(syslog_source, file_destination, loggen,
INPUT_MESSAGES, NUMBER_OF_MESSAGES, EXPECTED_MESSAGES,
use_ssl=True)

0 comments on commit dff7494

Please sign in to comment.