Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[WIP][Feature] yarn application mode support prometheus #3215

Closed
wants to merge 9 commits into from
Original file line number Diff line number Diff line change
Expand Up @@ -118,4 +118,17 @@ object CommonConfig {
description = "Use the database type"
)

val STREAMPARK_PROMETHEUS_ENABLE: InternalOption = InternalOption(
key = "streampark.prometheus.enable",
defaultValue = false,
classType = classOf[Boolean],
description = "Enable prometheus"
)

val STREAMPARK_PROMGATEWAY_HOST_URL: InternalOption = InternalOption(
key = "streampark.promgateway.hostUrl",
defaultValue = "http://localhost:9091",
classType = classOf[String],
description = "Promgateway hostUrl"
)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@

### 1. linux install pushgateway

#### 1.1. Download pushgateway

```shell
wget https://github.com/prometheus/pushgateway/releases/download/v1.6.2/pushgateway-1.6.2.linux-amd64.tar.gz

tar -zxvf pushgateway-1.6.2.linux-amd64.tar.gz

mv pushgateway-1.6.2.linux-amd64 pushgateway
```

#### 1.2. Start pushgateway

```shell
cd pushgateway

nohup ./pushgateway --web.listen-address :9091 > ./pushgateway.log 2>&1 &
```

### 2. linux install prometheus

#### 2.1. Download prometheus

```shell
wget https://github.com/prometheus/prometheus/releases/download/v2.47.1/prometheus-2.47.1.linux-amd64.tar.gz

tar -zxvf prometheus-2.47.1.linux-amd64.tar.gz

mv prometheus-2.47.1.linux-amd64 prometheus
```

#### 2.2. Modified prometheus.yml, added

```text
- job_name: "pushgateway"
static_configs:
- targets: ["ip:9091"]
labels:
instance: pushgateway
```

#### 2.3. Start prometheus

```shell
cd prometheus

nohup ./prometheus --config.file=prometheus.yml > ./prometheus.log 2>&1 &
```

### 3. Modify the streampark configuration

```text
org.apache.streampark.common.conf.CommonConfig.STREAMPARK_PROMETHEUS_ENABLE
streampark.prometheus.enable: true

org.apache.streampark.common.conf.CommonConfig.STREAMPARK_PROMGATEWAY_HOST_URL
streampark.promgateway.hostUrl: http://ip:9091
```
Current only yarn-application is supported. Other modes have not been tested

#### 4. Stop

#### 4.1. Stop prometheus

```shell
pgrep prometheus

kill {pid}
```

#### 4.2. Stop pushgateway

```shell
pgrep pushgateway

kill {pid}
```

#### 5. Reference document

```text
https://nightlies.apache.org/flink/flink-docs-release-1.16/zh/docs/deployment/metric_reporters/#prometheuspushgateway

https://prometheus.io/docs/prometheus/latest/getting_started/
```



Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

package org.apache.streampark.flink.client.`trait`

import org.apache.streampark.common.conf.{ConfigConst, Workspace}
import org.apache.streampark.common.conf.{CommonConfig, ConfigConst, InternalConfigHolder, Workspace}
import org.apache.streampark.common.conf.ConfigConst._
import org.apache.streampark.common.enums.{ApplicationType, FlinkDevelopmentMode, FlinkExecutionMode, FlinkRestoreMode}
import org.apache.streampark.common.fs.FsOperator
Expand Down Expand Up @@ -156,6 +156,27 @@ trait FlinkClientTrait extends Logger {
})
}

// set prometheus
val enablePrometheus: Boolean =
InternalConfigHolder.get(CommonConfig.STREAMPARK_PROMETHEUS_ENABLE)
val promgatewayHostUrl: String =
InternalConfigHolder.get(CommonConfig.STREAMPARK_PROMGATEWAY_HOST_URL)

if (
submitRequest.executionMode == FlinkExecutionMode.YARN_APPLICATION && enablePrometheus && StringUtils
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is it only enabled in yarn application mode? Doesn't it work in other modes?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

At that time, only yarn application mode was tested

.isNotBlank(promgatewayHostUrl)
) {
flinkConfig.setString(
"metrics.reporter.promgateway.factory.class",
"org.apache.flink.metrics.prometheus.PrometheusPushGatewayReporterFactory")
flinkConfig.setString("metrics.reporter.promgateway.hostUrl", promgatewayHostUrl)
flinkConfig.setString("metrics.reporter.promgateway.jobName", submitRequest.appName)
flinkConfig.setString("metrics.reporter.promgateway.randomJobNameSuffix", "true")
flinkConfig.setString("metrics.reporter.promgateway.deleteOnShutdown", "false")
flinkConfig.setString("metrics.reporter.promgateway.groupingKey", "k1=v1;k2=v2")
flinkConfig.setString("metrics.reporter.promgateway.interval", "10 SECONDS")
}

setConfig(submitRequest, flinkConfig)

doSubmit(submitRequest, flinkConfig)
Expand Down