feat: Add pytest integration test for scheduler and worker (#39)
sitaowang1998 authored Dec 18, 2024
1 parent 9169591 commit 0a896a8
Showing 10 changed files with 429 additions and 6 deletions.
14 changes: 13 additions & 1 deletion docs/testing.md
@@ -23,7 +23,10 @@ require this storage backend.
 4. Set the `cStorageUrl` in `tests/storage/StorageTestHelper.hpp` to
    `jdbc:mariadb://localhost:3306/<db_name>?user=<usr>&password=<pwd>`.

-## Running tests
+5. Set the `storage_url` in `tests/integration/client.py` to
+   `jdbc:mariadb://localhost:3306/<db_name>?user=<usr>&password=<pwd>`.
+
+## Running unit tests

 You can use the following tasks to run the set of unit tests that's appropriate.

@@ -45,4 +48,13 @@ REQUIRE( storage->connect(spider::test::cStorageUrl).success() )
 The [unit_tests.yaml][gh-workflow-unit-tests] GitHub workflow runs the unit tests on push,
 pull requests, and daily. Currently, it only runs unit tests that don't require a storage backend.

+## Running integration tests
+
+You can use the following tasks to run integration tests.
+
+| Task               | Description                 |
+|--------------------|-----------------------------|
+| `test:integration` | Runs all integration tests. |

 [gh-workflow-unit-tests]: ../.github/workflows/unit-tests.yaml
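
As a concrete illustration of step 5 (the database name, user, and password are example values, not defaults), a filled-in `storage_url` can be sanity-checked with `create_connection` from `tests/integration/client.py` below. The `tests.integration` import path assumes the repository root is on `PYTHONPATH`.

```python
# Hypothetical values for <db_name>, <usr>, and <pwd>; match them to your setup.
from tests.integration.client import create_connection

conn = create_connection(
    "jdbc:mariadb://localhost:3306/spider_test?user=root&password=password"
)
conn.ping(reconnect=False)  # Raises InterfaceError if the server is unreachable.
conn.close()
```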
4 changes: 3 additions & 1 deletion lint-requirements.txt
@@ -1,5 +1,7 @@
 black>=24.4.2
+# Lock to v18.x until we can upgrade our code to meet v19's formatting standards.
+clang-format~=18.1
 clang-tidy>=19.1.0
 ruff>=0.4.4
 gersemi>=0.16.2
-yamllint>=1.35.1
\ No newline at end of file
+yamllint>=1.35.1
29 changes: 29 additions & 0 deletions lint-tasks.yaml
@@ -9,6 +9,7 @@ tasks:
     cmds:
       - task: "cmake-check"
       - task: "cpp-check"
+      - task: "py-check"
       - task: "yml-check"

   fix:
@@ -102,6 +103,34 @@ tasks:
           SRC_DIR: "{{.G_SRC_SPIDER_DIR}}"
           TEST_DIR: "{{.G_TEST_DIR}}"

+  py-check:
+    cmds:
+      - task: "py"
+        vars:
+          BLACK_FLAGS: "--check"
+          RUFF_FLAGS: ""
+
+  py-fix:
+    cmds:
+      - task: "py"
+        vars:
+          BLACK_FLAGS: ""
+          RUFF_FLAGS: "--fix"
+
+  py:
+    internal: true
+    requires:
+      vars: ["BLACK_FLAGS", "RUFF_FLAGS"]
+    deps: ["venv"]
+    cmds:
+      - for:
+          - "tests/integration"
+        cmd: |-
+          . "{{.G_LINT_VENV_DIR}}/bin/activate"
+          cd "{{.ITEM}}"
+          black --color --line-length 100 {{.BLACK_FLAGS}} .
+          ruff check {{.RUFF_FLAGS}} .
+
   yml:
     aliases:
       - "yml-check"
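
For reference, the `py` task body above is roughly equivalent to the following Python sketch (flags and the target directory copied from the task; this is an illustration, not part of the commit):

```python
import subprocess

# Check formatting and lint tests/integration with the same flags the task passes.
subprocess.run(
    ["black", "--color", "--line-length", "100", "--check", "."],
    cwd="tests/integration",
    check=True,
)
subprocess.run(["ruff", "check", "."], cwd="tests/integration", check=True)
```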
6 changes: 6 additions & 0 deletions ruff.toml
@@ -0,0 +1,6 @@
line-length = 100
lint.select = ["I"]

[lint.isort]
case-sensitive = false
order-by-type = false
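
With only the `I` (isort) rules selected, ruff checks import ordering and nothing else; `case-sensitive = false` and `order-by-type = false` make names sort purely alphabetically, regardless of case or whether they are constants or classes. The import block from `tests/integration/client.py` below is an example of what passes:

```python
# Stdlib imports first, alphabetized case-insensitively; third-party imports
# (mysql.connector, pytest) follow in their own block.
import re
import uuid
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple

import mysql.connector
import pytest
```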
3 changes: 3 additions & 0 deletions test-requirements.txt
@@ -0,0 +1,3 @@
msgpack>=1.1.0
mysql-connector-python>=8.0.26
pytest>=8.3.4
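
`mysql-connector-python` and `pytest` are exercised directly by `tests/integration/client.py`; `msgpack` is presumably needed for the task-argument serialization the tests drive, which is not visible in this diff. A minimal round-trip for reference:

```python
import msgpack

# Serialize and deserialize a value with msgpack, the format pinned above.
packed = msgpack.packb([1, "two", 3.0])  # bytes
assert msgpack.unpackb(packed) == [1, "two", 3.0]
```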
55 changes: 51 additions & 4 deletions test-tasks.yaml
@@ -1,30 +1,77 @@
 version: "3"

 vars:
-  G_TEST_BINARY: "{{.G_BUILD_SPIDER_DIR}}/tests/unitTest"
+  G_UNIT_TEST_BINARY: "{{.G_BUILD_SPIDER_DIR}}/tests/unitTest"
+  G_TEST_VENV_DIR: "{{.G_BUILD_DIR}}/test-venv"
+  G_TEST_VENV_CHECKSUM_FILE: "{{.G_BUILD_DIR}}/test-venv.md5"

 tasks:
   non-storage-unit-tests:
     deps:
       - "build-unit-test"
     cmds:
-      - "{{.G_TEST_BINARY}} \"~[storage]\""
+      - "{{.G_UNIT_TEST_BINARY}} \"~[storage]\""

   storage-unit-tests:
     deps:
       - "build-unit-test"
     cmds:
-      - "{{.G_TEST_BINARY}} \"[storage]\""
+      - "{{.G_UNIT_TEST_BINARY}} \"[storage]\""

   all:
     deps:
       - "build-unit-test"
     cmds:
-      - "{{.G_TEST_BINARY}}"
+      - "{{.G_UNIT_TEST_BINARY}}"

   build-unit-test:
     internal: true
     deps:
       - task: ":build:target"
         vars:
           TARGETS: ["spider_task_executor", "unitTest", "worker_test"]

+  integration:
+    dir: "{{.G_BUILD_SPIDER_DIR}}"
+    deps:
+      - "venv"
+      - task: ":build:target"
+        vars:
+          TARGETS: [
+            "spider_task_executor",
+            "worker_test",
+            "spider_worker",
+            "spider_scheduler",
+            "integrationTest"]
+    cmd: |-
+      . ../test-venv/bin/activate
+      ../test-venv/bin/pytest tests/integration
+
+  venv:
+    internal: true
+    vars:
+      CHECKSUM_FILE: "{{.G_TEST_VENV_CHECKSUM_FILE}}"
+      OUTPUT_DIR: "{{.G_TEST_VENV_DIR}}"
+    sources:
+      - "{{.ROOT_DIR}}/taskfile.yaml"
+      - "{{.TASKFILE}}"
+      - "test-requirements.txt"
+    generates: ["{{.CHECKSUM_FILE}}"]
+    run: "once"
+    deps:
+      - ":init"
+      - task: ":utils:validate-checksum"
+        vars:
+          CHECKSUM_FILE: "{{.CHECKSUM_FILE}}"
+          DATA_DIR: "{{.OUTPUT_DIR}}"
+    cmds:
+      - task: ":utils:create-venv"
+        vars:
+          LABEL: "test"
+          OUTPUT_DIR: "{{.OUTPUT_DIR}}"
+          REQUIREMENTS_FILE: "test-requirements.txt"
+      # This command must be last
+      - task: ":utils:compute-checksum"
+        vars:
+          DATA_DIR: "{{.OUTPUT_DIR}}"
+          OUTPUT_FILE: "{{.CHECKSUM_FILE}}"
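
The `integration` task ultimately runs pytest over `tests/integration` from the build directory, so the copied tests sit next to the built `spider_scheduler` and `spider_worker` binaries. The programmatic equivalent of its final command, for illustration:

```python
import sys

import pytest

# Same effect as `../test-venv/bin/pytest tests/integration` in the task above.
sys.exit(pytest.main(["tests/integration"]))
```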
11 changes: 11 additions & 0 deletions tests/CMakeLists.txt
@@ -42,7 +42,18 @@ target_link_libraries(
         Boost::system
         spdlog::spdlog
 )
+add_dependencies(unitTest worker_test)

 add_library(worker_test SHARED)
 target_sources(worker_test PRIVATE worker/worker-test.cpp)
 target_link_libraries(worker_test PRIVATE spider_core)
+
+add_custom_target(integrationTest ALL)
+add_custom_command(
+    TARGET integrationTest
+    PRE_BUILD
+    COMMAND
+        ${CMAKE_COMMAND} -E copy_directory ${CMAKE_CURRENT_SOURCE_DIR}/integration
+        ${CMAKE_CURRENT_BINARY_DIR}/integration
+)
+add_dependencies(integrationTest worker_test)
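
The `add_custom_command` above mirrors the Python integration tests into the build tree whenever `integrationTest` builds, which is what lets the `test:integration` task invoke pytest from the build directory. In Python terms (paths are illustrative, not taken from the commit):

```python
import shutil

# Equivalent of `cmake -E copy_directory <src>/tests/integration <build>/tests/integration`.
shutil.copytree(
    "tests/integration", "build/spider/tests/integration", dirs_exist_ok=True
)
```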
Empty file added tests/integration/__init__.py
157 changes: 157 additions & 0 deletions tests/integration/client.py
@@ -0,0 +1,157 @@
import re
import uuid
from dataclasses import dataclass
from typing import Dict, List, Optional, Tuple

import mysql.connector
import pytest


@dataclass
class TaskInput:
type: str
task_output: Optional[Tuple[uuid.UUID, int]] = None
value: Optional[str] = None
data_id: Optional[uuid.UUID] = None


@dataclass
class TaskOutput:
type: str
value: Optional[str] = None
data_id: Optional[uuid.UUID] = None


@dataclass
class Task:
id: uuid.UUID
function_name: str
inputs: List[TaskInput]
outputs: List[TaskOutput]
timeout: float = 0.0


@dataclass
class TaskGraph:
id: uuid.UUID
tasks: Dict[uuid.UUID, Task]
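    # Edges are (parent, child) pairs; a task that never appears as a child is a head task.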
dependencies: List[Tuple[uuid.UUID, uuid.UUID]]


def create_connection(storage_url: str):
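    """Parse a JDBC-style MariaDB URL and open a mysql.connector connection from its parts."""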
pattern = re.compile(
r"jdbc:mariadb://(?P<host>[^:/]+):(?P<port>\d+)/(?P<database>[^?]+)\?user=(?P<user>[^&]+)&password=(?P<password>[^&]+)"
)
match = pattern.match(storage_url)
if not match:
raise ValueError("Invalid JDBC URL format")

connection_params = match.groupdict()
return mysql.connector.connect(
host=connection_params["host"],
port=int(connection_params["port"]),
database=connection_params["database"],
user=connection_params["user"],
password=connection_params["password"],
)


def is_head_task(task_id: uuid.UUID, dependencies: List[Tuple[uuid.UUID, uuid.UUID]]):
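    """Return whether task_id has no parents, i.e. never appears as an edge's child."""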
return not any(dependency[1] == task_id for dependency in dependencies)


storage_url = "jdbc:mariadb://localhost:3306/spider_test?user=root&password=password"


@pytest.fixture(scope="session")
def storage():
conn = create_connection(storage_url)
yield conn
conn.close()


def submit_job(conn, client_id: uuid.UUID, graph: TaskGraph):
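    """Insert the job, its tasks (head tasks start 'ready'), their inputs/outputs, and edges."""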
cursor = conn.cursor()

cursor.execute(
"INSERT INTO jobs (id, client_id) VALUES (%s, %s)", (graph.id.bytes, client_id.bytes)
)

for task_id, task in graph.tasks.items():
if is_head_task(task_id, graph.dependencies):
state = "ready"
else:
state = "pending"
cursor.execute(
"INSERT INTO tasks (id, job_id, func_name, state, timeout) VALUES (%s, %s, %s, %s, %s)",
(task.id.bytes, graph.id.bytes, task.function_name, state, task.timeout),
)

for i, task_input in enumerate(task.inputs):
cursor.execute(
"INSERT INTO task_inputs (type, task_id, position, output_task_id, output_task_position, value, data_id) VALUES (%s, %s, %s, %s, %s, %s, %s)",
(
task_input.type,
task.id.bytes,
i,
task_input.task_output[0].bytes if task_input.task_output is not None else None,
task_input.task_output[1] if task_input.task_output is not None else None,
task_input.value,
task_input.data_id.bytes if task_input.data_id is not None else None,
),
)

for i, task_output in enumerate(task.outputs):
cursor.execute(
"INSERT INTO task_outputs (task_id, position, type) VALUES (%s, %s, %s)",
(task.id.bytes, i, task_output.type),
)

for dependency in graph.dependencies:
cursor.execute(
"INSERT INTO task_dependencies (parent, child) VALUES (%s, %s)",
(dependency[0].bytes, dependency[1].bytes),
)

conn.commit()
cursor.close()


def get_task_outputs(conn, task_id: uuid.UUID) -> List[TaskOutput]:
cursor = conn.cursor()

cursor.execute(
"SELECT type, value, data_id FROM task_outputs WHERE task_id = %s ORDER BY position",
(task_id.bytes,),
)
outputs = []
for output_type, value, data_id in cursor.fetchall():
if value is not None:
outputs.append(TaskOutput(type=output_type, value=value))
elif data_id is not None:
outputs.append(TaskOutput(type=output_type, data_id=uuid.UUID(bytes=data_id)))
else:
outputs.append(TaskOutput(type=output_type))

conn.commit()
cursor.close()
return outputs


def get_task_state(conn, task_id: uuid.UUID) -> str:
cursor = conn.cursor()

cursor.execute("SELECT state FROM tasks WHERE id = %s", (task_id.bytes,))
state = cursor.fetchone()[0]

conn.commit()
cursor.close()
return state


def remove_job(conn, job_id: uuid.UUID):
cursor = conn.cursor()

cursor.execute("DELETE FROM jobs WHERE id = %s", (job_id.bytes,))
conn.commit()
cursor.close()
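
The test modules that consume these helpers are not shown in this diff. As a hypothetical sketch of how they fit together, a test might submit a graph with `submit_job` and then poll `get_task_state` until the scheduler and worker finish the task (the `pending`/`ready`/`running` state names are taken from this file or assumed, not confirmed by the missing tests):

```python
import time


def wait_for_terminal_state(conn, task_id: uuid.UUID, timeout: float = 30.0) -> str:
    """Poll a task until it leaves the pending/ready/running states or the timeout expires."""
    deadline = time.time() + timeout
    state = get_task_state(conn, task_id)
    while state in ("pending", "ready", "running") and time.time() < deadline:
        time.sleep(1)
        state = get_task_state(conn, task_id)
    return state
```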