Skip to content

Commit

Permalink
Merge pull request #61 from FINTLabs/feat/standalone-workflow
Browse files Browse the repository at this point in the history
FLA-510 Migrate to Standalone Workflow for Enhanced Customization and Improved Error Handling
  • Loading branch information
murillio4 authored Oct 19, 2024
2 parents 713a1c3 + 770a485 commit 926042c
Show file tree
Hide file tree
Showing 5 changed files with 197 additions and 87 deletions.
Original file line number Diff line number Diff line change
@@ -1,8 +1,21 @@
package no.fintlabs.operator

import io.javaoperatorsdk.operator.api.reconciler.Reconciler
import org.koin.core.qualifier.named
import org.koin.dsl.module

/**
 * Builds the Koin module that wires up the FLAIS application operator.
 *
 * Registers the [FlaisApplicationReconciler], every dependent resource (`*DR`) and every
 * `Create*Condition` as Koin `single` definitions.
 *
 * NOTE(review): this listing is a rendered diff without +/- markers. The two
 * `single<Reconciler<*>>` lines look like the before/after sides of one change (unqualified
 * registration vs. `named("flais-application-reconciler")`) — confirm against the repository
 * before assuming both registrations exist.
 */
fun applicationReconcilerModule() = module {
single<Reconciler<*>> { no.fintlabs.operator.FlaisApplicationReconciler() }
single<Reconciler<*>>(named("flais-application-reconciler")) { FlaisApplicationReconciler() }
// Dependent resources; FlaisApplicationReconciler injects these to build its workflow.
single { DeploymentDR() }
single { ServiceDR() }
single { PodMetricsDR() }
single { OnePasswordDR() }
single { IngressDR() }
single { PostgresUserDR() }
single { KafkaDR() }
// Conditions; used as reconcile preconditions in the workflow and injected by DeploymentDR.
single { CreatePodMetricsCondition() }
single { CreateOnePasswordCondition() }
single { CreateIngressCondition() }
single { CreatePostgresUserCondition() }
single { CreateKafkaCondition() }
}
13 changes: 6 additions & 7 deletions src/main/kotlin/no/fintlabs/operator/DeploymentDR.kt
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ class DeploymentDR : CRUDKubernetesDependentResource<Deployment, FlaisApplicatio
private val config: Config by inject()
private val logger = getLogger()

private val createKafkaCondition by inject<CreateKafkaCondition>()
private val createPostgresUserCondition by inject<CreatePostgresUserCondition>()
private val createOnePasswordCondition by inject<CreateOnePasswordCondition>()

override fun desired(primary: FlaisApplicationCrd, context: Context<FlaisApplicationCrd>) = Deployment().apply {
metadata = createObjectMeta(primary)
Expand Down Expand Up @@ -134,7 +137,7 @@ class DeploymentDR : CRUDKubernetesDependentResource<Deployment, FlaisApplicatio
secretRef = SecretEnvSource().apply {
name = "${primary.metadata.name}-kafka"
}
}.takeIf { creteKafkaCondition.isMet(null, primary, context) }
}.takeIf { createKafkaCondition.isMet(null, primary, context) }
)

return primary.spec.envFrom.toMutableSet()
Expand All @@ -149,24 +152,20 @@ class DeploymentDR : CRUDKubernetesDependentResource<Deployment, FlaisApplicatio
secret = SecretVolumeSource().apply {
secretName = "${primary.metadata.name}-kafka-certificates"
}
}.takeIf { creteKafkaCondition.isMet(null, primary, context) }
}.takeIf { createKafkaCondition.isMet(null, primary, context) }
)

/**
 * Builds the volume mounts for the application container.
 *
 * Yields a single read-only mount named "credentials" at `/credentials` when the Kafka
 * condition is met for [primary]; otherwise the list is empty, because `takeIf` produces
 * `null` and `listOfNotNull` drops it.
 *
 * NOTE(review): the two `takeIf` lines look like the before/after sides of a rename
 * (`creteKafkaCondition` -> `createKafkaCondition`) in a diff rendered without +/- markers —
 * confirm against the repository.
 */
private fun createContainerVolumeMounts(primary: FlaisApplicationCrd, context: Context<FlaisApplicationCrd>) = listOfNotNull(
VolumeMount().apply {
name = "credentials"
mountPath = "/credentials"
readOnly = true
}.takeIf { creteKafkaCondition.isMet(null, primary, context) }
}.takeIf { createKafkaCondition.isMet(null, primary, context) }
)

