Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP] Add Type Annotations #32

Draft
wants to merge 4 commits into
base: master
Choose a base branch
from
Draft
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
add more type annotations
dosisod committed Sep 13, 2020
commit f70b837acbb41ceae45983106cce6c9be47ce172
10 changes: 8 additions & 2 deletions pcapng/blocks.py
Original file line number Diff line number Diff line change
@@ -16,6 +16,7 @@

from pcapng import strictness as strictness
from pcapng.constants import link_types
from pcapng.flags import FlagField
from pcapng.structs import (
IntField,
ListField,
@@ -92,6 +93,7 @@ def _write(self, outstream):
write_int(block_length, outstream, 32)

def _encode(self, outstream):
# type: (io.BytesIO) -> None
"""Encodes the fields of this block into raw data"""
for name, field, default in self.schema:
field.encode(
@@ -147,14 +149,16 @@ class SectionMemberBlock(Block):
__slots__ = ["section"] # type: List[str]

def __init__(self, section, **kwargs):
# type: (SectionMemberBlock, str) -> None
super(SectionMemberBlock, self).__init__(**kwargs)
self.section = section


def register_block(block):
# type: (Any) -> Block
"""Handy decorator to register a new known block type"""
KNOWN_BLOCKS[block.magic_number] = block
return block
return block # type: ignore


@register_block
@@ -324,6 +328,7 @@ def timestamp_resolution(self):

@property
def statistics(self):
# type: () -> object
# todo: ensure we always have an interface id -> how??
return self.section.interface_stats.get(self.interface_id)

@@ -370,6 +375,7 @@ class BlockWithInterfaceMixin(object):

@property
def interface(self):
# type: () -> FlagField
# We need to get the correct interface from the section
# by looking up the interface_id
return self.section.interfaces[self.interface_id]
@@ -635,7 +641,7 @@ class NameResolution(SectionMemberBlock):
),
None,
),
]
] # type: List[Tuple[str, FlagField, object]]


@register_block
7 changes: 4 additions & 3 deletions pcapng/flags.py
Original file line number Diff line number Diff line change
@@ -97,7 +97,7 @@ def __init__(self, owner, offset, size, extra=None):
if len(extra) > 2 ** size:
raise TypeError(
"{cls} iterable has too many values (got {got}, "
+ "{size} bits only address {max})".format(
"{size} bits only address {max})".format(
cls=self.__class__.__name__,
got=len(extra),
size=size,
@@ -134,7 +134,7 @@ def set(self, val):

FlagField = namedtuple(
"FlagField", ("name", "ftype", "nbits", "extra"), defaults=(1, None)
)
) # type: Type[namedtuple]


class FlagWord(object):
@@ -149,6 +149,7 @@ class FlagWord(object):
] # type: List[str]

def __init__(self, schema, nbits=32, initial=0):
# type: (FlagField, int, int) -> None
"""
:param schema:
A list of FlagField objects representing the values to be packed
@@ -169,7 +170,7 @@ def __init__(self, schema, nbits=32, initial=0):
if tot_bits > nbits:
raise TypeError(
"Too many fields for {nbits}-bit field "
+ "(schema defines {tot} bits)".format(nbits=nbits, tot=tot_bits)
"(schema defines {tot} bits)".format(nbits=nbits, tot=tot_bits)
)

bitn = 0
9 changes: 8 additions & 1 deletion pcapng/structs.py
Original file line number Diff line number Diff line change
@@ -8,10 +8,11 @@
import warnings
from collections import defaultdict
from collections.abc import Iterable, Mapping
from typing import List
from typing import Dict, List, Tuple, Union

from pcapng import strictness as strictness
from pcapng._compat import namedtuple
from pcapng.blocks import Block
from pcapng.exceptions import (
BadMagic,
CorruptedFile,
@@ -131,6 +132,7 @@ def write_int(number, stream, size, signed=False, endianness="="):


def read_section_header(stream):
# type: (BytesIO) -> Dict[str, Union[str, bytes]]
"""
Read a section header block from a stream.

@@ -326,6 +328,7 @@ class RawBytes(StructField):
__slots__ = ["size"] # type: List[str]

def __init__(self, size):
# type: (int) -> None
self.size = size # in bytes!

def load(self, stream, endianness=None, seen=None):
@@ -388,6 +391,7 @@ class OptionsField(StructField):
__slots__ = ["options_schema"] # type: List[str]

def __init__(self, options_schema):
# type: (Mapping) -> None
self.options_schema = options_schema

def load(self, stream, endianness, seen=None):
@@ -564,6 +568,7 @@ def encode_finish(self, stream, endianness):


def read_options(stream, endianness):
# type: (BytesIO, str) -> List[Tuple[int, bytes]]
"""
Read "options" from an "options block" in a stream, until a
``StreamEmpty`` exception is caught, or an end marker is reached.
@@ -1034,6 +1039,7 @@ def _encode_value(self, value, ftype):


def struct_decode(schema, stream, endianness="="):
# type: (List[Tuple[str, StructField, object]], BytesIO, str) -> bytes
"""
Decode structured data from a stream, following a schema.

@@ -1063,6 +1069,7 @@ def struct_decode(schema, stream, endianness="="):


def block_decode(block, stream):
# type: (Block, BytesIO) -> bytes
return struct_decode(block.schema, stream, block.section.endianness)


4 changes: 3 additions & 1 deletion pcapng/utils.py
Original file line number Diff line number Diff line change
@@ -2,6 +2,7 @@
import struct
from typing import (
Iterable,
List,
Tuple
)

@@ -17,6 +18,7 @@ def unpack_ipv4(data):


def _get_pairs(data):
# type: (List[object]) -> List[Tuple[object, object]]
"""Return data in pairs

This uses a clever hack, based on the fact that zip will consume
@@ -29,7 +31,7 @@ def _get_pairs(data):
[(1, 2), (3, 4)]

"""
return list(zip(*((iter(data),) * 2)))
return list(zip(*((iter(data),) * 2))) # type: ignore


def pack_ipv6(data):