feat: Add client interface. #16

Merged · 88 commits · Nov 30, 2024

Commits
6ffea6e
feat: Add client code structure and interface for Data, Future and ex…
sitaowang1998 Oct 31, 2024
94af15b
feat: Split client into two libraries and add interface
sitaowang1998 Oct 31, 2024
f69523a
fix: Add boost library for spider_client_lib
sitaowang1998 Nov 1, 2024
ccf6cc8
style: Improve code style for data based on pr comments
sitaowang1998 Nov 1, 2024
5e26f58
fix: Add absl as public library for core
sitaowang1998 Nov 1, 2024
020093c
style: Improve code style for client interface based on pr reviea pri…
sitaowang1998 Nov 1, 2024
ee222f0
fix: Try fix clang-tidy find nout found
sitaowang1998 Nov 1, 2024
1b0ccac
docs: Add quick start doc
sitaowang1998 Nov 1, 2024
b3a2e1e
style: Change markdown headings to sentence style and hard wrap markd…
sitaowang1998 Nov 3, 2024
ec8f500
docs: Update doc according to pr comments
sitaowang1998 Nov 3, 2024
5dc12cb
docs: Remove the worker note section and put the content in run task …
sitaowang1998 Nov 3, 2024
0d59c62
docs: Return a Job instead of Future for run and support user to pass…
sitaowang1998 Nov 5, 2024
4cd6233
Merge branch 'main' into interface
sitaowang1998 Nov 6, 2024
fec5e73
Change future to job
sitaowang1998 Nov 14, 2024
a5e799b
Change task to context
sitaowang1998 Nov 14, 2024
1b15b5d
Remove TaskGraph::run to simplify interface
sitaowang1998 Nov 16, 2024
98104f1
Add separate key-value store interface
sitaowang1998 Nov 16, 2024
cb369fd
Edit some docstrings.
kirkrodrigues Nov 19, 2024
b4a6f36
Fix include guard
sitaowang1998 Nov 19, 2024
f3de2ca
Merge branch 'main' into interface
sitaowang1998 Nov 19, 2024
70547ae
Add serialzable concept
sitaowang1998 Nov 20, 2024
525311c
Merge remote-tracking branch 'origin/interface' into interface
sitaowang1998 Nov 20, 2024
c776376
Fix clang-tidy
sitaowang1998 Nov 20, 2024
49c571e
Fix typo
sitaowang1998 Nov 20, 2024
43d7e16
Fix clang-tidy
sitaowang1998 Nov 20, 2024
5e8e1dd
Remove macOS build
sitaowang1998 Nov 20, 2024
fe23c3c
Change driver constructor
sitaowang1998 Nov 20, 2024
064edd8
Add exception to interface
sitaowang1998 Nov 20, 2024
e7c5240
Change run to start
sitaowang1998 Nov 20, 2024
b0b414e
Add get jobs to driver
sitaowang1998 Nov 20, 2024
97761e1
Add get jobs in context
sitaowang1998 Nov 20, 2024
91d36f2
Update doc with new interface
sitaowang1998 Nov 20, 2024
84c2f41
Fix clang-tidy
sitaowang1998 Nov 20, 2024
f7ab013
Refactor Context.hpp.
kirkrodrigues Nov 21, 2024
302e68a
style: Fix header guard name
sitaowang1998 Nov 21, 2024
a2dc8bc
style: Rename Context to TaskContext
sitaowang1998 Nov 21, 2024
046e740
style: Add missing class docstring
sitaowang1998 Nov 21, 2024
92d6489
feat: Add concepts for task argument
sitaowang1998 Nov 21, 2024
d27f042
Refactor Context.hpp.
kirkrodrigues Nov 21, 2024
2b49746
feat: Change the arguments from Serializable to TaskArgument
sitaowang1998 Nov 21, 2024
c4ee015
style: Update docstring for Driver
sitaowang1998 Nov 21, 2024
0cc231b
style: Update docstring for Data and Job
sitaowang1998 Nov 21, 2024
069a7a7
style: Update clang-format for library headers
sitaowang1998 Nov 21, 2024
9069030
style: Clean up unused headers and Change TaskGraph template
sitaowang1998 Nov 21, 2024
0fe063f
doc: Update quick start guide
sitaowang1998 Nov 21, 2024
e089107
style: Fix clang-tidy
sitaowang1998 Nov 21, 2024
159aa08
Rename TaskArgument to TaskIo
sitaowang1998 Nov 21, 2024
8e035b7
feat: Add Runnable concept and TaskFunction type
sitaowang1998 Nov 21, 2024
9d90c37
refactor: Rename insert_kv and get_kv to kv_store_insert and kv_store…
sitaowang1998 Nov 21, 2024
8de26ec
fix: Fix the template instantiation of TaskFunction
sitaowang1998 Nov 22, 2024
b9bfcdd
style: Fix clang-tidy
sitaowang1998 Nov 22, 2024
351c8b5
docs: Move cluster setup after run task and change all mentions of da…
sitaowang1998 Nov 22, 2024
7dae8d4
docs: Add task graph to group task example
sitaowang1998 Nov 22, 2024
5d15b22
Refactor Data.hpp
kirkrodrigues Nov 25, 2024
1588b51
Refactor Driver.hpp
kirkrodrigues Nov 25, 2024
1e1e41d
Refactor Exception.hpp
kirkrodrigues Nov 25, 2024
c0a6e6f
Refactor Job.hpp.
kirkrodrigues Nov 25, 2024
99c5935
Refactor TaskContext.hpp.
kirkrodrigues Nov 25, 2024
80f314a
Refactor TaskGraph.hpp.
kirkrodrigues Nov 25, 2024
7796e15
Refactor Concepts.hpp.
kirkrodrigues Nov 25, 2024
036dd51
Add absl to libraray list and sort library list
sitaowang1998 Nov 26, 2024
8affa10
Rename template types to satisfy clang-tidy
sitaowang1998 Nov 26, 2024
77d2458
Change set_cleanup to set_cleanup_func
sitaowang1998 Nov 26, 2024
026b6f1
Change set_cleanup to set_cleanup_func
sitaowang1998 Nov 26, 2024
448f693
Change job state enum name and error docstring
sitaowang1998 Nov 26, 2024
769a708
Restruct all the concepts
sitaowang1998 Nov 26, 2024
3555d2e
Add todo for task registration with timeout
sitaowang1998 Nov 26, 2024
a0c5b3a
Fix circular dependency
sitaowang1998 Nov 26, 2024
e185bf3
Restruct quick start guide
sitaowang1998 Nov 26, 2024
f0d79e9
Fix clang-tidy
sitaowang1998 Nov 26, 2024
f444772
Remove all cpp files in client
sitaowang1998 Nov 26, 2024
530da78
Move driver id section after task restart
sitaowang1998 Nov 26, 2024
db21fe7
Add Job::cancel
sitaowang1998 Nov 28, 2024
165eb84
Fix typo
sitaowang1998 Nov 29, 2024
06de774
Fix clean up function signature
sitaowang1998 Nov 29, 2024
c7a07b1
Fix set_locality argument in docstring example
sitaowang1998 Nov 29, 2024
f8c623a
Add void return type for kv_store_insert
sitaowang1998 Nov 29, 2024
488eaa3
Add noreturn and void return type for TaskContext::abort
sitaowang1998 Nov 29, 2024
f0729d9
Fix some header guards.
kirkrodrigues Nov 29, 2024
4d2aa6c
Edit some docstrings and comments.
kirkrodrigues Nov 29, 2024
88ed638
Fix typo in Data docstring example.
kirkrodrigues Nov 29, 2024
bd55552
Add exception in docstring
sitaowang1998 Nov 29, 2024
9897995
Remove pImpl in interface
sitaowang1998 Nov 29, 2024
73eabef
Fix clang-tidy
sitaowang1998 Nov 29, 2024
85d2475
Fix exception what
sitaowang1998 Nov 29, 2024
61be939
Fix docstring job state name
sitaowang1998 Nov 29, 2024
5bffeee
Refactor exceptions.
kirkrodrigues Nov 29, 2024
22f370d
Remove quick start guide
sitaowang1998 Nov 29, 2024
308 changes: 308 additions & 0 deletions docs/quick_start.md
Member:

The writing is better, but I still think it would be confusing to a reader unless they read it multiple times. I think there are two things you can do:

  • Read React's quick start to get an idea of what a quick start guide should look like. Notice that they start small and build up. Obviously, Spider is a more complicated system, but I still think we can explain things in a way that's easy to follow without loss of detail.
  • Restructure the guide as follows:
    • Intro Spider---what it is, what it does, and briefly why it exists. This should only be a few sentences.
    • Explain how to write a task.
    • Explain how to run a task---this will require explaining how to write a client.
    • Explain how to start a cluster and run the client.
    • Explain how to compose tasks together, i.e. joining them together and nesting them.
    • Explain how to manage data as well as the basics of fault tolerance.

Member:

You still need to remove this, right?

@@ -0,0 +1,308 @@
# Spider quick start guide

## Intro to Spider

Spider is a distributed task executor. It runs tasks submitted by users on a distributed cluster and
returns the results to users. We developed Spider because existing distributed task executors
like [Ray](https://docs.ray.io/en/latest/index.html)
and [Celery](https://docs.celeryq.dev/en/stable/getting-started/introduction.html) lack important features like
efficient failure recovery and straggler mitigation.

## Create a task

In Spider, a task is a non-member function that takes a `spider::TaskContext` as its first
argument. It can then take any number of additional arguments, each of which must be a `TaskIo`,
i.e. `Serializable` or `Data`, which will be discussed later.

Tasks can return any `TaskIo` value. If a task needs to return more than one result, use
`std::tuple` and make sure all elements of the tuple are `TaskIo`.
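
For illustration only, here is a minimal sketch of what such constraints could look like as C++20
concepts. The definitions below are assumptions made for concreteness, not the library's actual
ones (the PR defines the real concepts in its client headers):

```c++
#include <concepts>
#include <string>
#include <type_traits>

namespace spider {
template <typename T>
class Data;  // forward declaration; the real class lives in the client headers
}

// Hypothetical stand-ins only: we pretend "serializable" means trivially
// copyable or std::string, and detect spider::Data<T> with a small trait.
template <typename T>
concept Serializable = std::is_trivially_copyable_v<T> || std::same_as<T, std::string>;

template <typename T>
struct IsSpiderData : std::false_type {};

template <typename T>
struct IsSpiderData<spider::Data<T>> : std::true_type {};

template <typename T>
concept TaskIo = Serializable<T> || IsSpiderData<T>::value;
```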

Spider requires users to register each task function by calling `SPIDER_REGISTER_TASK` statically,
which sets up the function inside the Spider library for later use. Spider requires function names
to be unique across the cluster.

```c++
// Task that sums two integers
auto sum(spider::TaskContext& context, int x, int y) -> int {
    return x + y;
}

// Task that sorts two integers in non-ascending order
auto sort(spider::TaskContext& context, int x, int y) -> std::tuple<int, int> {
    if (x >= y) {
        return {x, y};
    }
    return {y, x};
}

SPIDER_REGISTER_TASK(sum);
SPIDER_REGISTER_TASK(sort);

```

## Run a task

Spider enables users to run a task on the cluster. The user first creates a driver that connects to Spider's
internal storage; Spider automatically cleans up the driver's resources in its destructor. The user then calls
`Driver::start` with the task and its arguments. `Driver::start` returns a `spider::Job` object, which represents
the running task and takes the task's output type as a template argument. You can call `Job::state` to check the
state of the running task, `Job::wait_complete` to block until the job ends, and `Job::get_result` to retrieve the
result. The user can send a cancel signal to Spider by calling `Job::cancel`, and a client can list all running
jobs it has submitted by calling `Driver::get_jobs`.

```c++
auto main(int argc, char** argv) -> int {
    spider::Driver driver{"storage_url"};

    spider::Job<int> sum_job = driver.start(sum, 2, 2);
    sum_job.wait_complete();
    assert(4 == sum_job.get_result());

    spider::Job<std::tuple<int, int>> sort_job = driver.start(sort, 4, 3);
    sort_job.wait_complete();
    assert(std::tuple{4, 3} == sort_job.get_result());
}
```

🛠️ Refactor suggestion

Add error handling examples

The basic example should demonstrate how to handle common errors and exceptions that may occur during task execution.

Add error handling example:

```c++
auto main(int argc, char **argv) -> int {
    try {
        spider::Driver driver{"storage_url"};

        // Example of handling task execution errors
        spider::Job<int> sum_job = driver.run(sum, 2);
        try {
            int result = sum_job.get_result();
            assert(4 == result);
        } catch (const spider::TaskExecutionError& e) {
            std::cerr << "Task failed: " << e.what() << std::endl;
            return 1;
        }

        // Example of handling connection errors
        spider::Job<std::tuple<int, int>> sort_job = driver.start(4, 3);
        try {
            sort_job.wait_complete();
            assert(std::tuple{3, 4} == sort_job.get_result());
        } catch (const spider::ConnectionError& e) {
            std::cerr << "Connection lost: " << e.what() << std::endl;
            return 1;
        }
        return 0;
    } catch (const spider::DriverInitError& e) {
        std::cerr << "Failed to initialize driver: " << e.what() << std::endl;
        return 1;
    }
}
```


## Set up a Spider cluster

Now we have a basic client that creates and runs a task. However, we don't yet have a Spider cluster to execute
the tasks. To set up a Spider cluster:

1. Start a storage backend supported by Spider, e.g., MySQL.
2. Start a scheduler and connect it to the storage by running
   `spider start --scheduler --storage <storage_url> --port <scheduler_port>`.
3. Start some workers and connect them to the storage. Workers also need to know the task functions
   they will run, so users need to compile all the tasks into shared libraries, including the calls
   to `SPIDER_REGISTER_TASK`, and provide them to the workers by running
   `spider start --worker --storage <storage_url> --libs [client_libraries]`.


🛠️ Refactor suggestion

Add security considerations for cluster setup

The cluster setup section should include security best practices.

Add security documentation:

1. Start a storage supported by Spider, e.g. MySql.
   - Ensure the storage uses TLS encryption
   - Set up proper authentication and access controls
   - Consider network isolation for the storage service

2. Start a scheduler and connect it to the storage by running
   `spider start --scheduler --storage <storage_url> --port <scheduler_port>`.
   - Use TLS certificates for secure communication
   - Configure firewall rules to restrict access to the scheduler port
   - Set up authentication for scheduler access

3. Start workers and connect them to the storage:
   - Use signed worker libraries to prevent tampering
   - Configure resource limits for workers
   `spider start --worker --storage <storage_url> --libs [client_libraries] --tls-cert <cert_path> --resource-limits <limits_file>`

Now we can run the compiled client program and watch the tasks run on the Spider cluster.

## Group tasks together

In the real world, running a single task is too simple to be useful. Spider lets you bind the
outputs of tasks to the inputs of another task using `spider::bind`. The first argument of
`spider::bind` is the child task. Each later argument is either a `spider::Task` or a
`spider::TaskGraph`, whose entire outputs are used as part of the inputs to the child task, or a
`TaskIo` that is used directly as an input. Spider requires that the output types of the `Task`s
and `TaskGraph`s, and the types of the `TaskIo`s, match the input types of the child task.

Binding tasks together forms dependencies among tasks, which are represented by
Member:

  1. How do you plan to communicate (serialize) the task calls (name + args) from the client to the server?
  2. Let's say a task takes two inputs. Does this interface support using a constant for one input and a task for the other?
     1. If so, do you support a task graph that looks like this?

        ```mermaid
        flowchart TD
            leaf["foo(int, int) -> int"]
            parent["bar(int, int) -> int"]
            3 --> leaf
            4 --> parent
            5 --> parent
            parent --> leaf
        ```

     2. If so, that means any arguments the user passes into run get passed to the inputs in the task graph with a kind of DFS ordering. Is that true?

Collaborator Author:

  1. Here comes the use of spider::register_task, which records the mapping between function name and function pointer. When the user calls run with the function pointer, the library actually sends the function name to the db. The worker also gets the name of the function as part of the task metadata. Since the worker links to the library with the register_task call, it also has the mapping and knows which function to call.
     As for arguments, their types are stored in the db (serialized as strings right now), and their values are serialized into strings.
  2. Yes.
     i. Yes.
     ii. No. spider::bind needs to bind all inputs of the child task. Thus, the value 3 must be an argument in bind. 4 and 5 can be passed in run. However, it is possible that there are multiple first-layer tasks, i.e. tasks with no parents. The input of the task graph is the product of all first-layer tasks' outputs.
     It is possible to support passing 3 as an argument in run for the above example. We could have a special placeholder type for bind. In such a case, DFS is an intuitive order.
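
As a concrete but hypothetical illustration of the "values are serialized into strings" point, a
sketch using msgpack (which appears in this PR's include categories); the function name and
signature below are assumptions, not the PR's actual code:

```c++
#include <string>

#include <msgpack.hpp>

// Hypothetical sketch: pack a single task argument into a string for storage
// in the db, as described above. The wire format (msgpack) is an assumption.
template <typename T>
auto serialize_arg(T const& value) -> std::string {
    msgpack::sbuffer buffer;
    msgpack::pack(buffer, value);
    return std::string{buffer.data(), buffer.size()};
}
```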

Member:

  1. Can you explain how:
    1. register_task maps a function pointer into a function name?
    2. run turns a function pointer into a function name?
  2. Can you explain this point with an example?

    However, it is possible that there are multiple first layer tasks, i.e. tasks with no parents. The input of task graph is the product of all first layer tasks' output.

Collaborator Author:

  1. For the client and worker, when the library is loaded, it will create a mapping between function name and function pointer.
     i. I am thinking of turning register_task into a macro to get the function name at compile time. It then stores the mapping from the function name to the function pointer.
     ii. run gets the function name from the mapping stored before.
  2. In the example below, both bar and baz have no parent, and the task graph's input is their inputs put together, i.e. 1, 2, 3 and 4.

     ```mermaid
     graph TD
         1[1]
         2[2]
         3[3]
         4[4]
         foo["foo(int, int) -> int"]
         bar["bar(int, int) -> int"]
         baz["baz(int, int) -> int"]
         1 --> bar
         2 --> bar
         3 --> baz
         4 --> baz
         bar --> foo
         baz --> foo
     ```
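
For illustration, a minimal sketch of how such a name-to-pointer registry could work; the macro
and registry names here are assumptions, not the PR's actual implementation:

```c++
#include <string>
#include <unordered_map>

// Hypothetical sketch of the registry described above. A static map ties the
// stringized function name (captured by the macro at compile time) to its
// address; both the client and the worker build the same map when the task
// library is loaded. Casting a function pointer to void* is
// implementation-defined but works on mainstream platforms.
inline auto task_registry() -> std::unordered_map<std::string, void*>& {
    static std::unordered_map<std::string, void*> registry;
    return registry;
}

#define SPIDER_REGISTER_TASK_SKETCH(func) \
    namespace { \
    bool const func##_registered = [] { \
        task_registry().emplace(#func, reinterpret_cast<void*>(&(func))); \
        return true; \
    }(); \
    }
```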

Member:

  1. Makes sense.
  2. Gotcha. So to clarify my understanding:
    • in run, users can only pass arguments to root tasks (tasks with no parents).
    • in run, when the user passes a list of arguments [1, 2, 3, 4], conceptually:
      • the framework will iterate over the root tasks in order;
      • for each task argument, the framework will pop one argument from the list.

Collaborator Author:

Yes and yes.
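
To make the argument ordering concrete, a hedged sketch using the interface from the guide (the
function bodies are placeholders, and `driver` is assumed to be an initialized `spider::Driver`):

```c++
// Sketch under the semantics just confirmed: bar and baz are root tasks and
// foo consumes both of their outputs. Starting the graph with [1, 2, 3, 4]
// walks the root tasks in order, popping arguments off the list:
// 1, 2 feed bar; 3, 4 feed baz.
auto foo(spider::TaskContext& context, int x, int y) -> int { return x + y; }
auto bar(spider::TaskContext& context, int x, int y) -> int { return x * y; }
auto baz(spider::TaskContext& context, int x, int y) -> int { return x - y; }

spider::TaskGraph<int(int, int, int, int)> graph = spider::bind(foo, bar, baz);
spider::Job<int> job = driver.start(graph, 1, 2, 3, 4);
```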

a `spider::TaskGraph`. A `TaskGraph` can be further composed into a more complicated `TaskGraph`
by serving as an input to another task. You can run the graph using `Driver::start` in the same
way as running a single task.

```c++
auto square(spider::TaskContext& context, int x) -> int {
    return x * x;
}

auto square_root(spider::TaskContext& context, int x) -> int {
    return static_cast<int>(std::sqrt(x));
}

// task registration skipped
auto main(int argc, char** argv) -> int {
    // driver initialization skipped
    spider::TaskGraph<int(int, int)> sum_of_square = spider::bind(sum, square, square);
    spider::TaskGraph<int(int, int)> rss = spider::bind(square_root, sum_of_square);
    spider::Job<int> job = driver.start(rss, 3, 4);
    job.wait_complete();
    assert(5 == job.get_result());
}
```

The above code generates a task graph that accepts two `int`s and returns an `int`.

```mermaid
flowchart LR
sq1["square(int) -> int"]
sq2["square(int) -> int"]
sum["sum(int, int) -> int"]
sqrt["square_root(int) -> int"]
i3("int")
i4("int")
o5("int")
i3 --3--> sq1
i4 --4--> sq2
sq1 --9--> sum
sq2 --16--> sum
sum --25--> sqrt
sqrt --5--> o5
```

## Run a task inside a task

A static task graph is enough to solve many real-world problems, but dynamically running task
graphs on the fly can come in handy. Running a task graph inside a task is the same as running it
from a client; the only difference is that you use the `TaskContext` where you would need a
`Driver`.

```c++
// Forward declaration so gcd can reference gcd_impl below.
auto gcd_impl(spider::TaskContext& context, int x, int y) -> std::tuple<int, int>;

auto gcd(spider::TaskContext& context, int x, int y) -> int {
    if (x < y) {
        std::swap(x, y);
    }
    while (y != 0) {
        spider::Job<std::tuple<int, int>> job = context.start(gcd_impl, x, y);
        job.wait_complete();
        std::tie(x, y) = job.get_result();
    }
    return x;
}

auto gcd_impl(spider::TaskContext& context, int x, int y) -> std::tuple<int, int> {
    return {y, x % y};
}
```

## Data on external storage

Often, simple `Serializable` values are not enough, but passing large amounts of data around is
expensive. Usually such data is stored on disk or in a distributed storage system. For example, an
ETL workload typically reads its input from external storage, writes temporary data to external
storage, and writes its final output to external storage.

Spider lets users pass the metadata of such data around in `spider::Data` objects. `Data` stores
the metadata of the external data and provides crucial information to Spider for correct and
efficient scheduling and failure recovery. `Data` stores a list of nodes that have locality for
the external data, and the user can specify whether locality is a hard requirement, i.e. whether
the task can only run on the nodes in the locality list. `Data` can include a `cleanup` function,
which runs when the `Data` object is no longer referenced by any task or client. `Data` also has a
persist flag to indicate that the external data is persisted and does not need to be cleaned up.

```c++
struct HdfsFile {
    std::string url;
};

/**
 * In this example, we run a filter and a map on input stored in HDFS.
 * Filter writes its output into a temporary HDFS file, which will be cleaned
 * up by Spider when the task graph finishes.
 * Map reads the temporary file and persists the output in an HDFS file.
 */
auto main(int argc, char** argv) -> int {
    // driver initialization skipped
    // Creates an HdfsFile Data to represent the input data stored in HDFS.
    spider::Data<HdfsFile> input = spider::Data<HdfsFile>::Builder()
            .mark_persist(true)
            .build(HdfsFile{"/path/to/input"});
    spider::Job<spider::Data<HdfsFile>> job = driver.start(
            spider::bind(map, filter),
            input);
    job.wait_complete();
    std::string const output_path = job.get_result().get().url;
    std::cout << "Result is stored in " << output_path << std::endl;
}

/**
 * Runs filter on the input data from an HDFS file and writes the output into
 * a temporary HDFS file for later tasks.
 *
 * @param input Input file stored in HDFS.
 * @return Temporary file stored in HDFS.
 */
auto filter(spider::TaskContext& context, spider::Data<HdfsFile> input) -> spider::Data<HdfsFile> {
    // We can use the task id as a unique random number.
    std::string const output_path = std::format("/path/{}", context.task_id());
    std::string const input_path = input.get().url;
    // Creates the HdfsFile Data before creating the actual file in HDFS so
    // Spider can clean up the HDFS file on failure.
    spider::Data<HdfsFile> output = spider::Data<HdfsFile>::Builder()
            .set_cleanup_func([](HdfsFile const& file) { delete_hdfs_file(file); })
            .build(HdfsFile{output_path});
```

🛠️ Refactor suggestion

Improve error handling in cleanup function

The cleanup function should handle errors gracefully.

```diff
 spider::Data<HdfsFile> output = spider::Data<HdfsFile>::Builder()
-    .set_cleanup_func([](HdfsFile const& file) { delete_hdfs_file(file); })
+    .set_cleanup_func([](HdfsFile const& file) {
+        try {
+            delete_hdfs_file(file);
+        } catch (const std::exception& e) {
+            // Log error but don't throw as this is called during cleanup
+            std::cerr << "Failed to cleanup HDFS file: " << e.what() << std::endl;
+        }
+    })
     .build(HdfsFile { output_path });
```

```c++
    auto file = hdfs_create(output_path);
    // HDFS allows reading data from any node, but reading from the nodes
    // where the file is stored and replicated is faster.
    std::vector<std::string> nodes = hdfs_get_nodes(file);
    output.set_locality(nodes, false);  // not hard locality

    // Runs the filter
    run_filter(input_path, file);

    return output;
}

/**
 * Runs map on the input data from an HDFS file and persists the output into
 * an HDFS file.
 *
 * @param input Input file stored in HDFS.
 * @return Persisted output in HDFS.
 */
auto map(spider::TaskContext& context, spider::Data<HdfsFile> input) -> spider::Data<HdfsFile> {
    // We use a hardcoded path for simplicity in this example. You can pass
    // the path as an input to the task, or use the task id as a random name
    // as in filter.
    std::string const output_path = "/path/to/output";
    std::string const input_path = input.get().url;

    spider::Data<HdfsFile> output = spider::Data<HdfsFile>::Builder()
            .set_cleanup_func([](HdfsFile const& file) { delete_hdfs_file(file); })
            .build(HdfsFile{output_path});

    run_map(input_path, output_path);

    // Now that map has finished, the file is persisted in HDFS as the output
    // of the job. We need to inform Spider that the file is persisted and
    // should not be cleaned up.
    output.mark_persist();
    return output;
}

```

## Using the key-value store when tasks restart

Spider provides exactly-once semantics in failure recovery. To achieve this, Spider restarts some
tasks after a task fails. A task might want to keep some data around across restarts, but all the
`Data` objects created by a task are cleaned up on restart. Spider therefore provides a key-value
store: restarted tasks and restarted clients can retrieve values stored by a previous run through
`kv_store_insert` and `kv_store_get` on `TaskContext` or `Driver`. Note that a task or client can
only get values created by itself, and two different tasks can store two different values under
the same key.

```c++
auto long_running(spider::TaskContext& context) -> void {
    std::optional<std::string> state_option = context.kv_store_get("state");
    if (!state_option.has_value()) {
        long_compute_0();
        context.kv_store_insert("state", "0");
    }
    std::string const state = context.kv_store_get("state").value();
    switch (std::stoi(state)) {
        case 0:
            long_compute_1();
            context.kv_store_insert("state", "1");
            [[fallthrough]];  // keep running after updating the key-value store
        case 1:
            long_compute_2();
    }
}
```

⚠️ Potential issue

Address potential race conditions in key-value store usage

The key-value store example needs to handle concurrent access and state transitions atomically.

```diff
 auto long_running(spider::TaskContext& context) {
+    // Use a mutex-based lock in the key-value store
+    auto lock = context.kv_store_lock("state");
     std::optional<std::string> state_option = context.kv_store_get("state");
     if (!state_option.has_value()) {
         long_compute_0();
-        context.kv_store_insert("state", "0");
+        if (!context.kv_store_compare_and_swap("state", std::nullopt, "0")) {
+            // Another instance has already initialized the state
+            return;
+        }
     }
     std::string state = context.kv_store_get("state").value();
     switch (std::stoi(state)) {
         case 0:
             long_compute_1();
-            context.kv_store_insert("state", "1") // Keep running after update key-value store
+            if (!context.kv_store_compare_and_swap("state", "0", "1")) {
+                // State was changed by another instance
+                return;
+            }
         case 1:
             long_compute_2();
     }
 }
```

Committable suggestion skipped: line range outside the PR's diff.
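
Since the guide notes the same key-value API exists on `Driver`, here is a hedged client-side
sketch. It assumes `Driver` exposes `kv_store_get`/`kv_store_insert` exactly as `TaskContext`
does, with `sum` registered as earlier:

```c++
#include <optional>
#include <string>

#include <spider/spider.hpp>

// Sketch: a restarted client checks whether its previous run already
// submitted the job before submitting again, using the driver-scoped
// key-value store described above.
auto main(int argc, char** argv) -> int {
    spider::Driver driver{"storage_url"};
    std::optional<std::string> submitted = driver.kv_store_get("submitted");
    if (!submitted.has_value()) {
        spider::Job<int> job = driver.start(sum, 2, 2);
        driver.kv_store_insert("submitted", "true");
        job.wait_complete();
    }
}
```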


## Recovery on Driver restarts

It is possible for a user client to fail and be manually restarted. Spider keeps jobs running when their driver
fails, so that the recovered client can retrieve all of its running jobs. In Spider, every driver has a driver id.
When a driver is created with only a storage URL, a driver id is created implicitly. The user can instead pass in
a driver id to get access to the jobs and data created by a previous driver with the same id. Two drivers with the
same id cannot exist at the same time.

```c++
#include <spider/spider.hpp>

auto main(int argc, char** argv) -> int {
    boost::uuids::string_generator gen;
    spider::Driver driver{"storage_url", gen("01234567-89ab-cdef-0123-456789abcdef")};
}
```

## Straggler mitigation

`SPIDER_REGISTER_TASK_TIMEOUT` is the same as `SPIDER_REGISTER_TASK`, but accepts a second
argument: a timeout in milliseconds. If a task instance executes for longer than the specified
timeout, Spider spawns another task instance running the same function. The task instance that
finishes first wins; the other running task instances are cancelled, and their associated data is
cleaned up.

The new task instance has a different id, and it is the user's responsibility to avoid data races
and to deduplicate the output if necessary.
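
For example, a minimal sketch (the 500 ms timeout value here is an arbitrary assumption):

```c++
// Registers `sum` as before, but if an instance runs longer than 500 ms,
// Spider spawns a second instance of the same function; first to finish wins.
SPIDER_REGISTER_TASK_TIMEOUT(sum, 500);
```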
2 changes: 1 addition & 1 deletion src/spider/.clang-format
```diff
@@ -10,7 +10,7 @@ IncludeCategories:
   # Ex:
   # - Regex: "<(fmt|spdlog)"
   #   Priority: 3
-  - Regex: "^<(clp)"
+  - Regex: "^<(absl|boost|catch2|fmt|mariadb|msgpack|spdlog)"
     Priority: 3
   # C system headers
   - Regex: "^<.+\\.h>"
```
47 changes: 46 additions & 1 deletion src/spider/CMakeLists.txt
```diff
@@ -28,9 +28,54 @@ target_link_libraries(
 )
 target_link_libraries(spider_core PRIVATE fmt::fmt)

+set(SPIDER_CLIENT_SHARED_SOURCES CACHE INTERNAL "spider client shared source files")
+
+set(SPIDER_CLIENT_SHARED_HEADERS
+    client/Data.hpp
+    client/Driver.hpp
+    client/task.hpp
+    client/TaskContext.hpp
+    client/TaskGraph.hpp
+    client/type_utils.hpp
+    client/Exception.hpp
+    CACHE INTERNAL
+    "spider client shared header files"
+)
```
Comment on lines +33 to +43

🛠️ Refactor suggestion

Standardize header file naming convention

The header file naming is inconsistent:

  • task.hpp uses lowercase
  • Other files use PascalCase (e.g., TaskGraph.hpp)

Please maintain consistent PascalCase for class header files.

```diff
-    client/task.hpp
+    client/Task.hpp
```


```diff
+add_library(spider_client_lib)
+target_sources(spider_client_lib PRIVATE ${SPIDER_CLIENT_SHARED_SOURCES})
+target_sources(spider_client_lib PUBLIC ${SPIDER_CLIENT_SHARED_HEADERS})
+target_link_libraries(
+    spider_client_lib
+    PUBLIC
+    Boost::boost
+    absl::flat_hash_map
+)
+
+set(SPIDER_CLIENT_SOURCES CACHE INTERNAL "spider client source files")
+
+set(SPIDER_CLIENT_HEADERS
+    client/spider.hpp
+    client/Job.hpp
+    CACHE INTERNAL
+    "spider client header files"
+)
+
+add_library(spider_client)
+target_sources(spider_client PRIVATE ${SPIDER_CLIENT_SOURCES})
+target_sources(spider_client PUBLIC ${SPIDER_CLIENT_HEADERS})
+target_link_libraries(spider_client PRIVATE spider_core)
+target_link_libraries(spider_client PUBLIC spider_client_lib)
+add_library(spider::spider ALIAS spider_client)

 set(SPIDER_WORKER_SOURCES worker/worker.cpp CACHE INTERNAL "spider worker source files")

 add_executable(spider_worker)
 target_sources(spider_worker PRIVATE ${SPIDER_WORKER_SOURCES})
-target_link_libraries(spider_worker PRIVATE spider_core)
+target_link_libraries(
+    spider_worker
+    PRIVATE
+    spider_core
+    spider_client_lib
+)
 add_executable(spider::worker ALIAS spider_worker)
```