/**
 * Always answers `true`, whatever [context] is supplied (including `null`).
 *
 * NOTE(review): SSA presumably stands for Kubernetes server-side apply — confirm against the
 * Java Operator SDK documentation.
 */
override fun useSSA(context: Context<FlaisApplicationCrd>?): Boolean {
    return true
}

companion object {
// Name under which this dependent resource is registered with the reconciler's workflow.
const val COMPONENT = "deployment"

// NOTE(review): the three condition instances below look like the pre-change side of the
// diff; the class body shown above obtains the same conditions via `by inject<...>()`.
// Confirm against the repository before relying on these companion members.
val creteKafkaCondition = no.fintlabs.operator.CreateKafkaCondition()
val createPostgresUserCondition = CreatePostgresUserCondition()
val createOnePasswordCondition = CreateOnePasswordCondition()
}
}
179 changes: 101 additions & 78 deletions src/main/kotlin/no/fintlabs/operator/FlaisApplicationReconciler.kt
Original file line number Diff line number Diff line change
@@ -1,122 +1,145 @@
package no.fintlabs.operator

import io.javaoperatorsdk.operator.api.reconciler.*
import io.javaoperatorsdk.operator.api.reconciler.dependent.Dependent
import io.javaoperatorsdk.operator.processing.dependent.workflow.Workflow
import io.javaoperatorsdk.operator.processing.dependent.workflow.WorkflowBuilder
import io.javaoperatorsdk.operator.processing.dependent.workflow.WorkflowReconcileResult
import io.javaoperatorsdk.operator.processing.event.source.EventSource
import io.javaoperatorsdk.operator.processing.retry.GradualRetry
import no.fintlabs.operator.api.DEPLOYMENT_CORRELATION_ID_ANNOTATION
import no.fintlabs.operator.api.ORG_ID_LABEL
import no.fintlabs.operator.api.TEAM_LABEL
import no.fintlabs.operator.api.v1alpha1.FlaisApplicationCrd
import no.fintlabs.operator.api.v1alpha1.FlaisApplicationState
import no.fintlabs.operator.api.v1alpha1.FlaisApplicationStatus
import org.koin.core.component.KoinComponent
import org.koin.core.component.get
import org.koin.core.component.inject
import org.slf4j.MDC
import java.util.*
import kotlin.jvm.optionals.getOrDefault
import kotlin.jvm.optionals.getOrNull


