Skip to content

Commit

Permalink
Add the support of more entity types than just observables returned b…
Browse files Browse the repository at this point in the history
…y Threatr
  • Loading branch information
U039b committed Sep 21, 2024
1 parent 946125d commit 8e9d12b
Show file tree
Hide file tree
Showing 12 changed files with 564 additions and 202 deletions.
42 changes: 37 additions & 5 deletions colander/core/forms.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
from django.core.exceptions import ValidationError
from django.utils.translation import gettext_lazy as _

from colander.core.models import Case, Comment, DetectionRule, ObservableType
from colander.core.models import Case, Comment, DetectionRule
from colander.core.threatr import ThreatrClient


class CaseForm(forms.ModelForm):
Expand Down Expand Up @@ -70,14 +71,45 @@ def set_user(self, connected_user):
self.instance.owner = connected_user


def _get_types_for_investigation_form():
return [(t.short_name, t.name) for t in ObservableType.objects.all()]

class InvestigateSearchForm(forms.Form):
type = forms.ChoiceField(choices=_get_types_for_investigation_form)
# Corresponds to the entity super type such as observable
super_type = forms.ChoiceField()
# Corresponds to the entity type such as ipv4
type = forms.ChoiceField()
value = forms.CharField(max_length=128)
force_update = forms.BooleanField(required=False, label='Update results from vendors.')

def __init__(self, *args, **kwargs):
super(InvestigateSearchForm, self).__init__(*args, **kwargs)
self.threatr_client = ThreatrClient()
self.threatr_supported_types = self.threatr_client.get_supported_types()
self.fields['super_type'].choices = [
(t.get('name').upper(), t.get('name'))
for t in self.threatr_supported_types.get('super_types')
]
available_types = []
for _, m in self.threatr_supported_types.get('types').items():
for t in m:
available_types.append((t.get('id').upper(), t.get('label')))
self.fields['type'].choices = available_types

def clean(self):
cleaned_data = super().clean()
selected_model = cleaned_data.get('super_type')
selected_type = cleaned_data.get('type')
if selected_model not in self.threatr_supported_types.get('types'):
raise ValidationError(
_("Invalid model selected"), code="invalid"
)
model_types = self.threatr_supported_types.get('types').get(selected_model, None)
if selected_type not in [
t.get('id')
for t in model_types
]:
raise ValidationError(
_("Invalid types combination"), code="invalid"
)



class DocumentationForm(forms.Form):
Expand Down
8 changes: 6 additions & 2 deletions colander/core/rest/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
Observable,
ObservableType,
Threat,
ThreatType,
ThreatType, Device, DeviceType,
)
from colander.core.rest.serializers import DetailedEntitySerializer

