Skip to content

Commit

Permalink
[improvement] The timed state synchronizes the periodic configuration (
Browse files Browse the repository at this point in the history
  • Loading branch information
Mrart authored Sep 9, 2024
1 parent 139a26b commit 7c2334f
Show file tree
Hide file tree
Showing 6 changed files with 91 additions and 63 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -115,18 +115,18 @@ public class FlinkAppHttpWatcher {
* set the state of the task to savepoint
* </pre>
*/
private static final Cache<Long, Byte> SAVEPOINT_CACHE = Caffeine.newBuilder().expireAfterWrite(1, TimeUnit.MINUTES)
.build();
private static final Cache<Long, Byte> SAVEPOINT_CACHE =
Caffeine.newBuilder().expireAfterWrite(1, TimeUnit.MINUTES).build();

/**
* Record the status of the first tracking task, because after the task is started, the overview
* of the task will be obtained during the first tracking
*/
private static final Cache<Long, Byte> STARTING_CACHE = Caffeine.newBuilder().expireAfterWrite(5, TimeUnit.MINUTES)
.build();
private static final Cache<Long, Byte> STARTING_CACHE =
Caffeine.newBuilder().expireAfterWrite(5, TimeUnit.MINUTES).build();

private static final Cache<Long, Date> LOST_CACHE = Caffeine.newBuilder().expireAfterWrite(5, TimeUnit.MINUTES)
.build();
private static final Cache<Long, Date> LOST_CACHE =
Caffeine.newBuilder().expireAfterWrite(5, TimeUnit.MINUTES).build();

/** tracking task list */
private static final Map<Long, Application> WATCHING_APPS = new ConcurrentHashMap<>(0);
Expand All @@ -148,11 +148,11 @@ public class FlinkAppHttpWatcher {
* Cancelling tasks are placed in this cache with an expiration time of 10 seconds (the time of 2
* task monitoring polls).
*/
private static final Cache<Long, Byte> CANCELING_CACHE = Caffeine.newBuilder()
.expireAfterWrite(10, TimeUnit.SECONDS).build();
private static final Cache<Long, Byte> CANCELING_CACHE =
Caffeine.newBuilder().expireAfterWrite(10, TimeUnit.SECONDS).build();

private static final Cache<Long, FlinkStateChangeEvent> PREVIOUS_STATUS = Caffeine.newBuilder()
.expireAfterWrite(24, TimeUnit.HOURS).build();
private static final Cache<Long, FlinkStateChangeEvent> PREVIOUS_STATUS =
Caffeine.newBuilder().expireAfterWrite(24, TimeUnit.HOURS).build();

/**
* Task canceled tracking list, record who cancelled the tracking task Map<applicationId,userId>
Expand All @@ -176,10 +176,11 @@ public class FlinkAppHttpWatcher {
@PostConstruct
public void init() {
WATCHING_APPS.clear();
List<Application> applications = applicationManageService.list(
new LambdaQueryWrapper<Application>()
.eq(Application::getTracking, 1)
.notIn(Application::getExecutionMode, FlinkExecutionMode.getKubernetesMode()));
List<Application> applications =
applicationManageService.list(
new LambdaQueryWrapper<Application>()
.eq(Application::getTracking, 1)
.notIn(Application::getExecutionMode, FlinkExecutionMode.getKubernetesMode()));
applications.forEach(
(app) -> {
WATCHING_APPS.put(app.getId(), app);
Expand All @@ -202,7 +203,7 @@ public void doStop() {
*
* <p><strong>2) Normal information obtain, once every 5 seconds</strong>
*/
@Scheduled(fixedDelay = 1000)
@Scheduled(fixedDelayString = "${job.state-watcher.fixed-delayed:1000}")
public void start() {
Long timeMillis = System.currentTimeMillis();
if (lastWatchTime == null
Expand Down Expand Up @@ -298,18 +299,20 @@ private void getStateFromFlink(Application application) throws Exception {
if (FlinkExecutionMode.YARN_APPLICATION.equals(execMode)
|| FlinkExecutionMode.YARN_PER_JOB.equals(execMode)) {
if (jobsOverview.getJobs() != null) {
optional = jobsOverview.getJobs().size() > 1
? jobsOverview.getJobs().stream()
.filter(a -> StringUtils.equals(application.getJobId(), a.getId()))
.findFirst()
: jobsOverview.getJobs().stream().findFirst();
optional =
jobsOverview.getJobs().size() > 1
? jobsOverview.getJobs().stream()
.filter(a -> StringUtils.equals(application.getJobId(), a.getId()))
.findFirst()
: jobsOverview.getJobs().stream().findFirst();
} else {
optional = Optional.empty();
}
} else {
optional = jobsOverview.getJobs().stream()
.filter(x -> x.getId().equals(application.getJobId()))
.findFirst();
optional =
jobsOverview.getJobs().stream()
.filter(x -> x.getId().equals(application.getJobId()))
.findFirst();
}

if (optional.isPresent()) {
Expand Down Expand Up @@ -414,9 +417,10 @@ private void handleRunningState(
switch (releaseState) {
case NEED_RESTART:
case NEED_ROLLBACK:
LambdaUpdateWrapper<Application> updateWrapper = new LambdaUpdateWrapper<Application>()
.eq(Application::getId, appId)
.set(Application::getRelease, ReleaseStateEnum.DONE.get());
LambdaUpdateWrapper<Application> updateWrapper =
new LambdaUpdateWrapper<Application>()
.eq(Application::getId, appId)
.set(Application::getRelease, ReleaseStateEnum.DONE.get());
applicationManageService.update(updateWrapper);
break;
default:
Expand Down Expand Up @@ -617,7 +621,8 @@ private void getStateFromYarn(Application application) throws Exception {
}

private void doAlert(Application application, FlinkAppStateEnum flinkAppState) {
AlertTemplate alertTemplate = AlertTemplateUtils.createAlertTemplate(application, flinkAppState);
AlertTemplate alertTemplate =
AlertTemplateUtils.createAlertTemplate(application, flinkAppState);
alertService.alert(application.getAlertId(), alertTemplate);
}

Expand Down Expand Up @@ -756,9 +761,10 @@ private JobsOverview httpJobsOverview(Application application) throws Exception
String remoteUrl = cluster.getAddress() + "/" + flinkUrl;
JobsOverview jobsOverview = httpRestRequest(remoteUrl, JobsOverview.class);
if (jobsOverview != null) {
List<JobsOverview.Job> jobs = jobsOverview.getJobs().stream()
.filter(x -> x.getId().equals(application.getJobId()))
.collect(Collectors.toList());
List<JobsOverview.Job> jobs =
jobsOverview.getJobs().stream()
.filter(x -> x.getId().equals(application.getJobId()))
.collect(Collectors.toList());
jobsOverview.setJobs(jobs);
}
return jobsOverview;
Expand Down Expand Up @@ -786,8 +792,8 @@ private CheckPoints httpCheckpoints(Application application) throws Exception {
return httpRemoteCluster(
application.getFlinkClusterId(),
cluster -> {
String remoteUrl = cluster.getAddress() + "/"
+ String.format(flinkUrl, application.getJobId());
String remoteUrl =
cluster.getAddress() + "/" + String.format(flinkUrl, application.getJobId());
return httpRestRequest(remoteUrl, CheckPoints.class);
});
}
Expand All @@ -800,8 +806,9 @@ private <T> T yarnRestRequest(String url, Class<T> clazz) throws IOException {
}

private <T> T httpRestRequest(String url, Class<T> clazz) throws IOException {
String result = HttpClientUtils.httpGetRequest(
url, RequestConfig.custom().setConnectTimeout(HTTP_TIMEOUT).build());
String result =
HttpClientUtils.httpGetRequest(
url, RequestConfig.custom().setConnectTimeout(HTTP_TIMEOUT).build());
if (null == result) {
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,24 +85,25 @@ public class FlinkClusterWatcher {
/** Watcher cluster lists */
private static final Map<Long, FlinkCluster> WATCHER_CLUSTERS = new ConcurrentHashMap<>(8);

private static final Cache<Long, ClusterState> FAILED_STATES = Caffeine.newBuilder()
.expireAfterWrite(WATCHER_INTERVAL).build();
private static final Cache<Long, ClusterState> FAILED_STATES =
Caffeine.newBuilder().expireAfterWrite(WATCHER_INTERVAL).build();

private boolean immediateWatch = false;

/** Initialize cluster cache */
@PostConstruct
private void init() {
WATCHER_CLUSTERS.clear();
List<FlinkCluster> flinkClusters = flinkClusterService.list(
new LambdaQueryWrapper<FlinkCluster>()
.eq(FlinkCluster::getClusterState, ClusterState.RUNNING.getState())
// excluding flink clusters on kubernetes
.notIn(FlinkCluster::getExecutionMode, FlinkExecutionMode.getKubernetesMode()));
List<FlinkCluster> flinkClusters =
flinkClusterService.list(
new LambdaQueryWrapper<FlinkCluster>()
.eq(FlinkCluster::getClusterState, ClusterState.RUNNING.getState())
// excluding flink clusters on kubernetes
.notIn(FlinkCluster::getExecutionMode, FlinkExecutionMode.getKubernetesMode()));
flinkClusters.forEach(cluster -> WATCHER_CLUSTERS.put(cluster.getId(), cluster));
}

@Scheduled(fixedDelay = 1000)
@Scheduled(fixedDelayString = "${job.state-watcher.fixed-delayed:1000}")
private void start() {
Long timeMillis = System.currentTimeMillis();
if (immediateWatch || timeMillis - lastWatchTime >= WATCHER_INTERVAL.toMillis()) {
Expand All @@ -117,8 +118,7 @@ private void start() {
case LOST:
case UNKNOWN:
case KILLED:
flinkClusterService.updateClusterState(flinkCluster.getId(),
state);
flinkClusterService.updateClusterState(flinkCluster.getId(), state);
unWatching(flinkCluster);
alert(flinkCluster, state);
break;
Expand All @@ -134,11 +134,11 @@ private void alert(FlinkCluster cluster, ClusterState state) {
cluster.setAllJobs(applicationInfoService.countByClusterId(cluster.getId()));
cluster.setAffectedJobs(
applicationInfoService.countAffectedByClusterId(
cluster.getId(),
InternalConfigHolder.get(CommonConfig.SPRING_PROFILES_ACTIVE())));
cluster.getId(), InternalConfigHolder.get(CommonConfig.SPRING_PROFILES_ACTIVE())));
cluster.setClusterState(state.getState());
cluster.setEndTime(new Date());
alertService.alert(cluster.getAlertId(), AlertTemplateUtils.createAlertTemplate(cluster, state));
alertService.alert(
cluster.getAlertId(), AlertTemplateUtils.createAlertTemplate(cluster, state));
}
}

Expand Down Expand Up @@ -213,13 +213,15 @@ private ClusterState httpClusterState(FlinkCluster flinkCluster) {
private ClusterState getStateFromFlinkRestApi(FlinkCluster flinkCluster) {
String address = flinkCluster.getAddress();
String jobManagerUrl = flinkCluster.getJobManagerUrl();
String flinkUrl = StringUtils.isBlank(jobManagerUrl)
? address.concat("/overview")
: jobManagerUrl.concat("/overview");
String flinkUrl =
StringUtils.isBlank(jobManagerUrl)
? address.concat("/overview")
: jobManagerUrl.concat("/overview");
try {
String res = HttpClientUtils.httpGetRequest(
flinkUrl,
RequestConfig.custom().setConnectTimeout(5000, TimeUnit.MILLISECONDS).build());
String res =
HttpClientUtils.httpGetRequest(
flinkUrl,
RequestConfig.custom().setConnectTimeout(5000, TimeUnit.MILLISECONDS).build());
JacksonUtils.read(res, Overview.class);
return ClusterState.RUNNING;
} catch (Exception ignored) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,8 @@ public class SparkAppHttpWatcher {
private static final Map<Long, SparkApplication> WATCHING_APPS = new ConcurrentHashMap<>(0);

/**
*
*
* <pre>
* StopFrom: Recording spark application stopped by streampark or stopped by other actions
* </pre>
Expand Down Expand Up @@ -149,7 +151,7 @@ public void doStop() {
*
* <p><strong>2) Normal information obtain, once every 5 seconds</strong>
*/
@Scheduled(fixedDelay = 1000)
@Scheduled(fixedDelayString = "${job.state-watcher.fixed-delayed:1000}")
public void start() {
Long timeMillis = System.currentTimeMillis();
if (lastWatchTime == null
Expand Down Expand Up @@ -217,7 +219,8 @@ private void getStateFromYarn(SparkApplication application) throws Exception {
summary.setUsedVCores(Long.parseLong(yarnAppInfo.getApp().getAllocatedVCores()));
application.fillRunningMetrics(summary);
} catch (IOException e) {
// This may happen when the job is finished right after the job status is obtained from yarn.
// This may happen when the job is finished right after the job status is obtained from
// yarn.
log.warn(
"[StreamPark][SparkAppHttpWatcher] getStateFromYarn, fetch spark job status failed. The job may have already been finished.");
}
Expand Down Expand Up @@ -310,8 +313,8 @@ private Job[] httpJobsStatus(SparkApplication application) throws IOException {
}

/**
* Calculate spark stage and task metric from yarn rest api.
* Only available when yarn application status is RUNNING.
* Calculate spark stage and task metric from yarn rest api. Only available when yarn application
* status is RUNNING.
*
* @param application
* @return task progress
Expand All @@ -323,12 +326,15 @@ private SparkApplicationSummary httpStageAndTaskStatus(SparkApplication applicat
if (jobs == null) {
return summary;
}
Arrays.stream(jobs).forEach(job -> {
summary.setNumTasks(job.getNumTasks() + summary.getNumTasks());
summary.setNumCompletedTasks(job.getNumCompletedTasks() + summary.getNumCompletedTasks());
summary.setNumStages(job.getStageIds().size() + summary.getNumStages());
summary.setNumStages(job.getNumCompletedStages() + summary.getNumCompletedStages());
});
Arrays.stream(jobs)
.forEach(
job -> {
summary.setNumTasks(job.getNumTasks() + summary.getNumTasks());
summary.setNumCompletedTasks(
job.getNumCompletedTasks() + summary.getNumCompletedTasks());
summary.setNumStages(job.getStageIds().size() + summary.getNumStages());
summary.setNumStages(job.getNumCompletedStages() + summary.getNumCompletedStages());
});
return summary;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,3 +112,8 @@ network:
restrict-interface: docker0
# network IP gets priority, default inner outer
priority-strategy: default

# Add configurable job.state-watcher.fixed-delayed
job:
state-watcher:
fixed-delayed: 1000
Original file line number Diff line number Diff line change
Expand Up @@ -39,3 +39,7 @@ streampark:
local: /tmp
# remote: hdfs://hdfscluster/streampark
# hadoop-user-name: root

job:
state-watcher:
fixed-delayed: 1000
Original file line number Diff line number Diff line change
Expand Up @@ -53,3 +53,7 @@ network:
restrict-interface: docker0
# network IP gets priority, default inner outer
priority-strategy: default

job:
state-watcher:
fixed-delayed: 1000

0 comments on commit 7c2334f

Please sign in to comment.