Skip to content

Commit

Permalink
feat: use pybwa (#113)
Browse files Browse the repository at this point in the history
  • Loading branch information
nh13 authored Jan 22, 2025
1 parent adbae97 commit e2dc14a
Show file tree
Hide file tree
Showing 13 changed files with 223 additions and 522 deletions.
4 changes: 1 addition & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,7 @@

## Recommended Installation

The package `prymer` requires installation of [interactive `bwa`](https://github.com/fulcrumgenomics/bwa-aln-interactive).

To satisfy these requirements, it is recommended to install using [bioconda](https://bioconda.github.io/):
It is recommended to install using [bioconda](https://bioconda.github.io/):

```console
mamba install -c bioconda prymer
Expand Down
30 changes: 30 additions & 0 deletions poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 0 additions & 1 deletion prymer.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,4 @@ channels:
- bioconda
- conda-forge
dependencies:
- bioconda::bwa-aln-interactive=0.7.18
- conda-forge::python>=3.11.*
4 changes: 2 additions & 2 deletions prymer/offtarget/__init__.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
from prymer.offtarget.bwa import BwaAlnInteractive
from prymer.offtarget.bwa import Bwa
from prymer.offtarget.bwa import BwaHit
from prymer.offtarget.bwa import BwaResult
from prymer.offtarget.bwa import Query
from prymer.offtarget.offtarget_detector import OffTargetDetector
from prymer.offtarget.offtarget_detector import OffTargetResult

__all__ = [
"BwaAlnInteractive",
"Bwa",
"Query",
"BwaHit",
"BwaResult",
Expand Down
200 changes: 72 additions & 128 deletions prymer/offtarget/bwa.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@
>>> from pathlib import Path
>>> ref_fasta = Path("./tests/offtarget/data/miniref.fa")
>>> query = Query(bases="TCTACTAAAAATACAAAAAATTAGCTGGGCATGATGGCATGCACCTGTAATCCCGCTACT", id="NA")
>>> bwa = BwaAlnInteractive(ref=ref_fasta, max_hits=1)
>>> bwa = Bwa(ref=ref_fasta, max_hits=1)
>>> result = bwa.map_one(query=query.bases, id=query.id)
>>> result.hit_count
1
Expand All @@ -38,34 +38,28 @@
>>> query = Query(bases="AAAAAA", id="NA")
>>> bwa.map_all(queries=[query])
[BwaResult(query=Query(id='NA', bases='AAAAAA'), hit_count=3968, hits=[])]
>>> bwa.close()
True
```
""" # noqa: E501

import logging
import os
import subprocess
from dataclasses import dataclass
from pathlib import Path
from threading import Thread
from typing import ClassVar
from typing import Optional
from typing import cast

import pysam
from fgpyo import sequence
from fgpyo.sam import Cigar
from pybwa import BwaAln
from pybwa import BwaAlnOptions
from pysam import AlignedSegment
from pysam import AlignmentHeader
from typing_extensions import override
from pysam import AlignmentFile
from pysam.libcalignmentfile import AlignmentHeader
from pysam.libcfaidx import FastxRecord

from prymer.api import coordmath
from prymer.util.executable_runner import ExecutableRunner

BWA_EXECUTABLE_NAME: str = "bwa-aln-interactive"
"""The executable name for the interactive build of bwa aln."""


@dataclass(init=True, frozen=True)
Expand Down Expand Up @@ -194,29 +188,40 @@ class BwaResult:
MAX_GAP_OPENS: int = 0
"""The default maximum number of gap opens allowed in the full query sequence"""

MAX_GAP_EXTENSIONS: int = -1
MAX_GAP_EXTENSIONS: int | None = None
"""The default maximum number of gap extensions allowed in the full query sequence"""

MIN_INDEL_TO_END_DISTANCE: int = 3
"""Do not place an indel within this many bp of the ends of the query sequence"""

SEED_LENGTH: int = 20
"""The default length of the seed region"""

BWA_AUX_EXTENSIONS: list[str] = [".amb", ".ann", ".bwt", ".pac", ".sa"]
"""The file extensions for BWA index files"""


class BwaAlnInteractive(ExecutableRunner):
"""Wrapper around a novel mode of 'bwa aln' that allows for "interactive" use of bwa to keep
the process running and be able to send it chunks of reads periodically and get alignments
back without waiting for a full batch of reads to be sent.
def _available_cores() -> int:
"""Determine the number of available cores."""
cores: int = 1
try:
from os import sched_getaffinity

cores = len(sched_getaffinity(0))
except ImportError:
import multiprocessing

See:
cores = multiprocessing.cpu_count()
except NotImplementedError:
cores = os.cpu_count() or cores

- [https://github.com/fulcrumgenomics/bwa-aln-interactive](https://github.com/fulcrumgenomics/bwa-aln-interactive)
- [https://bioconda.github.io/recipes/bwa-aln-interactive/README.html](https://bioconda.github.io/recipes/bwa-aln-interactive/README.html)
return cores


class Bwa:
"""Wrapper around `bwa aln` via `pybwa`.
Attributes:
max_hits: the maximum number of hits to report - if more than this number of seed hits
are found, report only the count and not each hit.
reverse_complement: reverse complement each query sequence before alignment.
include_alt_hits: if True include hits to references with names ending in _alt, otherwise
do not include them.
Expand All @@ -227,13 +232,12 @@ def __init__(
self,
ref: Path,
max_hits: int,
executable: str | Path = BWA_EXECUTABLE_NAME,
max_mismatches: int = 3,
max_mismatches_in_seed: int = 3,
max_gap_opens: int = 0,
max_gap_extensions: int = -1,
min_indel_to_end_distance: int = 3,
seed_length: int = 20,
max_mismatches: int = MAX_MISMATCHES,
max_mismatches_in_seed: int = MAX_MISMATCHES_IN_SEED,
max_gap_opens: int = MAX_GAP_OPENS,
max_gap_extensions: int | None = MAX_GAP_EXTENSIONS,
min_indel_to_end_distance: int = MIN_INDEL_TO_END_DISTANCE,
seed_length: int = SEED_LENGTH,
reverse_complement: bool = False,
include_alt_hits: bool = False,
threads: Optional[int] = None,
Expand All @@ -243,7 +247,6 @@ def __init__(
ref: the path to the reference FASTA, which must be indexed with bwa.
max_hits: the maximum number of hits to report - if more than this number of seed hits
are found, report only the count and not each hit.
executable: string or Path representation of the `bwa-aln-interactive` executable path
max_mismatches: the maximum number of mismatches allowed in the full query sequence
max_mismatches_in_seed: the maximum number of mismatches allowed in the seed region
max_gap_opens: the maximum number of gap opens allowed in the full query sequence
Expand All @@ -257,12 +260,6 @@ def __init__(
otherwise do not include them.
threads: the number of threads to use. If `None`, use all available CPUs.
"""
threads = os.cpu_count() if threads is None else threads
executable_path = ExecutableRunner.validate_executable_path(executable=executable)
self.reverse_complement: bool = reverse_complement
self.include_alt_hits: bool = include_alt_hits
self.max_hits: int = max_hits

missing_aux_paths = []
for aux_ext in BWA_AUX_EXTENSIONS:
aux_path = Path(f"{ref}{aux_ext}")
Expand All @@ -275,81 +272,29 @@ def __init__(
else:
message = "BWA index file does not exist:\n\t"
message += "\t\n".join(f"{p}" for p in missing_aux_paths)
raise FileNotFoundError(f"{message}\nIndex with: `{executable_path} index {ref}`")

# -N = non-iterative mode: search for all n-difference hits (slooow)
# -S = output SAM (run samse)
# -Z = interactive mode (no input buffer and force processing with empty lines between recs)
command: list[str] = [
f"{executable_path}",
"aln",
"-t",
f"{threads}",
"-n",
f"{max_mismatches}",
"-o",
f"{max_gap_opens}",
"-e",
f"{max_gap_extensions}",
"-i",
f"{min_indel_to_end_distance}",
"-l",
f"{seed_length}",
"-k",
f"{max_mismatches_in_seed}",
"-X",
f"{max_hits}",
"-N",
"-S",
"-Z",
"-D",
f"{ref}",
"/dev/stdin",
]

super().__init__(command=command, stderr=subprocess.PIPE)

header = []
for line in self._subprocess.stdout:
if line.startswith("@"):
header.append(line)
if line.startswith("@PG"):
break

self.header = AlignmentHeader.from_text("".join(header))

# NB: ExecutableRunner will default to redirecting stderr to /dev/null. However, we would
# like to preserve stderr messages from bwa for potential debugging. To do this, we create
# a single thread to continuously read from stderr and redirect text lines to a debug
# logger. The close() method of this class will additionally join the stderr logging thread.
self._logger = logging.getLogger(self.__class__.__qualname__)
self._stderr_thread = Thread(
daemon=True,
target=self._stream_to_sink,
args=(self._subprocess.stderr, self._logger.debug),
raise FileNotFoundError(f"{message}\nIndex with: `bwa index {ref}`")

seq_dict = ref.with_suffix(".dict")
with seq_dict.open("r") as fh:
with AlignmentFile(fh) as reader:
self.header: AlignmentHeader = reader.header

threads = _available_cores() if threads is None else threads
self.opt = BwaAlnOptions(
max_hits=max_hits,
max_mismatches=max_mismatches,
max_mismatches_in_seed=max_mismatches_in_seed,
max_gap_opens=max_gap_opens,
max_gap_extensions=max_gap_extensions,
min_indel_to_end_distance=min_indel_to_end_distance,
seed_length=seed_length,
find_all_hits=True,
with_md=True,
threads=threads,
)
self._stderr_thread.start()

def __signal_bwa(self) -> None:
"""Signals BWA to process the queries."""
self._subprocess.stdin.flush()
# NB: the executable compiled on different platforms requires a different number of newlines
# NB: it is not understood why, but 16 newlines seems to work for all platforms tested
self._subprocess.stdin.write("\n" * 16)
self._subprocess.stdin.flush()

@override
def close(self) -> bool:
"""
Gracefully terminates the underlying subprocess if it is still running.
Returns:
True: if the subprocess was terminated successfully
False: if the subprocess failed to terminate or was not already running
"""
safely_closed: bool = super().close()
self._stderr_thread.join()
return safely_closed
self.bwa = BwaAln(prefix=ref)
self.reverse_complement: bool = reverse_complement
self.include_alt_hits: bool = include_alt_hits

def map_one(self, query: str, id: str = "unknown") -> BwaResult:
"""Maps a single query to the genome and returns the result.
Expand All @@ -375,22 +320,21 @@ def map_all(self, queries: list[Query]) -> list[BwaResult]:
if len(queries) == 0:
return []

# Send the reads to BWA
for query in queries:
fastq_str = query.to_fastq(reverse_complement=self.reverse_complement)
self._subprocess.stdin.write(fastq_str)

# Force the input to be sent to the underlying process.
self.__signal_bwa()

# Read back the results
results: list[BwaResult] = []
for query in queries:
# get the next alignment and convert to a result
line: str = next(self._subprocess.stdout).strip()
assert not line.startswith("@"), f"SAM record must not start with '@'! {line}"
alignment = AlignedSegment.fromstring(line, self.header)
results.append(self._to_result(query=query, rec=alignment))
# Map the reads
fastxs = [
FastxRecord(
name=q.id,
sequence=sequence.reverse_complement(q.bases)
if self.reverse_complement
else q.bases,
)
for q in queries
]
records = self.bwa.align(queries=fastxs, opt=self.opt)
results: list[BwaResult] = [
self._to_result(query=query, rec=record)
for query, record in zip(queries, records, strict=True)
]

return results

Expand All @@ -404,7 +348,7 @@ def _to_result(self, query: Query, rec: pysam.AlignedSegment) -> BwaResult:
"""
if query.id != rec.query_name:
raise ValueError(
f"Query and Results are out of orderQuery=${query.id}, Result=${rec.query_name}"
f"Query and Results are out of order: Query={query.id}, Result={rec.query_name}"
)

num_hits: int = int(rec.get_tag("HN")) if rec.has_tag("HN") else 0
Expand All @@ -414,7 +358,7 @@ def _to_result(self, query: Query, rec: pysam.AlignedSegment) -> BwaResult:
# number of hits. In both of the latter cases, we have to rely on the count reported in the
# `HN` tag.
hits: list[BwaHit]
if 0 < num_hits <= self.max_hits:
if 0 < num_hits <= self.opt.max_hits:
hits = self.to_hits(rec=rec)
num_hits = len(hits)
else:
Expand Down
Loading

0 comments on commit e2dc14a

Please sign in to comment.