Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use Native Python Types #329

Open
kkirsche opened this issue Jun 16, 2021 · 7 comments
Open

Use Native Python Types #329

kkirsche opened this issue Jun 16, 2021 · 7 comments
Assignees
Labels
backlog Issue Backlog
Milestone

Comments

@kkirsche
Copy link
Contributor

kkirsche commented Jun 16, 2021

Is your feature request related to a problem? Please describe.
In the Tenable.SC python module, the library returns strings for various data types complicating interactions. As a result, end-users need to be aware of all the various data types in the API to properly interact with and manipulate the data. For example, calculating differences in times between various scans, processing things like ID's as integers, etc.

Describe the solution you'd like
Use native python types

Describe alternatives you've considered
I have written my own custom integration layer to handle type conversions to native python types and key styles.

Additional context
Example from my integration layer:

from __future__ import annotations

from datetime import datetime
from typing import Any, List, Optional, TypedDict, Callable
from pydantic.dataclasses import dataclass

def convert_str_to_boolean(v: str) -> bool:
    # tenable uses a mix of values for booleans, we need to clean these up for consistency
    normalized_v = v.lower()
    if normalized_v == "true" or normalized_v == "yes":
        return True
    if normalized_v == "false" or normalized_v == "no":
        return False
    raise ValueError(
        "Value was not 'true', 'yes', 'false', or 'no'. Cannot convert to boolean"
    )

# batch conversion of attributes
def convert_attr_list_to_types(
    __obj: object, attr_list: List[str], factory: Callable
) -> object:
    for attr in attr_list:
        if getattr(__obj, attr) is not None:
            setattr(__obj, attr, factory(getattr(__obj, attr)))
    return __obj

# what Tenable sends, with comments to clarify what the values really are
class PolicyDetailDict(TypedDict):
    auditFiles: List[Any]
    canManage: str  # 'true' | 'false'
    canUse: str  # 'true' | 'false'
    context: str
    createdTime: str
    creator: UserDict
    description: str
    groups: Optional[List[Any]]
    ownerGroup: Optional[StrIDNameAndDescriptionDict]
    targetGroup: Optional[IntIDNameAndDescriptionDict]
    owner: Optional[UserDict]
    generateXCCDFResults: str  # 'true' | 'false'
    id: str  # int
    modifiedTime: str  # int - epoch time
    name: str
    families: Optional[List[PolicyFamilyDict]]
    preferences: PolicyPreferencesDict
    policyTemplate: PolicyTemplateDict
    status: str
    tags: str
 
 # what we return to a client
class SerializedPolicyDetailDict(TypedDict):
    auditFiles: List[Any]
    canManage: bool
    canUse: bool
    context: str
    createdTime: int
    createdTimeDate: str
    creator: SerializedUserDict
    description: str
    groups: List[Any]
    ownerGroup: Optional[SerializedIDNameAndDescriptionDict]
    targetGroup: Optional[SerializedIDNameAndDescriptionDict]
    owner: Optional[SerializedUserDict]
    generateXCCDFResults: bool
    id: int
    modifiedTime: int
    modifiedTimeDate: str
    name: str
    families: Optional[List[SerializedPolicyFamilyDict]]
    preferences: SerializedPolicyPreferencesDict
    policyTemplate: SerializedPolicyTemplateDict
    status: int
    tags: str

