Skip to content

Commit

Permalink
Rest E2E Pipeline
Browse files Browse the repository at this point in the history
  • Loading branch information
benknoll-umn committed Feb 12, 2024
1 parent 490f486 commit 75c5d40
Show file tree
Hide file tree
Showing 15 changed files with 317 additions and 79 deletions.
12 changes: 7 additions & 5 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -37,21 +37,23 @@ jobs:
with:
distribution: 'temurin'
java-version: ${{ matrix.java-version }}
- uses: actions/setup-go@v5
with:
go-version: '1.18'
- name: Install dependencies (not release)
if: ${{ !startsWith(github.head_ref, 'release') && success() }}
run: |
python -m pip install --upgrade pip setuptools wheel
pip install flake8 pytest
pip install git+https://github.com/nlpie/mtap@main#egg=mtap
pip install ./biomedicus_client
pip install .[test,stanza] --extra-index-url https://download.pytorch.org/whl/cpu
pip install git+https://github.com/nlpie/mtap@main#egg=mtap ./biomedicus_client .[test,stanza] --extra-index-url https://download.pytorch.org/whl/cpu
go install github.com/nlpie/mtap/go/mtap-gateway/mtap-gateway.go
- name: Install dependencies (release)
if: ${{ startsWith(github.head_ref, 'release') && success() }}
run: |
python -m pip install --upgrade pip setuptools wheel
pip install flake8 pytest
SETUPTOOLS_SCM_PRETEND_VERSION=${GITHUB_HEAD_REF##*/} pip install ./biomedicus_client
pip install .[test,stanza] --extra-index-url https://download.pytorch.org/whl/cpu
SETUPTOOLS_SCM_PRETEND_VERSION=${GITHUB_HEAD_REF##*/} pip install ./biomedicus_client .[test,stanza] --extra-index-url https://download.pytorch.org/whl/cpu
go install github.com/nlpie/mtap/go/mtap-gateway/mtap-gateway.go
- name: Lint with flake8
run: |
pip install flake8
Expand Down
2 changes: 1 addition & 1 deletion biomedicus_client/src/biomedicus_client/cli.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright 2022 Regents of the University of Minnesota.
# Copyright (c) Regents of the University of Minnesota.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand Down
5 changes: 3 additions & 2 deletions biomedicus_client/src/biomedicus_client/sources.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright 2022 Regents of the University of Minnesota.
# Copyright (c) Regents of the University of Minnesota.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand All @@ -11,12 +11,13 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Input sources for running pipelines."""

import fnmatch
import time
from pathlib import Path
from typing import Generator, Iterator
from typing import Iterator

from mtap import Event
from mtap.pipeline import ProcessingSource
Expand Down
4 changes: 3 additions & 1 deletion python/biomedicus/cli.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright 2022 Regents of the University of Minnesota.
# Copyright (c) Regents of the University of Minnesota.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand All @@ -22,6 +22,7 @@
rtf_to_text
)
from biomedicus.java_support import RunJavaCommand
from biomedicus.pipeline_service import ServePipeline
from biomedicus.utilities.print_all_processors_metadata import PrintProcessorMetaCommand
from biomedicus_client import cli_tools
from biomedicus_client.cli_tools import WriteConfigsCommand
Expand All @@ -44,6 +45,7 @@ def main(args=None):
DownloadDataCommand(),
PrintProcessorMetaCommand(),
rtf_to_text.DeployRtfToTextCommand(),
ServePipeline()
)

conf = parser.parse_args(args)
Expand Down
5 changes: 2 additions & 3 deletions python/biomedicus/examples/tutorial/sql_pipeline.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright 2022 Regents of the University of Minnesota.
# Copyright (c) Regents of the University of Minnesota.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand All @@ -11,6 +11,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Example SQL pipeline.
Note: This file is in the documentation. Any updates here should be reflected in guides/reading-from-db.md"""
Expand All @@ -32,14 +33,12 @@
con = sqlite3.connect(args.input_file)
cur = con.cursor()


def source():
for name, text in cur.execute("SELECT NAME, TEXT FROM DOCUMENTS"):
with Event(event_id=name, client=events) as e:
doc = e.create_document('plaintext', text)
yield doc


count, = next(cur.execute("SELECT COUNT(*) FROM DOCUMENTS"))
times = pipeline.run_multithread(source(), total=count)
times.print()
Expand Down
5 changes: 2 additions & 3 deletions python/biomedicus/examples/tutorial/sql_pipeline_rtf_only.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright 2022 Regents of the University of Minnesota.
# Copyright (c) Regents of the University of Minnesota.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand All @@ -11,6 +11,7 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Example SQL pipeline for rtf conversion only.
Note: This file is in the documentation. Any updates here should be reflected in guides/reading-from-db.md"""
Expand All @@ -34,7 +35,6 @@
con = sqlite3.connect(args.input_file)
cur = con.cursor()


def source():
# Note I recommended that RTF documents be stored as BLOBs since most
# databases do not support storing text in the standard Windows-1252
Expand All @@ -50,7 +50,6 @@ def source():
# or "e.binaries['rtf'] = text.encode('cp1252')" in TEXT column case
yield e


count, = next(cur.execute("SELECT COUNT(*) FROM DOCUMENTS"))
# Here we're adding the params since we're calling the pipeline with a source that
# provides Events rather than documents. This param will tell DocumentProcessors
Expand Down
63 changes: 63 additions & 0 deletions python/biomedicus/pipeline_service.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
# Copyright (c) Regents of the University of Minnesota.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from argparse import ArgumentParser
from typing import List

