Skip to content

Commit

Permalink
[SPARK-44982][CONNECT] Mark Spark Connect server configurations as st…
Browse files Browse the repository at this point in the history
…atic

### What changes were proposed in this pull request?

This PR proposes to mark all Spark Connect server configurations as static configurations.

### Why are the changes needed?

They are already static configurations, and cannot be set in runtime configuration (by default), see also https://github.com/apache/spark/blob/4a4856207d414ba88a8edabeb70e20765460ef1a/sql/core/src/main/scala/org/apache/spark/sql/RuntimeConfig.scala#L164-L167

### Does this PR introduce _any_ user-facing change?

No, they are already static configurations.

### How was this patch tested?

Existing unittests.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#42695 from HyukjinKwon/SPARK-44982.

Authored-by: Hyukjin Kwon <gurwls223@apache.org>
Signed-off-by: Hyukjin Kwon <gurwls223@apache.org>
  • Loading branch information
HyukjinKwon committed Aug 28, 2023
1 parent 64636af commit 5b69dfd
Showing 1 changed file with 19 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,27 +18,26 @@ package org.apache.spark.sql.connect.config

import java.util.concurrent.TimeUnit

import org.apache.spark.internal.config.ConfigBuilder
import org.apache.spark.network.util.ByteUnit
import org.apache.spark.sql.connect.common.config.ConnectCommon