# A native layer for use in applications, this is what I'd expect to see pyTenable return
@dataclass
class PolicyDetail:
    audit_files: List[Any]
    can_manage: bool
    can_use: bool
    context: str
    created_time: datetime
    creator: User
    description: str
    generate_xccdf_results: bool
    groups: List[Any]
    id: int
    modified_time: datetime
    name: str
    preferences: PolicyPreferences
    families: List[PolicyFamily]
    policy_template: PolicyTemplate
    owner_group: Optional[IDNameAndDescription]
    target_group: Optional[IDNameAndDescription]
    owner: Optional[User]
    status: int
    tags: str

    @classmethod
    def from_dict(cls, d: PolicyDetailDict) -> PolicyDetail:
        to_create = tenablesc_keys_to_snake_case(d)
        if "preferences" in to_create:
            to_create["preferences"] = PolicyPreferences.from_dict(
                to_create["preferences"]
            )

        for k in ["groups", "families"]:
            if k not in to_create:
                to_create[k] = []
        for k in ["owner", "owner_group", "target_group"]:
            if k not in to_create:
                to_create[k] = None

        return cls(**to_create)

    def __post_init__(self) -> None:
        convert_attr_list_to_types(
            __obj=self,
            attr_list=["can_manage", "can_use", "generate_xccdf_results"],
            factory=convert_str_to_boolean,
        )
        convert_attr_list_to_types(__obj=self, attr_list=["id", "status"], factory=int)
        convert_attr_list_to_types(
            __obj=self,
            attr_list=["modified_time", "created_time"],
            factory=lambda x: datetime.fromtimestamp(int(x)),
        )

    def serialize(self) -> SerializedPolicyDetailDict:
        return SerializedPolicyDetailDict(
            auditFiles=self.audit_files,
            canManage=self.can_manage,
            canUse=self.can_use,
            context=self.context,
            createdTime=convert_datetime_to_milliseconds(self.created_time),
            createdTimeDate=convert_datetime_to_string(self.created_time),
            creator=self.creator.serialize(),
            description=self.description,
            groups=self.groups,
            generateXCCDFResults=self.generate_xccdf_results,
            id=self.id,
            modifiedTime=convert_datetime_to_milliseconds(self.modified_time),
            modifiedTimeDate=convert_datetime_to_string(self.modified_time),
            name=self.name,
            preferences=self.preferences.serialize(),
            policyTemplate=self.policy_template.serialize(),
            status=self.status,
            tags=self.tags,
            owner=self.owner.serialize() if self.owner else self.owner,
            ownerGroup=self.owner_group.serialize()
            if self.owner_group
            else self.owner_group,
            targetGroup=self.target_group.serialize()
            if self.target_group
            else self.target_group,
            families=[family.serialize() for family in self.families],
        )
@SteveMcGrath
Copy link
Contributor

While I completely agree that work like this needs to be done, it would likely need to be part of a much larger modernization refactoring effort. Currently the pyTenable Tenable.sc and Tenable.io packages still rely on a lot of Python 2.x compatible code that would need to be modernized along with this.

@kkirsche
Copy link
Contributor Author

kkirsche commented Jun 16, 2021

That's perfectly reasonable, and I appreciate the consideration. Please let me know if there is any support needed to accelerate that type of effort.

EDIT: This comment assumes that the goal would be to do so soon, due to Python 2 being end-of-life. If realistically that is not a goal of the project due to enterprise or other customers, I fully understand.

@levwais levwais added the backlog Issue Backlog label Jun 23, 2021
@levwais
Copy link
Contributor

levwais commented Jun 23, 2021

this will be part of a bigger refactoring to Python 3.x code standards that is on our roadmap.

@SteveMcGrath SteveMcGrath added this to the 2.0 milestone Sep 28, 2021
@SteveMcGrath
Copy link
Contributor

So in diving into this further, and when looking into what would be needed to support all of Tenable.sc 5.x, this may be a much bigger addition than expected. This would also mean the library would have to be revved for every version of Tenable.sc, or any other products when they get revved to add new fields. I'm not sure is native datatypes are the right approach.

I do understand that SC's responses are almost entirely string values, and it's a pain having to recast them. my concern is that by forcing folks to upgrade the library potentially every rev of any product may create other issues.

There may be some alternative validation and transformation libraries that can handle data in a less lossy way.

@SteveMcGrath
Copy link
Contributor

so Marshmallow might be a more appropriate path forward here, as it can handle unknown fields in a more appropriate manner.

https://marshmallow.readthedocs.io/en/stable/

In any case, implementing response attribute recasting across the whole library will dramatically increase the time to refactor the code on the pathway to the larger v2 refactor.

@SteveMcGrath
Copy link
Contributor

So please excuse the stream of posts here, but I'll be treating this issue as a bit of a scratch pad while I research this. In either case, marshmallow shows the most promise so far. I was already looking into using it to replace a lot of madness in the IO package around filtering.

Custom Fields: https://marshmallow.readthedocs.io/en/stable/custom_fields.html
Examples: https://marshmallow.readthedocs.io/en/stable/examples.html#inflection-camel-casing-keys

