diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkMetricsWatcher.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkMetricsWatcher.scala index 569c5a2fc5..269ca6bc30 100644 --- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkMetricsWatcher.scala +++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkMetricsWatcher.scala @@ -90,14 +90,14 @@ class FlinkMetricWatcher(conf: MetricWatcherConfig = MetricWatcherConfig.default future.onComplete(_.getOrElse(None) match { case Some(metric) => val clusterKey = id.toClusterKey - // update current flink cluster metrics on cache - watchController.flinkMetrics.put(clusterKey, metric) val isMetricChanged = { val preMetric = watchController.flinkMetrics.get(clusterKey) preMetric == null || !preMetric.equalsPayload(metric) } if (isMetricChanged) { eventBus.postAsync(FlinkClusterMetricChangeEvent(id, metric)) + // update current flink cluster metrics on cache + watchController.flinkMetrics.put(clusterKey, metric) } case _ => })