Spark35 #3

Merged
merged 524 commits into from
Oct 23, 2024

Conversation

ejblanco

What changes were proposed in this pull request?

Why are the changes needed?

Does this PR introduce any user-facing change?

How was this patch tested?

dongjoon-hyun and others added 30 commits April 8, 2024 21:46
…to return `false` instead of failing

### What changes were proposed in this pull request?

This PR aims to fix `GenerateMIMAIgnore.isPackagePrivateModule` to work correctly.

For example, `Metadata` is a case class inside the package-private `DefaultParamsReader` class. Currently, MIMA fails while analyzing this class.

https://github.com/apache/spark/blob/f8e652e88320528a70e605a6a3cf986725e153a5/mllib/src/main/scala/org/apache/spark/ml/util/ReadWrite.scala#L474-L485

The root cause is that `isPackagePrivateModule` fails with a `scala.ScalaReflectionException`. We can simply make `isPackagePrivateModule` return `false` instead of failing.
```
Error instrumenting class:org.apache.spark.ml.util.DefaultParamsReader$Metadata
Exception in thread "main" scala.ScalaReflectionException: type Serializable is not a class
	at scala.reflect.api.Symbols$SymbolApi.asClass(Symbols.scala:284)
	at scala.reflect.api.Symbols$SymbolApi.asClass$(Symbols.scala:284)
	at scala.reflect.internal.Symbols$SymbolContextApiImpl.asClass(Symbols.scala:99)
	at scala.reflect.runtime.JavaMirrors$JavaMirror.classToScala1(JavaMirrors.scala:1085)
	at scala.reflect.runtime.JavaMirrors$JavaMirror.$anonfun$classToScala$1(JavaMirrors.scala:1040)
	at scala.reflect.runtime.JavaMirrors$JavaMirror.$anonfun$toScala$1(JavaMirrors.scala:150)
	at scala.reflect.runtime.TwoWayCaches$TwoWayCache.toScala(TwoWayCaches.scala:50)
	at scala.reflect.runtime.JavaMirrors$JavaMirror.toScala(JavaMirrors.scala:148)
	at scala.reflect.runtime.JavaMirrors$JavaMirror.classToScala(JavaMirrors.scala:1040)
	at scala.reflect.runtime.JavaMirrors$JavaMirror.typeToScala(JavaMirrors.scala:1148)
	at scala.reflect.runtime.JavaMirrors$JavaMirror$FromJavaClassCompleter.$anonfun$completeRest$2(JavaMirrors.scala:816)
	at scala.reflect.runtime.JavaMirrors$JavaMirror$FromJavaClassCompleter.$anonfun$completeRest$1(JavaMirrors.scala:816)
	at scala.reflect.runtime.JavaMirrors$JavaMirror$FromJavaClassCompleter.completeRest(JavaMirrors.scala:810)
	at scala.reflect.runtime.JavaMirrors$JavaMirror$FromJavaClassCompleter.complete(JavaMirrors.scala:806)
	at scala.reflect.internal.Symbols$Symbol.completeInfo(Symbols.scala:1575)
	at scala.reflect.internal.Symbols$Symbol.info(Symbols.scala:1538)
	at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anon$13.scala$reflect$runtime$SynchronizedSymbols$SynchronizedSymbol$$super$info(SynchronizedSymbols.scala:221)
	at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol.info(SynchronizedSymbols.scala:158)
	at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol.info$(SynchronizedSymbols.scala:158)
	at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anon$13.info(SynchronizedSymbols.scala:221)
	at scala.reflect.internal.Symbols$Symbol.initialize(Symbols.scala:1733)
	at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol.privateWithin(SynchronizedSymbols.scala:109)
	at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol.privateWithin$(SynchronizedSymbols.scala:107)
	at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anon$13.privateWithin(SynchronizedSymbols.scala:221)
	at scala.reflect.runtime.SynchronizedSymbols$SynchronizedSymbol$$anon$13.privateWithin(SynchronizedSymbols.scala:221)
	at org.apache.spark.tools.GenerateMIMAIgnore$.isPackagePrivateModule(GenerateMIMAIgnore.scala:48)
	at org.apache.spark.tools.GenerateMIMAIgnore$.$anonfun$privateWithin$1(GenerateMIMAIgnore.scala:67)
	at scala.collection.immutable.List.foreach(List.scala:334)
	at org.apache.spark.tools.GenerateMIMAIgnore$.privateWithin(GenerateMIMAIgnore.scala:61)
	at org.apache.spark.tools.GenerateMIMAIgnore$.main(GenerateMIMAIgnore.scala:125)
	at org.apache.spark.tools.GenerateMIMAIgnore.main(GenerateMIMAIgnore.scala)
```
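
For reference, a minimal sketch of the guarding pattern described above (the package check and symbol type are assumptions, not the exact `GenerateMIMAIgnore` code):

```scala
import scala.reflect.runtime.{universe => ru}

// Sketch only: treat any reflection failure as "not a package-private module"
// instead of letting the exception abort the whole MIMA-exclude generation.
object PackagePrivateCheck {
  def isPackagePrivateModule(sym: ru.ModuleSymbol): Boolean =
    try {
      // assumed check: the module is private within some enclosing package
      sym.privateWithin.fullName.startsWith("org.apache.spark")
    } catch {
      case _: ScalaReflectionException => false
    }
}
```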

### Why are the changes needed?

**BEFORE**
```
$ dev/mima | grep org.apache.spark.ml.util.DefaultParamsReader
Using SPARK_LOCAL_IP=localhost
Using SPARK_LOCAL_IP=localhost
Error instrumenting class:org.apache.spark.ml.util.DefaultParamsReader$Metadata$
Error instrumenting class:org.apache.spark.ml.util.DefaultParamsReader$Metadata
Using SPARK_LOCAL_IP=localhost

# I checked the following before deleting `.generated-mima-class-excludes`.
$ cat .generated-mima-class-excludes | grep org.apache.spark.ml.util.DefaultParamsReader
org.apache.spark.ml.util.DefaultParamsReader$
org.apache.spark.ml.util.DefaultParamsReader#
org.apache.spark.ml.util.DefaultParamsReader
```

**AFTER**
```
$ dev/mima | grep org.apache.spark.ml.util.DefaultParamsReader
Using SPARK_LOCAL_IP=localhost
Using SPARK_LOCAL_IP=localhost
[WARN] Unable to detect inner functions for class:org.apache.spark.ml.util.DefaultParamsReader.Metadata
[WARN] Unable to detect inner functions for class:org.apache.spark.ml.util.DefaultParamsReader.Metadata
Using SPARK_LOCAL_IP=localhost

# I checked the following before deleting `.generated-mima-class-excludes`.
$ cat .generated-mima-class-excludes | grep org.apache.spark.ml.util.DefaultParamsReader
org.apache.spark.ml.util.DefaultParamsReader$Metadata$
org.apache.spark.ml.util.DefaultParamsReader$
org.apache.spark.ml.util.DefaultParamsReader#Metadata#
org.apache.spark.ml.util.DefaultParamsReader#
org.apache.spark.ml.util.DefaultParamsReader$Metadata
org.apache.spark.ml.util.DefaultParamsReader#Metadata
org.apache.spark.ml.util.DefaultParamsReader
```

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Manual tests.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#45938 from dongjoon-hyun/SPARK-47770.

Authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: yangjie01 <yangjie01@baidu.com>
(cherry picked from commit 08c4963)
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
### What changes were proposed in this pull request?

This PR aims to remove redundant rules from `MimaExcludes` for Apache Spark 3.5.x.

Previously, these rules were required due to a `dev/mima` limitation, which was fixed in
- apache#45938

### Why are the changes needed?

To minimize the exclusion rules for Apache Spark 3.5.x by removing the rules related to the following `private class`.

- `HadoopFSUtils`
https://github.com/apache/spark/blob/f0752f2701b1b8d5fbc38912edd9cd9325693bef/core/src/main/scala/org/apache/spark/util/HadoopFSUtils.scala#L36

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Pass the CIs.

### Was this patch authored or co-authored using generative AI tooling?

No

Closes apache#45948 from dongjoon-hyun/SPARK-47774-3.5.

Authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
…cies from `commons-compress` and `avro*`

### Why are the changes needed?

This PR aims to exclude `commons-(io|lang3)` transitive dependencies from `commons-compress`, `avro`, and `avro-mapred` dependencies.

### Does this PR introduce _any_ user-facing change?

Apache Spark defines and uses its own versions of these libraries. Excluding the transitive dependencies makes that explicit.

https://github.com/apache/spark/blob/1a408033daf458f1ceebbe14a560355a1a2c0a70/pom.xml#L198

https://github.com/apache/spark/blob/1a408033daf458f1ceebbe14a560355a1a2c0a70/pom.xml#L194

### How was this patch tested?

Pass the CIs.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#45278 from dongjoon-hyun/SPARK-47182.

Authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
The pr aims to upgrade `commons-codec` from `1.16.0` to `1.16.1`.

1. The new version brings some bug fixes, e.g.:
- Fix possible IndexOutOfBoundException in PhoneticEngine.encode method apache#223. Fixes [CODEC-315](https://issues.apache.org/jira/browse/CODEC-315)
- Fix possible IndexOutOfBoundsException in PercentCodec.insertAlwaysEncodeChars() method apache#222. Fixes [CODEC-314](https://issues.apache.org/jira/browse/CODEC-314).

2. The full release notes:
    https://commons.apache.org/proper/commons-codec/changes-report.html#a1.16.1

No.

Pass GA.

No.

Closes apache#45152 from panbingkun/SPARK-47083.

Authored-by: panbingkun <panbingkun@baidu.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
### What changes were proposed in this pull request?

This PR aims to upgrade `commons-io` to 2.16.1.

### Why are the changes needed?

To bring the latest bug fixes
- https://commons.apache.org/proper/commons-io/changes-report.html#a2.16.1 (2024-04-04)
- https://commons.apache.org/proper/commons-io/changes-report.html#a2.16.0 (2024-03-25)
- https://commons.apache.org/proper/commons-io/changes-report.html#a2.15.1 (2023-11-24)
- https://commons.apache.org/proper/commons-io/changes-report.html#a2.15.0 (2023-10-21)
- https://commons.apache.org/proper/commons-io/changes-report.html#a2.14.0 (2023-09-24)

### Does this PR introduce _any_ user-facing change?

Yes, this is a dependency change.

### How was this patch tested?

Pass the CIs.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#45974 from dongjoon-hyun/SPARK-47790-3.5.

Authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
…ion" when spark.sql.json.enablePartialResults is enabled

This PR fixes a bug that was introduced in [SPARK-47704](https://issues.apache.org/jira/browse/SPARK-47704). To be precise, SPARK-47704 missed this corner case because I could not find a small stable repro for the problem at the time.

When `spark.sql.json.enablePartialResults` is enabled (which is the default), if a user tries to read `{"a":[{"key":{"b":0}}]}` with the code:
```scala
val df = spark.read
  .schema("a array<map<string, struct<b boolean>>>")
  .json(path)
```
an exception is thrown:
```
java.lang.ClassCastException: class org.apache.spark.sql.catalyst.util.ArrayBasedMapData cannot be cast to class org.apache.spark.sql.catalyst.util.ArrayData (org.apache.spark.sql.catalyst.util.ArrayBasedMapData and org.apache.spark.sql.catalyst.util.ArrayData are in unnamed module of loader 'app')
at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow.getArray(rows.scala:53)
at org.apache.spark.sql.catalyst.expressions.BaseGenericInternalRow.getArray$(rows.scala:53)
at org.apache.spark.sql.catalyst.expressions.GenericInternalRow.getArray(rows.scala:172)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$SpecificUnsafeProjection.apply(Unknown Source)
at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1$$anon$2.getNext(FileScanRDD.scala:605)
at org.apache.spark.util.NextIterator.hasNext(NextIterator.scala:73)
at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.$anonfun$prepareNextFile$1(FileScanRDD.scala:884)
at scala.concurrent.Future$.$anonfun$apply$1(Future.scala:659)
```

The same happens when map and array are reversed: `{"a":{"key":[{"b":0}]}}`:
```scala
val df = spark.read
  .schema("a map<string, array<struct<b boolean>>>")
  .json(path)
```

In both cases, we should partially parse the record; only the struct with the boolean field cannot be parsed:
- `Row(Array(Map("key" -> Row(null))))` in the first case.
- `Row(Map("key" -> Array(Row(null))))` in the second case.

The root cause is that we did not handle all of the partial-result exceptions when converting arrays and maps: we only caught `PartialResultException`, which applies to structs. Instead, we should catch `PartialValueException`, which covers structs, maps, and arrays.
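
For illustration, a hedged end-to-end repro of the expected post-fix behavior (assuming `spark.sql.json.enablePartialResults` is left at its default of `true`):

```scala
// The record is now parsed partially instead of failing with ClassCastException;
// only the struct whose boolean field is malformed becomes null.
import spark.implicits._

val df = spark.read
  .schema("a array<map<string, struct<b boolean>>>")
  .json(Seq("""{"a":[{"key":{"b":0}}]}""").toDS())

df.collect()
// expected: Array(Row(Array(Map("key" -> Row(null)))))
```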

Fixes a bug where a user would encounter an exception instead of reading a partially parsed JSON record.

No.

I added unit tests that verify the fix.

No.

Closes apache#45833 from sadikovi/SPARK-47704.

Authored-by: Ivan Sadikov <ivan.sadikov@databricks.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
(cherry picked from commit a2b7050)
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
…ecution.arrow.maxRecordsPerBatch`

### What changes were proposed in this pull request?

This PR fixes the documentation of `spark.sql.execution.arrow.maxRecordsPerBatch` to clarify the relation between `spark.sql.execution.arrow.maxRecordsPerBatch` and grouping API such as `DataFrame(.cogroup).groupby.applyInPandas`.

### Why are the changes needed?

To address confusion about them.

### Does this PR introduce _any_ user-facing change?

Yes, it fixes the user-facing SQL configuration page https://spark.apache.org/docs/latest/configuration.html#runtime-sql-configuration

### How was this patch tested?

CI in this PR should verify them. I ran linters.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#45993 from HyukjinKwon/minor-doc-change.

Authored-by: Hyukjin Kwon <gurwls223@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
(cherry picked from commit 6c8e4cf)
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
### What changes were proposed in this pull request?
This PR proposes to make the link to the Spark properties for YARN more accurate.

### Why are the changes needed?
Currently, the link for `YARN Spark Properties` points only to the `running-on-yarn.html` page.
We should add the anchor.

### Does this PR introduce _any_ user-facing change?
'Yes'.
More convenient for readers to read.

### How was this patch tested?
N/A

### Was this patch authored or co-authored using generative AI tooling?
'No'.

Closes apache#45994 from beliefer/accurate-yarn-link.

Authored-by: beliefer <beliefer@163.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
(cherry picked from commit aca3d10)
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
### What changes were proposed in this pull request?

Use the monotonically increasing ID as the sorting condition for `max_by` instead of a literal string.

### Why are the changes needed?
apache#35191 had an error where the literal string `"__monotonically_increasing_id__"` was used as the tie-breaker in `max_by` instead of the actual ID.

### Does this PR introduce _any_ user-facing change?
Fixes nondeterminism in `asof`

### How was this patch tested?
In some circumstances `//python:pyspark.pandas.tests.connect.series.test_parity_as_of` is sufficient to reproduce

### Was this patch authored or co-authored using generative AI tooling?
No

Closes apache#46018 from markj-db/SPARK-47824.

Authored-by: Mark Jarvin <mark.jarvin@databricks.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
(cherry picked from commit a0ccdf2)
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
… to follow standard KEX practices

### What changes were proposed in this pull request?

Backport of SPARK-47318 to v3.5.0

This change adds an additional pass through a key derivation function (KDF) to the key exchange protocol in `AuthEngine`. Currently, it uses the shared secret from a bespoke key negotiation protocol directly. This is an encoded X coordinate on the X25519 curve. It is atypical and not recommended to use that coordinate directly as a key; it should instead be passed through a KDF.

Note, Spark now supports TLS for RPC calls. It is preferable to use that rather than the bespoke AES RPC encryption implemented by `AuthEngine` and `TransportCipher`.

### Why are the changes needed?

This follows the best practices of key negotiation protocols. The encoded X coordinate is not guaranteed to be uniformly distributed over the 32-byte key space. Rather, we pass it through an HKDF function to map it uniformly onto a 16-byte key space.
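
For reference, a hedged sketch of the extra derivation step using Tink's HKDF helper (the salt, info label, and output length below are illustrative, not `AuthEngine`'s actual parameters):

```scala
import java.nio.charset.StandardCharsets.UTF_8
import com.google.crypto.tink.subtle.Hkdf

// Sketch only: the raw X25519 shared secret is never used directly as an AES
// key; it is first mapped through HKDF-SHA256 onto a uniform 16-byte key.
object KeyDerivationSketch {
  def deriveSessionKey(rawSharedSecret: Array[Byte]): Array[Byte] =
    Hkdf.computeHkdf(
      "HMACSHA256",                       // MAC algorithm backing the HKDF
      rawSharedSecret,                    // input keying material (encoded X coordinate)
      Array.emptyByteArray,               // salt (illustrative)
      "sparkSessionKey".getBytes(UTF_8),  // context/info label (illustrative)
      16)                                 // 128-bit derived key
}
```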

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing tests under:
`build/sbt "network-common/test:testOnly"`

Specifically:
`build/sbt "network-common/test:testOnly org.apache.spark.network.crypto.AuthEngineSuite"`
`build/sbt "network-common/test:testOnly org.apache.spark.network.crypto.AuthIntegrationSuite"`

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#46014 from sweisdb/SPARK-47318-v3.5.0.

Lead-authored-by: Dongjoon Hyun <dongjoon@apache.org>
Co-authored-by: Steve Weis <steve.weis@databricks.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
… with invalid plan

cherry pick fix apache#46023 to 3.5

Closes apache#46050 from zhengruifeng/connect_fix_overwrite_35.

Authored-by: Ruifeng Zheng <ruifengz@apache.org>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
…gate/Join nodes

### What changes were proposed in this pull request?

Streaming queries with a Union of 2 data streams followed by an Aggregate (groupBy) can produce incorrect results if the grouping key is a constant literal for the duration of a micro-batch.

The query produces incorrect results because the query optimizer recognizes the literal value in the grouping key as foldable and replaces the grouping key expression with the actual literal value. This optimization is correct for batch queries. However, streaming queries also read information from StateStore, and the output contains both the results from StateStore (computed in previous micro-batches) and data from the input sources (computed in this micro-batch). The HashAggregate node after StateStore always reads the grouping key value as the optimized literal (as the grouping key expression is optimized into a literal by the optimizer). This ends up replacing keys in StateStore with the literal value, resulting in incorrect output.

See an example logical and physical plan below for a query performing a union of 2 data streams, followed by a groupBy. Note that the name#4 expression has been optimized to ds1. The streaming Aggregate adds a StateStoreSave node as a child of HashAggregate; however, any grouping key read from StateStore will still be read as ds1 due to the optimization.

### Optimized Logical Plan

```
=== Applying Rule org.apache.spark.sql.catalyst.optimizer.FoldablePropagation ===

=== Old Plan ===

WriteToMicroBatchDataSource MemorySink, eb67645e-30fc-41a8-8006-35bb7649c202, Complete, 0
+- Aggregate [name#4], [name#4, count(1) AS count#31L]
   +- Project [ds1 AS name#4]
  	+- StreamingDataSourceV2ScanRelation[value#1] MemoryStreamDataSource

=== New Plan ===

WriteToMicroBatchDataSource MemorySink, eb67645e-30fc-41a8-8006-35bb7649c202, Complete, 0
+- Aggregate [ds1], [ds1 AS name#4, count(1) AS count#31L]
   +- Project [ds1 AS name#4]
  	+- StreamingDataSourceV2ScanRelation[value#1] MemoryStreamDataSource

====
```

### Corresponding Physical Plan

```
WriteToDataSourceV2 MicroBatchWrite[epoch: 0, writer: org.apache.spark.sql.execution.streaming.sources.MemoryStreamingWrite2b4c6242], org.apache.spark.sql.execution.datasources.v2.DataSourceV2Strategy$$Lambda$3143/185907563435709d26
+- HashAggregate(keys=[ds1#39], functions=[finalmerge_count(merge count#38L) AS count(1)#30L], output=[name#4, count#31L])
   +- StateStoreSave [ds1#39], state info [ checkpoint = file:/tmp/streaming.metadata-e470782a-18a3-463c-9e61-3a10d0bdf180/state, runId = 4dedecca-910c-4518-855e-456702617414, opId = 0, ver = 0, numPartitions = 5], Complete, 0, 0, 2
  	+- HashAggregate(keys=[ds1#39], functions=[merge_count(merge count#38L) AS count#38L], output=[ds1#39, count#38L])
     	+- StateStoreRestore [ds1#39], state info [ checkpoint = file:/tmp/streaming.metadata-e470782a-18a3-463c-9e61-3a10d0bdf180/state, runId = 4dedecca-910c-4518-855e-456702617414, opId = 0, ver = 0, numPartitions = 5], 2
        	+- HashAggregate(keys=[ds1#39], functions=[merge_count(merge count#38L) AS count#38L], output=[ds1#39, count#38L])
           	+- HashAggregate(keys=[ds1 AS ds1#39], functions=[partial_count(1) AS count#38L], output=[ds1#39, count#38L])
              	+- Project
                 	+- MicroBatchScan[value#1] MemoryStreamDataSource

```

This PR disables foldable propagation across Streaming Aggregate/Join nodes in the logical plan.
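
A hedged, simplified sketch of the guard (the actual change lives in the `FoldablePropagation` rule; names below are illustrative):

```scala
import org.apache.spark.sql.catalyst.plans.logical.{Aggregate, Join, LogicalPlan}

// Sketch only: stop propagating foldable aliases once a streaming Aggregate or
// Join is reached, so StateStore grouping/join keys keep their expression form.
object StreamingFoldableGuard {
  def canPropagateThrough(plan: LogicalPlan): Boolean = plan match {
    case a: Aggregate if a.isStreaming => false
    case j: Join if j.left.isStreaming || j.right.isStreaming => false
    case _ => true
  }
}
```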

### Why are the changes needed?

Changes are needed to ensure that Streaming queries with literal value for grouping key/join key produce correct results.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Added `sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingQueryOptimizationCorrectnessSuite.scala` testcase.

```

[info] Run completed in 54 seconds, 150 milliseconds.
[info] Total number of tests run: 9
[info] Suites: completed 1, aborted 0
[info] Tests: succeeded 9, failed 0, canceled 0, ignored 0, pending 0
[info] All tests passed.

```

### Was this patch authored or co-authored using generative AI tooling?

No

Closes apache#46035 from sahnib/SPARK-47840.

Authored-by: Bhuwan Sahni <bhuwan.sahni@databricks.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
(cherry picked from commit f217193)
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
### What changes were proposed in this pull request?

This PR aims to upgrade `Apache Maven` to 3.9.6 for Apache Spark 3.5.2+

This is a backport of the following PR. `Apache Maven 3.9.6` has been used for over 4 months in the `master` branch.
- apache#44267

### Why are the changes needed?

To bring the latest bug fixes,

- https://maven.apache.org/docs/3.9.0/release-notes.html
- https://maven.apache.org/docs/3.9.1/release-notes.html
- https://maven.apache.org/docs/3.9.2/release-notes.html
- https://maven.apache.org/docs/3.9.3/release-notes.html
- https://maven.apache.org/docs/3.9.5/release-notes.html
- https://maven.apache.org/docs/3.9.6/release-notes.html

### Does this PR introduce _any_ user-facing change?

No because this is a build time change.

### How was this patch tested?

Pass the CIs.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#46069 from dongjoon-hyun/SPARK-46335.

Authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
### What changes were proposed in this pull request?

This is a followup of apache#43797. GROUP BY ALL has the same bug, and this PR applies the same fix to it.

### Why are the changes needed?

Advanced users or Spark plugins may manipulate logical plans directly, so we need to make the framework more reliable.

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

new test

### Was this patch authored or co-authored using generative AI tooling?

no

Closes apache#46113 from cloud-fan/group-all.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit b5bb75c)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
…n type of boolean

Backports apache#45589 to 3.5

### What changes were proposed in this pull request?

Use V2Predicate to wrap If expr when building v2 expressions.

### Why are the changes needed?

The `PushFoldableIntoBranches` optimizer may fold a predicate into (if / case) branches, and `V2ExpressionBuilder` wraps `If` as a `GeneralScalarExpression`, which causes the assertion in `PushablePredicate.unapply` to fail.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

added unit test

### Was this patch authored or co-authored using generative AI tooling?

No

Closes apache#46074 from wForget/SPARK-47463_3.5.

Authored-by: Zhen Wang <643348094@qq.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
…cala 2.12

### What changes were proposed in this pull request?

Fix `ExpressionSet` performance regression in scala 2.12.

### Why are the changes needed?

The implementation of the `SetLike.++` method in Scala 2.12 iteratively invokes the `+` method. The `ExpressionSet.+` method first clones a new object and then adds the element, which is very expensive.

https://github.com/scala/scala/blob/ceaf7e68ac93e9bbe8642d06164714b2de709c27/src/library/scala/collection/SetLike.scala#L186

After apache#36121, the `++` and `--` methods in the Scala 2.12 `ExpressionSet` were removed, causing a performance regression.
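
For reference, a hedged sketch (using a plain mutable set rather than catalyst's `ExpressionSet`) contrasting the two behaviors:

```scala
import scala.collection.mutable

// Sketch only: the Scala 2.12 SetLike.++ fallback clones once per added element
// (quadratic), while an explicit ++ override clones once and then adds everything.
final class CloningSet[A](private val underlying: mutable.Set[A]) {
  // Fallback behavior: O(n * m) because every + pays for a full clone.
  def addOneByOne(elems: Iterable[A]): CloningSet[A] =
    elems.foldLeft(this)((acc, e) => new CloningSet(acc.underlying.clone() += e))

  // Restored-override behavior: O(n + m), a single clone up front.
  def addAll(elems: Iterable[A]): CloningSet[A] = {
    val copy = underlying.clone()
    elems.foreach(copy += _)
    new CloningSet(copy)
  }
}
```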

### Does this PR introduce _any_ user-facing change?

### How was this patch tested?

Benchmark code:

```
object TestBenchmark {
  def main(args: Array[String]): Unit = {
    val count = 300
    val benchmark = new Benchmark("Test ExpressionSetV2 ++ ", count)
    val aUpper = AttributeReference("A", IntegerType)(exprId = ExprId(1))

    var initialSet = ExpressionSet((0 until 300).map(i => aUpper + i))
    val setToAddWithSameDeterministicExpression = ExpressionSet((0 until 300).map(i => aUpper + i))

    benchmark.addCase("Test ++", 10) { _: Int =>
      for (_ <- 0L until count) {
        initialSet ++= setToAddWithSameDeterministicExpression
      }
    }
    benchmark.run()
  }
}
```

before this change:

```
OpenJDK 64-Bit Server VM 1.8.0_222-b10 on Linux 3.10.0-957.el7.x86_64
Intel Core Processor (Skylake, IBRS)
Test ExpressionSetV2 ++ :                 Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
Test ++                                            1577           1691          61          0.0     5255516.0       1.0X
```

after this change:

```
OpenJDK 64-Bit Server VM 1.8.0_222-b10 on Linux 3.10.0-957.el7.x86_64
Intel Core Processor (Skylake, IBRS)
Test ExpressionSetV2 ++ :                 Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
Test ++                                              14             14           0          0.0       45395.2       1.0X
```

### Was this patch authored or co-authored using generative AI tooling?

No

Closes apache#46114 from wForget/SPARK-47897.

Authored-by: Zhen Wang <643348094@qq.com>
Signed-off-by: Kent Yao <yao@apache.org>
…matTestDataGenerator` deprecated

### What changes were proposed in this pull request?
The pr aims to make `KinesisTestUtils` & `WriteInputFormatTestDataGenerator` deprecated
Master pr: apache#46000

### Why are the changes needed?
Because these two classes will be moved from `main` to `test` in version `4.0`, we first need to `deprecate` them in version `3.5` so that their removal is more natural.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Pass GA.

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes apache#46019 from panbingkun/branch-3.5_deprecated.

Authored-by: panbingkun <panbingkun@baidu.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
…eStableIdentifiersForUnionType

### What changes were proposed in this pull request?

Backport of apache#46126 to branch-3.5.

When `enableStableIdentifiersForUnionType` is enabled, all of the types are lowercased, which creates a problem when field types are case-sensitive:

Union type with fields:
```
Schema.createEnum("myENUM", "", null, List[String]("E1", "e2").asJava),
Schema.createRecord("myRecord2", "", null, false, List[Schema.Field](new Schema.Field("F", Schema.create(Type.FLOAT))).asJava)
```

would become

```
struct<member_myenum: string, member_myrecord2: struct<f: float>>
```

but instead should be
```
struct<member_myENUM: string, member_myRecord2: struct<F: float>>
```

### Why are the changes needed?

Fixes a bug where the type portion of the field name was lowercased.

### Does this PR introduce _any_ user-facing change?

Yes, if a user enables `enableStableIdentifiersForUnionType` and has Union types, all fields will preserve the case. Previously, the field names would be all in lowercase.

### How was this patch tested?

I added a test case to verify the new field names.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#46169 from sadikovi/SPARK-47904-3.5.

Authored-by: Ivan Sadikov <ivan.sadikov@databricks.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
…leanup

([Original PR](apache#46027))

### What changes were proposed in this pull request?

Expired sessions are regularly checked and cleaned up by a maintenance thread. However, currently, this process is synchronous. Therefore, in rare cases, interrupting the execution thread of a query in a session can take hours, causing the entire maintenance process to stall, resulting in a large amount of memory not being cleared.

We address this by introducing asynchronous callbacks for execution cleanup, avoiding synchronous joins of execution threads, and preventing the maintenance thread from stalling in the above scenarios. To be more specific, instead of calling `runner.join()` in `ExecutorHolder.close()`, we set a post-cleanup function as the callback through `runner.processOnCompletion`, which will be called asynchronously once the execution runner is completed or interrupted. In this way, the maintenance thread won't get blocked on joining an execution thread.
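
A hedged sketch of the callback pattern described above (illustrative names; not the actual `ExecuteHolder`/`ExecuteThreadRunner` code):

```scala
import scala.concurrent.{ExecutionContext, Future, Promise}

// Sketch only: the maintenance thread registers cleanup work instead of
// blocking on runner.join(); the callback fires once the execution thread
// completes or is interrupted.
class RunnerSketch(implicit ec: ExecutionContext) {
  private val completion = Promise[Unit]()

  // Never blocks the caller; the callback runs asynchronously on completion.
  def processOnCompletion(callback: () => Unit): Unit =
    completion.future.onComplete(_ => callback())

  // The execution thread signals completion in all cases (success or interrupt).
  def execute(body: => Unit): Future[Unit] =
    Future { try body finally completion.trySuccess(()) }
}
```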

### Why are the changes needed?

In the rare cases mentioned above, performance can be severely affected.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing tests and a new test `Async cleanup callback gets called after the execution is closed` in `ReattachableExecuteSuite.scala`.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#46064 from xi-db/SPARK-47819-async-cleanup-3.5.

Authored-by: Xi Lyu <xi.lyu@databricks.com>
Signed-off-by: Herman van Hovell <herman@databricks.com>
…n#allAttributes` for more consistent canonicalization

This is a backport of apache#45763 to branch-3.5.

### What changes were proposed in this pull request?

Modify `LateralJoin` to include right-side plan output in `allAttributes`.

### Why are the changes needed?

In the following example, the view v1 is cached, but a query of v1 does not use the cache:
```
CREATE or REPLACE TEMP VIEW t1(c1, c2) AS VALUES (0, 1), (1, 2);
CREATE or REPLACE TEMP VIEW t2(c1, c2) AS VALUES (0, 1), (1, 2);

create or replace temp view v1 as
select *
from t1
join lateral (
  select c1 as a, c2 as b
  from t2)
on c1 = a;

cache table v1;

explain select * from v1;
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- BroadcastHashJoin [c1#180], [a#173], Inner, BuildRight, false
   :- LocalTableScan [c1#180, c2#181]
   +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, false] as bigint)),false), [plan_id=113]
      +- LocalTableScan [a#173, b#174]
```

The canonicalized version of the `LateralJoin` node is not consistent when there is a join condition. For example, for the above query, the join condition is canonicalized as follows:
```
Before canonicalization: Some((c1#174 = a#167))
After canonicalization:  Some((none#0 = none#167))
```
You can see that the `exprId` for the second operand of `EqualTo` is not normalized (it remains 167). That's because the attribute `a` from the right-side plan is not included in `allAttributes`.

This PR adds right-side attributes to `allAttributes` so that references to right-side attributes in the join condition are normalized during canonicalization.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

New test.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#46190 from bersprockets/lj_canonical_issue_35.

Authored-by: Bruce Robbins <bersprockets@gmail.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
### What changes were proposed in this pull request?

This PR fixes a bug in the ExecuteJobTag creation in ExecuteHolder. The sessionId and userId are reversed.

https://github.com/apache/spark/blob/8aa8ad6be7b3eeceafa2ad1e9211fb8133bb675c/connector/connect/server/src/main/scala/org/apache/spark/sql/connect/service/ExecuteHolder.scala#L296-L299

### Why are the changes needed?

To fix a bug

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Existing test

### Was this patch authored or co-authored using generative AI tooling?

No

Closes apache#46140 from allisonwang-db/spark-47921-execute-job-tag.

Authored-by: allisonwang-db <allison.wang@databricks.com>
Signed-off-by: Takuya UESHIN <ueshin@databricks.com>
(cherry picked from commit 5a1559a)
Signed-off-by: Takuya UESHIN <ueshin@databricks.com>
…th dictionary and nulls

This fixes how `ColumnVector` handles copying arrays when the vector has a dictionary and null values. The possible issues with the previous implementation:
- An `ArrayIndexOutOfBoundsException` may be thrown when the `ColumnVector` has nulls and dictionaries. This is because the dictionary id for `null` entries might be invalid and should not be used.
- Copying a `ColumnarArray` (which contains a `ColumnVector`) is incorrect if it contains `null` entries, because copying a primitive array does not take the `null` entries into account, so they all get lost.

These changes are needed to avoid `ArrayIndexOutOfBoundsException` and to produce correct results when copying `ColumnarArray`.
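
A hedged sketch of the null-aware copy (illustrative; the real fix lives inside `ColumnVector`/`ColumnarArray`):

```scala
import org.apache.spark.sql.vectorized.ColumnVector

// Sketch only: consult isNullAt before reading a value, so null entries are
// preserved and their (possibly invalid) dictionary ids are never dereferenced.
object NullAwareCopy {
  def copyIntColumn(v: ColumnVector, start: Int, length: Int): Array[Any] =
    Array.tabulate[Any](length) { i =>
      if (v.isNullAt(start + i)) null else v.getInt(start + i)
    }
}
```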

The only user-facing changes are the fixes to the existing errors and incorrect results.

Added new unit tests.

No.

Closes apache#46254 from gene-db/dictionary-nulls.

Authored-by: Gene Pang <gene.pang@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit 76ce6b0)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
This PR fixes a correctness issue by moving the batch that resolves UDF decoders to after the `UpdateNullability` batch, so the decoder is now derived from the updated attributes.

I think the issue has existed since apache#28645, when UDF support for case class arguments was added, so it should be present in all currently supported versions.

Currently the following code
```
scala> val ds1 = Seq(1).toDS()
     | val ds2 = Seq[Int]().toDS()
     | val f = udf[Tuple1[Option[Int]],Tuple1[Option[Int]]](identity)
     | ds1.join(ds2, ds1("value") === ds2("value"), "left_outer").select(f(struct(ds2("value")))).collect()
val ds1: org.apache.spark.sql.Dataset[Int] = [value: int]
val ds2: org.apache.spark.sql.Dataset[Int] = [value: int]
val f: org.apache.spark.sql.expressions.UserDefinedFunction = SparkUserDefinedFunction($Lambda$2481/0x00007f7f50961f08@6b1a2c9f,StructType(StructField(_1,IntegerType,true)),List(Some(class[_1[0]: int])),Some(class[_1[0]: int]),None,true,true)
val res0: Array[org.apache.spark.sql.Row] = Array([[0]])
```
results in a row containing `0`, which is incorrect as the value should be `null`. Removing the UDF call
```
scala> ds1.join(ds2, ds1("value") === ds2("value"), "left_outer").select(struct(ds2("value"))).collect()
val res1: Array[org.apache.spark.sql.Row] = Array([[null]])
```
gives the correct value.

Yes, fixes a correctness issue when using ScalaUDFs.

Existing and new unit tests.

No.

Closes apache#46156 from eejbyfeldt/SPARK-47927.

Authored-by: Emil Ejbyfeldt <eejbyfeldt@liveintent.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit 8b8ea60)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
…mark

### What changes were proposed in this pull request?

This PR fixes an NPE in MapStatusesSerDeserBenchmark. The cause is that we try to stop the tracker twice.

```
java.lang.NullPointerException: Cannot invoke "org.apache.spark.rpc.RpcEndpointRef.askSync(Object, scala.reflect.ClassTag)" because the return value of "org.apache.spark.MapOutputTracker.trackerEndpoint()" is null
	at org.apache.spark.MapOutputTracker.askTracker(MapOutputTracker.scala:541)
	at org.apache.spark.MapOutputTracker.sendTracker(MapOutputTracker.scala:551)
	at org.apache.spark.MapOutputTrackerMaster.stop(MapOutputTracker.scala:1242)
	at org.apache.spark.SparkEnv.stop(SparkEnv.scala:112)
	at org.apache.spark.SparkContext.$anonfun$stop$25(SparkContext.scala:2354)
	at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1294)
	at org.apache.spark.SparkContext.stop(SparkContext.scala:2354)
	at org.apache.spark.SparkContext.stop(SparkContext.scala:2259)
	at org.apache.spark.MapStatusesSerDeserBenchmark$.afterAll(MapStatusesSerDeserBenchmark.scala:128)
	at org.apache.spark.benchmark.BenchmarkBase.main(BenchmarkBase.scala:80)
	at org.apache.spark.MapStatusesSerDeserBenchmark.main(MapStatusesSerDeserBenchmark.scala)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	at org.apache.spark.benchmark.Benchmarks$.$anonfun$main$7(Benchmarks.scala:128)
	at scala.collection.ArrayOps$.foreach$extension(ArrayOps.scala:1323)
	at org.apache.spark.benchmark.Benchmarks$.main(Benchmarks.scala:91)
	at org.apache.spark.benchmark.Benchmarks.main(Benchmarks.scala)
```
### Why are the changes needed?

test bugfix

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

manually

### Was this patch authored or co-authored using generative AI tooling?
no

Closes apache#46270 from yaooqinn/SPARK-48034.

Authored-by: Kent Yao <yao@apache.org>
Signed-off-by: Kent Yao <yao@apache.org>
(cherry picked from commit 59d5946)
Signed-off-by: Kent Yao <yao@apache.org>
Currently, the following query will throw a DIVIDE_BY_ZERO error instead of returning null:
 ```
SELECT try_divide(1, decimal(0));
```

This is caused by the rule `DecimalPrecision`:
```
case b @ BinaryOperator(left, right) if left.dataType != right.dataType =>
  (left, right) match {
 ...
    case (l: Literal, r) if r.dataType.isInstanceOf[DecimalType] &&
        l.dataType.isInstanceOf[IntegralType] &&
        literalPickMinimumPrecision =>
      b.makeCopy(Array(Cast(l, DataTypeUtils.fromLiteral(l)), r))
```
The result of the above makeCopy will contain `ANSI` as the `evalMode`, instead of `TRY`.
This PR fixes the bug by replacing the `makeCopy` method calls with `withNewChildren`.
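
A hedged illustration of the expected post-fix behavior (because `withNewChildren` preserves the `TRY` evalMode that `makeCopy` dropped):

```scala
// Sketch only: with the fix, the TRY evaluation mode survives the
// DecimalPrecision rewrite, so the division returns NULL instead of
// raising DIVIDE_BY_ZERO.
spark.sql("SELECT try_divide(1, decimal(0))").collect()
// expected: Array(Row(null))
```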

Bug fix in try_* functions.

Yes, it fixes a long-standing bug in the try_divide function.

New UT

No

Closes apache#46286 from gengliangwang/avoidMakeCopy.

Authored-by: Gengliang Wang <gengliang@apache.org>
Signed-off-by: Gengliang Wang <gengliang@apache.org>
(cherry picked from commit 3fbcb26)
Signed-off-by: Gengliang Wang <gengliang@apache.org>
…plan properly

### What changes were proposed in this pull request?
Make `ResolveRelations` handle plan id properly

cherry-pick bugfix apache#45214 to 3.5

### Why are the changes needed?
bug fix for Spark Connect, it won't affect classic Spark SQL

before this PR:
```
from pyspark.sql import functions as sf

spark.range(10).withColumn("value_1", sf.lit(1)).write.saveAsTable("test_table_1")
spark.range(10).withColumnRenamed("id", "index").withColumn("value_2", sf.lit(2)).write.saveAsTable("test_table_2")

df1 = spark.read.table("test_table_1")
df2 = spark.read.table("test_table_2")
df3 = spark.read.table("test_table_1")

join1 = df1.join(df2, on=df1.id==df2.index).select(df2.index, df2.value_2)
join2 = df3.join(join1, how="left", on=join1.index==df3.id)

join2.schema
```

fails with
```
AnalysisException: [CANNOT_RESOLVE_DATAFRAME_COLUMN] Cannot resolve dataframe column "id". It's probably because of illegal references like `df1.select(df2.col("a"))`. SQLSTATE: 42704
```

That is due to existing plan caching in `ResolveRelations` doesn't work with Spark Connect

```
=== Applying Rule org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveRelations ===
 '[apache#12]Join LeftOuter, '`==`('index, 'id)                     '[apache#12]Join LeftOuter, '`==`('index, 'id)
!:- '[apache#9]UnresolvedRelation [test_table_1], [], false         :- '[apache#9]SubqueryAlias spark_catalog.default.test_table_1
!+- '[apache#11]Project ['index, 'value_2]                          :  +- 'UnresolvedCatalogRelation `spark_catalog`.`default`.`test_table_1`, [], false
!   +- '[apache#10]Join Inner, '`==`('id, 'index)                   +- '[apache#11]Project ['index, 'value_2]
!      :- '[apache#7]UnresolvedRelation [test_table_1], [], false      +- '[apache#10]Join Inner, '`==`('id, 'index)
!      +- '[apache#8]UnresolvedRelation [test_table_2], [], false         :- '[apache#9]SubqueryAlias spark_catalog.default.test_table_1
!                                                                   :  +- 'UnresolvedCatalogRelation `spark_catalog`.`default`.`test_table_1`, [], false
!                                                                   +- '[apache#8]SubqueryAlias spark_catalog.default.test_table_2
!                                                                      +- 'UnresolvedCatalogRelation `spark_catalog`.`default`.`test_table_2`, [], false

Can not resolve 'id with plan 7
```

`[apache#7]UnresolvedRelation [test_table_1], [], false` was wrongly resolved to the cached one
```
:- '[apache#9]SubqueryAlias spark_catalog.default.test_table_1
   +- 'UnresolvedCatalogRelation `spark_catalog`.`default`.`test_table_1`, [], false
```

### Does this PR introduce _any_ user-facing change?
yes, bug fix

### How was this patch tested?
added ut

### Was this patch authored or co-authored using generative AI tooling?
ci

Closes apache#46291 from zhengruifeng/connect_fix_read_join_35.

Authored-by: Ruifeng Zheng <ruifengz@apache.org>
Signed-off-by: Ruifeng Zheng <ruifengz@apache.org>
### What changes were proposed in this pull request?

This PR aims to fix `mypy` failure by propagating `lint-python`'s `PYTHON_EXECUTABLE` to `mypy`'s parameter correctly.

### Why are the changes needed?

We assumed that `PYTHON_EXECUTABLE` is used for `dev/lint-python` like the following. That's not always guaranteed. We need to use `mypy`'s parameter to make sure of it.
https://github.com/apache/spark/blob/ff401dde50343c9bbc1c49a0294272f2da7d01e2/.github/workflows/build_and_test.yml#L705

This patch is useful when `python3` resolves to one of multiple Python installations, as in our CI environment.
```
$ docker run -it --rm ghcr.io/apache/apache-spark-ci-image:master-8905641334 bash
WARNING: The requested image's platform (linux/amd64) does not match the detected host platform (linux/arm64/v8) and no specific platform was requested
root@2ef6ce08d2c4:/# python3 --version
Python 3.10.12
root@2ef6ce08d2c4:/# python3.9 --version
Python 3.9.19
```

For example, the following shows that `PYTHON_EXECUTABLE` is not considered by `mypy`.
```
root@18c8eae5791e:/spark# PYTHON_EXECUTABLE=python3.9 mypy --python-executable=python3.11 --namespace-packages --config-file python/mypy.ini python/pyspark | wc -l
3428
root@18c8eae5791e:/spark# PYTHON_EXECUTABLE=python3.9 mypy --namespace-packages --config-file python/mypy.ini python/pyspark | wc -l
1
root@18c8eae5791e:/spark# PYTHON_EXECUTABLE=python3.11 mypy --namespace-packages --config-file python/mypy.ini python/pyspark | wc -l
1
```

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Pass the CIs.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#46314 from dongjoon-hyun/SPARK-48068.

Authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
(cherry picked from commit 26c871f)
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
…cimals

### What changes were proposed in this pull request?

Currently, the following query will throw a DIVIDE_BY_ZERO error instead of returning null:
 ```
SELECT try_divide(1, decimal(0));
```

This is caused by the rule `DecimalPrecision`:
```
case b @ BinaryOperator(left, right) if left.dataType != right.dataType =>
  (left, right) match {
 ...
    case (l: Literal, r) if r.dataType.isInstanceOf[DecimalType] &&
        l.dataType.isInstanceOf[IntegralType] &&
        literalPickMinimumPrecision =>
      b.makeCopy(Array(Cast(l, DataTypeUtils.fromLiteral(l)), r))
```
The result of the above makeCopy will contain `ANSI` as the `evalMode`, instead of `TRY`.
This PR fixes the bug by replacing the `makeCopy` method calls with `withNewChildren`.

### Why are the changes needed?

Bug fix in try_* functions.

### Does this PR introduce _any_ user-facing change?

Yes, it fixes a long-standing bug in the try_divide function.

### How was this patch tested?

New UT

### Was this patch authored or co-authored using generative AI tooling?

No

Closes apache#46323 from gengliangwang/pickFix.

Authored-by: Gengliang Wang <gengliang@apache.org>
Signed-off-by: Gengliang Wang <gengliang@apache.org>
…ericAlias in Python 3.11+

### What changes were proposed in this pull request?

This PR aims to fix `type hints` to handle `list` GenericAlias in Python 3.11+ for Apache Spark 4.0.0 and 3.5.1.
- https://github.com/apache/spark/actions/workflows/build_python.yml

### Why are the changes needed?

PEP 646 changes `GenericAlias` instances into `Iterable` ones at Python 3.11.
- https://peps.python.org/pep-0646/

This behavior change introduces the following failure on Python 3.11.

- **Python 3.11.6**

```python
Python 3.11.6 (main, Nov  1 2023, 07:46:30) [Clang 14.0.0 (clang-1400.0.28.1)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/11/18 16:34:09 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 3.5.0
      /_/

Using Python version 3.11.6 (main, Nov  1 2023 07:46:30)
Spark context Web UI available at http://localhost:4040
Spark context available as 'sc' (master = local[*], app id = local-1700354049391).
SparkSession available as 'spark'.
>>> from pyspark import pandas as ps
>>> from typing import List
>>> ps.DataFrame[float, [int, List[int]]]
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/Users/dongjoon/APACHE/spark-release/spark-3.5.0-bin-hadoop3/python/pyspark/pandas/frame.py", line 13647, in __class_getitem__
    return create_tuple_for_frame_type(params)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/dongjoon/APACHE/spark-release/spark-3.5.0-bin-hadoop3/python/pyspark/pandas/typedef/typehints.py", line 717, in create_tuple_for_frame_type
    return Tuple[_to_type_holders(params)]
                 ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/dongjoon/APACHE/spark-release/spark-3.5.0-bin-hadoop3/python/pyspark/pandas/typedef/typehints.py", line 762, in _to_type_holders
    data_types = _new_type_holders(data_types, NameTypeHolder)
                 ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/Users/dongjoon/APACHE/spark-release/spark-3.5.0-bin-hadoop3/python/pyspark/pandas/typedef/typehints.py", line 828, in _new_type_holders
    raise TypeError(
TypeError: Type hints should be specified as one of:
  - DataFrame[type, type, ...]
  - DataFrame[name: type, name: type, ...]
  - DataFrame[dtypes instance]
  - DataFrame[zip(names, types)]
  - DataFrame[index_type, [type, ...]]
  - DataFrame[(index_name, index_type), [(name, type), ...]]
  - DataFrame[dtype instance, dtypes instance]
  - DataFrame[(index_name, index_type), zip(names, types)]
  - DataFrame[[index_type, ...], [type, ...]]
  - DataFrame[[(index_name, index_type), ...], [(name, type), ...]]
  - DataFrame[dtypes instance, dtypes instance]
  - DataFrame[zip(index_names, index_types), zip(names, types)]
However, got (<class 'int'>, typing.List[int]).
```

- **Python 3.10.13**

```python
Python 3.10.13 (main, Sep 29 2023, 16:03:45) [Clang 14.0.0 (clang-1400.0.28.1)] on darwin
Type "help", "copyright", "credits" or "license" for more information.
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/11/18 16:33:21 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 3.5.0
      /_/

Using Python version 3.10.13 (main, Sep 29 2023 16:03:45)
Spark context Web UI available at http://localhost:4040
Spark context available as 'sc' (master = local[*], app id = local-1700354002048).
SparkSession available as 'spark'.
>>> from pyspark import pandas as ps
>>> from typing import List
>>> ps.DataFrame[float, [int, List[int]]]
typing.Tuple[pyspark.pandas.typedef.typehints.IndexNameType, pyspark.pandas.typedef.typehints.NameType, pyspark.pandas.typedef.typehints.NameType]
>>>
```

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Pass the CIs. Manually test with Python 3.11.

```
$ build/sbt -Phadoop-3 -Pkinesis-asl -Pyarn -Pkubernetes -Pdocker-integration-tests -Pconnect -Pspark-ganglia-lgpl -Pvolcano -Phadoop-cloud -Phive-thriftserver -Phive Test/package streaming-kinesis-asl-assembly/assembly connect/assembly
$ python/run-tests --modules pyspark-pandas-slow --python-executables python3.11
```

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#43888 from dongjoon-hyun/SPARK-45988.

Authored-by: Dongjoon Hyun <dhyun@apple.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
amaliujia and others added 29 commits September 27, 2024 08:29
Support catalog in QualifiedTableName and remove `FullQualifiedTableName`.

Consolidate and remove duplicate code.

No

Existing UT

No

Closes apache#48255 from amaliujia/qualifedtablename.

Authored-by: Rui Wang <rui.wang@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit fc9d421)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
…ut` to 10min

This PR aims to increase `spark.test.docker.connectionTimeout` to 10min.

Recently, various DB images fail at the `connection` stage on multiple branches.

**MASTER** branch
https://github.com/apache/spark/actions/runs/11045311764/job/30682732260

```
[info] OracleIntegrationSuite:
[info] org.apache.spark.sql.jdbc.OracleIntegrationSuite *** ABORTED *** (5 minutes, 17 seconds)
[info]   The code passed to eventually never returned normally. Attempted 298 times over 5.0045005511500005 minutes. Last failure message: ORA-12541: Cannot connect. No listener at host 10.1.0.41 port 41079. (CONNECTION_ID=n9ZWIh+nQn+G9fkwKyoBQA==)
```

**branch-3.5** branch
https://github.com/apache/spark/actions/runs/10939696926/job/30370552237

```
[info] MsSqlServerNamespaceSuite:
[info] org.apache.spark.sql.jdbc.v2.MsSqlServerNamespaceSuite *** ABORTED *** (5 minutes, 42 seconds)
[info]   The code passed to eventually never returned normally. Attempted 11 times over 5.487631282400001 minutes. Last failure message: The TCP/IP connection to the host 10.1.0.56, port 35345 has failed. Error: "Connection refused (Connection refused). Verify the connection properties. Make sure that an instance of SQL Server is running on the host and accepting TCP/IP connections at the port. Make sure that TCP connections to the port are not blocked by a firewall.".. (DockerJDBCIntegrationSuite.scala:166)
```

**branch-3.4** branch
https://github.com/apache/spark/actions/runs/10937842509/job/30364658576

```
[info] MsSqlServerNamespaceSuite:
[info] org.apache.spark.sql.jdbc.v2.MsSqlServerNamespaceSuite *** ABORTED *** (5 minutes, 42 seconds)
[info]   The code passed to eventually never returned normally. Attempted 11 times over 5.487555645633333 minutes. Last failure message: The TCP/IP connection to the host 10.1.0.153, port 46153 has failed. Error: "Connection refused (Connection refused). Verify the connection properties. Make sure that an instance of SQL Server is running on the host and accepting TCP/IP connections at the port. Make sure that TCP connections to the port are not blocked by a firewall.".. (DockerJDBCIntegrationSuite.scala:166)
```

No, this is a test-only change.

Pass the CIs.

No.

Closes apache#48272 from dongjoon-hyun/SPARK-49803.

Authored-by: Dongjoon Hyun <dongjoon@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
(cherry picked from commit 09b7aa6)
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
### What changes were proposed in this pull request?

When deploying Spark pods on Kubernetes with sidecars, the reported executor's exit code may be incorrect.
For example, the reported executor exit code is 0 (success), but the actual one is 52 (OOM).
```
2024-09-25 02:35:29,383 ERROR TaskSchedulerImpl.logExecutorLoss - Lost executor 1 on XXXXX: The executor with
 id 1 exited with exit code 0(success).

The API gave the following container statuses:

     container name: fluentd
     container image: docker-images-release.XXXXX.com/XXXXX/fluentd:XXXXX
     container state: terminated
     container started at: 2024-09-25T02:32:17Z
     container finished at: 2024-09-25T02:34:52Z
     exit code: 0
     termination reason: Completed

     container name: istio-proxy
     container image: docker-images-release.XXXXX.com/XXXXX-istio/proxyv2:XXXXX
     container state: running
     container started at: 2024-09-25T02:32:16Z

     container name: spark-kubernetes-executor
     container image: docker-dev-artifactory.XXXXX.com/XXXXX/spark-XXXXX:XXXXX
     container state: terminated
     container started at: 2024-09-25T02:32:17Z
     container finished at: 2024-09-25T02:35:28Z
     exit code: 52
     termination reason: Error
```
The `ExecutorPodsLifecycleManager.findExitCode()` method looks for any terminated container and may choose the sidecar instead of the main executor container. I'm changing it to always look at the executor container.
Note that the pod may fail because of a sidecar container failure while the executor's container is still running; with my changes the reported exit code will then be -1 (`UNKNOWN_EXIT_CODE`).
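
A hedged sketch of the selection logic (method and constant names are illustrative, not the exact `ExecutorPodsLifecycleManager` code):

```scala
import io.fabric8.kubernetes.api.model.Pod
import scala.jdk.CollectionConverters._

// Sketch only: read the exit code from the executor container itself rather
// than from whichever container happened to terminate first (e.g. a sidecar).
object ExitCodeSketch {
  private val UNKNOWN_EXIT_CODE = -1

  def findExecutorExitCode(pod: Pod, executorContainerName: String): Int =
    pod.getStatus.getContainerStatuses.asScala
      .find(_.getName == executorContainerName)                      // skip sidecars like istio-proxy
      .flatMap(s => Option(s.getState).flatMap(st => Option(st.getTerminated)))
      .map(_.getExitCode.intValue())                                 // executor container terminated
      .getOrElse(UNKNOWN_EXIT_CODE)                                  // executor container still running
}
```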

### Why are the changes needed?

To correctly report executor failure reason on UI, in the logs and for the event listeners `SparkListener.onExecutorRemoved()`

### Does this PR introduce _any_ user-facing change?

Yes, the executor's exit code is taken from the main container instead of the sidecar.

### How was this patch tested?

Added unit test and tested manually on the Kubernetes cluster by simulating different types of executor failure (JVM OOM and container eviction due to disk pressure on the node).

### Was this patch authored or co-authored using generative AI tooling?

No

Closes apache#48275 from fe2s/SPARK-49804-fix-exit-code.

Lead-authored-by: oleksii.diagiliev <oleksii.diagiliev@workday.com>
Co-authored-by: Dongjoon Hyun <dongjoon@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
(cherry picked from commit 5d701f2)
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
…e Silicon

### What changes were proposed in this pull request?

This is a merged backport of SPARK-46525 with the original authorship, yaooqinn .
- apache#44509
- apache#44612
- apache#45303

`com.spotify.docker.client` is not going to support Apple Silicon, as it has already been archived and the [jnr-unixsocket](https://mvnrepository.com/artifact/com.github.jnr/jnr-unixsocket) 0.18 it uses is not compatible with Apple Silicon.

If we run our docker IT tests on Apple Silicon, they will fail like:

```java
[info] org.apache.spark.sql.jdbc.MariaDBKrbIntegrationSuite *** ABORTED *** (2 seconds, 264 milliseconds)
[info]   com.spotify.docker.client.exceptions.DockerException: java.util.concurrent.ExecutionException:
com.spotify.docker.client.shaded.javax.ws.rs.ProcessingException:
java.lang.UnsatisfiedLinkError: could not load FFI provider jnr.ffi.provider.jffi.Provider
...
[info]   Cause: java.lang.IllegalStateException: Can't overwrite cause with java.lang.UnsatisfiedLinkError:
java.lang.UnsatisfiedLinkError: /Users/hzyaoqin/spark/target/tmp/jffi15403099445119552969.dylib:
dlopen(/Users/hzyaoqin/spark/target/tmp/jffi15403099445119552969.dylib, 0x0001): tried:
'/Users/hzyaoqin/spark/target/tmp/jffi15403099445119552969.dylib' (fat file, but missing compatible architecture (have 'i386,x86_64', need 'arm64')),
'/System/Volumes/Preboot/Cryptexes/OS/Users/hzyaoqin/spark/target/tmp/jffi15403099445119552969.dylib' (no such file), '/Users/hzyaoqin/spark/target/tmp/jffi15403099445119552969.dylib' (fat file, but missing compatible architecture (have 'i386,x86_64', need 'arm64'))
```

In this PR, we use its alternative, docker-java, to enable docker-related tests on Apple silicon:

```xml
    <dependency>
      <groupId>com.github.docker-java</groupId>
      <artifactId>docker-java</artifactId>
      <scope>test</scope>
    </dependency>
```
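For context, a hedged sketch of how a docker-java client can be obtained (illustrative only; the transport module and the exact setup used by the test suites are assumptions, not a description of the actual harness):

```scala
import com.github.dockerjava.core.{DefaultDockerClientConfig, DockerClientImpl}
import com.github.dockerjava.zerodep.ZerodepDockerHttpClient

// Sketch: build a docker-java client against the local Docker daemon. Unlike the
// archived Spotify client, this stack works on arm64/Apple Silicon hosts.
val config = DefaultDockerClientConfig.createDefaultConfigBuilder().build()
val httpClient = new ZerodepDockerHttpClient.Builder()
  .dockerHost(config.getDockerHost)
  .build()
val docker = DockerClientImpl.getInstance(config, httpClient)
docker.pingCmd().exec()  // throws if the daemon is unreachable
```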

### Why are the changes needed?

With this patch, developers who use Apple Silicon can run the JDBC/Docker integration tests locally instead of suffering the slowness of GitHub Actions.

### Does this PR introduce _any_ user-facing change?

No, dev only

### How was this patch tested?

Pass the CIs and do the manual test on Apple Silicon.
```
$ build/sbt -Pdocker-integration-tests 'docker-integration-tests/testOnly org.apache.spark.sql.jdbc.*MariaDB*'
...
[info] All tests passed.
[success] Total time: 157 s (02:37), completed Sep 27, 2024, 2:45:16 PM

$ build/sbt -Pdocker-integration-tests 'docker-integration-tests/testOnly org.apache.spark.sql.jdbc.*MySQL*'
...
[info] All tests passed.
[success] Total time: 109 s (01:49), completed Sep 27, 2024, 2:48:47 PM
```

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#48289 from dongjoon-hyun/SPARK-46525.

Authored-by: Kent Yao <yao@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
…mage version to 2022-CU12-GDR1-ubuntu-22.04

### What changes were proposed in this pull request?

This PR bumps up the mssql docker image version to 2022-CU12-GDR1-ubuntu-22.04.

FYI, https://mcr.microsoft.com/en-us/product/mssql/server/tags

### Why are the changes needed?

Dependency management.

### Does this PR introduce _any_ user-facing change?

no

### How was this patch tested?

existing tests

### Was this patch authored or co-authored using generative AI tooling?

no

Closes apache#46176 from yaooqinn/SPARK-47949.

Authored-by: Kent Yao <yao@apache.org>
Signed-off-by: Dongjoon Hyun <dhyun@apple.com>
(cherry picked from commit 9c4f12c)
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
…ojection over aggregate correctly

### What changes were proposed in this pull request?

`CollapseProject` should block collapsing a projection with an aggregate if any correlated subquery is present. The existing check only accounted for `ScalarSubquery`, but other kinds of correlated subqueries were not handled.
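
A minimal sketch of the broadened guard (the helper name `canCollapseWithAggregate` is hypothetical; the sketch assumes Catalyst's `SubqueryExpression.hasCorrelatedSubquery` utility and is not the actual rule code):

```scala
import org.apache.spark.sql.catalyst.expressions.{Expression, SubqueryExpression}

// Sketch: refuse to merge a Project into an Aggregate if any project-list expression
// contains a correlated subquery of any kind, not only ScalarSubquery.
def canCollapseWithAggregate(projectList: Seq[Expression]): Boolean =
  !projectList.exists(e => SubqueryExpression.hasCorrelatedSubquery(e))
```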

### Why are the changes needed?

Availability issue.

### Does this PR introduce _any_ user-facing change?

Previously failing queries will not fail anymore.

### How was this patch tested?

UT.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#48286 from n-young-db/n-young-db/collapse-project-correlated-subquery-check.

Lead-authored-by: Nick Young <nick.young@databricks.com>
Co-authored-by: Wenchen Fan <cloud0fan@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit 97ae372)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
…s when pruning GetArrayStructFields

### What changes were proposed in this pull request?
- Cherry-pick of the original PR - apache#48190 to Spark 3.5

- When pruning the schema of the struct in `GetArrayStructFields`, rely on the existing `StructType` to obtain the pruned schema instead of using the accessed field.
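
A minimal sketch of the pruning idea (the helper name is hypothetical; the real change is inside the `OptimizeCsvJsonExprs` rule):

```scala
import org.apache.spark.sql.types.StructType

// Sketch: build the pruned element schema from the original struct's field at the
// accessed ordinal, so the original field name and metadata (e.g. `a`, not `A`) survive.
def prunedElementSchema(original: StructType, ordinal: Int): StructType =
  StructType(Array(original(ordinal)))
```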

### Why are the changes needed?

- Fixes a bug in the `OptimizeCsvJsonExprs` rule that would otherwise change the schema of the struct fields being extracted.
- The bug surfaced as a correctness issue: instead of the right values for a field, the query ended up producing null output.

### Does this PR introduce _any_ user-facing change?

Yes. The query output would change for the queries of the following type:
```
SELECT
  from_json('[{"a": '||id||', "b": '|| (2*id) ||'}]', 'array<struct<a: INT, b: INT>>').a,
  from_json('[{"a": '||id||', "b": '|| (2*id) ||'}]', 'array<struct<a: INT, b: INT>>').A
FROM
  range(3) as t
```

Earlier, the result would have been:
```
Array([ArraySeq(0),ArraySeq(null)], [ArraySeq(1),ArraySeq(null)], [ArraySeq(2),ArraySeq(null)])
```
whereas the new result is (verified through spark-shell):
```
Array([ArraySeq(0),ArraySeq(0)], [ArraySeq(1),ArraySeq(1)], [ArraySeq(2),ArraySeq(2)])
```

### How was this patch tested?
- Added unit tests.
- Without this change, the added test would fail as we would have modified the schema from `a` to `A`:
```
- SPARK-49743: prune unnecessary columns from GetArrayStructFields does not change schema *** FAILED ***
  == FAIL: Plans do not match ===
  !Project [from_json(ArrayType(StructType(StructField(A,IntegerType,true)),true), json#0, Some(America/Los_Angeles)).A AS a#0]   Project [from_json(ArrayType(StructType(StructField(a,IntegerType,true)),true), json#0, Some(America/Los_Angeles)).A AS a#0]
   +- LocalRelation <empty>, [json#0]                                                                                             +- LocalRelation <empty>, [json#0] (PlanTest.scala:179)
```

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#48308 from nikhilsheoran-db/SPARK-49743-3.5.

Authored-by: Nikhil Sheoran <125331115+nikhilsheoran-db@users.noreply.github.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
…ferenced outer CTE relation

backport apache#48284 to 3.5

### What changes were proposed in this pull request?

This PR fixes a long-standing reference counting bug in the rule `InlineCTE`. Let's look at the minimal repro:
```
      sql(
        """
          |WITH
          |t1 AS (SELECT 1 col),
          |t2 AS (SELECT * FROM t1)
          |SELECT * FROM t2
          |""".stripMargin).createTempView("v")
      // r1 is un-referenced, but it should not decrease the ref count of t2 inside view v.
      val df = sql(
        """
          |WITH
          |r1 AS (SELECT * FROM v),
          |r2 AS (SELECT * FROM v)
          |SELECT * FROM r2
          |""".stripMargin)
```
The logical plan is something like below
```
WithCTE
  CTEDef r1
    View v
      WithCTE
        CTEDef t1
          OneRowRelation
        CTEDef t2
          CTERef t1
        CTERef t2    // main query of the inner WithCTE
  CTEDef r2
    View v   // exactly the same as the view v above
      WithCTE
        CTEDef t1
          OneRowRelation
        CTEDef t2
          CTERef t1
        CTERef t2
  CTERef r2    // main query of the outer WithCTE
```
Ideally, the ref count of `t1`, `t2` and `r2` should be all `1`. They will be inlined and the final plan is the `OneRowRelation`. However, in `InlineCTE#buildCTEMap`, when we traverse into `CTEDef r1` and hit `CTERef t2`, we mistakenly update the out-going-ref-count of `r1`, which means that `r1` references `t2` and this is totally wrong. Later on, in `InlineCTE#cleanCTEMap`, we find that `r1` is not referenced at all, so we decrease the ref count of its out-going-ref, which is `t2`, and the ref count of `t2` becomes `0`. Finally, in `InlineCTE#inlineCTE`, we leave the plan of `t2` unchanged because its ref count is `0`, and the plan of `t2` contains `CTERef t1`. `t2` is still inlined so we end up with `CTERef t1` as the final plan without the `WithCTE` node.

### Why are the changes needed?

bug fix

### Does this PR introduce _any_ user-facing change?

Yes, the query failed before and now works.

### How was this patch tested?

new test

### Was this patch authored or co-authored using generative AI tooling?

no

Closes apache#48301 from cloud-fan/cte.

Authored-by: Wenchen Fan <wenchen@databricks.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
…_udt`test during `SPARK_SKIP_CONNECT_COMPAT_TESTS`

### What changes were proposed in this pull request?

This PR aims to skip the PySpark `test_cast_to_udt_with_udt` test during `SPARK_SKIP_CONNECT_COMPAT_TESTS` in branch-3.5.

### Why are the changes needed?

Currently, Apache Spark 4.0.0's PySpark Connect compatibility test fails.
- https://github.com/apache/spark/actions/workflows/build_python_connect35.yml

This is due to a newly added feature. Although this is effectively an Apache Spark 4.0.0 test suite, we need to disable this test case in `branch-3.5`.
- apache#48251

### Does this PR introduce _any_ user-facing change?

No. This is a test-only PR.

### How was this patch tested?

This should be tested after merging via Daily CIs.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#48307 from dongjoon-hyun/SPARK-49841.

Authored-by: Dongjoon Hyun <dongjoon@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
…s list when RDD block is removed from a node

### What changes were proposed in this pull request?

Credit to maheshk114 for the initial investigation and the fix.

This PR fixes a bug where the shuffle service's ID is kept in the block location list when an RDD block is removed from a node. Before this change, `StorageLevel.NONE` was used to report the block removal, which caused the block manager master to ignore the location update for shuffle service IDs (for details see `BlockManagerMasterEndpoint#updateBlockInfo()` and keep in mind that `StorageLevel.NONE.useDisk` is `false`). After this change, only the replication count is set to 0 to report the block removal, so `StorageLevel#isValid` is still `false` but `storageLevel.useDisk` stays `true`; this way the shuffle service's ID is removed from the block location list as well.
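
A minimal sketch of the reporting idea (illustrative only; the helper name is hypothetical and this is not the actual block manager code):

```scala
import org.apache.spark.storage.StorageLevel

// Sketch: signal removal by zeroing the replication count while keeping useDisk true.
// StorageLevel.isValid becomes false (the block is gone), but because useDisk is still
// true the master also drops the external shuffle service's ID from the location list.
def removalLevel(current: StorageLevel): StorageLevel =
  StorageLevel(current.useDisk, current.useMemory, current.useOffHeap,
    current.deserialized, replication = 0)
```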

### Why are the changes needed?

If the block location is not updated properly, tasks fail with a fetch-failed exception. A task will try to read the RDD blocks from a node using the external shuffle service, and the read fails if that node has already been decommissioned.

```
WARN BlockManager [Executor task launch worker for task 25.0 in stage 6.0 (TID 1567)]: Failed to fetch remote block rdd_5_25 from BlockManagerId(4, vm-92303839, 7337, None) (failed attempt 1)
org.apache.spark.SparkException: Exception thrown in awaitResult:
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:301)
	at org.apache.spark.network.BlockTransferService.fetchBlockSync(BlockTransferService.scala:103)
	at org.apache.spark.storage.BlockManager.fetchRemoteManagedBuffer(BlockManager.scala:1155)
	at org.apache.spark.storage.BlockManager.$anonfun$getRemoteBlock$8(BlockManager.scala:1099)
	at scala.Option.orElse(Option.scala:447)
	at org.apache.spark.storage.BlockManager.getRemoteBlock(BlockManager.scala:1099)
	at org.apache.spark.storage.BlockManager.getRemoteValues(BlockManager.scala:1045)
	at org.apache.spark.storage.BlockManager.get(BlockManager.scala:1264)
	at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1326)
```

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Added a new UT.

### Was this patch authored or co-authored using generative AI tooling?

No

Closes apache#47779 from attilapiros/SPARK-47702-attila.

Lead-authored-by: maheshbehera <maheshbehera@microsoft.com>
Co-authored-by: Attila Zsolt Piros <2017933+attilapiros@users.noreply.github.com>
Co-authored-by: attilapiros <piros.attila.zsolt@gmail.com>
Signed-off-by: yangjie01 <yangjie01@baidu.com>
(cherry picked from commit 8a51ca7)
Signed-off-by: yangjie01 <yangjie01@baidu.com>
…locations list when RDD block is removed from a node"

### What changes were proposed in this pull request?
This reverts commit ec28154.

### Why are the changes needed?
branch-3.5 cannot be compiled successfully with commit ec28154

```
[error] /home/runner/work/spark/spark/core/src/test/scala/org/apache/spark/storage/BlockManagerReplicationSuite.scala:304:23: value TEST_SKIP_ESS_REGISTER is not a member of object org.apache.spark.internal.config.Tests
[error]     newConf.set(Tests.TEST_SKIP_ESS_REGISTER, true)
[error]                       ^
[error] one error found
```

### Does this PR introduce _any_ user-facing change?
No

### How was this patch tested?
Pass GitHub Actions

### Was this patch authored or co-authored using generative AI tooling?
No

Closes apache#48353 from LuciferYang/Revert-SPARK-47702.

Authored-by: yangjie01 <yangjie01@baidu.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
### What changes were proposed in this pull request?

Fix the issue in `AlterTableChangeColumnCommand` where changing the comment of a char/varchar column also tries to change the column type to string.

Backporting apache#48315 to 3.5

### Why are the changes needed?

Because the new column would always be a `StringType`, even when the metadata says that it was originally char/varchar.
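
A minimal sketch of the intended behavior (the helper name is hypothetical; the actual fix lives in `AlterTableChangeColumnCommand`):

```scala
import org.apache.spark.sql.types.StructField

// Sketch: when only the comment changes, derive the new column from the original
// StructField so its data type and char/varchar metadata are preserved, instead of
// rebuilding it from the parsed column, which arrives as plain StringType.
def changeComment(original: StructField, newComment: String): StructField =
  original.withComment(newComment)
```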

### Does this PR introduce _any_ user-facing change?

Yes, the query will no longer fail when using this code path.

### How was this patch tested?

New query in golden files.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#48333 from stefankandic/branch-3.5.

Authored-by: Stefan Kandic <stefan.kandic@databricks.com>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
…ded to window/session_window fn

This PR fixes a correctness issue about losing operators during analysis; it happens when a window column is provided to the window()/session_window() function.

The rules `TimeWindowing` and `SessionWindowing` are responsible for resolving the time window functions. When the window function has a `window` column as its time parameter (in other words, when building a time window from a time window), the rule wraps the window with the `WindowTime` function so that the rule `ResolveWindowTime` can resolve it further. (`TimeWindowing`/`SessionWindowing` then resolve the result of `ResolveWindowTime` again.)

The issue is that the rule uses `return` here, intended as an early return because the other branch is much longer than this one. Unfortunately, this does not work as intended: the intention is only to exit the current local scope (typically the enclosing curly brace), but it actually breaks the loop of execution on the "outer" side.
(I haven't debugged this further, but it is clear that it does not work as intended.)

Quoting from Scala doc:

> Nonlocal returns are implemented by throwing and catching scala.runtime.NonLocalReturnException-s.

It is not entirely clear where the exception is caught in the call stack; it may exit execution for a much broader scope (context) than expected. Non-local returns are deprecated as of Scala 3.2 and will likely be removed in the future.

https://dotty.epfl.ch/docs/reference/dropped-features/nonlocal-returns.html
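
A standalone toy illustration of the pitfall (not the rule's code):

```scala
// In Scala 2, `return` inside a closure is a non-local return: it throws a
// control-flow exception that unwinds to the enclosing method, not just the closure,
// so an "early return" can abort far more of the traversal than intended.
def resolveAll(exprs: Seq[String]): Seq[String] = {
  exprs.map { e =>
    if (e.startsWith("window(")) {
      return Seq("<wrapped>") // exits resolveAll entirely, skipping the remaining exprs
    }
    e.toUpperCase
  }
}

resolveAll(Seq("window(ts)", "other")) // returns Seq("<wrapped>"); "other" is never processed
```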

Interestingly, it does not break every query with chained time window aggregations. Spark already has several tests using the DataFrame API and they have not failed. The reproducer in the community report uses a SQL statement, where each aggregation is treated as a subquery.

This PR fixes the rule to not use an early return and instead use a single large if/else.

Described above.

Yes, this fixes the possible query breakage. The impacted workloads may not be very large, as chained time window aggregations are an advanced usage, and the bug does not break every query that uses them.

New UTs.

No.

Closes apache#48309 from HeartSaVioR/SPARK-49836.

Lead-authored-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
Co-authored-by: Andrzej Zera <andrzejzera@gmail.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
(cherry picked from commit d8c04cf)
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
…ations list when RDD block is removed from a node

Credit to maheshk114 for the initial investigation and the fix.

This PR fixes a bug where the shuffle service's ID is kept in the block location list when an RDD block is removed from a node. Before this change, `StorageLevel.NONE` was used to report the block removal, which caused the block manager master to ignore the location update for shuffle service IDs (for details see `BlockManagerMasterEndpoint#updateBlockInfo()` and keep in mind that `StorageLevel.NONE.useDisk` is `false`). After this change, only the replication count is set to 0 to report the block removal, so `StorageLevel#isValid` is still `false` but `storageLevel.useDisk` stays `true`; this way the shuffle service's ID is removed from the block location list as well.

If the block location is not updated properly, tasks fail with a fetch-failed exception. A task will try to read the RDD blocks from a node using the external shuffle service, and the read fails if that node has already been decommissioned.

```
WARN BlockManager [Executor task launch worker for task 25.0 in stage 6.0 (TID 1567)]: Failed to fetch remote block rdd_5_25 from BlockManagerId(4, vm-92303839, 7337, None) (failed attempt 1)
org.apache.spark.SparkException: Exception thrown in awaitResult:
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:301)
	at org.apache.spark.network.BlockTransferService.fetchBlockSync(BlockTransferService.scala:103)
	at org.apache.spark.storage.BlockManager.fetchRemoteManagedBuffer(BlockManager.scala:1155)
	at org.apache.spark.storage.BlockManager.$anonfun$getRemoteBlock$8(BlockManager.scala:1099)
	at scala.Option.orElse(Option.scala:447)
	at org.apache.spark.storage.BlockManager.getRemoteBlock(BlockManager.scala:1099)
	at org.apache.spark.storage.BlockManager.getRemoteValues(BlockManager.scala:1045)
	at org.apache.spark.storage.BlockManager.get(BlockManager.scala:1264)
	at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1326)
```

No

Added a new UT.

No

Closes apache#48357 from attilapiros/SPARK-47702-Spark3.5.

Authored-by: maheshbehera <maheshbehera@microsoft.com>
Signed-off-by: yangjie01 <yangjie01@baidu.com>
…ark Connect compatibility test

### What changes were proposed in this pull request?

This PR is a follow-up of apache#48277 that disables newline-related tests in the Spark Connect compatibility test.

### Why are the changes needed?

To make the Spark Connect build pass. Currently, it fails as below (https://github.com/apache/spark/actions/runs/11186932292/job/31102993825):

```
======================================================================
FAIL [0.100s]: test_repr_behaviors (pyspark.sql.tests.connect.test_parity_dataframe.DataFrameParityTests.test_repr_behaviors)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "/home/runner/work/spark/spark-3.5/python/pyspark/sql/tests/test_dataframe.py", line 1586, in test_repr_behaviors
    self.assertEqual(re.sub(pattern, "", expected3), df.__repr__())
AssertionError: '+---[23 chars]---+-----+\n|  1|    1|\n+---+-----+\nonly showing top 1 row\n' != '+---[23 chars]---+-----+\n|  1|    1|\n+---+-----+\nonly showing top 1 row'
  +---+-----+
  |key|value|
  +---+-----+
  |  1|    1|
  +---+-----+
- only showing top 1 row
?                       -
+ only showing top 1 row
```

### Does this PR introduce _any_ user-facing change?

No, test-only.

### How was this patch tested?

Manually.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#48361 from HyukjinKwon/SPARK-49806-follow-up.

Authored-by: Hyukjin Kwon <gurwls223@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
### What changes were proposed in this pull request?
The PR aims to fix the pretty name of some expressions, including `random`, `to_varchar`, `current_database`, `curdate`, `dateadd` and `array_agg`.
(PS: This PR is a backport to branch-3.5; the master PR is apache#48385.)

### Why are the changes needed?
The actual function name used does not match the displayed name, as shown below:
- Before:
<img width="573" alt="image" src="https://github.com/user-attachments/assets/f5785c80-f6cb-494f-a15e-9258eca688a7">

- After:
<img width="570" alt="image" src="https://github.com/user-attachments/assets/792a7092-ccbf-49f4-a616-19110e5c2361">

### Does this PR introduce _any_ user-facing change?
Yes. The column header seen by the end user in Spark SQL now matches the actual function name used.

### How was this patch tested?
- Pass GA.
- Updated existing UTs.

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes apache#48393 from panbingkun/branch-3.5_SPARK-49909.

Authored-by: panbingkun <panbingkun@baidu.com>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
…edAttribute with child output

When the drop list of `DataFrameDropColumns` contains an `UnresolvedAttribute`, the current rule mistakenly resolves the column against its grandchildren's output attributes.
In DataFrame/Dataset API usage the issue cannot be encountered, since the entries in `dropList` are all `AttributeReference`s.
But when we build a Spark LogicalPlan directly, the bug is hit: an `UnresolvedAttribute` in the drop list does not work.

In `ResolveDataFrameDropColumns`
```scala
      val dropped = d.dropList.map {
        case u: UnresolvedAttribute =>
          resolveExpressionByPlanChildren(u, d.child)   // mistakenly resolves the column against the grandchildren's output attributes
        case e => e
      }
```
To fix it, change the call to `resolveExpressionByPlanChildren(u, d)` or `resolveExpressionByPlanOutput(u, d.child)`, as sketched below.
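
The corrected mapping then looks roughly like this (same context and names as the snippet above; illustrative only):

```scala
val dropped = d.dropList.map {
  case u: UnresolvedAttribute =>
    // resolve against the DataFrameDropColumns node itself, i.e. against d.child's
    // output, instead of against d.child's children (the grandchildren of d)
    resolveExpressionByPlanChildren(u, d)
  case e => e
}
```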

No

UT added.

No.

Closes apache#48240 from LantaoJin/SPARK-49782.

Authored-by: LantaoJin <jinlantao@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit d7772f2)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
…rrect offset

### What changes were proposed in this pull request?

`ColumnarArray` represents an array containing the elements of `data` in the half-open range `[offset, offset + length)`. When copying the array, the null flags should also be read starting from `offset` rather than from 0.

Some expressions depend on this utility function. For example, this bug can lead to incorrect results in `ArrayTransform`.
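
A standalone toy illustration of the off-by-offset issue (all names hypothetical, not the actual `ColumnarArray.copy` code):

```scala
// Sketch: when copying the slice [offset, offset + length), the null flag for element i
// must be read at offset + i; reading it at i attributes other rows' nullness to the copy.
final case class FakeVector(nulls: Array[Boolean], values: Array[Int])

def copySlice(v: FakeVector, offset: Int, length: Int): Array[Option[Int]] =
  Array.tabulate(length) { i =>
    if (v.nulls(offset + i)) None else Some(v.values(offset + i))
  }
```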

### Why are the changes needed?

Fix correctness issue.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Two unit tests, one with `ArrayTransform`, and the other tests `ColumnarArray` directly. Both the tests would fail without the change in the PR.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#48458 from chenhao-db/fix_ColumnarArray_copy.

Authored-by: Chenhao Li <chenhao.li@databricks.com>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
(cherry picked from commit 5f2bd5c)
Signed-off-by: Max Gekk <max.gekk@gmail.com>
…ult, timeout` test case to be robust

### What changes were proposed in this pull request?

This PR aims to fix `AsyncRDDActionsSuite.FutureAction result, timeout` test case to be robust.

### Why are the changes needed?

To reduce flakiness in the GitHub Actions CI. Previously, the sleep time was identical to the timeout, which causes flakiness in some environments such as GitHub Actions.
- https://github.com/apache/spark/actions/runs/11298639789/job/31428018075
```
AsyncRDDActionsSuite:
...
- FutureAction result, timeout *** FAILED ***
  Expected exception java.util.concurrent.TimeoutException to be thrown, but no exception was thrown (AsyncRDDActionsSuite.scala:206)
```
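
A standalone sketch of the robustness idea (toy code, not the suite itself): the asynchronous work sleeps far longer than the awaited timeout, so the `TimeoutException` cannot race.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import java.util.concurrent.TimeoutException

// The future sleeps 10x longer than the await timeout, so the timeout is deterministic
// even on slow CI machines; sleeping exactly the timeout duration would be racy.
val slow = Future { Thread.sleep(10000); 42 }
try {
  Await.result(slow, 1.second)
  assert(false, "expected a TimeoutException")
} catch {
  case _: TimeoutException => // expected
}
```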

### Does this PR introduce _any_ user-facing change?

No, this is a test-only change.

### How was this patch tested?

Pass the CIs.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#48485 from dongjoon-hyun/SPARK-49981.

Authored-by: Dongjoon Hyun <dongjoon@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
(cherry picked from commit a3b9124)
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
…sync with allGather and barrier` test case to be robust

### What changes were proposed in this pull request?

This PR aims to fix `BarrierTaskContextSuite.successively sync with allGather and barrier` test case to be robust.

### Why are the changes needed?

The test case asserts on the duration of the partitions. However, this is flaky because we don't know when a partition is triggered before the `barrier` sync.

https://github.com/apache/spark/blob/0e75d19a736aa18fe77414991ebb7e3577a43af8/core/src/test/scala/org/apache/spark/scheduler/BarrierTaskContextSuite.scala#L116-L118

Although we added `TestUtils.waitUntilExecutorsUp` in Apache Spark 3.0.0, as in the following,

- apache#28658

let's say a partition starts more than `38ms` late and all partitions sleep exactly `1s`. Then the test case fails like the following:
- https://github.com/apache/spark/actions/runs/11298639789/job/31428018075
```
BarrierTaskContextSuite:
...
- successively sync with allGather and barrier *** FAILED ***
  1038 was not less than or equal to 1000 (BarrierTaskContextSuite.scala:118)
```

According to the failure history here (SPARK-49983) and in SPARK-31730, the slowness seems to be less than `200ms` when it happens. So, this PR aims to reduce the flakiness by capping the sleep at 500ms while keeping the `1s` validation. There is no change in test coverage because this test case focuses on the `successively sync with allGather and barrier` behavior.

### Does this PR introduce _any_ user-facing change?

No, this is a test-only change.

### How was this patch tested?

Pass the CIs.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#48487 from dongjoon-hyun/SPARK-49983.

Authored-by: Dongjoon Hyun <dongjoon@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
(cherry picked from commit bcfe62b)
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
…ailed plan

### What changes were proposed in this pull request?

Record the failure/error status in the query stage, and abort immediately upon seeing a failed query stage when creating new query stages.

### Why are the changes needed?

AQE has a potential hanging issue: when we collect twice from a failed AQE plan, no new query stage is created and no stage is submitted either. We then wait forever for a finish event that will never come, because that query stage already failed in the previous run.
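
A minimal sketch of the fail-fast idea (names are hypothetical; the real change is in the AQE query stage handling):

```scala
import scala.collection.mutable

// Sketch: remember stages that already failed and abort immediately when a re-execution
// encounters one, instead of waiting for a completion event that will never be fired.
val failedStages = mutable.Set[Int]()

def createOrAbort(stageId: Int)(create: => Unit): Unit = {
  if (failedStages.contains(stageId)) {
    throw new IllegalStateException(s"Query stage $stageId already failed; aborting re-execution.")
  }
  create
}
```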

### Does this PR introduce _any_ user-facing change?
NO

### How was this patch tested?
New UT.

### Was this patch authored or co-authored using generative AI tooling?
NO

Closes apache#48484 from liuzqt/SPARK-49979.

Authored-by: Ziqi Liu <ziqi.liu@databricks.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
(cherry picked from commit e374b94)
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
…ck if remain child is just BroadcastQueryStageExec

### What changes were proposed in this pull request?
This PR backports apache#46523 and apache#48300 for SPARK-48155 to branch-3.5.

### Why are the changes needed?

### Does this PR introduce _any_ user-facing change?

### How was this patch tested?

### Was this patch authored or co-authored using generative AI tooling?

Closes apache#48505 from zml1206/SPARK-48155-3.5.

Lead-authored-by: Angerszhuuuu <angers.zhu@gmail.com>
Co-authored-by: zml1206 <zhuml1206@gmail.com>
Signed-off-by: Wenchen Fan <wenchen@databricks.com>
…state store in stream-stream join

This PR proposes to revise the optimization for adding input to the state store in stream-stream join, to fix a correctness issue.

Here is the logic of optimization before this PR:

https://github.com/apache/spark/blob/039fd13eacb1cef835045e3a60cebf958589e1a2/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingSymmetricHashJoinExec.scala#L671-L677

```
        val isLeftSemiWithMatch =
          joinType == LeftSemi && joinSide == LeftSide && iteratorNotEmpty
        // Add to state store only if both removal predicates do not match,
        // and the row is not matched for left side of left semi join.
        val shouldAddToState =
          !stateKeyWatermarkPredicateFunc(key) && !stateValueWatermarkPredicateFunc(thisRow) &&
          !isLeftSemiWithMatch
```

The criterion `both removal predicates do not match` means the input is going to be evicted in this batch. I'm not sure about the coverage of this optimization, but there are two major issues with it:

1) Missing to add the input to the state store on the left side prevents inputs on the right side from matching with that input. Even though the input is going to be evicted in this batch, there could still be inputs on the right side of this batch which can match with it.

2) Missing to add the input to the state store prevents that input from producing unmatched (null-outer) output, as we produce unmatched output during the eviction of state.

It is worth noting that the state watermark is not the same as the watermark for eviction, and the eviction mentioned above is based on the state watermark. The state watermark can be either 1) equal to or earlier than the watermark for eviction, or 2) later than the watermark for eviction.

Yes, there are correctness issues in stream-stream join, especially when the output of a stateful operator is provided as input to a stream-stream join. The correctness issue is fixed by this PR.

New UTs.

No.

Closes apache#48297 from HeartSaVioR/SPARK-49829.

Lead-authored-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
Co-authored-by: Andrzej Zera <andrzejzera@gmail.com>
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
(cherry picked from commit 75b8666)
Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com>
### What changes were proposed in this pull request?
The PR aims to revert apache#48393.
This reverts commit 4472fb2.

### Why are the changes needed?
Only revert.

### Does this PR introduce _any_ user-facing change?
No.

### How was this patch tested?
Pass GA

### Was this patch authored or co-authored using generative AI tooling?
No.

Closes apache#48531 from panbingkun/branch-3.5_SPARK-49909_revert.

Authored-by: panbingkun <panbingkun@baidu.com>
Signed-off-by: Max Gekk <max.gekk@gmail.com>
…ks when UI is disabled

### What changes were proposed in this pull request?

This PR aims to fix `ApplicationPage` to hide the UI link when the UI is disabled.

### Why are the changes needed?

Previously, Spark threw `HTTP ERROR 500 java.lang.IllegalArgumentException: Invalid URI host: null (authority: null)`, as shown below.

**1. PREPARATION**
```
$ cat conf/spark-defaults.conf
spark.ui.reverseProxy true
spark.ui.reverseProxyUrl http://localhost:8080

$ sbin/start-master.sh

$ sbin/start-worker.sh spark://$(hostname):7077

$ bin/spark-shell --master spark://$(hostname):7077 -c spark.ui.enabled=false
```

**2. BEFORE**
<img width="496" alt="Screenshot 2024-10-17 at 21 24 32" src="https://github.com/user-attachments/assets/9884790c-a294-4e61-b630-7758c5532afc">

<img width="1002" alt="Screenshot 2024-10-17 at 21 24 51" src="https://github.com/user-attachments/assets/f1e3a121-37ba-4525-a433-21ad15402edf">

**3. AFTER**
<img width="493" alt="Screenshot 2024-10-17 at 21 22 26" src="https://github.com/user-attachments/assets/7a1ef578-3d9f-495e-9545-6edd26b4d565">

### Does this PR introduce _any_ user-facing change?

Yes, but previously it was a broken link.

### How was this patch tested?

Pass the CIs with the newly added test case.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#48547 from dongjoon-hyun/SPARK-50021-3.5.

Authored-by: Dongjoon Hyun <dongjoon@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
…en UI is disabled

### What changes were proposed in this pull request?

This PR aims to fix `MasterPage` to hide App UI links when the UI is disabled.

Previously, the link led to the following error if a user clicked it.
<img width="997" alt="Screenshot 2024-10-17 at 22 06 22" src="https://github.com/user-attachments/assets/e53ba01f-533f-4d42-a488-830afaf40efa">

### Why are the changes needed?

**1. PREPARATION**

```
$ cat conf/spark-defaults.conf
spark.ui.reverseProxy true
spark.ui.reverseProxyUrl http://localhost:8080

$ sbin/start-master.sh

$ sbin/start-worker.sh spark://$(hostname):7077

$ bin/spark-shell --master spark://$(hostname):7077 -c spark.ui.enabled=false
```

**2. BEFORE**

<img width="340" alt="Screenshot 2024-10-17 at 22 01 16" src="https://github.com/user-attachments/assets/3069e43d-ba8c-4d36-8101-65e10b420879">

**3. AFTER**

<img width="345" alt="Screenshot 2024-10-17 at 22 04 12" src="https://github.com/user-attachments/assets/b9feba47-90fb-4557-803c-94eaa8ce62e1">

### Does this PR introduce _any_ user-facing change?

The previous behavior showed an HTTP 500 error.

### How was this patch tested?

Pass the CIs with a newly added test case.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#48549 from dongjoon-hyun/SPARK-50022-3.5.

Authored-by: Dongjoon Hyun <dongjoon@apache.org>
Signed-off-by: Dongjoon Hyun <dongjoon@apache.org>
@pradomota pradomota merged commit 4eead32 into spark-3.5 Oct 23, 2024
27 checks passed