Skip to content

Commit

Permalink
port ghsa provider (#7)
Browse files Browse the repository at this point in the history
Signed-off-by: Alex Goodman <[email protected]>

Signed-off-by: Alex Goodman <[email protected]>
  • Loading branch information
wagoodman authored Dec 14, 2022
1 parent eb28a80 commit 05121cc
Show file tree
Hide file tree
Showing 18 changed files with 1,239 additions and 148 deletions.
File renamed without changes.
48 changes: 46 additions & 2 deletions src/vunnel/cli/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ def increase_indent(self, flow=False, indentless=False):


@cli.command(name="run", help="run a vulnerability provider")
@click.option("--provider", "-p", "provider_name", help="provider to run", required=True)
@click.argument("provider_name", metavar="PROVIDER")
@click.pass_obj
def run_provider(cfg: config.Application, provider_name: str):
logging.info(f"running {provider_name} provider")
Expand All @@ -91,7 +91,51 @@ def run_provider(cfg: config.Application, provider_name: str):
provider.populate()


@cli.command(name="list", help="list available vulnerability providers")
@cli.command(name="clear", help="clear provider state")
@click.argument("provider_name", metavar="PROVIDER")
@click.option("--input", "-i", "_input", default=False, help="clear only the input state")
@click.option("--result", "-r", default=False, help="clear only the result state")
@click.pass_obj
def clear_provider(cfg: config.Application, provider_name: str, _input: bool, result: bool):
logging.info(f"clearing {provider_name} provider state")

provider = providers.create(provider_name, cfg.root, config=cfg.providers.get(provider_name))
if not _input and not result:
provider.clear()
elif _input:
provider.clear_input()
elif result:
provider.clear_result()


@cli.command(name="status", help="describe current provider state")
@click.argument("provider_names", metavar="PROVIDER", nargs=-1)
@click.pass_obj
def status_provider(cfg: config.Application, provider_names: str):
if not provider_names:
selected_names = providers.names()
else:
selected_names = provider_names

for name in selected_names:
provider = providers.create(name, cfg.root, config=cfg.providers.get(name))
try:
state = provider.current_state()
tmpl = f""" • {name!r} provider
├── Inputs: {len(state.input.files)} files
{state.input.timestamp}
└── Results: {len(state.results.files)} files
{state.results.timestamp}
"""
print(tmpl)
except FileNotFoundError:
tmpl = f""" • {name!r} provider
└── (no state found)
"""
print(tmpl)


@cli.command(name="list", help="list available providers")
@click.pass_obj
def list_providers(cfg: config.Application): # pylint: disable=unused-argument
for p in providers.names():
Expand Down
14 changes: 3 additions & 11 deletions src/vunnel/cli/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,15 @@

import yaml

from vunnel import providers
from vunnel import providers, utils


@dataclass
class Providers:
alpine: providers.alpine.Config = field(default_factory=providers.alpine.Config)
centos: providers.centos.Config = field(default_factory=providers.centos.Config)
nvd: providers.nvd.Config = field(default_factory=providers.nvd.Config)
github: providers.github.Config = field(default_factory=providers.github.Config)

def get(self, name: str) -> Optional[Any]:
for f in fields(Providers):
Expand Down Expand Up @@ -50,20 +51,11 @@ def yaml_decoder(data) -> dict[Any, Any]:
return clean_dict_keys(yaml.load(data, yaml.CSafeLoader))


def dataclass_from_dict(cls, d):
try:
fieldtypes = {f.name: f.type for f in fields(cls)}
return cls(**{f: dataclass_from_dict(fieldtypes[f], d[f]) for f in d})
except TypeError:
pass
return d


def load(path: str = ".vunnel.yaml") -> Application: # pylint: disable=unused-argument
try:
with open(path, encoding="utf-8") as f:
app_object = yaml.safe_load(f.read())
cfg = dataclass_from_dict(Application, app_object)
cfg = utils.dataclass_from_dict(Application, app_object)
if cfg is None:
raise FileNotFoundError("parsed empty config")
except FileNotFoundError:
Expand Down
34 changes: 27 additions & 7 deletions src/vunnel/provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import shutil
import time
from dataclasses import dataclass, field
from typing import Optional

from . import result, workspace

Expand Down Expand Up @@ -55,7 +56,7 @@ def __post_init__(self):

@dataclass
class RuntimeConfig:
on_error: OnErrorConfig = field(default_factory=OnErrorConfig) # TODO: hook this up and enforce
on_error: OnErrorConfig = field(default_factory=OnErrorConfig)
existing_input: InputStatePolicy = InputStatePolicy.DELETE
existing_results: ResultStatePolicy = ResultStatePolicy.DELETE_BEFORE_WRITE

Expand Down Expand Up @@ -91,10 +92,10 @@ def populate(self):
self.logger.debug(f"using {self.root} as workspace root")

if self.runtime_cfg.existing_results == ResultStatePolicy.DELETE:
self._clear_results()
self.clear_results()

if self.runtime_cfg.existing_input == InputStatePolicy.DELETE:
self._clear_input()
self.clear_input()

self._create_workspace()
try:
Expand Down Expand Up @@ -137,9 +138,9 @@ def _on_error(self, e: Exception):

def _on_error_handle_state(self):
if self.runtime_cfg.on_error.input == InputStatePolicy.DELETE:
self._clear_input()
self.clear_input()
if self.runtime_cfg.on_error.results == ResultStatePolicy.DELETE:
self._clear_results()
self.clear_results()

def _create_workspace(self):
if not os.path.exists(self.input):
Expand All @@ -150,23 +151,42 @@ def _create_workspace(self):
if not os.path.exists(self.results):
os.makedirs(self.results)

def _clear_results(self):
def clear(self):
self.clear_input()
self.clear_results()

def clear_results(self):
if os.path.exists(self.results):
self.logger.debug("clearing existing results")
shutil.rmtree(self.results)

def _clear_input(self):
current_state = workspace.WorkspaceState.read(root=self.root)
current_state.results = workspace.FileListing(files=[])
current_state.write(self.root)

def clear_input(self):
if os.path.exists(self.input):
self.logger.debug("clearing existing workspace")
shutil.rmtree(self.input)

current_state = workspace.WorkspaceState.read(root=self.root)
current_state.input = workspace.FileListing(files=[])
current_state.write(self.root)

def _catalog_workspace(self, urls: list[str]):
if not urls:
current_state = workspace.WorkspaceState.read(root=self.root)
urls = current_state.urls

state = workspace.WorkspaceState.from_fs(provider=self.name, urls=urls, input=self.input, results=self.results)

metadata_path = state.write(self.root)

self.logger.debug(msg=f"wrote workspace state to {metadata_path}")

def current_state(self) -> Optional[workspace.WorkspaceState]:
return workspace.WorkspaceState.read(self.root)

@property
def root(self):
return f"{self._root}/{self.name}"
Expand Down
3 changes: 2 additions & 1 deletion src/vunnel/providers/__init__.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
from vunnel.providers import alpine, centos, nvd
from vunnel.providers import alpine, centos, github, nvd

_providers = {
alpine.Provider.name: alpine.Provider,
centos.Provider.name: centos.Provider,
nvd.Provider.name: nvd.Provider,
github.Provider.name: github.Provider,
}


Expand Down
2 changes: 1 addition & 1 deletion src/vunnel/providers/centos/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ def __init__(self, workspace, logger=None, config=None, download_timeout=125):
self.download_timeout = download_timeout
self.xml_file_path = os.path.join(workspace, self._xml_file_)
self.xml_sha_file_path = os.path.join(workspace, self._xml_sha_file_)
if logger == None:
if not logger:
logger = logging.getLogger(self.__class__.__name__)
self.logger = logger

Expand Down
70 changes: 70 additions & 0 deletions src/vunnel/providers/github/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
import copy
import os
from dataclasses import dataclass, field

from vunnel import provider, schema

from .parser import Parser, namespace


@dataclass
class Config:
token: str = "env:GITHUB_TOKEN"
api_url: str = "https://api.github.com/graphql"
runtime: provider.RuntimeConfig = field(
default_factory=lambda: provider.RuntimeConfig(existing_input=provider.InputStatePolicy.KEEP)
)
request_timeout: int = 125

def __post_init__(self):
if self.token.startswith("env:"):
self.token = os.environ.get(self.token[4:], "")

def __str__(self):
# sanitize secrets from any output
tok_value = self.token
str_value = super().__str__()
if not tok_value:
return str_value
return str_value.replace(tok_value, "********")


class Provider(provider.Provider):
name = "github"

def __init__(self, root: str, config: Config):
super().__init__(root, runtime_cfg=config.runtime)
self.config = config

self.logger.debug(f"config: {config}")

self.schema = schema.GithubSecurityAdvisorySchema()
self.parser = Parser(
workspace=self.input,
token=config.token,
api_url=config.api_url,
download_timeout=self.config.request_timeout,
logger=self.logger,
)

def update(self) -> list[str]:

with self.results_writer() as writer:
for advisory in self.parser.get():
all_fixes = copy.deepcopy(advisory.get("FixedIn")) if isinstance(advisory.get("FixedIn"), list) else []
for ecosystem in advisory.ecosystems:

advisory["namespace"] = f"{namespace}:{ecosystem}"

# filter the list of fixes for this ecosystem
advisory["FixedIn"] = [item for item in all_fixes if item.get("ecosystem") == ecosystem]

vuln_id = advisory["ghsaId"]

writer.write(
identifier=f"{namespace}-{vuln_id}".lower(),
schema=self.schema,
payload={"Vulnerability": {}, "Advisory": dict(advisory)},
)

return [self.parser.api_url]
Loading

0 comments on commit 05121cc

Please sign in to comment.