-
Notifications
You must be signed in to change notification settings - Fork 21
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Signed-off-by: Konstantin Slavnov <[email protected]>
- Loading branch information
Showing
9 changed files
with
547 additions
and
51 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,7 @@ | ||
""" | ||
Annotation tool. | ||
Inspired by https://uima.apache.org/d/uimafit-current/api/ | ||
""" | ||
|
||
# TODO(zurk) move annotation module and tests to lookout-sdk-ml |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,265 @@ | ||
from typing import Dict, Iterable, Iterator, Optional, Sequence, Tuple, Union | ||
|
||
from lookout.sdk.service_data_pb2 import File | ||
import numpy | ||
from sortedcontainers import SortedDict | ||
|
||
from lookout.style.format.annotations.annotation import Annotation, LanguageAnnotation, \ | ||
PathAnnotation, UASTAnnotation, ValuedAnnotation | ||
|
||
|
||
class NoIntersection(Exception):
    """Raised by AnnotatedData.find_intersect() when there is no intersecting annotation."""
|
||
|
||
class Annotations(dict):
    """
    Annotations collection for a specific range.

    It is a regular dict that additionally remembers the range it was built for. \
    A stored annotation can also be read as an attribute: the attribute resolves to \
    the `value` of a `ValuedAnnotation` and to None for any other stored object.
    """

    def __init__(self, start, stop, *args, **kwargs):
        """
        Init.

        :param start: Start of the range.
        :param stop: End of the range.
        :param args: Positional arguments forwarded to the dict constructor.
        :param kwargs: Keyword arguments forwarded to the dict constructor.
        """
        super().__init__(*args, **kwargs)
        self._start = start
        self._stop = stop
        self._range = (start, stop)

    def __getattr__(self, item):
        # Python calls __getattr__ only after the regular attribute lookup has failed,
        # so start/stop/range and the dict methods never reach this code.
        if item not in self:
            raise AttributeError("Attribute \"%s\" does not exist" % item)
        annotation = self[item]
        if isinstance(annotation, ValuedAnnotation):
            return annotation.value
        return None

    @property
    def start(self):
        """Start of the range."""
        return self._start

    @property
    def stop(self):
        """End of the range."""
        return self._stop

    @property
    def range(self):
        """The (start, stop) tuple."""
        return self._range
|
||
|
||
class RawData:
    """The storage for ordered document collection indexed and accessible by global offsets."""

    def __init__(self, document_collection: Iterable[str]):
        """
        Init.

        :param document_collection: Documents to store. At least one document is required.
        """
        document_collection = list(document_collection)
        assert len(document_collection) > 0
        self._document_collection = document_collection

        # _doc_start_offset[i] is the global offset at which document i starts and the last
        # element is the accumulated length of the whole collection.
        doc_lens = [0] + [len(d) for d in self._document_collection]
        self._doc_start_offset = numpy.array(doc_lens).cumsum()

    def _offset_to_doc_index(self, offset_start: int, offset_stop: int) -> Tuple[int, int, int]:
        """
        Convert a global [offset_start, offset_stop) range to document-local offsets.

        :param offset_start: Global start offset.
        :param offset_stop: Global stop offset.
        :return: Document index, start and stop offsets inside that document.
        :raise IndexError: if an offset is outside of the collection or the range does not \
                           fit into a single document.
        """
        total = len(self)
        if offset_start < 0:
            # A negative offset would otherwise silently wrap around to the last document.
            raise IndexError("Start offset %d is negative." % offset_start)
        if offset_start >= total:
            raise IndexError("Start offset %d is greater than the collection length %d." % (
                offset_start, total))
        if offset_stop > total:
            raise IndexError("Stop offset %d is greater than the collection length %d." % (
                offset_stop, total))

        # Index of the first document start that is strictly greater than offset_start,
        # minus one, is the document which contains offset_start.
        doc_index = int(numpy.searchsorted(
            self._doc_start_offset, offset_start, side="right")) - 1
        doc_start = int(self._doc_start_offset[doc_index])
        doc_offset_start = offset_start - doc_start
        doc_offset_stop = offset_stop - doc_start
        if doc_offset_stop > len(self._document_collection[doc_index]):
            # AnnotatedData.add() matches this exact message, do not change it.
            raise IndexError("You can get data only from one document from collection")

        return doc_index, doc_offset_start, doc_offset_stop

    def __len__(self):
        """Accumulative document collection length."""
        return int(self._doc_start_offset[-1])

    def __getitem__(self, index: Union[slice, int, tuple]) -> str:
        """
        Access data slices by offset.

        :param index: Global offset of a single character, a slice without step or \
                      a (start, stop) tuple. Omitted slice bounds default to the collection \
                      edges.
        :return: Requested part of a document.
        :raise IndexError: if the index type is not supported, the slice has a step, \
                           an offset is out of range or the range spans several documents.
        """
        if isinstance(index, int):
            doc_index, doc_offset, _ = self._offset_to_doc_index(index, index + 1)
            return self._document_collection[doc_index][doc_offset]
        if isinstance(index, slice):
            if index.step is not None:
                raise IndexError("Step is unsupported for slices.")
            start = 0 if index.start is None else index.start
            stop = len(self) if index.stop is None else index.stop
            return self._get_range(start, stop)
        if isinstance(index, tuple):
            assert len(index) == 2
            return self._get_range(*index)
        raise IndexError("Unknown index type %s" % type(index))

    def _get_range(self, start: int, stop: int) -> str:
        """Get the data for the global [start, stop) range. Empty range gives empty string."""
        if start == stop:
            return ""
        doc_index_start, doc_offset_start, doc_offset_end = self._offset_to_doc_index(start, stop)
        return self._document_collection[doc_index_start][doc_offset_start: doc_offset_end]

    def get_docs_range(self):
        """Get documents index range: a tuple of (start, stop) global offsets per document."""
        return tuple(zip(self._doc_start_offset, self._doc_start_offset[1:]))
