From ca258ade6ca16d72a40741fdf22bc8b9027ffbde Mon Sep 17 00:00:00 2001 From: Nicholas Hairs Date: Fri, 18 Oct 2024 13:15:51 +1100 Subject: [PATCH] [sink.splunk] Add Splunk HEC Sink (#43) Fixes: #27 ### Test Plan Not fully tested, linted and unit tests only. --- docs/migrating.md | 7 ++- src/parsedmarc/sink/splunk.py | 77 ++++++++++++++++++++++++++++++++ src/parsedmarc/splunk.py | 1 + tests/test_source_sink_config.py | 9 ++++ 4 files changed, 93 insertions(+), 1 deletion(-) create mode 100644 src/parsedmarc/sink/splunk.py diff --git a/docs/migrating.md b/docs/migrating.md index 1753ba3..1a7a751 100644 --- a/docs/migrating.md +++ b/docs/migrating.md @@ -199,7 +199,12 @@ Not supported - [GitHub issue #6](https://github.com/nhairs/parsedmarc-fork/issu ### `splunk_hec` -Not supported - [GitHub Issue #27](https://github.com/nhairs/parsedmarc-fork/issues/27). +Use a `.splunk:Splunk` Sink. + +- `url`: moved to `client.url`. +- `token`: moved to `client.access_token`. +- `index`: no changes. +- `skip_certificate_verification`: moved to `client.verify_ssl` with inverted meaning (`verify_ssl` defaults to `True`; set it to `False` to skip verification). 
### `kafka` diff --git a/src/parsedmarc/sink/splunk.py b/src/parsedmarc/sink/splunk.py new file mode 100644 index 0000000..c16e906 --- /dev/null +++ b/src/parsedmarc/sink/splunk.py @@ -0,0 +1,77 @@ +### IMPORTS +### ============================================================================ +# Future +from __future__ import annotations + +# Installed +from pydantic import BaseModel + +# Local +from ..const import AppState +from ..report import AggregateReport, ForensicReport +from ..splunk import HECClient +from .base import BaseConfig, Sink + + +### CLASSES +### ============================================================================ +class Splunk(Sink): + """Sink that stores reports using the Splunk HTTP Event Collector (HEC) + + References: + + - http://docs.splunk.com/Documentation/Splunk/latest/Data/AboutHEC + - http://docs.splunk.com/Documentation/Splunk/latest/RESTREF/RESTinput#services.2Fcollector + + + *New in 9.0*. + """ + + config: SplunkConfig + + def setup(self) -> None: + if self._state != AppState.SHUTDOWN: + raise RuntimeError("Sink is already running") + self._state = AppState.SETTING_UP + + try: + self.client = HECClient( + url=self.config.client.url, + access_token=self.config.client.access_token, + verify=self.config.client.verify_ssl, + timeout=self.config.client.timeout, + index=self.config.index, + source=self.config.source, + ) + + except: + self._state = AppState.SETUP_ERROR + raise + + self._state = AppState.RUNNING + return + + def process_aggregate_report(self, report: AggregateReport) -> None: + self.client.save_aggregate_reports_to_splunk(report) + return + + def process_forensic_report(self, report: ForensicReport) -> None: + self.client.save_forensic_reports_to_splunk(report) + return + + +class SplunkConfig(BaseConfig): + """Splunk Config""" + + client: SplunkClientConfig + index: str + source: str = "parsedmarc" + + +class SplunkClientConfig(BaseModel): + """Splunk Client Config""" + + url: str + access_token: str + 
verify_ssl: bool = True + timeout: int = 60 diff --git a/src/parsedmarc/splunk.py b/src/parsedmarc/splunk.py index 9ed3cb4..1ae70c5 100644 --- a/src/parsedmarc/splunk.py +++ b/src/parsedmarc/splunk.py @@ -55,6 +55,7 @@ def __init__( """ parsed = urlparse(url) self.url = f"{parsed.scheme}://{parsed.netloc}/services/collector/event/1.0" + # TODO: should be removeprefix("Splunk "); lstrip() strips a character set and may eat more than intended. self.access_token = access_token.lstrip("Splunk ") self.index = index self.host = socket.getfqdn() diff --git a/tests/test_source_sink_config.py b/tests/test_source_sink_config.py index 66c9888..f960aba 100644 --- a/tests/test_source_sink_config.py +++ b/tests/test_source_sink_config.py @@ -13,6 +13,7 @@ from parsedmarc.parser import ReportParser from parsedmarc.sink.base import Sink import parsedmarc.sink.elasticsearch +import parsedmarc.sink.splunk import parsedmarc.sink.util import parsedmarc.sink.webhook import parsedmarc.source.aws @@ -98,6 +99,14 @@ def test_source_init(class_: Type[Source], config: Dict[str, Any]): [ # ElasticSearch (parsedmarc.sink.elasticsearch.Elasticsearch, {"client": {"hosts": "foo"}}), + # Splunk + ( + parsedmarc.sink.splunk.Splunk, + { + "client": {"url": "https://example.org", "access_token": "Splunk asdfsadf"}, + "index": "some_index", + }, + ), # Util (parsedmarc.sink.util.Noop, {}), (parsedmarc.sink.util.Console, {}),