From 5241b32b9792ee033303740dbef861fabdbebfbe Mon Sep 17 00:00:00 2001 From: benjobs Date: Sun, 15 Sep 2024 12:07:24 +0800 Subject: [PATCH] [Improve] support the variable in Flink configuration --- .../streampark/common/utils/CommonUtils.java | 26 ------ .../console/core/entity/FlinkEnv.java | 14 ---- .../console/core/service/FlinkEnvService.java | 4 + .../service/impl/ApplicationServiceImpl.java | 4 +- .../service/impl/FlinkEnvServiceImpl.java | 26 ++++++ .../service/impl/SavepointServiceImpl.java | 10 +-- .../core/task/FlinkK8sWatcherWrapper.java | 2 +- .../flink/client/bean/SubmitRequest.scala | 23 +++++- .../flink/client/impl/YarnSessionClient.scala | 14 ++-- .../flink/client/trait/FlinkClientTrait.scala | 81 ++++++------------- 10 files changed, 91 insertions(+), 113 deletions(-) delete mode 100644 streampark-common/src/main/java/org/apache/streampark/common/utils/CommonUtils.java diff --git a/streampark-common/src/main/java/org/apache/streampark/common/utils/CommonUtils.java b/streampark-common/src/main/java/org/apache/streampark/common/utils/CommonUtils.java deleted file mode 100644 index eb022491cd..0000000000 --- a/streampark-common/src/main/java/org/apache/streampark/common/utils/CommonUtils.java +++ /dev/null @@ -1,26 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.streampark.common.utils; - -public class CommonUtils { - private CommonUtils() {} - - public static String fixedValueBaseVar(String configValue, String jobName) { - return configValue.replace("${jobName}", jobName); - } -} diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkEnv.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkEnv.java index 4662aeb6eb..9ccf907737 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkEnv.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkEnv.java @@ -20,7 +20,6 @@ import org.apache.streampark.common.conf.FlinkVersion; import org.apache.streampark.common.util.DeflaterUtils; import org.apache.streampark.common.util.PropertiesUtils; -import org.apache.streampark.common.utils.CommonUtils; import org.apache.streampark.console.base.exception.ApiAlertException; import org.apache.streampark.console.base.exception.ApiDetailException; @@ -38,7 +37,6 @@ import java.nio.charset.StandardCharsets; import java.util.Date; import java.util.Map; -import java.util.Properties; @Getter @Setter @@ -115,18 +113,6 @@ public Map convertFlinkYamlAsMap() { return PropertiesUtils.loadFlinkConfYaml(flinkYamlString); } - @JsonIgnore - public Properties getFlinkConfig(Application application) { - String flinkYamlString = DeflaterUtils.unzipString(flinkConf); - Properties flinkConfig = new Properties(); - Map config = PropertiesUtils.loadFlinkConfYaml(flinkYamlString); - for (Map.Entry entry : config.entrySet()) { - String value = CommonUtils.fixedValueBaseVar(entry.getValue(), application.getJobName()); - flinkConfig.setProperty(entry.getKey(), value); - } - return flinkConfig; - } - @JsonIgnore public FlinkVersion getFlinkVersion() { if (this.flinkVersion == null) { diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkEnvService.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkEnvService.java index bbcbe8af85..53930320c3 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkEnvService.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkEnvService.java @@ -18,12 +18,14 @@ package org.apache.streampark.console.core.service; import org.apache.streampark.console.base.domain.RestRequest; +import org.apache.streampark.console.core.entity.Application; import org.apache.streampark.console.core.entity.FlinkEnv; import com.baomidou.mybatisplus.core.metadata.IPage; import com.baomidou.mybatisplus.extension.service.IService; import java.io.IOException; +import java.util.Properties; public interface FlinkEnvService extends IService { @@ -97,4 +99,6 @@ public interface FlinkEnvService extends IService { void validity(Long id); IPage findPage(FlinkEnv flinkEnv, RestRequest restRequest); + + Properties getFlinkConfig(FlinkEnv flinkEnv, Application application); } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java index 881706aca1..a15aefaee2 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java @@ -1820,7 +1820,9 @@ private Map getProperties(Application application, FlinkEnv flin if (ExecutionMode.isKubernetesApplicationMode(application.getExecutionMode())) { String archiveDir = - flinkEnv.getFlinkConfig(application).getProperty(JobManagerOptions.ARCHIVE_DIR.key()); + flinkEnvService + .getFlinkConfig(flinkEnv, application) + .getProperty(JobManagerOptions.ARCHIVE_DIR.key()); if (archiveDir != null) { properties.put(JobManagerOptions.ARCHIVE_DIR.key(), archiveDir); } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkEnvServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkEnvServiceImpl.java index 9a71f896ce..1f97dd6a0b 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkEnvServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkEnvServiceImpl.java @@ -17,9 +17,12 @@ package org.apache.streampark.console.core.service.impl; +import org.apache.streampark.common.util.DeflaterUtils; +import org.apache.streampark.common.util.PropertiesUtils; import org.apache.streampark.console.base.domain.RestRequest; import org.apache.streampark.console.base.exception.ApiAlertException; import org.apache.streampark.console.base.mybatis.pager.MybatisPager; +import org.apache.streampark.console.core.entity.Application; import org.apache.streampark.console.core.entity.FlinkEnv; import org.apache.streampark.console.core.entity.Project; import org.apache.streampark.console.core.mapper.FlinkEnvMapper; @@ -27,6 +30,8 @@ import org.apache.streampark.console.core.service.FlinkClusterService; import org.apache.streampark.console.core.service.FlinkEnvService; +import org.apache.commons.lang3.StringUtils; + import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.core.metadata.IPage; import com.baomidou.mybatisplus.extension.plugins.pagination.Page; @@ -40,6 +45,8 @@ import java.io.File; import java.io.IOException; import java.util.Date; +import java.util.Map; +import java.util.Properties; @Slf4j @Service @@ -164,6 +171,25 @@ public IPage findPage(FlinkEnv flinkEnv, RestRequest restRequest) { return this.baseMapper.findPage(page, flinkEnv); } + @Override + public Properties getFlinkConfig(FlinkEnv flinkEnv, Application application) { + String flinkYamlString = DeflaterUtils.unzipString(flinkEnv.getFlinkConf()); + Properties flinkConfig = new Properties(); + Map config = PropertiesUtils.loadFlinkConfYaml(flinkYamlString); + for (Map.Entry entry : config.entrySet()) { + String value = entry.getValue(); + if (StringUtils.isNotBlank(application.getJobName())) { + value = + value.replaceAll("\\$\\{job(Name|name)}|\\$job(Name|name)", application.getJobName()); + } + if (application.getId() != null) { + value = value.replaceAll("\\$\\{job(Id|id)}|\\$job(Id|id)", application.getId().toString()); + } + flinkConfig.setProperty(entry.getKey(), value); + } + return flinkConfig; + } + private void checkOrElseAlert(FlinkEnv flinkEnv) { // 1.check exists diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavepointServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavepointServiceImpl.java index ebefe6197b..cfc1128424 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavepointServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavepointServiceImpl.java @@ -156,7 +156,7 @@ private void expire(Savepoint entity) { if (cpThreshold == 0) { String flinkConfNumRetained = - flinkEnv.getFlinkConfig(application).getProperty(numRetainedKey); + flinkEnvService.getFlinkConfig(flinkEnv, application).getProperty(numRetainedKey); int numRetainedDefaultValue = 1; if (flinkConfNumRetained != null) { try { @@ -293,7 +293,7 @@ public String getSavePointPath(Application appParam) throws Exception { if (StringUtils.isBlank(savepointPath)) { // flink FlinkEnv flinkEnv = flinkEnvService.getById(application.getVersionId()); - Properties flinkConfig = flinkEnv.getFlinkConfig(application); + Properties flinkConfig = flinkEnvService.getFlinkConfig(flinkEnv, application); savepointPath = flinkConfig.getProperty( CheckpointingOptions.SAVEPOINT_DIRECTORY.key(), @@ -306,10 +306,8 @@ public String getSavePointPath(Application appParam) throws Exception { @Override public String processPath(String path, String jobName, Long jobId) { if (StringUtils.isNotBlank(path)) { - return path.replaceAll("\\$job(Id|id)", jobId.toString()) - .replaceAll("\\$\\{job(Id|id)}", jobId.toString()) - .replaceAll("\\$job(Name|name)", jobName) - .replaceAll("\\$\\{job(Name|name)}", jobName); + return path.replaceAll("\\$\\{job(Name|name)}|\\$job(Name|name)", jobName) + .replaceAll("\\$\\{job(Id|id)}|\\$job(Id|id)", jobId.toString()); } return path; } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sWatcherWrapper.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sWatcherWrapper.java index a09f85ef9f..123c399a53 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sWatcherWrapper.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sWatcherWrapper.java @@ -119,7 +119,7 @@ private List getK8sWatchingApps() { public TrackId toTrackId(Application app) { FlinkEnv flinkEnv = flinkEnvService.getById(app.getVersionId()); - Properties properties = flinkEnv.getFlinkConfig(app); + Properties properties = flinkEnvService.getFlinkConfig(flinkEnv, app); Map dynamicProperties = PropertiesUtils.extractDynamicPropertiesAsJava(app.getDynamicProperties()); diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala index 86bc2bf8d0..49854e74e8 100644 --- a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala +++ b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala @@ -26,6 +26,7 @@ import org.apache.streampark.flink.util.FlinkUtils import org.apache.streampark.shaded.com.fasterxml.jackson.databind.ObjectMapper import org.apache.commons.io.FileUtils +import org.apache.flink.configuration.{Configuration, GlobalConfiguration} import org.apache.flink.runtime.jobgraph.{SavepointConfigOptions, SavepointRestoreSettings} import javax.annotation.Nullable @@ -34,7 +35,7 @@ import java.io.File import java.util.{Map => JavaMap} import scala.collection.JavaConversions._ -import scala.util.Try +import scala.util.{Success, Try} case class SubmitRequest( flinkVersion: FlinkVersion, @@ -98,6 +99,26 @@ case class SubmitRequest( } } + lazy val flinkDefaultConfiguration: Configuration = { + Try(GlobalConfiguration.loadConfiguration(s"${flinkVersion.flinkHome}/conf")) match { + case Success(value) => + value + .keySet() + .foreach( + k => { + val v = value.getString(k, null) + if (v != null) { + val result = v + .replaceAll("\\$\\{job(Name|name)}|\\$job(Name|name)", effectiveAppName) + .replaceAll("\\$\\{job(Id|id)}|\\$job(Id|id)", id.toString) + value.setString(k, result) + } + }) + value + case _ => new Configuration() + } + } + def hasProp(key: String): Boolean = properties.containsKey(key) def getProp(key: String): Any = properties.get(key) diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala index db0c3db717..eb49ad4a10 100644 --- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala +++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala @@ -184,15 +184,11 @@ object YarnSessionClient extends YarnClientTrait { var clusterDescriptor: YarnClusterDescriptor = null var client: ClusterClient[ApplicationId] = null try { - val flinkConfig = getFlinkDefaultConfiguration(shutDownRequest.flinkVersion.flinkHome) - shutDownRequest.properties.foreach( - m => - m._2 match { - case v if v != null => flinkConfig.setString(m._1, m._2.toString) - case _ => - }) - flinkConfig.safeSet(YarnConfigOptions.APPLICATION_ID, shutDownRequest.clusterId) - flinkConfig.safeSet(DeploymentOptions.TARGET, YarnDeploymentTarget.SESSION.getName) + val flinkConfig = new Configuration() + flinkConfig + .safeSet(YarnConfigOptions.APPLICATION_ID, shutDownRequest.clusterId) + .safeSet(DeploymentOptions.TARGET, YarnDeploymentTarget.SESSION.getName) + .safeSet(YarnConfigOptions.APPLICATION_TAGS, "streampark") val yarnClusterDescriptor = getYarnClusterDescriptor(flinkConfig) val applicationId: ApplicationId = yarnClusterDescriptor._1 clusterDescriptor = yarnClusterDescriptor._2 diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala index ff7c7304c0..896767ca3a 100644 --- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala +++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala @@ -114,11 +114,11 @@ trait FlinkClientTrait extends Logger { .safeSet(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID, submitRequest.jobId) if (!submitRequest.hasProp(CheckpointingOptions.MAX_RETAINED_CHECKPOINTS.key())) { - val flinkDefaultConfiguration = getFlinkDefaultConfiguration( - submitRequest.flinkVersion.flinkHome) // state.checkpoints.num-retained val retainedOption = CheckpointingOptions.MAX_RETAINED_CHECKPOINTS - flinkConfig.safeSet(retainedOption, flinkDefaultConfiguration.get(retainedOption)) + flinkConfig.safeSet( + retainedOption, + submitRequest.flinkDefaultConfiguration.get(retainedOption)) } // 2) set savepoint parameter @@ -279,29 +279,20 @@ trait FlinkClientTrait extends Logger { throw new IllegalStateException("No valid command-line found.") } - private[client] def getFlinkDefaultConfiguration(flinkHome: String): Configuration = { - Try(GlobalConfiguration.loadConfiguration(s"$flinkHome/conf")).getOrElse(new Configuration()) - } - - private[client] def getOptionFromDefaultFlinkConfig[T]( - flinkHome: String, - option: ConfigOption[T]): T = { - getFlinkDefaultConfiguration(flinkHome).get(option) - } - private[this] def getCustomCommandLines(flinkHome: String): JavaList[CustomCommandLine] = { - val flinkDefaultConfiguration: Configuration = getFlinkDefaultConfiguration(flinkHome) // 1. find the configuration directory val configurationDirectory = s"$flinkHome/conf" // 2. load the custom command lines - loadCustomCommandLines(flinkDefaultConfiguration, configurationDirectory) + val flinkConfig = + Try(GlobalConfiguration.loadConfiguration(s"$flinkHome/conf")).getOrElse(new Configuration()) + loadCustomCommandLines(flinkConfig, configurationDirectory) } private[client] def getParallelism(submitRequest: SubmitRequest): Integer = { if (submitRequest.properties.containsKey(KEY_FLINK_PARALLELISM())) { Integer.valueOf(submitRequest.properties.get(KEY_FLINK_PARALLELISM()).toString) } else { - getFlinkDefaultConfiguration(submitRequest.flinkVersion.flinkHome) + submitRequest.flinkDefaultConfiguration .getInteger(CoreOptions.DEFAULT_PARALLELISM, CoreOptions.DEFAULT_PARALLELISM.defaultValue()) } } @@ -371,20 +362,20 @@ trait FlinkClientTrait extends Logger { val commandLine = FlinkRunOption.parse(commandLineOptions, cliArgs, true) - val activeCommandLine = validateAndGetActiveCommandLine( - getCustomCommandLines(submitRequest.flinkVersion.flinkHome), - commandLine) + val activeCommandLine = { + val customCommandLines: JavaList[CustomCommandLine] = { + // 1. find the configuration directory + val configurationDirectory = s"${submitRequest.flinkVersion.flinkHome}/conf" + // 2. load the custom command lines + loadCustomCommandLines(submitRequest.flinkDefaultConfiguration, configurationDirectory) + } + validateAndGetActiveCommandLine(customCommandLines, commandLine) + } - val configuration = - applyConfiguration( - submitRequest.flinkVersion.flinkHome, - activeCommandLine, - commandLine, - submitRequest.id.toString, - submitRequest.effectiveAppName) + val configuration = new Configuration(submitRequest.flinkDefaultConfiguration) + configuration.addAll(activeCommandLine.toConfiguration(commandLine)) commandLine -> configuration - } private[client] def getCommandLineOptions(flinkHome: String) = { @@ -417,10 +408,16 @@ trait FlinkClientTrait extends Logger { } FlinkRunOption.parse(commandLineOptions, cliArgs, true) } + val activeCommandLine = validateAndGetActiveCommandLine(getCustomCommandLines(flinkHome), commandLine) - val flinkConfig = applyConfiguration(flinkHome, activeCommandLine, commandLine) - flinkConfig + + val flinkDefaultConfiguration = + Try(GlobalConfiguration.loadConfiguration(s"$flinkHome/conf")).getOrElse(new Configuration()) + + val configuration = new Configuration(flinkDefaultConfiguration) + configuration.addAll(activeCommandLine.toConfiguration(commandLine)) + configuration } private[this] def extractProgramArgs(submitRequest: SubmitRequest): JavaList[String] = { @@ -445,32 +442,6 @@ trait FlinkClientTrait extends Logger { Lists.newArrayList(programArgs: _*) } - private[this] def applyConfiguration( - flinkHome: String, - activeCustomCommandLine: CustomCommandLine, - commandLine: CommandLine, - jobId: String = null, - jobName: String = null): Configuration = { - - require(activeCustomCommandLine != null, "activeCustomCommandLine must not be null.") - val configuration = new Configuration() - val flinkDefaultConfiguration = getFlinkDefaultConfiguration(flinkHome) - flinkDefaultConfiguration.keySet.foreach( - key => { - val value = flinkDefaultConfiguration.getString(key, null) - var result = value - if (value != null && StringUtils.isNotBlank(jobName)) { - result = value.replaceAll("\\$\\{job(Name|name)}|\\$job(Name|name)", jobName) - } - if (jobId != null) { - result = result.replaceAll("\\$\\{job(Id|id)}|\\$job(Id|id)", jobId) - } - configuration.setString(key, result) - }) - configuration.addAll(activeCustomCommandLine.toConfiguration(commandLine)) - configuration - } - implicit private[client] class EnhanceFlinkConfiguration(flinkConfig: Configuration) { def safeSet[T](option: ConfigOption[T], value: T): Configuration = { flinkConfig match {