Add lineage assignment #22

Open · wants to merge 20 commits into base: main
60 changes: 37 additions & 23 deletions .github/workflows/pytest.yml
@@ -22,10 +22,18 @@ jobs:
with:
path: main

- name: get lineage databases
uses: actions/checkout@v3
with:
repository: bacpop/beebop_data
path: ./main/storage

- name: Download and extract GPS database
working-directory: ./main
run: |
./scripts/download_db --small storage



- name: Set up Python 3.9
uses: actions/setup-python@v2
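
As an aside, the `download_db --small storage` step above amounts to fetching a database archive and unpacking it into `storage`. Below is a minimal Python sketch of that pattern; the URL, archive name, and helper name are illustrative assumptions, not taken from the actual script:

```python
import tarfile
import urllib.request
from pathlib import Path

# Hypothetical location of the small GPS database; the real URL lives in
# scripts/download_db.
DB_URL = "https://example.org/GPS_v4_references_small.tar.gz"


def download_db(storage: str) -> Path:
    """Fetch a PopPUNK database archive and extract it into `storage`."""
    storage_dir = Path(storage)
    storage_dir.mkdir(parents=True, exist_ok=True)
    archive = storage_dir / "db.tar.gz"
    urllib.request.urlretrieve(DB_URL, archive)  # download the tarball
    with tarfile.open(archive) as tar:
        tar.extractall(storage_dir)  # unpack alongside the lineage databases
    archive.unlink()  # keep the extracted database, drop the archive
    return storage_dir
```
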
@@ -44,31 +52,37 @@ jobs:
source $CONDA/etc/profile.d/conda.sh
conda activate beebop_py

# - name: Install poppunk
# run: conda install poppunk
- name: Install poppunk
run: conda install poppunk
# currently the latest poppunk release is missing some functions required for beebop. For now installing from source:

- name: Get poppunk source code
uses: actions/checkout@v3
with:
repository: bacpop/PopPUNK
ref: fix-json-serialisation
path: poppunk

- name: install poppunk & dependencies
working-directory: ./poppunk
# - name: Get poppunk source code
# uses: actions/checkout@v3
# with:
# repository: bacpop/PopPUNK
# path: poppunk

# - name: install poppunk & dependencies
# working-directory: ./poppunk
# run: |
# source $CONDA/etc/profile.d/conda.sh
# conda activate beebop_py
# conda install graph-tool
# conda install mandrake
# conda install rapidnj
# sudo apt-get install libeigen3-dev
# sudo apt-get install libopenblas-dev
# sudo apt-get install -y '^libxcb.*-dev' libx11-xcb-dev libglu1-mesa-dev libxrender-dev libxi-dev libxkbcommon-dev libxkbcommon-x11-dev
# pip install joblib==1.1.0
# conda install -c bioconda pp-sketchlib=2.0.0
# pip3 install git+https://github.com/bacpop/PopPUNK

- name: install dependencies
run: |
source $CONDA/etc/profile.d/conda.sh
conda activate beebop_py
conda install graph-tool
conda install mandrake
conda install rapidnj
sudo apt-get install libeigen3-dev
sudo apt-get install libopenblas-dev
sudo apt-get install libeigen3-dev libopenblas-dev
sudo apt-get install -y '^libxcb.*-dev' libx11-xcb-dev libglu1-mesa-dev libxrender-dev libxi-dev libxkbcommon-dev libxkbcommon-x11-dev
pip install joblib==1.1.0
conda install -c bioconda pp-sketchlib=2.0.0
pip3 install git+https://github.com/bacpop/PopPUNK

- name: Install Poetry
uses: snok/install-poetry@v1
@@ -117,7 +131,7 @@ jobs:
# Uncomment the next three lines to debug on failure with
# tmate. However, don't leave them uncommented on merge as that
# causes failing builds to hang forever.
#
# - name: tmate session
# if: ${{ failure() }}
# uses: mxschmitt/action-tmate@v3

- name: tmate session
if: ${{ failure() }}
uses: mxschmitt/action-tmate@v3
3 changes: 2 additions & 1 deletion .gitignore
@@ -9,4 +9,5 @@ ignored
tests/results
storage/poppunk_output
node_modules
storage/GPS_v4*
storage/GPS_v4*
storage/strain_*_lineage_db
126 changes: 98 additions & 28 deletions beebop/app.py
@@ -11,8 +11,9 @@
import json
import requests
import pickle
import csv

from beebop import versions, assignClusters, visualise
from beebop import versions, assignClusters, assignLineages, visualise
from beebop.filestore import PoppunkFileStore, DatabaseFileStore
from beebop.utils import get_args
import beebop.schemas
@@ -65,11 +66,15 @@ def check_connection(redis) -> None:
abort(500, description="Redis not found")


def generate_zip(path_folder: str, type: str, cluster: str) -> BytesIO:
def generate_zip(fs: PoppunkFileStore,
p_hash: str,
type: str,
cluster: str) -> BytesIO:
"""
[This generates a .zip folder with results data.]

:param path_folder: [path to folder to be zipped]
:param fs: [PoppunkFileStore with path to folder to be zipped]
:param p_hash: [project hash]
:param type: [can be either 'microreact' or 'network']
:param cluster: [only relevant for 'network', since there are multiple
component files stored in the folder, but only the right one should
@@ -79,9 +84,14 @@ def generate_zip(path_folder: str, type: str, cluster: str) -> BytesIO:
"""
memory_file = BytesIO()
if type == 'microreact':
path_folder = fs.output_microreact(p_hash, cluster)
add_files(memory_file, path_folder)
elif type == 'network':
file_list = (f'network_component_{cluster}.graphml',
path_folder = fs.output_network(p_hash)
with open(fs.network_mapping(p_hash), 'rb') as mapping_file:
cluster_component_mapping = pickle.load(mapping_file)
component = cluster_component_mapping[str(cluster)]
file_list = (f'network_component_{component}.graphml',
'network_cytoscape.csv',
'network_cytoscape.graphml')
add_files(memory_file, path_folder, file_list)
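
Worth noting on the network branch above: PopPUNK numbers graph components independently of cluster ids, so `generate_zip` now resolves the cluster through a pickled mapping written at visualisation time. A small self-contained sketch of that lookup, assuming the mapping is a plain dict keyed by cluster id as a string:

```python
import pickle

# Writing side (assumed to happen in the network visualisation job):
cluster_component_mapping = {"5": 2, "12": 0}  # cluster id -> graph component
with open("network_mapping.pkl", "wb") as f:
    pickle.dump(cluster_component_mapping, f)

# Reading side, mirroring the generate_zip() branch above:
with open("network_mapping.pkl", "rb") as f:
    mapping = pickle.load(f)
component = mapping[str(5)]  # -> 2
print(f"network_component_{component}.graphml")
```
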
@@ -199,15 +209,32 @@ def run_poppunk_internal(sketches: dict,
# check connection to redis
check_connection(redis)
# submit list of hashes to redis worker
job_assign = q.enqueue(assignClusters.get_clusters,
hashes_list,
p_hash,
fs,
db_paths,
args,
job_timeout=job_timeout)
job_assign_clusters = q.enqueue(assignClusters.get_clusters,
hashes_list,
p_hash,
fs,
db_paths,
args,
job_timeout=job_timeout)
# save p-hash with job.id in redis server
redis.hset("beebop:hash:job:assign", p_hash, job_assign.id)
redis.hset("beebop:hash:job:assignClusters",
p_hash,
job_assign_clusters.id)

# submit list of hashes to redis worker
job_assign_lineages = q.enqueue(assignLineages.get_lineages,
hashes_list,
p_hash,
fs,
db_paths,
args,
depends_on=job_assign_clusters,
job_timeout=job_timeout)
# save p-hash with job.id in redis server
redis.hset("beebop:hash:job:assignLineages",
p_hash,
job_assign_lineages.id)

# create visualisations
# microreact
job_microreact = q.enqueue(visualise.microreact,
@@ -216,7 +243,8 @@
db_paths,
args,
name_mapping),
depends_on=job_assign, job_timeout=job_timeout)
depends_on=job_assign_clusters,
job_timeout=job_timeout)
redis.hset("beebop:hash:job:microreact", p_hash,
job_microreact.id)
# network
@@ -226,9 +254,11 @@
db_paths,
args,
name_mapping),
depends_on=job_assign, job_timeout=job_timeout)
depends_on=job_assign_clusters,
job_timeout=job_timeout)
redis.hset("beebop:hash:job:network", p_hash, job_network.id)
return jsonify(response_success({"assign": job_assign.id,
return jsonify(response_success({"assign_clusters": job_assign_clusters.id,
"assign_lineages": job_assign_lineages.id,
"microreact": job_microreact.id,
"network": job_network.id}))

@@ -262,16 +292,25 @@ def get_status_job(job, p_hash, redis):
id = redis.hget(f"beebop:hash:job:{job}", p_hash).decode("utf-8")
return Job.fetch(id, connection=redis).get_status()
try:
status_assign = get_status_job('assign', p_hash, redis)
if status_assign == "finished":
status_assign_clusters = get_status_job('assignClusters',
p_hash,
redis)
if status_assign_clusters == "finished":
status_assign_lineages = get_status_job('assignLineages',
p_hash,
redis)
status_microreact = get_status_job('microreact', p_hash, redis)
status_network = get_status_job('network', p_hash, redis)
else:
status_assign_lineages = "waiting"
status_microreact = "waiting"
status_network = "waiting"
return jsonify(response_success({"assign": status_assign,
"microreact": status_microreact,
"network": status_network}))
return jsonify(response_success({
"assignClusters": status_assign_clusters,
"assignLineages": status_assign_lineages,
"microreact": status_microreact,
"network": status_network
}))
except AttributeError:
return jsonify(error=response_failure({
"error": "Unknown project hash"})), 500
@@ -290,7 +329,8 @@ def get_results(result_type) -> json:
must be provided by the user in the frontend]

:param result_type: [can be
- 'assign' for clusters
- 'assignClusters' for clusters
- 'assignLineages' for lineages
- 'zip' for visualisation results as zip folders (with the json
property 'type' specifying whether 'microreact' or 'network'
results are required)
@@ -299,9 +339,12 @@
cluster]
:return json: [response object with result stored in 'data']
"""
if result_type == 'assign':
if result_type == 'assignClusters':
p_hash = request.json['projectHash']
return get_clusters_internal(p_hash, redis)
elif result_type == 'assignLineages':
p_hash = request.json['projectHash']
return get_lineages_internal(p_hash, storage_location)
elif result_type == 'zip':
p_hash = request.json['projectHash']
visualisation_type = request.json['type']
@@ -338,7 +381,8 @@ def get_clusters_internal(p_hash: str, redis: Redis) -> json:
"""
check_connection(redis)
try:
id = redis.hget("beebop:hash:job:assign", p_hash).decode("utf-8")
id = redis.hget("beebop:hash:job:assignClusters",
p_hash).decode("utf-8")
job = Job.fetch(id, connection=redis)
if job.result is None:
return jsonify(error=response_failure({
Expand All @@ -350,6 +394,36 @@ def get_clusters_internal(p_hash: str, redis: Redis) -> json:
"error": "Unknown project hash"})), 500


def get_lineages_internal(p_hash: str, storage_location: str) -> json:
"""
[returns lineage assignment results]

:param p_hash: [project hash]
:param storage_location: [storage location]
:return json: [response object with lineage results stored in 'data']
"""
fs = PoppunkFileStore(storage_location)
path_lineages = fs.lineages_csv(p_hash)
lineages = {}

try:
with open(path_lineages) as csv_file:
csv_reader = csv.reader(csv_file, delimiter=',')
for row in csv_reader:
lineages[row[0]] = {
"rank1": row[2],
"rank2": row[3],
"rank3": row[4]}
result = jsonify(response_success(lineages))

except FileNotFoundError:
result = jsonify(error=response_failure({
"error": "File not found",
"detail": "Lineage results not found"
})), 500
return result
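
A note on the CSV handling above: columns are read positionally, with row[0] as the sample hash and row[2] to row[4] as the three rank assignments. A toy example with an assumed column layout (the real order comes from PopPUNK's lineage output, and a header row, if present, would need skipping):

```python
import csv
import io

# Assumed layout: hash, overall lineage, rank 1, rank 2, rank 3
sample_csv = io.StringIO(
    "hash1,1,3,5,9\n"
    "hash2,2,4,4,11\n"
)

lineages = {}
for row in csv.reader(sample_csv, delimiter=','):
    lineages[row[0]] = {"rank1": row[2], "rank2": row[3], "rank3": row[4]}

print(lineages)
# {'hash1': {'rank1': '3', 'rank2': '5', 'rank3': '9'},
#  'hash2': {'rank1': '4', 'rank2': '4', 'rank3': '11'}}
```
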


def send_zip_internal(p_hash: str,
type: str,
cluster: str,
@@ -364,12 +438,8 @@ def send_zip_internal(p_hash: str,
:return any: [zipfile]
"""
fs = PoppunkFileStore(storage_location)
if type == 'microreact':
path_folder = fs.output_microreact(p_hash, cluster)
elif type == 'network':
path_folder = fs.output_network(p_hash)
# generate zipfile
memory_file = generate_zip(path_folder, type, cluster)
memory_file = generate_zip(fs, p_hash, type, cluster)
return send_file(memory_file,
download_name=type + '.zip',
as_attachment=True)
41 changes: 15 additions & 26 deletions beebop/assignClusters.py
@@ -1,25 +1,10 @@
from PopPUNK.web import summarise_clusters, sketch_to_hdf5
from PopPUNK.utils import setupDBFuncs
import re
import os

from beebop.poppunkWrapper import PoppunkWrapper
from beebop.filestore import PoppunkFileStore, DatabaseFileStore


def hex_to_decimal(sketches_dict) -> None:
"""
[Converts all hexadecimal numbers in the sketches into decimal numbers.
These have been stored in hexadecimal format so as not to lose precision when
sending the sketches from the backend to the frontend]

:param sketches_dict: [dictionary holding all sketches]
"""
for sample in list(sketches_dict.values()):
if type(sample['14'][0]) == str and re.match('0x.*', sample['14'][0]):
for x in range(14, 30, 3):
sample[str(x)] = list(map(lambda x: int(x, 16),
sample[str(x)]))
from beebop.utils import hex_to_decimal
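
The deleted helper has moved to beebop.utils rather than being dropped; it is now imported above. Its core operation is plain base-16 parsing over the sketch bins, illustrated here with made-up values:

```python
# Sketch bins arrive as hex strings (sent as JSON to avoid losing precision)
# and are converted in place to integers.
sample = {'14': ['0x1a', '0xff'], '17': ['0x10', '0x2']}

for key in ('14', '17'):
    sample[key] = [int(value, 16) for value in sample[key]]

print(sample)  # {'14': [26, 255], '17': [16, 2]}
```
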


def get_clusters(hashes_list: list,
@@ -63,15 +48,19 @@ def get_clusters(hashes_list: list,

# run query assignment
wrapper = PoppunkWrapper(fs, db_paths, args, p_hash)
wrapper.assign_clusters(dbFuncs, qc_dict, qNames)
isolateClustering = wrapper.assign_clusters(dbFuncs, qc_dict, qNames)

# Process clustering
query_strains = {}
clustering_type = 'combined'
for isolate in isolateClustering[clustering_type]:
if isolate in qNames:
strain = isolateClustering[clustering_type][isolate]
if strain in query_strains:
query_strains[strain].append(isolate)
else:
query_strains[strain] = [isolate]

queries_names, queries_clusters, _, _, _, _, _ = \
summarise_clusters(outdir, args.assign.species, db_paths.db, qNames)
summarise_clusters(outdir, args.assign.species, db_paths.db, qNames)

result = {}
for i, (name, cluster) in enumerate(zip(queries_names, queries_clusters)):
result[i] = {
"hash": name,
"cluster": cluster
}
return result
return query_strains
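
The new return value inverts PopPUNK's isolate-to-strain mapping into strain-to-isolates, keeping only query samples. The grouping loop above is equivalent to the standard defaultdict idiom, shown here with toy data:

```python
from collections import defaultdict

isolate_clustering = {"combined": {"s1": 4, "s2": 4, "s3": 7, "ref1": 4}}
q_names = ["s1", "s2", "s3"]  # query isolates; reference samples are skipped

query_strains = defaultdict(list)
for isolate, strain in isolate_clustering["combined"].items():
    if isolate in q_names:
        query_strains[strain].append(isolate)

print(dict(query_strains))  # {4: ['s1', 's2'], 7: ['s3']}
```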