Skip to content

Commit

Permalink
Merge pull request #127 from rajewsky-lab/fix-newmaster
Browse files Browse the repository at this point in the history
After long testing and many bugfixes we now have a new 0.8 candidate with this branch. In agreement with Dani and Nikos, I merge this PR into master.
  • Loading branch information
marvin-jens authored Dec 2, 2024
2 parents 9ce7d89 + 76c8de6 commit a300f5d
Show file tree
Hide file tree
Showing 42 changed files with 6,239 additions and 2,340 deletions.
4 changes: 4 additions & 0 deletions .coveragerc
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
[run]
concurrency = multiprocessing
parallel = true
sigterm = true
2 changes: 1 addition & 1 deletion docs/conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
copyright = '2021-2024, Rajewsky Lab'
author = 'Tamas Ryszard Sztanka-Toth, Marvin Jens, Nikos Karaiskos, Nikolaus Rajewsky'

version = '0.7.9'
version = '0.8.0'
release = version

# -- General configuration
Expand Down
11 changes: 10 additions & 1 deletion environment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ channels:
- bih-cubi
- conda-forge
- bioconda
- nodefaults
dependencies:
- python>=3.6,<3.12
- snakemake>=5.32.0,<6.4.0
Expand All @@ -26,7 +27,12 @@ dependencies:
- openjdk==11.0.15
- pigz
- pip:
- pandas>=1.3.0,<=1.5.1
- setproctitle
- isal
- pytest
- pytest-cov
- mrfifo>=0.3.0
- pandas>2
- scanpy>=1.8.1
- leidenalg>=0.8.1
- numpy>=1.18.1
Expand All @@ -38,3 +44,6 @@ dependencies:
- squidpy>=1.0.0
- novosparc
- opencv-python
- jinja2>=3.1.3
- matplotlib==3.8.4
# - pytest-optional-tests
16 changes: 10 additions & 6 deletions setup.cfg
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[metadata]
name = spacemake
version = 0.7.9
version = attr: spacemake.contrib.__version__
author = Tamas Ryszard Sztanka-Toth, Marvin Jens, Nikos Karaiskos, Nikolaus Rajewsky
author_email = [email protected]
description = A bioinformatic pipeline for the analysis of spatial transcriptomic data
Expand All @@ -17,7 +17,7 @@ license = GPL