Expand Down Expand Up @@ -104,6 +104,11 @@ def get_threatr_entity_type(entity):
return Observable, ObservableType.objects.get(short_name=entity['type']['short_name'])
except Exception:
return None, None
if entity['super_type']['short_name'] == 'DEVICE':
try:
return Device, DeviceType.objects.get(short_name=entity['type']['short_name'])
except Exception:
return None, None
if entity['super_type']['short_name'] == 'THREAT':
try:
return Threat, ThreatType.objects.get(short_name=entity['type']['short_name'])
Expand Down Expand Up @@ -224,7 +229,6 @@ def update_or_create_entity_relation(obj_from, obj_to, relation, case: Case, own
def import_entity_from_threatr(request):
if request.method == 'POST':
data = json.loads(request.body)
print(data)
case_id = data.get('case_id', None)
root = data.get('root', None)
entity = data.get('entity', None)
Expand Down
119 changes: 119 additions & 0 deletions colander/core/threatr.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
import logging

import requests
from django.conf import settings

from colander.core.models import BackendCredentials

logger = logging.getLogger(__name__)


class ThreatrClient:
backend_identifier = 'threatr'
url = settings.THREATR_BASE_URL

def __init__(self):
self.api_key = ''
self.types = []
self.supported_types = []
self.__load_credentials()

def is_correctly_configured(self):
try:
response = requests.head(f'{self.url}/api/schema/', headers=self.__get_headers(), timeout=10)
return response.status_code == 200
except requests.exceptions.RequestException as e:
return False

def is_online(self):
try:
requests.head(f'{self.url}/api/schema/', headers=self.__get_headers(), timeout=10)
return True
except requests.exceptions.RequestException:
return False

def __get_headers(self):
"""
Construct the authentication headers.
:return: the authentication headers
"""
return {'Authorization': f'Token {self.api_key}'}

def __load_credentials(self):
"""
Load the authentication credentials from the database.
"""
credentials = BackendCredentials.objects.filter(backend=ThreatrClient.backend_identifier)
if credentials:
credentials = credentials.first()
self.api_key = credentials.credentials.get('api_key')

def get_types(self):
"""
Get all the types defined in Threatr models.
:return: all the types defined in Threatr models
"""
if self.types:
return self.types
response = requests.get(f'{self.url}/api/types/', headers=self.__get_headers())
if response.status_code < 300:
self.types = response.json()
return self.types

def get_supported_types(self):
"""
Get all the entity types supported by the modules available on Threatr.
The data returned has the following structure:
{
"models": [
{"name": "Observable"}
],
"types": {
"Observable": [
{"id": "CVE", "label": "CVE", "name": "CVE"},
{"id": "DNS_RECORD", "label": "DNS record", "name": "DNS record"},
{"id": "DOMAIN", "label": "Domain", "name": "Domain"}
]
}
}
This structure is directly compatible with the dynamic type selector used by both
the investigate workspace and the quick entity creation form.
:return: the entity types supported by the modules available on Threatr
"""
if self.supported_types:
return self.supported_types
response = requests.get(f'{self.url}/api/types/supported/', headers=self.__get_headers())
if response.status_code < 300:
self.supported_types = response.json()
return self.supported_types

def send_request(self, data) -> (list, bool):
"""
Send the request to Threatr. If Threatr returns a status code equals to 201,
this means the client has to come back later.
If the status code is equal to 200, we are ready to return the result to the client.
The data sent to Threatr must follow this structure:
{
"super_type": # the entity super type such as observable or device,
"type": # the entity type such as IPv4 or server,
"value": # the actual subject of the search such as 1.1.1.1,
"force": # indicated to Threatr that it must propagate the query to the
different vendors and update its cache
}
:param data: the data to be sent
:return: the query results and a boolean telling if the client has to wait and come back later
"""
if not data:
return [], False
response = requests.post(f'{self.url}/api/request/', headers=self.__get_headers(), json=data)
if response.status_code == 201:
return [], True
elif response.status_code == 200:
return response.json(), False
else:
return [], False
132 changes: 91 additions & 41 deletions colander/core/views/investigate_views.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@
from django.views.decorators.csrf import csrf_exempt

from colander.core.forms import InvestigateSearchForm
from colander.core.models import BackendCredentials, ObservableType, colander_models, color_scheme
from colander.core.models import ObservableType, colander_models, color_scheme
from colander.core.threatr import ThreatrClient

THREAT_BACKEND_IDENTIFIER = 'threatr'
logger = logging.getLogger(__name__)
Expand All @@ -21,70 +22,119 @@ def get_threatr_types(api_key):
return response.json()
return []


@login_required
@csrf_exempt
def investigate_search_view(request):
form = InvestigateSearchForm()
results = {}
threatr_client = ThreatrClient()
threatr_results = {}
wait = False
types_to_display = {}
mermaid = ''
types = {t.short_name.lower(): t for t in ObservableType.objects.all()}
types = {}
ordering = {}
request_data = {}

if request.GET.keys():
credentials = BackendCredentials.objects.filter(backend=THREAT_BACKEND_IDENTIFIER)
if not credentials:
messages.info(request, 'This relation already exists.', extra_tags='danger')
logger.error(f'No credentials found for module {THREAT_BACKEND_IDENTIFIER}')
if not threatr_client.is_correctly_configured():
messages.error(request, 'Threatr is not correctly configured.', extra_tags='danger')
logger.error(f'Threatr is not correctly configured. {THREAT_BACKEND_IDENTIFIER}')
return render(
request,
'pages/investigate/base.html',
{
'form': form,
'request_data': request_data,
'results': threatr_results,
'mermaid': mermaid,
'types': types,
'wait': wait,
}
)

credentials = credentials.first()
api_key = credentials.credentials.get('api_key')
threatr_types = get_threatr_types(api_key)
if not threatr_types:
logger.error('Unable to retrieve threatr types')
types = threatr_client.get_supported_types()

if request.GET.keys():
entities = {}
form = InvestigateSearchForm(request.GET)
if form.is_valid():
data = {
"super_type": "observable",
request_data = {
"super_type": form.cleaned_data.get('super_type'),
"type": form.cleaned_data.get('type'),
"value": form.cleaned_data.get('value'),
"force": form.cleaned_data.get('force_update')
"force": form.cleaned_data.get('force_update', False)
}
headers = {'Authorization': f'Token {api_key}'}
response = requests.post(f'{settings.THREATR_BASE_URL}/api/request/', headers=headers, json=data)
wait = response.status_code == 201
if response.status_code == 200:
results = response.json()
root_entity = results.get('root_entity')
entity_super_type = root_entity.get('super_type').get('name')
if entity_super_type in colander_models:
types_to_display[root_entity.get('super_type').get('short_name')] = root_entity.get('super_type')
model = colander_models[entity_super_type]
threatr_results, wait = threatr_client.send_request(request_data)
if not wait and type(threatr_results) is dict:
ordering = {'global':{
'total': len(threatr_results['entities']),
'events': len(threatr_results['events']),
'entities': 0,
'external_doc': 0,
},
# ToDo: get rid of the hardcoded list of types
'importable_types': ['OBSERVABLE', 'DEVICE', 'THREAT'],
'types': {
}
}
external_doc_entities = []
root_entity = threatr_results.get('root_entity')
root_entity_super_type_name = root_entity.get('super_type').get('name')
root_entity_super_type_short_name = root_entity.get('super_type').get('short_name')
# List the root entity as a result
ordering['types'][root_entity_super_type_short_name] = {
'super_type': root_entity.get('super_type'),
'entities': [root_entity],
'count': 1
}
# Map the root entity types from Threatr with Colander models and types
if root_entity_super_type_name in colander_models:
model = colander_models[root_entity_super_type_name]
# Apply Colander colors
if model in color_scheme:
results['root_entity']['color'] = color_scheme[model]
for entity in results.get('entities'):
types_to_display[entity.get('super_type').get('short_name')] = entity.get('super_type')
entity_super_type = entity.get('super_type').get('name')
if entity_super_type in colander_models:
model = colander_models[entity_super_type]
threatr_results['root_entity']['color'] = color_scheme[model]
for entity in threatr_results.get('entities'):
entity_super_type = entity.get('super_type')
entity_super_type_short_name = entity_super_type.get('short_name')
entity_super_type_name = entity_super_type.get('name')
# Put the external documentations such as reports into a separate list
if entity_super_type_short_name == 'EXT_DOC':
ordering['global']['external_doc'] += 1
external_doc_entities.append(entity)
continue
else:
ordering['global']['entities'] += 1
# Count the number of results per super type
if entity_super_type_short_name not in ordering['types']:
ordering['types'][entity_super_type_short_name] = {
'super_type': entity_super_type,
'entities': [],
'count': 0 }
ordering['types'][entity_super_type_short_name]['count'] += 1
ordering['types'][entity_super_type_short_name]['entities'].append(entity)
# Apply Colander colors
if entity_super_type_name in colander_models:
model = colander_models[entity_super_type_name]
if model in color_scheme:
entity['color'] = color_scheme[model]
entities[entity.get('id')] = entity
results['entities'] = entities
# types_to_display = list(types_to_display.values())
# types_to_display.sort(key=lambda v: v.get('name'), reverse=False)

# Sort the entities
for super_type, obj in ordering['types'].items():
obj['entities'] = sorted(obj['entities'], key=lambda x: x.get('type').get('name'))
threatr_results['entities'] = entities
threatr_results['reports'] = external_doc_entities
request_data.pop('force', False)
return render(
request,
'pages/investigate/base.html',
{
'form': form,
'results': results,
'mermaid': mermaid,
'types_to_display': types_to_display,
'types': types,
'wait': wait,
'threatr_url': settings.THREATR_BASE_URL,
'request_data': request_data, # parameters of the request
'results': threatr_results, # the results returned by Threatr
'ordering': ordering, # in which order the results should be shown
'mermaid': mermaid, # the mermaid graph
'models': types, # the different types of entities to list
'wait': wait, # True if we are waiting for the completion of the request sent to Threatr
}
)
Loading

0 comments on commit 8e9d12b

Please sign in to comment.