|
||
|
||
class AnnotatedData:
    """
    Class that couples annotations and data together.

    All special utilities to work with annotations should be implemented in this class.
    List of methods that should be implemented can be found here:
    https://uima.apache.org/d/uimafit-current/api/org/apache/uima/fit/util/JCasUtil.html
    """

    def __init__(self, raw_data: RawData):
        """
        Init.

        :param raw_data: Document collection the annotations refer to.
        """
        self._raw_data = raw_data
        # Interval trees should be used for _range_to_annotations later.
        self._range_to_annotations = SortedDict()  # type: SortedDict[(int, int), Dict[str, Annotation]]  # noqa E501
        self._type_to_annotations = {}  # type: Dict[str, SortedDict[(int, int), Annotation]]

    def __getitem__(self, item):
        """Get the raw data for an offset, a slice or a (start, stop) range."""
        return self._raw_data[item]

    def add(self, annotation: Annotation) -> None:
        """
        Add annotation.

        An annotation with the same name and range replaces the previously added one.

        :param annotation: Annotation to add.
        :raise ValueError: if the annotation range spans several documents.
        :raise IndexError: if the annotation range is outside of the raw data.
        """
        # TODO(zurk): Add a check that there is no overlapping annotations of one type.
        try:
            # Called only for validation, the result itself is not needed.
            self._raw_data._offset_to_doc_index(*annotation.range)
        except IndexError as e:
            if e.args and e.args[0] == "You can get data only from one document from collection":
                raise ValueError(
                    "It is not possible to add one annotation %s for several documents." %
                    annotation) from e
            raise
        if annotation.range not in self._range_to_annotations:
            self._range_to_annotations[annotation.range] = {}
        if annotation.name not in self._type_to_annotations:
            self._type_to_annotations[annotation.name] = SortedDict()
        self._range_to_annotations[annotation.range][annotation.name] = annotation
        self._type_to_annotations[annotation.name][annotation.range] = annotation

    def update(self, annotations: Iterable[Annotation]) -> None:
        """
        Update with annotations.

        :param annotations: Annotations to add one by one, see add().
        """
        for annotation in annotations:
            self.add(annotation)

    def get(self, position: Tuple[int, int]) -> Tuple[str, Dict[str, Annotation]]:
        """
        Get annotated value and all annotations for the range.

        Not implemented yet.
        """
        raise NotImplementedError()

    def get_value(self, position: Tuple[int, int]) -> Tuple[str, Dict[str, Annotation]]:
        """
        Get annotated value and all annotations for the range.

        Not implemented yet.
        """
        raise NotImplementedError()

    def iter_annotation(self, name: str, start_offset: Optional[int] = None,
                        stop_offset: Optional[int] = None) -> Iterator[Tuple[str, Annotation]]:
        """
        Iterate through specific annotation atomic_tokens, ys, files, etc.

        Returns slice of RawData and its annotation.

        :param name: Annotation type to iterate through.
        :param start_offset: Not supported yet, must be None.
        :param stop_offset: Not supported yet, must be None.
        :raise KeyError: if no annotation of this type was added.
        """
        if start_offset is not None or stop_offset is not None:
            raise NotImplementedError()

        for annotation_range, annotation in self._type_to_annotations[name].items():
            yield self[annotation_range], annotation

    def iter_annotations(self, names: Sequence[str], start_offset: Optional[int] = None,
                         stop_offset: Optional[int] = None,
                         ) -> Iterator[Tuple[str, Annotations]]:
        """
        Iterate through annotations with specified type.

        The iteration goes over the annotations of the first type in `names`. For every \
        other requested type the annotation with exactly the same range is taken, or, \
        if there is none, an annotation that intersects the range. Types without such \
        annotation are skipped for that range.

        Returns slice of RawData and its annotation.

        :param names: Annotation types to collect. The first one drives the iteration.
        :param start_offset: Not supported yet, must be None.
        :param stop_offset: Not supported yet, must be None.
        """
        if start_offset is not None or stop_offset is not None:
            raise NotImplementedError()

        names_set = frozenset(names)
        for value, annotation0 in self.iter_annotation(names[0]):
            # Annotations with the same range
            same_range_annotations = self._range_to_annotations[annotation0.range]
            same_range_names = set(same_range_annotations.keys())
            common = names_set & same_range_names
            missing = names_set - same_range_names
            annotations = dict()
            for name in missing:
                try:
                    annotations[name] = self.find_intersect(name, *annotation0.range)
                except NoIntersection:
                    pass
            annotations.update({name: same_range_annotations[name] for name in common})
            yield value, Annotations(*annotation0.range, annotations)

    def find_intersect(self, name: str, start: int, stop: int) -> Annotation:
        """
        Find an annotation of given type that intersects the interval [start, stop).

        raises NoIntersection exception if there is no such annotation.

        :param name: Annotation type.
        :param start: start of interval.
        :param stop: end of interval.
        :return: requested Annotation.
        """
        try:
            annotation_layer = self._type_to_annotations[name]
        except KeyError:
            raise NoIntersection("There is no annotation layer %s" % name) from None
        # One extra range on the left is included because it can start before `start` and
        # still cover it.
        search_start = max(0, annotation_layer.bisect_left((start, start)) - 1)
        search_stop = annotation_layer.bisect_right((stop, stop))
        for candidate in annotation_layer.islice(search_start, search_stop):
            if self._check_interval_crossing(start, stop, *candidate):
                # assuming that there is only one such annotation
                return annotation_layer[candidate]
        raise NoIntersection("There is no annotation %s from %d to %d" % (name, start, stop))

    @classmethod
    def _check_interval_crossing(cls, start1: int, stop1: int, start2: int, stop2: int) -> bool:
        # An empty interval [x, x) is treated as a point at position x:
        # * two empty intervals cross only if they are at the same position;
        # * an empty interval crosses a non-empty one only if the point is strictly inside,
        #   the borders of the non-empty interval do not count.
        # Non-empty intervals cross if they share at least one position.
        if start1 == stop1:
            if start2 == stop2:
                return start1 == start2
            else:
                return start2 < start1 < stop2
        else:
            if start2 == stop2:
                return start1 < start2 < stop1
            else:
                return (start1 <= start2 < stop1 or
                        start1 < stop2 < stop1 or
                        start2 <= start1 < stop2)

    def subiter_annotation(self, name: str, covering_annotation: Annotation):
        """TODO."""
        raise NotImplementedError()

    def sub_iter_annotations(self, names: Sequence[str], covering_annotation: Annotation):
        """TODO."""
        raise NotImplementedError()

    @classmethod
    def from_files(cls, files: Iterable[File]) -> "AnnotatedData":
        """
        Create AnnotatedData instance from files.

        Each file content is decoded as UTF-8 (invalid bytes are replaced) and becomes one \
        document. Path, UAST and language annotations are added for the range of every file.

        :param files: Files to build the data from. Any iterable is accepted, including \
                      one-shot iterators.
        :return: new AnnotatedData instance
        """
        # `files` is needed twice: for the contents and for the annotations. Materialize it,
        # otherwise an iterator is exhausted after the first pass and nothing is annotated.
        files = list(files)
        raw_data = RawData(file.content.decode("utf-8", "replace") for file in files)
        file_ranges = raw_data.get_docs_range()
        annotated_data = cls(raw_data)
        # NOTE(review): add() rejects a range which starts at the very end of the data, so
        # an empty file at the end of `files` looks like it fails here - confirm.
        for file_range, file in zip(file_ranges, files):
            annotated_data.add(PathAnnotation(*file_range, file.path))
            annotated_data.add(UASTAnnotation(*file_range, file.uast))
            annotated_data.add(LanguageAnnotation(*file_range, file.language))
        return annotated_data
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,96 @@ | ||
from typing import Any | ||
|
||
|
||
class Annotation:
    """
    Base class for annotation.

    An annotation marks the [start, stop) range. Inheritors must set the `name` class \
    attribute, the base class itself can not be instantiated.
    """

    name = None  # Should be defined in inheritors

    def __init__(self, start: int, stop: int):
        """
        Init.

        :param start: Start of the annotated range.
        :param stop: End of the annotated range.
        :raise NotImplementedError: if `name` is not defined by the class.
        """
        if self.name is None:
            raise NotImplementedError("name should be defined for Annotation.")
        self._start = start
        self._stop = stop
        self._range = (start, stop)

    @property
    def start(self):
        """Start of the annotated range."""
        return self._start

    @property
    def stop(self):
        """End of the annotated range."""
        return self._stop

    @property
    def range(self):
        """The (start, stop) tuple."""
        return self._range

    def __repr__(self):
        return self.__str__()

    def __str__(self):
        start, stop = self.range
        return "<%s [%d, %d)>" % (self.name, start, stop)
