Skip to content

Commit

Permalink
after review
Browse files Browse the repository at this point in the history
  • Loading branch information
michalkrzem committed Dec 13, 2024
1 parent fc5a2f6 commit 7fdfe9e
Showing 1 changed file with 39 additions and 68 deletions.
107 changes: 39 additions & 68 deletions src/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,44 +104,6 @@ def update_metadata(
)
self.db.add_match(job, match)

@staticmethod
def read_file(file_path: str) -> bytes:
"""Reads the entire file content.
Returns:
bytes: The content of the file.
"""
with open(file_path, "rb") as file:
return file.read()

@staticmethod
def read_bytes_from_offset(
data: bytes, matched_length: int, offset: int, byte_range: int = 32
) -> tuple[bytes, bytes, bytes]:
"""Reads a specific range of bytes from the already loaded file content around a given offset.
Args:
data (bytes): Data to read.
matched_length (int): Number of bytes to read.
offset (int): The offset in bytes from which to start reading.
byte_range (int): The range in bytes to read around the offset (default is 32).
Returns:
bytes: A chunk of bytes from the file, starting from the given offset minus bit_range
and ending at offset plus matched_length and byte_range.
"""

before = data[max(0, offset - byte_range) : offset]
matching = data[offset : offset + matched_length]
after = data[
offset
+ matched_length : min(
len(data), offset + matched_length + byte_range
)
]

return before, matching, after

def execute_yara(self, job: Job, files: List[str]) -> None:
rule = yara.compile(source=job.raw_yara)
num_matches = 0
Expand All @@ -157,15 +119,16 @@ def execute_yara(self, job: Job, files: List[str]) -> None:

matches = rule.match(path)
if matches:
data = self.read_file(path)
with open(path, "rb") as file:
data = file.read()
context = self.get_match_context(data, matches)

self.update_metadata(
job=job.id,
orig_name=orig_name,
path=path,
matches=[r.rule for r in matches],
context=context,
job.id,
orig_name,
path,
[r.rule for r in matches],
context,
)
num_matches += 1
except yara.Error:
Expand Down Expand Up @@ -194,34 +157,27 @@ def execute_yara(self, job: Job, files: List[str]) -> None:
f"in {scanned_datasets}/{job.total_datasets} ({dataset_percent:.0%}) of datasets.",
)

@staticmethod
def get_match_context(
self, data: bytes, matches: List[yara.Match]
) -> dict:
data: bytes, matches: List[yara.Match]
) -> Dict[str, Dict[str, Dict[str, base64.b64decode]]]:
context = {}
for yara_match in matches:
match_context = []
match_context = {}
for string_match in yara_match.strings:
expression_keys = []
for expression_key in string_match.instances:
if expression_key in expression_keys:
continue

(before, matching, after,) = self.read_bytes_from_offset(
data=data,
offset=expression_key.offset,
matched_length=expression_key.matched_length,
)
match_context.append(
{
"before": base64.b64encode(before).decode("utf-8"),
"matching": base64.b64encode(matching).decode(
"utf-8"
),
"after": base64.b64encode(after).decode("utf-8"),
}
)
context.update({str(yara_match): match_context})
expression_keys.append(expression_key)
expression_key = string_match.instances[0]

(before, matching, after,) = read_bytes_with_context(
data, expression_key.matched_length, expression_key.offset
)
match_context[expression_key] = {
"before": base64.b64encode(before).decode("utf-8"),
"matching": base64.b64encode(matching).decode("utf-8"),
"after": base64.b64encode(after).decode("utf-8"),
}

context[yara_match.rule] = match_context
logging.error(f"Match context: {context}")
return context

def init_search(self, job: Job, tasks: int) -> None:
Expand Down Expand Up @@ -374,3 +330,18 @@ def run_yara_batch(job_id: JobId, iterator: str, batch_size: int) -> None:

agent.execute_yara(job, pop_result.files)
agent.add_tasks_in_progress(job, -1)


def read_bytes_with_context(
data: bytes, matched_length: int, offset: int, byte_range: int = 32
) -> tuple[bytes, bytes, bytes]:
"""Return `matched_length` bytes from `offset`, along with `byte_range` bytes before and after the match."""

before = data[max(0, offset - byte_range) : offset]
matching = data[offset : offset + matched_length]
after = data[
offset
+ matched_length : min(len(data), offset + matched_length + byte_range)
]

return before, matching, after

0 comments on commit 7fdfe9e

Please sign in to comment.