diff --git a/dev/sparktestsupport/modules.py b/dev/sparktestsupport/modules.py
index e73cdd7b80c3f..ec4b25547dc4c 100644
--- a/dev/sparktestsupport/modules.py
+++ b/dev/sparktestsupport/modules.py
@@ -478,6 +478,7 @@ def __hash__(self):
         "pyspark.sql.catalog",
         "pyspark.sql.classic.column",
         "pyspark.sql.classic.dataframe",
+        "pyspark.sql.classic.window",
         "pyspark.sql.datasource",
         "pyspark.sql.group",
         "pyspark.sql.functions.builtin",
@@ -488,7 +489,6 @@ def __hash__(self):
         "pyspark.sql.streaming.listener",
         "pyspark.sql.udf",
         "pyspark.sql.udtf",
-        "pyspark.sql.window",
         "pyspark.sql.avro.functions",
         "pyspark.sql.protobuf.functions",
         "pyspark.sql.pandas.conversion",
diff --git a/python/pyspark/sql/classic/column.py b/python/pyspark/sql/classic/column.py
index dc8a9a1c93b4c..7630cfed5c173 100644
--- a/python/pyspark/sql/classic/column.py
+++ b/python/pyspark/sql/classic/column.py
@@ -588,7 +588,7 @@ def otherwise(self, value: Any) -> ParentColumn:
         return Column(jc)
 
     def over(self, window: "WindowSpec") -> ParentColumn:
