
Branch 3.5 #1

Closed
wants to merge 7,433 commits into from

Conversation

jiyong-lee-dev

### What changes were proposed in this pull request?

### Why are the changes needed?

### Does this PR introduce any user-facing change?

### How was this patch tested?

hasnain-db and others added 30 commits September 28, 2023 18:17
### What changes were proposed in this pull request?

This PR avoids a race condition where a connection which is in the process of being closed could be returned by the TransportClientFactory only to be immediately closed and cause errors upon use.

This race condition is rare and not easily triggered, but with the upcoming changes to introduce SSL connection support, connection closing can take just a slight bit longer and it's much easier to trigger this issue.

Looking at the history of the code, I believe this was an oversight in #9853.
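
A toy Scala sketch of the race's shape (illustration only; the real `TransportClientFactory` is Java and considerably more involved, and `ToyPool`/`ToyConnection` are made-up names): the idea is that the "is this cached connection still usable?" check must be atomic with handing the connection out, so a concurrent close cannot slip in between.

```scala
// Toy model, not Spark code: get() re-checks liveness and replaces a closed
// connection under the same lock that closeAll() takes.
class ToyConnection {
  @volatile private var open = true
  def isActive: Boolean = open
  def close(): Unit = open = false
}

class ToyPool(newConnection: () => ToyConnection) {
  private var cached: ToyConnection = null

  /** Returns a connection that was active at hand-out time. */
  def get(): ToyConnection = synchronized {
    if (cached == null || !cached.isActive) {
      cached = newConnection()   // replace an absent or already-closed connection
    }
    cached
  }

  /** Closing happens under the pool's lock, so it cannot race with get(). */
  def closeAll(): Unit = synchronized {
    if (cached != null) cached.close()
  }
}
```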

### Why are the changes needed?

Without this change, some of the new tests added in #42685 would fail

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Existing tests were run in CI.
Without this change, some of the new tests added in #42685 fail

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #43162 from hasnain-db/spark-tls-timeout.

Authored-by: Hasnain Lakhani <[email protected]>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
(cherry picked from commit 2a88fea)
Signed-off-by: Mridul Muralidharan <mridulatgmail.com>
### What changes were proposed in this pull request?
Add a `keepReadLock` parameter to `lockNewBlockForWriting()`. When `keepReadLock` is `false`, skip `lockForReading()` to avoid blocking on the read lock or a potential deadlock.

When two tasks try to compute the same RDD with a replication level of 2 while running on only two executors, a deadlock can happen. See [SPARK-45057] for details.

The task thread holds the write lock and waits for replication to the remote executor, while the shuffle server thread handling the block upload request waits on `lockForReading` in [BlockInfoManager.scala](https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/storage/BlockInfoManager.scala#L457C24-L457C24).
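
A self-contained toy of the cyclic wait described above (illustration only, not Spark code; in the real scenario the cycle goes through network replication between the two executors rather than two locks in one JVM):

```scala
import java.util.concurrent.locks.ReentrantReadWriteLock

// This program intentionally never terminates: each "executor" thread holds the
// write lock on its own block while waiting for a read lock on the peer's block.
object CyclicWaitDemo {
  def main(args: Array[String]): Unit = {
    val blockOnExec1 = new ReentrantReadWriteLock()
    val blockOnExec2 = new ReentrantReadWriteLock()

    def worker(own: ReentrantReadWriteLock, peer: ReentrantReadWriteLock, name: String): Thread = {
      val body: Runnable = () => {
        own.writeLock().lock()   // task thread: write lock on its own block
        println(s"$name holds its write lock; now waiting for a read lock on the peer's block")
        Thread.sleep(200)        // let both threads acquire their write locks first
        peer.readLock().lock()   // blocks forever: the peer never releases its write lock
        println(s"$name finished (never printed)")
      }
      new Thread(body, name)
    }

    worker(blockOnExec1, blockOnExec2, "executor-1").start()
    worker(blockOnExec2, blockOnExec1, "executor-2").start()
  }
}
```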

### Why are the changes needed?
This saves an unnecessary read lock acquisition and avoids the deadlock issue mentioned above.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Added UT in BlockInfoManagerSuite

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #43067 from warrenzhu25/deadlock.

Authored-by: Warren Zhu <[email protected]>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
(cherry picked from commit 0d6fda5)
Signed-off-by: Mridul Muralidharan <mridulatgmail.com>
…edExecutorBackend

### What changes were proposed in this pull request?
Fix a subtle thread-safety issue with CoarseGrainedExecutorBackend where an executor process randomly gets stuck

### Why are the changes needed?
For each executor, the single-threaded dispatcher can run into an "infinite loop" (as explained in SPARK-45227). Once an executor process runs into that state, it stops launching tasks from the driver and stops reporting task status back.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
```
$ build/mvn package -DskipTests -pl core
$ build/mvn -Dtest=none -DwildcardSuites=org.apache.spark.executor.CoarseGrainedExecutorBackendSuite test
```

### Was this patch authored or co-authored using generative AI tooling?
No

******************************************************************************
**_Please feel free to skip reading unless you're interested in details_**
******************************************************************************

### Symptom

Our Spark 3 app running on EMR 6.10.0 with Spark 3.3.1 got stuck in the very last step of writing a data frame to S3 by calling `df.write`. Looking at the Spark UI, we saw that an executor process hung for over an hour. After we manually killed the executor process, the app succeeded. Note that the same EMR cluster with two worker nodes was able to run the same app without any issue before and after the incident.

Below is what's observed from relevant container logs and thread dump.

- A regular task that's sent to the executor, which also reported back to the driver upon the task completion.

```
    $zgrep 'task 150' container_1694029806204_12865_01_000001/stderr.gz
    23/09/12 18:13:55 INFO TaskSetManager: Starting task 150.0 in stage 23.0 (TID 923) (ip-10-0-185-107.ec2.internal, executor 3, partition 150, NODE_LOCAL, 4432 bytes) taskResourceAssignments Map()
    23/09/12 18:13:55 INFO TaskSetManager: Finished task 150.0 in stage 23.0 (TID 923) in 126 ms on ip-10-0-185-107.ec2.internal (executor 3) (16/200)

    $zgrep ' 923' container_1694029806204_12865_01_000004/stderr.gz
    23/09/12 18:13:55 INFO YarnCoarseGrainedExecutorBackend: Got assigned task 923

    $zgrep 'task 150' container_1694029806204_12865_01_000004/stderr.gz
    23/09/12 18:13:55 INFO Executor: Running task 150.0 in stage 23.0 (TID 923)
    23/09/12 18:13:55 INFO Executor: Finished task 150.0 in stage 23.0 (TID 923). 4495 bytes result sent to driver
```

- Another task that's sent to the executor but didn't get launched since the single-threaded dispatcher was stuck (presumably in an "infinite loop" as explained later).

```
    $zgrep 'task 153' container_1694029806204_12865_01_000001/stderr.gz
    23/09/12 18:13:55 INFO TaskSetManager: Starting task 153.0 in stage 23.0 (TID 924) (ip-10-0-185-107.ec2.internal, executor 3, partition 153, NODE_LOCAL, 4432 bytes) taskResourceAssignments Map()

    $zgrep ' 924' container_1694029806204_12865_01_000004/stderr.gz
    23/09/12 18:13:55 INFO YarnCoarseGrainedExecutorBackend: Got assigned task 924

    $zgrep 'task 153' container_1694029806204_12865_01_000004/stderr.gz
    >> note that the above command has no matching result, indicating that task 153.0 in stage 23.0 (TID 924) was never launched
```

- Thread dump shows that the dispatcher-Executor thread has the following stack trace.

```
    "dispatcher-Executor" #40 daemon prio=5 os_prio=0 tid=0x0000ffff98e37800 nid=0x1aff runnable [0x0000ffff73bba000]
    java.lang.Thread.State: RUNNABLE
    at scala.runtime.BoxesRunTime.equalsNumObject(BoxesRunTime.java:142)
    at scala.runtime.BoxesRunTime.equals2(BoxesRunTime.java:131)
    at scala.runtime.BoxesRunTime.equals(BoxesRunTime.java:123)
    at scala.collection.mutable.HashTable.elemEquals(HashTable.scala:365)
    at scala.collection.mutable.HashTable.elemEquals$(HashTable.scala:365)
    at scala.collection.mutable.HashMap.elemEquals(HashMap.scala:44)
    at scala.collection.mutable.HashTable.findEntry0(HashTable.scala:140)
    at scala.collection.mutable.HashTable.findOrAddEntry(HashTable.scala:169)
    at scala.collection.mutable.HashTable.findOrAddEntry$(HashTable.scala:167)
    at scala.collection.mutable.HashMap.findOrAddEntry(HashMap.scala:44)
    at scala.collection.mutable.HashMap.put(HashMap.scala:126)
    at scala.collection.mutable.HashMap.update(HashMap.scala:131)
    at org.apache.spark.executor.CoarseGrainedExecutorBackend$$anonfun$receive$1.applyOrElse(CoarseGrainedExecutorBackend.scala:200)
    at org.apache.spark.rpc.netty.Inbox.$anonfun$process$1(Inbox.scala:115)
    at org.apache.spark.rpc.netty.Inbox$$Lambda$323/1930826709.apply$mcV$sp(Unknown Source)
    at org.apache.spark.rpc.netty.Inbox.safelyCall(Inbox.scala:213)
    at org.apache.spark.rpc.netty.Inbox.process(Inbox.scala:100)
    at org.apache.spark.rpc.netty.MessageLoop.org$apache$spark$rpc$netty$MessageLoop$$receiveLoop(MessageLoop.scala:75)
    at org.apache.spark.rpc.netty.MessageLoop$$anon$1.run(MessageLoop.scala:41)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
```

### Relevant code paths

Within an executor process, there's a [dispatcher thread](https://github.com/apache/spark/blob/1fdd46f173f7bc90e0523eb0a2d5e8e27e990102/core/src/main/scala/org/apache/spark/rpc/netty/MessageLoop.scala#L170) dedicated to CoarseGrainedExecutorBackend (a single RPC endpoint) that launches tasks scheduled by the driver. Each task is run on a TaskRunner thread backed by a thread pool created for the executor. The TaskRunner thread and the dispatcher thread are different. However, they read and write a common object (i.e., taskResources) that's a mutable hashmap without thread-safety, in [Executor](https://github.com/apache/spark/blob/1fdd46f173f7bc90e0523eb0a2d5e8e27e990102/core/src/main/scala/org/apache/spark/executor/Executor.scala#L561) and [CoarseGrainedExecutorBackend](https://github.com/apache/spark/blob/1fdd46f173f7bc90e0523eb0a2d5e8e27e990102/core/src/main/scala/org/apache/spark/executor/CoarseGrainedExecutorBackend.scala#L189), respectively.

### What's going on?

Based on the above observations, our hypothesis is that the dispatcher thread runs into an "infinite loop" due to a race condition when two threads access the same hashmap object. For illustration purposes, let's consider the following scenario where two threads (Thread 1 and Thread 2) access a hash table without thread-safety:

- Thread 1 sees A.next = B, but then yields execution to Thread 2
<img src="https://issues.apache.org/jira/secure/attachment/13063040/13063040_hashtable1.png" width="400">

- Thread 2 triggers a resize operation resulting in B.next = A (Note that hashmap doesn't care about ordering), and then yields execution to Thread 1.
<img src="https://issues.apache.org/jira/secure/attachment/13063041/13063041_hashtable2.png" width="400">

- After taking over the CPU, Thread 1 runs into an "infinite loop" when traversing the list in the last bucket, given A.next = B and B.next = A in its view.
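
A minimal sketch of the usual remedy for this class of bug (the PR's exact change may differ in detail; `TaskResourceRegistry` is a made-up name): give the map that is shared between the dispatcher thread and the TaskRunner threads thread-safe semantics, so a concurrent resize can never corrupt a bucket and trap a reader in an infinite loop.

```scala
import java.util.concurrent.ConcurrentHashMap

// Illustration only: the shared task-resource map is written by the dispatcher
// thread at task launch and read/removed by TaskRunner threads at completion,
// so it must be thread-safe.
class TaskResourceRegistry {
  private val taskResources = new ConcurrentHashMap[Long, Map[String, String]]()

  def onLaunchTask(taskId: Long, resources: Map[String, String]): Unit =
    taskResources.put(taskId, resources)

  def onTaskFinished(taskId: Long): Option[Map[String, String]] =
    Option(taskResources.remove(taskId))
}
```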

Closes #43021 from xiongbo-sjtu/master.

Authored-by: Bo Xiong <[email protected]>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
(cherry picked from commit 8e6b160)
Signed-off-by: Mridul Muralidharan <mridulatgmail.com>
…rting Metrics programmatically using Asynchronous APIs

Fix the Python code sample in the docs for _StreamingQueryListener_ in the "Reporting Metrics programmatically using Asynchronous APIs" section.

### What changes were proposed in this pull request?
The code sample in the [Reporting Metrics programmatically using Asynchronous APIs](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#reporting-metrics-programmatically-using-asynchronous-apis) section was this:
```
spark = ...

class Listener(StreamingQueryListener):
    def onQueryStarted(self, event):
        print("Query started: " + queryStarted.id)

    def onQueryProgress(self, event):
        println("Query terminated: " + queryTerminated.id)

    def onQueryTerminated(self, event):
        println("Query made progress: " + queryProgress.progress)

spark.streams.addListener(Listener())
```

This is not valid Python code, and it has the QueryProgress and QueryTerminated prints mixed up. Proposed change/fix:
```
spark = ...

class Listener(StreamingQueryListener):
    def onQueryStarted(self, event):
        print("Query started: " + queryStarted.id)

    def onQueryProgress(self, event):
        print("Query made progress: " + queryProgress.progress)

    def onQueryTerminated(self, event):
        print("Query terminated: " + queryTerminated.id)

spark.streams.addListener(Listener())
```

### Why are the changes needed?
To fix documentation errors.

### Does this PR introduce _any_ user-facing change?
Yes. The sample Python code snippet is fixed in the docs (see above).

### How was this patch tested?
Checked with GitHub's Markdown preview, and built the docs according to the README.

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes #43190 from kasztp/master.

Authored-by: Peter Kaszt <[email protected]>
Signed-off-by: Sean Owen <[email protected]>
(cherry picked from commit d708fd7)
Signed-off-by: Sean Owen <[email protected]>
…Client

### What changes were proposed in this pull request?
This PR fixes shading for the Spark Connect Scala Client maven build. The following things are addressed:
- Guava and protobuf are included in the shaded jars. These were missing, and were causing users to see `ClassNotFoundException`s.
- Fixed duplicate shading of guava. We use the parent pom's location now.
- Fixed duplicate Netty dependency (shaded and transitive). One was used for GRPC and the other was needed by Arrow. This was fixed by pulling arrow into the shaded jar.
- Use the same package as the shading defined in the parent package.

### Why are the changes needed?
The maven artifacts for the Spark Connect Scala Client are currently broken.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Manual tests.
#### Step 1:  Build new shaded library and install it in local maven repository
`build/mvn clean install -pl connector/connect/client/jvm -am -DskipTests`
#### Step 2: Start Connect Server
`connector/connect/bin/spark-connect`
#### Step 3: Launch REPL using the newly created library
This step requires [coursier](https://get-coursier.io/) to be installed.
`cs launch --jvm zulu:17.0.8 --scala 2.13.9 -r m2Local com.lihaoyi:::ammonite:2.5.11 org.apache.spark::spark-connect-client-jvm:4.0.0-SNAPSHOT --java-opt --add-opens=java.base/java.nio=ALL-UNNAMED -M org.apache.spark.sql.application.ConnectRepl`
#### Step 4: Run a bunch of commands:
```scala
// Check version
spark.version

// Run a simple query
{
  spark.range(1, 10000, 1)
    .select($"id", $"id" % 5 as "group", rand(1).as("v1"), rand(2).as("v2"))
    .groupBy($"group")
    .agg(
      avg($"v1").as("v1_avg"),
      avg($"v2").as("v2_avg"))
    .show()
}

// Run a streaming query
{
  import org.apache.spark.sql.execution.streaming.ProcessingTimeTrigger
  val query_name = "simple_streaming"
  val stream = spark.readStream
    .format("rate")
    .option("numPartitions", "1")
    .option("rowsPerSecond", "10")
    .load()
    .withWatermark("timestamp", "10 milliseconds")
    .groupBy(window(col("timestamp"), "10 milliseconds"))
    .count()
    .selectExpr("window.start as timestamp", "count as num_events")
    .writeStream
    .format("memory")
    .queryName(query_name)
    .trigger(ProcessingTimeTrigger.create("10 milliseconds"))
  // run for 20 seconds
  val query = stream.start()
  val start = System.currentTimeMillis()
  val end = System.currentTimeMillis() + 20 * 1000
  while (System.currentTimeMillis() < end) {
    println(s"time: ${System.currentTimeMillis() - start} ms")
    println(query.status)
    spark.sql(s"select * from ${query_name}").show()
    Thread.sleep(500)
  }
  query.stop()
}
```

Closes #43195 from hvanhovell/SPARK-45371.

Authored-by: Herman van Hovell <[email protected]>
Signed-off-by: Herman van Hovell <[email protected]>
(cherry picked from commit e53abbb)
Signed-off-by: Herman van Hovell <[email protected]>
…ng Rlock, and use the existing eventually util function"

This reverts commit 2a9dd2b.
…ageLevel.NONE on Dataset

### What changes were proposed in this pull request?
Support for InMemoryTableScanExec in AQE was added in #39624, but this patch contained a bug when a Dataset is persisted using `StorageLevel.NONE`. Before that patch a query like:
```
import org.apache.spark.storage.StorageLevel
spark.createDataset(Seq(1, 2)).persist(StorageLevel.NONE).count()
```
would correctly return 2. But after that patch it incorrectly returns 0. This is because AQE incorrectly determines based on the runtime statistics that are collected here:
https://github.com/apache/spark/blob/eac5a8c7e6da94bb27e926fc9a681aed6582f7d3/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala#L294
that the input is empty. The problem is that the action that should make sure the statistics are collected here
https://github.com/apache/spark/blob/eac5a8c7e6da94bb27e926fc9a681aed6582f7d3/sql/core/src/main/scala/org/apache/spark/sql/execution/adaptive/QueryStageExec.scala#L285-L291
never uses the iterator, and when we have `StorageLevel.NONE` the persisting will also not use the iterator, so we do not gather the correct statistics.

The proposed fix in this patch just makes calling persist with StorageLevel.NONE a no-op. Changing the action so that it always "empties" the iterator would also work, but that seems like unnecessary work in a lot of normal circumstances.

### Why are the changes needed?
The current code has a correctness issue.

### Does this PR introduce _any_ user-facing change?
Yes, fixes the correctness issue.

### How was this patch tested?
New and existing unit tests.

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #43213 from eejbyfeldt/SPARK-45386-branch-3.5.

Authored-by: Emil Ejbyfeldt <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
…Rlock

### What changes were proposed in this pull request?

This PR reverts the revert 522af69. It only partially ports the real change within the main code; it excludes the testing side, which depends on 9798244, which does not exist in `branch-3.5`.

### Why are the changes needed?

Mainly for code clean-up.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing tests should cover this.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #43221 from HyukjinKwon/SPARK-45167-followup2.

Authored-by: Hyukjin Kwon <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
…n cluster when dynamic allocation disabled

### What changes were proposed in this pull request?
This PR is a follow-up of #37268, which supports stage-level task resource profiles for standalone clusters when dynamic allocation is disabled. This PR enables stage-level task resource profiles for YARN clusters.

### Why are the changes needed?

Users who run Spark ML/DL workloads on YARN would expect the stage-level task resource profile feature.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?

The current tests of #37268 also cover this PR, since both YARN and standalone clusters share the same TaskSchedulerImpl class, which implements this feature. In addition, the existing test was modified to cover the YARN cluster, and I also performed some manual tests, which have been described in the comments.

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #43030 from wbo4958/yarn-task-resoure-profile.

Authored-by: Bobby Wang <[email protected]>
Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com>
(cherry picked from commit 5b80639)
Signed-off-by: Thomas Graves <[email protected]>
…and adds `Evaluator` to `__all__` at `ml.connect`

This PR documents MLlib's Spark Connect support in the API reference.

This PR also piggybacks a fix in `__all__` at `python/pyspark/ml/connect/__init__.py` so `from pyspark.ml.connect import Evaluator` works.

Without this, users cannot see the `pyspark.ml.connect` Python APIs on the documentation website.

Yes, it adds a new page to the user-facing documentation ([PySpark API reference](https://spark.apache.org/docs/latest/api/python/reference/index.html)).

Manually tested via:

```bash
cd python/docs
make clean html
```

No.

Closes #43210 from HyukjinKwon/SPARK-45396-followup.

Lead-authored-by: Weichen Xu <[email protected]>
Co-authored-by: Hyukjin Kwon <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
(cherry picked from commit 35b627a)
Signed-off-by: Hyukjin Kwon <[email protected]>
…g partition metadata

This is the backport of #43191 for `branch-3.5`; it should also be applicable to `branch-3.3` and `branch-3.4`.

### What changes were proposed in this pull request?

This PR aims to fix the HMS call fallback logic introduced in SPARK-35437.

```patch
try {
  ...
  hive.getPartitionNames
  ...
  hive.getPartitionsByNames
} catch {
- case ex: InvocationTargetException if ex.getCause.isInstanceOf[MetaException] =>
+ case ex: HiveException if ex.getCause.isInstanceOf[MetaException] =>
  ...
}
```

### Why are the changes needed?

A direct method call won't throw `InvocationTargetException`, and checking the code of `hive.getPartitionNames` and `hive.getPartitionsByNames`, both of them wrap the exception in a `HiveException` if a `MetaException` is thrown.

### Does this PR introduce _any_ user-facing change?

Yes, it should be a bug fix.

### How was this patch tested?

Pass GA and code review. (I'm not sure how to construct/simulate a MetaException during the HMS thrift call with the current HMS testing infrastructure)

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #43260 from pan3793/SPARK-45389-3.5.

Authored-by: Cheng Pan <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
### What changes were proposed in this pull request?

This PR aims to update `CTAS` with `LOCATION` behavior according to Spark 3.2+.

### Why are the changes needed?

SPARK-28551 changed the behavior at Apache Spark 3.2.0.

https://github.com/apache/spark/blob/24b82dfd6cfb9a658af615446be5423695830dd9/sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala#L2306-L2313

### Does this PR introduce _any_ user-facing change?

No. This is a documentation fix.

### How was this patch tested?

N/A

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #43277 from dongjoon-hyun/minor_ctas.

Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
(cherry picked from commit 2d6d09b)
Signed-off-by: Dongjoon Hyun <[email protected]>
…ts when only prefix match

### What changes were proposed in this pull request?
When a custom pattern is used to parse a timestamp, if only a prefix of the value matches the pattern rather than the whole value, `Iso8601TimestampFormatter::parseOptional` and `Iso8601TimestampFormatter::parseWithoutTimeZoneOptional` should not return a non-empty result.
For example: pattern = `yyyy-MM-dd HH:mm:ss`, value = `9999-12-31 23:59:59.999`. The pattern `yyyy-MM-dd HH:mm:ss` can parse `9999-12-31 23:59:59` normally, but the value has the trailing `.999`, so we must not return a non-empty result.
This bug affects schema inference for CSV/JSON.
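
A self-contained illustration of the rule the fix enforces (this is not the actual `Iso8601TimestampFormatter` code, just the "must consume the whole input" idea expressed with `java.time`):

```scala
import java.text.ParsePosition
import java.time.LocalDateTime
import java.time.format.DateTimeFormatter

// A value only counts as a timestamp if the pattern consumes the entire string,
// not just a prefix of it.
def parseFully(pattern: String, value: String): Option[LocalDateTime] = {
  val formatter = DateTimeFormatter.ofPattern(pattern)
  val pos = new ParsePosition(0)
  val parsed = formatter.parseUnresolved(value, pos)
  if (parsed != null && pos.getErrorIndex < 0 && pos.getIndex == value.length) {
    Some(LocalDateTime.parse(value, formatter))   // fully consumed: a real match
  } else {
    None                                          // prefix-only match: reject
  }
}

// parseFully("yyyy-MM-dd HH:mm:ss", "9999-12-31 23:59:59")     => Some(...)
// parseFully("yyyy-MM-dd HH:mm:ss", "9999-12-31 23:59:59.999") => None (trailing ".999")
```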

### Why are the changes needed?
Fix a schema inference bug.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
add new test.

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes #43245 from Hisoka-X/SPARK-45424-inference-schema-unresolved.

Authored-by: Jia Fan <[email protected]>
Signed-off-by: Max Gekk <[email protected]>
(cherry picked from commit 4493b43)
Signed-off-by: Max Gekk <[email protected]>
… automatically generated `sql-error-conditions.md` file

### What changes were proposed in this pull request?
The pr aims to remove the last 2 extra spaces in the automatically generated `sql-error-conditions.md` file.

### Why are the changes needed?
- While working on another PR, I used the following command:
```
SPARK_GENERATE_GOLDEN_FILES=1 build/sbt \
        "core/testOnly *SparkThrowableSuite -- -t \"Error classes match with document\""
```
  I found that in the automatically generated `sql-error-conditions.md` file, 2 extra spaces are added at the end.
Obviously, this is not what we expect; otherwise we would need to remove them manually, which is not in line with automation.

- Git shows us this difference, as follows:
<img width="725" alt="image" src="https://github.com/apache/spark/assets/15246973/a68b657f-3a00-4405-9623-1f7ab9d44d82">

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
- Pass GA.
- Manually test.

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes #43274 from panbingkun/SPARK-45459.

Authored-by: panbingkun <[email protected]>
Signed-off-by: Max Gekk <[email protected]>
(cherry picked from commit af800b5)
Signed-off-by: Max Gekk <[email protected]>
…g table

### What changes were proposed in this pull request?

Fixes a small bug to report `TABLE_OR_VIEW_NOT_FOUND` error correctly for time travel. It was missed before because `RelationTimeTravel` is a leaf node but it may contain `UnresolvedRelation`.

### Why are the changes needed?

bug fix

### Does this PR introduce _any_ user-facing change?

Yes, the error message becomes reasonable

### How was this patch tested?

new tests

### Was this patch authored or co-authored using generative AI tooling?

no

Closes #43298 from cloud-fan/time-travel.

Authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Max Gekk <[email protected]>
(cherry picked from commit ced321c)
Signed-off-by: Max Gekk <[email protected]>
…db instance

### What changes were proposed in this pull request?
When loading a RocksDB instance, remove the file version map entries of larger versions to avoid a RocksDB SST file unique ID mismatch exception. The SST files of larger versions can't be reused even if they have the same size and name, because they belong to another RocksDB instance.

### Why are the changes needed?
Avoid a RocksDB file mismatch exception that may occur at runtime.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Add rocksdb unit test.

Closes #43174 from chaoqin-li1123/rocksdb_mismatch.

Authored-by: Chaoqin Li <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
…avoid triggering multiple jobs

### What changes were proposed in this pull request?

After SPARK-35378, the execution of statements such as `SHOW PARTITIONS test` became slower. The change is that the execution path moved from ExecutedCommandExec to CommandResultExec, but ExecutedCommandExec originally implemented the following method:

`override def executeToIterator(): Iterator[InternalRow] = sideEffectResult.iterator`

CommandResultExec does not override it, so when the hasNext method is executed, a job is submitted, resulting in increased execution time.
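
A sketch of the shape of the change (simplified; the real `CommandResultExec` field names and types differ): a command's result rows are already available locally, so `executeToIterator` can return them directly instead of going through the default path that submits a job.

```scala
import org.apache.spark.sql.catalyst.InternalRow

// Illustration only: a node that already holds its result rows can serve
// executeToIterator without launching a Spark job.
trait ToyCommandResultLike {
  def localRows: Seq[InternalRow]

  def executeToIterator(): Iterator[InternalRow] =
    localRows.iterator   // e.g. the output of SHOW PARTITIONS / SHOW TABLES
}
```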

### Why are the changes needed?

Improve performance when running SHOW PARTITIONS / SHOW TABLES.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Existing tests should cover this.

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #43270 from yorksity/SPARK-45205.

Authored-by: yorksity <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit c9c9922)
Signed-off-by: Wenchen Fan <[email protected]>
### What changes were proposed in this pull request?
Add an equals method to `JDBCOptions` that considers two instances equal if their `JDBCOptions.parameters` are the same.

### Why are the changes needed?
We have identified a cache invalidation issue when caching JDBC tables in Spark SQL. The cached table is unexpectedly invalidated when queried, leading to a re-read from the JDBC table instead of retrieving data from the cache.
Example SQL:

```
CACHE TABLE cache_t SELECT * FROM mysql.test.test1;
SELECT * FROM cache_t;
```
Expected Behavior:
The expectation is that querying the cached table (cache_t) should retrieve the result from the cache without re-evaluating the execution plan.

Actual Behavior:
However, the cache is invalidated, and the content is re-read from the JDBC table.

Root Cause:
The issue lies in the `CacheData` class, where the comparison involves `JDBCTable`. The `JDBCTable` is a case class:

`case class JDBCTable(ident: Identifier, schema: StructType, jdbcOptions: JDBCOptions)`
The comparison of non-case class components, such as `jdbcOptions`, involves pointer comparison. This leads to unnecessary cache invalidation.
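
A sketch of the fix's shape (simplified types; the real `JDBCOptions` wraps a case-insensitive map): base equality and hashing on the parameters themselves, so that two option objects built from identical parameters compare equal and the cached relation stays valid.

```scala
// Simplified stand-in for the real class, for illustration only.
class JDBCOptions(val parameters: Map[String, String]) extends Serializable {
  override def hashCode(): Int = parameters.hashCode()

  override def equals(other: Any): Boolean = other match {
    case that: JDBCOptions => this.parameters == that.parameters
    case _                 => false
  }
}
```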

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Added unit tests.

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #43258 from lyy-pineapple/spark-git-cache.

Authored-by: liangyongyuan <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit d073f2d)
Signed-off-by: Wenchen Fan <[email protected]>
…eachPartition in JdbcUtils

This PR is kind of a followup for #39976 that addresses a review comment on #39976.

In order to properly assign the SQL execution ID so `df.observe` works with this.

Yes. `df.observe` will work with JDBC connectors.

Manually tested.

Unit test was added.

Closes #43304 from HyukjinKwon/foreachbatch.

Authored-by: Hyukjin Kwon <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
(cherry picked from commit 39cc4ab)
Signed-off-by: Hyukjin Kwon <[email protected]>
… RDD.foreachPartition in JdbcUtils"

This reverts commit 40d44a3.
### What changes were proposed in this pull request?

This minor patch fixes incorrect error message of `RoundBase`.

### Why are the changes needed?

Fix incorrect error message.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Unit test

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #43315 from viirya/minor_fix-3.5.

Authored-by: Liang-Chi Hsieh <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
…D.foreachPartition in JdbcUtils

This PR cherry-picks #43304 to branch-3.5

---

### What changes were proposed in this pull request?

This PR is kind of a followup for #39976 that addresses a review comment on #39976.

### Why are the changes needed?

In order to properly assign the SQL execution ID so `df.observe` works with this.

### Does this PR introduce _any_ user-facing change?

Yes. `df.observe` will work with JDBC connectors.

### How was this patch tested?

Manually tested.

### Was this patch authored or co-authored using generative AI tooling?

Unit test was added.

Closes #43322 from HyukjinKwon/SPARK-45475-3.5.

Authored-by: Hyukjin Kwon <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
…ot match specified timestampFormat

### What changes were proposed in this pull request?
This PR fixes CSV/JSON schema inference so that it no longer reports an error when timestamps do not match the specified timestampFormat.
```scala
//eg
val csv = spark.read.option("timestampFormat", "yyyy-MM-dd'T'HH:mm:ss")
  .option("inferSchema", true).csv(Seq("2884-06-24T02:45:51.138").toDS())
csv.show()
//error
Caused by: java.time.format.DateTimeParseException: Text '2884-06-24T02:45:51.138' could not be parsed, unparsed text found at index 19
```
This bug only happened when a partition had one row. The data type should be `StringType`, not `TimestampType`, because the value does not match the `timestampFormat`.

Take CSV as an example: in `CSVInferSchema::tryParseTimestampNTZ`, `timestampNTZFormatter.parseWithoutTimeZoneOptional` is used first and infers `TimestampType`. If the same partition has another row, `tryParseTimestamp` is then used to parse that row with the user-defined `timestampFormat`, finds that it can't be converted to a timestamp with that format, and finally returns `StringType`. But when there is only one row, parsing it with `timestampNTZFormatter.parseWithoutTimeZoneOptional` is not right. We should only parse with it when `spark.sql.timestampType` is `TIMESTAMP_NTZ`; if `spark.sql.timestampType` is `TIMESTAMP_LTZ`, we should directly parse with `tryParseTimestamp`, to avoid returning `TimestampType` when timestamps do not match the specified timestampFormat.
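
A sketch of the corrected decision, with the two parsers abstracted into plain functions (this is not the actual `CSVInferSchema` code):

```scala
// Illustration only: the pattern-free NTZ parser is consulted only when the
// session prefers TIMESTAMP_NTZ; otherwise the user-defined timestampFormat
// alone decides, so a non-matching value falls back to a string.
def inferTimestampLikeType(
    value: String,
    preferNtz: Boolean,                       // spark.sql.timestampType == TIMESTAMP_NTZ
    parsesAsNtz: String => Boolean,           // pattern-free ISO-8601 parse
    parsesWithUserFormat: String => Boolean   // user-defined timestampFormat
): String = {
  if (preferNtz && parsesAsNtz(value)) "timestamp_ntz"
  else if (parsesWithUserFormat(value)) "timestamp"
  else "string"
}
```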

### Why are the changes needed?
Fix schema inference bug.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
add new test.

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #43243 from Hisoka-X/SPARK-45433-inference-mismatch-timestamp-one-row.

Authored-by: Jia Fan <[email protected]>
Signed-off-by: Max Gekk <[email protected]>
(cherry picked from commit eae5c0e)
Signed-off-by: Max Gekk <[email protected]>
### What changes were proposed in this pull request?

Due to a quirk in the parser, in some cases, IDENTIFIER(<funcStr>)(<arg>) is not properly recognized as a function invocation.

The change is to remove the explicit IDENTIFIER-clause rule in the function invocation grammar and instead recognize
IDENTIFIER(<arg>) within visitFunctionCall.
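
A usage example of what the change is meant to allow (assumes a running `SparkSession` named `spark`; the exact function and argument are illustrative):

```scala
// After the fix, IDENTIFIER('<funcStr>')(<arg>) is treated as an ordinary
// function invocation; here the function name is supplied as a string literal.
spark.sql("SELECT IDENTIFIER('abs')(-1)").show()   // evaluates abs(-1) = 1
```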

### Why are the changes needed?

Function invocation support for IDENTIFIER is incomplete otherwise

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Added new testcases to identifier-clause.sql

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #42888 from srielau/SPARK-45132.

Lead-authored-by: srielau <[email protected]>
Co-authored-by: Wenchen Fan <[email protected]>
Co-authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit f0b2e6d)
Signed-off-by: Wenchen Fan <[email protected]>
### What changes were proposed in this pull request?
With [SPARK-45182](https://issues.apache.org/jira/browse/SPARK-45182), we added a fix to prevent laggard tasks from older attempts of an indeterminate stage from marking the partition as completed in the map output tracker.

When a task is completed, the DAG scheduler also notifies all the task sets of the stage about that partition being completed. Task sets will not schedule such tasks if they are not already scheduled. This is not correct for an indeterminate stage, since we want to re-run all the tasks on a re-attempt.

### Why are the changes needed?
Since the partition is not completed by the older attempts and the partition from the newer attempt also doesn't get scheduled, the stage will have to be rescheduled to complete that partition. Since the stage is indeterminate, all the partitions will be recomputed.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Added check in existing unit test

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #43326 from mayurdb/indeterminateFix.

Authored-by: mayurb <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
(cherry picked from commit fb3b707)
Signed-off-by: Wenchen Fan <[email protected]>
### What changes were proposed in this pull request?

This PR restores the [Protobuf Data Source Guide](https://spark.apache.org/docs/latest/sql-data-sources-protobuf.html#python)'s code tabs, which #40614 removed for markdown syntax fixes.

In this PR, we introduce a hidden div to hold the markdown code-block marker, which keeps both Liquid and Markdown happy.

### Why are the changes needed?

improve doc readability and consistency.

### Does this PR introduce _any_ user-facing change?

yes, doc change

### How was this patch tested?

#### Doc build

![image](https://github.com/apache/spark/assets/8326978/8aefeee0-92b2-4048-a3f6-108e4c3f309d)

#### markdown editor and view

![image](https://github.com/apache/spark/assets/8326978/283b0820-390a-4540-8713-647c40f956ac)

### Was this patch authored or co-authored using generative AI tooling?

no

Closes #43361 from yaooqinn/SPARK-45532.

Authored-by: Kent Yao <[email protected]>
Signed-off-by: Kent Yao <[email protected]>
(cherry picked from commit 0257b77)
Signed-off-by: Kent Yao <[email protected]>
… cluster when dynamic allocation disabled

### What changes were proposed in this pull request?
This PR is a follow-up of #37268, which supports stage-level task resource profiles for standalone clusters when dynamic allocation is disabled. This PR enables stage-level task resource profiles for Kubernetes clusters.

### Why are the changes needed?

Users who run Spark ML/DL workloads on Kubernetes would expect the stage-level task resource profile feature.

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?

The current tests of #37268 also cover this PR, since both Kubernetes and standalone clusters share the same TaskSchedulerImpl class, which implements this feature. In addition, the existing test was modified to cover the Kubernetes cluster, and I also performed some manual tests, which have been described in the comments.

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #43323 from wbo4958/k8s-stage-level.

Authored-by: Bobby Wang <[email protected]>
Signed-off-by: Thomas Graves <[email protected]>
(cherry picked from commit 632eabd)
Signed-off-by: Thomas Graves <[email protected]>
…NNAMED" so Platform can access Cleaner on Java 9+

This PR adds `--add-opens=java.base/jdk.internal.ref=ALL-UNNAMED` to our JVM flags so that we can access `jdk.internal.ref.Cleaner` in JDK 9+.

This allows Spark to allocate direct memory while ignoring the JVM's MaxDirectMemorySize limit. Spark uses JDK internal APIs to directly construct DirectByteBuffers while bypassing that limit, but there is a fallback path at https://github.com/apache/spark/blob/v3.5.0/common/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java#L213 that is used if we cannot reflectively access the `Cleaner` API.

No.

Added a unit test in `PlatformUtilSuite`.

No.

Closes #43344 from JoshRosen/SPARK-45508.

Authored-by: Josh Rosen <[email protected]>
Signed-off-by: yangjie01 <[email protected]>
(cherry picked from commit 96bac6c)
Signed-off-by: Dongjoon Hyun <[email protected]>
Fix a bug in PySpark Connect.

`DataFrameWriterV2.overwritePartitions` sets the mode as `overwrite_partitions` [pyspark/sql/connect/readwriter.py, line 825], but `WriteOperationV2` expects `overwrite_partition` [pyspark/sql/connect/plan.py, line 1660].

Make `dataframe.writeTo(table).overwritePartitions()` work.

No

No test. This bug is very obvious.

No

Closes #43367 from xieshuaihu/python_connect_overwrite.

Authored-by: xieshuaihu <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
(cherry picked from commit 9bdad31)
Signed-off-by: Hyukjin Kwon <[email protected]>
### What changes were proposed in this pull request?
Free up disk space for container jobs

### Why are the changes needed?
increase the available disk space

before this PR
![image](https://github.com/apache/spark/assets/7322292/64230324-607b-4c1d-ac2d-84b9bcaab12a)

after this PR
![image](https://github.com/apache/spark/assets/7322292/aafed2d6-5d26-4f7f-b020-1efe4f551a8f)

### Does this PR introduce _any_ user-facing change?
No, infra-only

### How was this patch tested?
updated CI

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #43381 from LuciferYang/SPARK-44619-35.

Authored-by: Ruifeng Zheng <[email protected]>
Signed-off-by: yangjie01 <[email protected]>
sahnib and others added 17 commits April 5, 2024 10:51
…and load/commit for snapshot files

Backports #45724 to 3.5

### What changes were proposed in this pull request?

This PR fixes a race condition between the maintenance thread and task thread when change-log checkpointing is enabled, and ensure all snapshots are valid.

1. The maintenance thread currently relies on the class variable lastSnapshot to find the latest checkpoint and uploads it to DFS. This checkpoint can be modified at commit time by the task thread if a new snapshot is created.
2. The task thread was not resetting lastSnapshot at load time, which can result in newer snapshots (if an old version is loaded) being considered valid and uploaded to DFS. This results in VersionIdMismatch errors.

### Why are the changes needed?

These are logical bugs which can cause `VersionIdMismatch` errors causing user to discard the snapshot and restart the query.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Added unit test cases.

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #45881 from sahnib/rocks-db-fix-3.5.

Authored-by: Bhuwan Sahni <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
…by stopping streaming query

### What changes were proposed in this pull request?

This PR deflakes the `pyspark.sql.dataframe.DataFrame.writeStream` doctest.

PR #45298 aimed to fix that test but misdiagnosed the root issue. The problem is not that concurrent tests were colliding on a temporary directory. Rather, the issue is specific to the `DataFrame.writeStream` test's logic: that test starts a streaming query that writes files to the temporary directory, then exits the temp directory context manager without first stopping the streaming query. That creates a race condition where the context manager might be deleting the directory while the streaming query is writing new files into it, leading to the following type of error during cleanup:

```
File "/__w/spark/spark/python/pyspark/sql/dataframe.py", line ?, in pyspark.sql.dataframe.DataFrame.writeStream
Failed example:
    with tempfile.TemporaryDirectory() as d:
        # Create a table with Rate source.
        df.writeStream.toTable(
            "my_table", checkpointLocation=d)
Exception raised:
    Traceback (most recent call last):
      File "/usr/lib/python3.11/doctest.py", line 1353, in __run
        exec(compile(example.source, filename, "single",
      File "<doctest pyspark.sql.dataframe.DataFrame.writeStream[3]>", line 1, in <module>
        with tempfile.TemporaryDirectory() as d:
      File "/usr/lib/python3.11/tempfile.py", line 1043, in __exit__
        self.cleanup()
      File "/usr/lib/python3.11/tempfile.py", line 1047, in cleanup
        self._rmtree(self.name, ignore_errors=self._ignore_cleanup_errors)
      File "/usr/lib/python3.11/tempfile.py", line 1029, in _rmtree
        _rmtree(name, onerror=onerror)
      File "/usr/lib/python3.11/shutil.py", line 738, in rmtree
        onerror(os.rmdir, path, sys.exc_info())
      File "/usr/lib/python3.11/shutil.py", line 736, in rmtree
        os.rmdir(path, dir_fd=dir_fd)
    OSError: [Errno 39] Directory not empty: '/__w/spark/spark/python/target/4f062b09-213f-4ac2-a10a-2d704990141b/tmp29irqweq'
```

In this PR, I update the doctest to properly stop the streaming query.
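
A Scala analogue of the corrected pattern (the actual fix is in the Python doctest; `streamingDf` and `dir` are assumed to exist): the query is stopped before the temporary directory can be removed, so cleanup cannot race with files still being written.

```scala
// Assumes streamingDf is a streaming DataFrame and dir is a temporary directory path.
val query = streamingDf.writeStream
  .format("parquet")
  .option("checkpointLocation", s"$dir/checkpoint")
  .start(s"$dir/output")
try {
  query.processAllAvailable()   // let the query make some progress
} finally {
  query.stop()                  // stop first; only then may the temp directory be deleted
}
```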

### Why are the changes needed?

Fix flaky test.

### Does this PR introduce _any_ user-facing change?

No, test-only. Small user-facing doc change, but one that is consistent with other doctest examples.

### How was this patch tested?

Manually ran updated test.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #45885 from JoshRosen/fix-flaky-writestream-doctest.

Authored-by: Josh Rosen <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
(cherry picked from commit 0107435)
Signed-off-by: Hyukjin Kwon <[email protected]>
This PR migrates the test dependency `bcprov/bcpkix` from `jdk15on` to `jdk18on`, and upgrades the version from 1.70 to 1.77; the `jdk18on` jars are compiled to work with anything from Java 1.8 up.

The full release notes as follows:
- https://www.bouncycastle.org/releasenotes.html#r1rv77

No, just for test.

Pass GitHub Actions.

No

Closes #44359 from LuciferYang/bouncycastle-177.

Lead-authored-by: yangjie01 <[email protected]>
Co-authored-by: YangJie <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
…k-common module

### What changes were proposed in this pull request?

This PR aims to fix `common/network-common/pom.xml`.

### Why are the changes needed?

To fix the cherry-pick mistake.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Pass the CIs.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #45897 from dongjoon-hyun/SPARK-46411.

Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
….7.2 and docker image to 16.2

### What changes were proposed in this pull request?

This PR aims to upgrade `PostgreSQL` JDBC driver and docker images.
- JDBC Driver: `org.postgresql:postgresql` from 42.7.0 to 42.7.2
- Docker Image: `postgres` from `15.1-alpine` to `16.2-alpine`

### Why are the changes needed?

To use the latest PostgreSQL combination in the following integration tests.

- PostgresIntegrationSuite
- PostgresKrbIntegrationSuite
- v2/PostgresIntegrationSuite
- v2/PostgresNamespaceSuite

### Does this PR introduce _any_ user-facing change?

No. This is a pure test-environment update.

### How was this patch tested?

Pass the CIs.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #45899 from dongjoon-hyun/SPARK-47111.

Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
### What changes were proposed in this pull request?

This is a backport of #43254.

The pr aims to upgrade snappy to 1.1.10.5.

### Why are the changes needed?
- Although the `1.1.10.4` version was upgraded approximately 2-3 weeks ago, the new version includes some bug fixes, eg:
<img width="868" alt="image" src="https://github.com/apache/spark/assets/15246973/6c7f05f7-382f-4e82-bb68-22fc50895b94">
- Full release notes: https://github.com/xerial/snappy-java/releases

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Pass GA.

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes #45901 from dongjoon-hyun/SPARK-45445.

Authored-by: panbingkun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
…own of different versions of PySpark documents

### What changes were proposed in this pull request?
The pr aims to use the same `versions.json` in the dropdown of `different versions` of PySpark documents.

### Why are the changes needed?
As discussed in the email group, using this approach can avoid `maintenance difficulties` and `inconsistencies` that may arise when `multi active release version lines` are released in the future.
<img width="798" alt="image" src="https://github.com/apache/spark/assets/15246973/8a08a4fe-e1fb-4334-a3f9-c6dffb01cbd6">

### Does this PR introduce _any_ user-facing change?
Yes, only for pyspark docs.

### How was this patch tested?
- Manually test.
- Pass GA.

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes #45400 from panbingkun/SPARK-47299.

Authored-by: panbingkun <[email protected]>
Signed-off-by: Jungtaek Lim <[email protected]>
(cherry picked from commit b299b2b)
Signed-off-by: Jungtaek Lim <[email protected]>
…setup.py

This PR is a followup of #42563 (but using new JIRA as it's already released), which adds `pyspark.sql.connect.protobuf` into `setup.py`.

So PyPI packaged PySpark can support protobuf function with Spark Connect on.

Yes. The new feature is now available with Spark Connect on if users install Spark Connect by `pip`.

Being tested in #45870

No.

Closes #45924 from HyukjinKwon/SPARK-47762.

Authored-by: Hyukjin Kwon <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
(cherry picked from commit f94d95d)
Signed-off-by: Hyukjin Kwon <[email protected]>
…to return `false` instead of failing

### What changes were proposed in this pull request?

This PR aims to fix `GenerateMIMAIgnore.isPackagePrivateModule` to work correctly.

For example, `Metadata` is a case class inside package private `DefaultParamsReader` class. Currently, MIMA fails at this class analysis.

https://github.com/apache/spark/blob/f8e652e88320528a70e605a6a3cf986725e153a5/mllib/src/main/scala/org/apache/spark/ml/util/ReadWrite.scala#L474-L485

The root cause is `isPackagePrivateModule` fails due to `scala.ScalaReflectionException`. We can simply make `isPackagePrivateModule` return `false`  instead of failing.
```
Error instrumenting class:org.apache.spark.ml.util.DefaultParamsReader$Metadata
Exception in thread "main" scala.ScalaReflectionException: type Serializable is not a class
	at scala.reflect.api.Symbols$SymbolApi.asClass(Symbols.scala:284)
	at scala.reflect.api.Symbols$SymbolApi.asClass$(Symbols.scala:284)
	at scala.reflect.internal.Symbols$SymbolContextApiImpl.asClass(Symbols.scala:99)
	at scala.reflect.runtime.JavaMirrors$JavaMirror.classToScala1(JavaMirrors.scala:1085)
	at scala.reflect.runtime.JavaMirrors$JavaMirror.$anonfun$classToScala$1(JavaMirrors.scala:1040)
	at scala.reflect.runtime.JavaMirrors$JavaMirror.$anonfun$toScala$1(JavaMirrors.scala:150)
	at scala.reflect.runtime.TwoWayCaches$TwoWayCache.toScala(TwoWayCaches.scala:50)
	at scala.reflect.runtime.JavaMirrors$JavaMirror.toScala(JavaMirrors.scala:148)
	at scala.reflect.runtime.JavaMirrors$JavaMirror.classToScala(JavaMirrors.scala:1040)
	at scala.reflect.runtime.JavaMirrors$JavaMirror.typeToScala(JavaMirrors.scala:1148)
	at scala.reflect.runtime.JavaMirrors$JavaMirror$FromJavaClassCompleter.$anonfun$completeRest$2(JavaMirrors.scala:816)
	at scala.reflect.runtime.JavaMirrors$JavaMirror$FromJavaClassCompleter.$anonfun$completeRest$1(JavaMirrors.scala:816)
	at scala.reflect.runtime.JavaMirrors$JavaMirror$FromJavaClassCompleter.completeRest(JavaMirrors.scala:810)
	at scala.reflect.runtime.JavaMirrors$JavaMirror$FromJavaClassCompleter.complete(JavaMirrors.scala:806)
	at scala.reflect.internal.Symbols$Symbol.completeInfo(Symbols.scala:1575)
	at scala.reflect.internal.Symbols$Symbol.info(Symbols.scala:1538)
	at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anon$13.scala$reflect$runtime$SynchronizedSymbols$SynchronizedSymbol$$super$info(SynchronizedSymbols.scala:221)
	at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol.info(SynchronizedSymbols.scala:158)
	at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol.info$(SynchronizedSymbols.scala:158)
	at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anon$13.info(SynchronizedSymbols.scala:221)
	at scala.reflect.internal.Symbols$Symbol.initialize(Symbols.scala:1733)
	at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol.privateWithin(SynchronizedSymbols.scala:109)
	at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol.privateWithin$(SynchronizedSymbols.scala:107)
	at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anon$13.privateWithin(SynchronizedSymbols.scala:221)
	at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anon$13.privateWithin(SynchronizedSymbols.scala:221)
	at org.apache.spark.tools.GenerateMIMAIgnore$.isPackagePrivateModule(GenerateMIMAIgnore.scala:48)
	at org.apache.spark.tools.GenerateMIMAIgnore$.$anonfun$privateWithin$1(GenerateMIMAIgnore.scala:67)
	at scala.collection.immutable.List.foreach(List.scala:334)
	at org.apache.spark.tools.GenerateMIMAIgnore$.privateWithin(GenerateMIMAIgnore.scala:61)
	at org.apache.spark.tools.GenerateMIMAIgnore$.main(GenerateMIMAIgnore.scala:125)
	at org.apache.spark.tools.GenerateMIMAIgnore.main(GenerateMIMAIgnore.scala)
```
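
A sketch of the shape of the fix (the real `GenerateMIMAIgnore` code differs in detail): reflection failures are swallowed and reported as "not package private" instead of aborting the whole run.

```scala
import scala.reflect.runtime.{universe => unv}

// Illustration only: any reflection exception raised while inspecting the
// symbol is treated as "not a package-private module".
def isPackagePrivateModule(moduleSymbol: unv.ModuleSymbol): Boolean =
  try {
    moduleSymbol.privateWithin.fullName.startsWith("org.apache.spark")
  } catch {
    case _: Throwable => false
  }
```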

### Why are the changes needed?

**BEFORE**
```
$ dev/mima | grep org.apache.spark.ml.util.DefaultParamsReader
Using SPARK_LOCAL_IP=localhost
Using SPARK_LOCAL_IP=localhost
Error instrumenting class:org.apache.spark.ml.util.DefaultParamsReader$Metadata$
Error instrumenting class:org.apache.spark.ml.util.DefaultParamsReader$Metadata
Using SPARK_LOCAL_IP=localhost

# I checked the following before deleting `.generated-mima-class-excludes `
$ cat .generated-mima-class-excludes | grep org.apache.spark.ml.util.DefaultParamsReader
org.apache.spark.ml.util.DefaultParamsReader$
org.apache.spark.ml.util.DefaultParamsReader#
org.apache.spark.ml.util.DefaultParamsReader
```

**AFTER**
```
$ dev/mima | grep org.apache.spark.ml.util.DefaultParamsReader
Using SPARK_LOCAL_IP=localhost
Using SPARK_LOCAL_IP=localhost
[WARN] Unable to detect inner functions for class:org.apache.spark.ml.util.DefaultParamsReader.Metadata
[WARN] Unable to detect inner functions for class:org.apache.spark.ml.util.DefaultParamsReader.Metadata
Using SPARK_LOCAL_IP=localhost

# I checked the following before deleting `.generated-mima-class-excludes `.
$ cat .generated-mima-class-excludes | grep org.apache.spark.ml.util.DefaultParamsReader
org.apache.spark.ml.util.DefaultParamsReader$Metadata$
org.apache.spark.ml.util.DefaultParamsReader$
org.apache.spark.ml.util.DefaultParamsReader#Metadata#
org.apache.spark.ml.util.DefaultParamsReader#
org.apache.spark.ml.util.DefaultParamsReader$Metadata
org.apache.spark.ml.util.DefaultParamsReader#Metadata
org.apache.spark.ml.util.DefaultParamsReader
```

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Manual tests.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #45938 from dongjoon-hyun/SPARK-47770.

Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: yangjie01 <[email protected]>
(cherry picked from commit 08c4963)
Signed-off-by: Dongjoon Hyun <[email protected]>
### What changes were proposed in this pull request?

This PR aims to remove redundant rules from `MimaExcludes` for Apache Spark 3.5.x.

Previously, these rules were required due to the `dev/mima` limitation which is fixed at
- #45938

### Why are the changes needed?

To minimize the exclusion rules for Apache Spark 3.5.x by removing the rules related to the following `private class`.

- `HadoopFSUtils`
https://github.com/apache/spark/blob/f0752f2701b1b8d5fbc38912edd9cd9325693bef/core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala#L36

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Pass the CIs.

### Was this patch authored or co-authored using generative AI tooling?

No

Closes #45948 from dongjoon-hyun/SPARK-47774-3.5.

Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
…cies from `commons-compress` and `avro*`

### Why are the changes needed?

This PR aims to exclude `commons-(io|lang3)` transitive dependencies from `commons-compress`, `avro`, and `avro-mapred` dependencies.

### Does this PR introduce _any_ user-facing change?

Apache Spark defines and uses its own versions. The exclusion of the transitive dependencies will clarify that.

https://github.com/apache/spark/blob/1a408033daf458f1ceebbe14a560355a1a2c0a70/pom.xml#L198

https://github.com/apache/spark/blob/1a408033daf458f1ceebbe14a560355a1a2c0a70/pom.xml#L194

### How was this patch tested?

Pass the CIs.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #45278 from dongjoon-hyun/SPARK-47182.

Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
The pr aims to upgrade `commons-codec` from `1.16.0` to `1.16.1`.

1. The new version brings some bug fixes, e.g.:
- Fix possible IndexOutOfBoundException in PhoneticEngine.encode method #223. Fixes [CODEC-315](https://issues.apache.org/jira/browse/CODEC-315)
- Fix possible IndexOutOfBoundsException in PercentCodec.insertAlwaysEncodeChars() method #222. Fixes [CODEC-314](https://issues.apache.org/jira/browse/CODEC-314).

2. The full release notes:
    https://commons.apache.org/proper/commons-codec/changes-report.html#a1.16.1

No.

Pass GA.

No.

Closes #45152 from panbingkun/SPARK-47083.

Authored-by: panbingkun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
### What changes were proposed in this pull request?

This PR aims to upgrade `commons-io` to 2.16.1.

### Why are the changes needed?

To bring the latest bug fixes
- https://commons.apache.org/proper/commons-io/changes-report.html#a2.16.1 (2024-04-04)
- https://commons.apache.org/proper/commons-io/changes-report.html#a2.16.0 (2024-03-25)
- https://commons.apache.org/proper/commons-io/changes-report.html#a2.15.1 (2023-11-24)
- https://commons.apache.org/proper/commons-io/changes-report.html#a2.15.0 (2023-10-21)
- https://commons.apache.org/proper/commons-io/changes-report.html#a2.14.0 (2023-09-24)

### Does this PR introduce _any_ user-facing change?

Yes, this is a dependency change.

### How was this patch tested?

Pass the CIs.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #45974 from dongjoon-hyun/SPARK-47790-3.5.

Authored-by: Dongjoon Hyun <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
…ion" when spark.sql.json.enablePartialResults is enabled

This PR fixes a bug that was introduced in [SPARK-47704](https://issues.apache.org/jira/browse/SPARK-47704). To be precise, SPARK-47704 missed this corner case because I could not find a small stable repro for the problem at the time.

When `spark.sql.json.enablePartialResults` is enabled (which is the default), if a user tries to read `{"a":[{"key":{"b":0}}]}` with the code:
```scala
val df = spark.read
  .schema("a array<map<string, struct<b boolean>>>")
  .json(path)
```
exception is thrown:
```
java.lang.ClassCastException: class org.apache.spark.sql.catalyst.util.ArrayBasedMapData cannot be cast to class org.apache.spark.sql.catalyst.util.ArrayData (org.apache.spark.sql.catalyst.util.ArrayBasedMapData and org.apache.spark.sql.catalyst.util.ArrayData are in unnamed module of loader 'app')
at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow.getArray(rows.scala:53)
at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow.getArray$(rows.scala:53)
at org.apache.spark.sql.catalyst.expressions.GenericInternalRow.getArray(rows.scala:172)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1$$anon$2.getNext(FileScanRDD.scala:605)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.$anonfun$prepareNextFile$1(FileScanRDD.scala:884)
at scala.concurrent.Future$.$anonfun$apply$1(Future.scala:659)
```

The same happens when map and array are reversed: `{"a":{"key":[{"b":0}]}}`:
```scala
val df = spark.read
  .schema("a map<string, array<struct<b boolean>>>")
  .json(path)
```

In both cases, we should partially parse the record; only the struct with the boolean field cannot be parsed:
- `Row(Array(Map("key" -> Row(null))))` in the first case.
- `Row(Map("key" -> Array(Row(null))))` in the second case.

We simply did not handle all of the partial-result exceptions when converting arrays and maps; we were catching `PartialResultException`, which is only for structs. Instead, we should catch `PartialValueException`, which covers struct, map, and array.

Fixes a bug where user would encounter an exception instead of reading a partially parsed JSON record.

No.

I added unit tests that verify the fix.

No.

Closes #45833 from sadikovi/SPARK-47704.

Authored-by: Ivan Sadikov <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
(cherry picked from commit a2b7050)
Signed-off-by: Hyukjin Kwon <[email protected]>
…ecution.arrow.maxRecordsPerBatch`

### What changes were proposed in this pull request?

This PR fixes the documentation of `spark.sql.execution.arrow.maxRecordsPerBatch` to clarify the relation between `spark.sql.execution.arrow.maxRecordsPerBatch` and grouping API such as `DataFrame(.cogroup).groupby.applyInPandas`.

### Why are the changes needed?

To address confusion about them.

### Does this PR introduce _any_ user-facing change?

Yes, it fixes the user-facing SQL configuration page https://spark.apache.org/docs/latest/configuration.html#runtime-sql-configuration

### How was this patch tested?

CI in this PR should verify them. I ran linters.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes #45993 from HyukjinKwon/minor-doc-change.

Authored-by: Hyukjin Kwon <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
(cherry picked from commit 6c8e4cf)
Signed-off-by: Hyukjin Kwon <[email protected]>
### What changes were proposed in this pull request?
This PR proposes to make the link to the YARN Spark properties more accurate.

### Why are the changes needed?
Currently, the link for `YARN Spark Properties` just points to the `running-on-yarn.html` page.
We should add the anchor.

### Does this PR introduce _any_ user-facing change?
'Yes'.
More convenient for readers to read.

### How was this patch tested?
N/A

### Was this patch authored or co-authored using generative AI tooling?
'No'.

Closes #45994 from beliefer/accurate-yarn-link.

Authored-by: beliefer <[email protected]>
Signed-off-by: Dongjoon Hyun <[email protected]>
(cherry picked from commit aca3d10)
Signed-off-by: Dongjoon Hyun <[email protected]>
### What changes were proposed in this pull request?

Use the monotonically increasing ID as the ordering condition for `max_by` instead of a literal string.

### Why are the changes needed?
#35191 had an error where the literal string `"__monotonically_increasing_id__"` was used as the tie-breaker in `max_by` instead of the actual ID column.
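
A Scala analogue of the difference (the affected code is pandas-on-Spark Python; the `value` column and the ID column name here are illustrative): with a literal string every row shares the same constant ordering key, so `max_by` may return any row, while the actual ID column makes the tie-break deterministic.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{expr, monotonically_increasing_id}

val spark = SparkSession.builder().master("local[*]").getOrCreate()

val df = spark.range(10).toDF("value")
  .withColumn("__monotonically_increasing_id__", monotonically_increasing_id())

// Buggy shape: the second argument is a constant string, not the ID column.
val nondeterministic = df.agg(expr("max_by(value, '__monotonically_increasing_id__')"))

// Fixed shape: order by the monotonically increasing ID column itself.
val deterministic = df.agg(expr("max_by(value, __monotonically_increasing_id__)"))
deterministic.show()
```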

### Does this PR introduce _any_ user-facing change?
Fixes nondeterminism in `asof`

### How was this patch tested?
In some circumstances `//python:pyspark.pandas.tests.connect.series.test_parity_as_of` is sufficient to reproduce

### Was this patch authored or co-authored using generative AI tooling?
No

Closes #46018 from markj-db/SPARK-47824.

Authored-by: Mark Jarvin <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
(cherry picked from commit a0ccdf2)
Signed-off-by: Hyukjin Kwon <[email protected]>