diff --git a/docs/overview.md b/docs/overview.md index 42759d2..1977e58 100644 --- a/docs/overview.md +++ b/docs/overview.md @@ -47,39 +47,7 @@ using the [`OffTargetDetector`][prymer.offtarget.offtarget_detector.OffTargetDet The remaining primers may then be used with the [`build_primer_pairs()`][prymer.api.picking.build_primer_pairs] method to build primer pairs -from all combinations of the left and right primers. -The produced primer pairs are scored in a manner similar to Primer3 using the [`score()`][prymer.api.picking.score] method. -The [`FilterParams`][prymer.api.picking.FilteringParams] class is used to provide parameters for scoring. - -Next, the [`pick_top_primer_pairs()`][prymer.api.picking.pick_top_primer_pairs] method is used to select up to -a maximum number of primer pairs. The primer pairs are selected in the order of lowest penalty (highest score). As -part of this process, each primer pair must: - -1. Have an amplicon in the desired size range (see [`is_acceptable_primer_pair`][prymer.api.picking.is_acceptable_primer_pair]). -2. Have an amplicon melting temperature in the desired range (see [`is_acceptable_primer_pair`][prymer.api.picking.is_acceptable_primer_pair]). -3. Not have too many off-targets (see [`OffTargetDetector.check_one()`][prymer.offtarget.offtarget_detector.OffTargetDetector.check_one]). -4. Not have primer pairs that overlap too much (see [`check_primer_overlap()`][prymer.api.picking.check_primer_overlap]). -5. Not form a dimer with a melting temperature above a specified threshold (see the [`max_dimer_tm` attribute in `FilterParams`][prymer.api.picking.FilteringParams]). - -Checking for dimers may be performed using the [`NtThermoAlign`][prymer.ntthal.NtThermoAlign] command line executable, -and can be passed to [`pick_top_primer_pairs()`][prymer.api.picking.pick_top_primer_pairs] as follows: - -```python -from prymer.ntthal import NtThermoAlign -from prymer.api.picking import FilteringParams, pick_top_primer_pairs -params = FilteringParams(...) -dimer_checker = NtThermoAlign() -pick_top_primer_pairs( - is_dimer_tm_ok=lambda s1, s2: ( - dimer_checker.duplex_tm(s1=s1, s2=s2) <= params.max_dimer_tm - ), - ... -) - -``` - -For convenience, the [`build_and_pick_primer_pairs()`][prymer.api.picking.build_and_pick_primer_pairs] method combines -both the [`build_primer_pairs()`][prymer.api.picking.build_primer_pairs] and -[`pick_top_primer_pairs()`][prymer.api.picking.pick_top_primer_pairs] methods in single invocation. - -The resulting primer pairs may be further examined. \ No newline at end of file +from all combinations of the left and right primers that are within acceptable amplicon size and tm ranges. +Optionally, if a `max_heterodimer_tm` is provided, primer pairs will be screened to ensure that the left and right +primers do not dimerize with a Tm greater than the one provided. +The produced primer pairs are scored in a manner similar to Primer3 using the [`score()`][prymer.api.picking.score] method. \ No newline at end of file diff --git a/prymer/api/__init__.py b/prymer/api/__init__.py index 9cc352d..4d937a2 100644 --- a/prymer/api/__init__.py +++ b/prymer/api/__init__.py @@ -3,10 +3,7 @@ from prymer.api.minoptmax import MinOptMax from prymer.api.oligo import Oligo from prymer.api.oligo_like import OligoLike -from prymer.api.picking import FilteringParams -from prymer.api.picking import build_and_pick_primer_pairs from prymer.api.picking import build_primer_pairs -from prymer.api.picking import pick_top_primer_pairs from prymer.api.primer_pair import PrimerPair from prymer.api.span import BedLikeCoords from prymer.api.span import Span @@ -23,10 +20,7 @@ "ClusteredIntervals", "cluster_intervals", "MinOptMax", - "FilteringParams", "build_primer_pairs", - "pick_top_primer_pairs", - "build_and_pick_primer_pairs", "OligoLike", "Oligo", "PrimerPair", diff --git a/prymer/api/picking.py b/prymer/api/picking.py index aa20ce9..e407f8b 100644 --- a/prymer/api/picking.py +++ b/prymer/api/picking.py @@ -1,51 +1,26 @@ """ # Methods and class for building and scoring primer pairs from a list of left and right primers. -Typically, the first step is to build primer pairs from a list of left and right primers for a +Typically, primer pairs are built from a list of left and right primers for a given target with the [`build_primer_pairs()`][prymer.api.picking.build_primer_pairs] method. The returned primer pairs are automatically scored (using the -[`score()`][prymer.api.picking.score] method), and returned sorted by the penalty -(increasing). - -Next, the list of primer pairs for a given target are filtered based on various criteria using the -[`pick_top_primer_pairs()`][prymer.api.picking.pick_top_primer_pairs] method. Criteria -included but not limited to filtering for desired size and melting temperature ranges (see -[`is_acceptable_primer_pair()`][prymer.api.picking.is_acceptable_primer_pair]), not too -many off-targets, no self-dimers, and so on. A given maximum number of passing primer pairs is -returned. - -These two steps can be performed jointly using the -[`build_and_pick_primer_pairs()`][prymer.api.picking.build_and_pick_primer_pairs] method. +[`score()`][prymer.api.picking.score] method). ## Module contents Contains the following public classes and methods: -- [`FilteringParams`][prymer.api.picking.FilteringParams] -- Stores the parameters used - for filtering primers and primer pairs. - [`score()`][prymer.api.picking.score] -- Scores the amplicon amplified by a primer pair in a manner similar to Primer3. -- [`check_primer_overlap()`][prymer.api.picking.check_primer_overlap] -- Checks that both - (next) primers have at least `min_difference` bases that do not overlap the other (previous) - primers. -- [`is_acceptable_primer_pair()`][prymer.api.picking.is_acceptable_primer_pair] -- Checks - if a primer pair has the desired size range and melting temperature. - [`build_primer_pairs()`][prymer.api.picking.build_primer_pairs] -- Builds primer pairs from individual left and primers. -- [`pick_top_primer_pairs()`][prymer.api.picking.pick_top_primer_pairs] -- Selects up to - the given number of primer pairs from the given list of primer pairs. -- [`build_and_pick_primer_pairs()`][prymer.api.picking.build_and_pick_primer_pairs] -- - Builds primer pairs from individual left and right primers and selects up to the given number of - primer pairs from the given list of primer pairs. """ -from dataclasses import dataclass -from typing import Callable -from typing import Iterable +from pathlib import Path +from typing import Iterator from typing import Optional -from fgpyo.collections import PeekableIterator from pysam import FastaFile from prymer.api.melting import calculate_long_seq_tm @@ -54,104 +29,17 @@ from prymer.api.primer_pair import PrimerPair from prymer.api.span import Span from prymer.ntthal import NtThermoAlign -from prymer.offtarget import OffTargetDetector - - -@dataclass(frozen=True, init=True, slots=True) -class FilteringParams: - """Stores the parameters used for filtering primers and primer pairs. - - Attributes: - amplicon_sizes: the min, optimal, and max amplicon size - amplicon_tms: the min, optimal, and max amplicon melting temperatures - product_size_lt: penalty weight for products shorter than the optimal amplicon size - product_size_gt: penalty weight for products longer than the optimal amplicon size - product_tm_lt: penalty weight for products with a Tm smaller the optimal amplicon Tm - product_tm_gt: penalty weight for products with a Tm larger the optimal amplicon Tm - max_dimer_tm: the max primer dimer melting temperature allowed when filtering primer pairs - for dimer formation. Any primer pair with a dimer melting temperature above this - threshold will be discarded. - max_primer_hits: the maximum number of hits an individual primer can have in the genome - before it is considered an invalid primer, and all primer pairs containing the primer - failed. - max_primer_pair_hits: the maximum number of amplicons a primer pair can make and be - considered passing. - three_prime_region_length: the number of bases at the 3' end of the primer in which the - parameter `max_mismatches_in_three_prime_region` is evaluated. - max_mismatches_in_three_prime_region: the maximum number of mismatches that are tolerated in - the three prime region of each primer defined by `three_prime_region_length`. - max_mismatches: the maximum number of mismatches that are tolerated in the full primer. - min_primer_to_target_distance: minimum distance allowed between the end of a primer and - start of the target -- aimed at centering the target in the amplicon. - read_length: sequencing depth and maximum distance allowed between the start of a primer and - the end of the target -- ensuring coverage during sequencing - dist_from_primer_end_weight: weight determining importance of the distance between the - primer and amplicon - target_not_covered_by_read_length_weight: weight determining importance of coverage of the - target by the sequencing depth - """ - - amplicon_sizes: MinOptMax[int] - amplicon_tms: MinOptMax[float] - product_size_lt: int - product_size_gt: int - product_tm_lt: float - product_tm_gt: float - max_dimer_tm: float = 55.0 - max_primer_hits: int = 500 - max_primer_pair_hits: int = 1 - three_prime_region_length: int = 1 - max_mismatches_in_three_prime_region: int = 2 - max_mismatches: int = 2 - min_primer_to_target_distance: int = 30 - read_length: int = 150 - dist_from_primer_end_weight: float = 100.0 - target_not_covered_by_read_length_weight: float = 100.0 - - -def _dist_penalty(start: int, end: int, params: FilteringParams) -> float: - """Returns a penalty that penalizes primers whose innermost base is closer than some - minimum distance from the target. The "distance" is zero if the innermost base overlaps the - target, one if the innermost base abuts the target, etc. - - Args: - start: the 1-based start position (inclusive) - end: the 1-based end position (inclusive) - params: the filtering parameters - """ - diff = end - start - if diff >= params.min_primer_to_target_distance: - return 0.0 - else: - return (params.min_primer_to_target_distance - diff) * params.dist_from_primer_end_weight - - -def _seq_penalty(start: int, end: int, params: FilteringParams) -> float: - """Returns a penalty that penalizes amplicons where the target cannot be fully sequenced - at the given read length starting from both ends of the amplicon. - - Args: - start: the start coordinate of the span to consider - end: the end coordinate of the span to consider - params: the filtering parameters - """ - - amplicon_length = end - start + 1 - if amplicon_length <= params.read_length: - return 0.0 - else: - return ( - amplicon_length - params.read_length - ) * params.target_not_covered_by_read_length_weight +from prymer.primer3 import PrimerAndAmpliconWeights def score( left_primer: Oligo, right_primer: Oligo, - target: Span, amplicon: Span, - amplicon_seq_or_tm: str | float, - params: FilteringParams, + amplicon_tm: float, + amplicon_sizes: MinOptMax[int], + amplicon_tms: MinOptMax[float], + weights: PrimerAndAmpliconWeights, ) -> float: """Score the amplicon in a manner similar to Primer3 @@ -162,20 +50,15 @@ def score( scaled by the product size weight. Is zero if the optimal amplicon size is zero. 3. Amplicon Tm: The difference in melting temperature between the calculated and optimal, weighted by the product melting temperature. - 4. Inner primer distance: For primers whose innermost base is closer than some minimum distance - from the target, the difference between the two distances scale by the corresponding weight. - 5. Amplicon length: The difference between the amplicon length and provided read length scaled - by the corresponding weight. Is zero when the amplicon is at most the read length. Args: left_primer: the left primer right_primer: the right primer - target: the target mapping amplicon: the amplicon mapping - amplicon_seq_or_tm: either the melting temperature of the amplicon, or the amplicon sequence - from which the melting temperature of the amplicon will be calculated with - `calculate_long_seq_tm` - params: the filtering parameters + amplicon_tm: the melting temperature of the amplicon + amplicon_sizes: minimum, optimal, and maximum amplicon sizes (lengths) + amplicon_tms: minimum, optimal, and maximum amplicon Tms + weights: the set of penalty weights Returns: the penalty for the whole amplicon. @@ -186,285 +69,109 @@ def score( # product size weight. The product size weight is different depending on if the amplicon # size is greater or less than the optimal amplicons. size_penalty: float - if params.amplicon_sizes.opt == 0: + if amplicon_sizes.opt == 0: size_penalty = 0.0 - elif amplicon.length > params.amplicon_sizes.opt: - size_penalty = (amplicon.length - params.amplicon_sizes.opt) * params.product_size_gt + elif amplicon.length > amplicon_sizes.opt: + size_penalty = (amplicon.length - amplicon_sizes.opt) * weights.product_size_gt else: - size_penalty = (params.amplicon_sizes.opt - amplicon.length) * params.product_size_lt + size_penalty = (amplicon_sizes.opt - amplicon.length) * weights.product_size_lt # The penalty for the amplicon melting temperature. # The difference in melting temperature between the calculated and optimal is weighted by the # product melting temperature. - tm: float - if isinstance(amplicon_seq_or_tm, str): - tm = calculate_long_seq_tm(amplicon_seq_or_tm) - else: - tm = float(amplicon_seq_or_tm) + tm = amplicon_tm tm_penalty: float - if tm > params.amplicon_tms.opt: - tm_penalty = (tm - params.amplicon_tms.opt) * params.product_tm_gt + if tm > amplicon_tms.opt: + tm_penalty = (tm - amplicon_tms.opt) * weights.product_tm_gt else: - tm_penalty = (params.amplicon_tms.opt - tm) * params.product_tm_lt - - # Penalize primers whose innermost base is closer than some minimum distance from the target - left_dist_penalty: float = _dist_penalty( - start=left_primer.span.end, end=target.start, params=params - ) - right_dist_penalty: float = _dist_penalty( - start=target.end, end=right_primer.span.start, params=params - ) - - # Penalize amplicons where the target cannot be fully sequenced at the given read length - # starting from both ends of the amplicon. - left_seq_penalty: float = _seq_penalty( - start=left_primer.span.start, end=target.end, params=params - ) - right_seq_penalty: float = _seq_penalty( - start=target.start, end=right_primer.span.end, params=params - ) + tm_penalty = (amplicon_tms.opt - tm) * weights.product_tm_lt # Put it all together - return ( - left_primer.penalty - + right_primer.penalty - + size_penalty - + tm_penalty - + left_dist_penalty - + right_dist_penalty - + left_seq_penalty - + right_seq_penalty - ) - - -def check_primer_overlap( - prev_left: Span, - prev_right: Span, - next_left: Span, - next_right: Span, - min_difference: int, -) -> bool: - """Check that both (next) primers have at least `min_difference` bases that do not overlap the - other (previous) primers. - - Args: - prev_left: left primer mapping from the previous primer pair. - prev_right: right primer mapping from the previous primer pair - next_left: left primer mapping from the current primer pair being compared - next_right: right primer mapping from the current primer pair being compared - min_difference: the minimum number of bases that must differ between two primers - - Returns: - True if the primers are sufficiently different - """ - left = prev_left.length_of_overlap_with(next_left) + min_difference - right = prev_right.length_of_overlap_with(next_right) + min_difference - return ( - prev_left.length >= left - and next_left.length >= left - and prev_right.length >= right - and next_right.length >= right - ) - - -def is_acceptable_primer_pair(primer_pair: PrimerPair, params: FilteringParams) -> bool: - """Determine if a primer pair can be kept considering: - - 1. the primer pair is in the desired size range - 2. The primer pair has a melting temperature in the desired range - - Args: - primer_pair: the primer pair. - params: the parameters used for filtering - - Returns: - True if the primer pair passes initial checks and can be considered later on. - """ - return ( - params.amplicon_sizes.min <= primer_pair.amplicon.length <= params.amplicon_sizes.max - and params.amplicon_tms.min <= primer_pair.amplicon_tm <= params.amplicon_tms.max - ) + return left_primer.penalty + right_primer.penalty + size_penalty + tm_penalty def build_primer_pairs( - left_primers: Iterable[Oligo], - right_primers: Iterable[Oligo], + left_primers: list[Oligo], + right_primers: list[Oligo], target: Span, - params: FilteringParams, - fasta: FastaFile, -) -> list[PrimerPair]: + amplicon_sizes: MinOptMax[int], + amplicon_tms: MinOptMax[float], + max_heterodimer_tm: Optional[float], + weights: PrimerAndAmpliconWeights, + fasta_path: Path, +) -> Iterator[PrimerPair]: """Builds primer pairs from individual left and primers. - Args: - left_primers: the left primers - right_primers: the right primers - target: the genome mapping for the target - params: the parameters used for filtering - fasta: the FASTA file from which the amplicon sequence will be retrieved. - - Returns: - the list of primer pairs, sorted by penalty (increasing) - """ - # generate all the primer pairs - primer_pairs: list[PrimerPair] = [] - for left_primer in left_primers: - for right_primer in right_primers: - if left_primer.span.refname != right_primer.span.refname: - raise ValueError( - "Cannot create a primer pair from left and right primers on different" - f" references: left: '{left_primer.span.refname}'" - f" right: '{right_primer.span.refname}'" - ) - amplicon_mapping = Span( - refname=target.refname, start=left_primer.span.start, end=right_primer.span.end - ) - amplicon_bed = amplicon_mapping.get_bedlike_coords() # since fasta.fetch is 0-based - amplicon_sequence = fasta.fetch( - reference=target.refname, start=amplicon_bed.start, end=amplicon_bed.end - ) - amplicon_penalty = score( - left_primer=left_primer, - right_primer=right_primer, - target=target, - amplicon=amplicon_mapping, - amplicon_seq_or_tm=amplicon_sequence, - params=params, - ) - pp = PrimerPair( - left_primer=left_primer, - right_primer=right_primer, - amplicon_sequence=amplicon_sequence, - amplicon_tm=calculate_long_seq_tm(amplicon_sequence), - penalty=amplicon_penalty, - ) - primer_pairs.append(pp) - - # sort by smallest penalty - return sorted(primer_pairs, key=lambda pp: pp.penalty) - - -def pick_top_primer_pairs( - primer_pairs: list[PrimerPair], - num_primers: int, - min_difference: int, - params: FilteringParams, - offtarget_detector: OffTargetDetector, - is_dimer_tm_ok: Callable[[str, str], bool], -) -> list[PrimerPair]: - """Selects up to the given number of primer pairs from the given list of primer pairs. - - The primer pairs are selected in the order of lowest penalty. - - The primer pairs must: - 1. Have an amplicon in desired size range (see `is_acceptable_primer_pair`). - 2. Have an amplicon melting temperature in the desired range (see `is_acceptable_primer_pair`). - 3. Not have too many off-targets (see `OffTargetDetector#check_one()`). - 4. Not have primer pairs that overlap too much (see `check_primer_overlap`). - 5. Not form a dimer with a melting temperature above a specified threshold (see `max_dimer_tm`). - - The _order_ of these checks are important as some of them are expensive to compute. - - Args: - primer_pairs: the list of all primer pairs. - num_primers: the number of primer pairs to return for the target - min_difference: the minimum base difference between two primers that we will tolerate. - params: the parameters used for filtering - offtarget_detector: the off-target detector. - is_dimer_tm_ok: a function to use for checking for dimers. Should return true if the pair - passes (e.g. the dimer check) - - Returns: - Up to `num_primers` primer pairs - """ - selected: list[PrimerPair] = [] - pp_iter = PeekableIterator(primer_pairs) - last_pp: Optional[PrimerPair] = None - while len(selected) < num_primers and pp_iter.can_peek(): - pp = next(pp_iter) - - # Enforce that primers were ordered by penalty (larger value first) - if last_pp is not None and pp.penalty < last_pp.penalty: - raise ValueError("Primers must be given in order by penalty (increasing value).") - last_pp = pp - - # Check some basic properties - if not is_acceptable_primer_pair(primer_pair=pp, params=params): - continue - - # Check for off-targets - if not offtarget_detector.check_one(primer_pair=pp).passes: - continue - - # Check that both primers have at least `min_difference` bases that do not overlap the - # other primer. - okay = all( - check_primer_overlap( - prev.left_primer.span, - prev.right_primer.span, - pp.left_primer.span, - pp.right_primer.span, - min_difference=min_difference, - ) - for prev in selected - ) - if not okay: - continue - - # Check dimer Tm - if not is_dimer_tm_ok(pp.left_primer.bases, pp.right_primer.bases): - continue - - selected.append(pp) - - return selected - - -def build_and_pick_primer_pairs( - left_primers: Iterable[Oligo], - right_primers: Iterable[Oligo], - target: Span, - num_primers: int, - min_difference: int, - params: FilteringParams, - offtarget_detector: OffTargetDetector, - dimer_checker: NtThermoAlign, - fasta: FastaFile, -) -> list[PrimerPair]: - """Picks up to `num_primers` primer pairs. + Only primer pairs that meet the requirements for amplicon sizes and tms will be returned. + In addition, if `max_heterodimer_tm` is provided then primer pairs with a heterodimer tm + exceeding the maximum will also be discarded. Args: left_primers: the left primers right_primers: the right primers target: the genome mapping for the target - num_primers: the number of primer pairs to return for the target. - min_difference: the minimum base difference between two primers that we will tolerate. - params: the parameters used for filtering. - offtarget_detector: the off-target detector. - dimer_checker: the primer-dimer melting temperature checker. - fasta: the FASTA file from which the amplicon sequence will be retrieved. + amplicon_sizes: minimum, optimal, and maximum amplicon sizes (lengths) + amplicon_tms: minimum, optimal, and maximum amplicon Tms + max_heterodimer_tm: if supplied, heterodimer Tms will be calculated for primer pairs, + and those exceeding the maximum Tm will be discarded + weights: the set of penalty weights + fasta_path: the path to the FASTA file from which the amplicon sequence will be retrieved. Returns: - the list of primer pairs, sorted by penalty (increasing) + an iterator over all the valid primer pairs, unsorted """ - # build the list of primer pairs - primer_pairs = build_primer_pairs( - left_primers=left_primers, - right_primers=right_primers, - target=target, - params=params, - fasta=fasta, - ) + # Short circuit if we have no left primers or no right primers + if not any(left_primers) or not any(right_primers): + return + + if any(p.span.refname != target.refname for p in left_primers): + raise ValueError("Left primers exist on different reference than target.") + + if any(p.span.refname != target.refname for p in right_primers): + raise ValueError("Right primers exist on different reference than target.") + + # Grab the sequence we'll use to fill in the amplicon sequence + with FastaFile(f"{fasta_path}") as fasta: + region_start = min(p.span.start for p in left_primers) + region_end = max(p.span.end for p in right_primers) + bases = fasta.fetch(target.refname, region_start - 1, region_end) + + with NtThermoAlign() as ntthal: + # generate all the primer pairs that don't violate hard size and Tm constraints + for lp in left_primers: + for rp in right_primers: + if rp.span.end - lp.span.start + 1 > amplicon_sizes.max: + continue + + amp_mapping = Span(refname=target.refname, start=lp.span.start, end=rp.span.end) + amp_bases = bases[ + amp_mapping.start - region_start : amp_mapping.end - region_start + 1 + ] + amp_tm = calculate_long_seq_tm(amp_bases) + + if amp_tm < amplicon_tms.min or amp_tm > amplicon_tms.max: + continue + + if max_heterodimer_tm is not None: + if ntthal.duplex_tm(lp.bases, rp.bases) > max_heterodimer_tm: + continue + + penalty = score( + left_primer=lp, + right_primer=rp, + amplicon=amp_mapping, + amplicon_tm=amp_tm, + amplicon_sizes=amplicon_sizes, + amplicon_tms=amplicon_tms, + weights=weights, + ) - # select the primer pairs - selected: list[PrimerPair] = pick_top_primer_pairs( - primer_pairs=primer_pairs, - num_primers=num_primers, - min_difference=min_difference, - params=params, - offtarget_detector=offtarget_detector, - is_dimer_tm_ok=lambda s1, s2: ( - dimer_checker.duplex_tm(s1=s1, s2=s2) <= params.max_dimer_tm - ), - ) + pp = PrimerPair( + left_primer=lp, + right_primer=rp, + amplicon_sequence=amp_bases, + amplicon_tm=amp_tm, + penalty=penalty, + ) - return selected + yield pp diff --git a/tests/api/test_picking.py b/tests/api/test_picking.py index 98f2096..164a5ed 100644 --- a/tests/api/test_picking.py +++ b/tests/api/test_picking.py @@ -1,919 +1,407 @@ -from collections import Counter -from dataclasses import dataclass -from dataclasses import replace +import dataclasses +from math import floor from pathlib import Path -from typing import Any from typing import Optional -from typing import Tuple -import pysam import pytest +from fgpyo.fasta.builder import FastaBuilder from fgpyo.sequence import reverse_complement -from prymer.api import FilteringParams from prymer.api import MinOptMax from prymer.api import Oligo from prymer.api import PrimerPair from prymer.api import Span -from prymer.api import Strand -from prymer.api import build_and_pick_primer_pairs -from prymer.api import build_primer_pairs -from prymer.api import pick_top_primer_pairs +from prymer.api import picking from prymer.api.melting import calculate_long_seq_tm -from prymer.api.picking import _dist_penalty -from prymer.api.picking import _seq_penalty -from prymer.api.picking import check_primer_overlap -from prymer.api.picking import is_acceptable_primer_pair -from prymer.api.picking import score as picking_score -from prymer.ntthal import NtThermoAlign -from prymer.offtarget import OffTargetDetector -from prymer.offtarget.bwa import BWA_EXECUTABLE_NAME +from prymer.primer3 import PrimerAndAmpliconWeights @pytest.fixture -def filter_params() -> FilteringParams: - return FilteringParams( - amplicon_sizes=MinOptMax(100, 150, 200), - amplicon_tms=MinOptMax(50.0, 55.0, 60.0), - product_size_lt=1, - product_size_gt=1, - product_tm_lt=0.0, - product_tm_gt=0.0, - min_primer_to_target_distance=0, - read_length=100000, - ) +def amplicon_sizes() -> MinOptMax[int]: + return MinOptMax(100, 150, 200) -@pytest.mark.parametrize( - "start, end, min_primer_to_target_distance, dist_from_primer_end_weight, penalty", - [ - (10, 20, 10, 2.0, 0.0), # 10bp away, at min distance - (10, 19, 10, 2.0, 2.0), # 9bp away, below min distance - (10, 21, 10, 2.0, 0.0), # 11bp away, beyond min distance - (10, 22, 32, 3.0, 60.0), # 2bp away, one over min distance, larger weight - (10, 22, 32, 5.0, 100.0), # 2bp away, one over min distance, large weight - ], -) -def test_dist_penalty( - start: int, - end: int, - min_primer_to_target_distance: int, - dist_from_primer_end_weight: float, - penalty: float, - filter_params: FilteringParams, -) -> None: - params = replace( - filter_params, - min_primer_to_target_distance=min_primer_to_target_distance, - dist_from_primer_end_weight=dist_from_primer_end_weight, - ) - assert _dist_penalty(start=start, end=end, params=params) == penalty - - -@pytest.mark.parametrize( - "start, end, read_length, target_not_covered_by_read_length_weight, penalty", - [ - (10, 10, 1, 2.0, 0.0), # 1bp amplicon_length length, at read length - (10, 10, 2, 2.0, 0.0), # 1bp amplicon_length length, one shorter than read length - (10, 10, 0, 2.0, 2.0), # 1bp amplicon_length length, one longer than read length - (10, 100, 91, 2.0, 0.0), # 91bp amplicon length, at read length - (10, 100, 90, 2.0, 2.0), # 1bp amplicon_length length, one longer than read length - (10, 100, 91, 4.0, 0.0), # 1bp amplicon_length length, one below minimum, larger weight - (10, 100, 50, 5.0, 205.0), # 1bp amplicon_length length, far below minimum, large weight - ], -) -def test_seq_penalty( - start: int, - end: int, - read_length: int, - target_not_covered_by_read_length_weight: float, - penalty: float, - filter_params: FilteringParams, -) -> None: - params = replace( - filter_params, - read_length=read_length, - target_not_covered_by_read_length_weight=target_not_covered_by_read_length_weight, - ) - assert _seq_penalty(start=start, end=end, params=params) == penalty +@pytest.fixture +def amplicon_tms() -> MinOptMax[float]: + return MinOptMax(75.0, 80.0, 90.0) -def build_primer_pair(amplicon_length: int, tm: float) -> PrimerPair: - left_primer = Oligo( - tm=0, - penalty=0, - span=Span(refname="1", start=1, end=max(1, amplicon_length // 4)), - ) - right_primer = Oligo( - tm=0, - penalty=0, - span=Span( - refname="1", start=max(1, amplicon_length - (amplicon_length // 4)), end=amplicon_length - ), - ) - return PrimerPair( - left_primer=left_primer, - right_primer=right_primer, - amplicon_tm=tm, - penalty=0, +@pytest.fixture +def weights() -> PrimerAndAmpliconWeights: + return PrimerAndAmpliconWeights( + product_size_lt=0.5, + product_size_gt=1.5, + product_tm_lt=10, + product_tm_gt=15, ) -@pytest.mark.parametrize( - "prev_left, prev_right, next_left, next_right, min_difference, expected", - [ - # no overlap -> True - ( - Span("chr1", 100, 120), - Span("chr1", 200, 220), - Span("chr1", 120, 140), - Span("chr1", 220, 240), - 10, - True, - ), - # left at min_difference -> True - ( - Span("chr1", 100, 120), - Span("chr1", 200, 220), - Span("chr1", 110, 130), - Span("chr1", 220, 240), - 10, - True, - ), - # left one less than min_difference -> False - ( - Span("chr1", 100, 120), - Span("chr1", 200, 220), - Span("chr1", 109, 129), - Span("chr1", 220, 240), - 10, - False, - ), - # right at min_difference -> True - ( - Span("chr1", 100, 120), - Span("chr1", 200, 220), - Span("chr1", 120, 140), - Span("chr1", 210, 230), - 10, - True, - ), - # right one less than min_difference -> False - ( - Span("chr1", 100, 120), - Span("chr1", 200, 220), - Span("chr1", 120, 140), - Span("chr1", 209, 229), - 10, - False, - ), - # both at min_difference -> True - ( - Span("chr1", 100, 120), - Span("chr1", 200, 220), - Span("chr1", 110, 130), - Span("chr1", 210, 230), - 10, - True, - ), - # both one less than min_difference -> False - ( - Span("chr1", 100, 120), - Span("chr1", 200, 220), - Span("chr1", 109, 129), - Span("chr1", 209, 229), - 10, - False, - ), - ], -) -def test_check_primer_overlap( - prev_left: Span, - prev_right: Span, - next_left: Span, - next_right: Span, - min_difference: int, - expected: bool, -) -> None: - assert ( - check_primer_overlap( - prev_left=prev_left, - prev_right=prev_right, - next_left=next_left, - next_right=next_right, - min_difference=min_difference, - ) - == expected - ) - assert ( - check_primer_overlap( - prev_left=next_left, - prev_right=next_right, - next_left=prev_left, - next_right=prev_right, - min_difference=min_difference, - ) - == expected - ) +@pytest.fixture +def all_zero_weights() -> PrimerAndAmpliconWeights: + d = dataclasses.asdict(PrimerAndAmpliconWeights()) + for key in d: + d[key] = 0.0 + return PrimerAndAmpliconWeights(**d) + + +# 1000 bases with 100 bases per line +REF_BASES = """ +CCTGCTGGTTTTATTTCTGCTACTTCTGAAGTGTGGGGCACACAACCTGAGCAGTGCTTTTATTTGAGTCCCAATGCTTTTATTTGAGTTTTGCAAGGTT +ATTCCAAGTTTTACAAATAGAAGGTAGCGTATGACTCAGTCCTTGATATGCCAACCACTGCACAGAGACTTGCCACCTTCCTGTCACTGGAGAAACACTC +ATGTGGGTTTTCTTAAATTTGCCTCCCTCTGAGCTTCCCTTTAACTTCAACTATAATATGCAAGAAAGACTATCTGACCATAAATACACATTTGGGCCAA +TCAAGATGGTTTTGCCAAGGAAAGATGCCCACAATGGTTAAGCAGAATGCAATAATGTAGAGAATATCATTTCTTTCATGCTGGTGTATATCATATGCAT +TCAAAAACAGGGAGAACTTCTAAGCAACTAACAGTGACCATATCAAGCAGGTGCAATCACAGAATAACTGGTTTTCTCCTTTAAGAATTTTTCTATCATT +TGGCTTTCCCCACTCACACACACTAAATATTTTAAGTAAAAAGTTACTTCCATTTTGAAAGAGAAAAGAAAGAGACATGCATGAACATTTTTCTCCACCT +TGGTGCAGGGACCAGACAACTGTATCCAGTGTGCCCACTACATTGACGGCCCCCACTGCGTCAAGACCTGCCCGGCAGGAGTCATGGGAGAAAACAACAC +CCTGGTCTGGAAGTACGCAGACGCCGGCCATGTGTGCCACCTGTGCCATCCAAACTGCACCTACGGGTGAGTGGAAAGTGAAGGAGAACAGAACATTTCC +TCTCTTGCAAATTCAGAGATCAAAAATGTCTCCCAAGTTTTCCGGCAACAAATTGCCGAGGTTTGTATTTGAGTCAGTTACTTAAGGTGTTTTGGTCCCC +ACAGCCATGCCAGTAGCAACTTGCTTGTGAGCAGGCCTCAGTGCAGTGGGAATGACTCTGCCATGCACCGTGTCCCCGGCCGGGCCTGTGTTGTGCAATG +""".strip().replace("\n", "") -@pytest.mark.parametrize( - "pair, expected", - [ - (build_primer_pair(amplicon_length=150, tm=55), True), # OK at optimal - (build_primer_pair(amplicon_length=100, tm=55), True), # OK, amplicon at lower boundary - (build_primer_pair(amplicon_length=99, tm=55), False), # NOK, amplicon < lower boundary - (build_primer_pair(amplicon_length=200, tm=55), True), # OK, amplicon at upper boundary - (build_primer_pair(amplicon_length=201, tm=55), False), # NOK, amplicon > upper boundary - (build_primer_pair(amplicon_length=150, tm=50), True), # OK, tm at lower boundary - (build_primer_pair(amplicon_length=150, tm=49), False), # NOK, tm < lower boundary - (build_primer_pair(amplicon_length=150, tm=60), True), # OK, tm at upper boundary - (build_primer_pair(amplicon_length=150, tm=61), False), # NOK, tm > upper boundary - ], -) -def test_is_acceptable_primer_pair(pair: PrimerPair, expected: bool) -> None: - params = FilteringParams( - amplicon_sizes=MinOptMax(100, 150, 200), - amplicon_tms=MinOptMax(50.0, 55.0, 60.0), - product_size_lt=1, - product_size_gt=1, - product_tm_lt=0.0, - product_tm_gt=0.0, - min_primer_to_target_distance=0, - read_length=100000, +@pytest.fixture +def fasta(tmp_path: Path) -> Path: + """Fixture that returns a Fasta""" + builder = FastaBuilder(line_length=100) + path = tmp_path / "ref.fa" + builder.add("chr1").add(REF_BASES) + builder.to_file(path) + return path + + +def p(bases: str, tm: float, pos: int, pen: float = 0, chrom: str = "chr1") -> Oligo: + """Generates a primer for testing.""" + oligo = Oligo( + name="left", + tm=tm, + penalty=pen, + bases=bases, + span=Span(chrom, pos, pos + len(bases) - 1), + tail=None, ) - assert is_acceptable_primer_pair(primer_pair=pair, params=params) == expected - - -@dataclass(init=True, frozen=True) -class ScoreInput: - left: Oligo - right: Oligo - target: Span - amplicon: Span - amplicon_sequence: str - - def __post_init__(self) -> None: - primer_pair = PrimerPair( - left_primer=self.left, - right_primer=self.right, - amplicon_tm=0, - penalty=0, - ) - object.__setattr__(self, "primer_pair", primer_pair) + assert oligo.span.length == len(oligo.bases) + return oligo -def _score_input() -> ScoreInput: - l_mapping = Span(refname="1", start=95, end=125) - r_mapping = Span(refname="1", start=195, end=225) - amplicon = Span(refname="1", start=l_mapping.end, end=r_mapping.start) - target = Span(refname="1", start=l_mapping.end + 10, end=r_mapping.start - 20) - return ScoreInput( - left=Oligo(penalty=0, tm=0, span=l_mapping), - right=Oligo(penalty=0, tm=0, span=r_mapping), - target=target, - amplicon=amplicon, - amplicon_sequence="A" * amplicon.length, +def pp(lp: Oligo, rp: Oligo, bases: Optional[str] = None, tm: Optional[float] = None) -> PrimerPair: + """Generates a primer pair for testing.""" + if lp.span.end > rp.span.start: + raise ValueError("Overlapping primers not supported.") + + if bases is None: + length = rp.span.end - lp.span.start + 1 + needed = length - lp.span.length - rp.span.length + bases = lp.bases + ("A" * needed) + reverse_complement(rp.bases) + + return PrimerPair( + name="pair", + left_primer=lp, + right_primer=rp, + amplicon_tm=tm if tm is not None else calculate_long_seq_tm(bases), + amplicon_sequence=bases, + penalty=0, ) -@pytest.fixture() -def score_input() -> ScoreInput: - return _score_input() - - -# Developer Note: for `score()`, we must have: -# 1. params.amplicon_sizes.opt == 0 -# 2. params.amplicon_tms.opt is exactly the _calculate_long_seq_tm of the amplicon sequence -# 3. filtering_params.min_primer_to_target_distance is zero, -# (target.start >= left.span.end) -# 4. filtering_params.min_primer_to_target_distance is zero, -# (right.span.start >= target.end) -# 5. filtering_params.read_length is large, (target.end >= left.span.start) -# 6. filtering_params.read_length is large, (right.span.end >= target.start) -def _zero_score_filtering_params(score_input: ScoreInput) -> FilteringParams: - amplicon_tm = calculate_long_seq_tm(score_input.amplicon_sequence) - return FilteringParams( - # for size_penalty to be zero - amplicon_sizes=MinOptMax(0, 0, 200), - product_size_gt=0, - product_size_lt=0, - # for tm_penalty to be zero - amplicon_tms=MinOptMax(amplicon_tm, amplicon_tm, amplicon_tm), - product_tm_gt=0, - product_tm_lt=0, - # for left_dist_penalty and left_dist_penalty to be zero - min_primer_to_target_distance=0, - dist_from_primer_end_weight=0, - # for left_seq_penalty and left_seq_penalty to be zero - read_length=100000, - target_not_covered_by_read_length_weight=0, +def _score( + pair: PrimerPair, + weights: PrimerAndAmpliconWeights, + sizes: MinOptMax[int], + tms: MinOptMax[float], +) -> float: + """Wrapper around picking.score that decomposes the PrimerPair for the arguments.""" + return picking.score( + left_primer=pair.left_primer, + right_primer=pair.right_primer, + amplicon=pair.span, + amplicon_tm=pair.amplicon_tm, + amplicon_sizes=sizes, + amplicon_tms=tms, + weights=weights, ) -@pytest.fixture() -def zero_score_filtering_params(score_input: ScoreInput) -> FilteringParams: - return _zero_score_filtering_params(score_input=score_input) +################################################################################ +# Tests for picking.py::score() +################################################################################ -def test_zero_score( - score_input: ScoreInput, - zero_score_filtering_params: FilteringParams, +def test_score_returns_sum_of_primer_penalties_when_all_weights_zero( + all_zero_weights: PrimerAndAmpliconWeights, + amplicon_sizes: MinOptMax[int], + amplicon_tms: MinOptMax[float], ) -> None: - assert ( - picking_score( - left_primer=score_input.left, - right_primer=score_input.right, - target=score_input.target, - amplicon=score_input.amplicon, - amplicon_seq_or_tm=score_input.amplicon_sequence, - params=zero_score_filtering_params, - ) - == 0.0 - ) + pair = pp(p("ACGACTCATG", 60.1, 100, 1.7), p("GTGCATACTAG", 59.8, 200, 3.1)) + score = _score(pair=pair, weights=all_zero_weights, sizes=amplicon_sizes, tms=amplicon_tms) + assert score == 1.7 + 3.1 -def test_zero_score_with_amplicon_tm( - score_input: ScoreInput, - zero_score_filtering_params: FilteringParams, +def test_score_returns_sum_of_primer_penalties_when_amplicon_optimal( + weights: PrimerAndAmpliconWeights, + amplicon_sizes: MinOptMax[int], + amplicon_tms: MinOptMax[float], ) -> None: - amplicon_tm: float = calculate_long_seq_tm(score_input.amplicon_sequence) - assert ( - picking_score( - left_primer=score_input.left, - right_primer=score_input.right, - target=score_input.target, - amplicon=score_input.amplicon, - amplicon_seq_or_tm=amplicon_tm, - params=zero_score_filtering_params, - ) - == 0.0 - ) + pair = pp(p("ACGACTCATG", 60.1, 101, 1.7), p("GTGCATACTAG", 59.8, 240, 3.1), tm=80) + score = _score(pair=pair, weights=weights, sizes=amplicon_sizes, tms=amplicon_tms) + assert pair.amplicon.length == amplicon_sizes.opt + assert pair.amplicon_tm == amplicon_tms.opt + assert score == 1.7 + 3.1 -# Notes on score_input. -# - amplicon length is 71bp (for size_penalty) -# - amplicon Tm is 66.30319122042972 (for tm_penalty) -# - primer to target distance is 10 for left_dist_penalty and 20 right_dist_penalty. This also -# means we need a read length of at least 81bp (=71 + 10) for the left_seq_penalty to be zero -# and at least 91bp (=71 + 20) for the right_seq_penalty to be zero -@pytest.mark.parametrize( - "kwargs, expected_score", - [ - # size_penalty: amplicon size is too large (71bp > 61bp, diff = 10, so score = (71-61)*5) - ({"product_size_gt": 5.0, "amplicon_sizes": MinOptMax(61, 61, 61)}, (71 - 61) * 5), - # size_penalty: amplicon size is too small (71bp < 81bp, diff = 10, so score = (71-61)*4) - ({"product_size_lt": 4.0, "amplicon_sizes": MinOptMax(81, 81, 81)}, (71 - 61) * 4), - # tm_penalty: amplicon Tm is too large - ( - {"product_tm_gt": 5.0, "amplicon_tms": MinOptMax(60, 60, 60)}, - 5.0 * (66.30319122042972 - 60), - ), - # tm_penalty: amplicon Tm is too small - ( - {"product_tm_lt": 4.0, "amplicon_tms": MinOptMax(70, 70, 70)}, - 4.0 * (70 - 66.30319122042972), - ), - # both [left/right]_dist_penalty are zero: diff > min_primer_to_target_distance - ({"min_primer_to_target_distance": 10, "dist_from_primer_end_weight": 5.0}, 0), - # left_dist_penalty: diff < min_primer_to_target_distance - ({"min_primer_to_target_distance": 11, "dist_from_primer_end_weight": 5.0}, (1 * 5.0)), - ({"min_primer_to_target_distance": 20, "dist_from_primer_end_weight": 5.0}, (10 * 5.0)), - # both [left/right]_dist_penalty: diff < min_primer_to_target_distance, 11bp for left, and - # 1bp for right, so 12bp overall - ({"min_primer_to_target_distance": 21, "dist_from_primer_end_weight": 5.0}, (12 * 5.0)), - # both[left/right]_seq_penalty: zero since read length >= 81bp (left) and >= 91bp (right) - ({"read_length": 91, "target_not_covered_by_read_length_weight": 10.0}, 0), - # right_seq_penalty: less than 91bp for the right but at 81bp for the left - ({"read_length": 81, "target_not_covered_by_read_length_weight": 10.0}, (10 * 10)), - # both[left/right]_seq_penalty: zero since read length <>=> 81bp (left) and < 91bp (right) - ({"read_length": 71, "target_not_covered_by_read_length_weight": 10.0}, 10.0 * (10 + 20)), - ], -) -def test_score( - score_input: ScoreInput, - zero_score_filtering_params: FilteringParams, - kwargs: dict[str, Any], - expected_score: float, +def test_score_when_amplicon_longer_than_optimal( + weights: PrimerAndAmpliconWeights, + amplicon_sizes: MinOptMax[int], + amplicon_tms: MinOptMax[float], ) -> None: - params = replace(zero_score_filtering_params, **kwargs) - assert ( - picking_score( - left_primer=score_input.left, - right_primer=score_input.right, - target=score_input.target, - amplicon=score_input.amplicon, - amplicon_seq_or_tm=score_input.amplicon_sequence, - params=params, - ) - == expected_score - ) + pair = pp(p("ACGACTCATG", 60.1, 101, 1.0), p("GTGCATACTAG", 59.8, 250, 1.0), tm=80) + score = _score(pair=pair, weights=weights, sizes=amplicon_sizes, tms=amplicon_tms) + assert pair.amplicon.length == amplicon_sizes.opt + 10 + assert pair.amplicon_tm == amplicon_tms.opt + assert score == pytest.approx(1 + 1 + (10 * weights.product_size_gt)) -@pytest.mark.parametrize( - "left_penalty, right_penalty", [(0, 0), (10.0, 0.0), (0, 12.0), (13.0, 14.0)] -) -def test_score_primer_primer_penalties( - score_input: ScoreInput, - zero_score_filtering_params: FilteringParams, - left_penalty: float, - right_penalty: float, +def test_score_when_amplicon_shorter_than_optimal( + weights: PrimerAndAmpliconWeights, + amplicon_sizes: MinOptMax[int], + amplicon_tms: MinOptMax[float], ) -> None: - left = replace(score_input.left, penalty=left_penalty) - right = replace(score_input.right, penalty=right_penalty) - assert picking_score( - left_primer=left, - right_primer=right, - target=score_input.target, - amplicon=score_input.amplicon, - amplicon_seq_or_tm=score_input.amplicon_sequence, - params=zero_score_filtering_params, - ) == (left_penalty + right_penalty) - - -def test_primer_pairs( - score_input: ScoreInput, zero_score_filtering_params: FilteringParams, genome_ref: Path + pair = pp(p("ACGACTCATG", 60.1, 101, 1.0), p("GTGCATACTAG", 59.8, 220, 1.0), tm=80) + score = _score(pair=pair, weights=weights, sizes=amplicon_sizes, tms=amplicon_tms) + assert pair.amplicon.length == amplicon_sizes.opt - 20 + assert pair.amplicon_tm == amplicon_tms.opt + assert score == pytest.approx(1 + 1 + (20 * weights.product_size_lt)) + + +def test_score_when_amplicon_tm_higher_than_optimal( + weights: PrimerAndAmpliconWeights, + amplicon_sizes: MinOptMax[int], + amplicon_tms: MinOptMax[float], ) -> None: - primer_length: int = 30 - target = Span(refname="chr1", start=100, end=250) - - # tile some left primers - left_primers = [] - right_primers = [] - for offset in range(0, 50, 5): - # left - left_end = target.start - offset - left_start = left_end - primer_length - left = Oligo( - penalty=-offset, tm=0, span=Span(refname=target.refname, start=left_start, end=left_end) - ) - left_primers.append(left) - # right - right_start = target.end + offset - right_end = right_start + primer_length - right = Oligo( - penalty=offset, - tm=0, - span=Span(refname=target.refname, start=right_start, end=right_end), - ) - right_primers.append(right) + pair = pp(p("ACGACTCATG", 60.1, 101, 1.0), p("GTGCATACTAG", 59.8, 240, 1.0), tm=82.15) + score = _score(pair=pair, weights=weights, sizes=amplicon_sizes, tms=amplicon_tms) + assert pair.amplicon.length == amplicon_sizes.opt + assert pair.amplicon_tm == amplicon_tms.opt + 2.15 + assert score == pytest.approx(1 + 1 + (2.15 * weights.product_tm_gt)) - with pysam.FastaFile(f"{genome_ref}") as fasta: - primer_pairs = build_primer_pairs( - left_primers=left_primers, - right_primers=right_primers, - target=target, - params=zero_score_filtering_params, - fasta=fasta, - ) - assert len(primer_pairs) == len(left_primers) * len(right_primers) - last_penalty = primer_pairs[0].penalty - primer_counter: Counter[Oligo] = Counter() - for pp in primer_pairs: - assert pp.left_primer in left_primers - assert pp.right_primer in right_primers - primer_counter[pp.left_primer] += 1 - primer_counter[pp.right_primer] += 1 - # by design, only the left/right penalties contribute to the primer pair penalty - assert pp.penalty == pp.left_primer.penalty + pp.right_primer.penalty - # at least check that the amplicon Tm is non-zero - assert pp.amplicon_tm > 0 - # at least check that the amplicon sequence retrieved is the correct length - assert len(pp.amplicon_sequence) == pp.amplicon.length - # check that the primer pairs are sorted by increasing penalty - assert last_penalty <= pp.penalty - last_penalty = pp.penalty - # make sure we see all the primers the same # of times! - items = primer_counter.items() - assert len(set(i[0] for i in items)) == len(left_primers) + len( - right_primers - ) # same primers - assert len(set(i[1] for i in items)) == 1 # same counts for each primer - - -def test_primer_pairs_except_different_references( - score_input: ScoreInput, zero_score_filtering_params: FilteringParams, genome_ref: Path + +def test_score_when_amplicon_tm_lower_than_optimal( + weights: PrimerAndAmpliconWeights, + amplicon_sizes: MinOptMax[int], + amplicon_tms: MinOptMax[float], ) -> None: - # all primers (both left and right) _should_ be on the same reference (refname). Add a test - # that raises an exception (in PrimerPair) that doesn't. - with pysam.FastaFile(f"{genome_ref}") as fasta: - with pytest.raises(ValueError, match="Cannot create a primer pair"): - # change the reference for the right primer - right = replace(score_input.right, span=Span(refname="Y", start=195, end=225)) - build_primer_pairs( - left_primers=[score_input.left], - right_primers=[right], - target=score_input.target, - params=zero_score_filtering_params, - fasta=fasta, - ) + pair = pp(p("ACGACTCATG", 60.1, 101, 1.0), p("GTGCATACTAG", 59.8, 240, 1.0), tm=75.67) + score = _score(pair=pair, weights=weights, sizes=amplicon_sizes, tms=amplicon_tms) + assert pair.amplicon.length == amplicon_sizes.opt + assert pair.amplicon_tm == amplicon_tms.opt - 4.33 + assert score == pytest.approx(1 + 1 + (4.33 * weights.product_tm_lt)) -def _picking_ref() -> Path: - return Path(__file__).parent / "data" / "picking.fa" - - -@pytest.fixture(scope="session") -def picking_ref() -> Path: - return _picking_ref() - - -def _target() -> Span: - target: Span = Span(refname="chr1", start=150, end=250) - return target - - -def _primer_pair( - target_offset: int = 0, - primer_length: int = 30, - amplicon_tm: Optional[float] = None, - penalty: int = 0, - target: Optional[Span] = None, - params: Optional[FilteringParams] = None, -) -> PrimerPair: - if target is None: - target = _target() - if params is None: - params = _zero_score_filtering_params(score_input=_score_input()) - fasta = pysam.FastaFile(f"{_picking_ref()}") - if amplicon_tm is None: - amplicon_tm = params.amplicon_tms.opt - left_span = Span( - refname=target.refname, - start=target.start - primer_length - target_offset + 1, # +1 for 1-based inclusive - end=target.start - target_offset, - strand=Strand.POSITIVE, - ) - assert left_span.length == primer_length - left_bases = fasta.fetch( - reference=left_span.refname, start=left_span.start - 1, end=left_span.end - ) - right_span = Span( - refname=target.refname, - start=target.end + target_offset, - end=target.end + primer_length + target_offset - 1, # -1 for 1-based inclusive - strand=Strand.NEGATIVE, - ) - assert right_span.length == primer_length - right_bases = fasta.fetch( - reference=left_span.refname, start=right_span.start - 1, end=right_span.end - ) - right_bases = reverse_complement(right_bases) - fasta.close() - return PrimerPair( - left_primer=Oligo(bases=left_bases, penalty=0, tm=0, span=left_span), - right_primer=Oligo(bases=right_bases, penalty=0, tm=0, span=right_span), - amplicon_tm=amplicon_tm, - penalty=penalty, +def test_score_realistic( + weights: PrimerAndAmpliconWeights, + amplicon_sizes: MinOptMax[int], + amplicon_tms: MinOptMax[float], +) -> None: + pair = pp(p("ACGACTCATG", 60.1, 101, 3.1), p("GTGCATACTAG", 59.8, 256, 4.08), tm=75.67) + score = _score(pair=pair, weights=weights, sizes=amplicon_sizes, tms=amplicon_tms) + assert pair.amplicon.length == amplicon_sizes.opt + 16 + assert pair.amplicon_tm == amplicon_tms.opt - 4.33 + assert score == pytest.approx( + 3.1 + 4.08 + (16 * weights.product_size_gt) + (4.33 * weights.product_tm_lt) ) -def _pick_top_primer_pairs( - params: FilteringParams, - picking_ref: Path, - primer_pairs: list[PrimerPair], - max_primer_hits: int, - max_primer_pair_hits: int, - min_difference: int = 1, -) -> list[PrimerPair]: - with ( - NtThermoAlign() as dimer_checker, - OffTargetDetector( - ref=picking_ref, - max_primer_hits=max_primer_hits, - max_primer_pair_hits=max_primer_pair_hits, - three_prime_region_length=5, - max_mismatches_in_three_prime_region=0, - max_mismatches=0, - max_amplicon_size=params.amplicon_sizes.max, - executable=BWA_EXECUTABLE_NAME, - ) as offtarget_detector, - ): - picked: list[PrimerPair] = pick_top_primer_pairs( - primer_pairs=primer_pairs, - num_primers=len(primer_pairs), - min_difference=min_difference, - params=params, - offtarget_detector=offtarget_detector, - is_dimer_tm_ok=lambda s1, s2: ( - dimer_checker.duplex_tm(s1=s1, s2=s2) <= params.max_dimer_tm - ), - ) - return picked - - -_PARAMS: FilteringParams = _zero_score_filtering_params(_score_input()) -"""Filter parameters for use in creating the test cases for `test_pick_top_primer_pairs`""" - - -@pytest.mark.parametrize( - "picked, primer_pair", - [ - # primer pair passes all primer/primer-pair-specific filters - (True, _primer_pair()), - # too small amplicon size - (False, _primer_pair(amplicon_tm=_PARAMS.amplicon_sizes.min - 1)), - # too big amplicon size - (False, _primer_pair(amplicon_tm=_PARAMS.amplicon_sizes.max + 1)), - # too low amplicon Tm - (False, _primer_pair(amplicon_tm=_PARAMS.amplicon_tms.min - 1)), - # too large amplicon Tm - (False, _primer_pair(amplicon_tm=_PARAMS.amplicon_tms.max + 1)), - ], -) -def test_pick_top_primer_pairs_individual_primers( - picking_ref: Path, - zero_score_filtering_params: FilteringParams, - picked: bool, - primer_pair: PrimerPair, +################################################################################ +# Tests for picking.py::build_primer_pairs() +################################################################################ + + +def test_build_primer_pairs_no_primers( + fasta: Path, + weights: PrimerAndAmpliconWeights, + amplicon_sizes: MinOptMax[int], + amplicon_tms: MinOptMax[float], ) -> None: - expected = [primer_pair] if picked else [] - assert expected == _pick_top_primer_pairs( - params=zero_score_filtering_params, - picking_ref=picking_ref, - primer_pairs=[primer_pair], - max_primer_hits=2, - max_primer_pair_hits=2, + pairs = list( + picking.build_primer_pairs( + left_primers=[], + right_primers=[], + target=Span("chr1", 240, 260), + amplicon_sizes=amplicon_sizes, + amplicon_tms=amplicon_tms, + max_heterodimer_tm=None, + weights=weights, + fasta_path=fasta, + ) ) + assert len(pairs) == 0 -def test_pick_top_primer_pairs_dimer_tm_too_large( - picking_ref: Path, - zero_score_filtering_params: FilteringParams, +def test_build_primer_pairs_single_pair( + fasta: Path, + weights: PrimerAndAmpliconWeights, + amplicon_sizes: MinOptMax[int], + amplicon_tms: MinOptMax[float], ) -> None: - pp = _primer_pair() - - # get the Tm for the primer pair - duplex_tm = NtThermoAlign().duplex_tm(s1=pp.left_primer.bases, s2=pp.right_primer.bases) - - # at the maximum dimer Tm - params = replace(zero_score_filtering_params, max_dimer_tm=duplex_tm) - assert [pp] == _pick_top_primer_pairs( - params=params, - picking_ref=picking_ref, - primer_pairs=[pp], - max_primer_hits=2, - max_primer_pair_hits=2, + pairs = list( + picking.build_primer_pairs( + left_primers=[p(REF_BASES[200:220], tm=60.1, pos=201, pen=1.0)], + right_primers=[p(reverse_complement(REF_BASES[280:300]), tm=61.8, pos=281, pen=1.1)], + target=Span("chr1", 240, 260), + amplicon_sizes=amplicon_sizes, + amplicon_tms=amplicon_tms, + max_heterodimer_tm=None, + weights=weights, + fasta_path=fasta, + ) ) - # **just** over the maximum dimer Tm - params = replace(zero_score_filtering_params, max_dimer_tm=duplex_tm - 0.0001) - assert [] == _pick_top_primer_pairs( - params=params, - picking_ref=picking_ref, - primer_pairs=[pp], - max_primer_hits=2, - max_primer_pair_hits=2, - ) + for pair in pairs: + assert pair.span.length == len(pair.bases) + assert pair.amplicon.start == pair.left_primer.span.start + assert pair.amplicon.end == pair.right_primer.span.end + assert pair.bases == REF_BASES[pair.amplicon.start - 1 : pair.amplicon.end] + assert len(pairs) == 1 + assert pairs[0].span == Span("chr1", 201, 300) + assert pairs[0].left_primer.bases == pairs[0].bases[0:20] + assert pairs[0].right_primer.bases == reverse_complement(pairs[0].bases[-20:]) -_PRIMER_PAIRS_PICKING: list[PrimerPair] = [ - _primer_pair(target_offset=10, penalty=2), # left.start=111 - _primer_pair(target_offset=6, penalty=2), # 4bp from the previous, left.start=115 - _primer_pair(target_offset=3, penalty=3), # 3bp from the previous, left.start=118 - _primer_pair(target_offset=1, penalty=4), # 2bp from the previous, left.start=120 - _primer_pair(target_offset=0, penalty=5), # 1bp from the previous, left.start=121 - _primer_pair(target_offset=0, penalty=6), # same as the previous, left.start=121 -] - - -@dataclass(frozen=True) -class _PickingTestCase: - min_difference: int - primer_pairs: list[Tuple[bool, PrimerPair]] - - -@pytest.mark.parametrize( - "test_case", - [ - # primer pairs that overlap each other fully, so second one is discarded - _PickingTestCase( - min_difference=1, - primer_pairs=[ - (True, _primer_pair(target_offset=1, penalty=2)), # first primer pair, kept - ( - False, - _primer_pair(target_offset=1, penalty=2), - ), # same as the previous, discarded - ], - ), - # primer pairs that are offset by 1bp (equal to min-difference) so both are kept - _PickingTestCase( - min_difference=1, - primer_pairs=[ - (True, _primer_pair(target_offset=0, penalty=2)), - (True, _primer_pair(target_offset=1, penalty=2)), - ], - ), - # primer pairs that are offset by 1bp (less than min-difference) so the first is kept - _PickingTestCase( - min_difference=2, - primer_pairs=[ - (True, _primer_pair(target_offset=0, penalty=2)), - (False, _primer_pair(target_offset=1, penalty=2)), - ], - ), - _PickingTestCase( - min_difference=3, - primer_pairs=list( - zip( - # 111, 115, 118, 120, 121, 121 - [True, True, True, False, True, False], - _PRIMER_PAIRS_PICKING, - strict=True, - ) - ), - ), - _PickingTestCase( - min_difference=4, - primer_pairs=list( - zip( - # 111, 115, 118, 120, 121, 121 - [True, True, False, True, False, False], - _PRIMER_PAIRS_PICKING, - strict=True, - ) - ), - ), - _PickingTestCase( - min_difference=5, - primer_pairs=list( - zip( - # 111, 115, 118, 120, 121, 121 - [True, False, True, False, False, False], - _PRIMER_PAIRS_PICKING, - strict=True, - ) - ), - ), - _PickingTestCase( - min_difference=6, - primer_pairs=list( - zip( - # 111, 115, 118, 120, 121, 121 - [True, False, True, False, False, False], - _PRIMER_PAIRS_PICKING, - strict=True, - ) - ), - ), - _PickingTestCase( - min_difference=7, - primer_pairs=list( - zip( - # 111, 115, 118, 120, 121, 121 - [True, False, True, False, False, False], - _PRIMER_PAIRS_PICKING, - strict=True, - ) - ), - ), - _PickingTestCase( - min_difference=8, - primer_pairs=list( - zip( - # 111, 115, 118, 120, 121, 121 - [True, False, False, True, False, False], - _PRIMER_PAIRS_PICKING, - strict=True, - ) - ), - ), - _PickingTestCase( - min_difference=9, - primer_pairs=list( - zip( - # 111, 115, 118, 120, 121, 121 - [True, False, False, True, False, False], - _PRIMER_PAIRS_PICKING, - strict=True, - ) - ), - ), - _PickingTestCase( - min_difference=10, - primer_pairs=list( - zip( - # 111, 115, 118, 120, 121, 121 - [True, False, False, False, True, False], - _PRIMER_PAIRS_PICKING, - strict=True, - ) - ), - ), - _PickingTestCase( - min_difference=11, - primer_pairs=list( - zip( - # 111, 115, 118, 120, 121, 121 - [True, False, False, False, False, False], - _PRIMER_PAIRS_PICKING, - strict=True, - ) - ), - ), - ], -) -def test_pick_top_primer_pairs_multiple_primers( - picking_ref: Path, - zero_score_filtering_params: FilteringParams, - test_case: _PickingTestCase, + +def test_build_primers_amplicon_size_filtering( + fasta: Path, + weights: PrimerAndAmpliconWeights, ) -> None: - in_primer_pairs = [pp[1] for pp in test_case.primer_pairs] - - # First check that picked _alone_ they will be kept - for primer_pair in in_primer_pairs: - assert [primer_pair] == _pick_top_primer_pairs( - params=zero_score_filtering_params, - picking_ref=picking_ref, - primer_pairs=[primer_pair], - max_primer_hits=2, - max_primer_pair_hits=2, - min_difference=test_case.min_difference, + pairs = list( + picking.build_primer_pairs( + left_primers=[p("A" * 20, tm=60, pos=s, pen=1.0) for s in range(1, 151, 10)], + right_primers=[p("A" * 20, tm=60, pos=s, pen=1.0) for s in range(151, 301, 10)], + target=Span("chr1", 150, 160), + amplicon_sizes=MinOptMax(100, 150, 200), + amplicon_tms=MinOptMax(0.0, 0.0, 100.0), + max_heterodimer_tm=None, + weights=weights, + fasta_path=fasta, ) - - # Next, check that picked _together_ only those primer pairs with "True" associated with them - # are kept - expected = [pp[1] for pp in test_case.primer_pairs if pp[0]] - assert expected == _pick_top_primer_pairs( - params=zero_score_filtering_params, - picking_ref=picking_ref, - primer_pairs=in_primer_pairs, - max_primer_hits=2, - max_primer_pair_hits=2, - min_difference=test_case.min_difference, ) + assert len(pairs) > 0 + + for pair in pairs: + assert pair.span.length == len(pair.bases) + assert pair.amplicon.start == pair.left_primer.span.start + assert pair.amplicon.end == pair.right_primer.span.end + assert pair.bases == REF_BASES[pair.amplicon.start - 1 : pair.amplicon.end] + assert pair.span.length <= 200 + -def test_pick_top_primer_pairs_input_order( - picking_ref: Path, - zero_score_filtering_params: FilteringParams, +def test_build_primers_heterodimer_filtering( + fasta: Path, + weights: PrimerAndAmpliconWeights, ) -> None: - with pytest.raises(ValueError, match="Primers must be given in order by penalty"): - _pick_top_primer_pairs( - params=zero_score_filtering_params, - picking_ref=picking_ref, - primer_pairs=[_primer_pair(penalty=6), _primer_pair(penalty=5)], - max_primer_hits=2, - max_primer_pair_hits=2, - min_difference=0, + pairs = list( + picking.build_primer_pairs( + left_primers=[ + p("CGCGCGCGCGCGCGCGCGCG", tm=60, pos=100, pen=1.0), + p("CTGCTGCTGCTGCTGCTGCT", tm=60, pos=100, pen=1.0), + ], + right_primers=[ + p("GCGCGCGCGCGCGCGCGCGC", tm=60, pos=180, pen=1.0), + p("AGCAGCAGCAGCAGCAGCAG", tm=60, pos=180, pen=1.0), + ], + target=Span("chr1", 150, 160), + amplicon_sizes=MinOptMax(10, 150, 200), + amplicon_tms=MinOptMax(0.0, 0.0, 100.0), + max_heterodimer_tm=50, + weights=weights, + fasta_path=fasta, ) + ) + + for pair in pairs: + assert pair.span.length == len(pair.bases) + assert pair.amplicon.start == pair.left_primer.span.start + assert pair.amplicon.end == pair.right_primer.span.end + assert pair.bases == REF_BASES[pair.amplicon.start - 1 : pair.amplicon.end] + assert pair.span.length <= 200 + assert len(pairs) == 2 + primer_seqs = sorted([f"{pp.left_primer.bases}-{pp.right_primer.bases}" for pp in pairs]) + assert primer_seqs == [ + "CGCGCGCGCGCGCGCGCGCG-AGCAGCAGCAGCAGCAGCAG", + "CTGCTGCTGCTGCTGCTGCT-GCGCGCGCGCGCGCGCGCGC", + ] -def test_pick_top_primer_pairs_too_many_off_targets( - picking_ref: Path, - zero_score_filtering_params: FilteringParams, + +def test_build_primer_pairs_amplicon_tm_filtering( + fasta: Path, + weights: PrimerAndAmpliconWeights, ) -> None: - """The contig is duplicated wholesale in `picking.fa`, so we should get two hits per primer. - Thus, setting `max_primer_hits=1` will return "too many hits""" - assert [] == _pick_top_primer_pairs( - params=zero_score_filtering_params, - picking_ref=picking_ref, - primer_pairs=[_primer_pair()], - max_primer_hits=1, - max_primer_pair_hits=1, - ) + amp_bases = REF_BASES[200:300] + amp_tm = calculate_long_seq_tm(amp_bases) + assert floor(amp_tm) == 84 + + for max_tm in [83, 84, 85]: + pairs = list( + picking.build_primer_pairs( + left_primers=[p(REF_BASES[200:220], tm=60.1, pos=201, pen=1.0)], + right_primers=[ + p(reverse_complement(REF_BASES[280:300]), tm=61.8, pos=281, pen=1.1) + ], + target=Span("chr1", 240, 260), + amplicon_sizes=MinOptMax(0, 0, 500), + amplicon_tms=MinOptMax(0, 80, max_tm), + max_heterodimer_tm=None, + weights=weights, + fasta_path=fasta, + ) + ) + + assert len(pairs) == (1 if max_tm > amp_tm else 0) -def test_and_pick_primer_pairs( - picking_ref: Path, - zero_score_filtering_params: FilteringParams, +def test_build_primer_pairs_fails_when_primers_on_wrong_reference( + fasta: Path, + weights: PrimerAndAmpliconWeights, ) -> None: - target = _target() - pp = _primer_pair(target=target, amplicon_tm=92.96746617835336) - params = replace( - zero_score_filtering_params, - amplicon_tms=MinOptMax(pp.amplicon_tm, pp.amplicon_tm, pp.amplicon_tm), + target = Span("chr1", 240, 260) + valid_lefts = [p(REF_BASES[200:220], tm=60, pos=201, pen=1)] + invalid_lefts = [p(REF_BASES[200:220], tm=60, chrom="X", pos=201, pen=1)] + valid_rights = [p(reverse_complement(REF_BASES[280:300]), tm=61, pos=281, pen=1)] + invalid_rights = [p(reverse_complement(REF_BASES[280:300]), tm=61, chrom="X", pos=281, pen=1)] + + picks = picking.build_primer_pairs( + left_primers=valid_lefts, + right_primers=valid_rights, + target=target, + amplicon_sizes=MinOptMax(0, 100, 500), + amplicon_tms=MinOptMax(0, 80, 150), + max_heterodimer_tm=None, + weights=weights, + fasta_path=fasta, ) - offtarget_detector = OffTargetDetector( - ref=picking_ref, - max_primer_hits=2, - max_primer_pair_hits=2, - three_prime_region_length=5, - max_mismatches_in_three_prime_region=0, - max_mismatches=0, - max_amplicon_size=params.amplicon_sizes.max, - executable=BWA_EXECUTABLE_NAME, - ) + assert next(picks) is not None - with pysam.FastaFile(f"{picking_ref}") as fasta: - designed_primer_pairs = build_and_pick_primer_pairs( - left_primers=[pp.left_primer], - right_primers=[pp.right_primer], + with pytest.raises(ValueError, match="Left primers exist on different reference"): + _picks = list(picking.build_primer_pairs( + left_primers=invalid_lefts, + right_primers=valid_rights, target=target, - num_primers=1, - min_difference=1, - params=params, - offtarget_detector=offtarget_detector, - dimer_checker=NtThermoAlign(), - fasta=fasta, - ) - assert len(designed_primer_pairs) == 1 - designed_primer_pair = designed_primer_pairs[0] - assert designed_primer_pair.left_primer == pp.left_primer - assert designed_primer_pair.right_primer == pp.right_primer - assert designed_primer_pair.amplicon == pp.amplicon - assert designed_primer_pair.penalty == pp.penalty - assert designed_primer_pair.amplicon_tm == pp.amplicon_tm - assert len(designed_primer_pair.amplicon_sequence) == pp.amplicon.length + amplicon_sizes=MinOptMax(0, 100, 500), + amplicon_tms=MinOptMax(0, 80, 150), + max_heterodimer_tm=None, + weights=weights, + fasta_path=fasta, + )) + + with pytest.raises(ValueError, match="Right primers exist on different reference"): + _picks = list(picking.build_primer_pairs( + left_primers=valid_lefts, + right_primers=invalid_rights, + target=target, + amplicon_sizes=MinOptMax(0, 100, 500), + amplicon_tms=MinOptMax(0, 80, 150), + max_heterodimer_tm=None, + weights=weights, + fasta_path=fasta, + ))