-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathexample_03_cluster_example.py
69 lines (55 loc) · 2.22 KB
/
example_03_cluster_example.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
import os
from pipebio.models.job_type import JobType
from pipebio.pipebio_client import PipebioClient
def example_03_cluster_example(sequence_document_id: int, target_folder_id: int = None):
    """
    Cluster Example

    Look up a document inside the project named by the ``PROJECT_NAME``
    environment variable, submit a cluster job for it and wait for the job
    to finish.

    Args:
        sequence_document_id: Id of the document to cluster. It is compared
            as a string against the ids of the entities in the project.
        target_folder_id: Passed through unchanged as the job's
            ``targetFolderId`` parameter. Defaults to ``None``.

    Returns:
        Whatever ``client.jobs.poll_job`` returns for the created job.

    Raises:
        Exception: If ``sequence_document_id`` is ``None`` or the
            ``PROJECT_NAME`` environment variable is unset or empty.
        SystemExit: With status 1 if the project or the document cannot be
            found (an error message is printed first).
    """
    # Use .get() rather than indexing: indexing raises KeyError for an unset
    # variable, which would make the friendlier check below unreachable.
    project_name = os.environ.get('PROJECT_NAME')
    if sequence_document_id is None or not project_name:
        raise Exception("Error! Set sequence_document_id and project_name to continue.")

    client = PipebioClient()

    # Display api key user details.
    user = client.user
    print('\nUsing api key for {} {}. \n'.format(user['firstName'], user['lastName']))

    # Get a list of all available projects for the user's organization.
    projects = client.shareables.list()

    # Find the project whose name matches PROJECT_NAME.
    example_project = next(
        (project for project in projects if project['name'] == project_name),
        None,
    )
    if example_project is None:
        print(f'Error: Example project named {project_name} not found')
        # Exit with a failure status; the `quit()` builtin is only installed
        # by the `site` module and reports success (status 0).
        raise SystemExit(1)

    # Find the requested document among the project's entities. Both sides are
    # normalised to str so the lookup works whether ids arrive as int or str.
    wanted_id = str(sequence_document_id)
    entities = client.shareables.list_entities(example_project['id'])
    annotated_doc = next(
        (entity for entity in entities if str(entity['id']) == wanted_id),
        None,
    )
    if annotated_doc is None:
        print('Error: annotated_doc not found')
        raise SystemExit(1)

    # Run a cluster job on that document, owned by the user's first listed
    # organization.
    organization_id = user['orgs'][0]['id']
    job_id = client.jobs.create(
        owner_id=organization_id,
        shareable_id=annotated_doc['ownerId'],
        input_entity_ids=[annotated_doc['id']],
        job_type=JobType.ClusterJob,
        name='Cluster job from python client',
        # NOTE(review): parameter meanings are defined by the PipeBio API and
        # are not visible here; values are kept exactly as in the example.
        params={
            "filter": [],
            "regions": [
                "CDR-H3"
            ],
            "identity": "0.90",
            "wordSize": 5,
            "algorithm": "CDHit",
            "selection": [],
            "translate": True,
            "aggregates": [],
            "targetFolderId": target_folder_id,
            "translationTable": "Standard"
        }
    )

    # Wait for the job to be completed.
    return client.jobs.poll_job(job_id)
if __name__ == '__main__':
    # Script entry point: cluster the example document and place the results
    # in the example target folder.
    example_03_cluster_example(
        sequence_document_id=296716,
        target_folder_id=296712,
    )