Ballista Python Issue(s) #1142

Open
Tracked by #1068
milenkovicm opened this issue Nov 29, 2024 · 10 comments
Labels
enhancement New feature or request

Comments

@milenkovicm
Contributor

First of all, I'm not an expert in Rust-Python (pyo3) integration, so my apologies if I've done or said something stupid.

The current implementation of (py)ballista has a limitation when it comes to DataFrame operations.

The following code results in an error:

from pyballista import BallistaBuilder
from datafusion import SessionContext
from datafusion import functions as f

# %%
ctx: SessionContext = BallistaBuilder()\
    .config("ballista.job.name", "example ballista")\
    .config("ballista.shuffle.partitions", "16")\
    .standalone()
    
df = ctx.sql("SELECT 1 as r").aggregate(
    [f.col("r")], [f.count_star()]
)
df.show()

It throws an exception similar to:

---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
File /Users/user/git/arrow-ballista/python/examples/example.py:3
      1 # %% 
      2 # Select 1 to verify its working
----> 3 df = ctx.sql("SELECT 1 as r").aggregate(
      4     [f.col("r")], [f.count_star()]
      5 )
      6 df.show()

TypeError: argument 'group_by': 'Expr' object cannot be converted to 'Expr'

Actually, the previous implementation had the same problem; the same error is thrown (git checkout 2f223db21557c15080bf865ac692d276b8f0b770):

# %%
from pyballista import SessionContext
from datafusion import functions as f

ctx = SessionContext("localhost", 50050)

df = ctx.sql("SELECT 1 as r").aggregate(
    [f.col("r")], [f.count_star()]
)
df.show()

A similar issue occurs if SessionConfig is used:

from ballista import Ballista
from datafusion import RuntimeConfig, SessionConfig, SessionContext

runtime = RuntimeConfig().with_disk_manager_os().with_fair_spill_pool(10000000)
config = (
    SessionConfig()
    .with_create_default_catalog_and_schema(True)
    .with_default_catalog_and_schema("foo", "bar")
    .with_target_partitions(8)
    .with_information_schema(True)
    .with_repartition_joins(False)
    .with_repartition_aggregations(False)
    .with_repartition_windows(False)
    .with_parquet_pruning(False)
    .set("datafusion.execution.parquet.pushdown_filters", "true")
)

# %%
# `with_runtime` panics here, complaining that a `RuntimeConfig` object
# cannot be converted to `RuntimeConfig`
ctx: SessionContext = (
    Ballista.builder
    .with_runtime(runtime)
    .with_config(config)
    .standalone()
)

ctx.sql("SELECT 1").show()

The problem with RuntimeConfig and SessionConfig could be solved if they were re-exported from ballista:

from ballista import Ballista, RuntimeConfig, SessionConfig
from datafusion import SessionContext

but the first problem, with DataFrame, would still remain.

My guess is that this is an FFI issue, since ballista and datafusion are different packages. I'm not sure what exactly the problem is or how to resolve it.
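To make the suspected cause concrete, here is a pure-Python analogy (not pyo3 itself). It assumes each compiled package ends up with its own copy of a class that happens to share the name `Expr`; the module names and the `aggregate` stand-in are hypothetical and only mimic the symptom.

```python
def make_expr_class(module_name: str) -> type:
    """Build a class named `Expr`, as if it were defined by `module_name`."""
    return type("Expr", (), {"__module__": module_name})


# Two unrelated classes with the same name (module names are hypothetical).
DataFusionExpr = make_expr_class("datafusion_sketch")
BallistaExpr = make_expr_class("ballista_sketch")


def aggregate(group_by: object) -> str:
    """Stand-in for a method compiled into the ballista package."""
    if not isinstance(group_by, BallistaExpr):
        raise TypeError(
            f"argument 'group_by': '{type(group_by).__name__}' object "
            f"cannot be converted to '{BallistaExpr.__name__}'"
        )
    return "ok"


try:
    aggregate(DataFusionExpr())  # an `Expr`, but from the other package
    message = ""
except TypeError as exc:
    message = str(exc)

print(message)
```

The names match but the type objects do not, which would explain why the error message looks self-contradictory.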

@timsaucer's comment #1091 (comment) makes more sense to me now.

Possible Solution (I)

One obvious way would be to move ballista context creation into datafusion-python. Creating the context takes a single line:

let ctx = datafusion::prelude::SessionContext::remote("df://localhost:50050").await?;

As the ballista context is a SessionContext, it would be trivial to integrate and, I believe, it would avoid the previous issues.

We could provide only a "remote context" (no standalone), making it an optional feature that datafusion-python users could opt in to. This would somewhat limit the number of libraries ballista would bring into datafusion-python (we could split core into core and client-core to further reduce dependencies).

This proposal would mean bringing an optional dependency into datafusion-python, and additional complexity in the (datafusion-python) release process.

(py)ballista would stay; it could expose scheduler and executor control as proposed in #1107.

A big risk of this proposal is that ballista could block datafusion-python releases if it goes back to unmaintained mode.

Possible Solution (II)

Another possible solution is to re-export all classes from datafusion-python in ballista. I'm not sure how complex or practical this would be, or whether datafusion-python applications would need some kind of rewriting to be able to run on ballista.

This would put additional responsibility on the ballista maintainers (there are not many of them).
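A rough sketch of what re-exporting means for user code, in plain Python. The two module objects below are hypothetical stand-ins built with `types.ModuleType`, not the real packages; the point is only that when every class comes from one package, the type check inside that package's methods is satisfied.

```python
import types

# Hypothetical compiled extension bundled with the package (a stand-in only).
internal = types.ModuleType("ballista_sketch_internal")


class Expr:
    """The single expression class this sketch's extension knows about."""

    def __init__(self, name: str) -> None:
        self.name = name


def aggregate(group_by: Expr) -> str:
    """Stand-in for a compiled method; it accepts only its own `Expr`."""
    if not isinstance(group_by, internal.Expr):
        raise TypeError("'Expr' object cannot be converted to 'Expr'")
    return f"grouped by {group_by.name}"


internal.Expr = Expr
internal.aggregate = aggregate

# Hypothetical public package: it re-exports the names users should import.
public = types.ModuleType("ballista_sketch")
public.Expr = internal.Expr
public.aggregate = internal.aggregate

# User code takes everything from one package, so the classes line up.
result = public.aggregate(public.Expr("r"))
print(result)
```

The cost is the one described above: every class and function has to be re-exported and kept in sync by hand.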

Any Other Solution?

I'm not sure; I'm open to suggestions.

Proposal

Short term proposal:

We should release (py)ballista only once we have figured out the best approach to fix it.

@milenkovicm milenkovicm added the enhancement New feature or request label Nov 29, 2024
@timsaucer

I’ve been meaning to dive into this, and also into some work happening on datafusion-ray that may encounter similar problems. One thing the datafusion-python package does is add wrappers around its internal classes. As for the first bug, you probably have ballista exposing the internal SessionContext class rather than the wrapper class, so the expressions being created are not the wrapper ones that the functions are looking for.

I don’t have my computer this weekend so I can’t test to verify but you may get unblocked if you do ctx = SessionContext(ctx)
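A simplified sketch of the wrapper pattern described above, in plain Python. All four classes are stand-ins invented for illustration, not the real datafusion-python classes; the sketch assumes wrappers keep the internal object in an attribute and unwrap it before calling into compiled code.

```python
class ExprInternal:
    """Stand-in for the compiled (pyo3) expression class."""

    def __init__(self, name: str) -> None:
        self.name = name


class Expr:
    """Stand-in for the pure-Python wrapper that users work with."""

    def __init__(self, expr: ExprInternal) -> None:
        self.expr = expr


class DataFrameInternal:
    """Stand-in for the compiled DataFrame; it accepts internal expressions only."""

    def aggregate(self, group_by: list, aggs: list) -> str:
        for e in [*group_by, *aggs]:
            if not isinstance(e, ExprInternal):
                raise TypeError(f"{type(e).__name__!r} is not an internal expression")
        return "aggregated by " + ", ".join(e.name for e in group_by)


class DataFrame:
    """Wrapper: unwraps each expression before calling the internal method."""

    def __init__(self, df: DataFrameInternal) -> None:
        self.df = df

    def aggregate(self, group_by: list, aggs: list) -> str:
        return self.df.aggregate([e.expr for e in group_by], [e.expr for e in aggs])


col_r = Expr(ExprInternal("r"))
count = Expr(ExprInternal("count(*)"))

# Going through the wrapper works, because it unwraps the expressions.
wrapped_result = DataFrame(DataFrameInternal()).aggregate([col_r], [count])

# Calling the internal class directly with wrapper expressions fails.
try:
    DataFrameInternal().aggregate([col_r], [count])
    unwrapped_error = ""
except TypeError as exc:
    unwrapped_error = str(exc)
```

Whether a mismatch of this kind is what actually happens in (py)ballista is not confirmed here; the sketch only shows why handing out an internal object instead of a wrapper could produce a type error.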

I did write up an issue to improve these confusing errors. apache/datafusion-python#853

@timsaucer

But even if that unblocks you, I worry it still doesn’t resolve the core issue of trying to share that session context from one Python package to another.

@milenkovicm
Contributor Author

A draft patch for datafusion-python (v42) to illustrate "Possible Solution (I)", which would solve the (py)ballista issues:

diff --git a/Cargo.lock b/Cargo.lock
index 815323b..a00bdc5 100644
diff --git a/Cargo.toml b/Cargo.toml
index df72cd4..cf3cb1c 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -47,6 +47,7 @@ async-trait = "0.1"
 futures = "0.3"
 object_store = { version = "0.11.0", features = ["aws", "gcp", "azure", "http"] }
 url = "2"
+ballista = { path = "../arrow-ballista/ballista/client", default-features = false }
 
 [build-dependencies]
 prost-types = "0.13" # keep in line with `datafusion-substrait`
diff --git a/python/datafusion/context.py b/python/datafusion/context.py
index 957d7e3..ca6094a 100644
--- a/python/datafusion/context.py
+++ b/python/datafusion/context.py
@@ -423,7 +423,7 @@ class SessionContext:
     """
 
     def __init__(
-        self, config: SessionConfig | None = None, runtime: RuntimeConfig | None = None
+        self, config: SessionConfig | None = None, runtime: RuntimeConfig | None = None, url: str | None = None
     ) -> None:
         """Main interface for executing queries with DataFusion.
 
@@ -448,7 +448,7 @@ class SessionContext:
         config = config.config_internal if config is not None else None
         runtime = runtime.config_internal if runtime is not None else None
 
-        self.ctx = SessionContextInternal(config, runtime)
+        self.ctx = SessionContextInternal(config, runtime, url)
 
     def register_object_store(
         self, schema: str, store: Any, host: str | None = None
diff --git a/src/context.rs b/src/context.rs
index f445874..a40bc47 100644
--- a/src/context.rs
+++ b/src/context.rs
@@ -23,6 +23,7 @@ use std::sync::Arc;
 use arrow::array::RecordBatchReader;
 use arrow::ffi_stream::ArrowArrayStreamReader;
 use arrow::pyarrow::FromPyArrow;
+use ballista::prelude::SessionContextExt;
 use datafusion::execution::session_state::SessionStateBuilder;
 use object_store::ObjectStore;
 use url::Url;
@@ -271,11 +272,13 @@ pub struct PySessionContext {
 
 #[pymethods]
 impl PySessionContext {
-    #[pyo3(signature = (config=None, runtime=None))]
+    #[pyo3(signature = (config=None, runtime=None, ballista_url=None))]
     #[new]
     pub fn new(
         config: Option<PySessionConfig>,
         runtime: Option<PyRuntimeConfig>,
+        ballista_url: Option<String>,
+        py: Python,
     ) -> PyResult<Self> {
         let config = if let Some(c) = config {
             c.config
@@ -293,9 +296,16 @@ impl PySessionContext {
             .with_runtime_env(runtime)
             .with_default_features()
             .build();
-        Ok(PySessionContext {
-            ctx: SessionContext::new_with_state(session_state),
-        })
+
+        match ballista_url {
+            Some(url) => Ok(PySessionContext {
+                ctx: wait_for_future(py, SessionContext::remote_with_state(&url, session_state))
+                    .map_err(DataFusionError::from)?,
+            }),
+            None => Ok(PySessionContext {
+                ctx: SessionContext::new_with_state(session_state),
+            }),
+        }
     }
 
     /// Register an object store with the given name

More details at apache/datafusion-python@main...milenkovicm:datafusion-python:feat_add_ballista

If we go in this direction, we would need to make ballista an optional feature.

@milenkovicm
Contributor Author

milenkovicm commented Dec 4, 2024

I don’t have my computer this weekend so I can’t test to verify but you may get unblocked if you do ctx = SessionContext(ctx)

I finally got some time to try this, but unfortunately no luck: there is no such function.

I tried a variation of the proposal, wrapping the DataFrame, but got the same error:

from ballista import BallistaBuilder
# from datafusion.context import SessionContext
from datafusion import functions as f
from datafusion.dataframe import DataFrame


ctx = BallistaBuilder()\
    .standalone()

df = ctx.sql("SELECT 1 as r")

df0 = DataFrame(df)
df0.aggregate(
    [f.col("r")], [f.count_star()]
)
df0.show()

TypeError                                 Traceback (most recent call last)
Cell In[4], line 14
     11 df = ctx.sql("SELECT 1 as r")
     13 df0 = DataFrame(df)
---> 14 df0.aggregate(
     15     [f.col("r")], [f.count_star()]
     16 )
     17 df0.show()

File ~/git/arrow-ballista/python/venv/lib/python3.12/site-packages/datafusion/dataframe.py:197, in DataFrame.aggregate(self, group_by, aggs)
    195 group_by = [e.expr for e in group_by]
    196 aggs = [e.expr for e in aggs]
--> 197 return DataFrame(self.df.aggregate(group_by, aggs))

Update:

I have also tried:

ctx = SessionContext()
ctx.ctx = BallistaBuilder()\
    .standalone()

This hits the same function conversion issue as before.

@milenkovicm
Contributor Author

After spending some time on this and reading PyO3/pyo3#1444, it seems there is no simple solution to the problem.

@milenkovicm
Contributor Author

Summary

After some investigation and reading PyO3/pyo3#1444, it looks non-trivial to share (pyo3) structures between multiple crates. There might be some hacks, but it's a long shot.

So the options mentioned in #1142 still stand:

Starting with option 2: re-export all the (py)datafusion structures and functions as part of (py)ballista. I can't comment on the scale of the effort, but if we go with it we could end up in the same situation ballista was in, constantly lagging behind (py)datafusion. Thus I'd argue that this approach would be dead on arrival due to the lack of maintainers and the overall duplicated work.

Option 1: create a ballista-specific context in (py)datafusion. IMHO, this approach makes the most sense from a technical perspective. We would just need to expose an optional (py)datafusion ballista integration. This would mean a bit of extra work for the (py)datafusion team. Ballista would be baggage, which in the long run may go into "unmaintained" mode.

In the short term, I would suggest not releasing the (py)ballista bindings until we decide on an approach. Also, if we decide to go with "Option 1", we could use the (py)ballista project for scheduler/executor Python bindings.

Open to any suggestions.

@andygrove
Member

One more option to throw in. Could we reduce the scope for (py)Ballista for now to just support SQL and not the DataFrame API?

We would just need the ability to send SQL to the server (perhaps via FlightSQL) and then fetch record batches.

@milenkovicm
Contributor Author

We don't even need FlightSQL; the protocol supports sending a SQL statement:

string sql = 2 [deprecated=true]; // I'd suggest to remove this, if SQL needed use `flight-sql`

So we would not need any context on the (py)ballista side, just a gRPC client.
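A minimal sketch of what such a SQL-only client surface could look like. Everything here is hypothetical: `SqlOnlyClient`, its `submit` transport, and the in-memory `fake_scheduler` are invented for illustration, and plain dicts stand in for the record batches a real client would return. The idea is that only strings and result data cross the package boundary, so no expression or DataFrame classes need to be shared.

```python
from dataclasses import dataclass
from typing import Callable

Row = dict[str, object]


@dataclass
class SqlOnlyClient:
    """Hypothetical client: SQL text goes in, plain rows come out."""

    # Transport callable; in a real client this would be a gRPC call.
    submit: Callable[[str], list[Row]]

    def sql(self, query: str) -> list[Row]:
        if not query.strip():
            raise ValueError("query must not be empty")
        return self.submit(query)


def fake_scheduler(query: str) -> list[Row]:
    """In-memory stand-in for a remote scheduler; it knows one query only."""
    canned = {"SELECT 1 as r": [{"r": 1}]}
    return canned[query]


client = SqlOnlyClient(submit=fake_scheduler)
rows = client.sql("SELECT 1 as r")
print(rows)
```

The trade-off is that the DataFrame API is lost entirely; users would have to express everything as SQL text.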

@milenkovicm
Contributor Author

Personally, I find (py)datafusion running on ballista a killer feature :) It's a great way to avoid GIL limitations.

@milenkovicm
Contributor Author

One more option to throw in. Could we reduce the scope for (py)Ballista for now to just support SQL and not the DataFrame API?

We would just need the ability to send SQL to the server (perhaps via FlightSQL) and then fetch record batches.

@andygrove may I ask what kind of scenarios you'd like to support with "option 3"?