-        from pyspark.sql.window import WindowSpec
+        from pyspark.sql.classic.window import WindowSpec
 
         if not isinstance(window, WindowSpec):
             raise PySparkTypeError(
diff --git a/python/pyspark/sql/classic/window.py b/python/pyspark/sql/classic/window.py
new file mode 100644
index 0000000000000..b5c528eec10a1
--- /dev/null
+++ b/python/pyspark/sql/classic/window.py
@@ -0,0 +1,146 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+import sys
+from typing import cast, Iterable, List, Tuple, TYPE_CHECKING, Union
+
+from pyspark.sql.window import (
+    Window as ParentWindow,
+    WindowSpec as ParentWindowSpec,
+)
+from pyspark.sql.utils import get_active_spark_context
+
+if TYPE_CHECKING:
+    from py4j.java_gateway import JavaObject
+    from pyspark.sql._typing import ColumnOrName, ColumnOrName_
+
+
+__all__ = ["Window", "WindowSpec"]
+
+
+def _to_java_cols(cols: Tuple[Union["ColumnOrName", List["ColumnOrName_"]], ...]) -> "JavaObject":
+    from pyspark.sql.classic.column import _to_seq, _to_java_column
+
+    if len(cols) == 1 and isinstance(cols[0], list):
+        cols = cols[0]  # type: ignore[assignment]
+    sc = get_active_spark_context()
+    return _to_seq(sc, cast(Iterable["ColumnOrName"], cols), _to_java_column)
+
+
+class Window(ParentWindow):
+    @staticmethod
+    def partitionBy(*cols: Union["ColumnOrName", List["ColumnOrName_"]]) -> ParentWindowSpec:
+        from py4j.java_gateway import JVMView
+
+        sc = get_active_spark_context()
+        jspec = cast(JVMView, sc._jvm).org.apache.spark.sql.expressions.Window.partitionBy(
+            _to_java_cols(cols)
+        )
+        return WindowSpec(jspec)
+
+    @staticmethod
+    def orderBy(*cols: Union["ColumnOrName", List["ColumnOrName_"]]) -> ParentWindowSpec:
+        from py4j.java_gateway import JVMView
+
+        sc = get_active_spark_context()
+        jspec = cast(JVMView, sc._jvm).org.apache.spark.sql.expressions.Window.orderBy(
+            _to_java_cols(cols)
+        )
+        return WindowSpec(jspec)
+
+    @staticmethod
+    def rowsBetween(start: int, end: int) -> ParentWindowSpec:
+        from py4j.java_gateway import JVMView
+
+        if start <= Window._PRECEDING_THRESHOLD:
+            start = Window.unboundedPreceding
+        if end >= Window._FOLLOWING_THRESHOLD:
+            end = Window.unboundedFollowing
+        sc = get_active_spark_context()
+        jspec = cast(JVMView, sc._jvm).org.apache.spark.sql.expressions.Window.rowsBetween(
+            start, end
+        )
+        return WindowSpec(jspec)
+
+    @staticmethod
+    def rangeBetween(start: int, end: int) -> ParentWindowSpec:
+        from py4j.java_gateway import JVMView
+
+        if start <= Window._PRECEDING_THRESHOLD:
+            start = Window.unboundedPreceding
+        if end >= Window._FOLLOWING_THRESHOLD:
+            end = Window.unboundedFollowing
+        sc = get_active_spark_context()
+        jspec = cast(JVMView, sc._jvm).org.apache.spark.sql.expressions.Window.rangeBetween(
+            start, end
+        )
+        return WindowSpec(jspec)
+
+
+class WindowSpec(ParentWindowSpec):
+    def __new__(cls, jspec: "JavaObject") -> "WindowSpec":
+        self = object.__new__(cls)
+        self.__init__(jspec)  # type: ignore[misc]
+        return self
+
+    def __init__(self, jspec: "JavaObject") -> None:
+        self._jspec = jspec
+
+    def partitionBy(self, *cols: Union["ColumnOrName", List["ColumnOrName_"]]) -> ParentWindowSpec:
+        return WindowSpec(self._jspec.partitionBy(_to_java_cols(cols)))
+
+    def orderBy(self, *cols: Union["ColumnOrName", List["ColumnOrName_"]]) -> ParentWindowSpec:
+        return WindowSpec(self._jspec.orderBy(_to_java_cols(cols)))
+
+    def rowsBetween(self, start: int, end: int) -> ParentWindowSpec:
+        if start <= Window._PRECEDING_THRESHOLD:
+            start = Window.unboundedPreceding
+        if end >= Window._FOLLOWING_THRESHOLD:
+            end = Window.unboundedFollowing
+        return WindowSpec(self._jspec.rowsBetween(start, end))
+
+    def rangeBetween(self, start: int, end: int) -> ParentWindowSpec:
+        if start <= Window._PRECEDING_THRESHOLD:
+            start = Window.unboundedPreceding
+        if end >= Window._FOLLOWING_THRESHOLD:
+            end = Window.unboundedFollowing
+        return WindowSpec(self._jspec.rangeBetween(start, end))
+
+
+def _test() -> None:
+    import doctest
+    from pyspark.sql import SparkSession
+    import pyspark.sql.window
+
+    # It inherits docstrings but doctests cannot detect them so we run
+    # the parent class's doctests here directly.
+    globs = pyspark.sql.window.__dict__.copy()
+    spark = (
+        SparkSession.builder.master("local[4]").appName("sql.classic.window tests").getOrCreate()
+    )
+    globs["spark"] = spark
+    (failure_count, test_count) = doctest.testmod(
+        pyspark.sql.window,
+        globs=globs,
+        optionflags=doctest.ELLIPSIS | doctest.NORMALIZE_WHITESPACE | doctest.REPORT_NDIFF,
+    )
+    spark.stop()
+    if failure_count:
+        sys.exit(-1)
+
+
+if __name__ == "__main__":
+    _test()
diff --git a/python/pyspark/sql/connect/_typing.py b/python/pyspark/sql/connect/_typing.py
index efb3e0e8eb507..806476af1eb60 100644
--- a/python/pyspark/sql/connect/_typing.py
+++ b/python/pyspark/sql/connect/_typing.py
@@ -15,7 +15,7 @@
 # limitations under the License.
 #
 from types import FunctionType
-from typing import Any, Callable, Iterable, Union, Optional, NewType, Protocol, Tuple
+from typing import Any, Callable, Iterable, Union, Optional, NewType, Protocol, Tuple, TypeVar
 
 import datetime
 import decimal
@@ -28,6 +28,7 @@
 
 ColumnOrName = Union[Column, str]
 
+ColumnOrName_ = TypeVar("ColumnOrName_", bound=ColumnOrName)
 ColumnOrNameOrOrdinal = Union[Column, str, int]
diff --git a/python/pyspark/sql/connect/window.py b/python/pyspark/sql/connect/window.py
index 6fc1a1fac1e3d..95e2c4ad2ea77 100644
--- a/python/pyspark/sql/connect/window.py
+++ b/python/pyspark/sql/connect/window.py
@@ -18,24 +18,23 @@
 check_dependencies(__name__)
 
-import sys
 
 from typing import TYPE_CHECKING, Union, Sequence, List, Optional
 
 from pyspark.sql.column import Column
+from pyspark.sql.window import (
+    Window as ParentWindow,
+    WindowSpec as ParentWindowSpec,
+)
 from pyspark.sql.connect.expressions import (
     ColumnReference,
     Expression,
     SortOrder,
 )
-from pyspark.util import (
-    JVM_LONG_MIN,
-    JVM_LONG_MAX,
-)
 from pyspark.sql.window import Window as PySparkWindow, WindowSpec as PySparkWindowSpec
 from pyspark.errors import PySparkTypeError
 
 if TYPE_CHECKING:
-    from pyspark.sql.connect._typing import ColumnOrName
+    from pyspark.sql.connect._typing import ColumnOrName, ColumnOrName_
 
 
 __all__ = ["Window", "WindowSpec"]
@@ -63,7 +62,17 @@ def __repr__(self) -> str:
         return f"WindowFrame(RANGE_FRAME, {self._start}, {self._end})"
 
 
-class WindowSpec:
+class WindowSpec(ParentWindowSpec):
+    def __new__(
+        cls,
+        partitionSpec: Sequence[Expression],
+        orderSpec: Sequence[SortOrder],
+        frame: Optional[WindowFrame],
+    ) -> "WindowSpec":
+        self = object.__new__(cls)
+        self.__init__(partitionSpec, orderSpec, frame)  # type: ignore[misc]
+        return self
+
     def __init__(
         self,
         partitionSpec: Sequence[Expression],
@@ -84,7 +93,7 @@ def __init__(
 
         self._frame = frame
 
-    def partitionBy(self, *cols: Union["ColumnOrName", List["ColumnOrName"]]) -> "WindowSpec":
+    def partitionBy(self, *cols: Union["ColumnOrName", List["ColumnOrName_"]]) -> ParentWindowSpec:
         _cols: List[ColumnOrName] = []
         for col in cols:
             if isinstance(col, (str, Column)):
@@ -105,11 +114,11 @@ def partitionBy(self, *cols: Union["ColumnOrName", List["ColumnOrName"]]) -> "Wi
             )
 
         newPartitionSpec: List[Expression] = []
-        for c in _cols:
+        for c in _cols:  # type: ignore[assignment]
             if isinstance(c, Column):
                 newPartitionSpec.append(c._expr)  # type: ignore[arg-type]
             else:
-                newPartitionSpec.append(ColumnReference(c))
+                newPartitionSpec.append(ColumnReference(c))  # type: ignore[arg-type]
 
         return WindowSpec(
             partitionSpec=newPartitionSpec,
@@ -117,7 +126,7 @@ def partitionBy(self, *cols: Union["ColumnOrName", List["ColumnOrName"]]) -> "Wi
Union["ColumnOrName", List["ColumnOrName"]]) -> "Wi frame=self._frame, ) - def orderBy(self, *cols: Union["ColumnOrName", List["ColumnOrName"]]) -> "WindowSpec": + def orderBy(self, *cols: Union["ColumnOrName", List["ColumnOrName_"]]) -> ParentWindowSpec: _cols: List[ColumnOrName] = [] for col in cols: if isinstance(col, (str, Column)): @@ -138,14 +147,14 @@ def orderBy(self, *cols: Union["ColumnOrName", List["ColumnOrName"]]) -> "Window ) newOrderSpec: List[SortOrder] = [] - for c in _cols: + for c in _cols: # type: ignore[assignment] if isinstance(c, Column): if isinstance(c._expr, SortOrder): newOrderSpec.append(c._expr) else: newOrderSpec.append(SortOrder(c._expr)) # type: ignore[arg-type] else: - newOrderSpec.append(SortOrder(ColumnReference(c))) + newOrderSpec.append(SortOrder(ColumnReference(c))) # type: ignore[arg-type] return WindowSpec( partitionSpec=self._partitionSpec, @@ -153,7 +162,7 @@ def orderBy(self, *cols: Union["ColumnOrName", List["ColumnOrName"]]) -> "Window frame=self._frame, ) - def rowsBetween(self, start: int, end: int) -> "WindowSpec": + def rowsBetween(self, start: int, end: int) -> ParentWindowSpec: if start <= Window._PRECEDING_THRESHOLD: start = Window.unboundedPreceding if end >= Window._FOLLOWING_THRESHOLD: @@ -165,7 +174,7 @@ def rowsBetween(self, start: int, end: int) -> "WindowSpec": frame=WindowFrame(isRowFrame=True, start=start, end=end), ) - def rangeBetween(self, start: int, end: int) -> "WindowSpec": + def rangeBetween(self, start: int, end: int) -> ParentWindowSpec: if start <= Window._PRECEDING_THRESHOLD: start = Window.unboundedPreceding if end >= Window._FOLLOWING_THRESHOLD: @@ -197,32 +206,23 @@ def __repr__(self) -> str: WindowSpec.__doc__ = PySparkWindowSpec.__doc__ -class Window: - _PRECEDING_THRESHOLD = max(-sys.maxsize, JVM_LONG_MIN) - _FOLLOWING_THRESHOLD = min(sys.maxsize, JVM_LONG_MAX) - - unboundedPreceding: int = JVM_LONG_MIN - - unboundedFollowing: int = JVM_LONG_MAX - - currentRow: int = 0 - +class Window(ParentWindow): _spec = WindowSpec(partitionSpec=[], orderSpec=[], frame=None) @staticmethod - def partitionBy(*cols: Union["ColumnOrName", List["ColumnOrName"]]) -> "WindowSpec": + def partitionBy(*cols: Union["ColumnOrName", List["ColumnOrName_"]]) -> ParentWindowSpec: return Window._spec.partitionBy(*cols) @staticmethod - def orderBy(*cols: Union["ColumnOrName", List["ColumnOrName"]]) -> "WindowSpec": + def orderBy(*cols: Union["ColumnOrName", List["ColumnOrName_"]]) -> ParentWindowSpec: return Window._spec.orderBy(*cols) @staticmethod - def rowsBetween(start: int, end: int) -> "WindowSpec": + def rowsBetween(start: int, end: int) -> ParentWindowSpec: return Window._spec.rowsBetween(start, end) @staticmethod - def rangeBetween(start: int, end: int) -> "WindowSpec": + def rangeBetween(start: int, end: int) -> ParentWindowSpec: return Window._spec.rangeBetween(start, end) diff --git a/python/pyspark/sql/utils.py b/python/pyspark/sql/utils.py index 98bc7a72c4aa1..1638a6b38cefa 100644 --- a/python/pyspark/sql/utils.py +++ b/python/pyspark/sql/utils.py @@ -290,36 +290,6 @@ def wrapped(*args: Any, **kwargs: Any) -> Any: return cast(FuncT, wrapped) -def try_remote_window(f: FuncT) -> FuncT: - """Mark API supported from Spark Connect.""" - - @functools.wraps(f) - def wrapped(*args: Any, **kwargs: Any) -> Any: - if is_remote() and "PYSPARK_NO_NAMESPACE_SHARE" not in os.environ: - from pyspark.sql.connect.window import Window - - return getattr(Window, f.__name__)(*args, **kwargs) - else: - return f(*args, **kwargs) - - return cast(FuncT, 
-
-
-def try_remote_windowspec(f: FuncT) -> FuncT:
-    """Mark API supported from Spark Connect."""
-
-    @functools.wraps(f)
-    def wrapped(*args: Any, **kwargs: Any) -> Any:
-        if is_remote() and "PYSPARK_NO_NAMESPACE_SHARE" not in os.environ:
-            from pyspark.sql.connect.window import WindowSpec
-
-            return getattr(WindowSpec, f.__name__)(*args, **kwargs)
-        else:
-            return f(*args, **kwargs)
-
-    return cast(FuncT, wrapped)
-
-
 def get_active_spark_context() -> "SparkContext":
     """Raise RuntimeError if SparkContext is not initialized,
     otherwise, returns the active SparkContext."""
@@ -404,6 +374,31 @@ def wrapped(*args: Any, **kwargs: Any) -> Any:
     return cast(FuncT, wrapped)
 
 
+def dispatch_window_method(f: FuncT) -> FuncT:
+    """
+    For the use cases of direct Window.method(col, ...), it checks if self
+    is a Connect Window or Classic Window, and dispatches.
+    """
+
+    @functools.wraps(f)
+    def wrapped(*args: Any, **kwargs: Any) -> Any:
+        if is_remote() and "PYSPARK_NO_NAMESPACE_SHARE" not in os.environ:
+            from pyspark.sql.connect.window import Window as ConnectWindow
+
+            return getattr(ConnectWindow, f.__name__)(*args, **kwargs)
+        else:
+            from pyspark.sql.classic.window import Window as ClassicWindow
+
+            return getattr(ClassicWindow, f.__name__)(*args, **kwargs)
+
+        raise PySparkNotImplementedError(
+            error_class="NOT_IMPLEMENTED",
+            message_parameters={"feature": f"Window.{f.__name__}"},
+        )
+
+    return cast(FuncT, wrapped)
+
+
 def pyspark_column_op(
     func_name: str, left: "IndexOpsLike", right: Any, fillna: Any = None
 ) -> Union["SeriesOrIndex", None]:
@@ -428,7 +423,7 @@ def get_window_class() -> Type["Window"]:
     if is_remote():
         from pyspark.sql.connect.window import Window as ConnectWindow
 
-        return ConnectWindow  # type: ignore[return-value]
+        return ConnectWindow
     else:
         return PySparkWindow
diff --git a/python/pyspark/sql/window.py b/python/pyspark/sql/window.py
index 93ed5c172ffe5..22c9f697acde3 100644
--- a/python/pyspark/sql/window.py
+++ b/python/pyspark/sql/window.py
@@ -14,13 +14,16 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 #
+
+# mypy: disable-error-code="empty-body"
+
 import sys
-from typing import cast, Iterable, List, Tuple, TYPE_CHECKING, Union
+from typing import List, TYPE_CHECKING, Union
 
-from pyspark.sql.utils import (
-    try_remote_window,
-    try_remote_windowspec,
-    get_active_spark_context,
+from pyspark.sql.utils import dispatch_window_method
+from pyspark.util import (
+    JVM_LONG_MIN,
+    JVM_LONG_MAX,
 )
 
 if TYPE_CHECKING:
@@ -30,15 +33,6 @@
 __all__ = ["Window", "WindowSpec"]
 
 
-def _to_java_cols(cols: Tuple[Union["ColumnOrName", List["ColumnOrName_"]], ...]) -> "JavaObject":
-    from pyspark.sql.classic.column import _to_seq, _to_java_column
-
-    if len(cols) == 1 and isinstance(cols[0], list):
-        cols = cols[0]  # type: ignore[assignment]
-    sc = get_active_spark_context()
-    return _to_seq(sc, cast(Iterable["ColumnOrName"], cols), _to_java_column)
-
-
 class Window:
     """
     Utility functions for defining window in DataFrames.
@@ -63,19 +57,17 @@ class Window:
     >>> window = Window.orderBy("date").partitionBy("country").rangeBetween(-3, 3)
     """
 
-    _JAVA_MIN_LONG = -(1 << 63)  # -9223372036854775808
-    _JAVA_MAX_LONG = (1 << 63) - 1  # 9223372036854775807
-    _PRECEDING_THRESHOLD = max(-sys.maxsize, _JAVA_MIN_LONG)
-    _FOLLOWING_THRESHOLD = min(sys.maxsize, _JAVA_MAX_LONG)
+    _PRECEDING_THRESHOLD = max(-sys.maxsize, JVM_LONG_MIN)
+    _FOLLOWING_THRESHOLD = min(sys.maxsize, JVM_LONG_MAX)
 
-    unboundedPreceding: int = _JAVA_MIN_LONG
+    unboundedPreceding: int = JVM_LONG_MIN
 
-    unboundedFollowing: int = _JAVA_MAX_LONG
+    unboundedFollowing: int = JVM_LONG_MAX
 
     currentRow: int = 0
 
     @staticmethod
-    @try_remote_window
+    @dispatch_window_method
     def partitionBy(*cols: Union["ColumnOrName", List["ColumnOrName_"]]) -> "WindowSpec":
         """
         Creates a :class:`WindowSpec` with the partitioning defined.
@@ -125,16 +117,10 @@ def partitionBy(*cols: Union["ColumnOrName", List["ColumnOrName_"]]) -> "WindowS
         |  3|       b|         3|
         +---+--------+----------+
         """
-        from py4j.java_gateway import JVMView
-
-        sc = get_active_spark_context()
-        jspec = cast(JVMView, sc._jvm).org.apache.spark.sql.expressions.Window.partitionBy(
-            _to_java_cols(cols)
-        )
-        return WindowSpec(jspec)
+        ...
 
     @staticmethod
-    @try_remote_window
+    @dispatch_window_method
     def orderBy(*cols: Union["ColumnOrName", List["ColumnOrName_"]]) -> "WindowSpec":
         """
         Creates a :class:`WindowSpec` with the ordering defined.
@@ -184,16 +170,10 @@ def orderBy(*cols: Union["ColumnOrName", List["ColumnOrName_"]]) -> "WindowSpec"
         |  3|       b|         1|
         +---+--------+----------+
         """
-        from py4j.java_gateway import JVMView
-
-        sc = get_active_spark_context()
-        jspec = cast(JVMView, sc._jvm).org.apache.spark.sql.expressions.Window.orderBy(
-            _to_java_cols(cols)
-        )
-        return WindowSpec(jspec)
+        ...
 
     @staticmethod
-    @try_remote_window
+    @dispatch_window_method
     def rowsBetween(start: int, end: int) -> "WindowSpec":
         """
         Creates a :class:`WindowSpec` with the frame boundaries defined,
@@ -267,20 +247,10 @@ def rowsBetween(start: int, end: int) -> "WindowSpec":
         +---+--------+---+
 
         """
-        from py4j.java_gateway import JVMView
-
-        if start <= Window._PRECEDING_THRESHOLD:
-            start = Window.unboundedPreceding
-        if end >= Window._FOLLOWING_THRESHOLD:
-            end = Window.unboundedFollowing
-        sc = get_active_spark_context()
-        jspec = cast(JVMView, sc._jvm).org.apache.spark.sql.expressions.Window.rowsBetween(
-            start, end
-        )
-        return WindowSpec(jspec)
+        ...
 
     @staticmethod
-    @try_remote_window
+    @dispatch_window_method
     def rangeBetween(start: int, end: int) -> "WindowSpec":
         """
         Creates a :class:`WindowSpec` with the frame boundaries defined,
@@ -357,17 +327,7 @@ def rangeBetween(start: int, end: int) -> "WindowSpec":
         +---+--------+---+
 
         """
-        from py4j.java_gateway import JVMView
-
-        if start <= Window._PRECEDING_THRESHOLD:
-            start = Window.unboundedPreceding
-        if end >= Window._FOLLOWING_THRESHOLD:
-            end = Window.unboundedFollowing
-        sc = get_active_spark_context()
-        jspec = cast(JVMView, sc._jvm).org.apache.spark.sql.expressions.Window.rangeBetween(
-            start, end
-        )
-        return WindowSpec(jspec)
+        ...
 
 
 class WindowSpec:
@@ -383,10 +343,11 @@ class WindowSpec:
 
     Supports Spark Connect.
""" - def __init__(self, jspec: "JavaObject") -> None: - self._jspec = jspec + def __new__(cls, jspec: "JavaObject") -> "WindowSpec": + from pyspark.sql.classic.WindowSpec import WindowSpec # type: ignore[import-not-found] + + return WindowSpec.__new__(WindowSpec, jspec) - @try_remote_windowspec def partitionBy(self, *cols: Union["ColumnOrName", List["ColumnOrName_"]]) -> "WindowSpec": """ Defines the partitioning columns in a :class:`WindowSpec`. @@ -398,9 +359,8 @@ def partitionBy(self, *cols: Union["ColumnOrName", List["ColumnOrName_"]]) -> "W cols : str, :class:`Column` or list names of columns or expressions """ - return WindowSpec(self._jspec.partitionBy(_to_java_cols(cols))) + ... - @try_remote_windowspec def orderBy(self, *cols: Union["ColumnOrName", List["ColumnOrName_"]]) -> "WindowSpec": """ Defines the ordering columns in a :class:`WindowSpec`. @@ -412,9 +372,8 @@ def orderBy(self, *cols: Union["ColumnOrName", List["ColumnOrName_"]]) -> "Windo cols : str, :class:`Column` or list names of columns or expressions """ - return WindowSpec(self._jspec.orderBy(_to_java_cols(cols))) + ... - @try_remote_windowspec def rowsBetween(self, start: int, end: int) -> "WindowSpec": """ Defines the frame boundaries, from `start` (inclusive) to `end` (inclusive). @@ -440,13 +399,8 @@ def rowsBetween(self, start: int, end: int) -> "WindowSpec": The frame is unbounded if this is ``Window.unboundedFollowing``, or any value greater than or equal to min(sys.maxsize, 9223372036854775807). """ - if start <= Window._PRECEDING_THRESHOLD: - start = Window.unboundedPreceding - if end >= Window._FOLLOWING_THRESHOLD: - end = Window.unboundedFollowing - return WindowSpec(self._jspec.rowsBetween(start, end)) + ... - @try_remote_windowspec def rangeBetween(self, start: int, end: int) -> "WindowSpec": """ Defines the frame boundaries, from `start` (inclusive) to `end` (inclusive). @@ -472,29 +426,4 @@ def rangeBetween(self, start: int, end: int) -> "WindowSpec": The frame is unbounded if this is ``Window.unboundedFollowing``, or any value greater than or equal to min(sys.maxsize, 9223372036854775807). """ - if start <= Window._PRECEDING_THRESHOLD: - start = Window.unboundedPreceding - if end >= Window._FOLLOWING_THRESHOLD: - end = Window.unboundedFollowing - return WindowSpec(self._jspec.rangeBetween(start, end)) - - -def _test() -> None: - import doctest - from pyspark.sql import SparkSession - import pyspark.sql.window - - globs = pyspark.sql.window.__dict__.copy() - spark = SparkSession.builder.master("local[4]").appName("sql.window tests").getOrCreate() - globs["spark"] = spark - - (failure_count, test_count) = doctest.testmod( - pyspark.sql.window, globs=globs, optionflags=doctest.NORMALIZE_WHITESPACE - ) - spark.stop() - if failure_count: - sys.exit(-1) - - -if __name__ == "__main__": - _test() + ...