|
||
|
||
class ValuedAnnotation(Annotation):
    """Annotation with value: an arbitrary object available as the `value` attribute."""

    def __init__(self, start: int, stop: int, value: Any):
        """
        Init.

        :param start: Start of the annotated range.
        :param stop: End of the annotated range.
        :param value: Object to attach to the range.
        """
        super().__init__(start, stop)
        self.value = value
|
||
|
||
# Specific annotations for style-analyzer: | ||
class AnnotationNames:
    """
    Names of all available annotations.

    The annotation classes use these constants as their `name` attribute.
    """

    atomic_token = "atomic_token"
    token = "token"
    line = "line"
    uast_node = "uast_node"
    uast = "uast"
    language = "language"
    path = "path"
|
||
|
||
class TokenAnnotation(Annotation):
    """Virtual code token annotation."""

    name = AnnotationNames.token
|
||
|
||
class AtomicTokenAnnotation(Annotation):
    """Infrangible code token annotation."""

    name = AnnotationNames.atomic_token
|
||
|
||
class LineAnnotation(ValuedAnnotation):
    """Line number annotation. The line number is kept in `value`."""

    name = AnnotationNames.line
|
||
|
||
class UASTNodeAnnotation(ValuedAnnotation):
    """UAST Node annotation. The node is kept in `value`."""

    name = AnnotationNames.uast_node

    @staticmethod
    def from_node(node: "bblfsh.Node") -> "UASTNodeAnnotation":
        """
        Create the annotation from bblfsh node.

        The range is taken from the offsets of the node start and end positions. \
        The result is always a UASTNodeAnnotation, even if the method is called through \
        an inheritor.

        :param node: Node to annotate.
        :return: new annotation which holds the node as its value.
        """
        start = node.start_position.offset
        stop = node.end_position.offset
        return UASTNodeAnnotation(start, stop, node)
|
||
|
||
class UASTAnnotation(UASTNodeAnnotation):
    """Full UAST of the file annotation. The UAST is kept in `value`."""

    name = AnnotationNames.uast
|
||
|
||
class LanguageAnnotation(ValuedAnnotation):
    """File language annotation. The language is kept in `value`."""

    name = AnnotationNames.language
|
||
|
||
class PathAnnotation(ValuedAnnotation):
    """File path annotation. The path is kept in `value`."""

    name = AnnotationNames.path
Oops, something went wrong.