object Connect {
import org.apache.spark.sql.internal.SQLConf.buildStaticConf

val CONNECT_GRPC_BINDING_ADDRESS =
ConfigBuilder("spark.connect.grpc.binding.address")
buildStaticConf("spark.connect.grpc.binding.address")
.version("4.0.0")
.stringConf
.createOptional

val CONNECT_GRPC_BINDING_PORT =
ConfigBuilder("spark.connect.grpc.binding.port")
buildStaticConf("spark.connect.grpc.binding.port")
.version("3.4.0")
.intConf
.createWithDefault(ConnectCommon.CONNECT_GRPC_BINDING_PORT)

val CONNECT_GRPC_INTERCEPTOR_CLASSES =
ConfigBuilder("spark.connect.grpc.interceptor.classes")
buildStaticConf("spark.connect.grpc.interceptor.classes")
.doc(
"Comma separated list of class names that must " +
"implement the io.grpc.ServerInterceptor interface.")
Expand All @@ -47,7 +46,7 @@ object Connect {
.createOptional

val CONNECT_GRPC_ARROW_MAX_BATCH_SIZE =
ConfigBuilder("spark.connect.grpc.arrow.maxBatchSize")
buildStaticConf("spark.connect.grpc.arrow.maxBatchSize")
.doc(
"When using Apache Arrow, limit the maximum size of one arrow batch, in bytes unless " +
"otherwise specified, that can be sent from server side to client side. Currently, we " +
Expand All @@ -57,15 +56,15 @@ object Connect {
.createWithDefault(4 * 1024 * 1024)

val CONNECT_GRPC_MAX_INBOUND_MESSAGE_SIZE =
ConfigBuilder("spark.connect.grpc.maxInboundMessageSize")
buildStaticConf("spark.connect.grpc.maxInboundMessageSize")
.doc("Sets the maximum inbound message in bytes size for the gRPC requests." +
"Requests with a larger payload will fail.")
.version("3.4.0")
.bytesConf(ByteUnit.BYTE)
.createWithDefault(ConnectCommon.CONNECT_GRPC_MAX_MESSAGE_SIZE)

val CONNECT_GRPC_MARSHALLER_RECURSION_LIMIT =
ConfigBuilder("spark.connect.grpc.marshallerRecursionLimit")
buildStaticConf("spark.connect.grpc.marshallerRecursionLimit")
.internal()
.doc("""
|Sets the recursion limit to grpc protobuf messages.
Expand All @@ -75,31 +74,31 @@ object Connect {
.createWithDefault(1024)

val CONNECT_EXECUTE_MANAGER_DETACHED_TIMEOUT =
ConfigBuilder("spark.connect.execute.manager.detachedTimeout")
buildStaticConf("spark.connect.execute.manager.detachedTimeout")
.internal()
.doc("Timeout after which executions without an attached RPC will be removed.")
.version("3.5.0")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("5m")

val CONNECT_EXECUTE_MANAGER_MAINTENANCE_INTERVAL =
ConfigBuilder("spark.connect.execute.manager.maintenanceInterval")
buildStaticConf("spark.connect.execute.manager.maintenanceInterval")
.internal()
.doc("Interval at which execution manager will search for abandoned executions to remove.")
.version("3.5.0")
.timeConf(TimeUnit.MILLISECONDS)
.createWithDefaultString("30s")

val CONNECT_EXECUTE_MANAGER_ABANDONED_TOMBSTONES_SIZE =
ConfigBuilder("spark.connect.execute.manager.abandonedTombstonesSize")
buildStaticConf("spark.connect.execute.manager.abandonedTombstonesSize")
.internal()
.doc("Maximum size of the cache of abandoned executions.")
.version("3.5.0")
.intConf
.createWithDefaultString("10000")

val CONNECT_EXECUTE_REATTACHABLE_ENABLED =
ConfigBuilder("spark.connect.execute.reattachable.enabled")
buildStaticConf("spark.connect.execute.reattachable.enabled")
.internal()
.doc("Enables reattachable execution on the server. If disabled and a client requests it, " +
"non-reattachable execution will follow and should run until query completion. This will " +
Expand All @@ -110,7 +109,7 @@ object Connect {
.createWithDefault(true)

val CONNECT_EXECUTE_REATTACHABLE_SENDER_MAX_STREAM_DURATION =
ConfigBuilder("spark.connect.execute.reattachable.senderMaxStreamDuration")
buildStaticConf("spark.connect.execute.reattachable.senderMaxStreamDuration")
.internal()
.doc("For reattachable execution, after this amount of time the response stream will be " +
"automatically completed and client needs to send a new ReattachExecute RPC to continue. " +
Expand All @@ -120,7 +119,7 @@ object Connect {
.createWithDefaultString("2m")

val CONNECT_EXECUTE_REATTACHABLE_SENDER_MAX_STREAM_SIZE =
ConfigBuilder("spark.connect.execute.reattachable.senderMaxStreamSize")
buildStaticConf("spark.connect.execute.reattachable.senderMaxStreamSize")
.internal()
.doc(
"For reattachable execution, after total responses size exceeds this value, the " +
Expand All @@ -131,7 +130,7 @@ object Connect {
.createWithDefaultString("1g")

val CONNECT_EXECUTE_REATTACHABLE_OBSERVER_RETRY_BUFFER_SIZE =
ConfigBuilder("spark.connect.execute.reattachable.observerRetryBufferSize")
buildStaticConf("spark.connect.execute.reattachable.observerRetryBufferSize")
.internal()
.doc(
"For reattachable execution, the total size of responses that were already sent to be " +
Expand All @@ -143,7 +142,7 @@ object Connect {
.createWithDefaultString("1m")

val CONNECT_EXTENSIONS_RELATION_CLASSES =
ConfigBuilder("spark.connect.extensions.relation.classes")
buildStaticConf("spark.connect.extensions.relation.classes")
.doc("""
|Comma separated list of classes that implement the trait
|org.apache.spark.sql.connect.plugin.RelationPlugin to support custom
Expand All @@ -155,7 +154,7 @@ object Connect {
.createWithDefault(Nil)

val CONNECT_EXTENSIONS_EXPRESSION_CLASSES =
ConfigBuilder("spark.connect.extensions.expression.classes")
buildStaticConf("spark.connect.extensions.expression.classes")
.doc("""
|Comma separated list of classes that implement the trait
|org.apache.spark.sql.connect.plugin.ExpressionPlugin to support custom
Expand All @@ -167,7 +166,7 @@ object Connect {
.createWithDefault(Nil)

val CONNECT_EXTENSIONS_COMMAND_CLASSES =
ConfigBuilder("spark.connect.extensions.command.classes")
buildStaticConf("spark.connect.extensions.command.classes")
.doc("""
|Comma separated list of classes that implement the trait
|org.apache.spark.sql.connect.plugin.CommandPlugin to support custom
Expand All @@ -179,7 +178,7 @@ object Connect {
.createWithDefault(Nil)

val CONNECT_JVM_STACK_TRACE_MAX_SIZE =
ConfigBuilder("spark.connect.jvmStacktrace.maxSize")
buildStaticConf("spark.connect.jvmStacktrace.maxSize")
.doc("""
|Sets the maximum stack trace size to display when
|`spark.sql.pyspark.jvmStacktrace.enabled` is true.
Expand All @@ -203,13 +202,13 @@ object Connect {
.createWithDefault(false)

val CONNECT_UI_STATEMENT_LIMIT =
ConfigBuilder("spark.sql.connect.ui.retainedStatements")
buildStaticConf("spark.sql.connect.ui.retainedStatements")
.doc("The number of statements kept in the Spark Connect UI history.")
.version("3.5.0")
.intConf
.createWithDefault(200)

val CONNECT_UI_SESSION_LIMIT = ConfigBuilder("spark.sql.connect.ui.retainedSessions")
val CONNECT_UI_SESSION_LIMIT = buildStaticConf("spark.sql.connect.ui.retainedSessions")
.doc("The number of client sessions kept in the Spark Connect UI history.")
.version("3.5.0")
.intConf
Expand Down

0 comments on commit 5b69dfd

Please sign in to comment.