@GradualRetry(maxAttempts = 3)
@ControllerConfiguration(
dependents = [
Dependent(
name = DeploymentDR.COMPONENT,
type = DeploymentDR::class
),
Dependent(
name = ServiceDR.COMPONENT,
type = ServiceDR::class
),
Dependent(
name = PodMetricsDR.COMPONENT,
type = PodMetricsDR::class,
reconcilePrecondition = CreatePodMetricsCondition::class
),
Dependent(
name = OnePasswordDR.COMPONENT,
type = OnePasswordDR::class,
reconcilePrecondition = CreateOnePasswordCondition::class
),
Dependent(
name = IngressDR.COMPONENT,
type = IngressDR::class,
reconcilePrecondition = CreateIngressCondition::class
),
Dependent(
name = PostgresUserDR.COMPONENT,
type = PostgresUserDR::class,
reconcilePrecondition = CreatePostgresUserCondition::class
),
Dependent(
name = KafkaDR.COMPONENT,
type = KafkaDR::class,
reconcilePrecondition = CreateKafkaCondition::class
)
],
labelSelector = "$ORG_ID_LABEL,$TEAM_LABEL"
)
class FlaisApplicationReconciler : Reconciler<FlaisApplicationCrd>, Cleaner<FlaisApplicationCrd>, ContextInitializer<FlaisApplicationCrd> {
class FlaisApplicationReconciler : Reconciler<FlaisApplicationCrd>, Cleaner<FlaisApplicationCrd>, ErrorStatusHandler<FlaisApplicationCrd>, EventSourceInitializer<FlaisApplicationCrd>, KoinComponent {
private val logger = getLogger()

// Dependent resources, resolved through Koin.
private val deployment by inject<DeploymentDR>()
private val service by inject<ServiceDR>()
private val podMetrics by inject<PodMetricsDR>()
private val onePassword by inject<OnePasswordDR>()
private val ingress by inject<IngressDR>()
private val postgresUser by inject<PostgresUserDR>()
private val kafka by inject<KafkaDR>()

// Standalone workflow assembled in code. Deployment and service are added without a
// precondition; each remaining dependent is followed by the Create*Condition of the same name.
// The workflow is built in a property initializer, so the injected dependents and the
// `get<...>()` conditions are resolved from Koin while the reconciler is being constructed.
// NOTE(review): withThrowExceptionFurther(false) presumably keeps dependent failures inside
// the WorkflowReconcileResult instead of throwing from workflow.reconcile(), which is what
// lets reconcile() inspect erroredDependentsExist() — confirm against the JOSDK docs.
private val workflow: Workflow<FlaisApplicationCrd> = WorkflowBuilder<FlaisApplicationCrd>()
.addDependentResource(deployment, DeploymentDR.COMPONENT)
.addDependentResource(service, ServiceDR.COMPONENT)
.addDependentResource(podMetrics, PodMetricsDR.COMPONENT)
.withReconcilePrecondition(get<CreatePodMetricsCondition>())
.addDependentResource(onePassword, OnePasswordDR.COMPONENT)
.withReconcilePrecondition(get<CreateOnePasswordCondition>())
.addDependentResource(ingress, IngressDR.COMPONENT)
.withReconcilePrecondition(get<CreateIngressCondition>())
.addDependentResource(postgresUser, PostgresUserDR.COMPONENT)
.withReconcilePrecondition(get<CreatePostgresUserCondition>())
.addDependentResource(kafka, KafkaDR.COMPONENT)
.withReconcilePrecondition(get<CreateKafkaCondition>())
.withThrowExceptionFurther(false)
.build()

/**
 * Reconciles one FlaisApplication.
 *
 * Post-change flow, as far as this listing shows:
 *  1. Sets up the MDC and logs the resource name.
 *  2. Runs [initReconciliation]; when it returns a non-null [UpdateControl] the MDC is
 *     cleared and that control is returned immediately.
 *  3. Otherwise runs [workflow]. If any dependent errored, the result is stored in the
 *     managed dependent resource context under [WORKFLOW_RESULT_KEY] (where
 *     [updateErrorStatus] reads it back) and the aggregate exception is thrown.
 *  4. Otherwise computes the new status and returns `patchStatus` when it changed,
 *     `noUpdate` when it did not, clearing the MDC on the way out.
 *
 * NOTE(review): this listing is a rendered diff without +/- markers. The
 * `determineUpdateControl` call, the `return updateControl` that follows it, and the
 * `determineUpdateControl` function in the middle of this span look like the pre-change
 * implementation — confirm against the repository.
 */
override fun reconcile(resource: FlaisApplicationCrd, context: Context<FlaisApplicationCrd>): UpdateControl<FlaisApplicationCrd> {
setMDC(resource)
logger.info("Reconciling FlaisApplication ${resource.metadata.name}")
val updateControl = determineUpdateControl(resource, updateStatus(resource, context), updateResource(context))
removeMDC()
return updateControl
}
initReconciliation(resource)?.let {
removeMDC()
return it
}

private fun determineUpdateControl(resource: FlaisApplicationCrd, statusUpdated: Boolean, resourceUpdated: Boolean) = when {
statusUpdated && resourceUpdated -> UpdateControl.updateResourceAndStatus(resource)
resourceUpdated -> UpdateControl.updateResource(resource)
statusUpdated -> UpdateControl.updateStatus(resource)
else -> UpdateControl.noUpdate()
// NOTE(review): removeMDC() is only reached on the two normal return paths. If
// workflow.reconcile() or throwAggregateExceptionIfErrorsPresent() throws, the MDC set by
// setMDC() is left in place — confirm whether that is intended (e.g. for error-path logging)
// or whether a try/finally is wanted.
val result = workflow.reconcile(resource, context)
if (result.erroredDependentsExist()) {
context.managedDependentResourceContext().put(WORKFLOW_RESULT_KEY, result)
result.throwAggregateExceptionIfErrorsPresent()
}
val statusUpdated = updateStatus(resource, determineNewStatus(resource, context, result))

return when {
statusUpdated -> UpdateControl.patchStatus(resource)
else -> UpdateControl.noUpdate()
}.also {
removeMDC()
}
}

/** Does no operator-side cleanup of its own; simply answers with [DeleteControl.defaultDelete]. */
override fun cleanup(resource: FlaisApplicationCrd, context: Context<FlaisApplicationCrd>): DeleteControl =
    DeleteControl.defaultDelete()

override fun initContext(primary: FlaisApplicationCrd, context: Context<FlaisApplicationCrd>) {
ensureCorrelationId(primary, context)
/**
 * Error-path counterpart of [reconcile].
 *
 * Looks up the workflow result stored under [WORKFLOW_RESULT_KEY] in the managed dependent
 * resource context (absent when nothing was stored), derives the new status from it and
 * writes that status onto [resource].
 *
 * @return `updateStatus` when the status changed, `noStatusUpdate` otherwise.
 */
override fun updateErrorStatus(resource: FlaisApplicationCrd, context: Context<FlaisApplicationCrd>, e: Exception?): ErrorStatusUpdateControl<FlaisApplicationCrd> {
    val storedResult = context.managedDependentResourceContext()
        .get(WORKFLOW_RESULT_KEY, WorkflowReconcileResult::class.java)
    val statusChanged = updateStatus(resource, determineNewStatus(resource, context, storedResult.getOrNull()))
    return if (statusChanged) {
        ErrorStatusUpdateControl.updateStatus(resource)
    } else {
        ErrorStatusUpdateControl.noStatusUpdate()
    }
}

private fun updateResource(context: Context<FlaisApplicationCrd>): Boolean {
return context.managedDependentResourceContext()
.get(DEPLOYMENT_CORRELATION_ID_GENERATED, Boolean::class.javaObjectType)
.getOrDefault(false)
}
/** Derives this controller's event sources from [workflow]. */
override fun prepareEventSources(context: EventSourceContext<FlaisApplicationCrd>): MutableMap<String, EventSource> {
    return EventSourceInitializer.eventSourcesFromWorkflow(context, workflow)
}

private fun updateStatus(primary: FlaisApplicationCrd, context: Context<FlaisApplicationCrd>): Boolean {
val newStatus = determineNewStatus(primary, context)
return if (primary.status != newStatus) {
primary.status = newStatus
true
} else false
}
/**
 * Prepares [resource] before the workflow runs.
 *
 * Ensures the deployment correlation-id annotation exists (generating a random UUID when it
 * is missing) and moves the status to PENDING with that correlation id.
 *
 * @return `updateResourceAndStatus` when a correlation id was just generated, `updateStatus`
 *   when only the status changed, or `null` when nothing changed and reconciliation should
 *   continue. Non-null results are rescheduled with a delay of 0.
 *
 * NOTE(review): this listing is a rendered diff without +/- markers. The
 * `determineNewStatus(primary, context)` header and the three `workflowResult` lines in the
 * middle of this span look like the pre-change implementation — confirm against the
 * repository.
 */
private fun initReconciliation(resource: FlaisApplicationCrd) : UpdateControl<FlaisApplicationCrd>? {
val annotations = resource.metadata.annotations
val hasCorrelationId = annotations.containsKey(DEPLOYMENT_CORRELATION_ID_ANNOTATION)
if (!hasCorrelationId) {
val uuid = UUID.randomUUID().toString()
logger.debug("Generating correlation ID $uuid for ${resource.metadata.name}")
annotations[DEPLOYMENT_CORRELATION_ID_ANNOTATION] = uuid
}

private fun determineNewStatus(primary: FlaisApplicationCrd, context: Context<FlaisApplicationCrd>): FlaisApplicationStatus {
val workflowResult = context.managedDependentResourceContext().workflowReconcileResult.get()
val ready = workflowResult.allDependentResourcesReady()
val failed = workflowResult.erroredDependentsExist()
// NOTE(review): the state is reset to PENDING unconditionally, so a resource whose status is
// currently DEPLOYED or FAILED takes the early-return path below before the workflow runs —
// confirm this two-pass behavior is intended.
val newStatus = resource.status.copy(
state = FlaisApplicationState.PENDING,
correlationId = annotations[DEPLOYMENT_CORRELATION_ID_ANNOTATION]
)
val statusUpdated = updateStatus(resource, newStatus)

return when {
!hasCorrelationId -> UpdateControl.updateResourceAndStatus(resource)
statusUpdated -> UpdateControl.updateStatus(resource)
else -> null
}?.rescheduleAfter(0)
}

for ((dep, res) in workflowResult.reconcileResults) {
logger.info("Reconcile result for ${dep.javaClass.simpleName} - ${res.singleOperation}")
/**
 * Derives the status [primary] should carry from the outcome of a workflow run.
 *
 * When [workflowResult] is `null` the run is treated as failed and not ready, and
 * `dependentErrors` is `null`. Otherwise every per-dependent result and error is logged and
 * `dependentErrors` maps each errored dependent's resource type simple name to its exception
 * message (empty string when the message is `null`).
 *
 * Post-change state rules: FAILED only when the run failed and this is the last retry
 * attempt; DEPLOYED when all dependents are ready and none failed; PENDING otherwise.
 *
 * NOTE(review): this listing is a rendered diff without +/- markers. The plain
 * `failed -> ...` and `ready -> ...` branches look like the pre-change side of the `when` —
 * confirm against the repository.
 */
private fun determineNewStatus(primary: FlaisApplicationCrd, context: Context<FlaisApplicationCrd>, workflowResult: WorkflowReconcileResult?): FlaisApplicationStatus {
val ready = workflowResult?.allDependentResourcesReady() ?: false
// A missing result counts as a failure.
val failed = workflowResult?.erroredDependentsExist() ?: true
// No retry info available is treated as "not the last attempt".
val isLastAttempt = context.retryInfo.getOrNull()?.isLastAttempt ?: false

if (workflowResult != null) {
for ((dep, res) in workflowResult.reconcileResults) {
logger.info("Reconcile result for ${dep.javaClass.simpleName} - ${res.singleOperation}")
}
for ((dep, error) in workflowResult.erroredDependents) {
logger.info("Reconcile error for ${dep.javaClass.simpleName} - $error")
}
}

return primary.status.copy(
state = when {
failed -> FlaisApplicationState.FAILED
ready -> FlaisApplicationState.DEPLOYED
isLastAttempt && failed -> FlaisApplicationState.FAILED
ready && !failed -> FlaisApplicationState.DEPLOYED
else -> FlaisApplicationState.PENDING
},
dependentErrors = workflowResult?.erroredDependents?.map { it.key.resourceType().simpleName to (it.value.message ?: "") }?.toMap(),
correlationId = primary.metadata.annotations[DEPLOYMENT_CORRELATION_ID_ANNOTATION]
)
}

private fun ensureCorrelationId(primary: FlaisApplicationCrd, context: Context<FlaisApplicationCrd>) {
primary.metadata.annotations.computeIfAbsent(DEPLOYMENT_CORRELATION_ID_ANNOTATION) {
val uuid = UUID.randomUUID().toString()
logger.debug("Generating correlation ID $uuid for ${primary.metadata.name}")
context.managedDependentResourceContext().put(DEPLOYMENT_CORRELATION_ID_GENERATED, true)
uuid
}
/**
 * Replaces [primary]'s status with [newStatus] when the two differ.
 *
 * @return `true` if the status was replaced, `false` if it already equalled [newStatus].
 */
private fun updateStatus(primary: FlaisApplicationCrd, newStatus: FlaisApplicationStatus): Boolean {
    if (primary.status == newStatus) return false
    primary.status = newStatus
    return true
}

private fun setMDC(resource: FlaisApplicationCrd) {
Expand All @@ -128,6 +151,6 @@ class FlaisApplicationReconciler : Reconciler<FlaisApplicationCrd>, Cleaner<Flai
}

companion object {
// NOTE(review): looks like the pre-change constant, used only by the pre-change
// updateResource/ensureCorrelationId lines of this diff — confirm against the repository.
const val DEPLOYMENT_CORRELATION_ID_GENERATED = "deployment-correlation-id-generated"
// Key under which reconcile() stores the WorkflowReconcileResult in the managed dependent
// resource context so that updateErrorStatus() can read it back.
const val WORKFLOW_RESULT_KEY = "workflow_result"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,6 @@ import io.javaoperatorsdk.operator.api.ObservedGenerationAwareStatus
/**
 * Status sub-resource of a FlaisApplication.
 *
 * - `state`: lifecycle state, PENDING by default.
 * - `correlationId`: deployment correlation id, `null` until one has been recorded.
 * - `dependentResourceStatus`: list of strings, empty by default.
 * - `dependentErrors`: error message per failed dependent, `null` by default; the reconciler
 *   keys it by the dependent's resource type simple name.
 *
 * NOTE(review): the two `dependentResourceStatus` lines look like the before/after sides of
 * a diff rendered without +/- markers (the second only adds the trailing comma) — confirm
 * against the repository.
 */
data class FlaisApplicationStatus(
val state: FlaisApplicationState = FlaisApplicationState.PENDING,
val correlationId: String? = null,
val dependentResourceStatus: List<String> = emptyList()
val dependentResourceStatus: List<String> = emptyList(),
val dependentErrors: Map<String, String>? = null
) : ObservedGenerationAwareStatus()
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package no.fintlabs.operator

import no.fintlabs.extensions.KubernetesOperatorContext
import no.fintlabs.operator.Utils.createAndGetResource
import no.fintlabs.operator.Utils.createKoinTestExtension
import no.fintlabs.operator.Utils.createKubernetesOperatorExtension
import no.fintlabs.operator.Utils.createTestFlaisApplication
import no.fintlabs.operator.Utils.waitUntil
import no.fintlabs.operator.api.v1alpha1.FlaisApplicationCrd
import no.fintlabs.operator.api.v1alpha1.FlaisApplicationState
import org.junit.jupiter.api.extension.RegisterExtension
import org.koin.core.component.inject
import org.koin.test.KoinTest
import kotlin.test.Test
import kotlin.test.assertContains
import kotlin.test.assertEquals
import kotlin.test.assertNotNull

/**
 * Integration tests for the FlaisApplication reconciler: correlation-id handling and the
 * reporting of dependent-resource errors in the application status.
 */
class FlaisApplicationReconcilerTest : KoinTest {

    /** A freshly created application gets a correlation id in both its annotations and its status. */
    @Test
    fun `should set correlation id on FlaisApplication`(context: KubernetesOperatorContext) {
        val created = context.createAndGetApplication(createTestFlaisApplication())

        assertNotNull(created)
        assertContains(created.metadata.annotations, CORRELATION_ID_ANNOTATION)
        assertNotNull(created.status.correlationId)
        assertEquals(created.metadata.annotations[CORRELATION_ID_ANNOTATION], created.status.correlationId)
    }

    /** A correlation id already present on the application is kept and mirrored into the status. */
    @Test
    fun `should not set correlation id on FlaisApplication if exists`(context: KubernetesOperatorContext) {
        val preAnnotated = createTestFlaisApplication().apply {
            metadata.annotations[CORRELATION_ID_ANNOTATION] = "123"
        }
        val created = context.createAndGetApplication(preAnnotated)

        assertNotNull(created)
        assertContains(created.metadata.annotations, CORRELATION_ID_ANNOTATION)
        assertEquals("123", created.metadata.annotations[CORRELATION_ID_ANNOTATION])
        assertNotNull(created.status.correlationId)
        assertEquals("123", created.status.correlationId)
    }

    /** A failing Service dependent shows up as the single entry of `dependentErrors`. */
    @Test
    fun `should handle dependent errors`(context: KubernetesOperatorContext) {
        // Make the Service dependent blow up by giving it a discriminator that always throws.
        val serviceDependent by inject<ServiceDR>()
        serviceDependent.setResourceDiscriminator { _, _, _ ->
            throw RuntimeException("test")
        }

        val submitted = createTestFlaisApplication()
        val appName = submitted.metadata.name
        context.create(submitted)
        // Wait for the application to leave PENDING before inspecting its status.
        context.waitUntil<FlaisApplicationCrd>(appName) { it.status.state != FlaisApplicationState.PENDING }
        val reconciled = context.get<FlaisApplicationCrd>(appName)

        assertNotNull(reconciled)
        assertEquals(1, reconciled.status.dependentErrors?.size)
        assertEquals("test", reconciled.status.dependentErrors?.get("Service"))
    }

    /** Creates [app] through the operator test context and hands back what the context returns for it. */
    private fun KubernetesOperatorContext.createAndGetApplication(app: FlaisApplicationCrd) =
        createAndGetResource<FlaisApplicationCrd>(app)

    companion object {
        /** Annotation key the tests expect the correlation id to be stored under. */
        private const val CORRELATION_ID_ANNOTATION = "fintlabs.no/deployment-correlation-id"

        @RegisterExtension
        val koinTestExtension = createKoinTestExtension()

        @RegisterExtension
        val kubernetesOperatorExtension = createKubernetesOperatorExtension()
    }
}

0 comments on commit 926042c

Please sign in to comment.