Skip to content

Commit

Permalink
health check fixes (#45)
Browse files Browse the repository at this point in the history
* refactor: use Kafka interfaces instead of implementations

* fix: align compileJava and compileKotlin tasks target

* feat: make it possible to register multiple instances of the same check

where it makes sense

* fix(DatabaseConnectionHealthCheck): return `Unhealthy` when connection is invalid

* fix(MongoConnectionHealthCheck): make the health check usable with both sync and async clients
  • Loading branch information
monosoul authored Jan 23, 2024
1 parent 9cf97b0 commit 31b961f
Show file tree
Hide file tree
Showing 40 changed files with 141 additions and 115 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,14 @@ class DatabaseConnectionHealthCheck(
private val ds: DataSource,
private val timeout: Duration = 1.seconds,
private val query: String? = null,
override val name: String = "database_connection",
) : HealthCheck {

override val name: String = "database_connection"

override suspend fun check(): HealthCheckResult = runCatching {
ds.connection.use { conn ->
conn.isValid(timeout.inWholeSeconds.toInt())
if (!conn.isValid(timeout.inWholeSeconds.toInt())) {
return@use HealthCheckResult.unhealthy("Connection is invalid")
}
if (query != null)
conn.createStatement().use { it.execute(query) }
HealthCheckResult.healthy("Connected to database successfully")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,9 @@ import javax.sql.DataSource
class DatabaseHealthCheck(
private val ds: DataSource,
private val query: String = "SELECT 1",
override val name: String = "database",
) : HealthCheck {

override val name: String = "database"

override suspend fun check(): HealthCheckResult = ds.connection.use { conn ->
conn.createStatement().executeQuery(query)
HealthCheckResult.healthy("Connected to database successfully")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,10 @@ import javax.naming.directory.InitialDirContext
* Context.PROVIDER_URL to "ldap://localhost:10389"
* )
*/
class c(private val environment: Map<String, String>) : HealthCheck {
class LdapHealthCheck(
private val environment: Map<String, String>,
override val name: String = "ldap",
) : HealthCheck {

override suspend fun check(): HealthCheckResult = runCatching {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@ import kotlin.time.Duration.Companion.seconds
class TcpHealthCheck(
private val host: String,
private val port: Int,
private val connectionTimeout: Duration = 4.seconds
private val connectionTimeout: Duration = 4.seconds,
override val name: String = "tcp",
) : HealthCheck {

override suspend fun check(): HealthCheckResult {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,10 @@ import kotlin.math.roundToInt
*/
class DiskSpaceHealthCheck(
private val fileStore: FileStore,
private val minFreeSpacePercentage: Double = 10.0
private val minFreeSpacePercentage: Double = 10.0,
override val name: String = "disk_space_free",
) : HealthCheck {

override val name: String = "disk_space_free"

override suspend fun check(): HealthCheckResult {
return try {
val availablePercent = (fileStore.usableSpace.toDouble() / fileStore.totalSpace.toDouble() * 100).roundToInt()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import com.zaxxer.hikari.HikariConfig
import com.zaxxer.hikari.HikariDataSource
import io.kotest.core.spec.style.FunSpec
import io.kotest.matchers.shouldBe
import java.sql.Connection
import javax.sql.DataSource

class DatabaseConnectionHealthCheckTest : FunSpec({

Expand All @@ -30,12 +32,35 @@ class DatabaseConnectionHealthCheckTest : FunSpec({
).check().status shouldBe HealthStatus.Unhealthy
}

test("should return unhealthy for invalid connection") {
DatabaseConnectionHealthCheck(
createHikariDS().also { it.isBroken = true },
).check().status shouldBe HealthStatus.Unhealthy
}

})

fun createHikariDS(): HikariDataSource =
HikariConfig().apply {
jdbcUrl = "jdbc:h2:mem:kjs;DB_CLOSE_DELAY=-1"
username = "sa"
password = ""
maximumPoolSize = 1
}.let { HikariDataSource(it) }
internal fun createHikariDS() =
HikariConfig()
.apply {
jdbcUrl = "jdbc:h2:mem:kjs;DB_CLOSE_DELAY=-1"
username = "sa"
password = ""
maximumPoolSize = 1
}
.let(::HikariDataSource)
.let(::ProxyDataSource)

internal class ProxyDataSource(
private val delegate: DataSource,
var isBroken: Boolean = false,
) : DataSource by delegate {
override fun getConnection() = ConnectionProxy(delegate.connection, isBroken)
}

internal class ConnectionProxy(
private val delegate: Connection,
private val isBroken: Boolean,
) : Connection by delegate {
override fun isValid(timeout: Int) = if (isBroken) false else delegate.isValid(timeout)
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,9 @@ import kotlinx.coroutines.runInterruptible
*/
class DynamoDBHealthCheck(
val createClient: () -> AmazonDynamoDB = { AmazonDynamoDBClient.builder().build() },
override val name: String = "aws_dynamodb",
) : HealthCheck {

override val name: String = "aws_dynamodb"

private fun <T> AmazonDynamoDB.use(f: (AmazonDynamoDB) -> T): Result<T> {
val result = runCatching { f(this) }
this.shutdown()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,10 @@ import kotlinx.coroutines.runInterruptible
*/
class S3ReadBucketHealthCheck(
private val bucketName: String,
val createClient: () -> AmazonS3 = { AmazonS3Client.builder().build() }
val createClient: () -> AmazonS3 = { AmazonS3Client.builder().build() },
override val name: String = "aws_s3_bucket",
) : HealthCheck {

override val name: String = "aws_s3_bucket"

private suspend fun use(client: AmazonS3): Result<HeadBucketResult> {
return runInterruptible(Dispatchers.IO) {
runCatching {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,9 @@ import kotlin.random.Random
*/
class S3WriteBucketHealthCheck(
private val bucketName: String,
val createClient: () -> AmazonS3 = { AmazonS3Client.builder().build() }
) : HealthCheck {

val createClient: () -> AmazonS3 = { AmazonS3Client.builder().build() },
override val name: String = "aws_s3_bucket_write"
) : HealthCheck {

private suspend fun use(client: AmazonS3): Result<Unit> {
return runInterruptible(Dispatchers.IO) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,9 @@ import com.sksamuel.tabby.results.flatMap
*/
class SNSHealthCheck(
val createClient: () -> AmazonSNS = { AmazonSNSClient.builder().build() },
override val name: String = "aws_sns_topic",
) : HealthCheck {

override val name: String = "aws_sns_topic"

private fun use(client: AmazonSNS): Result<ListTopicsResult> {
return runCatching { client.listTopics() }.also { client.shutdown() }
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,9 @@ import com.sksamuel.tabby.results.flatMap
class SQSQueueHealthCheck(
private val queue: String,
val createClient: () -> AmazonSQS = { AmazonSQSClient.builder().build() },
override val name: String = "aws_sqs_queue",
) : HealthCheck {

override val name: String = "aws_sqs_queue"

private fun use(client: AmazonSQS): Result<GetQueueUrlResult> {
return runCatching { client.getQueueUrl(queue) }.also { client.shutdown() }
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,9 @@ import org.apache.commons.dbcp2.BasicDataSource
class DbcpConnectionsHealthCheck(
private val ds: BasicDataSource,
private val minConnections: Int,
override val name: String = "dbcp_connections",
) : HealthCheck {
override suspend fun check(): HealthCheckResult {
override suspend fun check(): HealthCheckResult {
val total = ds.numActive + ds.numIdle
val msg = "Database connections is $total [min required is $minConnections]"
return if (total >= minConnections) HealthCheckResult.healthy(msg) else HealthCheckResult.unhealthy(msg, null)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,10 @@ import org.apache.commons.dbcp2.BasicDataSource
class DbcpMinIdleHealthCheck(
private val ds: BasicDataSource,
private val minIdle: Int,
override val name: String = "dbcp_min_idle",
) : HealthCheck {
override suspend fun check(): HealthCheckResult {

override suspend fun check(): HealthCheckResult {
val msg = "Idle connections ${ds.numIdle} [min required is $minIdle]"
return if (ds.numIdle >= minIdle) HealthCheckResult.healthy(msg) else HealthCheckResult.unhealthy(msg, null)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,9 @@ import org.elasticsearch.client.RestHighLevelClient
class ElasticClusterCommandCheck(
private val client: RestHighLevelClient,
private val command: (RestHighLevelClient) -> HealthCheckResult,
override val name: String = "elastic_cluster",
) : HealthCheck {

override val name: String = "elastic_cluster"

override suspend fun check(): HealthCheckResult {
return runCatching {
withContext(Dispatchers.IO) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,10 @@ import org.elasticsearch.cluster.health.ClusterHealthStatus
*/
class ElasticClusterHealthCheck(
private val client: RestHighLevelClient,
private val errorOnYellow: Boolean = false
private val errorOnYellow: Boolean = false,
override val name: String = "elastic_cluster_health",
) : HealthCheck {

override val name: String = "elastic_cluster_health"

override suspend fun check(): HealthCheckResult {
return runCatching {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,9 @@ class ElasticIndexHealthCheck(
private val client: ElasticsearchClient,
private val index: String,
private val failIfEmpty: Boolean = false,
override val name: String = "elastic_index",
) : HealthCheck {

override val name: String = "elastic_index"

override suspend fun check(): HealthCheckResult {
return runCatching {
withContext(Dispatchers.IO) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,9 @@ import com.zaxxer.hikari.HikariDataSource
class HikariConnectionsHealthCheck(
private val ds: HikariDataSource,
private val minConnections: Int,
override val name: String = "hikari_open_connections",
) : HealthCheck {

override val name: String = "hikari_open_connections"

override suspend fun check(): HealthCheckResult {
val conns = ds.hikariPoolMXBean.totalConnections
val msg = "$conns connection(s) to Hikari db-pool ${ds.poolName} [minConnections:$minConnections]"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,9 @@ import com.zaxxer.hikari.HikariDataSource
class HikariMinIdleHealthCheck(
private val ds: HikariDataSource,
private val minIdle: Int,
override val name: String = "hikari_min_idle",
) : HealthCheck {

override val name: String = "hikari_min_idle"

override suspend fun check(): HealthCheckResult {
val idleConnections = ds.hikariPoolMXBean.idleConnections
val msg = "Idle connections $idleConnections [min required is $minIdle]"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,9 @@ import com.zaxxer.hikari.HikariDataSource
class HikariPendingThreadsHealthCheck(
private val ds: HikariDataSource,
private val maxAwaiting: Int,
override val name: String = "hikari_pending_threads",
) : HealthCheck {

override val name: String = "hikari_pending_threads"

override suspend fun check(): HealthCheckResult {
val awaiting = ds.hikariPoolMXBean.threadsAwaitingConnection
return if (awaiting <= maxAwaiting) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,10 @@ import io.ktor.http.isSuccess
*/
class EndpointHealthCheck(
private val eval: suspend (HttpResponse) -> Boolean = { it.status.isSuccess() },
override val name: String = "endpoint_request",
private val fn: suspend (HttpClient) -> HttpResponse,
) : HealthCheck {

override val name: String = "endpoint_request"

private val client = HttpClient(Apache5) {
expectSuccess = false
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,10 @@ import java.util.concurrent.atomic.AtomicBoolean
*/
class EndpointStartupHealthCheck(
private val client: HttpClient,
override val name: String = "endpoint_startup_request",
private val eval: suspend (HttpClient) -> Boolean,
) : HealthCheck {

override val name: String = "endpoint_startup_request"

private val successful = AtomicBoolean(false)

override suspend fun check(): HealthCheckResult {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,9 @@ class HttpHealthCheck(
private val body: ByteArray? = null,
private val headers: Map<String, String> = emptyMap(),
private val successCodes: Set<Int> = setOf(200, 201, 202, 203, 204, 205),
override val name: String = "http_call",
) : HealthCheck {

override val name: String = "http_call"

private val client = HttpClient(Apache5) {
expectSuccess = false
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ class RedisClusterHealthCheck(
HealthCheckResult.unhealthy("Connected to redis cluster but zero nodes detected", null)
}
},
override val name: String = "redis_cluster",
) : HealthCheck {

companion object {
Expand All @@ -49,8 +50,6 @@ class RedisClusterHealthCheck(
}
}

override val name: String = "redis_cluster"

override suspend fun check(): HealthCheckResult {
return runInterruptible(Dispatchers.IO) {
runCatching {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ class RedisConnectionHealthCheck(
HealthCheckResult.healthy("Ping to redis cluster failed")
}
},
override val name: String = "redis",
) : HealthCheck {

companion object {
Expand All @@ -46,8 +47,6 @@ class RedisConnectionHealthCheck(
}
}

override val name: String = "redis"

override suspend fun check(): HealthCheckResult {
return runInterruptible(Dispatchers.IO) {
runCatching {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,13 @@ import com.sksamuel.cohort.HealthCheckResult
import com.sksamuel.tabby.either.Either
import com.sksamuel.tabby.either.left
import com.sksamuel.tabby.either.right
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.consumer.Consumer
import org.apache.kafka.common.Metric

/**
* Kafka metrics can subclass this class and use the provided methods to retrieve metrics.
*/
abstract class AbstractKafkaConsumerMetricHealthCheck(private val consumer: KafkaConsumer<*, *>) : HealthCheck {
abstract class AbstractKafkaConsumerMetricHealthCheck(private val consumer: Consumer<*, *>) : HealthCheck {

/**
* Returns the metric with the given [name] that has the least number of tags (most generic metric value).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,10 @@ import org.apache.kafka.clients.admin.Admin
* A [HealthCheck] that checks that a connection can be made to a kafka cluster, the controller
* can be located, and at least one node is present.
*/
class KafkaClusterHealthCheck(private val admin: Admin) : HealthCheck {

override val name: String = "kafka_cluster"
class KafkaClusterHealthCheck(
private val admin: Admin,
override val name: String = "kafka_cluster",
) : HealthCheck {

override suspend fun check(): HealthCheckResult {
return try {
Expand Down
Loading

0 comments on commit 31b961f

Please sign in to comment.