From 64636aff61aa473c8fc81c0bb3311e1fe824dc20 Mon Sep 17 00:00:00 2001 From: Hyukjin Kwon Date: Mon, 28 Aug 2023 17:34:32 +0900 Subject: [PATCH] [SPARK-44981][PYTHON][CONNECT] Filter out static configurations used in local mode ### What changes were proposed in this pull request? This PR is a kind of followup of https://github.com/apache/spark/pull/42548. This PR proposes to filter static configurations out in remote=local mode. ### Why are the changes needed? Otherwise, it shows a bunch of warnings as below: ``` 23/08/28 11:39:42 ERROR ErrorUtils: Spark Connect RPC error during: config. UserId: hyukjin.kwon. SessionId: 424674ef-af95-4b12-b10e-86479413f9fd. org.apache.spark.sql.AnalysisException: Cannot modify the value of a static config: spark.connect.copyFromLocalToFs.allowDestLocal. at org.apache.spark.sql.errors.QueryCompilationErrors$.cannotModifyValueOfStaticConfigError(QueryCompilationErrors.scala:3227) at org.apache.spark.sql.RuntimeConfig.requireNonStaticConf(RuntimeConfig.scala:162) at org.apache.spark.sql.RuntimeConfig.set(RuntimeConfig.scala:42) at org.apache.spark.sql.connect.service.SparkConnectConfigHandler.$anonfun$handleSet$1(SparkConnectConfigHandler.scala:67) at org.apache.spark.sql.connect.service.SparkConnectConfigHandler.$anonfun$handleSet$1$adapted(SparkConnectConfigHandler.scala:65) at scala.collection.Iterator.foreach(Iterator.scala:943) at scala.collection.Iterator.foreach$(Iterator.scala:943) at scala.collection.AbstractIterator.foreach(Iterator.scala:1431) at org.apache.spark.sql.connect.service.SparkConnectConfigHandler.handleSet(SparkConnectConfigHandler.scala:65) at org.apache.spark.sql.connect.service.SparkConnectConfigHandler.handle(SparkConnectConfigHandler.scala:40) at org.apache.spark.sql.connect.service.SparkConnectService.config(SparkConnectService.scala:120) at org.apache.spark.connect.proto.SparkConnectServiceGrpc$MethodHandlers.invoke(SparkConnectServiceGrpc.java:751) at 
org.sparkproject.connect.grpc.io.grpc.stub.ServerCalls$UnaryServerCallHandler$UnaryServerCallListener.onHalfClose(ServerCalls.java:182) at org.sparkproject.connect.grpc.io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:346) at org.sparkproject.connect.grpc.io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:860) at org.sparkproject.connect.grpc.io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37) at org.sparkproject.connect.grpc.io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) ``` In fact, we do support setting static configurations (and all other configurations) when `remote` is specified as `local`. ### Does this PR introduce _any_ user-facing change? No, the main change has not been released yet. ### How was this patch tested? Manually tested. ### Was this patch authored or co-authored using generative AI tooling? No. Closes #42694 from HyukjinKwon/SPARK-44981. Authored-by: Hyukjin Kwon Signed-off-by: Hyukjin Kwon --- python/pyspark/sql/connect/session.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/python/pyspark/sql/connect/session.py b/python/pyspark/sql/connect/session.py index 8e234442c20fd..6c01aad06c08a 100644 --- a/python/pyspark/sql/connect/session.py +++ b/python/pyspark/sql/connect/session.py @@ -884,6 +884,14 @@ def create_conf(**kwargs: Any) -> SparkConf: PySparkSession( SparkContext.getOrCreate(create_conf(loadDefaults=True, _jvm=SparkContext._jvm)) ) + + # Lastly remove all static configurations that are not allowed to set in the regular + # Spark Connect session. 
+ jvm = SparkContext._jvm + utl = jvm.org.apache.spark.sql.api.python.PythonSQLUtils # type: ignore[union-attr] + for conf_set in utl.listStaticSQLConfigs(): + opts.pop(conf_set._1(), None) + finally: if origin_remote is not None: os.environ["SPARK_REMOTE"] = origin_remote