Potential issues/thoughts:

  • Validation happens at serialization, not deserialization.
  • Can a single model in marshmallow handle differences in the request and response objects? if so, is it worth it?
  • Should responses always be returned in a naturalized state? should we support returning the raw doc?

@SteveMcGrath
Copy link
Contributor

@kkirsche so the approach here will be to refactor parts of the library using marshmallow to handle the data typing and transformation. This will effectively replace a lot of the old constructor code. It'll be happening organically, and the first part of that was to replace the old, inflexible connection class with something that's better tested. #457 Details this initial work. We didn't want to have to touch everything multiple times, but this enables a pattern like so (below). It's a work in progress, just keeping you posted.

Example Tenable.io Access Groups v2 API Schema

import re
from marshmallow import (
    Schema,
    fields,
    pre_load,
    validate as v,
    validates_schema,
)
from tenable.base.schema.fields import LowerCase, UpperCase
from tenable.io.base.schemas.filters.base import BaseFilterSchema
from tenable.io.base.session import TenableIO


class RuleSchema(BaseFilterSchema):
    '''
    Marshmallow schema for processing the Access List rule definitions.
    '''
    type = fields.Str(required=True)
    operator = fields.Str(required=True)
    terms = fields.List(fields.Str(), required=True)
    _filters = None

    @classmethod
    def populate_filters(cls, tio: TenableIO):
        '''
        Pre-populates the asset filter definitions into the RuleSchema class.
        '''
        super().populate_filters(tio, 'filters/workbenches/assets')

    @validates_schema
    def filter_validation(self, data, **kwargs):  # noqa: PLW0613
        '''
        Handles validation of the filter provided against the asset filter
        definitions.
        '''
        self.validate_filter(data.get('type'),
                             data.get('operator'),
                             data.get('terms'))

    @pre_load(pass_many=False)
    def tuple_expansion(self, data, **kwargs):  # noqa: PLW0613
        '''
        Handles expanding a tuple definition into the dictionary equivalent.
        '''
        if isinstance(data, tuple):
            return {
                'type': data[0],
                'operator': data[1],
                'terms': data[2]
            }
        return data


class PrincipalSchema(Schema):
    principal_name = fields.Str()
    principal_id = fields.UUID()
    type = LowerCase(
        fields.Str(validate=v.OneOf(['all_users', 'user', 'group'])))
    permissions = fields.List(UpperCase(
        fields.Str(validate=v.OneOf(['CAN_VIEW', 'CAN_SCAN']))))

    @pre_load(pass_many=False)
    def pre_process(self, data, **kwargs):
        if isinstance(data, tuple):
            # unpack the tuple into the appropriate fields.  Supporting format
            #
            # ('TYPE', 'NAME', ['PERMS'])
            #
            # or
            #
            # ('TYPE', 'UUID', ['PERMS'])
            newdata = {
                'type': data[0],
                'permissions': data[2]
            }
            if re.match(r'[0-9a-f]{8}(?:-[0-9a-f]{4}){3}-[0-9a-f]{12}',
                        data[1],
                        re.I):
                newdata['principal_id'] = data[1]
            else:
                newdata['principal_name'] = data[1]
            return newdata
        return data


class AccessGroupsSchema(Schema):  # noqa: PLR0903
    '''
    '''
    name = fields.Str(required=True)
    access_group_type = UpperCase(fields.Str(validate=v.OneOf(['MANAGE_ASSETS',
                                                               'SCAN_TARGETS',
                                                               'ALL'
                                                               ])))
    all_users = fields.Bool()
    principals = fields.List(fields.Nested(PrincipalSchema))
    rules = fields.List(fields.Nested(RuleSchema))
    container_uuid = fields.UUID(load_only=True)
    created_by_uuid = fields.UUID(load_only=True)
    updated_by_uuid = fields.UUID(load_only=True)
    created_by_name = fields.Str(load_only=True)
    updated_by_name = fields.Str(load_only=True)
    created_at = fields.DateTime(load_only=True)
    updated_at = fields.DateTime(load_only=True)
    id = fields.UUID(load_only=True)
    status = fields.Str(load_only=True)
    all_assets = fields.Bool(load_only=True)
    processing_percent_complete = fields.Int(load_only=True)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
backlog Issue Backlog
Projects
None yet
Development

No branches or pull requests

4 participants