from mtap.pipeline import pipeline_parser, run_pipeline_server

from biomedicus_client import default_pipeline
from biomedicus_client.cli_tools import Command


class ServePipeline(Command):
@property
def command(self) -> str:
return "serve-pipeline"

@property
def help(self) -> str:
return "Starts the end-to-end BioMedICUS pipeline service."

@property
def parents(self) -> List[ArgumentParser]:
return [pipeline_parser()]

def add_arguments(self, parser: ArgumentParser):
parser.add_argument(
'--config',
default=None,
help='Path to the pipeline configuration file.'
)
parser.add_argument(
'--include-label-text',
action='store_true',
help="Flag to include the covered text for every label"
)
parser.add_argument(
'--rtf',
action='store_true',
help="Flag to use a source for the rtf reader instead of plain text."
)
parser.add_argument(
'--rtf-address',
help="The address (or addresses, comma separated) for the rtf to text converter processor."
)

def command_fn(self, conf):
if conf.port == 0:
conf.port = 55000
conf.serializer = None
pipeline = default_pipeline.from_args(conf)
run_pipeline_server(pipeline, conf)
14 changes: 13 additions & 1 deletion python/tests/concepts/test_concepts_performance.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,16 @@
from subprocess import Popen, PIPE
# Copyright (c) Regents of the University of Minnesota.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import pytest
from mtap import RemoteProcessor, Pipeline, LocalProcessor
Expand Down
7 changes: 5 additions & 2 deletions python/tests/conftest.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright 2019 Regents of the University of Minnesota.
# Copyright (c) Regents of the University of Minnesota.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand All @@ -11,8 +11,10 @@
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os
import subprocess
from contextlib import suppress
from pathlib import Path
from threading import Thread

Expand Down Expand Up @@ -105,7 +107,8 @@ def func(address, process, timeout=20):
finally:
try:
process.terminate()
listener.join(timeout=5.0)
with suppress(TimeoutError):
listener.join(timeout=5.0)
if listener.is_alive():
process.kill()
listener.join()
Expand Down
64 changes: 64 additions & 0 deletions python/tests/deployment/conftest.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
# Copyright (c) Regents of the University of Minnesota.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import sys
import threading
import traceback
from contextlib import suppress
from subprocess import Popen, PIPE, STDOUT

import pytest


@pytest.fixture(name='deploy_all', scope='module')
def fixture_deploy_all(processor_timeout):
p = None
listener = None
try:
p = Popen([sys.executable, '-m', 'biomedicus', 'deploy', '--rtf',
'--noninteractive', '--log-level', 'DEBUG',
'--startup-timeout', str(processor_timeout)],
stdout=PIPE, stderr=STDOUT)
e = threading.Event()

def listen():
print("Starting listener", flush=True)
for line in p.stdout:
line = line.decode()
print(line, end='', flush=True)
if line.startswith("Done deploying all servers."):
e.set()
p.wait()
e.set()

listener = threading.Thread(target=listen)
listener.start()
e.wait(timeout=processor_timeout)
if p.returncode is not None:
raise ValueError("Failed to deploy.")
print("Done starting deployment for tests, yielding to test functions.", flush=True)
yield p
finally:
try:
if p is not None:
p.terminate()
if listener is not None:
with suppress(TimeoutError):
listener.join(timeout=60.0)
if listener.is_alive():
p.kill()
listener.join(timeout=10.0)
except Exception:
print("Error cleaning up deployment")
traceback.print_exc()
59 changes: 15 additions & 44 deletions python/tests/deployment/test_deployment.py
Original file line number Diff line number Diff line change
@@ -1,56 +1,27 @@
# Copyright (c) Regents of the University of Minnesota.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os
import sys
import threading
import traceback
from pathlib import Path
from subprocess import Popen, PIPE, STDOUT, run
from subprocess import PIPE, STDOUT, run
from tempfile import TemporaryDirectory

import pytest
from mtap.serialization import JsonSerializer


@pytest.fixture(name='deploy_all')
def fixture_deploy_all(processor_timeout):
p = None
listener = None
try:
p = Popen([sys.executable, '-m', 'biomedicus', 'deploy', '--rtf', '--noninteractive', '--log-level', 'DEBUG',
'--startup-timeout', str(processor_timeout)],
stdout=PIPE, stderr=STDOUT)
e = threading.Event()

def listen():
print("Starting listener", flush=True)
for line in p.stdout:
line = line.decode()
print(line, end='', flush=True)
if line.startswith("Done deploying all servers."):
e.set()
p.wait()
e.set()

listener = threading.Thread(target=listen)
listener.start()
e.wait(timeout=processor_timeout)
if p.returncode is not None:
raise ValueError("Failed to deploy.")
print("Done starting deployment for tests, yielding to test functions.", flush=True)
yield p
finally:
try:
if p is not None:
p.terminate()
if listener is not None:
listener.join(timeout=60.0)
if listener.is_alive():
p.kill()
listener.join()
except Exception:
print("Error cleaning up deployment")
traceback.print_exc()


@pytest.mark.integration
def test_deploy_run(deploy_all, processor_timeout):
print("testing deployment run", flush=True)
Expand Down
Loading

0 comments on commit 75c5d40

Please sign in to comment.