style: reformat unit test modules (#125)
* style: refactor unit test modules

* refactor: use relative path for module import

* build: add __init__.py

* ci: modify pytest and flake8 calls

* ci: add black call

* style: correct style with black
deliaBlue authored Dec 3, 2023
1 parent ddf82ce commit 618e082
Showing 17 changed files with 1,350 additions and 977 deletions.
10 changes: 8 additions & 2 deletions .github/workflows/tests.yml
@@ -38,7 +38,7 @@ jobs:
- name: flake8
working-directory: ./scripts
run: flake8 ./*.py
run: flake8

- name: mypy
working-directory: ./scripts
@@ -48,6 +48,12 @@ jobs:
working-directory: ./scripts
run: pylint --rcfile=../pylint.cfg ./*.py

- name: black
uses: psf/black@stable
with:
options: "--check --verbose --line-length=79"
src: "./scripts"


snakemake-format-graph-test:
runs-on: ubuntu-latest
@@ -149,4 +155,4 @@ jobs:
- name: run unit tests
working-directory: ./scripts/tests
run: pytest --cov=scripts --cov-branch --cov-report=term-missing
run: pytest --import-mode prepend --cov=scripts --cov-branch --cov-report=term-missing
1 change: 1 addition & 0 deletions scripts/__init__.py
@@ -0,0 +1 @@
"""MIRFLOWZ scripts."""
57 changes: 28 additions & 29 deletions scripts/filter_multimappers.py
@@ -50,29 +50,30 @@ def parse_arguments():
"""Command-line arguments parser."""
parser = argparse.ArgumentParser(
description="Script to filter multimappers by indel counts."
)
)

parser.add_argument(
'-v', '--version',
action='version',
version='%(prog)s 1.0',
help="Show program's version number and exit"
)
"-v",
"--version",
action="version",
version="%(prog)s 1.0",
help="Show program's version number and exit",
)

parser.add_argument(
'infile',
"infile",
help="Path to the SAM input file, sorted by query name.",
type=Path
)
type=Path,
)

parser.add_argument(
'--nh',
"--nh",
help=(
"If set, the NH tag will be include in the alignment name after"
"and underscore. Default: %(default)s."
),
action='store_true',
default=False
action="store_true",
default=False,
)

return parser
@@ -101,7 +102,7 @@ def count_indels(aln: pysam.libcalignedsegment.AlignedSegment) -> int:


def find_best_alignments(
alns: List[pysam.AlignedSegment], nh: bool = False
alns: List[pysam.AlignedSegment], nh: bool = False
) -> List[pysam.AlignedSegment]:
"""Find alignments with less indels.
@@ -129,20 +130,18 @@ def find_best_alignments(
aln_indels = [(aln, count_indels(aln=aln)) for aln in alns]
min_indels = min(aln_indels, key=lambda x: x[1])[1]
best_alignments = [
aln
for i, (aln, indels) in enumerate(aln_indels)
if indels == min_indels]
aln
for i, (aln, indels) in enumerate(aln_indels)
if indels == min_indels
]

for i, best_aln in enumerate(best_alignments):

if nh:
name = (
f'{best_aln.query_name}_{len(best_alignments)}'
)
name = f"{best_aln.query_name}_{len(best_alignments)}"
best_aln.query_name = name

best_aln.set_tag('NH', len(best_alignments))
best_aln.set_tag('HI', i + 1)
best_aln.set_tag("NH", len(best_alignments))
best_aln.set_tag("HI", i + 1)

return best_alignments

@@ -154,20 +153,18 @@ def write_output(alns: List[pysam.AlignedSegment]) -> None:
alignments: alignments with the same query name
"""
for alignment in alns:
sys.stdout.write(alignment.to_string() + '\n')
sys.stdout.write(alignment.to_string() + "\n")


def main(arguments) -> None:
"""Filter multimappers by indels count."""
with pysam.AlignmentFile(arguments.infile, "r") as samfile:

sys.stdout.write(str(samfile.header))

current_alignments: list[pysam.AlignedSegment] = []
current_query = None

for alignment in samfile:

if alignment.is_supplementary:
continue

@@ -178,16 +175,18 @@ def main(arguments) -> None:
current_alignments.append(alignment)

else:
current_alignments = find_best_alignments(current_alignments,
arguments.nh)
current_alignments = find_best_alignments(
current_alignments, arguments.nh
)
write_output(alns=current_alignments)

current_query = alignment.query_name
current_alignments = [alignment]

if len(current_alignments) > 0:
current_alignments = find_best_alignments(current_alignments,
arguments.nh)
current_alignments = find_best_alignments(
current_alignments, arguments.nh
)
write_output(alns=current_alignments)


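As a rough illustration of the selection rule filter_multimappers.py applies, here is a minimal stand-alone sketch that keeps only the alignments tied for the fewest indels. It uses hypothetical (query_name, indel_count) tuples in place of real pysam.AlignedSegment objects scored by count_indels(), so it is an assumption-laden sketch rather than the script's own code.

# Minimal sketch of the "keep alignments with the fewest indels" rule.
# Hypothetical data: (query_name, indel_count) tuples stand in for
# pysam.AlignedSegment objects scored by count_indels().
from typing import List, Tuple

def pick_min_indel(alns: List[Tuple[str, int]]) -> List[Tuple[str, int]]:
    """Return only the alignments tied for the lowest indel count."""
    min_indels = min(indels for _, indels in alns)
    return [(name, indels) for name, indels in alns if indels == min_indels]

hits = [("read1_hit1", 2), ("read1_hit2", 0), ("read1_hit3", 0)]
print(pick_min_indel(hits))  # [('read1_hit2', 0), ('read1_hit3', 0)]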
117 changes: 66 additions & 51 deletions scripts/iso_name_tagging.py
@@ -92,70 +92,73 @@ def parse_arguments():
"""Command-line arguments parser."""
parser = argparse.ArgumentParser(
description=__doc__,
formatter_class=argparse.RawDescriptionHelpFormatter
formatter_class=argparse.RawDescriptionHelpFormatter,
)
parser.add_argument(
'-v', '--version',
action='version',
version='%(prog)s 1.0.0',
help="Show program's version number and exit"
"-v",
"--version",
action="version",
version="%(prog)s 1.0.0",
help="Show program's version number and exit",
)
parser.add_argument(
'-b', '--bed',
"-b",
"--bed",
help=(
"Path to the BED file. This file must be the output of "
" a bedtools intersect call with -a being a GFF3 file and"
" -b a BAM file."
),
type=Path,
required=True
required=True,
)
parser.add_argument(
'-s', '--sam',
"-s",
"--sam",
help="Path to the SAM input file.",
type=Path,
required=True
required=True,
)
parser.add_argument(
'-e', '--extension',
"-e",
"--extension",
help=(
"Number of nucleotides the start and end coordinates of the"
" annotated features had been extended. Default: %(default)d."
),
default=0,
type=int
type=int,
)
parser.add_argument(
'--id',
"--id",
help=(
"ID used to identify the feature in the name that is added as tag."
" The ID must be in lowercase. Default: %(default)s."
),
default="name",
type=str
type=str,
)

return parser


def attributes_dictionary(attr: str) -> Dict[str, str]:
"""Create attributes dicctionary."""
pairs = attr.split(';')
pairs = attr.split(";")

if len(pairs[0].split('=')) == 2:
attr_dict = {p.split('=')[0].lower(): p.split('=')[1] for p in pairs}
if len(pairs[0].split("=")) == 2:
attr_dict = {p.split("=")[0].lower(): p.split("=")[1] for p in pairs}
else:
attr_dict = {
p.split('"')[0].strip().lower(): p.split('"')[1]
for p in pairs}
p.split('"')[0].strip().lower(): p.split('"')[1] for p in pairs
}

return attr_dict


def parse_intersect_output(
intersect_file: Path,
ID: str = "name",
extension: int = 0) -> Optional[Dict[Optional[str], list]]:
intersect_file: Path, ID: str = "name", extension: int = 0
) -> Optional[Dict[Optional[str], list]]:
"""Parse intersect BED file.
Given a BED file generated by intersecting a GFF file (-a) with a BAM file
@@ -176,25 +179,39 @@ def parse_intersect_output(
adjusted. Defaults to 0.
"""
intersect_data = defaultdict(list)
Fields = namedtuple('Fields', ("feat_chr", "source", "feat_type",
"feat_start", "feat_end", "feat_score",
"strand", "phase", "feat_attributes",
"read_chr", "read_start", "read_end",
"read_name", "read_score", "read_strand",
"overlap_len"))

with open(intersect_file, 'r', encoding="utf-8") as bedfile:
for line in bedfile:
Fields = namedtuple(
"Fields",
(
"feat_chr",
"source",
"feat_type",
"feat_start",
"feat_end",
"feat_score",
"strand",
"phase",
"feat_attributes",
"read_chr",
"read_start",
"read_end",
"read_name",
"read_score",
"read_strand",
"overlap_len",
),
)

fields = Fields(*line.strip().split('\t'))
with open(intersect_file, "r", encoding="utf-8") as bedfile:
for line in bedfile:
fields = Fields(*line.strip().split("\t"))

miRNA_name = attributes_dictionary(fields.feat_attributes)[ID]
miRNA_start = int(fields.feat_start) + extension
miRNA_end = int(fields.feat_end) - extension

intersect_data[fields.read_name].append((miRNA_name,
miRNA_start,
miRNA_end))
intersect_data[fields.read_name].append(
(miRNA_name, miRNA_start, miRNA_end)
)

if not intersect_data:
return None
@@ -203,10 +220,8 @@


def get_tags(
intersecting_mirna: list,
alignment: pysam.AlignedSegment,
extension: int
) -> set:
intersecting_mirna: list, alignment: pysam.AlignedSegment, extension: int
) -> set:
"""Get tag for alignment.
Given an alignment and a list containing the feature name, start position,
@@ -233,7 +248,7 @@ def get_tags(
set of strings containing the new tag
"""
cigar = alignment.cigarstring
md = alignment.get_tag('MD')
md = alignment.get_tag("MD")
limit = extension + 1

tags = []
@@ -243,19 +258,18 @@
shift_3p = alignment.reference_end - miRNA_end

if -limit < shift_5p < limit and -limit < shift_3p < limit:
tags.append(f'{miRNA_name}|{shift_5p}|{shift_3p}|{cigar}|{md}')
tags.append(f"{miRNA_name}|{shift_5p}|{shift_3p}|{cigar}|{md}")

return set(tags)


def main(arguments) -> None:
"""Add intersecting feature(s) into a SAM file as a tag."""
intersect_data = parse_intersect_output(arguments.bed,
arguments.id,
arguments.extension)

with pysam.AlignmentFile(arguments.sam, 'r') as samfile:
intersect_data = parse_intersect_output(
arguments.bed, arguments.id, arguments.extension
)

with pysam.AlignmentFile(arguments.sam, "r") as samfile:
sys.stdout.write(str(samfile.header))

if intersect_data is None:
@@ -265,15 +279,16 @@ def main(arguments) -> None:
alignment_id = alignment.query_name
intersecting_miRNAs = intersect_data[alignment_id]

tags = get_tags(intersecting_mirna=intersecting_miRNAs,
alignment=alignment,
extension=arguments.extension)
tags = get_tags(
intersecting_mirna=intersecting_miRNAs,
alignment=alignment,
extension=arguments.extension,
)

alignment.set_tag('YW', ';'.join(tags))
sys.stdout.write(alignment.to_string() + '\n')
alignment.set_tag("YW", ";".join(tags))
sys.stdout.write(alignment.to_string() + "\n")


if __name__ == "__main__":

args = parse_arguments().parse_args() # pragma: no cover
main(args) # pragma: no cover
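For a quick sense of the GFF3 attribute parsing performed by attributes_dictionary in iso_name_tagging.py, here is a small stand-alone sketch with a made-up "key=value;key=value" attribute string; the input data and variable names are illustrative assumptions, not project files.

# Stand-alone sketch of GFF3 "key=value;key=value" attribute parsing,
# mirroring the lower-cased keys used by attributes_dictionary().
# The attribute string below is a made-up example, not project data.
attr = "ID=MIMAT0000062;Alias=MIMAT0000062;Name=hsa-let-7a-5p"
pairs = attr.split(";")
attr_dict = {p.split("=")[0].lower(): p.split("=")[1] for p in pairs}
print(attr_dict["name"])  # prints: hsa-let-7a-5p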