#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Usage: python -m diffannotator.annotate [<common-options>] <subcommand> <options>
Annotate an existing dataset (that is, patch files in subdirectories corresponding to bugs),
or a selected subset of commits (that is, changes in a selected subset of commits) in a given repository.
The result of annotation is saved in JSON files, one per patch / commit.
This script provides the following subcommands:
- `diff-annotate patch [OPTIONS] PATCH_FILE RESULT_JSON`:
annotate a single PATCH_FILE, writing results to RESULT_JSON,
- `diff-annotate dataset [OPTIONS] DATASETS...`:
annotate all bugs in provided DATASETS,
- `diff-annotate from-repo [OPTIONS] REPO_PATH [REVISION_RANGE...]`:
create annotation data for commits from local Git repository
(with `REVISION_RANGE...` passed as arguments to the `git log` command);
Examples (after installing the 'patchscope' package):
diff-annotate --help
diff-annotate --use-pylinguist patch \
tests/test_dataset/tqdm-1/c0dcf39b046d1b4ff6de14ac99ad9a1b10487512.diff \
c0dcf39b046d1b4ff6de14ac99ad9a1b10487512.json
diff-annotate dataset \
--output-prefix ~/example_annotations/ \
/mnt/data/HaPy-Bug/raw_data/bugsinpy-dataset/
diff-annotate from-repo \
--output-dir=~/example_annotations/tensorflow/yong.tang/ \
~/example_repositories/tensorflow/ \
"""
from __future__ import annotations
import collections.abc
from collections import defaultdict, deque, namedtuple, Counter
import inspect
import json
import logging
import os
from pathlib import Path
import re
import sys
import time
#import traceback  # replaced by exc_info (and possibly stack_info) when logging
from textwrap import dedent
from typing import TypeVar, Optional, Union, Literal, TYPE_CHECKING
from collections.abc import Iterable, Iterator, Generator, Callable
if TYPE_CHECKING:
from _typeshed import SupportsWrite
from joblib import Parallel, delayed
from pygments.token import Token
import unidiff
from unidiff.patch import Line as PatchLine
import tqdm
from tqdm.contrib.logging import logging_redirect_tqdm
import typer
from typing_extensions import Annotated # in typing since Python 3.9
import yaml
from . import languages
from .config import get_version, JSONFormat, JSONFormatExt, guess_format_version
from .languages import Languages
from .lexer import Lexer
from .utils.git import GitRepo, ChangeSet
# optional dependencies
try:
# noinspection PyPackageRequirements,PyUnresolvedReferences
import linguist
# noinspection PyPackageRequirements
from linguist.libs.language import Language as LinguistLanguage
has_pylinguist = True
except ImportError:
class LinguistLanguage:
"""Dummy of the linguist.libs.language.Language enough to satisfy linter"""
FakeLanguage = namedtuple('FakeLanguage', ['name', 'type'])
@classmethod
def find_by_filename(cls, _):
return [cls.FakeLanguage('unknown', 'unknown')]
has_pylinguist = False
class LanguagesFromLinguist:
def __init__(self):
super(LanguagesFromLinguist, self).__init__()
@staticmethod
def annotate(path: str) -> dict:
"""Annotate file with its primary / first language metadata
:param path: file path in the repository
:return: metadata about language, file type, and purpose of file
"""
langs = LinguistLanguage.find_by_filename(path)
if len(langs) > 1:
logger.warning(f"LanguagesFromLinguist: Filename collision in filenames_lang for '{path}': {langs}")
language = langs[0]
language_name = language.name
file_type = language.type
file_purpose = Languages._path2purpose(path, file_type)
return {
"language": language_name,
"type": file_type,
"purpose": file_purpose,
}
# configure logging
logger = logging.getLogger(__name__)
T = TypeVar('T')
PathLike = TypeVar("PathLike", str, bytes, Path, os.PathLike)
LineCallback = Callable[[dict[str, str], Iterable[tuple]], str]
OptionalLineCallback = Optional[LineCallback]
PURPOSE_TO_ANNOTATION = {"documentation": "documentation"}
"""Defines when purpose of the file is propagated to line annotation, without parsing"""
TRANSLATION_TABLE = str.maketrans("", "", "*/\\\t\n")
LANGUAGES = Languages()
LEXER = Lexer()
compute_patch_sizes_and_spreads: bool = True
def line_ends_idx(text: str) -> list[int]:
"""Return position+1 for each newline in text
This way each line can be extracted with text[pos[i-1]:pos[i]].
>>> example_text = "123\\n56\\n"
>>> line_ends_idx(example_text)
[4, 7]
>>> example_text[0:4]
'123\\n'
>>> example_text[4:7]
'56\\n'
:param text: str to process
:return: list of positions after end of line characters
"""
return [i for i, ch in enumerate(text, start=1)
if ch == '\n']
def split_multiline_lex_tokens(tokens_unprocessed: Iterable[T]) -> Generator[T, None, None]:
"""Split multiline tokens into individual lines
:param tokens_unprocessed: Result of calling `get_tokens_unprocessed(text)`
method on a `pygments.lexer.Lexer` instance. This is an iterable
of (index, token_type, value) tuples, where index is the starting
position of the token within the input text.
:return: An iterable of (index, token_type, value) tuples, where `index`
is the starting position of `value` in the input text, and each
`value` contains at most one newline.
"""
for index, token_type, text_fragment in tokens_unprocessed:
lines = text_fragment.splitlines(keepends=True)
if len(lines) <= 1:
# no need for splitting, return original
yield index, token_type, text_fragment
else:
# split into lines, updating the index
running_count = 0
for line in lines:
yield index+running_count, token_type, line
running_count += len(line)
def group_tokens_by_line(code: str, tokens: Iterable[T]) -> dict[int, list[T]]:
"""Group tokens by line in code
For each line in the source `code`, find all `tokens` that belong
to that line, and group tokens by line. **Note** that `tokens` must
be the result of parsing `code`.
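Example (hand-crafted tokens that match `code`, for illustration):
>>> code = "ab\\ncd\\n"
>>> tokens = [(0, Token.Text, "ab\\n"), (3, Token.Text, "cd\\n")]
>>> dict(group_tokens_by_line(code, tokens))
{0: [(0, Token.Text, 'ab\\n')], 1: [(3, Token.Text, 'cd\\n')]}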
:param code: Source code text that was parsed into tokens
:param tokens: An iterable of (index, token_type, value) tuples,
preferably with `value` split into individual lines with the
help of `split_multiline_lex_tokens` function.
:return: mapping from line number in `code` to list of tokens
in that line
"""
tokens_deque = deque(tokens)
idx_code = line_ends_idx(code)
# handle special case where `code` does not end in '\n' (newline)
# otherwise the last (and incomplete) line would be dropped
len_code = len(code)
if len_code not in idx_code:
idx_code.append(len_code)
line_tokens = defaultdict(list)
for no, idx in enumerate(idx_code):
while tokens_deque:
token = tokens_deque.popleft()
if token[0] < idx:
line_tokens[no].append(token)
else:
tokens_deque.appendleft(token)
break
return line_tokens
def front_fill_gaps(data: dict[int, T]) -> dict[int, T]:
"""Fill any gaps in `data` keys with previous value
>>> front_fill_gaps({1: '1', 3: '3'})
{1: '1', 2: '1', 3: '3'}
:param data: Input data - dictionary with int keys
:return: Front filled input data
"""
if not data:
return {}
# Find the minimum and maximum keys
min_key = min(data.keys())
max_key = max(data.keys())
# Create a new dictionary to store the result
filled_dict = {}
# Initialize the previous value
previous_value = None
# Iterate through the range of keys
for key in range(min_key, max_key + 1):
if key in data:
previous_value = data[key]
filled_dict[key] = previous_value
return filled_dict
def deep_update(d: dict, u: collections.abc.Mapping) -> dict:
"""Update nested dictionary of varying depth
Update dict `d` with the contents of dict `u`, without overwriting
deeply nested levels in input dictionary `d`. **Note** that this
would also extend `d` with new keys from `u`.
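>>> deep_update({'a': {'x': 1}, 'l': [1]}, {'a': {'y': 2}, 'l': [2]})
{'a': {'x': 1, 'y': 2}, 'l': [1, 2]}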
:param d: dict to update
:param u: data to update with
:return: updated input dict
"""
# modified from https://stackoverflow.com/a/3233356/46058
# see also https://github.com/pydantic/pydantic/blob/v2.7.4/pydantic/_internal/_utils.py#L103
for k, v in u.items():
if isinstance(v, collections.abc.Mapping):
d[k] = deep_update(d.get(k, {}), v)
elif isinstance(v, collections.abc.MutableSequence):
list_value = d.get(k, [])
list_value.extend(v)
d[k] = list_value
else:
d[k] = v
return d
def clean_text(text: str) -> str:
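"""Remove selected special characters and normalize whitespace
Removes the characters in TRANSLATION_TABLE ('*', '/', '\\', tab, newline),
then collapses each run of whitespace into a single space.
>>> clean_text("foo /* bar */\\tbaz")
'foo bar baz'
:param text: str to process
:return: cleaned-up text
"""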
ret = text.translate(TRANSLATION_TABLE)
ret = re.sub(pattern=r'\s+', repl=' ', string=ret)
return ret
def line_is_empty(tokens_list: Iterable[tuple]) -> bool:
"""Given results of parsing a line, find if it is empty
:param tokens_list: An iterable of (index, token_type, text_fragment) tuples,
supposedly created by parsing some line of source code text
:return: Whether the tokens in `tokens_list` can all be
considered to come from an empty line
"""
tokens_list = list(tokens_list)
return len(tokens_list) == 1 and (tokens_list[0][2] == '\n' or tokens_list[0][2] == '\r\n')
def line_is_whitespace(tokens_list: Iterable[tuple]) -> bool:
"""Given results of parsing a line, find if it consists only of whitespace tokens
:param tokens_list: An iterable of (index, token_type, text_fragment) tuples,
supposedly created by parsing some line of source code text
:return: Whether the tokens in `tokens_list` are all
whitespace tokens
"""
return all([token_type in Token.Text.Whitespace or
token_type in Token.Text and text_fragment.isspace()
for _, token_type, text_fragment in tokens_list])
def line_is_comment(tokens_list: Iterable[tuple]) -> bool:
"""Given results of parsing line, find if it is comment
:param tokens_list: An iterable of (index, token_type, text_fragment) tuples,
supposedly from parsing some line of source code text
:return: Whether the tokens in `tokens_list` can all be
considered to form a comment
"""
can_be_comment = False
cannot_be_comment = False
for _, token_type, text_fragment in tokens_list:
if token_type in Token.Comment:
can_be_comment = True
elif token_type in Token.Literal.String.Doc:
# docstrings are considered documentation / comments
can_be_comment = True
elif token_type in Token.Text.Whitespace:
# white space in line is also ok, but only whitespace is not a comment
pass  # does not change the status of the line
elif token_type in Token.Text and text_fragment.isspace(): # just in case
# white space in line is also ok, but only whitespace is not a comment
pass # does not change the status of the line
else:
# other tokens
cannot_be_comment = True
break
return can_be_comment and not cannot_be_comment
def purpose_to_default_annotation(file_purpose: str) -> str:
"""Mapping from file purpose to default line annotation"""
return "code" if file_purpose == "programming" else file_purpose
class AnnotatedPatchSet:
"""Annotations for whole patch / diff
:ivar patch_set: original unidiff.PatchSet or diffannotator.git.ChangeSet
:ivar repo: optionally, the repository diffannotator.git.ChangeSet came from"""
def __init__(self,
patch_set: Union[ChangeSet, unidiff.PatchSet],
repo: Optional[GitRepo] = None):
"""Initialize AnnotatedPatchSet with unidiff.PatchSet (or derived class)
:param patch_set: parsed unified diff (if unidiff.PatchSet),
or parsed commit changes and parsed commit metadata (if ChangeSet)
:param repo: the Git repository the `patch_set` (ChangeSet)
came from
"""
self.patch_set = patch_set
self.repo = repo
# builder pattern
def add_repo(self, repo: GitRepo) -> 'AnnotatedPatchSet':
"""Add the Git repository the patch (supposedly) came from
**NOTE:** Modifies self, and returns modified object.
:param repo: the Git repository connected to self / the patchset
:return: changed object, to enable flow/builder pattern
"""
self.repo = repo
return self
@property
def commit_id(self) -> Optional[str]:
if isinstance(self.patch_set, ChangeSet):
return self.patch_set.commit_id
else:
return getattr(self.patch_set, 'commit_id', None)
@classmethod
def from_filename(cls, filename: Union[str, Path], encoding: str = unidiff.DEFAULT_ENCODING,
errors: Optional[str] = None, newline: Optional[str] = None,
missing_ok: bool = False,
ignore_diff_parse_errors: bool = True,) -> Optional['AnnotatedPatchSet']:
"""Return a AnnotatedPatchSet instance given a diff filename
:param filename: path to the patch file (diff file) to try to parse
(absolute or relative to the current working directory)
:param encoding: name of the encoding used to decode the file,
defaults to "UTF-8"
:param errors: optional string that specifies how decoding errors
are to be handled; see documentation of `open` function for list
of possible values, see: https://docs.python.org/3/library/functions.html#open
:param newline: determines how to parse newline characters from the stream;
see documentation of `open` function for possible values
:param missing_ok: if false (the default), `FileNotFoundError` is raised
if the path does not exist, and `PermissionError` is raised if file
exists but cannot be read because of path permissions; if `missing_ok` is true,
return None on a missing file, or on a file with wrong permissions
:param ignore_diff_parse_errors: if false (the default), `unidiff.UnidiffParseError`
is raised if there was error parsing the unified diff; if true, return None
on parse errors
:return: wrapped result of parsing patch file `filename`
"""
# NOTE: unconditionally using `file_path = Path(filename)` would simplify some code
try:
patch_set = ChangeSet.from_filename(filename, encoding=encoding,
errors=errors, newline=newline)
except FileNotFoundError as ex:
logger.error(f"No such patch file: '{filename}'")
if not missing_ok:
raise ex
return None
except PermissionError as ex:
if Path(filename).exists() and Path(filename).is_dir():
logger.error(f"Path points to directory, not patch file: '{filename}'")
else:
logger.error(f"Permission denied to read patch file '{filename}'")
if not missing_ok:
raise ex
return None
except unidiff.UnidiffParseError as ex:
logger.error(msg=f"Error parsing patch file '{filename}'", exc_info=True)
if not ignore_diff_parse_errors:
raise ex
return None
return cls(patch_set)
def compute_sizes_and_spreads(self) -> Counter:
"""Compute patch set sizes and spread
See the detailed description of returned metrics in docstring
for `AnnotatedPatchedFile.compute_sizes_and_spreads`.
:return: Counter with different sizes and different spreads
of the given patch set (unified diff object, or diff file)
"""
result = Counter()
#print(f"patched file: {self.patched_file!r}")
patched_file: unidiff.PatchedFile
for patched_file in self.patch_set:
annotated_file = AnnotatedPatchedFile(patched_file)
file_result = annotated_file.compute_sizes_and_spreads()
result += file_result
return result
def process(self,
sizes_and_spreads: bool = False,
ignore_annotation_errors: bool = True):
"""Process wrapped patch set, annotating changes for patched files
Returns mapping from filename to pre- and post-image
line annotations. The pre-image line annotations use "-" as key,
while post-image use "+".
The format of returned values is described in more detail
in `AnnotatedHunk.process()` documentation.
TODO: Update and return the `self.patch_set_data` field (caching results).
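Example (a sketch, not a doctest; 'example.diff' is a hypothetical path,
and the exact set of keys depends on the options used):
    patch_set = AnnotatedPatchSet.from_filename('example.diff')
    annotations = patch_set.process(sizes_and_spreads=True)
    changes = annotations.get('changes', {})  # per-file '+'/'-' line annotations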
:param sizes_and_spreads: if true, compute also various metrics
for patch size and for patch spread with `compute_sizes_and_spreads`
:param ignore_annotation_errors: if true (the default), ignore errors during
patch annotation process
:return: annotated patch data, mapping from changed file names
to '+'/'-', to annotated line info (from post-image or pre-image)
:rtype: dict[str, dict[str, dict | list | str]]
"""
i: Optional[int] = None
patch_annotations: dict[str, Union[dict[str, Union[str, dict]], Counter]] = {}
# once per changeset: extracting the commit id and commit metadata
patch_id: Optional[str] = None
# TODO: make '' into a constant, like UNKNOWN_ID, reducing duplication
if isinstance(self.patch_set, ChangeSet) and self.patch_set.commit_id != '':
patch_id = self.patch_set.commit_id
commit_metadata = {'id': patch_id}
if self.patch_set.commit_metadata is not None:
commit_metadata.update(self.patch_set.commit_metadata)
patch_annotations['commit_metadata'] = commit_metadata
# helpers to get contents of pre-image and post-image files
src_commit: Optional[str] = None
dst_commit: Optional[str] = None
if self.repo is not None and patch_id is not None:
if self.repo.is_valid_commit(patch_id):
dst_commit = patch_id
if self.repo.is_valid_commit(f"{patch_id}^"):
src_commit = f"{patch_id}^"
# TODO?: Consider moving the try ... catch ... inside the loop
try:
# for each changed file
patched_file: unidiff.PatchedFile
for i, patched_file in enumerate(self.patch_set, start=1):
# create AnnotatedPatchedFile object from i-th changed file in patchset
annotated_patch_file = AnnotatedPatchedFile(patched_file)
# add sources, if repo is available, and they are available from repo
src: Optional[str] = None
dst: Optional[str] = None
if self.repo is not None:
# we need the real name, not the "a/"- or "b/"-prefixed name that unidiff.PatchedFile provides
# TODO?: use .is_added_file and .is_removed_file unidiff.PatchedFile properties, or
# TODO?: or use unidiff.DEV_NULL / unidiff.constants.DEV_NULL
if src_commit is not None and annotated_patch_file.source_file != "/dev/null":
src = self.repo.file_contents(src_commit, annotated_patch_file.source_file)
if dst_commit is not None and annotated_patch_file.target_file != "/dev/null":
dst = self.repo.file_contents(dst_commit, annotated_patch_file.target_file)
annotated_patch_file.add_sources(src=src, dst=dst)
# add annotations from i-th changed file
if 'changes' not in patch_annotations:
patch_annotations['changes'] = {}
patch_annotations['changes'].update(annotated_patch_file.process())
if sizes_and_spreads:
patch_annotations['diff_metadata'] = self.compute_sizes_and_spreads()
except Exception as ex:
#print(f"Error processing patch {self.patch_set!r}, at file no {i}: {ex!r}")
#traceback.print_tb(ex.__traceback__)
logger.error(msg=f"Error processing patch {self.patch_set!r}, at file no {i}",
exc_info=True)
if not ignore_annotation_errors:
raise ex
# returns what it was able to process so far
return patch_annotations
class AnnotatedPatchedFile:
"""Annotations for diff for a single file in a patch
It includes metadata about the programming language associated with
the changed/patched file.
Note that major part of the annotation process is performed on demand,
during the `process()` method call.
Fixes some problems with `unidiff.PatchedFile`
:ivar patched_file: original `unidiff.PatchedFile` to be annotated
:ivar source_file: name of source file (pre-image name),
without the "a/" prefix from diff / patch
:ivar target_file: name of target file (post-image name),
without the "b/" prefix from diff / patch
:ivar patch_data: gathers patch files and changed patch lines
annotations; mapping from file name to gathered data
"""
# NOTE: similar signature to line_is_comment, but returning str
# TODO: store this type as TypeVar to avoid code duplication
line_callback: OptionalLineCallback = None
@staticmethod
def make_line_callback(code_str: str) -> OptionalLineCallback:
"""Create line callback function from text of its body
Example of creating a no-op callback:
>>> AnnotatedPatchedFile.line_callback = AnnotatedPatchedFile.make_line_callback("return None")
:param code_str: text of the function body code
:return: callback function or None
"""
#print(f"RUNNING make_line_callback(code_str='{code_str[:6]}[...]')")
if not code_str:
return None
match = re.match(pattern=r"def\s+(?P<func_name>\w+)"
r"\("
r"(?P<param1>\w+)(?P<type_info1>\s*:\s*[^)]*?)?"
r",\s*"
r"(?P<param2>\w+)(?P<type_info2>\s*:\s*[^)]*?)?"
r"\)"
r"\s*(?P<rtype_info>->\s*[^:]*?\s*)?:\s*$",
string=code_str, flags=re.MULTILINE)
if match:
# or .info(), if it were not for the extra debugging data
logger.debug("Found function definition in callback code string: %s", match.groupdict())
#print(f" Found function definition in callback code string:")
#print(f" {match.groupdict()}")
callback_name = match.group('func_name')
callback_code_str = code_str
else:
# or .info(), if it were not for the full text of the callback body
logger.debug("Using provided code string as body of callback function: %s", code_str)
#print(f" Using provided code string as body (first 50 characters):")
#print(f" {code_str[:50]}")
#print(f" {match=}")
callback_name = "_line_callback"
callback_code_str = (f"def {callback_name}(file_data, tokens):\n" +
" " + "\n ".join(code_str.splitlines()) + "\n")
# TODO?: wrap with try: ... except SyntaxError: ...
exec(callback_code_str, globals())
return locals().get(callback_name,
globals().get(callback_name,
None))
def __init__(self, patched_file: unidiff.PatchedFile):
"""Initialize AnnotatedPatchedFile with PatchedFile
Retrieve pre-image and post-image names of the changed file
(cleaning them up by removing the "a/" or "b/" prefixes, if
needed; unidiff does that for .path getter, if it is modern
enough).
TODO: handle c-quoted filenames, e.g. '"przyk\305\202ad"'
for 'przykład'.
Retrieves information about programming language and purpose
of the file based solely on the pathname of a source and of
a target file, using the :mod:`languages` module.
:param patched_file: patched file data parsed from unified diff
"""
self.patch_data: dict[str, dict] = defaultdict(lambda: defaultdict(list))
# save original unidiff.PatchedFile
self.patched_file: unidiff.PatchedFile = patched_file
# get the names and drop "a/" and "b/"
self.source_file: str = patched_file.source_file
self.target_file: str = patched_file.target_file
if self.source_file[:2] == "a/":
self.source_file = patched_file.source_file[2:]
if self.target_file[:2] == "b/":
self.target_file = patched_file.target_file[2:]
# add language metadata (based on filename only!)
source_meta_dict = LANGUAGES.annotate(self.source_file)
self.patch_data[self.source_file].update(source_meta_dict)
if self.source_file != self.target_file:
target_meta_dict = LANGUAGES.annotate(self.target_file)
self.patch_data[self.target_file].update(target_meta_dict)
# place to hold pre-image and post-image, if available
self.source: Optional[str] = None
self.target: Optional[str] = None
# cache to hold the result of lexing pre-image/post-image
self.source_tokens: Optional[dict[int, list[tuple]]] = None
self.target_tokens: Optional[dict[int, list[tuple]]] = None
# builder pattern
def add_sources(self, src: str, dst: str) -> 'AnnotatedPatchedFile':
"""Add pre-image and post-image of a file at given diff
**NOTE:** Modifies self, and returns modified object.
Example:
>>> from diffannotator.annotate import AnnotatedPatchedFile
>>> import unidiff
>>> patch_path = 'tests/test_dataset_structured/keras-10/patches/c1c4afe60b1355a6c0e83577791a0423f37a3324.diff'
>>> patch_set = unidiff.PatchSet.from_filename(patch_path, encoding="utf-8")
>>> patched_file = AnnotatedPatchedFile(patch_set[0]).add_sources("a", "b")
>>> patched_file.source
'a'
>>> patched_file.target
'b'
:param src: pre-image contents of patched file
:param dst: post-image contents of patched file
:return: changed object, to enable flow/builder pattern
"""
self.source = src
self.target = dst
return self
def add_sources_from_files(self,
src_file: Path,
dst_file: Path) -> 'AnnotatedPatchedFile':
"""Read pre-image and post-image for patched file at given diff
**NOTE:** Modifies self, adding contents of files, and returns modified
object.
Example:
>>> from diffannotator.annotate import AnnotatedPatchedFile
>>> import unidiff
>>> from pathlib import Path
>>> patch_path = 'tests/test_dataset_structured/keras-10/patches/c1c4afe60b1355a6c0e83577791a0423f37a3324.diff'
>>> patch_set = unidiff.PatchSet.from_filename(patch_path, encoding="utf-8")
>>> patched_file = AnnotatedPatchedFile(patch_set[0])
>>> files_path = Path('tests/test_dataset_structured/keras-10/files')
>>> src_path = files_path / 'a' / Path(patched_file.source_file).name
>>> dst_path = files_path / 'b' / Path(patched_file.target_file).name
>>> patched_file_with_sources = patched_file.add_sources_from_files(src_file=src_path, dst_file=dst_path)
>>> patched_file_with_sources.source.splitlines()[2]
'from __future__ import absolute_import'
:param src_file: path to pre-image contents of patched file
:param dst_file: path to post-image contents of patched file
:return: changed object
"""
return self.add_sources(
src_file.read_text(encoding="utf-8"),
dst_file.read_text(encoding="utf-8")
)
def image_for_type(self, line_type: Literal['-','+']) -> Optional[str]:
"""Return pre-image for '-', post-image for '+', if available
:param line_type: denotes line type, e.g. line.line_type from unidiff
:return: pre-image or post-image, or None if pre/post-images are not set
"""
if line_type == unidiff.LINE_TYPE_REMOVED: # '-'
return self.source
elif line_type == unidiff.LINE_TYPE_ADDED: # '+'
return self.target
else:
raise ValueError(f"value must be '-' or '+', got {line_type!r}")
def tokens_for_type(self, line_type: Literal['-','+']) -> Optional[dict[int, list[tuple]]]:
"""Run lexer on a pre-image or post-image contents, if available
Returns (cached) result of lexing pre-image for `line_type` '-',
and of post-image for line type '+'.
The pre-image and post-image contents of patched file should / can
be provided with the help of `add_sources()` or `add_sources_from_files()`
methods.
:param line_type: denotes line type, e.g. line.line_type from unidiff;
must be one of '+' or '-'.
:return: post-processed result of lexing, split into lines,
if there is pre-/post-image file contents available.
"""
# return cached value, if available
if line_type == unidiff.LINE_TYPE_REMOVED: # '-'
if self.source_tokens is not None:
return self.source_tokens
contents = self.source
file_path = self.source_file
elif line_type == unidiff.LINE_TYPE_ADDED: # '+'
if self.target_tokens is not None:
return self.target_tokens
contents = self.target
file_path = self.target_file
else:
raise ValueError(f"value must be '-' or '+', got {line_type!r}")
# return None if source code is not available for lexing
if contents is None:
return None
# lex selected contents (same as in main process() method)
tokens_list = LEXER.lex(file_path, contents)
tokens_split = split_multiline_lex_tokens(tokens_list)
tokens_group = group_tokens_by_line(contents, tokens_split)
# just in case, it should not be needed
tokens_group = front_fill_gaps(tokens_group)
# save/cache computed data
if line_type == unidiff.LINE_TYPE_REMOVED: # '-'
self.source_tokens = tokens_group
elif line_type == unidiff.LINE_TYPE_ADDED: # '+'
self.target_tokens = tokens_group
# return computed result
return tokens_group
def tokens_range_for_type(self, line_type: Literal['-','+'],
start_line: int, length: int) -> Optional[dict[int, list[tuple]]]:
"""Lexing results for given range of lines, or None if no pre-/post-image
The pre-image and post-image contents of patched file should / can
be provided with the help of `add_sources()` or `add_sources_from_files()`
methods.
The result is mapping from line number of the pre- or post-image
contents, counting from 1 (the same as diff and unidiff), to the list
of tokens corresponding to the line in question.
:param line_type: denotes line type, e.g. line.line_type from unidiff;
must be one of '-' (unidiff.LINE_TYPE_REMOVED) or '+' (unidiff.LINE_TYPE_ADDED).
:param start_line: starting line number in file, counting from 1
:param length: number of lines to return results for,
starting from `start_line`
:return: post-processed result of lexing, split into lines,
if there is pre-/post-image file contents available;
None if there is no pre-/post-image contents attached.
"""
tokens_list = self.tokens_for_type(line_type=line_type)
if tokens_list is None:
return None
# NOTE: tokens_for_type() returns a mapping keyed by 0-based line numbers
# TODO: check if it is correct (0-based vs 1-based subscripting)
return {
line_no+1: line_tokens
for line_no, line_tokens in tokens_list.items()
if line_no+1 in range(start_line, (start_line + length))
}
def hunk_tokens_for_type(self, line_type: Literal['-','+'],
hunk: Union[unidiff.Hunk, 'AnnotatedHunk']) -> Optional[dict[int, list[tuple]]]:
"""Lexing results for removed ('-')/added ('+') lines in hunk, if possible
The pre-image and post-image contents of patched file should / can
be provided with the help of `add_sources()` or `add_sources_from_files()`
methods. If these contents are not provided, this method returns None.
The result is mapping from line number of the pre- or post-image
contents, counting from 1 (the same as diff and unidiff), to the list
of tokens corresponding to the line in question.
:param line_type: denotes line type, e.g. line.line_type from unidiff;
must be one of '-' (unidiff.LINE_TYPE_REMOVED) or '+' (unidiff.LINE_TYPE_ADDED).
:param hunk: block of changes in fragment of diff corresponding
to changed file, either unidiff.Hunk or annotate.AnnotatedHunk
:return: post-processed result of lexing, split into lines,
if there is pre-/post-image file contents available;
None if there is no pre-/post-image contents attached.
"""
tokens_list = self.tokens_for_type(line_type=line_type)
if tokens_list is None:
return None
if isinstance(hunk, AnnotatedHunk):
hunk = hunk.hunk
result = {}
for hunk_line_no, line in enumerate(hunk):
if line.line_type != line_type:
continue
# NOTE: first line of file is line number 1, not 0, according to (uni)diff
# but self.tokens_for_type(line_type) returns 0-based indexing
line_no = line.source_line_no if line_type == unidiff.LINE_TYPE_REMOVED else line.target_line_no
# first line is 1; first element has index 0
result[hunk_line_no] = tokens_list[line_no - 1]
return result
def compute_sizes_and_spreads(self) -> Counter:
"""Compute sizes and spread for patched file in diff/patch
Computes the following metrics:
- patched file sizes:
- total number of hunks (in the unified diff meaning),
as 'n_hunks'
- total number of modified, added and removed lines for patched file, counting
a pair of adjacent removed and added lines as a single modified line,
as 'n_mod', 'n_rem', and 'n_add'
- total number of changed lines: sum of number of modified, added, and removed,
as 'patch_size'
- total number of '+' and '-' lines in hunks of patched file (without extracting modified lines),
as 'n_lines_added', 'n_lines_removed'
- number of all lines in all hunks of patched file, including context lines,
but excluding hunk headers and patched file headers, as 'n_lines_all'
- patched file spread
- total number of groups, i.e. spans of removed and added lines,
not interrupted by context line (also called "chunks"),
as 'n_groups'
- number of modified files, as 'n_files' (always 1)
- number of modified binary files, as 'n_binary_files' (either 0 or 1);
for those files there is no information about "lines",
like the number of hunks, groups (chunks), etc.
- sum of distances in context lines between groups (chunks)
inside hunk, for all hunks in patched file, as 'spread_inner'
- sum of distances in lines between groups (chunks) for
a single changed patched file, measuring how wide across file
contents the patch spreads, as 'groups_spread'
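Example (a sketch, not a doctest; assumes `patch_set` was parsed as in the
`add_sources` docstring, and the exact counts depend on the diff):
    sizes = AnnotatedPatchedFile(patch_set[0]).compute_sizes_and_spreads()
    sizes['n_hunks'], sizes['patch_size'], sizes['groups_spread']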
:return: Counter with different sizes and different spreads
of the given changed file
"""
# Handle the case where there are no hunks of changed lines,
# for the case of change to the binary file:
# Binary files /dev/null and b/foo.gz differ
if len(self.patched_file) == 0:
return Counter({
'n_files': 1,
'n_binary_files': 1,
# TODO?: Do not add if value is 0
'n_added_files': int(self.patched_file.is_added_file),
'n_removed_files': int(self.patched_file.is_removed_file),
'n_file_renames': int(self.patched_file.is_rename),
})
result = Counter({
'n_files': 1,
'hunk_span_src':
# line number of last hunk - line number of first hunk in source (pre-image)
(self.patched_file[-1].source_start + self.patched_file[-1].source_length - 1
- self.patched_file[0].source_start),
'hunk_span_dst':
# line number of last hunk - line number of first hunk in target (post-image)
(self.patched_file[-1].target_start + self.patched_file[-1].target_length - 1
- self.patched_file[0].target_start),
})
if self.patched_file.is_added_file:
result['n_added_files'] = 1
elif self.patched_file.is_removed_file:
result['n_removed_files'] = 1
elif self.patched_file.is_rename:
result['n_file_renames'] = 1
#print(f"patched file: {self.patched_file!r}")
prev_hunk_info: Optional[dict] = None
inter_hunk_span = 0
hunk: unidiff.Hunk
for idx, hunk in enumerate(self.patched_file):
annotated_hunk = AnnotatedHunk(self, hunk, hunk_idx=idx)
hunk_result, hunk_info = annotated_hunk.compute_sizes_and_spreads()
#print(f"[{idx}] hunk: inner spread={hunk_result['spread_inner']:3d} "
# f"among {hunk_result['n_groups']} groups for {hunk!r}")
result += hunk_result
if prev_hunk_info is not None:
# there was previous hunk,
# computing hunk-to-hunk distance
# between pre- and post-image line numbers of end of previous hunk
# and pre- and post-image line numbers of beginning of current hunk
result['hunk_spread_src'] += hunk_info['hunk_start'][0] - prev_hunk_info['hunk_end'][0]
result['hunk_spread_dst'] += hunk_info['hunk_start'][1] - prev_hunk_info['hunk_end'][1]
# computing inter-hunk distance
# between last group in previous hunk
# and first group in the current hunk
prev_end_type = prev_hunk_info['type_last']
curr_beg_type = hunk_info['type_first']
# 1:-removed, 1st hunk, groups_end=1
# 2: context
# 3: context
# 4:-removed, 2nd hunk, groups_start=4
# 4-1 = 3, but there are 2 = 3-1 = 3-2+1 context lines
if prev_end_type == curr_beg_type:
#print(f"from group ending to starting in {prev_end_type}={curr_beg_type}")
if prev_end_type == unidiff.LINE_TYPE_REMOVED:
# removed line to removed line, can use pre-image line numbers
inter_hunk_span = hunk_info['groups_start'][0] - prev_hunk_info['groups_end'][0] - 1
elif prev_end_type == unidiff.LINE_TYPE_ADDED:
# added line to added line, can use post-image line numbers
inter_hunk_span = hunk_info['groups_start'][1] - prev_hunk_info['groups_end'][1] - 1
else:
#print(f"from group ending in {prev_end_type} to starting in {curr_beg_type}")
if prev_end_type == unidiff.LINE_TYPE_REMOVED:
# from removed line to next hunk start using pre-image line numbers
inter_hunk_span = hunk_info['hunk_start'][0] - prev_hunk_info['groups_end'][0] - 1
elif prev_end_type == unidiff.LINE_TYPE_ADDED:
# from added line to next hunk start using post-image line numbers
inter_hunk_span = hunk_info['hunk_start'][1] - prev_hunk_info['groups_end'][1] - 1
if curr_beg_type == unidiff.LINE_TYPE_REMOVED:
# from start of current hunk using pre-image line numbers to removed line
inter_hunk_span += hunk_info['groups_start'][0] - hunk_info['hunk_start'][0] # -1?
elif curr_beg_type == unidiff.LINE_TYPE_ADDED:
# from start of current hunk using post-image line numbers to added line
inter_hunk_span += hunk_info['groups_start'][1] - hunk_info['hunk_start'][1] # -1?
#print(f"inner={hunk_result['spread_inner']:2d}, inter={inter_hunk_span:2d} for "
# f"{hunk_info['type_first']}->{hunk_info['type_last']}:{hunk!r}")
result['groups_spread'] += hunk_result['spread_inner']
result['groups_spread'] += inter_hunk_span # will be 0 for the first hunk
# at the end of the loop
prev_hunk_info = hunk_info
return result
def process(self):
"""Process hunks in patched file, annotating changes
Returns single-element mapping from filename to pre- and post-image
line annotations. The pre-image line annotations use "-" as key,
while post-image use "+".
The format of returned values is described in more detail
in `AnnotatedHunk.process()` documentation.
Updates and returns the `self.patch_data` field.
:return: annotated patch data, mapping from changed file name
to '+'/'-', to annotated line info (from post-image or pre-image)
:rtype: dict[str, dict[str, dict]]
"""