From 6ca34143b1f6a3a480003b84403af32e07dbd75e Mon Sep 17 00:00:00 2001 From: Baptiste Date: Mon, 13 Jan 2025 14:35:37 +0100 Subject: [PATCH] [IPSum ] Add connector (#3002) --- .circleci/config.yml | 12 ++ external-import/ipsum/.dockerignore | 5 + external-import/ipsum/Dockerfile | 19 ++ external-import/ipsum/README.md | 153 ++++++++++++++++ external-import/ipsum/docker-compose.yml | 37 ++++ external-import/ipsum/entrypoint.sh | 7 + .../src/external_import_connector/__init__.py | 3 + .../external_import_connector/client_api.py | 72 ++++++++ .../config_variables.py | 62 +++++++ .../external_import_connector/connector.py | 172 ++++++++++++++++++ .../converter_to_stix.py | 155 ++++++++++++++++ .../src/external_import_connector/utils.py | 60 ++++++ external-import/ipsum/src/main.py | 20 ++ external-import/ipsum/src/requirements.txt | 2 + 14 files changed, 779 insertions(+) create mode 100644 external-import/ipsum/.dockerignore create mode 100644 external-import/ipsum/Dockerfile create mode 100644 external-import/ipsum/README.md create mode 100644 external-import/ipsum/docker-compose.yml create mode 100644 external-import/ipsum/entrypoint.sh create mode 100644 external-import/ipsum/src/external_import_connector/__init__.py create mode 100644 external-import/ipsum/src/external_import_connector/client_api.py create mode 100644 external-import/ipsum/src/external_import_connector/config_variables.py create mode 100644 external-import/ipsum/src/external_import_connector/connector.py create mode 100644 external-import/ipsum/src/external_import_connector/converter_to_stix.py create mode 100644 external-import/ipsum/src/external_import_connector/utils.py create mode 100644 external-import/ipsum/src/main.py create mode 100644 external-import/ipsum/src/requirements.txt diff --git a/.circleci/config.yml b/.circleci/config.yml index 8d8e51a8cb..b52c7a393c 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -550,6 +550,8 @@ jobs: docker push opencti/connector-attribution-tools:${CIRCLE_TAG} docker push opencti/connector-tenable-security-center:latest docker push opencti/connector-tenable-security-center:${CIRCLE_TAG} + docker push opencti/connector-ipsum:latest + docker push opencti/connector-ipsum:${CIRCLE_TAG} - slack/notify: event: fail template: basic_fail_1 @@ -1456,6 +1458,10 @@ jobs: working_directory: ~/opencti/external-import/tenable-security-center name: Build Docker image opencti/connector-tenable-security-center command: docker build -t opencti/connector-tenable-security-center:rolling . + - run: + working_directory: ~/opencti/external-import/ipsum + name: Build Docker image opencti/connector-ipsum + command: docker build -t opencti/connector-ipsum:rolling . - run: name: Publish Docker Image to Docker Hub command: | @@ -1535,6 +1541,7 @@ jobs: docker push opencti/connector-tenable-vuln-management:rolling docker push opencti/connector-attribution-tools:rolling docker push opencti/connector-tenable-security-center:rolling + docker push opencti/connector-ipsum:rolling - slack/notify: event: fail template: basic_fail_1 @@ -1673,6 +1680,10 @@ jobs: working_directory: ~/opencti/external-import/intel471 name: Build Docker image opencti/connector-intel471 command: docker build -t opencti/connector-intel471:rolling . + - run: + working_directory: ~/opencti/external-import/ipsum + name: Build Docker image opencti/connector-ipsum + command: docker build -t opencti/connector-ipsum:rolling . - run: working_directory: ~/opencti/external-import/orange-cyberdefense name: Build Docker image opencti/connector-orange-cyberdefense @@ -1897,6 +1908,7 @@ jobs: docker push opencti/connector-qradar:rolling docker push opencti/connector-stream-exporter:rolling docker push opencti/connector-stream-importer:rolling + docker push opencti/connector-ipsum:rolling - slack/notify: event: fail template: basic_fail_1 diff --git a/external-import/ipsum/.dockerignore b/external-import/ipsum/.dockerignore new file mode 100644 index 0000000000..f09c6aa9f6 --- /dev/null +++ b/external-import/ipsum/.dockerignore @@ -0,0 +1,5 @@ +src/config.yml +src/__pycache__ +src/logs +src/*.gql +src/.venv \ No newline at end of file diff --git a/external-import/ipsum/Dockerfile b/external-import/ipsum/Dockerfile new file mode 100644 index 0000000000..c9f8c79358 --- /dev/null +++ b/external-import/ipsum/Dockerfile @@ -0,0 +1,19 @@ +FROM python:3.12-alpine +ENV CONNECTOR_TYPE=EXTERNAL_IMPORT + +# Copy the connector +COPY src /opt/opencti-connector-ipsum + +# Install Python modules +# hadolint ignore=DL3003 +RUN apk update && apk upgrade && \ + apk --no-cache add git build-base libmagic libffi-dev libxml2-dev libxslt-dev + +RUN cd /opt/opencti-connector-ipsum && \ + pip3 install --no-cache-dir -r requirements.txt && \ + apk del git build-base + +# Expose and entrypoint +COPY entrypoint.sh / +RUN chmod +x /entrypoint.sh +ENTRYPOINT ["/entrypoint.sh"] diff --git a/external-import/ipsum/README.md b/external-import/ipsum/README.md new file mode 100644 index 0000000000..ec73252e6c --- /dev/null +++ b/external-import/ipsum/README.md @@ -0,0 +1,153 @@ +# OpenCTI External Ingestion Connector IPSUM + + + +Table of Contents + +- [OpenCTI External Ingestion Connector IPSUM](#opencti-external-ingestion-connector-ipsum) + - [Introduction](#introduction) + - [Installation](#installation) + - [Requirements](#requirements) + - [Configuration variables](#configuration-variables) + - [OpenCTI environment variables](#opencti-environment-variables) + - [Base connector environment variables](#base-connector-environment-variables) + - [Connector extra parameters environment variables](#connector-extra-parameters-environment-variables) + - [Deployment](#deployment) + - [Docker Deployment](#docker-deployment) + - [Manual Deployment](#manual-deployment) + - [Usage](#usage) + - [Behavior](#behavior) + - [Debugging](#debugging) + - [Additional information](#additional-information) + +## Introduction + +## Installation + +### Requirements + +- OpenCTI Platform >= 6... + +## Configuration variables + +There are a number of configuration options, which are set either in `docker-compose.yml` (for Docker) or +in `config.yml` (for manual deployment). + +### OpenCTI environment variables + +Below are the parameters you'll need to set for OpenCTI: + +| Parameter | config.yml | Docker environment variable | Mandatory | Description | +|---------------|------------|-----------------------------|-----------|------------------------------------------------------| +| OpenCTI URL | url | `OPENCTI_URL` | Yes | The URL of the OpenCTI platform. | +| OpenCTI Token | token | `OPENCTI_TOKEN` | Yes | The default admin token set in the OpenCTI platform. | + +### Base connector environment variables + +Below are the parameters you'll need to set for running the connector properly: + +| Parameter | config.yml | Docker environment variable | Default | Mandatory | Description | +|-----------------|------------|-----------------------------|-----------------|-----------|------------------------------------------------------------------------------------------| +| Connector ID | id | `CONNECTOR_ID` | / | Yes | A unique `UUIDv4` identifier for this connector instance. | +| Connector Type | type | `CONNECTOR_TYPE` | EXTERNAL_IMPORT | Yes | Should always be set to `EXTERNAL_IMPORT` for this connector. | +| Connector Name | name | `CONNECTOR_NAME` | | Yes | Name of the connector. | +| Connector Scope | scope | `CONNECTOR_SCOPE` | ipsum | Yes | The scope or type of data the connector is importing, either a MIME type or Stix Object. | +| Log Level | log_level | `CONNECTOR_LOG_LEVEL` | error | Yes | Determines the verbosity of the logs. Options are `debug`, `info`, `warn`, or `error`. | + +### Connector extra parameters environment variables + +Below are the parameters you'll need to set for the connector: + +| Parameter | config.yml | Docker environment variable | Default | Mandatory | Description | +|--------------|--------------|-----------------------------|---------|-----------|-------------| +| API base URL | api_base_url | `CONNECTOR_IPSUM_API_BASE_URL` | `https://raw.githubusercontent.com/stamparm/ipsum/refs/heads/master/levels/5.txt` | Yes | You can choose between level 1 to level 8. 1 can have a lot of false positives, 8 has no false positive (Example: https://raw.githubusercontent.com/stamparm/ipsum/refs/heads/master/levels/8.txt - No false positive ) | +| API key | api_key | `CONNECTOR_IPSUM_API_KEY` | | No | Github API Key | +| Score | default_x_opencti_score | `CONNECTOR_IPSUM_DEFAULT_X_OPENCTI_SCORE` | 60 | No | | + +## Deployment + +### Docker Deployment + +Before building the Docker container, you need to set the version of pycti in `requirements.txt` equal to whatever +version of OpenCTI you're running. Example, `pycti==5.12.20`. If you don't, it will take the latest version, but +sometimes the OpenCTI SDK fails to initialize. + +Build a Docker Image using the provided `Dockerfile`. + +Example: + +```shell +# Replace the IMAGE NAME with the appropriate value +docker build . -t [IMAGE NAME]:latest +``` + +Make sure to replace the environment variables in `docker-compose.yml` with the appropriate configurations for your +environment. Then, start the docker container with the provided docker-compose.yml + +```shell +docker compose up -d +# -d for detached +``` + +### Manual Deployment + +Create a file `config.yml` based on the provided `config.yml.sample`. + +Replace the configuration variables (especially the "**ChangeMe**" variables) with the appropriate configurations for +you environment. + +Install the required python dependencies (preferably in a virtual environment): + +```shell +pip3 install -r requirements.txt +``` + +Then, start the connector from recorded-future/src: + +```shell +python3 main.py +``` + +## Usage + +After Installation, the connector should require minimal interaction to use, and should update automatically at a regular interval specified in your `docker-compose.yml` or `config.yml` in `duration_period`. + +However, if you would like to force an immediate download of a new batch of entities, navigate to: + +`Data management` -> `Ingestion` -> `Connectors` in the OpenCTI platform. + +Find the connector, and click on the refresh button to reset the connector's state and force a new +download of data by re-running the connector. + +## Behavior + + + +## Debugging + +The connector can be debugged by setting the appropiate log level. +Note that logging messages can be added using `self.helper.connector_logger,{LOG_LEVEL}("Sample message")`, i. +e., `self.helper.connector_logger.error("An error message")`. + + + +## Additional information + + diff --git a/external-import/ipsum/docker-compose.yml b/external-import/ipsum/docker-compose.yml new file mode 100644 index 0000000000..0073af0d0b --- /dev/null +++ b/external-import/ipsum/docker-compose.yml @@ -0,0 +1,37 @@ +version: '3' +services: + connector-ipsum: + image: opencti/connector-ipsum:6.2.4 + environment: + # Connector's generic execution parameters + - OPENCTI_URL=http://localhost + - OPENCTI_TOKEN=CHANGEME + # Connector's definition parameters REQUIRED + - CONNECTOR_ID=CHANGEME + - CONNECTOR_NAME=IPsum + - CONNECTOR_SCOPE=ipsum + - CONNECTOR_LOG_LEVEL=error + - CONNECTOR_DURATION_PERIOD=PT6H # ISO8601 format in String, start with 'P...' for Period + + # Connector's definition parameters OPTIONAL + # - CONNECTOR_QUEUE_THRESHOLD=500 # Default 500Mo, Float accepted + # - CONNECTOR_RUN_AND_TERMINATE=False # Default False, True run connector once + # - CONNECTOR_SEND_TO_QUEUE=True # Default True + # - CONNECTOR_SEND_TO_DIRECTORY=False # Default False + # - CONNECTOR_SEND_TO_DIRECTORY_PATH=CHANGEME # if CONNECTOR_SEND_TO_DIRECTORY is True, you must specify a path + # - CONNECTOR_SEND_TO_DIRECTORY_RETENTION=7 # Default 7, in days + + # Connector's custom execution parameters + - CONNECTOR_IPSUM_API_BASE_URL=https://raw.githubusercontent.com/stamparm/ipsum/refs/heads/master/levels/5.txt + - CONNECTOR_IPSUM_API_KEY="" + - CONNECTOR_IPSUM_DEFAULT_X_OPENCTI_SCORE=60 + + # Add proxy parameters below if needed + # - HTTP_PROXY=CHANGEME + # - HTTPS_PROXY=CHANGEME + # - NO_PROXY=CHANGEME + restart: always +networks: + default: + external: true + name: docker_default diff --git a/external-import/ipsum/entrypoint.sh b/external-import/ipsum/entrypoint.sh new file mode 100644 index 0000000000..f473218f44 --- /dev/null +++ b/external-import/ipsum/entrypoint.sh @@ -0,0 +1,7 @@ +#!/bin/sh + +# Go to the right directory +cd /opt/opencti-connector-ipsum + +# Launch the worker +python3 main.py \ No newline at end of file diff --git a/external-import/ipsum/src/external_import_connector/__init__.py b/external-import/ipsum/src/external_import_connector/__init__.py new file mode 100644 index 0000000000..88d3a30538 --- /dev/null +++ b/external-import/ipsum/src/external_import_connector/__init__.py @@ -0,0 +1,3 @@ +from .connector import ConnectorIPSUM + +__all__ = ["ConnectorIPSUM"] diff --git a/external-import/ipsum/src/external_import_connector/client_api.py b/external-import/ipsum/src/external_import_connector/client_api.py new file mode 100644 index 0000000000..38374a37de --- /dev/null +++ b/external-import/ipsum/src/external_import_connector/client_api.py @@ -0,0 +1,72 @@ +import requests + +from .utils import ( + is_cidr, + is_full_network, + is_private_cidr, + is_private_ip, + networkcidr_to_list, +) + + +class ConnectorClient: + def __init__(self, helper, config): + """ + Initialize the client with necessary configurations + """ + self.helper = helper + self.config = config + + headers = {"Bearer": self.config.api_key} + self.session = requests.Session() + if self.config.api_key and self.config.api_key != "": + self.session.headers.update(headers) + + def _request_data(self, api_url: str, params=None): + """ + Internal method to handle API requests + :return: Response in JSON format + """ + try: + response = self.session.get(api_url, params=params) + + self.helper.connector_logger.info( + "[API] HTTP Get Request to endpoint", {"url_path": api_url} + ) + + response.raise_for_status() + if response.ok: + return response + return None + + except requests.RequestException as err: + error_msg = "[API] Error while fetching data: " + self.helper.connector_logger.error( + error_msg, {"url_path": {api_url}, "error": {str(err)}} + ) + return None + + def get_entities(self, params=None) -> list: + """ + If params is None, retrieve all IPs in the Github Repository + :param params: Optional Params to filter what list to return + :return: A list of IPs + """ + ips = [] + try: + response = self._request_data(self.config.api_base_url, params=params) + if response is not None: + for line in response.text.splitlines(): + if not line.startswith("#"): + ip = line.strip() + if is_cidr(ip): + if is_full_network(ip) or is_private_cidr(ip): + continue + network_ips = networkcidr_to_list(ip) + ips.extend(network_ips) + else: + if not is_private_ip(ip): + ips.append(ip) + return ips + except Exception as err: + self.helper.connector_logger.error(err) diff --git a/external-import/ipsum/src/external_import_connector/config_variables.py b/external-import/ipsum/src/external_import_connector/config_variables.py new file mode 100644 index 0000000000..81ab7100b8 --- /dev/null +++ b/external-import/ipsum/src/external_import_connector/config_variables.py @@ -0,0 +1,62 @@ +import os +from pathlib import Path + +import yaml +from pycti import get_config_variable + + +class ConfigConnector: + def __init__(self): + """ + Initialize the connector with necessary configurations + """ + self.load = self._load_config() + self._initialize_configurations() + + @staticmethod + def _load_config() -> dict: + """ + Load the configuration from the YAML file + :return: Configuration dictionary + """ + config_file_path = Path(__file__).parents[1].joinpath("config.yml") + config = ( + yaml.load(open(config_file_path), Loader=yaml.FullLoader) + if os.path.isfile(config_file_path) + else {} + ) + + return config + + def _initialize_configurations(self) -> None: + """ + Connector configuration variables + :return: None + """ + self.duration_period = get_config_variable( + "CONNECTOR_DURATION_PERIOD", + ["connector", "duration_period"], + self.load, + ) + + self.x_opencti_score = get_config_variable( + "CONNECTOR_IPSUM_DEFAULT_X_OPENCTI_SCORE", + ["connector_ipsum", "default_x_opencti_score"], + self.load, + default=60, + isNumber=True, + ) + + self.api_base_url = get_config_variable( + "CONNECTOR_IPSUM_API_BASE_URL", + ["connector_ipsum", "api_base_url"], + self.load, + default="https://raw.githubusercontent.com/stamparm/ipsum/refs/heads/master/levels/5.txt", + ) + + self.api_key = get_config_variable( + "CONNECTOR_IPSUM_API_KEY", + ["connector_ipsum", "api_key"], + self.load, + required=False, + ) diff --git a/external-import/ipsum/src/external_import_connector/connector.py b/external-import/ipsum/src/external_import_connector/connector.py new file mode 100644 index 0000000000..e913952bfc --- /dev/null +++ b/external-import/ipsum/src/external_import_connector/connector.py @@ -0,0 +1,172 @@ +import sys +from datetime import datetime + +from pycti import OpenCTIConnectorHelper + +from .client_api import ConnectorClient +from .config_variables import ConfigConnector +from .converter_to_stix import ConverterToStix + + +class ConnectorIPSUM: + """ + Specifications of the external import connector + + This class encapsulates the main actions, expected to be run by any external import connector. + Note that the attributes defined below will be complemented per each connector type. + This type of connector aim to fetch external data to create STIX bundle and send it in a RabbitMQ queue. + The STIX bundle in the queue will be processed by the workers. + This type of connector uses the basic methods of the helper. + + --- + + Attributes + - `config (ConfigConnector())`: + Initialize the connector with necessary configuration environment variables + + - `helper (OpenCTIConnectorHelper(config))`: + This is the helper to use. + ALL connectors have to instantiate the connector helper with configurations. + Doing this will do a lot of operations behind the scene. + + - `converter_to_stix (ConnectorConverter(helper))`: + Provide methods for converting various types of input data into STIX 2.1 objects. + + --- + + Best practices + - `self.helper.api.work.initiate_work(...)` is used to initiate a new work + - `self.helper.schedule_iso()` is used to encapsulate the main process in a scheduler + - `self.helper.connector_logger.[info/debug/warning/error]` is used when logging a message + - `self.helper.stix2_create_bundle(stix_objects)` is used when creating a bundle + - `self.helper.send_stix2_bundle(stix_objects_bundle)` is used to send the bundle to RabbitMQ + - `self.helper.set_state()` is used to set state + + """ + + def __init__(self): + """ + Initialize the Connector with necessary configurations + """ + self.config = ConfigConnector() + self.helper = OpenCTIConnectorHelper(self.config.load) + self.client = ConnectorClient(self.helper, self.config) + self.converter_to_stix = ConverterToStix(self.helper, self.config) + + def _collect_intelligence(self) -> list: + """ + Collect intelligence from the source and convert into STIX object + :return: List of STIX objects + """ + stix_objects = [] + + entities = self.client.get_entities() + if entities is None: + return stix_objects + + stix_objects = [ + self.converter_to_stix.create_obs(entity) + for entity in entities + if entity is not None + ] + return stix_objects + + def process_message(self) -> None: + """ + Connector main process to collect intelligence + :return: None + """ + self.helper.connector_logger.info( + "[CONNECTOR] Starting connector...", + {"connector_name": self.helper.connect_name}, + ) + + try: + now = datetime.now() + current_timestamp = int(datetime.timestamp(now)) + current_state = self.helper.get_state() + + if current_state is not None and "last_run" in current_state: + last_run = current_state["last_run"] + + self.helper.connector_logger.info( + "[CONNECTOR] Connector last run", + {"last_run_datetime": last_run}, + ) + else: + self.helper.connector_logger.info( + "[CONNECTOR] Connector has never run..." + ) + + friendly_name = "Connector IPSum" + + work_id = self.helper.api.work.initiate_work( + self.helper.connect_id, friendly_name + ) + + self.helper.connector_logger.info( + "[CONNECTOR] Running connector...", + {"connector_name": self.helper.connect_name}, + ) + + stix_objects = self._collect_intelligence() + + if stix_objects is not None and len(stix_objects) != 0: + stix_objects_bundle = self.helper.stix2_create_bundle(stix_objects) + bundles_sent = self.helper.send_stix2_bundle( + stix_objects_bundle, work_id=work_id + ) + + self.helper.connector_logger.info( + "Sending STIX objects to OpenCTI...", + {"bundles_sent": {str(len(bundles_sent))}}, + ) + + self.helper.connector_logger.debug( + "Getting current state and update it with last run of the connector", + {"current_timestamp": current_timestamp}, + ) + current_state = self.helper.get_state() + current_state_datetime = now.strftime("%Y-%m-%d %H:%M:%S") + last_run_datetime = datetime.utcfromtimestamp(current_timestamp).strftime( + "%Y-%m-%d %H:%M:%S" + ) + if current_state: + current_state["last_run"] = current_state_datetime + else: + current_state = {"last_run": current_state_datetime} + self.helper.set_state(current_state) + + message = ( + f"{self.helper.connect_name} connector successfully run, storing last_run as " + + str(last_run_datetime) + ) + + self.helper.api.work.to_processed(work_id, message) + self.helper.connector_logger.info(message) + + except (KeyboardInterrupt, SystemExit): + self.helper.connector_logger.info( + "[CONNECTOR] Connector stopped...", + {"connector_name": self.helper.connect_name}, + ) + sys.exit(0) + except Exception as err: + self.helper.connector_logger.error(str(err)) + + def run(self) -> None: + """ + Run the main process encapsulated in a scheduler + It allows you to schedule the process to run at a certain intervals + This specific scheduler from the pycti connector helper will also check the queue size of a connector + If `CONNECTOR_QUEUE_THRESHOLD` is set, if the connector's queue size exceeds the queue threshold, + the connector's main process will not run until the queue is ingested and reduced sufficiently, + allowing it to restart during the next scheduler check. (default is 500MB) + It requires the `duration_period` connector variable in ISO-8601 standard format + Example: `CONNECTOR_DURATION_PERIOD=PT5M` => Will run the process every 5 minutes + :return: None + """ + self.helper.schedule_iso( + message_callback=self.process_message, + duration_period=self.config.duration_period, + ) diff --git a/external-import/ipsum/src/external_import_connector/converter_to_stix.py b/external-import/ipsum/src/external_import_connector/converter_to_stix.py new file mode 100644 index 0000000000..6ad3a30d3f --- /dev/null +++ b/external-import/ipsum/src/external_import_connector/converter_to_stix.py @@ -0,0 +1,155 @@ +import ipaddress + +import stix2 +import validators +from pycti import Identity, StixCoreRelationship + + +class ConverterToStix: + """ + Provides methods for converting various types of input data into STIX 2.1 objects. + + REQUIREMENTS: + - generate_id() for each entity from OpenCTI pycti library except observables to create + """ + + def __init__(self, helper, config): + self.helper = helper + self.config = config + self.author = self.create_author() + self.external_reference = self.create_external_reference() + + @staticmethod + def create_external_reference() -> list: + """ + Create external reference + :return: External reference STIX2 list + """ + external_reference = stix2.ExternalReference( + source_name="External Source", + url="https://github.com/stamparm/ipsum/tree/master", + description="All lists are automatically retrieved and parsed on a daily (24h) basis and the final result is pushed to this repository", + ) + return [external_reference] + + @staticmethod + def create_author() -> dict: + """ + Create Author + :return: Author in Stix2 object + """ + author = stix2.Identity( + id=Identity.generate_id(name="IPsum", identity_class="organization"), + name="IPsum", + identity_class="organization", + description="IPsum is a threat intelligence feed based on 30+ different publicly available lists of suspicious and/or malicious IP addresses", + ) + return author + + def create_relationship( + self, source_id: str, relationship_type: str, target_id: str + ) -> dict: + """ + Creates Relationship object + :param source_id: ID of source in string + :param relationship_type: Relationship type in string + :param target_id: ID of target in string + :return: Relationship STIX2 object + """ + relationship = stix2.Relationship( + id=StixCoreRelationship.generate_id( + relationship_type, source_id, target_id + ), + relationship_type=relationship_type, + source_ref=source_id, + target_ref=target_id, + created_by_ref=self.author, + external_references=self.external_reference, + ) + return relationship + + # ===========================# + # Other Examples + # ===========================# + + @staticmethod + def _is_ipv6(value: str) -> bool: + """ + Determine whether the provided IP string is IPv6 + :param value: Value in string + :return: A boolean + """ + try: + ipaddress.IPv6Address(value) + return True + except ipaddress.AddressValueError: + return False + + @staticmethod + def _is_ipv4(value: str) -> bool: + """ + Determine whether the provided IP string is IPv4 + :param value: Value in string + :return: A boolean + """ + try: + ipaddress.IPv4Address(value) + return True + except ipaddress.AddressValueError: + return False + + @staticmethod + def _is_domain(value: str) -> bool: + """ + Valid domain name regex including internationalized domain name + :param value: Value in string + :return: A boolean + """ + is_valid_domain = validators.domain(value) + + if is_valid_domain: + return True + else: + return False + + def create_obs(self, value: str) -> dict: + """ + Create observable according to value given + :param value: Value in string + :return: Stix object for IPV4, IPV6 or Domain + """ + if self._is_ipv6(value) is True: + stix_ipv6_address = stix2.IPv6Address( + value=value, + custom_properties={ + "x_opencti_created_by_ref": self.author["id"], + "x_opencti_external_references": self.external_reference, + "x_opencti_score": self.config.x_opencti_score, + }, + ) + return stix_ipv6_address + elif self._is_ipv4(value) is True: + stix_ipv4_address = stix2.IPv4Address( + value=value, + custom_properties={ + "x_opencti_created_by_ref": self.author["id"], + "x_opencti_external_references": self.external_reference, + "x_opencti_score": self.config.x_opencti_score, + }, + ) + return stix_ipv4_address + elif self._is_domain(value) is True: + stix_domain_name = stix2.DomainName( + value=value, + custom_properties={ + "x_opencti_created_by_ref": self.author["id"], + "x_opencti_external_references": self.external_reference, + "x_opencti_score": self.config.x_opencti_score, + }, + ) + return stix_domain_name + else: + self.helper.connector_logger.error( + "This observable value is not a valid IPv4 or IPv6 address nor DomainName: ", + {"value": value}, + ) diff --git a/external-import/ipsum/src/external_import_connector/utils.py b/external-import/ipsum/src/external_import_connector/utils.py new file mode 100644 index 0000000000..10601c50fb --- /dev/null +++ b/external-import/ipsum/src/external_import_connector/utils.py @@ -0,0 +1,60 @@ +# Utilities: helper functions, classes, or modules that provide common, reusable functionality across a codebase +import ipaddress + + +@staticmethod +def is_private_ip(ip: str) -> bool: + """ + Check if the IP is a private IP + :param ip: IP address + :return: A boolean + """ + return ipaddress.ip_address(ip).is_private + + +@staticmethod +def is_private_cidr(cidr: str) -> bool: + """ + Check if the CIDR is a private network. + :param cidr: CIDR notation + :return: A boolean indicating if it is a private CIDR + """ + try: + network = ipaddress.ip_network(cidr, strict=False) + return network.is_private + except ValueError: + return False + + +@staticmethod +def is_full_network(ip): + """ + Check if the IP is a full network + :param ip: IP address + :return: A boolean + """ + return ip in ["0.0.0.0/0", "::/0"] + + +@staticmethod +def is_cidr(value): + """ + Check if the value is a CIDR + :param value: Value in string + :return: A boolean + """ + try: + ipaddress.ip_network(value) + return True + except ValueError: + return False + + +@staticmethod +def networkcidr_to_list(cidr): + """ + Convert CIDR to IP list + :param cidr: CIDR + :return: A list of IPs + """ + return [str(ip) for ip in ipaddress.ip_network(cidr).hosts()] diff --git a/external-import/ipsum/src/main.py b/external-import/ipsum/src/main.py new file mode 100644 index 0000000000..9cfa91aefa --- /dev/null +++ b/external-import/ipsum/src/main.py @@ -0,0 +1,20 @@ +import traceback + +from external_import_connector import ConnectorIPSUM + +if __name__ == "__main__": + """ + Entry point of the script + + - traceback.print_exc(): This function prints the traceback of the exception to the standard error (stderr). + The traceback includes information about the point in the program where the exception occurred, + which is very useful for debugging purposes. + - exit(1): effective way to terminate a Python program when an error is encountered. + It signals to the operating system and any calling processes that the program did not complete successfully. + """ + try: + connector = ConnectorIPSUM() + connector.run() + except Exception: + traceback.print_exc() + exit(1) diff --git a/external-import/ipsum/src/requirements.txt b/external-import/ipsum/src/requirements.txt new file mode 100644 index 0000000000..052f9c8dbc --- /dev/null +++ b/external-import/ipsum/src/requirements.txt @@ -0,0 +1,2 @@ +pycti==6.4.0 +validators==0.33.0 \ No newline at end of file