https://github.com/apache/spark
Revision 7c3887c1ed2e23bd0010d3e79a847bad18818461 authored by Peter Toth on 22 October 2022, 01:39:32 UTC, committed by Hyukjin Kwon on 24 October 2022, 01:29:51 UTC
This PR fixes a bug in broadcast handling in `PythonRunner` when encryption is enabled. Due to this bug, the following pyspark script:
```
bin/pyspark --conf spark.io.encryption.enabled=true

...

bar = {"a": "aa", "b": "bb"}
foo = spark.sparkContext.broadcast(bar)
spark.udf.register("MYUDF", lambda x: foo.value[x] if x else "")
spark.sql("SELECT MYUDF('a') AS a, MYUDF('b') AS b").collect()
```
fails with:
```
22/10/21 17:14:32 ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0)/ 1]
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/Users/petertoth/git/apache/spark/python/lib/pyspark.zip/pyspark/worker.py", line 811, in main
    func, profiler, deserializer, serializer = read_command(pickleSer, infile)
  File "/Users/petertoth/git/apache/spark/python/lib/pyspark.zip/pyspark/worker.py", line 87, in read_command
    command = serializer._read_with_length(file)
  File "/Users/petertoth/git/apache/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 173, in _read_with_length
    return self.loads(obj)
  File "/Users/petertoth/git/apache/spark/python/lib/pyspark.zip/pyspark/serializers.py", line 471, in loads
    return cloudpickle.loads(obj, encoding=encoding)
EOFError: Ran out of input
```
The reason for this failure is that multiple Python UDFs reference the same broadcast, and in the current code:
https://github.com/apache/spark/blob/748fa2792e488a6b923b32e2898d9bb6e16fb4ca/core/src/main/scala/org/apache/spark/api/python/PythonRunner.scala#L385-L420
the number of broadcasts (`cnt`) is correct (1), but the broadcast id is serialized twice from the JVM to Python, which corrupts the next item that Python expects from the JVM side.
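
The mismatch can be illustrated with a small, self-contained Python model. This is only a sketch under simplifying assumptions: the framing (a 4-byte count followed by 8-byte ids), the `COMMAND` payload, and the function names `write_broadcasts`, `read_broadcasts`, and `round_trip` are hypothetical and are not the actual Spark wire protocol or API. It shows what happens when the writer announces one broadcast but writes the same id twice.

```python
import io
import struct


def write_broadcasts(out, broadcast_ids, dedupe):
    """Write a count of distinct ids, then the ids themselves.

    With dedupe=False the count is still the number of distinct ids,
    but every occurrence is written, which models the bug.
    """
    cnt = len(set(broadcast_ids))
    out.write(struct.pack(">i", cnt))
    ids = sorted(set(broadcast_ids)) if dedupe else broadcast_ids
    for bid in ids:
        out.write(struct.pack(">q", bid))


def read_broadcasts(inp):
    """Read the count, then exactly that many ids."""
    (cnt,) = struct.unpack(">i", inp.read(4))
    return [struct.unpack(">q", inp.read(8))[0] for _ in range(cnt)]


def round_trip(dedupe):
    buf = io.BytesIO()
    # Two UDFs reference the same (hypothetical) broadcast id 7.
    write_broadcasts(buf, [7, 7], dedupe)
    # Stand-in for the next item the reader expects after the broadcasts.
    buf.write(b"COMMAND")
    buf.seek(0)
    ids = read_broadcasts(buf)
    next_item = buf.read(7)
    return ids, next_item


buggy_ids, buggy_next = round_trip(dedupe=False)
fixed_ids, fixed_next = round_trip(dedupe=True)
print(buggy_ids, buggy_next)
print(fixed_ids, fixed_next)
```

In the buggy case the reader consumes one id as announced, so the bytes it reads next belong to the duplicated id rather than to the following item, which is analogous to the `EOFError` raised in `read_command` above.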

Please note that the example above works in Spark 3.3 without this fix. That is because https://github.com/apache/spark/pull/36121 in Spark 3.4 modified `ExpressionSet` and so `udfs` in `ExtractPythonUDFs`:
https://github.com/apache/spark/blob/748fa2792e488a6b923b32e2898d9bb6e16fb4ca/sql/core/src/main/scala/org/apache/spark/sql/execution/python/ExtractPythonUDFs.scala#L239-L242
changed from `Stream` to `Vector`. When `broadcastVars` (and so `idsAndFiles`) is a `Stream`, the example accidentally works because the broadcast id is written to `dataOut` only once (`oldBids.add(id)` in `idsAndFiles.foreach` is called before the 2nd item is calculated in `broadcastVars.flatMap`). But that doesn't mean that https://github.com/apache/spark/pull/36121 introduced the regression: `EncryptedPythonBroadcastServer` shouldn't serve the broadcast data twice (which it currently does, unnoticed), as that could break other cases where more than one broadcast is used in UDFs.
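
The lazy-versus-eager difference described above can be modelled in Python, using a generator as a rough stand-in for a Scala `Stream` and a list for a `Vector`. This is a simplified sketch, not the Scala code in `PythonRunner`; the function name `ids_to_send` and the id value 7 are made up for illustration.

```python
def ids_to_send(broadcast_ids, old_bids, lazy):
    """Return the ids that would be written, in order.

    The filter plays the role of `broadcastVars.flatMap`, and the loop
    plays the role of `idsAndFiles.foreach`, which records each id in
    `old_bids` as it is written.
    """
    # Lazy: each element is filtered only when the loop asks for it.
    pending = (b for b in broadcast_ids if b not in old_bids)
    if not lazy:
        # Eager: the whole filter runs before anything is recorded.
        pending = list(pending)

    sent = []
    for bid in pending:
        sent.append(bid)   # models dataOut.writeLong(id)
        old_bids.add(bid)  # models oldBids.add(id)
    return sent


lazy_result = ids_to_send([7, 7], set(), lazy=True)
eager_result = ids_to_send([7, 7], set(), lazy=False)
print(lazy_result)   # [7]
print(eager_result)  # [7, 7]
```

With lazy evaluation the second occurrence is filtered out because the id was already recorded, so the duplicate is hidden by evaluation order rather than by an explicit deduplication step.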

Why the changes are needed: to fix a bug.

User-facing change: no.

How this was tested: added a new unit test.

Closes #38334 from peter-toth/SPARK-40874-fix-broadcasts-in-python-udf.

Authored-by: Peter Toth <peter.toth@gmail.com>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
(cherry picked from commit 8a96f69bb536729eaa59fae55160f8a6747efbe3)
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
1 parent bc9e434
[SPARK-40874][PYTHON] Fix broadcasts in Python UDFs when encryption enabled