[options]
zip_safe = False
python_requires = >=3.6
python_requires = >=3.8
include_package_data = True
package_dir =
spacemake = spacemake
Expand All @@ -30,15 +30,19 @@ spacemake =
snakemake/scripts/*.Rmd
snakemake/scripts/*.py
data/*.csv
data/*.fa
config/*.yaml
longread/*.py

# [options.data_files]
# snakemake = spacemake/snakemake/dropseq.smk

[options.entry_points]
console_scripts =
alnstats = spacemake.alnstats:cmdline
preprocess = spacemake.preprocess:cmdline
spacemake = spacemake.smk:cmdline
spacemake = spacemake.cmdline:cmdline
pb_annotate = spacemake.longread.cmdline:cmdline

[tool:pytest]
testpaths = tests
markers =
big_download: needs to download large-ish files
addopts = --cov=spacemake --cov-report html
10 changes: 5 additions & 5 deletions spacemake/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
__version__ = 1.0
import matplotlib._path
from . import preprocess as pp
from . import spatial as sp
# __version__ = 1.0
# import matplotlib._path
# from . import preprocess as pp
# from . import spatial as sp

from .smk import Spacemake
# from .smk import Spacemake
183 changes: 183 additions & 0 deletions spacemake/bin/BamTagHistogram.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,183 @@
import mrfifo as mf
import logging


def parse_args():
from spacemake.util import make_minimal_parser

parser = make_minimal_parser("BamTagHistogram")

parser.add_argument("--parallel", type=int, default=8)
parser.add_argument("--input", default="/dev/stdin")
parser.add_argument("--output", default="/dev/stdout")
parser.add_argument(
"--prefix-size",
default=4,
type=int,
help=(
"how many letters of the tag value are used to split the stream. "
"default=4 allows for up to (alphabet_size)^4 distinct parallel workers. "
"will be spread across workers by mod <args.parallel>"
),
)
parser.add_argument("--prefix-alphabet", default="ACGTN")
parser.add_argument("--min-count", default=10, type=int)
parser.add_argument(
"--sort-mem",
default=8,
type=int,
help="how many GB are allowed to be used for sorting (default=8)",
)
parser.add_argument(
"--tag", default="CB", help="which BAM tag to count (default='CB')"
)

return parser.parse_args()


def CB_distributor(
input, outputs, tag="CB", prefix_size=3, prefix_alphabet="ACGTN", n=8, **kw
):
"ensure that the FIFOs are not managed"
assert type(input) is str
logger = logging.getLogger("mrfifo.parts.CB_distributor")
logger.debug(
f"reading from {input}, writing to {outputs} "
f"tag={tag} prefix_size={prefix_size} prefix_alphabet={prefix_alphabet} "
f"kw={kw}"
)

lkup = {}
from itertools import product

i = 0
for letters in product(*([prefix_alphabet] * prefix_size)):
prefix = "".join(letters).encode("ascii")
lkup[prefix] = i % n
i += 1

# for k, v in sorted(lkup.items()):
# print(f"{k}\t{v}")

from mrfifo.fast_loops import distribute_by_substr

tag_lead = b"\t" + tag.encode("ascii") + b":Z:"
logger.debug(
f"scanning for tag-lead {tag_lead} and using next {prefix_size} bytes as prefix"
)
res = distribute_by_substr(
fin_name=input,
fifo_names=outputs,
sub_lookup=lkup,
sub_size=prefix_size,
sub_lead=tag_lead,
# **kw,
)
logger.debug("distribution complete")
return res


def tag_counter(input, output, tag="CB", min_count=10):
from collections import defaultdict

counter = defaultdict(int)
stats = defaultdict(int)
import re

pattern = re.compile(f"{tag}:Z:(\S+)")
for sam_line in input:
stats["n_records"] += 1
flags = int(sam_line.split("\t")[1])
if flags & 256:
# 'not primary alignment' bit is set
stats["n_secondary"] += 1
continue

if m := re.search(pattern, sam_line):
stats["n_tagged"] += 1
tag_val = m.groups(0)[0]
counter[tag_val] += 1

stats["n_values"] = len(counter)
for value, count in counter.items():
if count >= min_count:
stats["n_above_cut"] += 1
output.write(f"{count}\t{value}\n")

return stats


def sort_function(input, output, n=8, sort_mem_gigs=8, header=None):
import os

if header is None:
header = rf"# INPUT={args.input} TAG={args.tag} FILTER_PCR_DUPLICATES=false READ_QUALITY=0\n"

if output.endswith(".gz"):
cmd = (
f'{{ printf "{header}"; sort -rnk 1 -S {sort_mem_gigs}G --parallel={n} {input}; }}'
f"| python -m isal.igzip -c > {output}"
)
else:
cmd = f'{{ printf "{header}"; sort -rnk 1 -S {sort_mem_gigs}G --parallel={n} {input}; }} > {output}'

import subprocess

subprocess.call(cmd, shell=True)


def main(args):
w = (
mf.Workflow("BamTagHistogram", total_pipe_buffer_MB=4)
.BAM_reader(
input=args.input,
mode="S",
threads=4,
)
.distribute(
input=mf.FIFO("input_sam", "rt"),
outputs=mf.FIFO("dist_{n}", "wt", n=args.parallel),
func=CB_distributor,
tag=args.tag,
prefix_size=args.prefix_size,
prefix_alphabet=args.prefix_alphabet,
n=args.parallel,
)
.workers(
func=tag_counter,
tag=args.tag,
input=mf.FIFO("dist_{n}", "rt"),
output=mf.FIFO("counts_{n}", "wt"),
n=args.parallel,
min_count=args.min_count,
)
.collect(
inputs=mf.FIFO("counts_{n}", "rt", n=args.parallel),
output=mf.FIFO("unsorted", "wt"),
chunk_size=1,
)
.funnel(
input=mf.FIFO("unsorted", "rt"),
output=args.output,
func=sort_function,
_manage_fifos=False,
)
.run()
)
stats = mf.util.CountDict()
for jobname, d in w.result_dict.items():
if "worker" in jobname:
stats.add_other_stats(d)

df = stats.get_stats_df()
df["input"] = args.input
print(df.set_index("input"))
return w


if __name__ == "__main__":
args = parse_args()
import spacemake.util as util

util.setup_logging(args)
main(args)
Loading

0 comments on commit a300f5d

Please sign in to comment.