[Improve] support variables in Flink configuration
wolfboys committed Sep 15, 2024
1 parent aaf7c74 commit 5241b32
Showing 10 changed files with 91 additions and 113 deletions.
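This change lets values in an environment's flink-conf.yaml refer to the application they are resolved for: both the ${jobName}/${jobname} and bare $jobName/$jobname spellings are replaced with the application's job name, and the jobId forms with its id. A minimal, self-contained sketch of the substitution the diff applies throughout (the key and values are illustrative):

public class VariableSubstitutionDemo {
  public static void main(String[] args) {
    // An illustrative raw value, as it might appear in flink-conf.yaml.
    String raw = "hdfs:///flink/savepoints/$jobName";
    String jobName = "quickstart-app";
    Long jobId = 100001L;
    // The same two replaceAll calls used throughout this commit.
    String resolved =
        raw.replaceAll("\\$\\{job(Name|name)}|\\$job(Name|name)", jobName)
            .replaceAll("\\$\\{job(Id|id)}|\\$job(Id|id)", jobId.toString());
    System.out.println(resolved); // hdfs:///flink/savepoints/quickstart-app
  }
}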

File: (path not captured) — this file was deleted; its contents are not shown.

File: FlinkEnv.java (file names below are inferred from the code shown; full paths were not captured in this view)
@@ -20,7 +20,6 @@
 import org.apache.streampark.common.conf.FlinkVersion;
 import org.apache.streampark.common.util.DeflaterUtils;
 import org.apache.streampark.common.util.PropertiesUtils;
-import org.apache.streampark.common.utils.CommonUtils;
 import org.apache.streampark.console.base.exception.ApiAlertException;
 import org.apache.streampark.console.base.exception.ApiDetailException;

@@ -38,7 +37,6 @@
 import java.nio.charset.StandardCharsets;
 import java.util.Date;
 import java.util.Map;
-import java.util.Properties;

 @Getter
 @Setter
@@ -115,18 +113,6 @@ public Map<String, String> convertFlinkYamlAsMap() {
     return PropertiesUtils.loadFlinkConfYaml(flinkYamlString);
   }

-  @JsonIgnore
-  public Properties getFlinkConfig(Application application) {
-    String flinkYamlString = DeflaterUtils.unzipString(flinkConf);
-    Properties flinkConfig = new Properties();
-    Map<String, String> config = PropertiesUtils.loadFlinkConfYaml(flinkYamlString);
-    for (Map.Entry<String, String> entry : config.entrySet()) {
-      String value = CommonUtils.fixedValueBaseVar(entry.getValue(), application.getJobName());
-      flinkConfig.setProperty(entry.getKey(), value);
-    }
-    return flinkConfig;
-  }
-
   @JsonIgnore
   public FlinkVersion getFlinkVersion() {
     if (this.flinkVersion == null) {
File: FlinkEnvService.java
@@ -18,12 +18,14 @@
 package org.apache.streampark.console.core.service;

 import org.apache.streampark.console.base.domain.RestRequest;
+import org.apache.streampark.console.core.entity.Application;
 import org.apache.streampark.console.core.entity.FlinkEnv;

 import com.baomidou.mybatisplus.core.metadata.IPage;
 import com.baomidou.mybatisplus.extension.service.IService;

 import java.io.IOException;
+import java.util.Properties;

 public interface FlinkEnvService extends IService<FlinkEnv> {

@@ -97,4 +99,6 @@ public interface FlinkEnvService extends IService<FlinkEnv> {
   void validity(Long id);

   IPage<FlinkEnv> findPage(FlinkEnv flinkEnv, RestRequest restRequest);
+
+  Properties getFlinkConfig(FlinkEnv flinkEnv, Application application);
 }
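A hedged usage sketch of the new service-level method; only the signature above is from this diff, while the surrounding class and injected field are assumptions:

import java.util.Properties;

import org.apache.streampark.console.core.entity.Application;
import org.apache.streampark.console.core.entity.FlinkEnv;
import org.apache.streampark.console.core.service.FlinkEnvService;

class FlinkConfigLookup {
  private final FlinkEnvService flinkEnvService;

  FlinkConfigLookup(FlinkEnvService flinkEnvService) {
    this.flinkEnvService = flinkEnvService;
  }

  String savepointDir(FlinkEnv flinkEnv, Application application) {
    // Values such as "state.savepoints.dir: hdfs:///savepoints/$jobName" come
    // back with $jobName/$jobId already resolved for this application.
    Properties flinkConf = flinkEnvService.getFlinkConfig(flinkEnv, application);
    return flinkConf.getProperty("state.savepoints.dir");
  }
}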
File: ApplicationServiceImpl.java (inferred)
@@ -1820,7 +1820,9 @@ private Map<String, Object> getProperties(Application application, FlinkEnv flinkEnv) {

     if (ExecutionMode.isKubernetesApplicationMode(application.getExecutionMode())) {
       String archiveDir =
-          flinkEnv.getFlinkConfig(application).getProperty(JobManagerOptions.ARCHIVE_DIR.key());
+          flinkEnvService
+              .getFlinkConfig(flinkEnv, application)
+              .getProperty(JobManagerOptions.ARCHIVE_DIR.key());
       if (archiveDir != null) {
         properties.put(JobManagerOptions.ARCHIVE_DIR.key(), archiveDir);
       }
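The Kubernetes-application-mode branch above keeps jobmanager.archive.fs.dir flowing out of the per-application resolved config. A small standalone sketch of that read-and-propagate step (the map and values are illustrative):

import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import org.apache.flink.configuration.JobManagerOptions;

public class ArchiveDirDemo {
  public static void main(String[] args) {
    Properties resolved = new Properties(); // stands in for the resolved Flink config
    resolved.setProperty(
        JobManagerOptions.ARCHIVE_DIR.key(), "hdfs:///completed-jobs/quickstart-app");
    Map<String, Object> properties = new HashMap<>();
    String archiveDir = resolved.getProperty(JobManagerOptions.ARCHIVE_DIR.key());
    if (archiveDir != null) {
      properties.put(JobManagerOptions.ARCHIVE_DIR.key(), archiveDir);
    }
    System.out.println(properties);
  }
}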
File: FlinkEnvServiceImpl.java (inferred)
@@ -17,16 +17,21 @@

 package org.apache.streampark.console.core.service.impl;

+import org.apache.streampark.common.util.DeflaterUtils;
+import org.apache.streampark.common.util.PropertiesUtils;
 import org.apache.streampark.console.base.domain.RestRequest;
 import org.apache.streampark.console.base.exception.ApiAlertException;
 import org.apache.streampark.console.base.mybatis.pager.MybatisPager;
+import org.apache.streampark.console.core.entity.Application;
 import org.apache.streampark.console.core.entity.FlinkEnv;
 import org.apache.streampark.console.core.entity.Project;
 import org.apache.streampark.console.core.mapper.FlinkEnvMapper;
 import org.apache.streampark.console.core.service.ApplicationService;
 import org.apache.streampark.console.core.service.FlinkClusterService;
 import org.apache.streampark.console.core.service.FlinkEnvService;

+import org.apache.commons.lang3.StringUtils;
+
 import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
 import com.baomidou.mybatisplus.core.metadata.IPage;
 import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
@@ -40,6 +45,8 @@
 import java.io.File;
 import java.io.IOException;
 import java.util.Date;
+import java.util.Map;
+import java.util.Properties;

 @Slf4j
 @Service
@@ -164,6 +171,25 @@ public IPage<FlinkEnv> findPage(FlinkEnv flinkEnv, RestRequest restRequest) {
     return this.baseMapper.findPage(page, flinkEnv);
   }

+  @Override
+  public Properties getFlinkConfig(FlinkEnv flinkEnv, Application application) {
+    String flinkYamlString = DeflaterUtils.unzipString(flinkEnv.getFlinkConf());
+    Properties flinkConfig = new Properties();
+    Map<String, String> config = PropertiesUtils.loadFlinkConfYaml(flinkYamlString);
+    for (Map.Entry<String, String> entry : config.entrySet()) {
+      String value = entry.getValue();
+      if (StringUtils.isNotBlank(application.getJobName())) {
+        value =
+            value.replaceAll("\\$\\{job(Name|name)}|\\$job(Name|name)", application.getJobName());
+      }
+      if (application.getId() != null) {
+        value = value.replaceAll("\\$\\{job(Id|id)}|\\$job(Id|id)", application.getId().toString());
+      }
+      flinkConfig.setProperty(entry.getKey(), value);
+    }
+    return flinkConfig;
+  }
+
   private void checkOrElseAlert(FlinkEnv flinkEnv) {

     // 1.check exists
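One caveat with the method above: String.replaceAll treats '$' in the replacement string as a group reference, so a job name that itself contains '$' makes the call throw. A sketch of the failure mode and the Matcher.quoteReplacement guard (the job name is illustrative; the guard is a suggested hardening, not part of this commit):

import java.util.regex.Matcher;

public class ReplacementEscapingDemo {
  public static void main(String[] args) {
    String value = "hdfs:///archive/$jobName";
    String jobName = "price$watcher"; // '$' in the replacement is read as a group reference
    try {
      value.replaceAll("\\$\\{job(Name|name)}|\\$job(Name|name)", jobName);
    } catch (IllegalArgumentException e) {
      System.out.println("unescaped replacement failed: " + e.getMessage());
    }
    // Matcher.quoteReplacement makes the replacement literal:
    String safe =
        value.replaceAll(
            "\\$\\{job(Name|name)}|\\$job(Name|name)", Matcher.quoteReplacement(jobName));
    System.out.println(safe); // hdfs:///archive/price$watcher
  }
}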
File: SavePointServiceImpl.java (inferred)
@@ -156,7 +156,7 @@ private void expire(Savepoint entity) {

     if (cpThreshold == 0) {
       String flinkConfNumRetained =
-          flinkEnv.getFlinkConfig(application).getProperty(numRetainedKey);
+          flinkEnvService.getFlinkConfig(flinkEnv, application).getProperty(numRetainedKey);
       int numRetainedDefaultValue = 1;
       if (flinkConfNumRetained != null) {
         try {
@@ -293,7 +293,7 @@ public String getSavePointPath(Application appParam) throws Exception {
     if (StringUtils.isBlank(savepointPath)) {
       // flink
       FlinkEnv flinkEnv = flinkEnvService.getById(application.getVersionId());
-      Properties flinkConfig = flinkEnv.getFlinkConfig(application);
+      Properties flinkConfig = flinkEnvService.getFlinkConfig(flinkEnv, application);
       savepointPath =
           flinkConfig.getProperty(
               CheckpointingOptions.SAVEPOINT_DIRECTORY.key(),
@@ -306,10 +306,8 @@
   @Override
   public String processPath(String path, String jobName, Long jobId) {
     if (StringUtils.isNotBlank(path)) {
-      return path.replaceAll("\\$job(Id|id)", jobId.toString())
-          .replaceAll("\\$\\{job(Id|id)}", jobId.toString())
-          .replaceAll("\\$job(Name|name)", jobName)
-          .replaceAll("\\$\\{job(Name|name)}", jobName);
+      return path.replaceAll("\\$\\{job(Name|name)}|\\$job(Name|name)", jobName)
+          .replaceAll("\\$\\{job(Id|id)}|\\$job(Id|id)", jobId.toString());
     }
     return path;
   }
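processPath now folds the ${...} and bare $... spellings of each variable into one alternation, so the previous four sequential replaceAll calls become two with the same result. A sketch with illustrative values:

public class ProcessPathDemo {
  public static void main(String[] args) {
    String path = "hdfs:///savepoints/${jobName}/chk-$jobId";
    String jobName = "quickstart-app";
    Long jobId = 100001L;
    String resolved =
        path.replaceAll("\\$\\{job(Name|name)}|\\$job(Name|name)", jobName)
            .replaceAll("\\$\\{job(Id|id)}|\\$job(Id|id)", jobId.toString());
    System.out.println(resolved); // hdfs:///savepoints/quickstart-app/chk-100001
  }
}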
File: FlinkK8sWatcherWrapper.java (inferred)
@@ -119,7 +119,7 @@ private List<TrackId> getK8sWatchingApps() {

   public TrackId toTrackId(Application app) {
     FlinkEnv flinkEnv = flinkEnvService.getById(app.getVersionId());
-    Properties properties = flinkEnv.getFlinkConfig(app);
+    Properties properties = flinkEnvService.getFlinkConfig(flinkEnv, app);

     Map<String, String> dynamicProperties =
         PropertiesUtils.extractDynamicPropertiesAsJava(app.getDynamicProperties());
File: SubmitRequest.scala
@@ -26,6 +26,7 @@ import org.apache.streampark.flink.util.FlinkUtils
 import org.apache.streampark.shaded.com.fasterxml.jackson.databind.ObjectMapper

 import org.apache.commons.io.FileUtils
+import org.apache.flink.configuration.{Configuration, GlobalConfiguration}
 import org.apache.flink.runtime.jobgraph.{SavepointConfigOptions, SavepointRestoreSettings}

 import javax.annotation.Nullable
@@ -34,7 +35,7 @@ import java.io.File
 import java.util.{Map => JavaMap}

 import scala.collection.JavaConversions._
-import scala.util.Try
+import scala.util.{Success, Try}

 case class SubmitRequest(
     flinkVersion: FlinkVersion,
@@ -98,6 +99,26 @@ case class SubmitRequest(
     }
   }

+  lazy val flinkDefaultConfiguration: Configuration = {
+    Try(GlobalConfiguration.loadConfiguration(s"${flinkVersion.flinkHome}/conf")) match {
+      case Success(value) =>
+        value
+          .keySet()
+          .foreach(
+            k => {
+              val v = value.getString(k, null)
+              if (v != null) {
+                val result = v
+                  .replaceAll("\\$\\{job(Name|name)}|\\$job(Name|name)", effectiveAppName)
+                  .replaceAll("\\$\\{job(Id|id)}|\\$job(Id|id)", id.toString)
+                value.setString(k, result)
+              }
+            })
+        value
+      case _ => new Configuration()
+    }
+  }
+
   def hasProp(key: String): Boolean = properties.containsKey(key)

   def getProp(key: String): Any = properties.get(key)
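The new lazy val loads $FLINK_HOME/conf once per submit request and rewrites variable references in place, replacing the per-call loading the client trait used to do. The same idea as a Java sketch (the path and names are illustrative; Flink's Configuration.keySet() returns a copied set, so updating while iterating is safe):

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.GlobalConfiguration;

public class DefaultConfigDemo {
  public static void main(String[] args) {
    String flinkHome = "/opt/flink"; // assumption: a valid FLINK_HOME
    Configuration conf;
    try {
      conf = GlobalConfiguration.loadConfiguration(flinkHome + "/conf");
    } catch (RuntimeException e) {
      conf = new Configuration(); // mirrors the Try(...) fallback above
    }
    String jobName = "quickstart-app";
    String jobId = "100001";
    for (String key : conf.keySet()) {
      String v = conf.getString(key, null);
      if (v != null) {
        conf.setString(
            key,
            v.replaceAll("\\$\\{job(Name|name)}|\\$job(Name|name)", jobName)
                .replaceAll("\\$\\{job(Id|id)}|\\$job(Id|id)", jobId));
      }
    }
  }
}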
File: YarnSessionClient.scala
@@ -184,15 +184,11 @@ object YarnSessionClient extends YarnClientTrait {
     var clusterDescriptor: YarnClusterDescriptor = null
     var client: ClusterClient[ApplicationId] = null
     try {
-      val flinkConfig = getFlinkDefaultConfiguration(shutDownRequest.flinkVersion.flinkHome)
-      shutDownRequest.properties.foreach(
-        m =>
-          m._2 match {
-            case v if v != null => flinkConfig.setString(m._1, m._2.toString)
-            case _ =>
-          })
-      flinkConfig.safeSet(YarnConfigOptions.APPLICATION_ID, shutDownRequest.clusterId)
-      flinkConfig.safeSet(DeploymentOptions.TARGET, YarnDeploymentTarget.SESSION.getName)
+      val flinkConfig = new Configuration()
+      flinkConfig
+        .safeSet(YarnConfigOptions.APPLICATION_ID, shutDownRequest.clusterId)
+        .safeSet(DeploymentOptions.TARGET, YarnDeploymentTarget.SESSION.getName)
+        .safeSet(YarnConfigOptions.APPLICATION_TAGS, "streampark")
       val yarnClusterDescriptor = getYarnClusterDescriptor(flinkConfig)
       val applicationId: ApplicationId = yarnClusterDescriptor._1
       clusterDescriptor = yarnClusterDescriptor._2
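Session shutdown no longer loads flink-conf.yaml or copies request properties into the config; only the three options the YARN cluster descriptor needs are set. The equivalent with a plain Configuration (safeSet is StreamPark's helper shown at the bottom of this diff; the application id is illustrative):

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.DeploymentOptions;
import org.apache.flink.yarn.configuration.YarnConfigOptions;
import org.apache.flink.yarn.configuration.YarnDeploymentTarget;

public class ShutdownConfigDemo {
  public static void main(String[] args) {
    Configuration flinkConfig = new Configuration();
    flinkConfig.set(YarnConfigOptions.APPLICATION_ID, "application_1700000000000_0001");
    flinkConfig.set(DeploymentOptions.TARGET, YarnDeploymentTarget.SESSION.getName());
    flinkConfig.set(YarnConfigOptions.APPLICATION_TAGS, "streampark");
    System.out.println(flinkConfig);
  }
}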
File: FlinkClientTrait.scala
Expand Up @@ -114,11 +114,11 @@ trait FlinkClientTrait extends Logger {
.safeSet(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID, submitRequest.jobId)

if (!submitRequest.hasProp(CheckpointingOptions.MAX_RETAINED_CHECKPOINTS.key())) {
val flinkDefaultConfiguration = getFlinkDefaultConfiguration(
submitRequest.flinkVersion.flinkHome)
// state.checkpoints.num-retained
val retainedOption = CheckpointingOptions.MAX_RETAINED_CHECKPOINTS
flinkConfig.safeSet(retainedOption, flinkDefaultConfiguration.get(retainedOption))
flinkConfig.safeSet(
retainedOption,
submitRequest.flinkDefaultConfiguration.get(retainedOption))
}

// 2) set savepoint parameter
@@ -279,29 +279,20 @@
       throw new IllegalStateException("No valid command-line found.")
     }

-  private[client] def getFlinkDefaultConfiguration(flinkHome: String): Configuration = {
-    Try(GlobalConfiguration.loadConfiguration(s"$flinkHome/conf")).getOrElse(new Configuration())
-  }
-
-  private[client] def getOptionFromDefaultFlinkConfig[T](
-      flinkHome: String,
-      option: ConfigOption[T]): T = {
-    getFlinkDefaultConfiguration(flinkHome).get(option)
-  }
-
   private[this] def getCustomCommandLines(flinkHome: String): JavaList[CustomCommandLine] = {
-    val flinkDefaultConfiguration: Configuration = getFlinkDefaultConfiguration(flinkHome)
     // 1. find the configuration directory
     val configurationDirectory = s"$flinkHome/conf"
     // 2. load the custom command lines
-    loadCustomCommandLines(flinkDefaultConfiguration, configurationDirectory)
+    val flinkConfig =
+      Try(GlobalConfiguration.loadConfiguration(s"$flinkHome/conf")).getOrElse(new Configuration())
+    loadCustomCommandLines(flinkConfig, configurationDirectory)
   }

   private[client] def getParallelism(submitRequest: SubmitRequest): Integer = {
     if (submitRequest.properties.containsKey(KEY_FLINK_PARALLELISM())) {
       Integer.valueOf(submitRequest.properties.get(KEY_FLINK_PARALLELISM()).toString)
     } else {
-      getFlinkDefaultConfiguration(submitRequest.flinkVersion.flinkHome)
+      submitRequest.flinkDefaultConfiguration
         .getInteger(CoreOptions.DEFAULT_PARALLELISM, CoreOptions.DEFAULT_PARALLELISM.defaultValue())
     }
   }
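With the cached submitRequest.flinkDefaultConfiguration, the parallelism fallback no longer re-reads flink-conf.yaml on every call. A sketch of the fallback read (an empty Configuration stands in for the cached defaults):

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.CoreOptions;

public class ParallelismFallbackDemo {
  public static void main(String[] args) {
    Configuration flinkDefaultConfiguration = new Configuration();
    int parallelism =
        flinkDefaultConfiguration.getInteger(
            CoreOptions.DEFAULT_PARALLELISM, CoreOptions.DEFAULT_PARALLELISM.defaultValue());
    System.out.println(parallelism); // 1 unless overridden in flink-conf.yaml
  }
}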
@@ -371,20 +362,20 @@

     val commandLine = FlinkRunOption.parse(commandLineOptions, cliArgs, true)

-    val activeCommandLine = validateAndGetActiveCommandLine(
-      getCustomCommandLines(submitRequest.flinkVersion.flinkHome),
-      commandLine)
+    val activeCommandLine = {
+      val customCommandLines: JavaList[CustomCommandLine] = {
+        // 1. find the configuration directory
+        val configurationDirectory = s"${submitRequest.flinkVersion.flinkHome}/conf"
+        // 2. load the custom command lines
+        loadCustomCommandLines(submitRequest.flinkDefaultConfiguration, configurationDirectory)
+      }
+      validateAndGetActiveCommandLine(customCommandLines, commandLine)
+    }

-    val configuration =
-      applyConfiguration(
-        submitRequest.flinkVersion.flinkHome,
-        activeCommandLine,
-        commandLine,
-        submitRequest.id.toString,
-        submitRequest.effectiveAppName)
+    val configuration = new Configuration(submitRequest.flinkDefaultConfiguration)
+    configuration.addAll(activeCommandLine.toConfiguration(commandLine))

     commandLine -> configuration
-
   }

   private[client] def getCommandLineOptions(flinkHome: String) = {
@@ -417,10 +408,16 @@
       }
       FlinkRunOption.parse(commandLineOptions, cliArgs, true)
     }

     val activeCommandLine =
       validateAndGetActiveCommandLine(getCustomCommandLines(flinkHome), commandLine)
-    val flinkConfig = applyConfiguration(flinkHome, activeCommandLine, commandLine)
-    flinkConfig
+
+    val flinkDefaultConfiguration =
+      Try(GlobalConfiguration.loadConfiguration(s"$flinkHome/conf")).getOrElse(new Configuration())
+
+    val configuration = new Configuration(flinkDefaultConfiguration)
+    configuration.addAll(activeCommandLine.toConfiguration(commandLine))
+    configuration
   }

   private[this] def extractProgramArgs(submitRequest: SubmitRequest): JavaList[String] = {
@@ -445,32 +442,6 @@
     Lists.newArrayList(programArgs: _*)
   }

-  private[this] def applyConfiguration(
-      flinkHome: String,
-      activeCustomCommandLine: CustomCommandLine,
-      commandLine: CommandLine,
-      jobId: String = null,
-      jobName: String = null): Configuration = {
-
-    require(activeCustomCommandLine != null, "activeCustomCommandLine must not be null.")
-    val configuration = new Configuration()
-    val flinkDefaultConfiguration = getFlinkDefaultConfiguration(flinkHome)
-    flinkDefaultConfiguration.keySet.foreach(
-      key => {
-        val value = flinkDefaultConfiguration.getString(key, null)
-        var result = value
-        if (value != null && StringUtils.isNotBlank(jobName)) {
-          result = value.replaceAll("\\$\\{job(Name|name)}|\\$job(Name|name)", jobName)
-        }
-        if (jobId != null) {
-          result = result.replaceAll("\\$\\{job(Id|id)}|\\$job(Id|id)", jobId)
-        }
-        configuration.setString(key, result)
-      })
-    configuration.addAll(activeCustomCommandLine.toConfiguration(commandLine))
-    configuration
-  }
-
   implicit private[client] class EnhanceFlinkConfiguration(flinkConfig: Configuration) {
     def safeSet[T](option: ConfigOption[T], value: T): Configuration = {
       flinkConfig match {
