https://github.com/apache/spark

5103e00 Preparing Spark release v3.3.2-rc1 10 February 2023, 17:25:33 UTC
307ec98 [MINOR][SS] Fix setTimeoutTimestamp doc ### What changes were proposed in this pull request? This patch updates the API doc of `setTimeoutTimestamp` of `GroupState`. ### Why are the changes needed? Update incorrect API doc. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Doc change only. Closes #39958 from viirya/fix_group_state. Authored-by: Liang-Chi Hsieh <viirya@gmail.com> Signed-off-by: Liang-Chi Hsieh <viirya@gmail.com> (cherry picked from commit a180e67d3859a4e145beaf671c1221fb4d6cbda7) Signed-off-by: Liang-Chi Hsieh <viirya@gmail.com> 10 February 2023, 02:17:51 UTC
7205567 [SPARK-40819][SQL][FOLLOWUP] Update SqlConf version for nanosAsLong configuration As requested by HyukjinKwon in https://github.com/apache/spark/pull/38312 NB: This change needs to be backported ### What changes were proposed in this pull request? Update version set for "spark.sql.legacy.parquet.nanosAsLong" configuration in SqlConf. This update is required because the previous PR set version to `3.2.3` which has already been released. Updating to version `3.2.4` will correctly reflect when this configuration element was added ### Why are the changes needed? Correctness and to complete SPARK-40819 ### Does this PR introduce _any_ user-facing change? No, this is merely so this configuration element has the correct version ### How was this patch tested? N/A Closes #39943 from awdavidson/SPARK-40819_sql-conf. Authored-by: awdavidson <54780428+awdavidson@users.noreply.github.com> Signed-off-by: Hyukjin Kwon <gurwls223@apache.org> (cherry picked from commit 409c661542c4b966876f0af4119803de25670649) Signed-off-by: Hyukjin Kwon <gurwls223@apache.org> 09 February 2023, 00:02:36 UTC
3ec9b05 [SPARK-40819][SQL][3.3] Timestamp nanos behaviour regression As per HyukjinKwon's request on https://github.com/apache/spark/pull/38312 to backport the fix into 3.3 ### What changes were proposed in this pull request? Handle `TimeUnit.NANOS` for parquet `Timestamps`, addressing a regression in behaviour since 3.2 ### Why are the changes needed? Since version 3.2, reading parquet files that contain attributes with type `TIMESTAMP(NANOS,true)` is not possible as ParquetSchemaConverter returns ``` Caused by: org.apache.spark.sql.AnalysisException: Illegal Parquet type: INT64 (TIMESTAMP(NANOS,true)) ``` https://issues.apache.org/jira/browse/SPARK-34661 introduced a change matching on the `LogicalTypeAnnotation` which only covers Timestamp cases for `TimeUnit.MILLIS` and `TimeUnit.MICROS`, meaning `TimeUnit.NANOS` would return `illegalType()`. Prior to 3.2 the matching used the `originalType`, which for `TIMESTAMP(NANOS,true)` returned `null` and therefore resulted in a `LongType`. The change proposed is to consider `TimeUnit.NANOS` and return `LongType`, making the behaviour the same as before. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Added unit test covering this scenario. Internally deployed to read parquet files that contain `TIMESTAMP(NANOS,true)` Closes #39904 from awdavidson/ts-nanos-fix-3.3. Lead-authored-by: alfreddavidson <alfie.davidson9@gmail.com> Co-authored-by: awdavidson <54780428+awdavidson@users.noreply.github.com> Signed-off-by: Hyukjin Kwon <gurwls223@apache.org> 08 February 2023, 02:07:25 UTC
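
A minimal Scala sketch of reading such a file once this fix and its legacy flag are in place; the config key is the one referenced in the entry above it (`spark.sql.legacy.parquet.nanosAsLong`), while the file path and local session are assumptions for illustration.

```scala
import org.apache.spark.sql.SparkSession

// Assumes a parquet file with a TIMESTAMP(NANOS,true) column already exists at this path.
val spark = SparkSession.builder().master("local[*]").getOrCreate()

// Opt in to the pre-3.2 behaviour: nanosecond timestamps are read back as plain longs
// instead of failing with "Illegal Parquet type: INT64 (TIMESTAMP(NANOS,true))".
spark.conf.set("spark.sql.legacy.parquet.nanosAsLong", "true")

spark.read.parquet("/tmp/nanos.parquet").printSchema()   // the nanos column shows up as: long
```
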
51ed6ba [SPARK-41962][MINOR][SQL] Update the order of imports in class SpecificParquetRecordReaderBase ### What changes were proposed in this pull request? Update the order of imports in class SpecificParquetRecordReaderBase. ### Why are the changes needed? Follow the code style. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Passed GA. Closes #39906 from wayneguow/import. Authored-by: wayneguow <guow93@gmail.com> Signed-off-by: Hyukjin Kwon <gurwls223@apache.org> (cherry picked from commit d6134f78d3d448a990af53beb8850ff91b71aef6) Signed-off-by: Hyukjin Kwon <gurwls223@apache.org> 07 February 2023, 07:11:29 UTC
17b7123 [SPARK-42346][SQL] Rewrite distinct aggregates after subquery merge ### What changes were proposed in this pull request? Unfortunately https://github.com/apache/spark/pull/32298 introduced a regression from Spark 3.2 to 3.3 as, after that change, a merged subquery can contain multiple distinct-type aggregates. Those aggregates need to be rewritten by the `RewriteDistinctAggregates` rule to get the correct results. This PR fixes that. ### Why are the changes needed? The following query: ``` SELECT (SELECT count(distinct c1) FROM t1), (SELECT count(distinct c2) FROM t1) ``` currently fails with: ``` java.lang.IllegalStateException: You hit a query analyzer bug. Please report your query to Spark user mailing list. at org.apache.spark.sql.execution.SparkStrategies$Aggregation$.apply(SparkStrategies.scala:538) ``` but works again after this PR. ### Does this PR introduce _any_ user-facing change? Yes, the above query works again. ### How was this patch tested? Added new UT. Closes #39887 from peter-toth/SPARK-42346-rewrite-distinct-aggregates-after-subquery-merge. Authored-by: Peter Toth <peter.toth@gmail.com> Signed-off-by: Yuming Wang <yumwang@ebay.com> (cherry picked from commit 5940b9884b4b172f65220da7857d2952b137bc51) Signed-off-by: Yuming Wang <yumwang@ebay.com> 06 February 2023, 13:22:58 UTC
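
A runnable Scala sketch of the reproduction above; only the query comes from the entry, while the table contents and local session are assumed.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

Seq((1, 10), (2, 20)).toDF("c1", "c2").createOrReplaceTempView("t1")

// Before the fix this failed with "You hit a query analyzer bug" once the two scalar
// subqueries were merged into one aggregate; afterwards it returns both distinct counts.
spark.sql(
  """SELECT (SELECT count(distinct c1) FROM t1),
    |       (SELECT count(distinct c2) FROM t1)""".stripMargin).show()
```
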
cdb494b [SPARK-42344][K8S] Change the default size of the CONFIG_MAP_MAXSIZE The default value of CONFIG_MAP_MAXSIZE should not be greater than 1048576 ### What changes were proposed in this pull request? This PR changes the default value of CONFIG_MAP_MAXSIZE from 1572864 (1.5 MiB) to 1048576 (1.0 MiB) ### Why are the changes needed? When a job is submitted by Spark to K8s with a configmap, spark-submit calls the K8s POST API "api/v1/namespaces/default/configmaps". The size of the configmap is validated by this K8s API, and the max value must not be greater than 1048576. In the previous comment, the explanation in https://etcd.io/docs/v3.4/dev-guide/limit/ is: "etcd is designed to handle small key value pairs typical for metadata. Larger requests will work, but may increase the latency of other requests. By default, the maximum size of any request is 1.5 MiB. This limit is configurable through --max-request-bytes flag for etcd server." This explanation is from the perspective of etcd, not K8s. So I think the default value of the configmap in Spark should not be greater than 1048576. ### Does this PR introduce _any_ user-facing change? Yes. Generally, the size of the configmap will not exceed 1572864 or even 1048576, so the problem solved here may not be perceived by users. ### How was this patch tested? Local test Closes #39884 from ninebigbig/master. Authored-by: Yan Wei <ninebigbig@gmail.com> Signed-off-by: Dongjoon Hyun <dongjoon@apache.org> (cherry picked from commit 9ac46408ec943d5121bbc14f2ce0d8b2ff453de5) Signed-off-by: Dongjoon Hyun <dongjoon@apache.org> (cherry picked from commit d07a0e9b476e1d846e7be13394c8251244cc832e) Signed-off-by: Dongjoon Hyun <dongjoon@apache.org> 05 February 2023, 11:12:19 UTC
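
A small sketch of setting the limit explicitly rather than relying on the default; it assumes CONFIG_MAP_MAXSIZE corresponds to the documented `spark.kubernetes.configMap.maxSize` key, and the value shown is the new default from this commit.

```scala
import org.apache.spark.SparkConf

// Sketch only: pin the configmap size limit explicitly. 1048576 bytes (1.0 MiB) is the
// default after this change; larger values may be rejected by the Kubernetes API server / etcd.
val conf = new SparkConf()
  .set("spark.kubernetes.configMap.maxSize", "1048576")
```
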
2d539c5 [SPARK-41554] fix changing of Decimal scale when scale decreased by more than 18 This is a backport PR for https://github.com/apache/spark/pull/39099 Closes #39813 from fe2s/branch-3.3-fix-decimal-scaling. Authored-by: oleksii.diagiliev <oleksii.diagiliev@workday.com> Signed-off-by: Sean Owen <srowen@gmail.com> 03 February 2023, 16:48:56 UTC
6e0dfa9 [MINOR][DOCS][PYTHON][PS] Fix the `.groupby()` method docstring ### What changes were proposed in this pull request? Update the docstring for the `.groupby()` method. ### Why are the changes needed? The `.groupby()` method accepts a list of columns (or a single column), and a column is defined by a `Series` or name (`Label`). It's a bit confusing to say "using a Series of columns", because `Series` (capitalized) is a specific object that isn't actually used/reasonable to use here. ### Does this PR introduce _any_ user-facing change? Yes (documentation) ### How was this patch tested? N/A Closes #38625 from deepyaman/patch-3. Authored-by: Deepyaman Datta <deepyaman.datta@utexas.edu> Signed-off-by: Hyukjin Kwon <gurwls223@apache.org> (cherry picked from commit 71154dc1b35c7227ef9033fe5abc2a8b3f2d0990) Signed-off-by: Hyukjin Kwon <gurwls223@apache.org> 03 February 2023, 06:51:15 UTC
80e8df1 [SPARK-42259][SQL] ResolveGroupingAnalytics should take care of Python UDAF This is a long-standing correctness issue with Python UDAF and grouping analytics. The rule `ResolveGroupingAnalytics` should take care of Python UDAF when matching aggregate expressions. bug fix Yes, the query result was wrong before existing tests Closes #39824 from cloud-fan/python. Authored-by: Wenchen Fan <wenchen@databricks.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com> (cherry picked from commit 1219c8492376e038894111cd5d922229260482e7) Signed-off-by: Wenchen Fan <wenchen@databricks.com> 01 February 2023, 09:40:56 UTC
0bb8f22 [SPARK-42230][INFRA][FOLLOWUP] Add `GITHUB_PREV_SHA` and `APACHE_SPARK_REF` to lint job ### What changes were proposed in this pull request? Like the other jobs, this PR aims to add the `GITHUB_PREV_SHA` and `APACHE_SPARK_REF` environment variables to the `lint` job. ### Why are the changes needed? This is required to detect the changed module accurately. ### Does this PR introduce _any_ user-facing change? No, this is an infra-only bug fix. ### How was this patch tested? Manual review. Closes #39809 from dongjoon-hyun/SPARK-42230-2. Authored-by: Dongjoon Hyun <dongjoon@apache.org> Signed-off-by: Dongjoon Hyun <dongjoon@apache.org> (cherry picked from commit 1304a3329d7feb1bd6f9a9dba09f37494c9bb4a2) Signed-off-by: Dongjoon Hyun <dongjoon@apache.org> 30 January 2023, 19:59:45 UTC
9d7373a [SPARK-42230][INFRA] Improve `lint` job by skipping PySpark and SparkR docs if unchanged ### What changes were proposed in this pull request? This PR aims to improve `GitHub Action lint` job by skipping `PySpark` and `SparkR` documentation generations if PySpark and R module is unchanged. ### Why are the changes needed? `Documentation Generation` took over 53 minutes because it generates all Java/Scala/SQL/PySpark/R documentation always. However, `R` module is not changed frequently so that the documentation is always identical. `PySpark` module is more frequently changed, but still we can skip in many cases. This PR shows the reduction from `53` minutes to `18` minutes. **BEFORE** ![Screenshot 2023-01-29 at 4 36 07 PM](https://user-images.githubusercontent.com/9700541/215365573-bf83717b-cd9e-46e2-912f-5c9d2f359b08.png) **AFTER** ![Screenshot 2023-01-29 at 10 13 27 PM](https://user-images.githubusercontent.com/9700541/215401795-3f810e52-2fe3-44fd-99f4-b5750964c3b6.png) ### Does this PR introduce _any_ user-facing change? No, this is an infra only change. ### How was this patch tested? Manual review. Closes #39792 from dongjoon-hyun/SPARK-42230. Authored-by: Dongjoon Hyun <dongjoon@apache.org> Signed-off-by: Dongjoon Hyun <dongjoon@apache.org> (cherry picked from commit 1d3c2681d26bf6034d15ee261e5395e9f45d67f8) Signed-off-by: Dongjoon Hyun <dongjoon@apache.org> 30 January 2023, 07:38:48 UTC
6cd4ff6 [SPARK-42168][3.3][SQL][PYTHON][FOLLOW-UP] Test FlatMapCoGroupsInPandas with Window function ### What changes were proposed in this pull request? This ports tests from #39717 in branch-3.2 to branch-3.3. See https://github.com/apache/spark/pull/39752#issuecomment-1407157253. ### Why are the changes needed? To make sure this use case is tested. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? E2E test in `test_pandas_cogrouped_map.py` and analysis test in `EnsureRequirementsSuite.scala`. Closes #39781 from EnricoMi/branch-3.3-cogroup-window-bug-test. Authored-by: Enrico Minack <github@enrico.minack.dev> Signed-off-by: Hyukjin Kwon <gurwls223@apache.org> 29 January 2023, 07:25:13 UTC
289e650 [SPARK-42201][BUILD] `build/sbt` should allow `SBT_OPTS` to override JVM memory setting ### What changes were proposed in this pull request? This PR aims to fix a bug where `build/sbt` doesn't allow JVM memory settings via `SBT_OPTS`. ### Why are the changes needed? `SBT_OPTS` is supposed to be used in this way in the community. https://github.com/apache/spark/blob/e30bb538e480940b1963eb14c3267662912d8584/appveyor.yml#L54 However, an `SBT_OPTS` memory setting like the following is ignored because ` -Xms4096m -Xmx4096m -XX:ReservedCodeCacheSize=512m` is injected by default after `SBT_OPTS`. We should switch the order. ``` $ SBT_OPTS="-Xmx6g" build/sbt package ``` https://github.com/apache/spark/blob/e30bb538e480940b1963eb14c3267662912d8584/build/sbt-launch-lib.bash#L124 ### Does this PR introduce _any_ user-facing change? No. This is a dev-only change. ### How was this patch tested? Manually run the following. ``` $ SBT_OPTS="-Xmx6g" build/sbt package ``` While running the above command, check the JVM options. ``` $ ps aux | grep java dongjoon 36683 434.3 3.1 418465456 1031888 s001 R+ 1:11PM 0:19.86 /Users/dongjoon/.jenv/versions/temurin17/bin/java -Xms4096m -Xmx4096m -XX:ReservedCodeCacheSize=512m -Xmx6g -jar build/sbt-launch-1.8.2.jar package ``` Closes #39758 from dongjoon-hyun/SPARK-42201. Authored-by: Dongjoon Hyun <dongjoon@apache.org> Signed-off-by: Dongjoon Hyun <dongjoon@apache.org> (cherry picked from commit 66ec1eb630a4682f5ad2ed2ee989ffcce9031608) Signed-off-by: Dongjoon Hyun <dongjoon@apache.org> 27 January 2023, 00:39:35 UTC
518a24c [SPARK-42188][BUILD][3.3] Force SBT protobuf version to match Maven ### What changes were proposed in this pull request? Update `SparkBuild.scala` to force SBT use of `protobuf-java` to match the Maven version. The Maven dependencyManagement section forces `protobuf-java` to use `2.5.0`, but SBT is using `3.14.0`. ### Why are the changes needed? Define `protoVersion` in `SparkBuild.scala` and use it in `DependencyOverrides` to force the SBT version of `protobuf-java` to match the setting defined in the Maven top-level `pom.xml`. Add comments to both `pom.xml` and `SparkBuild.scala` to ensure that the values are kept in sync. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Before the update, SBT reported using `3.14.0`: ``` % build/sbt dependencyTree | grep proto | sed 's/^.*-com/com/' | sort | uniq -c 8 com.google.protobuf:protobuf-java:2.5.0 (evicted by: 3.14.0) 70 com.google.protobuf:protobuf-java:3.14.0 ``` After the patch is applied, SBT reports using `2.5.0`: ``` % build/sbt dependencyTree | grep proto | sed 's/^.*-com/com/' | sort | uniq -c 70 com.google.protobuf:protobuf-java:2.5.0 ``` Closes #39746 from snmvaughan/feature/SPARK-42188-3.3. Authored-by: Steve Vaughan Jr <s_vaughan@apple.com> Signed-off-by: huaxingao <huaxin_gao@apple.com> 26 January 2023, 02:17:39 UTC
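
A build.sbt-style sketch of the technique described above — forcing SBT's resolved version of a library via `dependencyOverrides`; the version numbers come from the entry, but the snippet is illustrative rather than the exact `SparkBuild.scala` change.

```scala
// Keep SBT's resolution of protobuf-java pinned to the version Maven's
// dependencyManagement section uses; must stay in sync with pom.xml.
val protoVersion = "2.5.0"

lazy val root = (project in file("."))
  .settings(
    dependencyOverrides += "com.google.protobuf" % "protobuf-java" % protoVersion
  )
```
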
d69e7b6 [SPARK-42179][BUILD][SQL][3.3] Upgrade ORC to 1.7.8 ### What changes were proposed in this pull request? This PR aims to upgrade ORC to 1.7.8 for Apache Spark 3.3.2. ### Why are the changes needed? Apache ORC 1.7.8 is a maintenance release with important bug fixes. - https://orc.apache.org/news/2023/01/21/ORC-1.7.8/ - [ORC-1332](https://issues.apache.org/jira/browse/ORC-1332) Avoid NegativeArraySizeException when using searchArgument - [ORC-1343](https://issues.apache.org/jira/browse/ORC-1343) Ignore orc.create.index ### Does this PR introduce _any_ user-facing change? The ORC dependency is going to be changed from 1.7.6 (Apache Spark 3.3.1) to 1.7.8 (Apache Spark 3.3.2) ### How was this patch tested? Pass the CIs. Closes #39735 from dongjoon-hyun/SPARK-42179. Authored-by: Dongjoon Hyun <dongjoon@apache.org> Signed-off-by: Dongjoon Hyun <dongjoon@apache.org> 25 January 2023, 11:24:22 UTC
04edc7e [SPARK-42176][SQL] Fix cast of a boolean value to timestamp The PR fixes an issue when casting a boolean to timestamp. While `select cast(true as timestamp)` works and returns `1970-01-01 00:00:00.000001`, casting `false` to timestamp fails with the following error: > IllegalArgumentException: requirement failed: Literal must have a corresponding value to timestamp, but class Integer found. SBT test also fails with this error: ``` [info] java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.Long [info] at scala.runtime.BoxesRunTime.unboxToLong(BoxesRunTime.java:107) [info] at org.apache.spark.sql.catalyst.InternalRow$.$anonfun$getWriter$5(InternalRow.scala:178) [info] at org.apache.spark.sql.catalyst.InternalRow$.$anonfun$getWriter$5$adapted(InternalRow.scala:178) ``` The issue was that we need to return `0L` instead of `0` when converting `false` to a long. Fixes a small bug in cast. No. I added a unit test to verify the fix. Closes #39729 from sadikovi/fix_spark_boolean_to_timestamp. Authored-by: Ivan Sadikov <ivan.sadikov@databricks.com> Signed-off-by: Hyukjin Kwon <gurwls223@apache.org> (cherry picked from commit 866343c7be47d71b88ae9a6b4dda26f8c4f5964b) Signed-off-by: Hyukjin Kwon <gurwls223@apache.org> 25 January 2023, 03:33:15 UTC
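
A quick spark-shell sketch of the fixed behaviour (an existing `spark` session is assumed); before the fix only the `true` cast worked.

```scala
// Both casts now succeed; previously CAST(false AS TIMESTAMP) failed with
// "Literal must have a corresponding value to timestamp, but class Integer found".
spark.sql("SELECT CAST(true AS TIMESTAMP) AS t, CAST(false AS TIMESTAMP) AS f").show(false)
// t -> 1970-01-01 00:00:00.000001
// f -> 1970-01-01 00:00:00
```
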
b5cbee8 [SPARK-42090][3.3] Introduce sasl retry count in RetryingBlockTransferor ### What changes were proposed in this pull request? This PR introduces a SASL retry count in RetryingBlockTransferor. ### Why are the changes needed? Previously a boolean variable, saslTimeoutSeen, was used. However, the boolean variable wouldn't cover the following scenario: 1. SaslTimeoutException 2. IOException 3. SaslTimeoutException 4. IOException Even though the IOException at #2 is retried (resulting in an increment of retryCount), the retryCount would be cleared at step #4. Since the intention of saslTimeoutSeen is to undo the increment due to retrying SaslTimeoutException, we should keep a counter for SaslTimeoutException retries and subtract the value of this counter from retryCount. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? New test is added, courtesy of Mridul. Closes #39611 from tedyu/sasl-cnt. Authored-by: Ted Yu <yuzhihong@gmail.com> Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com> Closes #39709 from akpatnam25/SPARK-42090-backport-3.3. Authored-by: Ted Yu <yuzhihong@gmail.com> Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com> 24 January 2023, 18:23:25 UTC
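
A standalone Scala sketch of the bookkeeping described above — it is not the actual RetryingBlockTransferor code, just an illustration of why a separate counter (rather than a boolean flag) keeps SASL-timeout retries from eating into the regular retry budget.

```scala
object SaslRetryAccounting {
  def main(args: Array[String]): Unit = {
    var retryCount = 0       // all retries performed so far
    var saslRetryCount = 0   // retries caused by SASL timeouts only

    def retry(saslTimeout: Boolean): Unit = {
      retryCount += 1
      if (saslTimeout) saslRetryCount += 1
    }

    // SaslTimeoutException, IOException, SaslTimeoutException, IOException
    Seq(true, false, true, false).foreach(retry)

    // Only the two IOException retries count against the retry budget.
    assert(retryCount - saslRetryCount == 2)
  }
}
```
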
0fff27d [MINOR][K8S][DOCS] Add all resource managers in `Scheduling Within an Application` section ### What changes were proposed in this pull request? The `Job Scheduling` document doesn't mention the K8s resource manager so far because the `Scheduling Across Applications` section mentions all resource managers except K8s. This PR aims to add all supported resource managers to the `Scheduling Within an Application` section. ### Why are the changes needed? K8s also supports `FAIR` scheduling within an application. ### Does this PR introduce _any_ user-facing change? No. This is a doc-only update. ### How was this patch tested? N/A Closes #39704 from dongjoon-hyun/minor_job_scheduling. Authored-by: Dongjoon Hyun <dongjoon@apache.org> Signed-off-by: Dongjoon Hyun <dongjoon@apache.org> (cherry picked from commit 45dbc44410f9bf74c7fb4431aad458db32960461) Signed-off-by: Dongjoon Hyun <dongjoon@apache.org> 24 January 2023, 08:06:32 UTC
41e6875 [SPARK-42157][CORE] `spark.scheduler.mode=FAIR` should provide FAIR scheduler ### What changes were proposed in this pull request? As our documentation states, `spark.scheduler.mode=FAIR` should provide `FAIR Scheduling Within an Application`. https://spark.apache.org/docs/latest/job-scheduling.html#scheduling-within-an-application ![Screenshot 2023-01-22 at 2 59 22 PM](https://user-images.githubusercontent.com/9700541/213944956-931e3a3c-d094-4455-8990-233c7966194b.png) This bug is hidden in our CI because `fairscheduler.xml` is always present as one of the test resources. - https://github.com/apache/spark/blob/master/core/src/test/resources/fairscheduler.xml ### Why are the changes needed? Currently, when `spark.scheduler.mode=FAIR` is given without a scheduler allocation file, Spark creates `Fair Scheduler Pools` with a `FIFO` scheduler, which is wrong. We need to switch the mode to `FAIR` from `FIFO`. **BEFORE** ``` $ bin/spark-shell -c spark.scheduler.mode=FAIR Setting default log level to "WARN". To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel). 23/01/22 14:47:37 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 23/01/22 14:47:38 WARN FairSchedulableBuilder: Fair Scheduler configuration file not found so jobs will be scheduled in FIFO order. To use fair scheduling, configure pools in fairscheduler.xml or set spark.scheduler.allocation.file to a file that contains the configuration. Spark context Web UI available at http://localhost:4040 ``` ![Screenshot 2023-01-22 at 2 50 38 PM](https://user-images.githubusercontent.com/9700541/213944555-6e367a33-ca58-4daf-9ba4-b0319fbe4516.png) **AFTER** ``` $ bin/spark-shell -c spark.scheduler.mode=FAIR Setting default log level to "WARN". To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel). 23/01/22 14:48:18 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable Spark context Web UI available at http://localhost:4040 ``` ![Screenshot 2023-01-22 at 2 50 14 PM](https://user-images.githubusercontent.com/9700541/213944551-660aa298-638b-450c-ad61-db9e42a624b0.png) ### Does this PR introduce _any_ user-facing change? Yes, but this is a bug fix to match the official Apache Spark documentation. ### How was this patch tested? Pass the CIs. Closes #39703 from dongjoon-hyun/SPARK-42157. Authored-by: Dongjoon Hyun <dongjoon@apache.org> Signed-off-by: Dongjoon Hyun <dongjoon@apache.org> (cherry picked from commit 4d51bfa725c26996641f566e42ae392195d639c5) Signed-off-by: Dongjoon Hyun <dongjoon@apache.org> 24 January 2023, 07:48:27 UTC
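
A minimal Scala sketch of requesting FAIR scheduling programmatically (local master and pool name are assumptions); after this fix the pools created on the fly really use FAIR ordering even when no `fairscheduler.xml` or `spark.scheduler.allocation.file` is provided.

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")
  .config("spark.scheduler.mode", "FAIR")
  .getOrCreate()

// Jobs submitted from this thread are assigned to the named pool.
spark.sparkContext.setLocalProperty("spark.scheduler.pool", "adhoc")
```
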
b7ababf [SPARK-41415][3.3] SASL Request Retries ### What changes were proposed in this pull request? Add the ability to retry SASL requests. A metric to track SASL retries will be added soon. ### Why are the changes needed? We are seeing increased SASL timeouts internally, and this change would mitigate the issue. We already have this feature enabled for our 2.3 jobs, and we have seen failures significantly decrease. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Added unit tests, and tested on a cluster to ensure the retries are being triggered correctly. Closes #38959 from akpatnam25/SPARK-41415. Authored-by: Aravind Patnam <apatnam@linkedin.com> Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com> Closes #39644 from akpatnam25/SPARK-41415-backport-3.3. Authored-by: Aravind Patnam <apatnam@linkedin.com> Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com> 21 January 2023, 03:30:51 UTC
8f09a69 [SPARK-42134][SQL] Fix getPartitionFiltersAndDataFilters() to handle filters without referenced attributes ### What changes were proposed in this pull request? This is a small correctness fix to `DataSourceUtils.getPartitionFiltersAndDataFilters()` to handle filters without any referenced attributes correctly. E.g. without the fix, the following query on the Parquet V2 source: ``` spark.conf.set("spark.sql.sources.useV1SourceList", "") spark.range(1).write.mode("overwrite").format("parquet").save(path) df = spark.read.parquet(path).toDF("i") f = udf(lambda x: False, "boolean")(lit(1)) r = df.filter(f) r.show() ``` returns ``` +---+ | i| +---+ | 0| +---+ ``` but it should return an empty result. The root cause of the issue is that during `V2ScanRelationPushDown` a filter that doesn't reference any column is incorrectly identified as a partition filter. ### Why are the changes needed? To fix a correctness issue. ### Does this PR introduce _any_ user-facing change? Yes, fixes a correctness issue. ### How was this patch tested? Added new UT. Closes #39676 from peter-toth/SPARK-42134-fix-getpartitionfiltersanddatafilters. Authored-by: Peter Toth <peter.toth@gmail.com> Signed-off-by: huaxingao <huaxin_gao@apple.com> (cherry picked from commit dcdcb80c53681d1daff416c007cf8a2810155625) Signed-off-by: huaxingao <huaxin_gao@apple.com> 21 January 2023, 02:35:57 UTC
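
For reference, a hedged Scala equivalent of the PySpark reproduction above; the path and the ambient `spark` session are assumptions.

```scala
import org.apache.spark.sql.functions.{lit, udf}

spark.conf.set("spark.sql.sources.useV1SourceList", "")   // force the V2 parquet source
spark.range(1).write.mode("overwrite").parquet("/tmp/spark-42134")

val df = spark.read.parquet("/tmp/spark-42134").toDF("i")
val f = udf((_: Int) => false).apply(lit(1))   // a filter that references no columns
df.filter(f).show()                            // with the fix this prints an empty result
```
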
ffa6cbf [SPARK-40817][K8S][3.3] `spark.files` should preserve remote files ### What changes were proposed in this pull request? Backport https://github.com/apache/spark/pull/38376 to `branch-3.3` You can find a detailed description of the issue and an example reproduction on the Jira card: https://issues.apache.org/jira/browse/SPARK-40817 The idea for this fix is to update the logic which uploads user-specified files (via `spark.jars`, `spark.files`, etc) to `spark.kubernetes.file.upload.path`. After uploading local files, it used to overwrite the initial list of URIs passed by the user and it would thus erase all remote URIs which were specified there. Small example of this behaviour: 1. User set the value of `spark.jars` to `s3a://some-bucket/my-application.jar,/tmp/some-local-jar.jar` when running `spark-submit` in cluster mode 2. `BasicDriverFeatureStep.getAdditionalPodSystemProperties()` gets called at one point while running `spark-submit` 3. This function would set `spark.jars` to a new value of `${SPARK_KUBERNETES_UPLOAD_PATH}/spark-upload-${RANDOM_STRING}/some-local-jar.jar`. Note that `s3a://some-bucket/my-application.jar` has been discarded. With the logic proposed in this PR, the new value of `spark.jars` would be `s3a://some-bucket/my-application.jar,${SPARK_KUBERNETES_UPLOAD_PATH}/spark-upload-${RANDOM_STRING}/some-local-jar.jar`, so in other words we are making sure that remote URIs are no longer discarded. ### Why are the changes needed? We encountered this issue in production when trying to launch Spark on Kubernetes jobs in cluster mode with a mix of local and remote dependencies. ### Does this PR introduce _any_ user-facing change? Yes, see description of the new behaviour above. ### How was this patch tested? - Added a unit test for the new behaviour - Added an integration test for the new behaviour - Tried this patch in our Kubernetes environment with `SparkPi`: ``` spark-submit \ --master k8s://https://$KUBERNETES_API_SERVER_URL:443 \ --deploy-mode cluster \ --name=spark-submit-test \ --class org.apache.spark.examples.SparkPi \ --conf spark.jars=/opt/my-local-jar.jar,s3a://$BUCKET_NAME/my-remote-jar.jar \ --conf spark.kubernetes.file.upload.path=s3a://$BUCKET_NAME/my-upload-path/ \ [...] /opt/spark/examples/jars/spark-examples_2.12-3.1.3.jar ``` Before applying the patch, `s3a://$BUCKET_NAME/my-remote-jar.jar` was discarded from the final value of `spark.jars`. After applying the patch and launching the job again, I confirmed that `s3a://$BUCKET_NAME/my-remote-jar.jar` was no longer discarded by looking at the Spark config for the running job. Closes #39669 from antonipp/spark-40817-branch-3.3. Authored-by: Anton Ippolitov <anton.ippolitov@datadoghq.com> Signed-off-by: Dongjoon Hyun <dongjoon@apache.org> 20 January 2023, 22:39:56 UTC
0d37682 [SPARK-42110][SQL][TESTS] Reduce the number of repetition in ParquetDeltaEncodingSuite.`random data test` ### What changes were proposed in this pull request? `random data test` is consuming about 4 minutes in GitHub Action and worse in some other environment. ### Why are the changes needed? - https://github.com/apache/spark/actions/runs/3948081724/jobs/6757667891 ``` ParquetDeltaEncodingInt - random data test (1 minute, 51 seconds) ... ParquetDeltaEncodingLong ... - random data test (1 minute, 54 seconds) ``` ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Pass the CIs. Closes #39648 from dongjoon-hyun/SPARK-42110. Authored-by: Dongjoon Hyun <dongjoon@apache.org> Signed-off-by: Dongjoon Hyun <dongjoon@apache.org> (cherry picked from commit d4f757baca64b4b66dd8f4e0b09bf085cce34af5) Signed-off-by: Dongjoon Hyun <dongjoon@apache.org> 19 January 2023, 00:36:05 UTC
9a8b652 [SPARK-42084][SQL] Avoid leaking the qualified-access-only restriction This is a better fix than https://github.com/apache/spark/pull/39077 and https://github.com/apache/spark/pull/38862 The special attribute metadata `__qualified_access_only` is very risky, as it breaks normal column resolution. The aforementioned 2 PRs remove the restriction in `SubqueryAlias` and `Alias`, but it's not good enough as we may forget to do the same thing for new logical plans/expressions in the future. It's also problematic if advanced users manipulate logical plans and expressions directly, when there is no `SubqueryAlias` and `Alias` to remove the restriction. To be safe, we should only apply this restriction when resolving join hidden columns, which means the plan node right above `Project(Join(using or natural join))`. This PR simply removes the restriction when a column is resolved from a sequence of `Attributes`, or from star expansion, and also when adding the `Project` hidden columns to its output. This makes sure that the qualified-access-only restriction will not be leaked to normal column resolution, but only metadata column resolution. To make the join hidden column feature more robust No existing tests Closes #39596 from cloud-fan/join. Authored-by: Wenchen Fan <wenchen@databricks.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com> 18 January 2023, 10:58:56 UTC
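
A small SQL-through-Scala sketch of the hidden-column behaviour being protected here (an ambient `spark` session and throwaway temp views are assumptions): after a USING join only the coalesced `id` is in the output, yet the per-side columns remain reachable via qualified names from the node directly above the join.

```scala
spark.range(3).toDF("id").createOrReplaceTempView("t1")
spark.range(3).selectExpr("id", "id * 10 AS v").createOrReplaceTempView("t2")

// t1.id and t2.id are "hidden" (metadata) columns of the using-join output,
// but qualified access from the project right above the join still works.
spark.sql("SELECT t1.id, t2.id, v FROM t1 JOIN t2 USING (id)").show()
```
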
408b583 [SPARK-42071][CORE] Register `scala.math.Ordering$Reverse` to KryoSerializer ### What changes were proposed in this pull request? This PR aims to register `scala.math.Ordering$Reverse` to KryoSerializer. ### Why are the changes needed? Scala 2.12.12 added a new class 'Reverse' via https://github.com/scala/scala/pull/8965. This affects Apache Spark 3.2.0+. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Pass the CIs with a newly added test case. Closes #39578 from dongjoon-hyun/SPARK-42071. Authored-by: Dongjoon Hyun <dongjoon@apache.org> Signed-off-by: Dongjoon Hyun <dongjoon@apache.org> (cherry picked from commit e3c0fbeadfe5242fa6265cb0646d72d3b5f6ef35) Signed-off-by: Dongjoon Hyun <dongjoon@apache.org> 15 January 2023, 09:10:03 UTC
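
A hedged sketch of the workaround users needed before this fix when running with Kryo registration required; `Ordering.Int.reverse` is one way to obtain an instance of the new `scala.math.Ordering$Reverse` class.

```scala
import org.apache.spark.SparkConf

// With registrationRequired=true, serializing an Ordering produced by .reverse on
// Scala 2.12.12+ needed a manual registration like this until KryoSerializer started
// registering scala.math.Ordering$Reverse itself.
val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .set("spark.kryo.registrationRequired", "true")
  .registerKryoClasses(Array(Ordering.Int.reverse.getClass))
```
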
ff30903 [SPARK-41989][PYTHON] Avoid breaking logging config from pyspark.pandas See https://issues.apache.org/jira/browse/SPARK-41989 for in depth explanation Short summary: `pyspark/pandas/__init__.py` uses, at import time, `logging.warning()` which might silently call `logging.basicConfig()`. So by importing `pyspark.pandas` (directly or indirectly) a user might unknowingly break their own logging setup (e.g. when based on `logging.basicConfig()` or related). `logging.getLogger(...).warning()` does not trigger this behavior. User-defined logging setups will be more predictable. Manual testing so far. I'm not sure it's worthwhile to cover this with a unit test Closes #39516 from soxofaan/SPARK-41989-pyspark-pandas-logging-setup. Authored-by: Stefaan Lippens <stefaan.lippens@vito.be> Signed-off-by: Hyukjin Kwon <gurwls223@apache.org> (cherry picked from commit 04836babb7a1a2aafa7c65393c53c42937ef75a4) Signed-off-by: Hyukjin Kwon <gurwls223@apache.org> 12 January 2023, 09:25:30 UTC
b97f79d [SPARK-41162][SQL][3.3] Fix anti- and semi-join for self-join with aggregations ### What changes were proposed in this pull request? Backport #39131 to branch-3.3. Rule `PushDownLeftSemiAntiJoin` should not push an anti-join below an `Aggregate` when the join condition references an attribute that exists in its right plan and its left plan's child. This usually happens when the anti-join / semi-join is a self-join while `DeduplicateRelations` cannot deduplicate those attributes (in this example due to the projection of `value` to `id`). This behaviour already exists for `Project` and `Union`, but `Aggregate` lacks this safety guard. ### Why are the changes needed? Without this change, the optimizer creates an incorrect plan. This example fails with `distinct()` (an aggregation), and succeeds without `distinct()`, but both queries are identical: ```scala val ids = Seq(1, 2, 3).toDF("id").distinct() val result = ids.withColumn("id", $"id" + 1).join(ids, Seq("id"), "left_anti").collect() assert(result.length == 1) ``` With `distinct()`, rule `PushDownLeftSemiAntiJoin` creates a join condition `(value#907 + 1) = value#907`, which can never be true. This effectively removes the anti-join. **Before this PR:** The anti-join is fully removed from the plan. ``` == Physical Plan == AdaptiveSparkPlan (16) +- == Final Plan == LocalTableScan (1) (16) AdaptiveSparkPlan Output [1]: [id#900] Arguments: isFinalPlan=true ``` This is caused by `PushDownLeftSemiAntiJoin` adding join condition `(value#907 + 1) = value#907`, which is wrong as because `id#910` in `(id#910 + 1) AS id#912` exists in the right child of the join as well as in the left grandchild: ``` === Applying Rule org.apache.spark.sql.catalyst.optimizer.PushDownLeftSemiAntiJoin === !Join LeftAnti, (id#912 = id#910) Aggregate [id#910], [(id#910 + 1) AS id#912] !:- Aggregate [id#910], [(id#910 + 1) AS id#912] +- Project [value#907 AS id#910] !: +- Project [value#907 AS id#910] +- Join LeftAnti, ((value#907 + 1) = value#907) !: +- LocalRelation [value#907] :- LocalRelation [value#907] !+- Aggregate [id#910], [id#910] +- Aggregate [id#910], [id#910] ! +- Project [value#914 AS id#910] +- Project [value#914 AS id#910] ! +- LocalRelation [value#914] +- LocalRelation [value#914] ``` The right child of the join and in the left grandchild would become the children of the pushed-down join, which creates an invalid join condition. **After this PR:** Join condition `(id#910 + 1) AS id#912` is understood to become ambiguous as both sides of the prospect join contain `id#910`. Hence, the join is not pushed down. The rule is then not applied any more. The final plan contains the anti-join: ``` == Physical Plan == AdaptiveSparkPlan (24) +- == Final Plan == * BroadcastHashJoin LeftSemi BuildRight (14) :- * HashAggregate (7) : +- AQEShuffleRead (6) : +- ShuffleQueryStage (5), Statistics(sizeInBytes=48.0 B, rowCount=3) : +- Exchange (4) : +- * HashAggregate (3) : +- * Project (2) : +- * LocalTableScan (1) +- BroadcastQueryStage (13), Statistics(sizeInBytes=1024.0 KiB, rowCount=3) +- BroadcastExchange (12) +- * HashAggregate (11) +- AQEShuffleRead (10) +- ShuffleQueryStage (9), Statistics(sizeInBytes=48.0 B, rowCount=3) +- ReusedExchange (8) (8) ReusedExchange [Reuses operator id: 4] Output [1]: [id#898] (24) AdaptiveSparkPlan Output [1]: [id#900] Arguments: isFinalPlan=true ``` ### Does this PR introduce _any_ user-facing change? It fixes correctness. ### How was this patch tested? Unit tests in `DataFrameJoinSuite` and `LeftSemiAntiJoinPushDownSuite`. 
Closes #39409 from EnricoMi/branch-antijoin-selfjoin-fix-3.3. Authored-by: Enrico Minack <github@enrico.minack.dev> Signed-off-by: Wenchen Fan <wenchen@databricks.com> 06 January 2023, 03:32:45 UTC
977e445 [SPARK-41864][INFRA][PYTHON] Fix mypy linter errors Currently, the GitHub Actions Python linter job is broken. This PR recovers it from the Python linter failure. There are two kinds of failures. 1. https://github.com/apache/spark/actions/runs/3829330032/jobs/6524170799 ``` python/pyspark/pandas/sql_processor.py:221: error: unused "type: ignore" comment Found 1 error in 1 file (checked 380 source files) ``` 2. After fixing (1), we hit the following. ``` ModuleNotFoundError: No module named 'py._path'; 'py' is not a package ``` No. Pass the GitHub CI on this PR. Or, manually run the following. ``` $ dev/lint-python starting python compilation test... python compilation succeeded. starting black test... black checks passed. starting flake8 test... flake8 checks passed. starting mypy annotations test... annotations passed mypy checks. starting mypy examples test... examples passed mypy checks. starting mypy data test... annotations passed data checks. all lint-python tests passed! ``` Closes #39373 from dongjoon-hyun/SPARK-41864. Authored-by: Dongjoon Hyun <dongjoon@apache.org> Signed-off-by: Dongjoon Hyun <dongjoon@apache.org> (cherry picked from commit 13b2856e6e77392a417d2bb2ce804f873ee72b28) Signed-off-by: Dongjoon Hyun <dongjoon@apache.org> 04 January 2023, 03:10:04 UTC
2da30ad [SPARK-41863][INFRA][PYTHON][TESTS] Skip `flake8` tests if the command is not available ### What changes were proposed in this pull request? This PR aims to skip `flake8` tests if the command is not available. ### Why are the changes needed? Linters are optional modules and can be skipped on some systems, like `mypy`. ``` $ dev/lint-python starting python compilation test... python compilation succeeded. The Python library providing 'black' module was not found. Skipping black checks for now. The flake8 command was not found. Skipping for now. The mypy command was not found. Skipping for now. all lint-python tests passed! ``` ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Manual tests. Closes #39372 from dongjoon-hyun/SPARK-41863. Authored-by: Dongjoon Hyun <dongjoon@apache.org> Signed-off-by: Dongjoon Hyun <dongjoon@apache.org> (cherry picked from commit 1a5ef40a4d59b377b028b55ea3805caf5d55f28f) Signed-off-by: Dongjoon Hyun <dongjoon@apache.org> 03 January 2023, 23:01:52 UTC
02a7fda [SPARK-41732][SQL][SS][3.3] Apply tree-pattern based pruning for the rule SessionWindowing This PR ports back #39245 to branch-3.3. ### What changes were proposed in this pull request? This PR proposes to apply tree-pattern based pruning for the rule SessionWindowing, to minimize the evaluation of rule with SessionWindow node. ### Why are the changes needed? The rule SessionWindowing is unnecessarily evaluated multiple times without proper pruning. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Existing tests. Closes #39253 from HeartSaVioR/SPARK-41732-3.3. Authored-by: Jungtaek Lim <kabhwan.opensource@gmail.com> Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com> 28 December 2022, 11:51:09 UTC
0887a2f Revert "[MINOR][TEST][SQL] Add a CTE subquery scope test case" This reverts commit aa39b06462a98f37be59e239d12edd9f09a25b88. 28 December 2022, 08:57:18 UTC
aa39b06 [MINOR][TEST][SQL] Add a CTE subquery scope test case ### What changes were proposed in this pull request? I noticed we were missing a test case for this in SQL tests, so I added one. ### Why are the changes needed? To ensure we scope CTEs properly in subqueries. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? This is a test case change. Closes #39189 from rxin/cte_test. Authored-by: Reynold Xin <rxin@databricks.com> Signed-off-by: Reynold Xin <rxin@databricks.com> (cherry picked from commit 24edf8ecb5e47af294f89552dfd9957a2d9f193b) Signed-off-by: Reynold Xin <rxin@databricks.com> 23 December 2022, 22:55:54 UTC
19824cf [SPARK-41686][SPARK-41030][BUILD][3.3] Upgrade Apache Ivy to 2.5.1 ### What changes were proposed in this pull request? Upgrade Apache Ivy from 2.5.0 to 2.5.1 ### Why are the changes needed? [CVE-2022-37865](https://www.cve.org/CVERecord?id=CVE-2022-37865) and [CVE-2022-37866](https://nvd.nist.gov/vuln/detail/CVE-2022-37866) ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Pass GA Closes #39176 from tobiasstadler/SPARK-41686. Authored-by: Tobias Stadler <ts.stadler@gmx.de> Signed-off-by: Hyukjin Kwon <gurwls223@apache.org> 22 December 2022, 12:53:35 UTC
9934b56 [SPARK-41350][3.3][SQL][FOLLOWUP] Allow simple name access of join hidden columns after alias backport https://github.com/apache/spark/pull/39077 to 3.3 Closes #39121 from cloud-fan/backport. Authored-by: Wenchen Fan <wenchen@databricks.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com> 22 December 2022, 03:26:19 UTC
7cd6907 [SPARK-41668][SQL] DECODE function returns wrong results when passed NULL ### What changes were proposed in this pull request? The DECODE function was implemented for Oracle compatibility. It works similarly to the CASE expression, but it is supposed to have one major difference: NULL == NULL https://docs.oracle.com/database/121/SQLRF/functions057.htm#SQLRF00631 The Spark implementation does not observe this, however: ``` > select decode(null, 6, 'Spark', NULL, 'SQL', 4, 'rocks'); NULL ``` The result is supposed to be 'SQL'. This PR is to fix the issue. ### Why are the changes needed? Bug fix and Oracle compatibility. ### Does this PR introduce _any_ user-facing change? Yes, the DECODE function will return the matched value when passed null, instead of always returning null. ### How was this patch tested? New UT. Closes #39163 from gengliangwang/fixDecode. Authored-by: Gengliang Wang <gengliang@apache.org> Signed-off-by: Gengliang Wang <gengliang@apache.org> (cherry picked from commit e09fcebdfaed22b28abbe5c9336f3a6fc92bd046) Signed-off-by: Gengliang Wang <gengliang@apache.org> 22 December 2022, 02:34:03 UTC
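
A quick spark-shell check of the corrected semantics (ambient `spark` session assumed); DECODE matches NULL against NULL, unlike a plain CASE expression.

```scala
// Returns 'SQL' after this fix; previously NULL.
spark.sql("SELECT decode(NULL, 6, 'Spark', NULL, 'SQL', 4, 'rocks') AS result").show()
```
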
b0c9b73 [SPARK-41535][SQL] Set null correctly for calendar interval fields in `InterpretedUnsafeProjection` and `InterpretedMutableProjection` In `InterpretedUnsafeProjection`, use `UnsafeWriter.write`, rather than `UnsafeWriter.setNullAt`, to set null for interval fields. Also, in `InterpretedMutableProjection`, use `InternalRow.setInterval`, rather than `InternalRow.setNullAt`, to set null for interval fields. This returns the wrong answer: ``` set spark.sql.codegen.wholeStage=false; set spark.sql.codegen.factoryMode=NO_CODEGEN; select first(col1), last(col2) from values (make_interval(0, 0, 0, 7, 0, 0, 0), make_interval(17, 0, 0, 2, 0, 0, 0)) as data(col1, col2); +---------------+---------------+ |first(col1) |last(col2) | +---------------+---------------+ |16 years 2 days|16 years 2 days| +---------------+---------------+ ``` In the above case, `TungstenAggregationIterator` uses `InterpretedUnsafeProjection` to create the aggregation buffer and to initialize all the fields to null. `InterpretedUnsafeProjection` incorrectly calls `UnsafeRowWriter#setNullAt`, rather than `unsafeRowWriter#write`, for the two calendar interval fields. As a result, the writer never allocates memory from the variable length region for the two intervals, and the pointers in the fixed region get left as zero. Later, when `InterpretedMutableProjection` attempts to update the first field, `UnsafeRow#setInterval` picks up the zero pointer and stores interval data on top of the null-tracking bit set. The call to UnsafeRow#setInterval for the second field also stomps the null-tracking bit set. Later updates to the null-tracking bit set (e.g., calls to `setNotNullAt`) further corrupt the interval data, turning `interval 7 years 2 days` into `interval 16 years 2 days`. Even after one fixes the above bug in `InterpretedUnsafeProjection` so that the buffer is created correctly, `InterpretedMutableProjection` has a similar bug to SPARK-41395, except this time for calendar interval data: ``` set spark.sql.codegen.wholeStage=false; set spark.sql.codegen.factoryMode=NO_CODEGEN; select first(col1), last(col2), max(col3) from values (null, null, 1), (make_interval(0, 0, 0, 7, 0, 0, 0), make_interval(17, 0, 0, 2, 0, 0, 0), 3) as data(col1, col2, col3); +---------------+---------------+---------+ |first(col1) |last(col2) |max(col3)| +---------------+---------------+---------+ |16 years 2 days|16 years 2 days|3 | +---------------+---------------+---------+ ``` These two bugs could get exercised during codegen fallback. No. New unit tests. Closes #39117 from bersprockets/unsafe_interval_issue. Authored-by: Bruce Robbins <bersprockets@gmail.com> Signed-off-by: Hyukjin Kwon <gurwls223@apache.org> (cherry picked from commit 7f153842041d66e9cf0465262f4458cfffda4f43) Signed-off-by: Hyukjin Kwon <gurwls223@apache.org> 20 December 2022, 00:30:25 UTC
356b56d [SPARK-41365][UI][3.3] Stages UI page fails to load for proxy in specific yarn environment backport https://github.com/apache/spark/pull/38882 ### What changes were proposed in this pull request? Fix the Stages UI page failing to load behind a proxy in some specific YARN environments. ### Why are the changes needed? In my environment (CDH 5.8), clicking through to the Spark UI from the YARN Resource Manager page and then visiting a stage URI such as http://<yarn-url>:8088/proxy/application_1669877165233_0021/stages/stage/?id=0&attempt=0 fails to load. The symptom is the same as in [SPARK-32467](https://issues.apache.org/jira/browse/SPARK-32467) and [SPARK-33611](https://issues.apache.org/jira/browse/SPARK-33611), because the parameter is encoded twice. Those two issues handled two scenarios that cause double encoding: 1. an https redirect proxy, and 2. reverse proxy enabled (spark.ui.reverseProxy) in Nginx. But if the parameter is encoded twice for other reasons, such as this issue (the YARN proxy), the stage page also fails to load. It is better to decode the parameter twice here, just like the fix in [SPARK-12708](https://issues.apache.org/jira/browse/SPARK-12708) [codes](https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/ui/UIUtils.scala#L626) ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Newly added UT Closes #39087 from yabola/fixui-backport. Authored-by: chenliang.lu <marssss2929@gmail.com> Signed-off-by: Dongjoon Hyun <dongjoon@apache.org> 16 December 2022, 19:30:01 UTC
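
A standalone Scala sketch of the symptom being tolerated: a proxy that encodes an already-encoded query string turns `id=0` into `id%253D0`, so the UI must be able to decode twice (the values here are illustrative).

```scala
import java.net.URLDecoder

val onceEncoded  = "id%3D0"    // what the Spark UI normally receives
val twiceEncoded = "id%253D0"  // what an intermediate proxy such as the YARN RM can forward

assert(URLDecoder.decode(onceEncoded, "UTF-8") == "id=0")
assert(URLDecoder.decode(URLDecoder.decode(twiceEncoded, "UTF-8"), "UTF-8") == "id=0")
```
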
48a2110 [SPARK-41541][SQL] Fix call to wrong child method in SQLShuffleWriteMetricsReporter.decRecordsWritten() ### What changes were proposed in this pull request? This PR fixes a bug in `SQLShuffleWriteMetricsReporter.decRecordsWritten()`: this method is supposed to call the delegate `metricsReporter`'s `decRecordsWritten` method but due to a typo it calls the `decBytesWritten` method instead. ### Why are the changes needed? One of the situations where `decRecordsWritten(v)` is called while reverting shuffle writes from failed/canceled tasks. Due to the mixup in these calls, the _recordsWritten_ metric ends up being _v_ records too high (since it wasn't decremented) and the _bytesWritten_ metric ends up _v_ records too low, causing some failed tasks' write metrics to look like > {"Shuffle Bytes Written":-2109,"Shuffle Write Time":2923270,"Shuffle Records Written":2109} instead of > {"Shuffle Bytes Written":0,"Shuffle Write Time":2923270,"Shuffle Records Written":0} ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Existing tests / manual code review only. The existing SQLMetricsSuite contains end-to-end tests which exercise this class but they don't exercise the decrement path because they don't exercise the shuffle write failure paths. In theory I could add new unit tests but I don't think the ROI is worth it given that this class is intended to be a simple wrapper and it ~never changes (this PR is the first change to the file in 5 years). Closes #39086 from JoshRosen/SPARK-41541. Authored-by: Josh Rosen <joshrosen@databricks.com> Signed-off-by: Hyukjin Kwon <gurwls223@apache.org> (cherry picked from commit ed27121607cf526e69420a1faff01383759c9134) Signed-off-by: Hyukjin Kwon <gurwls223@apache.org> 16 December 2022, 10:16:37 UTC
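
A minimal standalone sketch (not the actual `SQLShuffleWriteMetricsReporter` code) of the delegation mix-up described above; the trait and class names are illustrative.

```scala
trait WriteMetrics {
  def decBytesWritten(v: Long): Unit
  def decRecordsWritten(v: Long): Unit
}

// Wrapper that forwards decrements to an underlying reporter.
class ForwardingWriteMetrics(delegate: WriteMetrics) extends WriteMetrics {
  override def decBytesWritten(v: Long): Unit = delegate.decBytesWritten(v)
  // The bug was forwarding this call to delegate.decBytesWritten(v) instead,
  // leaving recordsWritten too high and bytesWritten too low for reverted tasks.
  override def decRecordsWritten(v: Long): Unit = delegate.decRecordsWritten(v)
}
```
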
b23198e [SPARK-41538][SQL] Metadata column should be appended at the end of project list ### What changes were proposed in this pull request? For the following query: ``` CREATE TABLE table_1 ( a ARRAY<STRING>, s STRUCT<id: STRING>) USING parquet; CREATE VIEW view_1 (id) AS WITH source AS ( SELECT * FROM table_1 ), renamed AS ( SELECT s.id FROM source ) SELECT id FROM renamed; with foo AS ( SELECT 'a' as id ), bar AS ( SELECT 'a' as id ) SELECT 1 FROM foo FULL OUTER JOIN bar USING(id) FULL OUTER JOIN view_1 USING(id) WHERE foo.id IS NOT NULL ``` There will be the following error: ``` class org.apache.spark.sql.types.ArrayType cannot be cast to class org.apache.spark.sql.types.StructType (org.apache.spark.sql.types.ArrayType and org.apache.spark.sql.types.StructType are in unnamed module of loader 'app') java.lang.ClassCastException: class org.apache.spark.sql.types.ArrayType cannot be cast to class org.apache.spark.sql.types.StructType (org.apache.spark.sql.types.ArrayType and org.apache.spark.sql.types.StructType are in unnamed module of loader 'app') at org.apache.spark.sql.catalyst.expressions.GetStructField.childSchema$lzycompute(complexTypeExtractors.scala:108) at org.apache.spark.sql.catalyst.expressions.GetStructField.childSchema(complexTypeExtractors.scala:108) ``` This is caused by the inconsistent metadata column positions in the following two nodes: * Table relation: at the ending position * Project list: at the beginning position <img width="1442" alt="image" src="https://user-images.githubusercontent.com/1097932/207992343-438714bc-e1d1-46f7-9a79-84ab83dd299f.png"> When the InlineCTE rule executes, the metadata column in the project is wrongly combined with the table output. <img width="1438" alt="image" src="https://user-images.githubusercontent.com/1097932/207992431-f4cfc774-4cab-4728-b109-2ebff94e5fe2.png"> Thus the column `a ARRAY<STRING>` is cast as `s STRUCT<id: STRING>` and causes the error. This PR is to fix the issue by putting the metadata column at the end of the project list, so that it is consistent with the table relation. ### Why are the changes needed? Bug fix ### Does this PR introduce _any_ user-facing change? Yes, it fixes a bug in the analysis rule `AddMetadataColumns` ### How was this patch tested? New test case Closes #39081 from gengliangwang/fixMetadata. Authored-by: Gengliang Wang <gengliang@apache.org> Signed-off-by: Max Gekk <max.gekk@gmail.com> (cherry picked from commit 172f719fffa84a2528628e08627a02cf8d1fe8a8) Signed-off-by: Max Gekk <max.gekk@gmail.com> 16 December 2022, 07:43:29 UTC
8918cbb [SPARK-41522][BUILD] Pin `versions-maven-plugin` to 2.13.0 to recover `test-dependencies.sh` ### What changes were proposed in this pull request? This PR aims to pin `versions-maven-plugin` to 2.13.0 to recover `test-dependencies.sh` and make GA pass; this change should be reverted after we know how to use version 2.14.0. ### Why are the changes needed? `dev/test-dependencies.sh` always uses the latest `versions-maven-plugin` version, and `versions-maven-plugin` 2.14.0 does not set the version of the sub-modules. Run: ``` build/mvn -q versions:set -DnewVersion=spark-928034 -DgenerateBackupPoms=false ``` **2.14.0** ``` + git status On branch test-ci Changes not staged for commit: (use "git add <file>..." to update what will be committed) (use "git restore <file>..." to discard changes in working directory) modified: assembly/pom.xml modified: core/pom.xml modified: examples/pom.xml modified: graphx/pom.xml modified: hadoop-cloud/pom.xml modified: launcher/pom.xml modified: mllib-local/pom.xml modified: mllib/pom.xml modified: pom.xml modified: repl/pom.xml modified: streaming/pom.xml modified: tools/pom.xml ``` **2.13.0** ``` + git status On branch test-ci Changes not staged for commit: (use "git add <file>..." to update what will be committed) (use "git restore <file>..." to discard changes in working directory) modified: assembly/pom.xml modified: common/kvstore/pom.xml modified: common/network-common/pom.xml modified: common/network-shuffle/pom.xml modified: common/network-yarn/pom.xml modified: common/sketch/pom.xml modified: common/tags/pom.xml modified: common/unsafe/pom.xml modified: connector/avro/pom.xml modified: connector/connect/common/pom.xml modified: connector/connect/server/pom.xml modified: connector/docker-integration-tests/pom.xml modified: connector/kafka-0-10-assembly/pom.xml modified: connector/kafka-0-10-sql/pom.xml modified: connector/kafka-0-10-token-provider/pom.xml modified: connector/kafka-0-10/pom.xml modified: connector/kinesis-asl-assembly/pom.xml modified: connector/kinesis-asl/pom.xml modified: connector/protobuf/pom.xml modified: connector/spark-ganglia-lgpl/pom.xml modified: core/pom.xml modified: dev/test-dependencies.sh modified: examples/pom.xml modified: graphx/pom.xml modified: hadoop-cloud/pom.xml modified: launcher/pom.xml modified: mllib-local/pom.xml modified: mllib/pom.xml modified: pom.xml modified: repl/pom.xml modified: resource-managers/kubernetes/core/pom.xml modified: resource-managers/kubernetes/integration-tests/pom.xml modified: resource-managers/mesos/pom.xml modified: resource-managers/yarn/pom.xml modified: sql/catalyst/pom.xml modified: sql/core/pom.xml modified: sql/hive-thriftserver/pom.xml modified: sql/hive/pom.xml modified: streaming/pom.xml modified: tools/pom.xml ``` Therefore, the following compilation error will occur when using 2.14.0. ``` 2022-12-15T02:37:35.5536924Z [ERROR] [ERROR] Some problems were encountered while processing the POMs: 2022-12-15T02:37:35.5538469Z [FATAL] Non-resolvable parent POM for org.apache.spark:spark-sketch_2.12:3.4.0-SNAPSHOT: Could not find artifact org.apache.spark:spark-parent_2.12:pom:3.4.0-SNAPSHOT and 'parent.relativePath' points at wrong local POM line 22, column 11 ``` ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Pass GitHub Actions Closes #39067 from LuciferYang/test-ci. 
Authored-by: yangjie01 <yangjie01@baidu.com> Signed-off-by: Dongjoon Hyun <dongjoon@apache.org> (cherry picked from commit bfe1af9a720ed235937a0fdf665376ffff7cce54) Signed-off-by: Dongjoon Hyun <dongjoon@apache.org> 15 December 2022, 08:40:14 UTC
9f7baae [SPARK-41360][CORE][BUILD][FOLLOW-UP] Exclude BlockManagerMessages.RegisterBlockManager in MiMa This PR is a followup of https://github.com/apache/spark/pull/38876 that excludes BlockManagerMessages.RegisterBlockManager in MiMa compatibility check. It fails in MiMa check presumably with Scala 2.13 in other branches. Should be safer to exclude them all in the affected branches. No, dev-only. Filters copied from error messages. Will monitor the build in other branches. Closes #39052 from HyukjinKwon/SPARK-41360-followup. Authored-by: Hyukjin Kwon <gurwls223@apache.org> Signed-off-by: Hyukjin Kwon <gurwls223@apache.org> (cherry picked from commit a2ceff29f9d1c0133fa0c8274fa84c43106e90f0) Signed-off-by: Hyukjin Kwon <gurwls223@apache.org> 13 December 2022, 14:36:23 UTC
b9a1b71 [SPARK-41360][CORE] Avoid BlockManager re-registration if the executor has been lost ### What changes were proposed in this pull request? This PR mainly proposes to reject block manager re-registration if the executor has already been considered lost/dead by the scheduler backend. Along with the major proposal, this PR also includes a few other changes: * Only post the `SparkListenerBlockManagerAdded` event when the registration succeeds * Return an "invalid" executor id when the re-registration fails * Do not report all blocks when the re-registration fails ### Why are the changes needed? BlockManager re-registration from a lost executor (terminated/terminating executor or orphan executor) has led to some known issues, e.g., a false-active executor shows up in the UI (SPARK-35011), [block fetching to the dead executor](https://github.com/apache/spark/pull/32114#issuecomment-899979045). And since there's no re-registration from the lost executor itself, it's meaningless to have BlockManager re-registration when the executor is already lost. Regarding the corner case where the re-registration event arrives before the lost executor is actually removed from the scheduler backend, I think it is not possible. Re-registration will only be required when the BlockManager doesn't see the block manager in `blockManagerInfo`, and the block manager will only be removed from `blockManagerInfo` either when the executor is already known to be lost or when it is removed by the driver proactively. So the executor should always be removed from the scheduler backend first before the re-registration event comes. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Unit test Closes #38876 from Ngone51/fix-blockmanager-reregister. Authored-by: Yi Wu <yi.wu@databricks.com> Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com> (cherry picked from commit c3f46d5c6d69a9b21473dae6d86dee53833dfd52) Signed-off-by: Mridul Muralidharan <mridulatgmail.com> 12 December 2022, 19:29:46 UTC
70f3d2f [SPARK-41448] Make consistent MR job IDs in FileBatchWriter and FileFormatWriter ### What changes were proposed in this pull request? Make consistent MR job IDs in FileBatchWriter and FileFormatWriter ### Why are the changes needed? [SPARK-26873](https://issues.apache.org/jira/browse/SPARK-26873) fixed the consistency issue for FileFormatWriter, but [SPARK-33402](https://issues.apache.org/jira/browse/SPARK-33402) broke this requirement by introducing a random long. We need to address this because identical task IDs are expected across attempts for correctness. FileBatchWriter also doesn't follow this requirement and needs to be fixed as well. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Closes #38980 from boneanxs/SPARK-41448. Authored-by: Hui An <hui.an@shopee.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com> (cherry picked from commit 7801666f3b5ea3bfa0f95571c1d68147ce5240ec) Signed-off-by: Wenchen Fan <wenchen@databricks.com> 12 December 2022, 10:18:16 UTC
231c63a [SPARK-41468][SQL] Fix PlanExpression handling in EquivalentExpressions ### What changes were proposed in this pull request? https://github.com/apache/spark/pull/36012 already added a check to avoid adding expressions containing `PlanExpression`s to `EquivalentExpressions` as those expressions might cause NPE on executors. But, for some reason, the check is still missing from `getExprState()` where we check the presence of an expression in the equivalence map. This PR: - adds the check to `getExprState()` - moves the check from `updateExprTree()` to `addExprTree()` so as to run it only once. ### Why are the changes needed? To avoid exceptions like: ``` org.apache.spark.SparkException: Task failed while writing rows. at org.apache.spark.sql.errors.QueryExecutionErrors$.taskFailedWhileWritingRowsError(QueryExecutionErrors.scala:642) at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:348) at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$write$21(FileFormatWriter.scala:256) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) at org.apache.spark.scheduler.Task.run(Task.scala:136) at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551) at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128) at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628) at java.base/java.lang.Thread.run(Thread.java:834) Caused by: java.lang.NullPointerException at org.apache.spark.sql.execution.columnar.InMemoryTableScanExec.$anonfun$doCanonicalize$1(InMemoryTableScanExec.scala:51) at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286) at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62) at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49) at scala.collection.TraversableLike.map(TraversableLike.scala:286) at scala.collection.TraversableLike.map$(TraversableLike.scala:279) at scala.collection.AbstractTraversable.map(Traversable.scala:108) at org.apache.spark.sql.execution.columnar.InMemoryTableScanExec.doCanonicalize(InMemoryTableScanExec.scala:51) at org.apache.spark.sql.execution.columnar.InMemoryTableScanExec.doCanonicalize(InMemoryTableScanExec.scala:30) ... at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized(QueryPlan.scala:541) at org.apache.spark.sql.execution.SubqueryExec.doCanonicalize(basicPhysicalOperators.scala:850) at org.apache.spark.sql.execution.SubqueryExec.doCanonicalize(basicPhysicalOperators.scala:814) at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized$lzycompute(QueryPlan.scala:542) at org.apache.spark.sql.catalyst.plans.QueryPlan.canonicalized(QueryPlan.scala:541) at org.apache.spark.sql.execution.ScalarSubquery.preCanonicalized$lzycompute(subquery.scala:72) at org.apache.spark.sql.execution.ScalarSubquery.preCanonicalized(subquery.scala:71) ... 
at org.apache.spark.sql.catalyst.expressions.Expression.canonicalized(Expression.scala:261) at org.apache.spark.sql.catalyst.expressions.Expression.semanticHash(Expression.scala:278) at org.apache.spark.sql.catalyst.expressions.ExpressionEquals.hashCode(EquivalentExpressions.scala:226) at scala.runtime.Statics.anyHash(Statics.java:122) at scala.collection.mutable.HashTable$HashUtils.elemHashCode(HashTable.scala:416) at scala.collection.mutable.HashTable$HashUtils.elemHashCode$(HashTable.scala:416) at scala.collection.mutable.HashMap.elemHashCode(HashMap.scala:44) at scala.collection.mutable.HashTable.findEntry(HashTable.scala:136) at scala.collection.mutable.HashTable.findEntry$(HashTable.scala:135) at scala.collection.mutable.HashMap.findEntry(HashMap.scala:44) at scala.collection.mutable.HashMap.get(HashMap.scala:74) at org.apache.spark.sql.catalyst.expressions.EquivalentExpressions.getExprState(EquivalentExpressions.scala:180) at org.apache.spark.sql.catalyst.expressions.SubExprEvaluationRuntime.replaceWithProxy(SubExprEvaluationRuntime.scala:78) at org.apache.spark.sql.catalyst.expressions.SubExprEvaluationRuntime.$anonfun$proxyExpressions$3(SubExprEvaluationRuntime.scala:109) at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286) at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36) at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33) at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:38) at scala.collection.TraversableLike.map(TraversableLike.scala:286) at scala.collection.TraversableLike.map$(TraversableLike.scala:279) at scala.collection.AbstractTraversable.map(Traversable.scala:108) at org.apache.spark.sql.catalyst.expressions.SubExprEvaluationRuntime.proxyExpressions(SubExprEvaluationRuntime.scala:109) at org.apache.spark.sql.catalyst.expressions.InterpretedUnsafeProjection.<init>(InterpretedUnsafeProjection.scala:40) at org.apache.spark.sql.catalyst.expressions.InterpretedUnsafeProjection$.createProjection(InterpretedUnsafeProjection.scala:112) at org.apache.spark.sql.catalyst.expressions.UnsafeProjection$.createInterpretedObject(Projection.scala:127) at org.apache.spark.sql.catalyst.expressions.UnsafeProjection$.createInterpretedObject(Projection.scala:119) at org.apache.spark.sql.catalyst.expressions.CodeGeneratorWithInterpretedFallback.createObject(CodeGeneratorWithInterpretedFallback.scala:56) at org.apache.spark.sql.catalyst.expressions.UnsafeProjection$.create(Projection.scala:150) at org.apache.spark.sql.catalyst.expressions.UnsafeProjection$.create(Projection.scala:160) at org.apache.spark.sql.execution.ProjectExec.$anonfun$doExecute$1(basicPhysicalOperators.scala:95) at org.apache.spark.sql.execution.ProjectExec.$anonfun$doExecute$1$adapted(basicPhysicalOperators.scala:94) at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndexInternal$2(RDD.scala:877) at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndexInternal$2$adapted(RDD.scala:877) at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365) at org.apache.spark.rdd.RDD.iterator(RDD.scala:329) at org.apache.spark.rdd.UnionRDD.compute(UnionRDD.scala:106) at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365) at org.apache.spark.rdd.RDD.iterator(RDD.scala:329) at org.apache.spark.rdd.CoalescedRDD.$anonfun$compute$1(CoalescedRDD.scala:99) at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:486) at 
scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:492) at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460) at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.writeWithIterator(FileFormatDataWriter.scala:91) at org.apache.spark.sql.execution.datasources.FileFormatWriter$.$anonfun$executeTask$1(FileFormatWriter.scala:331) at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1538) at org.apache.spark.sql.execution.datasources.FileFormatWriter$.executeTask(FileFormatWriter.scala:338) ``` ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Existing UTs. Closes #39010 from peter-toth/SPARK-41468-fix-planexpressions-in-equivalentexpressions. Authored-by: Peter Toth <peter.toth@gmail.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com> (cherry picked from commit 1b2d7001f2924738b61609a5399ebc152969b5c8) Signed-off-by: Wenchen Fan <wenchen@databricks.com> 12 December 2022, 10:14:51 UTC
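The fix above amounts to applying the same "does this expression tree contain a `PlanExpression`?" guard on lookup as on insertion. A simplified, self-contained sketch of that invariant with hypothetical types (not Spark's actual `EquivalentExpressions` classes):
```scala
// Simplified model: expressions containing a PlanExpression are neither added
// to nor returned from the equivalence map, so executors never canonicalize them.
trait Expr { def children: Seq[Expr] }
final case class PlanExpr() extends Expr { val children: Seq[Expr] = Nil }
final case class Leaf(name: String) extends Expr { val children: Seq[Expr] = Nil }

def containsPlanExpression(e: Expr): Boolean =
  e.isInstanceOf[PlanExpr] || e.children.exists(containsPlanExpression)

class EquivalenceMapSketch {
  private val useCounts = scala.collection.mutable.Map.empty[Expr, Int]

  def addExprTree(e: Expr): Unit =
    if (!containsPlanExpression(e)) useCounts(e) = useCounts.getOrElse(e, 0) + 1

  def getExprState(e: Expr): Option[Int] =
    if (containsPlanExpression(e)) None else useCounts.get(e)  // same guard on lookup
}
```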
1c6cb35 [SPARK-41187][CORE] LiveExecutor MemoryLeak in AppStatusListener when ExecutorLost happens ### What changes were proposed in this pull request? Ignore the SparkListenerTaskEnd with Reason "Resubmitted" in AppStatusListener to avoid a memory leak ### Why are the changes needed? For a long-running Spark thriftserver, LiveExecutor instances accumulate in the deadExecutors HashMap and cause the message event queue to be processed slowly. For every task, a `SparkListenerTaskStart` event and a `SparkListenerTaskEnd` event are always sent out as a pair. But when an executor is lost, the events are sent out as follows. a) There was a pair of task start and task end events which were fired for the task (let us call it Tr) b) When the executor which ran Tr was lost, while the stage is still running, a task end event with reason `Resubmitted` is fired for Tr. c) Subsequently, a new task start and task end will be fired for the retry of Tr. The processing of the `Resubmitted` task end event in AppStatusListener can lead to negative `LiveStage.activeTasks` since there's no corresponding `SparkListenerTaskStart` event for each of them. The negative activeTasks will make the stage always remain in the live stage list as it can never meet the condition activeTasks == 0. This in turn causes the dead executor to never be cleaned up if that live stage's submissionTime is less than the dead executor's removeTime (see isExecutorActiveForLiveStages). Since this kind of `SparkListenerTaskEnd` is useless here, we simply ignore it. Check [SPARK-41187](https://issues.apache.org/jira/browse/SPARK-41187) for evidence. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? New UT. Added a test in the thriftserver env. ### The way to reproduce I tried to reproduce it in spark-shell, but it is a little bit tricky 1. Start spark-shell, set spark.dynamicAllocation.maxExecutors=2 for convenience ` bin/spark-shell --driver-java-options "-agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=8006"` 2. Run a job with shuffle `sc.parallelize(1 to 1000, 10).map { x => Thread.sleep(1000) ; (x % 3, x) }.reduceByKey((a, b) => a + b).collect()` 3. After some ShuffleMapTasks have finished, kill one or two executors to let tasks be resubmitted 4. Check via heap dump, debugger, or logs Closes #38702 from wineternity/SPARK-41187. Authored-by: yuanyimeng <yuanyimeng@youzan.com> Signed-off-by: Mridul Muralidharan <mridul<at>gmail.com> (cherry picked from commit 7e7bc940dcbbf918c7d571e1d27c7654ad387817) Signed-off-by: Mridul Muralidharan <mridulatgmail.com> 12 December 2022, 04:49:48 UTC
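A condensed sketch of the listener-side fix, with hypothetical simplified types (the real code works on Spark's `SparkListenerTaskEnd` and `LiveStage`): a task end whose reason is `Resubmitted` has no paired task start, so it must not decrement the stage's active-task counter.
```scala
// Simplified model, not Spark's actual AppStatusListener.
sealed trait TaskEndReason
case object Resubmitted extends TaskEndReason
case object Success extends TaskEndReason
final case class TaskEnd(stageId: Int, reason: TaskEndReason)

val activeTasks = scala.collection.mutable.Map(0 -> 2)

def onTaskEnd(event: TaskEnd): Unit = event.reason match {
  case Resubmitted => ()                             // ignore: avoids negative counts
  case _           => activeTasks(event.stageId) -= 1
}
```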
0df0fd8 [SPARK-41476][INFRA] Prevent `README.md` from triggering CIs ### What changes were proposed in this pull request? This PR prevents `README.md`-only changes from triggering CIs. ### Why are the changes needed? While CIs go slower and slower, we are also getting more and more `README.md` files. ``` $ find . -name 'README.md' ./resource-managers/kubernetes/integration-tests/README.md ./core/src/main/resources/error/README.md ./core/src/main/scala/org/apache/spark/deploy/security/README.md ./hadoop-cloud/README.md ./python/README.md ./R/pkg/README.md ./docs/README.md ./README.md ./common/network-common/src/main/java/org/apache/spark/network/crypto/README.md ./common/tags/README.md ./connector/docker/README.md ./connector/docker/spark-test/README.md ./connector/connect/README.md ./connector/protobuf/README.md ./dev/README.md ./dev/ansible-for-test-node/roles/common/README.md ./dev/ansible-for-test-node/roles/jenkins-worker/README.md ./dev/ansible-for-test-node/README.md ./sql/core/src/test/README.md ./sql/core/src/main/scala/org/apache/spark/sql/test/README.md ./sql/core/src/main/scala/org/apache/spark/sql/jdbc/README.md ./sql/README.md ``` We can exclude these files in order to save the community CI resources. https://github.com/apache/spark/blob/435f6b1b3588d8c3c719f0e23b91209dd5f7bdb9/.github/workflows/build_and_test.yml#L86-L89 ### Does this PR introduce _any_ user-facing change? No. This is an infra-only change. ### How was this patch tested? Pass the doctest. ``` python -m doctest sparktestsupport/utils.py ``` Closes #39015 from dongjoon-hyun/SPARK-41476. Authored-by: Dongjoon Hyun <dongjoon@apache.org> Signed-off-by: Dongjoon Hyun <dongjoon@apache.org> (cherry picked from commit c4af4b0cca4503e2fa00cac11b2d59c7dacd40df) Signed-off-by: Dongjoon Hyun <dongjoon@apache.org> 10 December 2022, 08:09:20 UTC
88d20e4 [SPARK-41458][BUILD][YARN][SHUFFLE] Correctly transform the SPI services for Yarn Shuffle Service ### What changes were proposed in this pull request? Correctly transform the SPI services for Yarn Shuffle Service by configuring `ServicesResourceTransformer`. ### Why are the changes needed? SPARK-12807 relocated the Jackson classes, but did not handle SPI services properly. It affects Spark 2.0 and above, so this PR is for 3.2/3.3/master. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Manually checked the output jar. Before: <img width="772" alt="Xnip2022-12-09_12-49-31" src="https://user-images.githubusercontent.com/26535726/206632421-acbec562-c600-4497-83a3-f9b2f6caba74.png"> After: <img width="1209" alt="Xnip2022-12-09_12-53-39" src="https://user-images.githubusercontent.com/26535726/206632440-4c8ed745-dbc8-4b6e-a9e7-f285521aa8b4.png"> Closes #38989 from pan3793/SPARK-41458. Authored-by: Cheng Pan <chengpan@apache.org> Signed-off-by: Sean Owen <srowen@gmail.com> (cherry picked from commit be52d67fbe98110eeabf1b2a7c16741dceefdca6) Signed-off-by: Sean Owen <srowen@gmail.com> 09 December 2022, 14:14:25 UTC
b6c6526 [SPARK-40270][PS][FOLLOWUP][3.3] Skip test_style when pandas <1.3.0 ### What changes were proposed in this pull request? According to https://pandas.pydata.org/docs/reference/api/pandas.io.formats.style.Styler.to_latex.html: `pandas.io.formats.style.Styler.to_latex` was introduced in 1.3.0, so before pandas 1.3.0 the check should be skipped ``` ERROR [0.180s]: test_style (pyspark.pandas.tests.test_dataframe.DataFrameTest) ---------------------------------------------------------------------- Traceback (most recent call last): File "/__w/spark/spark/python/pyspark/pandas/tests/test_dataframe.py", line 5795, in test_style check_style() File "/__w/spark/spark/python/pyspark/pandas/tests/test_dataframe.py", line 5793, in check_style self.assert_eq(pdf_style.to_latex(), psdf_style.to_latex()) AttributeError: 'Styler' object has no attribute 'to_latex' ``` Related: https://github.com/apache/spark/commit/58375a86e6ff49c5bcee49939fbd98eb848ae59f ### Why are the changes needed? This test breaks the 3.2 branch pyspark test (with python 3.6 + pandas 1.1.x), so I think it is better to add the `skipIf` for it. See also https://github.com/apache/spark/pull/38982#issuecomment-1343923114 ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? - CI passed Closes #39007 from Yikun/branch-3.3-check. Authored-by: Yikun Jiang <yikunkero@gmail.com> Signed-off-by: Yikun Jiang <yikunkero@gmail.com> 09 December 2022, 14:13:09 UTC
d39e7ba [SPARK-41395][SQL] `InterpretedMutableProjection` should use `setDecimal` to set null values for decimals in an unsafe row Change `InterpretedMutableProjection` to use `setDecimal` rather than `setNullAt` to set null values for decimals in unsafe rows. The following returns the wrong answer: ``` set spark.sql.codegen.wholeStage=false; set spark.sql.codegen.factoryMode=NO_CODEGEN; select max(col1), max(col2) from values (cast(null as decimal(27,2)), cast(null as decimal(27,2))), (cast(77.77 as decimal(27,2)), cast(245.00 as decimal(27,2))) as data(col1, col2); +---------+---------+ |max(col1)|max(col2)| +---------+---------+ |null |239.88 | +---------+---------+ ``` This is because `InterpretedMutableProjection` inappropriately uses `InternalRow#setNullAt` on unsafe rows to set null for decimal types with precision > `Decimal.MAX_LONG_DIGITS`. When `setNullAt` is used, the pointer to the decimal's storage area in the variable length region gets zeroed out. Later, when `InterpretedMutableProjection` calls `setDecimal` on that field, `UnsafeRow#setDecimal` picks up the zero pointer and stores decimal data on top of the null-tracking bit set. Later updates to the null-tracking bit set (e.g., calls to `setNotNullAt`) further corrupt the decimal data (turning 245.00 into 239.88, for example). The stomping of the null-tracking bit set also can make non-null fields appear null (turning 77.77 into null, for example). This bug can manifest for end-users after codegen fallback (say, if an expression's generated code fails to compile). [Codegen for mutable projection](https://github.com/apache/spark/blob/89b2ee27d258dec8fe265fa862846e800a374d8e/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala#L1729) uses `mutableRow.setDecimal` for null decimal values regardless of precision or the type for `mutableRow`, so this PR does the same. No. New unit tests. Closes #38923 from bersprockets/unsafe_decimal_issue. Authored-by: Bruce Robbins <bersprockets@gmail.com> Signed-off-by: Hyukjin Kwon <gurwls223@apache.org> (cherry picked from commit fec210b36be22f187b51b67970960692f75ac31f) Signed-off-by: Hyukjin Kwon <gurwls223@apache.org> 09 December 2022, 12:46:31 UTC
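For reference, the SQL reproduction above can also be driven from spark-shell; a hedged sketch assuming a local `SparkSession` named `spark` (the two configs are the ones quoted in the description and force the interpreted, non-codegen projection path):
```scala
// Repro sketch: with whole-stage codegen off and NO_CODEGEN factory mode, the
// interpreted mutable projection handles the high-precision decimal aggregation.
spark.conf.set("spark.sql.codegen.wholeStage", "false")
spark.conf.set("spark.sql.codegen.factoryMode", "NO_CODEGEN")

spark.sql(
  """SELECT max(col1), max(col2) FROM VALUES
    |  (CAST(NULL  AS DECIMAL(27,2)), CAST(NULL   AS DECIMAL(27,2))),
    |  (CAST(77.77 AS DECIMAL(27,2)), CAST(245.00 AS DECIMAL(27,2)))
    |  AS data(col1, col2)""".stripMargin).show()
// Expected after the fix: 77.77 and 245.00 (the pre-fix run returned corrupted values).
```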
1f074ff [SPARK-41376][CORE][3.3] Correct the Netty preferDirectBufs check logic on executor start ### What changes were proposed in this pull request? Backport #38901 to branch-3.3. Fix the condition for judging whether Netty prefers direct memory on executor start; the logic should match `org.apache.spark.network.client.TransportClientFactory`. ### Why are the changes needed? The check logic was added in SPARK-27991; the original intention was to avoid a potential Netty OOM issue when Netty uses direct memory to consume shuffle data, but the condition was not sufficient, so this PR updates the logic to match `org.apache.spark.network.client.TransportClientFactory` ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Manual testing. Closes #38981 from pan3793/SPARK-41376-3.3. Authored-by: Cheng Pan <chengpan@apache.org> Signed-off-by: Dongjoon Hyun <dongjoon@apache.org> 08 December 2022, 18:40:09 UTC
02f32ee [SPARK-41375][SS] Avoid empty latest KafkaSourceOffset ### What changes were proposed in this pull request? Add the empty offset filter in `latestOffset()` for the Kafka Source, so that the offset remains unchanged if Kafka provides no topic-partitions during the fetch. ### Why are the changes needed? KafkaOffsetReader may fetch an empty partition set in some extreme cases, like getting partitions while the Kafka cluster is reassigning them; this produces an empty `PartitionOffsetMap` (even though the topic-partitions are unchanged) that gets stored in `committedOffsets` after `runBatch()`. Then in the next batch, we fetch partitions normally and get the actual offsets, but when fetching data of this batch in `KafkaOffsetReaderAdmin#getOffsetRangesFromResolvedOffsets()` all partitions in endOffsets will be considered as new partitions since the startOffsets is empty, then these "new partitions" will fetch the earliest offsets, which causes data duplication. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Add a unit test. Closes #38898 from wecharyu/SPARK-41375. Authored-by: wecharyu <yuwq1996@gmail.com> Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com> (cherry picked from commit 043475a87844f11c252fb0ebab469148ae6985d7) Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com> 08 December 2022, 08:12:45 UTC
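The guard itself is small. A minimal sketch of the idea with simplified types (the real code keys the map by Kafka `TopicPartition` inside the Kafka source's offset handling):
```scala
// Simplified sketch: never replace known offsets with an empty fetch result.
object EmptyOffsetGuardSketch {
  type TopicPartitionKey = String                     // stand-in for Kafka's TopicPartition
  type PartitionOffsetMap = Map[TopicPartitionKey, Long]

  def latestOffset(previous: Option[PartitionOffsetMap],
                   fetched: PartitionOffsetMap): Option[PartitionOffsetMap] =
    if (fetched.isEmpty) previous                     // keep the last known offsets unchanged
    else Some(fetched)
}
```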
5a91b21 [SPARK-38277][SS] Clear write batch after RocksDB state store's commit ### What changes were proposed in this pull request? This PR proposes to clear the write batch (and also the corresponding prefix iterators) after a commit has succeeded on the RocksDB state store. This PR also fixes the test case as a side effect, as it had been relying on the "sort of bug" that the write batch was not cleaned up until either rollback or load was called. ### Why are the changes needed? Without this, the memory usage of WriteBatch for the RocksDB state store is "accumulated" over the partitions in the same executor. Say 10 partitions of a stateful operator are assigned to an executor and run sequentially. Given that the write batch was not cleared after commit, by the time the executor processes the last partition assigned to it, 10 WriteBatch instances hold all the writes performed in this microbatch. ### Does this PR introduce _any_ user-facing change? No. This is a sort of bugfix. ### How was this patch tested? Existing tests, with the test case fixed. Closes #38880 from HeartSaVioR/SPARK-38277. Lead-authored-by: Jungtaek Lim <kabhwan.opensource@gmail.com> Co-authored-by: Yun Tang <myasuka@live.com> Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com> (cherry picked from commit 9eabe67a693c28509dda25b3e5998eb7ca7a3aa9) Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com> 08 December 2022, 06:52:01 UTC
398948e [SPARK-41350][SQL] Allow simple name access of join hidden columns after subquery alias ### What changes were proposed in this pull request? This fixes a regression caused by https://github.com/apache/spark/pull/37758 . In https://github.com/apache/spark/pull/37758 , we decided to only allow qualified name access for using/natural join hidden columns, to fix other problems around hidden columns. We thought that was not a breaking change, as you can only access the join hidden columns by qualified names to disambiguate. However, one case was missed: when we wrap the join with a subquery alias, the ambiguity is gone and we should allow simple name access. This PR fixes this bug by removing the qualified-access-only restriction in `SubqueryAlias.output`. ### Why are the changes needed? Fix a regression. ### Does this PR introduce _any_ user-facing change? Yes, certain queries that failed with `UNRESOLVED_COLUMN` before this PR can work now. ### How was this patch tested? New tests. Closes #38862 from cloud-fan/join. Authored-by: Wenchen Fan <wenchen@databricks.com> Signed-off-by: Hyukjin Kwon <gurwls223@apache.org> 06 December 2022, 04:31:17 UTC
e8a4fb8 [SPARK-41151][FOLLOW-UP][SQL][3.3] Keep built-in file _metadata fields nullable value consistent ### What changes were proposed in this pull request? Cherry-pick https://github.com/apache/spark/pull/38777. Resolved conflicts in https://github.com/apache/spark/commit/ac2d027a768f50e279a1785ebf4dae1a37b7d3f4 ### Why are the changes needed? N/A ### Does this PR introduce _any_ user-facing change? N/A ### How was this patch tested? N/A Closes #38910 from Yaohua628/spark-41151-follow-up-3-3. Authored-by: yaohua <yaohua.zhao@databricks.com> Signed-off-by: Dongjoon Hyun <dongjoon@apache.org> 06 December 2022, 00:23:07 UTC
651f5da [SPARK-41388][K8S] `getReusablePVCs` should ignore recently created PVCs in the previous batch This PR aims to prevent `getReusablePVCs` from choosing recently created PVCs in the very previous batch by excluding newly created PVCs whose creation time is within `spark.kubernetes.allocation.batch.delay`. In a slow K8s control plane situation, where `spark.kubernetes.allocation.batch.delay` is relatively too short or `spark.kubernetes.executor.enablePollingWithResourceVersion=true` is used, `onNewSnapshots` may not return the full list of executor pods created by the previous batch. This sometimes makes the Spark driver think the PVCs from the previous batch are reusable for the next batch. No. Pass the CIs with the newly created test case. Closes #38912 from dongjoon-hyun/SPARK-41388. Authored-by: Dongjoon Hyun <dhyun@apple.com> Signed-off-by: Dongjoon Hyun <dongjoon@apache.org> (cherry picked from commit e234cd8276a603ab8a191dd078b11c605b22a50c) Signed-off-by: Dongjoon Hyun <dongjoon@apache.org> 05 December 2022, 09:20:12 UTC
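A minimal sketch of the exclusion rule with a hypothetical simplified record (the real code filters fabric8 `PersistentVolumeClaim` objects by their creation timestamp):
```scala
// Sketch only: a PVC created within the last allocation-batch-delay window is
// presumed to belong to the in-flight batch and is not offered for reuse yet.
import java.time.Instant

final case class PvcInfo(name: String, creationTime: Instant)

def reusablePvcs(all: Seq[PvcInfo], batchDelayMillis: Long, now: Instant): Seq[PvcInfo] =
  all.filter(_.creationTime.isBefore(now.minusMillis(batchDelayMillis)))
```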
5e68c98 [SPARK-41385][K8S] Replace deprecated `.newInstance()` in K8s module ### What changes were proposed in this pull request? This PR aims to replace the deprecated `Class.newInstance` with `Class.getConstructor.newInstance`. ### Why are the changes needed? SPARK-25984 removed such instances in Spark 3.0.0. SPARK-37145 newly introduced two instances in Spark 3.3.0. ``` $ git grep classForName | grep newInstance | grep -v getConstructor resource-managers/kubernetes/core/src/main/scala/org/apache/spark/deploy/k8s/submit/KubernetesDriverBuilder.scala: val feature = Utils.classForName[Any](className).newInstance() resource-managers/kubernetes/core/src/main/scala/org/apache/spark/scheduler/cluster/k8s/KubernetesExecutorBuilder.scala: val feature = Utils.classForName[Any](className).newInstance() ``` ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Pass the CIs. Closes #38909 from dongjoon-hyun/SPARK-41385. Authored-by: Dongjoon Hyun <dhyun@apple.com> Signed-off-by: Dongjoon Hyun <dongjoon@apache.org> (cherry picked from commit 94829065f677221938d00f77e60a07bcccecf9d4) Signed-off-by: Dongjoon Hyun <dongjoon@apache.org> 05 December 2022, 06:26:49 UTC
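For illustration, the replacement pattern looks like the following, shown with plain `java.lang.Class` reflection and a hypothetical example class (Spark's `Utils.classForName` ultimately resolves a `Class` object the same way):
```scala
// `Class.newInstance` has been deprecated since Java 9: it needs a visible no-arg
// constructor and propagates the constructor's checked exceptions undeclared.
val className = "java.lang.StringBuilder"   // hypothetical example class

val deprecatedStyle = Class.forName(className).newInstance()

// Preferred: look up the no-arg constructor explicitly, then instantiate.
val preferredStyle = Class.forName(className).getConstructor().newInstance()
```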
7405a40 [SPARK-41379][SS][PYTHON] Provide cloned spark session in DataFrame in user function for foreachBatch sink in PySpark ### What changes were proposed in this pull request? This PR proposes to provide the cloned spark session in the DataFrame in the user function for the foreachBatch sink in PySpark. ### Why are the changes needed? It's arguably a bug - previously the given DataFrame was associated with two different SparkSessions: 1) the one which runs the streaming query (accessed via `df.sparkSession`), and 2) the one which the microbatch execution "cloned" (accessed via `df._jdf.sparkSession()`). If users pick 1), it defeats the purpose of cloning the spark session, e.g. disabling AQE. Also, which session is picked up depends on the underlying implementation of each method in DataFrame, which leads to inconsistency. Following is a problematic example: ``` def user_func(batch_df, batch_id): batch_df.createOrReplaceTempView("updates") ... # what is the right way to refer the temp view "updates"? ``` Before this PR, the only way to refer to the temp view "updates" is to use the "internal" field in DataFrame, `_jdf`. That said, running a new query via `batch_df._jdf.sparkSession()` can only see the temp view defined in the user function. We would like to make this possible without forcing end users to access an "internal" field. After this PR, they can (and should) use `batch_df.sparkSession` instead. ### Does this PR introduce _any_ user-facing change? Yes, this PR makes it consistent which spark session is used. Users can use df.sparkSession to access the cloned spark session, which will be the same spark session the methods in DataFrame use. ### How was this patch tested? New test case which fails with the current master branch. Closes #38906 from HeartSaVioR/SPARK-41379. Authored-by: Jungtaek Lim <kabhwan.opensource@gmail.com> Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com> (cherry picked from commit f4ec6f2eeef7f82d478a1047231f1de1bfc429bd) Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com> 05 December 2022, 05:39:37 UTC
821997b [SPARK-41253][K8S][TESTS] Make Spark K8S volcano IT work in Github Action ### What changes were proposed in this pull request? This patch makes the Spark K8s Volcano IT runnable in the resource-limited GitHub Action env. It will help downstream communities like Volcano enable the Spark IT tests in GitHub Actions. BTW, there is no plan to enable the Volcano test in the Spark community; this patch only makes the test work but does **NOT** enable the Volcano test in the Apache Spark GA. It will help downstream testing. - Change the parallel job number from 4 to 2 (only 1 job in each queue) when in the GitHub Action env. - Read the specified `spark.kubernetes.[driver|executor].request.cores` - Set the queue limit according to the specified [driver|executor].request.cores, just like we do in the normal test: https://github.com/apache/spark/commit/883a481e44a1f91ef3fc3aea2838a598cbd6cf0f ### Why are the changes needed? It helps downstream communities who want to use the free GitHub-Action-hosted resources to enable the Spark IT tests in GitHub Actions. ### Does this PR introduce _any_ user-facing change? No, test only. ### How was this patch tested? - Test on my local env with enough resources (default): ``` $ build/sbt -Pvolcano -Pkubernetes -Pkubernetes-integration-tests -Dtest.include.tags=volcano "kubernetes-integration-tests/test" [info] KubernetesSuite: [info] VolcanoSuite: [info] - Run SparkPi with volcano scheduler (10 seconds, 410 milliseconds) [info] - SPARK-38187: Run SparkPi Jobs with minCPU (25 seconds, 489 milliseconds) [info] - SPARK-38187: Run SparkPi Jobs with minMemory (25 seconds, 518 milliseconds) [info] - SPARK-38188: Run SparkPi jobs with 2 queues (only 1 enabled) (14 seconds, 349 milliseconds) [info] - SPARK-38188: Run SparkPi jobs with 2 queues (all enabled) (23 seconds, 516 milliseconds) [info] - SPARK-38423: Run driver job to validate priority order (16 seconds, 404 milliseconds) [info] YuniKornSuite: [info] Run completed in 2 minutes, 34 seconds. [info] Total number of tests run: 6 [info] Suites: completed 3, aborted 0 [info] Tests: succeeded 6, failed 0, canceled 0, ignored 0, pending 0 [info] All tests passed. [success] Total time: 439 s (07:19), completed 2022-12-3 8:58:50 ``` - Test on Github Action with `volcanoMaxConcurrencyJobNum`: https://github.com/Yikun/spark/pull/192 ``` $ build/sbt -Pvolcano -Psparkr -Pkubernetes -Pkubernetes-integration-tests -Dspark.kubernetes.test.driverRequestCores=0.5 -Dspark.kubernetes.test.executorRequestCores=0.2 -Dspark.kubernetes.test.volcanoMaxConcurrencyJobNum=1 -Dtest.include.tags=volcano "kubernetes-integration-tests/test" [info] VolcanoSuite: [info] - Run SparkPi with volcano scheduler (18 seconds, 122 milliseconds) [info] - SPARK-38187: Run SparkPi Jobs with minCPU (53 seconds, 964 milliseconds) [info] - SPARK-38187: Run SparkPi Jobs with minMemory (54 seconds, 523 milliseconds) [info] - SPARK-38188: Run SparkPi jobs with 2 queues (only 1 enabled) (22 seconds, 185 milliseconds) [info] - SPARK-38188: Run SparkPi jobs with 2 queues (all enabled) (33 seconds, 349 milliseconds) [info] - SPARK-38423: Run driver job to validate priority order (32 seconds, 435 milliseconds) [info] YuniKornSuite: [info] Run completed in 4 minutes, 16 seconds. [info] Total number of tests run: 6 [info] Suites: completed 3, aborted 0 [info] Tests: succeeded 6, failed 0, canceled 0, ignored 0, pending 0 [info] All tests passed. [warn] In the last 494 seconds, 7.296 (1.5%) were spent in GC. [Heap: 3.12GB free of 3.83GB, max 3.83GB] Consider increasing the JVM heap using `-Xmx` or try a different collector, e.g. 
`-XX:+UseG1GC`, for better performance. [success] Total time: 924 s (15:24), completed Dec 3, 2022 12:49:42 AM ``` - CI passed Closes #38789 from Yikun/SPARK-41253. Authored-by: Yikun Jiang <yikunkero@gmail.com> Signed-off-by: Dongjoon Hyun <dongjoon@apache.org> (cherry picked from commit 72d58d5f8a847bac53cf01b137780c7e4e2664d7) Signed-off-by: Yikun Jiang <yikunkero@gmail.com> 03 December 2022, 10:02:40 UTC
20cc2b6 [SPARK-38921][K8S][TESTS] Use k8s-client to create queue resource in Volcano IT ### What changes were proposed in this pull request? Use the fabric8io/k8s-client to create the queue resource in the Volcano IT. ### Why are the changes needed? Use the k8s-client to create the Volcano queue to - Make the code easy to understand - Enable the ability to set queue capacity dynamically. This will help to support running the Volcano test in a resource-limited env (such as GitHub Actions). ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Volcano IT passed Closes #36219 from Yikun/SPARK-38921. Authored-by: Yikun Jiang <yikunkero@gmail.com> Signed-off-by: Dongjoon Hyun <dongjoon@apache.org> (cherry picked from commit a49f66fe49d4d4bbfb41da2e5bbb5af4bd64d1da) Signed-off-by: Yikun Jiang <yikunkero@gmail.com> 03 December 2022, 10:00:40 UTC
bdafe57 [SPARK-41339][SQL] Close and recreate RocksDB write batch instead of just clearing ### What changes were proposed in this pull request? Instead of just calling `writeBatch.clear`, close the write batch and recreate it. ### Why are the changes needed? A RocksDB `WriteBatch` (and by extension `WriteBatchWithIndex`) stores its underlying data in a `std::string`. Why? I'm not sure. But after a partition is finished, `writeBatch.clear()` is called (somewhat indirectly through a call to `store.abort`), presumably clearing the data in the `WriteBatch`. Under the hood this calls `std::string::clear` followed by `std::string::resize`. However, neither of these two things actually reclaims native memory. All the memory allocated for expanding the string when adding data to the `WriteBatch` will be there until the `std::string` is deallocated, which in this case means deleting the `WriteBatch`. This leads to native memory accumulation on an executor as it executes several partitions consecutively, which happens when your total executor cores are fewer than the shuffle partitions of your stateful stream. So instead of just calling `writeBatch.clear()`, close the `WriteBatch` and create a new one to free up the native memory. ### Does this PR introduce _any_ user-facing change? Fix for excess native memory usage. ### How was this patch tested? Existing UTs, not sure how to test for memory usage. Closes #38853 from Kimahriman/rocksdb-write-batch-close. Lead-authored-by: Adam Binford <adamq43@gmail.com> Co-authored-by: centos <centos@adam-dev.novalocal> Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com> (cherry picked from commit d67e22826cda41d732e010d73687e74fab60f4b6) Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com> 01 December 2022, 06:50:35 UTC
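A sketch of the shape of the change, using the RocksDB Java API directly; the surrounding structure is simplified and this is not the actual Spark state-store code.
```scala
// Closing the batch releases the native backing buffer that clear() would keep.
import org.rocksdb.WriteBatchWithIndex

var writeBatch = new WriteBatchWithIndex(true)   // overwriteKey = true

def resetWriteBatch(): Unit = {
  // Instead of writeBatch.clear(), which leaves the grown native buffer allocated:
  writeBatch.close()                             // frees the native memory
  writeBatch = new WriteBatchWithIndex(true)     // start the next partition fresh
}
```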
a94dd18 [SPARK-41327][CORE] Fix `SparkStatusTracker.getExecutorInfos` by switching On/OffHeapStorageMemory info ### What changes were proposed in this pull request? This PR aims to fix `SparkStatusTracker.getExecutorInfos` to return a correct `on/offHeapStorageMemory`. ### Why are the changes needed? `SparkExecutorInfoImpl` used the following parameter order. https://github.com/apache/spark/blob/54c57fa86906f933e089a33ef25ae0c053769cc8/core/src/main/scala/org/apache/spark/StatusAPIImpl.scala#L42-L45 SPARK-20659 introduced a bug with a wrong parameter order in Apache Spark 2.4.0. - https://github.com/apache/spark/pull/20546/files#diff-7daca909d33ff8e9b4938e2b4a4aaa1558fbdf4604273b9e38cce32c55e1508cR118-R121 ### Does this PR introduce _any_ user-facing change? Yes. ### How was this patch tested? Manual review. Closes #38843 from ylybest/master. Lead-authored-by: Lingyun Yuan <ylybest@gmail.com> Co-authored-by: ylybest <119458293+ylybest@users.noreply.github.com> Signed-off-by: Dongjoon Hyun <dongjoon@apache.org> (cherry picked from commit 388824c448804161b076507f0f39ef0596e0a0bf) Signed-off-by: Dongjoon Hyun <dongjoon@apache.org> 30 November 2022, 19:47:39 UTC
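The bug boils down to two positional arguments being swapped. A tiny illustrative sketch with a hypothetical simplified class (not Spark's actual `SparkExecutorInfoImpl`), plus the named-argument style that makes such swaps hard to introduce:
```scala
// Hypothetical, simplified shape of the constructor involved.
final case class ExecutorInfoSketch(
    host: String,
    port: Int,
    totalOnHeapStorageMemory: Long,
    totalOffHeapStorageMemory: Long)

val onHeap = 512L * 1024 * 1024
val offHeap = 128L * 1024 * 1024

// Pre-fix shape of the bug: the two memory values are passed in swapped positions.
val buggy = ExecutorInfoSketch("host", 7077, offHeap, onHeap)

// Named arguments make the intent explicit and the swap impossible to miss.
val fixed = ExecutorInfoSketch("host", 7077,
  totalOnHeapStorageMemory = onHeap,
  totalOffHeapStorageMemory = offHeap)
```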
3f7ff35 [SPARK-40987][CORE] `BlockManager#removeBlockInternal` should ensure the lock is unlocked gracefully ### What changes were proposed in this pull request? `BlockManager#removeBlockInternal` should ensure the lock is unlocked gracefully. `removeBlockInternal` tries to call `removeBlock` in the finally block. ### Why are the changes needed? When the driver submits a job, `DAGScheduler` calls `sc.broadcast(taskBinaryBytes)`. `TorrentBroadcast#writeBlocks` may fail due to disk problems during `blockManager#putBytes`. `BlockManager#doPut` calls `BlockManager#removeBlockInternal` to clean up the block. `BlockManager#removeBlockInternal` calls `DiskStore#remove` to clean up blocks on disk. `DiskStore#remove` will try to create the directory because the directory does not exist, and an exception will be thrown at this time. The block info and its lock in `BlockInfoManager#blockInfoWrappers` are not removed. The catch block in `TorrentBroadcast#writeBlocks` will call `blockManager.removeBroadcast` to clean up the broadcast. Because the block lock in `BlockInfoManager#blockInfoWrappers` is not released, the `dag-scheduler-event-loop` thread of `DAGScheduler` will wait forever. ``` 22/11/01 18:27:48 WARN BlockManager: Putting block broadcast_0_piece0 failed due to exception java.io.IOException: XXXXX. 22/11/01 18:27:48 ERROR TorrentBroadcast: Store broadcast broadcast_0 fail, remove all pieces of the broadcast ``` ``` "dag-scheduler-event-loop" #54 daemon prio=5 os_prio=31 tid=0x00007fc98e3fa800 nid=0x7203 waiting on condition [0x0000700008c1e000]    java.lang.Thread.State: WAITING (parking)     at sun.misc.Unsafe.park(Native Method)     - parking to wait for  <0x00000007add3d8c8> (a java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)     at java.util.concurrent.locks.LockSupport.park(LockSupport.java:175)     at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2039)     at org.apache.spark.storage.BlockInfoManager.$anonfun$acquireLock$1(BlockInfoManager.scala:221)     at org.apache.spark.storage.BlockInfoManager.$anonfun$acquireLock$1$adapted(BlockInfoManager.scala:214)     at org.apache.spark.storage.BlockInfoManager$$Lambda$3038/1307533457.apply(Unknown Source)     at org.apache.spark.storage.BlockInfoWrapper.withLock(BlockInfoManager.scala:105)     at org.apache.spark.storage.BlockInfoManager.acquireLock(BlockInfoManager.scala:214)     at org.apache.spark.storage.BlockInfoManager.lockForWriting(BlockInfoManager.scala:293)     at org.apache.spark.storage.BlockManager.removeBlock(BlockManager.scala:1979)     at org.apache.spark.storage.BlockManager.$anonfun$removeBroadcast$3(BlockManager.scala:1970)     at org.apache.spark.storage.BlockManager.$anonfun$removeBroadcast$3$adapted(BlockManager.scala:1970)     at org.apache.spark.storage.BlockManager$$Lambda$3092/1241801156.apply(Unknown Source)     at scala.collection.Iterator.foreach(Iterator.scala:943)     at scala.collection.Iterator.foreach$(Iterator.scala:943)     at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)     at org.apache.spark.storage.BlockManager.removeBroadcast(BlockManager.scala:1970)     at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:179)     at org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:99)     at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:38)     at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:78) 
    at org.apache.spark.SparkContext.broadcastInternal(SparkContext.scala:1538)     at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1520)     at org.apache.spark.scheduler.DAGScheduler.submitMissingTasks(DAGScheduler.scala:1539)     at org.apache.spark.scheduler.DAGScheduler.submitStage(DAGScheduler.scala:1355)     at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:1297)     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2929)     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2921)     at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2910)     at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49) ``` ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Throw an exception before `Files.createDirectory` to simulate disk problems. DiskBlockManager#getFile ```java if (filename.contains("piece")) { throw new java.io.IOException("disk issue") } Files.createDirectory(path) ``` ``` ./bin/spark-shell ``` ```scala spark.sql("select 1").collect() ``` ``` 22/11/24 19:29:58 WARN BlockManager: Putting block broadcast_0_piece0 failed due to exception java.io.IOException: disk issue. 22/11/24 19:29:58 ERROR TorrentBroadcast: Store broadcast broadcast_0 fail, remove all pieces of the broadcast org.apache.spark.SparkException: Job aborted due to stage failure: Task serialization failed: java.io.IOException: disk issue java.io.IOException: disk issue at org.apache.spark.storage.DiskBlockManager.getFile(DiskBlockManager.scala:109) at org.apache.spark.storage.DiskBlockManager.containsBlock(DiskBlockManager.scala:160) at org.apache.spark.storage.DiskStore.contains(DiskStore.scala:153) at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$getCurrentBlockStatus(BlockManager.scala:879) at org.apache.spark.storage.BlockManager.removeBlockInternal(BlockManager.scala:1998) at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1484) at org.apache.spark.storage.BlockManager$BlockStoreUpdater.save(BlockManager.scala:378) at org.apache.spark.storage.BlockManager.putBytes(BlockManager.scala:1419) at org.apache.spark.broadcast.TorrentBroadcast.$anonfun$writeBlocks$1(TorrentBroadcast.scala:170) at org.apache.spark.broadcast.TorrentBroadcast.$anonfun$writeBlocks$1$adapted(TorrentBroadcast.scala:164) at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36) at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198) at org.apache.spark.broadcast.TorrentBroadcast.writeBlocks(TorrentBroadcast.scala:164) at org.apache.spark.broadcast.TorrentBroadcast.<init>(TorrentBroadcast.scala:99) at org.apache.spark.broadcast.TorrentBroadcastFactory.newBroadcast(TorrentBroadcastFactory.scala:38) at org.apache.spark.broadcast.BroadcastManager.newBroadcast(BroadcastManager.scala:78) at org.apache.spark.SparkContext.broadcastInternal(SparkContext.scala:1538) at org.apache.spark.SparkContext.broadcast(SparkContext.scala:1520) at org.apache.spark.scheduler.DAGScheduler.submitMissingTasks(DAGScheduler.scala:1539) at org.apache.spark.scheduler.DAGScheduler.submitStage(DAGScheduler.scala:1355) at org.apache.spark.scheduler.DAGScheduler.handleJobSubmitted(DAGScheduler.scala:1297) at 
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2929) ``` Closes #38467 from cxzl25/SPARK-40987. Authored-by: sychen <sychen@ctrip.com> Signed-off-by: Mridul <mridul<at>gmail.com> (cherry picked from commit bbab0afb9a6919694cda5b9d490203af93a23460) Signed-off-by: Mridul <mridulatgmail.com> 30 November 2022, 03:53:12 UTC
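Reduced to its essence, the SPARK-40987 fix above is a try/finally discipline around block removal; a hedged sketch with hypothetical helper functions (not the actual `BlockManager` code):
```scala
// Block removal may throw (e.g. on disk failure), but the block-info write lock
// must be released regardless, or later cleanup paths wait on it forever.
def removeBlockInternalSketch(
    blockId: String,
    removeData: String => Unit,
    releaseLock: String => Unit): Unit = {
  try {
    removeData(blockId)    // may throw, e.g. java.io.IOException from the disk store
  } finally {
    releaseLock(blockId)   // always unlock gracefully
  }
}
```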
00185e3 [SPARK-41185][K8S][DOCS] Remove ARM limitation for YuniKorn from docs ### What changes were proposed in this pull request? Remove the limitations section from the K8s documentation for YuniKorn. ### Why are the changes needed? The limitation section is outdated because YuniKorn is fully supported from release 1.1.0 onwards. YuniKorn 1.1.0 is the release that is referenced in the documentation. ### Does this PR introduce any user-facing change? No. ### How was this patch tested? Existing tests. Closes #38780 from wilfred-s/SPARK-41185. Authored-by: Wilfred Spiegelenburg <wilfreds@apache.org> Signed-off-by: Dongjoon Hyun <dongjoon@apache.org> (cherry picked from commit bfc9e4ef111e21eee99407309ca6be278617d319) Signed-off-by: Dongjoon Hyun <dongjoon@apache.org> 28 November 2022, 20:39:38 UTC
090bebd [SPARK-41254][YARN] Fix wrong usage when checking YarnAllocator.rpIdToYarnResource key existence ### What changes were proposed in this pull request? Bugfix: a misuse of ConcurrentHashMap.contains caused the map YarnAllocator.rpIdToYarnResource to always be updated. ### Why are the changes needed? It causes duplicated logs during YARN resource allocation and unnecessary object creation and GC. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Existing tests Closes #38790 from CavemanIV/SPARK-41254. Authored-by: John Caveman <selnteer@gmail.com> Signed-off-by: Sean Owen <srowen@gmail.com> (cherry picked from commit bccfe5bca600b3091ea93b4c5d6437af8381973f) Signed-off-by: Sean Owen <srowen@gmail.com> 28 November 2022, 14:25:09 UTC
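The API gotcha behind SPARK-41254 is easy to demonstrate in isolation: `java.util.concurrent.ConcurrentHashMap.contains(x)` is a legacy alias of `containsValue`, so using it as a key-existence check always misses and the entry gets rebuilt every time.
```scala
import java.util.concurrent.ConcurrentHashMap

val m = new ConcurrentHashMap[Int, String]()
m.put(1, "resource-profile-resource")

m.contains(1)      // false -- tests *values* (legacy alias of containsValue), not keys
m.containsKey(1)   // true  -- the intended key-existence check
```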
a483106 [SPARK-40872][3.3] Fallback to original shuffle block when a push-merged shuffle chunk is zero-size ### What changes were proposed in this pull request? This is a backport PR of #38333. When push-based shuffle is enabled, a zero-size buf error may occur when fetching shuffle chunks from bad nodes, especially when memory is full. In this case, we can fall back to original shuffle blocks. ### Why are the changes needed? When the reduce task obtains the shuffle chunk with a zero-size buf, we let it fall back to original shuffle block. After verification, these blocks can be read successfully without shuffle retry. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? UT Closes #38751 from gaoyajun02/SPARK-40872-backport. Authored-by: gaoyajun02 <gaoyajun02@meituan.com> Signed-off-by: Hyukjin Kwon <gurwls223@apache.org> 27 November 2022, 01:57:42 UTC
0900040 [SPARK-41118][SQL][3.3] `to_number`/`try_to_number` should return `null` when format is `null` Backport of #38635 ### What changes were proposed in this pull request? When a user specifies a null format in `to_number`/`try_to_number`, return `null`, with a data type of `DecimalType.USER_DEFAULT`, rather than throwing a `NullPointerException`. Also, since the code for `ToNumber` and `TryToNumber` is virtually identical, put all common code in new abstract class `ToNumberBase` to avoid fixing the bug in two places. ### Why are the changes needed? `to_number`/`try_to_number` currently throws a `NullPointerException` when the format is `null`: ``` spark-sql> SELECT to_number('454', null); org.apache.spark.SparkException: The Spark SQL phase analysis failed with an internal error. Please, fill a bug report in, and provide the full stack trace. at org.apache.spark.sql.execution.QueryExecution$.toInternalError(QueryExecution.scala:500) at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:512) at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:185) ... Caused by: java.lang.NullPointerException at org.apache.spark.sql.catalyst.expressions.ToNumber.numberFormat$lzycompute(numberFormatExpressions.scala:72) at org.apache.spark.sql.catalyst.expressions.ToNumber.numberFormat(numberFormatExpressions.scala:72) at org.apache.spark.sql.catalyst.expressions.ToNumber.numberFormatter$lzycompute(numberFormatExpressions.scala:73) at org.apache.spark.sql.catalyst.expressions.ToNumber.numberFormatter(numberFormatExpressions.scala:73) at org.apache.spark.sql.catalyst.expressions.ToNumber.checkInputDataTypes(numberFormatExpressions.scala:81) ``` Also: ``` spark-sql> SELECT try_to_number('454', null); org.apache.spark.SparkException: The Spark SQL phase analysis failed with an internal error. Please, fill a bug report in, and provide the full stack trace. at org.apache.spark.sql.execution.QueryExecution$.toInternalError(QueryExecution.scala:500) at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:512) at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:185) ... Caused by: java.lang.NullPointerException at org.apache.spark.sql.catalyst.expressions.ToNumber.numberFormat$lzycompute(numberFormatExpressions.scala:72) at org.apache.spark.sql.catalyst.expressions.ToNumber.numberFormat(numberFormatExpressions.scala:72) at org.apache.spark.sql.catalyst.expressions.ToNumber.numberFormatter$lzycompute(numberFormatExpressions.scala:73) at org.apache.spark.sql.catalyst.expressions.ToNumber.numberFormatter(numberFormatExpressions.scala:73) at org.apache.spark.sql.catalyst.expressions.ToNumber.checkInputDataTypes(numberFormatExpressions.scala:81) at org.apache.spark.sql.catalyst.expressions.TryToNumber.checkInputDataTypes(numberFormatExpressions.scala:146) ``` Compare to `to_binary` and `try_to_binary`: ``` spark-sql> SELECT to_binary('abc', null); NULL Time taken: 3.111 seconds, Fetched 1 row(s) spark-sql> SELECT try_to_binary('abc', null); NULL Time taken: 0.06 seconds, Fetched 1 row(s) spark-sql> ``` Also compare to `to_number` in PostgreSQL 11.18: ``` SELECT to_number('454', null) is null as a; a true ``` ### Does this PR introduce _any_ user-facing change? `to_number`/`try_to_number` with null format will now return `null` with a data type of `DecimalType.USER_DEFAULT`. ### How was this patch tested? New unit test. 
Closes #38697 from bersprockets/to_number_issue_33. Authored-by: Bruce Robbins <bersprockets@gmail.com> Signed-off-by: Hyukjin Kwon <gurwls223@apache.org> 22 November 2022, 06:54:05 UTC
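Mirroring the SQL examples in the SPARK-41118 entry above, the post-fix behaviour can be checked from spark-shell (assumes a local `SparkSession` named `spark`); both calls now return a single NULL row instead of failing with an internal `NullPointerException`.
```scala
spark.sql("SELECT to_number('454', null)").show()
spark.sql("SELECT try_to_number('454', null)").show()
```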
abc343f [SPARK-41151][SQL][3.3] Keep built-in file `_metadata` column nullable value consistent ### What changes were proposed in this pull request? NOTE: This PR cherry-picks https://github.com/apache/spark/pull/38683 to 3.3 In FileSourceStrategy, we add an Alias node to wrap the file metadata fields (e.g. file_name, file_size) in a NamedStruct ([here](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala#L279)). But `CreateNamedStruct` has an overridden `nullable` value of `false` ([here](https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/complexTypeCreator.scala#L443)), which is different from the `_metadata` struct's `nullable` value of `true` ([here](https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala#L467)). This PR fixes this by changing the `_metadata` column to always be not nullable. Rationale: 1. By definition, `_metadata` for file-based sources is always not null; 2. If users have already persisted this nullable `_metadata` somewhere, then it's totally fine to write non-nullable data to this nullable column. ### Why are the changes needed? For stateful streaming, we store the schema in the state store and [check consistency across batches](https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/StateSchemaCompatibilityChecker.scala#L47). To avoid state schema compatibility mismatches, we should keep nullable consistent in `_metadata`. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? New UT Closes #38748 from Yaohua628/spark-41151-3-3. Authored-by: yaohua <yaohua.zhao@databricks.com> Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com> 22 November 2022, 06:30:31 UTC
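A quick way to observe the change described above (illustrative path; assumes any existing Parquet data and a `SparkSession` named `spark`):
```scala
// The hidden _metadata column of a file-based source is now reported as not nullable.
val metadataField = spark.read.parquet("/tmp/example_data")
  .select("_metadata")
  .schema
  .fields
  .head

metadataField.nullable   // false after this change
```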
ace3c69 [SPARK-41154][SQL][3.3] Incorrect relation caching for queries with time travel spec backport https://github.com/apache/spark/pull/38687 for branch-3.3 ### What changes were proposed in this pull request? Add TimeTravelSpec to the key of relation cache in AnalysisContext. ### Why are the changes needed? Correct the relation resolution for the same table but different TimeTravelSpec. ### Does this PR introduce _any_ user-facing change? yes, bug fix ### How was this patch tested? add test Closes #38741 from ulysses-you/time-travel-spec-3.3. Authored-by: ulysses-you <ulyssesyou18@gmail.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com> 22 November 2022, 02:07:17 UTC
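An illustrative query shape for the SPARK-41154 fix above (hypothetical v2 table name `cat.db.t`): the same table referenced with different time-travel specs must not be served from a single cached relation, which is what adding `TimeTravelSpec` to the cache key guarantees.
```scala
// Assumes a catalog/table supporting VERSION AS OF; names are placeholders.
spark.sql(
  """SELECT * FROM cat.db.t VERSION AS OF 1
    |UNION ALL
    |SELECT * FROM cat.db.t VERSION AS OF 2""".stripMargin)
```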
0152c2e [SPARK-41202][BUILD][3.3] Update ORC to 1.7.7 ### What changes were proposed in this pull request? This PR aims to update ORC to 1.7.7. ### Why are the changes needed? This will bring the latest bug fixes. - https://orc.apache.org/news/2022/11/17/ORC-1.7.7/ ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Pass the CIs. Closes #38724 from williamhyun/SPARK-41202. Authored-by: William Hyun <william@apache.org> Signed-off-by: Dongjoon Hyun <dongjoon@apache.org> 19 November 2022, 12:17:20 UTC
f431cdf [SPARK-41188][CORE][ML] Set executorEnv OMP_NUM_THREADS to be spark.task.cpus by default for spark executor JVM processes Signed-off-by: Weichen Xu <weichen.xu@databricks.com> ### What changes were proposed in this pull request? Set executorEnv OMP_NUM_THREADS to be spark.task.cpus by default for spark executor JVM processes. ### Why are the changes needed? This is for limiting the thread number of OpenBLAS routines to the number of cores assigned to this executor, because some Spark ML algorithms call OpenBLAS via netlib-java, e.g.: Spark ALS estimator training calls the LAPACK API `dppsv` (which internally calls the BLAS lib); if it calls the OpenBLAS lib, by default OpenBLAS will try to use all CPU cores. But Spark will launch multiple tasks on a Spark worker, each task might call the `dppsv` API at the same time, and each call internally creates multiple threads (thread count equal to the CPU cores), which causes CPU oversubscription. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Manually. Closes #38699 from WeichenXu123/SPARK-41188. Authored-by: Weichen Xu <weichen.xu@databricks.com> Signed-off-by: Weichen Xu <weichen.xu@databricks.com> (cherry picked from commit 82a41d8ca273e7a93333268324c6958f8bb14d9e) Signed-off-by: Weichen Xu <weichen.xu@databricks.com> 19 November 2022, 09:24:13 UTC
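The default introduced here is equivalent to setting the executor environment variable explicitly; a hedged sketch of that explicit configuration (the PR wires this up automatically when `OMP_NUM_THREADS` is not already set):
```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("omp-threads-sketch")
  .config("spark.task.cpus", "2")
  // Cap OpenBLAS/OpenMP threads per executor process at the task's core count.
  .config("spark.executorEnv.OMP_NUM_THREADS", "2")
  .getOrCreate()
```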
29dee66 [SPARK-41144][SQL] Unresolved hint should not cause query failure Skip `UnresolvedHint` in rule `AddMetadataColumns` to avoid calling exprId on `UnresolvedAttribute`. ``` CREATE TABLE t1(c1 bigint) USING PARQUET; CREATE TABLE t2(c2 bigint) USING PARQUET; SELECT /*+ hash(t2) */ * FROM t1 join t2 on c1 = c2; ``` failed with msg: ``` org.apache.spark.sql.catalyst.analysis.UnresolvedException: Invalid call to exprId on unresolved object at org.apache.spark.sql.catalyst.analysis.UnresolvedAttribute.exprId(unresolved.scala:147) at org.apache.spark.sql.catalyst.analysis.Analyzer$AddMetadataColumns$.$anonfun$hasMetadataCol$4(Analyzer.scala:1005) at org.apache.spark.sql.catalyst.analysis.Analyzer$AddMetadataColumns$.$anonfun$hasMetadataCol$4$adapted(Analyzer.scala:1005) at scala.collection.Iterator.exists(Iterator.scala:969) at scala.collection.Iterator.exists$(Iterator.scala:967) at scala.collection.AbstractIterator.exists(Iterator.scala:1431) at scala.collection.IterableLike.exists(IterableLike.scala:79) at scala.collection.IterableLike.exists$(IterableLike.scala:78) at scala.collection.AbstractIterable.exists(Iterable.scala:56) at org.apache.spark.sql.catalyst.analysis.Analyzer$AddMetadataColumns$.$anonfun$hasMetadataCol$3(Analyzer.scala:1005) at org.apache.spark.sql.catalyst.analysis.Analyzer$AddMetadataColumns$.$anonfun$hasMetadataCol$3$adapted(Analyzer.scala:1005) ``` But before, this was just a warning: `WARN HintErrorLogger: Unrecognized hint: hash(t2)` Yes, fixes a regression from 3.3.1. Note: the root cause is that we mark `UnresolvedHint` as resolved if its child is resolved since https://github.com/apache/spark/pull/32841, and then https://github.com/apache/spark/pull/37758 triggered this bug. Add test Closes #38662 from ulysses-you/hint. Authored-by: ulysses-you <ulyssesyou18@gmail.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com> (cherry picked from commit a9bf5d2b3f5b3331e3b024a3ad631fcbe88a9d18) Signed-off-by: Wenchen Fan <wenchen@databricks.com> 15 November 2022, 08:52:03 UTC
bea58e4 [SPARK-41089][YARN][SHUFFLE] Relocate Netty native arm64 libs ### What changes were proposed in this pull request? SPARK-27610 relocated the Netty x86 native libs; recent Netty versions ship arm64 native libs as well, so we should do the same to make it work on the arm64 platform. ### Why are the changes needed? Align arm64 behavior w/ x86 ### Does this PR introduce _any_ user-facing change? Yes, bug fix for the ARM64 platform. ### How was this patch tested? Before patch ``` ➜ apache-spark git:(SPARK-41089) ll common/network-yarn/target/exploded/META-INF/native total 752 -rw-r--r-- 1 chengpan staff 101K Oct 11 23:24 libnetty_transport_native_epoll_aarch_64.so -rw-r--r-- 1 chengpan staff 94K Oct 11 17:57 libnetty_transport_native_kqueue_aarch_64.jnilib -rw-r--r-- 1 chengpan staff 93K Oct 11 23:27 liborg_sparkproject_netty_transport_native_epoll_x86_64.so -rw-r--r-- 1 chengpan staff 77K Oct 11 17:51 liborg_sparkproject_netty_transport_native_kqueue_x86_64.jnilib drwxr-xr-x 3 chengpan staff 96B Nov 9 13:46 linux32 drwxr-xr-x 3 chengpan staff 96B Nov 9 13:46 linux64 drwxr-xr-x 3 chengpan staff 96B Nov 9 13:46 osx drwxr-xr-x 3 chengpan staff 96B Nov 9 13:46 windows32 drwxr-xr-x 3 chengpan staff 96B Nov 9 13:46 windows64 ``` After patch ``` ➜ apache-spark git:(SPARK-41089) ll common/network-yarn/target/exploded/META-INF/native total 752 -rw-r--r-- 1 chengpan staff 101K Oct 11 23:24 liborg_sparkproject_netty_transport_native_epoll_aarch_64.so -rw-r--r-- 1 chengpan staff 93K Oct 11 23:27 liborg_sparkproject_netty_transport_native_epoll_x86_64.so -rw-r--r-- 1 chengpan staff 94K Oct 11 17:57 liborg_sparkproject_netty_transport_native_kqueue_aarch_64.jnilib -rw-r--r-- 1 chengpan staff 77K Oct 11 17:51 liborg_sparkproject_netty_transport_native_kqueue_x86_64.jnilib drwxr-xr-x 3 chengpan staff 96B Nov 10 12:07 linux32 drwxr-xr-x 3 chengpan staff 96B Nov 10 12:07 linux64 drwxr-xr-x 3 chengpan staff 96B Nov 10 12:07 osx drwxr-xr-x 3 chengpan staff 96B Nov 10 12:07 windows32 drwxr-xr-x 3 chengpan staff 96B Nov 10 12:07 windows64 ``` Closes #38593 from pan3793/SPARK-41089. Authored-by: Cheng Pan <chengpan@apache.org> Signed-off-by: Sean Owen <srowen@gmail.com> (cherry picked from commit c72d39990182ad2207c8cdd523af06ee4dc02fc5) Signed-off-by: Sean Owen <srowen@gmail.com> 10 November 2022, 14:58:05 UTC
74bf9fe [MINOR][DOCS] Fix links in the sql-pyspark-pandas-with-arrow ### What changes were proposed in this pull request? This PR fixes links in the sql-pyspark-pandas-with-arrow page. ### Why are the changes needed? https://spark.apache.org/docs/latest/sql-pyspark-pandas-with-arrow.html <img width="696" alt="image" src="https://user-images.githubusercontent.com/15246973/200457446-250e8c9b-3712-4e79-b6e9-6bdabf322206.png"> When clicking [this page](https://spark.apache.org/docs/latest/api/python/user_guide/arrow_pandas.html), it jumps to https://spark.apache.org/docs/latest/api/python/user_guide/arrow_pandas.html, as follows: <img width="791" alt="image" src="https://user-images.githubusercontent.com/15246973/200457489-2561b9df-3107-4e19-960d-881f31851f82.png"> ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Manually verified. Closes #38545 from panbingkun/arrow_pandas_doc. Authored-by: panbingkun <pbk1982@gmail.com> Signed-off-by: Sean Owen <srowen@gmail.com> (cherry picked from commit 70bc5dfc96810e47f11f0f39054b1ceb61066f77) Signed-off-by: Sean Owen <srowen@gmail.com> 09 November 2022, 13:07:39 UTC
f0cad7a [SPARK-40588] FileFormatWriter materializes AQE plan before accessing outputOrdering ### What changes were proposed in this pull request? The `FileFormatWriter` materializes an `AdaptiveQueryPlan` before accessing the plan's `outputOrdering`. This is required for Spark 3.0 to 3.3. Spark 3.4 does not need this because `FileFormatWriter` gets the final plan. ### Why are the changes needed? `FileFormatWriter` enforces an ordering if the written plan does not provide that ordering. An `AdaptiveQueryPlan` does not know its final ordering (Spark 3.0 to 3.3), in which case `FileFormatWriter` enforces the ordering (e.g. by column `"a"`) even if the plan provides a compatible ordering (e.g. by columns `"a", "b"`). In case of spilling, that order (e.g. by columns `"a", "b"`) gets broken (see SPARK-40588). ### Does this PR introduce _any_ user-facing change? This fixes SPARK-40588, which was introduced in 3.0. This restores behaviour from Spark 2.4. ### How was this patch tested? The final plan that is written to files cannot be extracted from `FileFormatWriter`. The bug explained in [SPARK-40588](https://issues.apache.org/jira/browse/SPARK-40588) can only be asserted on the result files when spilling occurs. This is very hard to control in an unit test scenario. Therefore, this was tested manually. The [example to reproduce this issue](https://issues.apache.org/jira/browse/SPARK-40588?focusedCommentId=17621032&page=com.atlassian.jira.plugin.system.issuetabpanels%3Acomment-tabpanel#comment-17621032) given in SPARK-40588 now produces sorted files. The actual plan written into the files changed from ``` Sort [input[0, bigint, false] ASC NULLS FIRST], false, 0 +- AdaptiveSparkPlan isFinalPlan=false +- Sort [day#2L ASC NULLS FIRST, id#4L ASC NULLS FIRST], false, 0 +- Exchange hashpartitioning(day#2L, 2), REPARTITION_BY_NUM, [id=#30] +- BroadcastNestedLoopJoin BuildLeft, Inner :- BroadcastExchange IdentityBroadcastMode, [id=#28] : +- Project [id#0L AS day#2L] : +- Range (0, 2, step=1, splits=2) +- Range (0, 10000000, step=1, splits=2) ``` where `FileFormatWriter` enforces order with `Sort [input[0, bigint, false] ASC NULLS FIRST], false, 0`, to ``` *(3) Sort [day#2L ASC NULLS FIRST, id#4L ASC NULLS FIRST], false, 0 +- AQEShuffleRead coalesced +- ShuffleQueryStage 1 +- Exchange hashpartitioning(day#2L, 200), REPARTITION_BY_COL, [id=#68] +- *(2) BroadcastNestedLoopJoin BuildLeft, Inner :- BroadcastQueryStage 0 : +- BroadcastExchange IdentityBroadcastMode, [id=#42] : +- *(1) Project [id#0L AS day#2L] : +- *(1) Range (0, 2, step=1, splits=2) +- *(2) Range (0, 1000000, step=1, splits=2) ``` where the sort given by the user is the outermost sort now. Closes #38358 from EnricoMi/branch-3.3-materialize-aqe-plan. Authored-by: Enrico Minack <github@enrico.minack.dev> Signed-off-by: Wenchen Fan <wenchen@databricks.com> 09 November 2022, 07:59:54 UTC
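A rough sketch of the write shape affected by the fix above (hypothetical output path, column names following the plans quoted in the commit, and assuming a running SparkSession named `spark`, e.g. in spark-shell): the user already sorts within partitions by (day, id), yet without the fix `FileFormatWriter` added its own Sort on the partition column because the AQE plan could not report its final ordering.

```scala
import org.apache.spark.sql.functions.col

// Sketch only, loosely following the SPARK-40588 reproducer; not the exact repro.
spark.conf.set("spark.sql.adaptive.enabled", "true")
spark.range(0L, 1000000L)
  .selectExpr("id % 2 AS day", "id")
  .repartition(col("day"))
  .sortWithinPartitions("day", "id")   // ordering the writer should now preserve
  .write
  .mode("overwrite")
  .partitionBy("day")
  .parquet("/tmp/spark-40588-sketch")  // hypothetical output path
```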
ef74381 [SPARK-41035][SQL] Don't patch foldable children of aggregate functions in `RewriteDistinctAggregates` `RewriteDistinctAggregates` doesn't typically patch foldable children of the distinct aggregation functions except in one odd case (and seemingly by accident). This PR extends the policy of not patching foldables to that odd case. This query produces incorrect results: ``` select a, count(distinct 100) as cnt1, count(distinct b, 100) as cnt2 from values (1, 2), (4, 5) as data(a, b) group by a; +---+----+----+ |a |cnt1|cnt2| +---+----+----+ |1 |1 |0 | |4 |1 |0 | +---+----+----+ ``` The values for `cnt2` should be 1 and 1 (not 0 and 0). If you change the literal used in the first aggregate function, the second aggregate function now works correctly: ``` select a, count(distinct 101) as cnt1, count(distinct b, 100) as cnt2 from values (1, 2), (4, 5) as data(a, b) group by a; +---+----+----+ |a |cnt1|cnt2| +---+----+----+ |1 |1 |1 | |4 |1 |1 | +---+----+----+ ``` The bug is in the rule `RewriteDistinctAggregates`. When a distinct aggregation has only foldable children, `RewriteDistinctAggregates` uses the first child as the grouping key (_grouping key_ in this context means the function children of distinct aggregate functions: `RewriteDistinctAggregates` groups distinct aggregations by function children to determine the `Expand` projections it needs to create). Therefore, the first foldable child gets included in the `Expand` projection associated with the aggregation, with a corresponding output attribute that is also included in the map for patching aggregate functions in the final aggregation. The `Expand` projections for all other distinct aggregate groups will have `null` in the slot associated with that output attribute. If the same foldable expression is used in a distinct aggregation associated with a different group, `RewriteDistinctAggregates` will improperly patch the associated aggregate function to use the previous aggregation's output attribute. Since the output attribute is associated with a different group, the value of that slot in the `Expand` projection will always be `null`. In the example above, `count(distinct 100) as cnt1` is the aggregation with only foldable children, and `count(distinct b, 100) as cnt2` is the aggregation that gets inappropriately patched with the wrong group's output attribute. As a result `count(distinct b, 100) as cnt2` (from the first example above) essentially becomes `count(distinct b, null) as cnt2`, which is always zero. `RewriteDistinctAggregates` doesn't typically patch foldable children of the distinct aggregation functions in the final aggregation. It potentially patches foldable expressions only when there is a distinct aggregation with only foldable children, and even then it doesn't patch the aggregation that has only foldable children, but instead some other unlucky aggregate function that happened to use the same foldable expression. This PR skips patching any foldable expressions in the aggregate functions to avoid patching an aggregation with a different group's output attribute. No. New unit test. Closes #38565 from bersprockets/distinct_literal_issue. Authored-by: Bruce Robbins <bersprockets@gmail.com> Signed-off-by: Hyukjin Kwon <gurwls223@apache.org> (cherry picked from commit 0add57a1c0290a158666027afb3e035728d2dcee) Signed-off-by: Hyukjin Kwon <gurwls223@apache.org> 09 November 2022, 01:39:36 UTC
b01dd4c [SPARK-41031][BUILD] Upgrade `xz` to 1.9 for `avro` 1.11.1 This PR upgrades `xz` to 1.9 for `avro` 1.11.1. Spark depends on `avro` 1.11.1, and `avro` 1.11.1 uses `xz` as an optional dependency, so we need to manually check the `xz` version when upgrading `avro`. https://github.com/apache/avro/blob/3a9e5a789b5165e0c8c4da799c387fdf84bfb75e/lang/java/pom.xml#L59 https://github.com/apache/avro/blob/3a9e5a789b5165e0c8c4da799c387fdf84bfb75e/lang/java/avro/pom.xml#L238-L242 The release notes are as follows: - https://git.tukaani.org/?p=xz-java.git;a=blob;f=NEWS;hb=HEAD No. Passes GitHub Actions. Closes #38538 from LuciferYang/SPARK-41031. Authored-by: yangjie01 <yangjie01@baidu.com> Signed-off-by: Sean Owen <srowen@gmail.com> (cherry picked from commit e9503c84c4d8d4b51844a195523ebf064bdf185e) Signed-off-by: Sean Owen <srowen@gmail.com> 08 November 2022, 01:37:46 UTC
797e2d9 [SPARK-32380][SQL] Fixing access of HBase table via Hive from Spark This is an update of https://github.com/apache/spark/pull/29178 which was closed because the root cause of the error was just vaguely defined there but here I will give an explanation why `HiveHBaseTableInputFormat` does not work well with the `NewHadoopRDD` (see in the next section). The PR modify `TableReader.scala` to create `OldHadoopRDD` when input format is 'org.apache.hadoop.hive.hbase.HiveHBaseTableInputFormat'. - environments (Cloudera distribution 7.1.7.SP1): hadoop 3.1.1 hive 3.1.300 spark 3.2.1 hbase 2.2.3 With the `NewHadoopRDD` the following exception is raised: ``` java.io.IOException: Cannot create a record reader because of a previous error. Please look at the previous logs lines from the task's full log for more details. at org.apache.hadoop.hbase.mapreduce.TableInputFormatBase.getSplits(TableInputFormatBase.java:253) at org.apache.spark.rdd.NewHadoopRDD.getPartitions(NewHadoopRDD.scala:131) at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:300) at scala.Option.getOrElse(Option.scala:189) at org.apache.spark.rdd.RDD.partitions(RDD.scala:296) at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49) at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:300) at scala.Option.getOrElse(Option.scala:189) at org.apache.spark.rdd.RDD.partitions(RDD.scala:296) at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49) at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:300) at scala.Option.getOrElse(Option.scala:189) at org.apache.spark.rdd.RDD.partitions(RDD.scala:296) at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49) at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:300) at scala.Option.getOrElse(Option.scala:189) at org.apache.spark.rdd.RDD.partitions(RDD.scala:296) at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:49) at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:300) at scala.Option.getOrElse(Option.scala:189) at org.apache.spark.rdd.RDD.partitions(RDD.scala:296) at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:446) at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:429) at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:48) at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:3715) at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:2728) at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3706) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103) at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163) at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90) at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775) at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64) at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3704) at org.apache.spark.sql.Dataset.head(Dataset.scala:2728) at org.apache.spark.sql.Dataset.take(Dataset.scala:2935) at org.apache.spark.sql.Dataset.getRows(Dataset.scala:287) at org.apache.spark.sql.Dataset.showString(Dataset.scala:326) at org.apache.spark.sql.Dataset.show(Dataset.scala:806) at org.apache.spark.sql.Dataset.show(Dataset.scala:765) at org.apache.spark.sql.Dataset.show(Dataset.scala:774) ... 
47 elided Caused by: java.lang.IllegalStateException: The input format instance has not been properly initialized. Ensure you call initializeTable either in your constructor or initialize method at org.apache.hadoop.hbase.mapreduce.TableInputFormatBase.getTable(TableInputFormatBase.java:557) at org.apache.hadoop.hbase.mapreduce.TableInputFormatBase.getSplits(TableInputFormatBase.java:248) ... 86 more ``` There are two interfaces: - the new `org.apache.hadoop.mapreduce.InputFormat`: providing a one arg method `getSplits(JobContext context)` (returning `List<InputSplit>`) - the old `org.apache.hadoop.mapred.InputFormat`: providing a two arg method `getSplits(JobConf job, int numSplits)` (returning `InputSplit[]`) And in Hive both are implemented by `HiveHBaseTableInputFormat` but only the old method leads to required initialisation and this why `NewHadoopRDD` fails here. Here all the link refers latest commits of the master branches for the mentioned components at the time of writing this description (to get the right line numbers in the future too as `master` itself is a moving target). Spark in `NewHadoopRDD` uses the new interface providing the one arg method: https://github.com/apache/spark/blob/5556cfc59aa97a3ad4ea0baacebe19859ec0bcb7/core/src/main/scala/org/apache/spark/rdd/NewHadoopRDD.scala#L136 Hive on the other hand binds the initialisation to the two args method coming from the old interface. See [Hive#getSplits](https://github.com/apache/hive/blob/fd029c5b246340058aee513980b8bf660aee0227/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java#L268): ``` Override public InputSplit[] getSplits(final JobConf jobConf, final int numSplits) throws IOException { ``` This calls `getSplitsInternal` which contains the [initialisation](https://github.com/apache/hive/blob/fd029c5b246340058aee513980b8bf660aee0227/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java#L299) too: ``` initializeTable(conn, tableName); ``` Interesting that Hive also uses the one arg method internally within the `getSplitsInternal` [here](https://github.com/apache/hive/blob/fd029c5b246340058aee513980b8bf660aee0227/hbase-handler/src/java/org/apache/hadoop/hive/hbase/HiveHBaseTableInputFormat.java#L356) but the initialisation done earlier. By calling the new interface method (what `NewHadoopRDD` does) the call goes straight to the HBase method: [org.apache.hadoop.hbase.mapreduce.TableInputFormatBase#getSplits](https://github.com/apache/hbase/blob/63cdd026f08cdde6ac0fde1342ffd050e8e02441/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java#L230). Where there would be some `JobContext` based [initialisation](https://github.com/apache/hbase/blob/63cdd026f08cdde6ac0fde1342ffd050e8e02441/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java#L234-L237) which by default is an [empty method](https://github.com/apache/hbase/blob/63cdd026f08cdde6ac0fde1342ffd050e8e02441/hbase-mapreduce/src/main/java/org/apache/hadoop/hbase/mapreduce/TableInputFormatBase.java#L628-L640): ```java /** * Handle subclass specific set up. Each of the entry points used by the MapReduce framework, * {link #createRecordReader(InputSplit, TaskAttemptContext)} and {link #getSplits(JobContext)}, * will call {link #initialize(JobContext)} as a convenient centralized location to handle * retrieving the necessary configuration information and calling * {link #initializeTable(Connection, TableName)}. 
Subclasses should implement their initialize * call such that it is safe to call multiple times. The current TableInputFormatBase * implementation relies on a non-null table reference to decide if an initialize call is needed, * but this behavior may change in the future. In particular, it is critical that initializeTable * not be called multiple times since this will leak Connection instances. */ protected void initialize(JobContext context) throws IOException { } ``` This is not overridden by Hive and hard to reason why we need that (its an internal Hive class) so it is easier to fix this in Spark. No. 1) create hbase table ``` hbase(main):001:0>create 'hbase_test1', 'cf1' hbase(main):001:0> put 'hbase_test', 'r1', 'cf1:c1', '123' ``` 2) create hive table related to hbase table hive> ``` CREATE EXTERNAL TABLE `hivetest.hbase_test`( `key` string COMMENT '', `value` string COMMENT '') ROW FORMAT SERDE 'org.apache.hadoop.hive.hbase.HBaseSerDe' STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler' WITH SERDEPROPERTIES ( 'hbase.columns.mapping'=':key,cf1:v1', 'serialization.format'='1') TBLPROPERTIES ( 'hbase.table.name'='hbase_test') ```   3): spark-shell query hive table while data in HBase ``` scala> spark.sql("select * from hivetest.hbase_test").show() 22/11/05 01:14:16 WARN conf.HiveConf: HiveConf of name hive.masking.algo does not exist 22/11/05 01:14:16 WARN client.HiveClientImpl: Detected HiveConf hive.execution.engine is 'tez' and will be reset to 'mr' to disable useless hive logic Hive Session ID = f05b6866-86df-4d88-9eea-f1c45043bb5f +---+-----+ |key|value| +---+-----+ | r1| 123| +---+-----+ ``` Closes #38516 from attilapiros/SPARK-32380. Authored-by: attilapiros <piros.attila.zsolt@gmail.com> Signed-off-by: Hyukjin Kwon <gurwls223@apache.org> (cherry picked from commit 7009ef0510dae444c72e7513357e681b08379603) Signed-off-by: Hyukjin Kwon <gurwls223@apache.org> 05 November 2022, 12:30:20 UTC
ff23498 [SPARK-40869][K8S] Resource name prefix should not start with a hyphen ### What changes were proposed in this pull request? Strip leading `-` from the resource name prefix. ### Why are the changes needed? A leading `-` is not allowed in a resource name prefix (notably spark.kubernetes.executor.podNamePrefix). ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Unit test Closes #38331 from tobiasstadler/fix-SPARK-40869. Lead-authored-by: Tobias Stadler <ts.stadler@gmx.de> Co-authored-by: Dongjoon Hyun <dongjoon@apache.org> Signed-off-by: Dongjoon Hyun <dongjoon@apache.org> (cherry picked from commit 7f3b5987de1f79434a861408e6c8bf55c5598031) Signed-off-by: Dongjoon Hyun <dongjoon@apache.org> 03 November 2022, 17:34:13 UTC
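An illustration only (not the actual Spark implementation) of the sanitization rule described above: a Kubernetes resource name fragment must not begin with `-`, so a derived prefix such as "-spark-pi-exec" needs its leading hyphens stripped.

```scala
// Sketch of the rule, not Spark's code: drop leading '-' characters from a derived prefix.
def stripLeadingHyphens(prefix: String): String = prefix.dropWhile(_ == '-')

assert(stripLeadingHyphens("-spark-pi-exec") == "spark-pi-exec")
assert(stripLeadingHyphens("spark-pi-exec") == "spark-pi-exec")
```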
067c427 [MINOR][BUILD] Correct the `files` content in `checkstyle-suppressions.xml` ### What changes were proposed in this pull request? This PR changes the suppressed files path from `sql/core/src/main/java/org/apache/spark/sql/api.java/*` to `sql/core/src/main/java/org/apache/spark/sql/api/java/*`; the former appears to be a wrong code path. ### Why are the changes needed? Correct the `files` content in `checkstyle-suppressions.xml` ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Pass GitHub Actions Closes #38469 from LuciferYang/fix-java-supperessions. Authored-by: yangjie01 <yangjie01@baidu.com> Signed-off-by: Sean Owen <srowen@gmail.com> (cherry picked from commit 5457193dc095bc6c97259e31fa3df44184822f65) Signed-off-by: Sean Owen <srowen@gmail.com> 01 November 2022, 23:10:48 UTC
33321cf [SPARK-40983][DOC] Remove Hadoop requirements for zstd mentioned in Parquet compression codec ### What changes were proposed in this pull request? Change the doc to remove Hadoop requirements for zstd mentioned in Parquet compression codec. ### Why are the changes needed? This requirement is removed after https://issues.apache.org/jira/browse/PARQUET-1866, and Spark uses Parquet 1.12.3 now. ### Does this PR introduce _any_ user-facing change? Yes, doc updated. ### How was this patch tested? <img width="1144" alt="image" src="https://user-images.githubusercontent.com/26535726/199180625-4e3a2ee1-3e4d-4d61-8842-f1d5b7b9321d.png"> Closes #38458 from pan3793/SPARK-40983. Authored-by: Cheng Pan <chengpan@apache.org> Signed-off-by: Yuming Wang <yumwang@ebay.com> (cherry picked from commit 9c1bb41ca34229c87b463b4941b4e9c829a0e396) Signed-off-by: Yuming Wang <yumwang@ebay.com> 01 November 2022, 12:32:54 UTC
fb2bdea [SPARK-40918][SQL][3.3] Mismatch between FileSourceScanExec and Orc and ParquetFileFormat on producing columnar output ### What changes were proposed in this pull request? We move the decision about supporting columnar output based on WSCG one level up, from ParquetFileFormat / OrcFileFormat to FileSourceScanExec, and pass it as a new required option for ParquetFileFormat / OrcFileFormat. Now the semantics are as follows: * `ParquetFileFormat.supportsBatch` and `OrcFileFormat.supportsBatch` return whether they **can**, not necessarily **will**, return columnar output. * To return columnar output, an option `FileFormat.OPTION_RETURNING_BATCH` needs to be passed to `buildReaderWithPartitionValues` in these two file formats. It should only be set to `true` if `supportsBatch` is also `true`, but it can be set to `false` if we don't want columnar output nevertheless - this way, `FileSourceScanExec` can set it to false when there are more than 100 columns for WSCG, and `ParquetFileFormat` / `OrcFileFormat` don't have to concern themselves with WSCG limits. * To avoid not passing it by accident, this option is made required. Making it required requires updating a few places that use it, but the error resulting from not passing it is very obscure. It's better to fail early and explicitly here. ### Why are the changes needed? The following explains it for `ParquetFileFormat`; `OrcFileFormat` had exactly the same issue. `java.lang.ClassCastException: org.apache.spark.sql.vectorized.ColumnarBatch cannot be cast to org.apache.spark.sql.catalyst.InternalRow` was being thrown because the ParquetReader was outputting columnar batches, while FileSourceScanExec expected row output. The mismatch comes from the fact that `ParquetFileFormat.supportBatch` depends on `WholeStageCodegenExec.isTooManyFields(conf, schema)`, where the threshold is 100 fields. When this is used in `FileSourceScanExec`: ``` override lazy val supportsColumnar: Boolean = { relation.fileFormat.supportBatch(relation.sparkSession, schema) } ``` the `schema` comes from output attributes, which include extra metadata attributes. However, inside `ParquetFileFormat.buildReaderWithPartitionValues` it was calculated again as ``` relation.fileFormat.buildReaderWithPartitionValues( sparkSession = relation.sparkSession, dataSchema = relation.dataSchema, partitionSchema = relation.partitionSchema, requiredSchema = requiredSchema, filters = pushedDownFilters, options = options, hadoopConf = hadoopConf ... val resultSchema = StructType(requiredSchema.fields ++ partitionSchema.fields) ... val returningBatch = supportBatch(sparkSession, resultSchema) ``` where `requiredSchema` and `partitionSchema` wouldn't include the metadata columns: ``` FileSourceScanExec: output: List(c1#4608L, c2#4609L, ..., c100#4707L, file_path#6388) FileSourceScanExec: dataSchema: StructType(StructField(c1,LongType,true),StructField(c2,LongType,true),...,StructField(c100,LongType,true)) FileSourceScanExec: partitionSchema: StructType() FileSourceScanExec: requiredSchema: StructType(StructField(c1,LongType,true),StructField(c2,LongType,true),...,StructField(c100,LongType,true)) ``` Columns like `file_path#6388` are added by the scan and contain metadata added by the scan, not by the file reader, which concerns itself only with what is within the file. ### Does this PR introduce _any_ user-facing change? Not a public API change, but it is now required to pass `FileFormat.OPTION_RETURNING_BATCH` in `options` to `ParquetFileFormat.buildReaderWithPartitionValues`.
The only user of this API in Apache Spark is `FileSourceScanExec`. ### How was this patch tested? Tests added. Backports #38397 from juliuszsompolski/SPARK-40918. Authored-by: Juliusz Sompolski <julek@databricks.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com> Closes #38431 from juliuszsompolski/SPARK-40918-3.3. Authored-by: Juliusz Sompolski <julek@databricks.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com> 31 October 2022, 06:00:09 UTC
c7ef560 [SPARK-40963][SQL] Set nullable correctly in project created by `ExtractGenerator` ### What changes were proposed in this pull request? When creating the project list for the new projection In `ExtractGenerator`, take into account whether the generator is outer when setting nullable on generator-related output attributes. ### Why are the changes needed? This PR fixes an issue that can produce either incorrect results or a `NullPointerException`. It's a bit of an obscure issue in that I am hard-pressed to reproduce without using a subquery that has a inline table. Example: ``` select c1, explode(c4) as c5 from ( select c1, array(c3) as c4 from ( select c1, explode_outer(c2) as c3 from values (1, array(1, 2)), (2, array(2, 3)), (3, null) as data(c1, c2) ) ); +---+---+ |c1 |c5 | +---+---+ |1 |1 | |1 |2 | |2 |2 | |2 |3 | |3 |0 | +---+---+ ``` In the last row, `c5` is 0, but should be `NULL`. Another example: ``` select c1, exists(c4, x -> x is null) as c5 from ( select c1, array(c3) as c4 from ( select c1, explode_outer(c2) as c3 from values (1, array(1, 2)), (2, array(2, 3)), (3, array()) as data(c1, c2) ) ); +---+-----+ |c1 |c5 | +---+-----+ |1 |false| |1 |false| |2 |false| |2 |false| |3 |false| +---+-----+ ``` In the last row, `false` should be `true`. In both cases, at the time `CreateArray(c3)` is instantiated, `c3`'s nullability is incorrect because the new projection created by `ExtractGenerator` uses `generatorOutput` from `explode_outer(c2)` as a projection list. `generatorOutput` doesn't take into account that `explode_outer(c2)` is an _outer_ explode, so the nullability setting is lost. `UpdateAttributeNullability` will eventually fix the nullable setting for attributes referring to `c3`, but it doesn't fix the `containsNull` setting for `c4` in `explode(c4)` (from the first example) or `exists(c4, x -> x is null)` (from the second example). This example fails with a `NullPointerException`: ``` select c1, inline_outer(c4) from ( select c1, array(c3) as c4 from ( select c1, explode_outer(c2) as c3 from values (1, array(named_struct('a', 1, 'b', 2))), (2, array(named_struct('a', 3, 'b', 4), named_struct('a', 5, 'b', 6))), (3, array()) as data(c1, c2) ) ); 22/10/30 17:34:42 ERROR Executor: Exception in task 1.0 in stage 8.0 (TID 14) java.lang.NullPointerException at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.generate_doConsume_1$(Unknown Source) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.generate_doConsume_0$(Unknown Source) at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source) at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43) at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760) at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:364) ``` ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? New unit test. Closes #38440 from bersprockets/SPARK-40963. Authored-by: Bruce Robbins <bersprockets@gmail.com> Signed-off-by: Hyukjin Kwon <gurwls223@apache.org> (cherry picked from commit 90d31541fb0313d762cc36067060e6445c04a9b6) Signed-off-by: Hyukjin Kwon <gurwls223@apache.org> 31 October 2022, 01:45:27 UTC
0f234d9 [SPARK-40932][CORE] Fix issue messages for allGather are overridden ### What changes were proposed in this pull request? The messages returned by allGather may be overridden by subsequent barrier API calls, e.g., ``` scala val messages: Array[String] = context.allGather("ABC") context.barrier() ``` Here `messages` may end up as Array("", ""), but we expect Array("ABC", "ABC"). The root cause of this issue is that the [messages returned by allGather](https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/BarrierTaskContext.scala#L102) point to the [original message](https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/BarrierCoordinator.scala#L107) in local mode. So when subsequent barrier APIs change the messages, the allGather messages change accordingly, and users can't get the correct result. This PR fixes the issue by sending back cloned messages. ### Why are the changes needed? The bug mentioned in this description may block some external Spark ML libraries which heavily depend on the Spark barrier API to do synchronization. If the barrier mechanism can't guarantee the correctness of the barrier APIs, it will be a disaster for external Spark ML libraries. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Added a unit test; it passes with this PR. Closes #38410 from wbo4958/allgather-issue. Authored-by: Bobby Wang <wbo4958@gmail.com> Signed-off-by: Wenchen Fan <wenchen@databricks.com> (cherry picked from commit 0b892a543f9ea913f961eea95a4e45f1231b9a57) Signed-off-by: Wenchen Fan <wenchen@databricks.com> 28 October 2022, 13:07:08 UTC
d1440bf [SPARK-40924][SQL][3.3] Fix for Unhex when input has odd number of symbols ### What changes were proposed in this pull request? Fix for a bug in Unhex function when there is an odd number of symbols in the input string. This is backport of #38402 ### Why are the changes needed? Unhex function and other functions depending on it (e.g. ToBinary) produce incorrect output. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Unit tests Closes #38416 from vitaliili-db/backport33. Authored-by: Vitalii Li <vitalii.li@databricks.com> Signed-off-by: Max Gekk <max.gekk@gmail.com> 28 October 2022, 05:01:05 UTC
287842e [SPARK-38697][SQL][3.3] Extend SparkSessionExtensions to inject rules into AQE Optimizer ### What changes were proposed in this pull request? Backport SPARK-38697 to Spark 3.3.x ### Why are the changes needed? Allows users to inject logical plan optimizer rules into AQE ### Does this PR introduce _any_ user-facing change? Yes, new API method to inject logical plan optimizer rules into AQE ### How was this patch tested? Backport includes a unit test Closes #38403 from andygrove/backport-SPARK-38697-spark33. Authored-by: Andy Grove <andygrove73@gmail.com> Signed-off-by: Dongjoon Hyun <dongjoon@apache.org> 26 October 2022, 20:31:41 UTC
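A minimal sketch of how an injected AQE optimizer rule might look, assuming the injection point added by SPARK-38697 is named `injectRuntimeOptimizerRule` (check `SparkSessionExtensions` in your Spark 3.3.x build); `MyNoopAqeRule` and `com.example.MyExtensions` are hypothetical names.

```scala
import org.apache.spark.sql.{SparkSession, SparkSessionExtensions}
import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
import org.apache.spark.sql.catalyst.rules.Rule

// Hypothetical no-op rule, used only to show the shape of an injected AQE optimizer rule.
case class MyNoopAqeRule(session: SparkSession) extends Rule[LogicalPlan] {
  // Invoked on the logical plan during adaptive (AQE) re-optimization.
  override def apply(plan: LogicalPlan): LogicalPlan = plan
}

class MyExtensions extends (SparkSessionExtensions => Unit) {
  override def apply(extensions: SparkSessionExtensions): Unit = {
    extensions.injectRuntimeOptimizerRule(session => MyNoopAqeRule(session))
  }
}

// Enable with: --conf spark.sql.extensions=com.example.MyExtensions (hypothetical class name)
```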
ffc7c1d [SPARK-40913][INFRA] Pin `pytest==7.1.3` Pin pytest==7.1.3. `pytest-mypy-plugins==1.9.3` depends on `pytest [required: >=6.0.0]`, and [pytest 7.2.0](https://pypi.org/project/pytest/) was just released; I guess it breaks the Python linter ``` Traceback (most recent call last): File "/usr/local/bin/pytest", line 8, in <module> sys.exit(console_main()) File "/usr/local/lib/python3.9/dist-packages/_pytest/config/__init__.py", line 190, in console_main code = main() File "/usr/local/lib/python3.9/dist-packages/_pytest/config/__init__.py", line 148, in main config = _prepareconfig(args, plugins) ``` No. CI; cannot reproduce locally. Closes #38390 from zhengruifeng/infra_pin_pytest. Authored-by: Ruifeng Zheng <ruifengz@apache.org> Signed-off-by: Hyukjin Kwon <gurwls223@apache.org> (cherry picked from commit 25a9dfcf9c40b04dcb27a1d1665a84761e29a548) Signed-off-by: Hyukjin Kwon <gurwls223@apache.org> 25 October 2022, 15:35:35 UTC
5001b9a [SPARK-40902][MESOS][TESTS] Fix issue with mesos tests failing due to quick submission of drivers ### What changes were proposed in this pull request? ##### Quick submission of drivers in tests to the mesos scheduler results in dropping drivers Queued drivers in `MesosClusterScheduler` are ordered based on `MesosDriverDescription` - and the ordering used checks priority (if different), followed by a comparison of submission time. For two driver submissions with the same priority, if made in quick succession (such that the submission time is the same due to the millisecond granularity of Date), this results in dropping the second `MesosDriverDescription` from `queuedDrivers` (since `driverOrdering` returns `0` when comparing the descriptions). This PR fixes the more immediate issue with tests. ### Why are the changes needed? Flaky tests, [see here](https://lists.apache.org/thread/jof098qxp0s6qqmt9qwv52f9665b1pjg) for an example. ### Does this PR introduce _any_ user-facing change? No. Fixing only tests for now - as mesos support is deprecated, not changing the scheduler itself to address this. ### How was this patch tested? Fixes unit tests Closes #38378 from mridulm/fix_MesosClusterSchedulerSuite. Authored-by: Mridul <mridulatgmail.com> Signed-off-by: Dongjoon Hyun <dongjoon@apache.org> (cherry picked from commit 60b1056307b3ee9d880a936f3a97c5fb16a2b698) Signed-off-by: Dongjoon Hyun <dongjoon@apache.org> 24 October 2022, 17:51:55 UTC
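An illustration of the underlying pitfall described above, not the Spark scheduler code itself: in an ordered collection, two elements that compare as equal (same priority, same millisecond submission time) are treated as duplicates, so the second one is silently dropped.

```scala
import scala.collection.mutable

// Hypothetical stand-in for MesosDriverDescription, used only for this illustration.
case class DriverDesc(priority: Int, submittedAtMillis: Long, id: String)

val byPriorityThenTime: Ordering[DriverDesc] =
  Ordering.by((d: DriverDesc) => (d.priority, d.submittedAtMillis))

val queuedDrivers = mutable.TreeSet.empty[DriverDesc](byPriorityThenTime)
queuedDrivers += DriverDesc(0, 1000L, "driver-1")
queuedDrivers += DriverDesc(0, 1000L, "driver-2") // compares equal to driver-1, so it is dropped

assert(queuedDrivers.size == 1)
```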
a3a38ab [SPARK-40851][INFRA ][SQL][TESTS][3.3] Make GA run successfully with the latest Java 8/11/17 ### What changes were proposed in this pull request? The main change of this pr as follows: - Replace `Antarctica/Vostok` to `Asia/Urumqi` in Spark code - Replace `Europe/Amsterdam` to `Europe/Brussels` in Spark code - Regenerate `gregorian-julian-rebase-micros.json` using generate 'gregorian-julian-rebase-micros.json' in `RebaseDateTimeSuite` with Java 8u352 - Regenerate `julian-gregorian-rebase-micros.json` using generate 'julian-gregorian-rebase-micros.json' in RebaseDateTimeSuite with Java 8u352 ### Why are the changes needed? Make GA run successfully with the latest Java 8/11/17: - Java 8u352 - Java 11.0.17 - Java 17.0.5 ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? - Pass GitHub Actions - Manual test: the following commands can test pass with Java 8u345, 8u352, 11.0.16, 11.0.17, 17.0.4 and 17.0.5 - `build/sbt "catalyst/test"` - `build/sbt "sql/testOnly *SQLQueryTestSuite -- -t \"timestamp-ltz.sql\""` - `build/sbt "sql/testOnly *SQLQueryTestSuite -- -t \"timestamp-ntz.sql\""` Closes #38363 from LuciferYang/SPARK-40851-33. Authored-by: yangjie01 <yangjie01@baidu.com> Signed-off-by: Hyukjin Kwon <gurwls223@apache.org> 24 October 2022, 07:31:12 UTC
476ce56 [SPARK-40874][PYTHON] Fix broadcasts in Python UDFs when encryption enabled This PR fixes a bug in broadcast handling in `PythonRunner` when encryption is enabled. Due to this bug the following pyspark script: ``` bin/pyspark --conf spark.io.encryption.enabled=true ... bar = {"a": "aa", "b": "bb"} foo = spark.sparkContext.broadcast(bar) spark.udf.register("MYUDF", lambda x: foo.value[x] if x else "") spark.sql("SELECT MYUDF('a') AS a, MYUDF('b') AS b").collect() ``` fails with: ``` 22/10/21 17:14:32 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)/ 1] org.apache.spark.api.python.PythonException: Traceback (most recent call last): File "/Users/petertoth/git/apache/spark/python/lib/pyspark.zip/pyspark/worker.py", line 811, in main func, profiler, deserializer, serializer = read_command(pickleSer, infile) File "/Users/petertoth/git/apache/spark/python/lib/pyspark.zip/pyspark/worker.py", line 87, in read_command command = serializer._read_with_length(file) File "/Users/petertoth/git/apache/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 173, in _read_with_length return self.loads(obj) File "/Users/petertoth/git/apache/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 471, in loads return cloudpickle.loads(obj, encoding=encoding) EOFError: Ran out of input ``` The reason for this failure is that we have multiple Python UDFs referencing the same broadcast, and in the current code: https://github.com/apache/spark/blob/748fa2792e488a6b923b32e2898d9bb6e16fb4ca/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala#L385-L420 the number of broadcasts (`cnt`) is correct (1) but the broadcast id is serialized 2 times from the JVM to Python, ruining the next item that Python expects from the JVM side. Please note that the example above works in Spark 3.3 without this fix. That is because https://github.com/apache/spark/pull/36121 in Spark 3.4 modified `ExpressionSet` and so `udfs` in `ExtractPythonUDFs`: https://github.com/apache/spark/blob/748fa2792e488a6b923b32e2898d9bb6e16fb4ca/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala#L239-L242 changed from `Stream` to `Vector`. When `broadcastVars` (and so `idsAndFiles`) is a `Stream` the example accidentally works, as the broadcast id is written to `dataOut` once (`oldBids.add(id)` in `idsAndFiles.foreach` is called before the 2nd item is calculated in `broadcastVars.flatMap`). But that doesn't mean that https://github.com/apache/spark/pull/36121 introduced the regression, as `EncryptedPythonBroadcastServer` shouldn't serve the broadcast data 2 times (which `EncryptedPythonBroadcastServer` does now, but it is not noticed), as it could fail other cases where more than 1 broadcast is used in UDFs. To fix a bug. No. Added new UT. Closes #38334 from peter-toth/SPARK-40874-fix-broadcasts-in-python-udf. Authored-by: Peter Toth <peter.toth@gmail.com> Signed-off-by: Hyukjin Kwon <gurwls223@apache.org> 24 October 2022, 01:28:36 UTC
e674356 [SPARK-40886][BUILD] Bump Jackson Databind 2.13.4.2 ### What changes were proposed in this pull request? Bump Jackson Databind from 2.13.4.1 to 2.13.4.2 ### Why are the changes needed? There is a regression related to Gradle in 2.13.4.1 that was fixed in 2.13.4.2: https://github.com/FasterXML/jackson-databind/issues/3627 ### Does this PR introduce _any_ user-facing change? ### How was this patch tested? Existing UT. Closes #38355 from pan3793/SPARK-40886. Authored-by: Cheng Pan <chengpan@apache.org> Signed-off-by: Sean Owen <srowen@gmail.com> (cherry picked from commit e73b157f5c4d20c49ec0e3a7bd82a72d3271f766) Signed-off-by: Sean Owen <srowen@gmail.com> 23 October 2022, 16:37:50 UTC
64c9af2 [SPARK-39404][SS][3.3] Minor fix for querying _metadata in streaming ### What changes were proposed in this pull request? (This cherry-picks https://github.com/apache/spark/pull/36801) We added the support to query the `_metadata` column with a file-based streaming source: https://github.com/apache/spark/pull/35676. We propose to use `transformUp` instead of `match` when pattern matching the `dataPlan` in `MicroBatchExecution` `runBatch` method in this PR. It is fine for `FileStreamSource` because `FileStreamSource` always returns one `LogicalRelation` node (https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/FileStreamSource.scala#L247). But the proposed change will make the logic robust and we really should not rely on the upstream source to return a desired plan. In addition, the proposed change could also make `_metadata` work if someone wants to customize `FileStreamSource` `getBatch`. ### Why are the changes needed? Robust ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Existing tests Closes #38337 from Yaohua628/spark-39404-3-3. Authored-by: yaohua <yaohua.zhao@databricks.com> Signed-off-by: Jungtaek Lim <kabhwan.opensource@gmail.com> 22 October 2022, 10:26:54 UTC
0487e81 [SPARK-40829][SQL] STORED AS serde in CREATE TABLE LIKE view does not work ### What changes were proposed in this pull request? After [SPARK-29839](https://issues.apache.org/jira/browse/SPARK-29839), we can create a table with a specified serde based on an existing view, but the serde of the created table is always Parquet. However, if we use the USING syntax ([SPARK-29421](https://issues.apache.org/jira/browse/SPARK-29421)) to create a table with a specified serde based on a view, we get the correct serde. ### Why are the changes needed? We should apply the specified serde to the created table when using the `create table like view stored as` syntax. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Unit Test Closes #38295 from zhangbutao/SPARK-40829. Authored-by: zhangbutao <zhangbutao@cmss.chinamobile.com> Signed-off-by: Dongjoon Hyun <dongjoon@apache.org> (cherry picked from commit 4ad29829bf53fff26172845312b334008bc4cb68) Signed-off-by: Dongjoon Hyun <dongjoon@apache.org> 18 October 2022, 21:54:49 UTC
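A short sketch of the affected syntax (hypothetical table and view names, and assuming a Hive-enabled SparkSession, since STORED AS creates a Hive serde table): with the fix, the created table should report the requested serde instead of defaulting to Parquet.

```scala
// Sketch only: hypothetical names; requires SparkSession.builder().enableHiveSupport().
spark.sql("CREATE VIEW v1 AS SELECT 1 AS id")
spark.sql("CREATE TABLE t1 LIKE v1 STORED AS ORC")
// With this fix, the serde and input/output formats reported here should be ORC, not Parquet.
spark.sql("DESCRIBE FORMATTED t1").show(truncate = false)
```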
3cda829 [SPARK-40801][BUILD] Upgrade `Apache commons-text` to 1.10 ### What changes were proposed in this pull request? Upgrade Apache commons-text from 1.9 to 1.10.0 ### Why are the changes needed? [CVE-2022-42889](https://nvd.nist.gov/vuln/detail/CVE-2022-42889) ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Pass github action Closes #38262 from bjornjorgensen/commons-text-1.10. Authored-by: Bjørn <bjornjorgensen@gmail.com> Signed-off-by: Yuming Wang <yumwang@ebay.com> (cherry picked from commit 99abc94039e3c069d0fc8b8e7025522fea124cbb) Signed-off-by: Yuming Wang <yumwang@ebay.com> 15 October 2022, 09:19:42 UTC
6279861 Preparing development version 3.3.2-SNAPSHOT 15 October 2022, 05:56:11 UTC
fbbcf94 Preparing Spark release v3.3.1-rc4 15 October 2022, 05:56:02 UTC
ca60665 [SPARK-40703][SQL] Introduce shuffle on SinglePartition to improve parallelism ### What changes were proposed in this pull request? This PR fixes a performance regression issue when one side of a join uses `HashPartitioning` with `ShuffleExchange` while the other side uses `SinglePartition`. In this case, Spark will re-shuffle the side with `HashPartitioning` and both sides will end up with only a single partition. This could hurt query performance a lot if the side with `HashPartitioning` contains a lot of input data. ### Why are the changes needed? After SPARK-35703, when Spark sees that one side of the join has `ShuffleExchange` (meaning it needs to be shuffled anyways), and the other side doesn't, it'll try to avoid shuffling the side without `ShuffleExchange`. For instance: ``` ShuffleExchange(HashPartition(200)) <-> HashPartition(150) ``` will be converted into ``` ShuffleExchange(HashPartition(150)) <-> HashPartition(150) ``` However, when the side without `ShuffleExchange` is `SinglePartition`, like the following: ``` ShuffleExchange(HashPartition(150)) <-> SinglePartition ``` Spark will also do the same which causes the left-hand side to only use one partition. This can hurt job parallelism dramatically, especially when using DataSource V2, since `SinglePartition` is used by the V2 scan. On the other hand, it seems DataSource V1 won't be impacted much as it always report `UnknownPartitioning` in this situation. ### Does this PR introduce _any_ user-facing change? No. ### How was this patch tested? Added new unit tests in `EnsureRequirementsSuite`. Closes #38196 from sunchao/SPARK-40703. Authored-by: Chao Sun <sunchao@apple.com> Signed-off-by: Yuming Wang <yumwang@ebay.com> (cherry picked from commit bde6423c947dea0c7529bd3a1f8a0be36b970ff5) Signed-off-by: Yuming Wang <yumwang@ebay.com> 15 October 2022, 02:21:14 UTC
27ca30a [SPARK-40782][BUILD] Upgrade `jackson-databind` to 2.13.4.1 ### What changes were proposed in this pull request? This pr aims upgrade `jackson-databind` to 2.13.4.1. ### Why are the changes needed? This is a bug fix version related to [CVE-2022-42003] - https://github.com/FasterXML/jackson-databind/pull/3621 ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? Pass GitHub Actions Closes #38235 from LuciferYang/SPARK-40782. Authored-by: yangjie01 <yangjie01@baidu.com> Signed-off-by: Sean Owen <srowen@gmail.com> (cherry picked from commit 2a8b2a136d5a705526bb76697596f5ad01ce391d) Signed-off-by: Sean Owen <srowen@gmail.com> 13 October 2022, 15:30:09 UTC
442ae56 [SPARK-8731] Beeline doesn't work with -e option when started in background ### What changes were proposed in this pull request? Append jline option "-Djline.terminal=jline.UnsupportedTerminal" to enable the Beeline process to run in background. ### Why are the changes needed? Currently, if we execute spark Beeline in background, the Beeline process stops immediately. <img width="1350" alt="image" src="https://user-images.githubusercontent.com/88070094/194742935-8235b1ba-386e-4470-b182-873ef185e19f.png"> ### Does this PR introduce _any_ user-facing change? User will be able to execute Spark Beeline in background. ### How was this patch tested? 1. Start Spark ThriftServer 2. Execute command `./bin/beeline -u "jdbc:hive2://localhost:10000" -e "select 1;" &` 3. Verify Beeline process output in console: <img width="1407" alt="image" src="https://user-images.githubusercontent.com/88070094/194743153-ff3f1d19-ac23-443b-97a6-f024719008cd.png"> ### Note Beeline works fine on Windows when backgrounded: ![image](https://user-images.githubusercontent.com/88070094/194743797-7dc4fc21-dec6-4056-8b13-21fc96f1476e.png) Closes #38172 from zhouyifan279/SPARK-8731. Authored-by: zhouyifan279 <zhouyifan279@gmail.com> Signed-off-by: Kent Yao <yao@apache.org> (cherry picked from commit cb0d6ed46acee7271597764e018558b86aa8c29b) Signed-off-by: Kent Yao <yao@apache.org> 12 October 2022, 03:35:07 UTC
fdc51c7 [SPARK-40705][SQL] Handle case of using mutable array when converting Row to JSON for Scala 2.13 ### What changes were proposed in this pull request? I encountered an issue while using Spark to read JSON files against a schema: every time, it throws an exception related to type conversion. >Note: This issue can be reproduced only with Scala `2.13`; I don't hit it with `2.12` ```` Failed to convert value ArraySeq(1, 2, 3) (class of class scala.collection.mutable.ArraySeq$ofRef}) with the type of ArrayType(StringType,true) to JSON. java.lang.IllegalArgumentException: Failed to convert value ArraySeq(1, 2, 3) (class of class scala.collection.mutable.ArraySeq$ofRef}) with the type of ArrayType(StringType,true) to JSON. ```` If I add ArraySeq to the matching cases, the test that I added passes successfully ![image](https://user-images.githubusercontent.com/28459763/194669557-2f13032f-126f-4c2e-bc6d-1a4cfd0a009d.png) With the current source code, the test fails with the following error ![image](https://user-images.githubusercontent.com/28459763/194669654-19cefb13-180c-48ac-9206-69d8f672f64c.png) ### Why are the changes needed? Users on Scala 2.13 can't convert such an array, which means they need to fall back to 2.12 to keep their project functioning. ### How was this patch tested? I added a sample unit test for the case, but I can add more if you want to. Closes #38154 from Amraneze/fix/spark_40705. Authored-by: Ait Zeouay Amrane <a.zeouayamran@gmail.com> Signed-off-by: Sean Owen <srowen@gmail.com> (cherry picked from commit 9a97f8c62bcd1ad9f34c6318792ae443af46ea85) Signed-off-by: Sean Owen <srowen@gmail.com> 10 October 2022, 15:18:59 UTC
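An illustration of the Scala 2.13 behavior behind the bug above, not the Spark fix itself: an Array-backed value surfaces as `scala.collection.mutable.ArraySeq`, which a match written only against the immutable collection types will miss (the sketch assumes Scala 2.13).

```scala
// Runs on Scala 2.13: Array values wrapped by the collections API become mutable.ArraySeq.
val values = Array("1", "2", "3")
val wrapped: scala.collection.Seq[String] = scala.collection.mutable.ArraySeq.make(values)

wrapped match {
  case s: scala.collection.mutable.ArraySeq[_] => println(s"mutable ArraySeq: $s")
  case other                                   => println(s"some other Seq: $other")
}
```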