Skip to content

Commit

Permalink
#142 Publish workspace event to telegram (#148)
Browse files Browse the repository at this point in the history
  • Loading branch information
vityaman authored Jun 15, 2024
1 parent 8044cd6 commit c977fd1
Show file tree
Hide file tree
Showing 24 changed files with 477 additions and 62 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package ru.vityaman.lms.botalka.app.spring.api.http.message

import ru.vityaman.lms.botalka.app.spring.api.http.server.WorkspaceCommentMessage
import ru.vityaman.lms.botalka.app.spring.api.http.server.WorkspaceEventMessage
import ru.vityaman.lms.botalka.app.spring.api.http.server.WorkspaceFeedbackMessage
import ru.vityaman.lms.botalka.app.spring.api.http.server.WorkspaceSubmissionMessage
import ru.vityaman.lms.botalka.core.model.Homework
import ru.vityaman.lms.botalka.core.model.Teacher
import ru.vityaman.lms.botalka.core.model.User
import ru.vityaman.lms.botalka.core.model.Workspace

fun WorkspaceEventMessage.toModel() =
when (this) {
is WorkspaceCommentMessage -> {
this.toModel()
}

is WorkspaceSubmissionMessage -> {
this.toModel()
}

is WorkspaceFeedbackMessage -> {
this.toModel()
}

else -> {
throw NotImplementedError(
buildString {
append("WorkspaceEvent type '${this@toModel.kind}' ")
append("is not yet supported")
},
)
}
}

fun WorkspaceCommentMessage.toModel() =
Workspace.Comment(
id = Workspace.Event.Id(this.id),
producer = User.Id(this.producerId),
text = this.text,
creationMoment = this.creationMoment,
)

fun WorkspaceSubmissionMessage.toModel() =
Workspace.Submission(
id = Workspace.Event.Id(this.id),
producer = User.Id(this.producerId),
note = this.note,
creationMoment = this.creationMoment,
)

fun WorkspaceFeedbackMessage.toModel() =
Workspace.Feedback(
id = Workspace.Event.Id(this.id),
teacher = Teacher(User.Id(this.producerId)),
comment = this.comment,
score = this.score?.let { Homework.Score(it.toShort()) },
creationMoment = this.creationMoment,
)
Original file line number Diff line number Diff line change
Expand Up @@ -12,23 +12,22 @@ import ru.vityaman.lms.botalka.core.model.Homework
import ru.vityaman.lms.botalka.storage.kafka.KafkaTopic

@Configuration
class KafkaConfig(private val jackson: ObjectMapper) {
class KafkaTopicConfig(private val jackson: ObjectMapper) {
@Qualifier(BeanName.HOMEWORK_TOPIC)
@Bean
fun publicationTopic(
@Value("\${broker.topic.publication}") topic: String,
): KafkaTopic<Homework.Id, Homework> =
KafkaTopic(
name = topic,
keyEncode = { it.number.toString() },
keyDecode = { Homework.Id(it.toInt()) },
valueEncode = { jackson.writeValueAsString(it.toMessage()) },
valueDecode = {
jackson
.readValue(it, HomeworkMessage::class.java)
.toModel()
},
)
@Value("\${broker.topic.homework}") topic: String,
) = KafkaTopic(
name = topic,
keyEncode = { it.number.toString() },
keyDecode = { Homework.Id(it.toInt()) },
valueEncode = { jackson.writeValueAsString(it.toMessage()) },
valueDecode = {
jackson
.readValue(it, HomeworkMessage::class.java)
.toModel()
},
)

object BeanName {
const val HOMEWORK_TOPIC = "homeworkTopic"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package ru.vityaman.lms.botalka.app.spring.event

import kotlinx.coroutines.runBlocking
import org.springframework.beans.factory.annotation.Qualifier
import org.springframework.scheduling.annotation.Scheduled
import org.springframework.stereotype.Component
import ru.vityaman.lms.botalka.commons.Runnable
import java.util.concurrent.TimeUnit

@Component
class SpringPublicationTasks(
@Qualifier(SpringWorkspaceEventConfig.BeanName.PUBLICATION_TASK)
private val workspacePublication: Runnable,
) {
@Scheduled(
fixedRateString = "\${task.scheduled.publication.precision-seconds}",
initialDelayString = "\${task.scheduled.publication.precision-seconds}",
timeUnit = TimeUnit.SECONDS,
)
fun doWorkspacePublication(): Unit =
runBlocking { workspacePublication.run() }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package ru.vityaman.lms.botalka.app.spring.event

import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
import org.springframework.beans.factory.annotation.Qualifier
import org.springframework.stereotype.Component
import ru.vityaman.lms.botalka.commons.Runnable

@Component
data class SpringTelegramActors(
@Qualifier(SpringWorkspaceEventConfig.BeanName.EVENT_CONSUMING_ACTOR)
private val workspaceActor: Runnable,
) {
private val scope = CoroutineScope(Dispatchers.Default)

init {
scope.launch { workspaceActor.run() }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package ru.vityaman.lms.botalka.app.spring.event

import org.springframework.beans.factory.annotation.Value
import org.springframework.stereotype.Component
import ru.vityaman.lms.botalka.core.event.aspect.SlownessConfig

@Component
data class SpringTelegramConfig(
@Value("\${external.service.telegram.duration-seconds.retry.min}")
val retryDurationSecondsMin: Int,

@Value("\${external.service.telegram.duration-seconds.retry.max}")
val retryDurationSecondsMax: Int,

@Value("\${external.service.telegram.duration-seconds.relax.min}")
val relaxDurationSecondsMin: Int,

@Value("\${external.service.telegram.duration-seconds.relax.max}")
val relaxDurationSecondsMax: Int,
) {
fun toSlowness() = SlownessConfig(
retryDurationSecondsMin = retryDurationSecondsMin,
retryDurationSecondsMax = retryDurationSecondsMax,
relaxDurationSecondsMin = relaxDurationSecondsMin,
relaxDurationSecondsMax = relaxDurationSecondsMax,
)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
package ru.vityaman.lms.botalka.app.spring.event

import com.fasterxml.jackson.databind.ObjectMapper
import org.springframework.beans.factory.annotation.Qualifier
import org.springframework.beans.factory.annotation.Value
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import ru.vityaman.lms.botalka.app.spring.api.http.message.toMessage
import ru.vityaman.lms.botalka.app.spring.api.http.message.toModel
import ru.vityaman.lms.botalka.app.spring.api.http.server.WorkspaceEventMessage
import ru.vityaman.lms.botalka.app.spring.storage.MainR2dbcConfig
import ru.vityaman.lms.botalka.commons.Consumer
import ru.vityaman.lms.botalka.commons.Runnable
import ru.vityaman.lms.botalka.core.event.EventConsumingActor
import ru.vityaman.lms.botalka.core.event.EventPublicationTask
import ru.vityaman.lms.botalka.core.event.EventSource
import ru.vityaman.lms.botalka.core.event.aspect.sequenced
import ru.vityaman.lms.botalka.core.event.aspect.slowed
import ru.vityaman.lms.botalka.core.event.loggingEventCallbacks
import ru.vityaman.lms.botalka.core.external.telegram.TelegramBot
import ru.vityaman.lms.botalka.core.external.telegram.TelegramChat
import ru.vityaman.lms.botalka.core.external.telegram.TelegramConsumer
import ru.vityaman.lms.botalka.core.logging.Slf4jLog
import ru.vityaman.lms.botalka.core.model.Workspace
import ru.vityaman.lms.botalka.core.tx.TxEnv
import ru.vityaman.lms.botalka.storage.jooq.JooqDatabase
import ru.vityaman.lms.botalka.storage.jooq.event.WorkspaceEvents
import ru.vityaman.lms.botalka.storage.kafka.BasicKafkaConsumer
import ru.vityaman.lms.botalka.storage.kafka.BasicKafkaProducer
import ru.vityaman.lms.botalka.storage.kafka.KafkaTopic

@Configuration
class SpringWorkspaceEventConfig {
@Bean
@Qualifier(BeanName.WORKSPACE_TOPIC)
fun workspaceTopic(
@Value("\${broker.topic.workspace}") topic: String,
jackson: ObjectMapper,
) = KafkaTopic(
name = topic,
keyEncode = { it.number.toString() },
keyDecode = { Workspace.Event.Id(it.toInt()) },
valueEncode = { jackson.writeValueAsString(it.toMessage()) },
valueDecode = {
jackson
.readValue(it, WorkspaceEventMessage::class.java)
.toModel()
},
)

@Bean
@Qualifier(BeanName.KAFKA_EVENT_SOURCE)
fun eventSource(
@Value("\${broker.bootstrap-servers}")
bootstrapServers: String,
@Value("\${broker.consumer.workspace.group}")
groupId: String,
@Qualifier(BeanName.WORKSPACE_TOPIC)
topic: KafkaTopic<Workspace.Event.Id, Workspace.Event>,
) = BasicKafkaConsumer(
BasicKafkaConsumer.Config(
bootstrapServers = bootstrapServers,
groupId = groupId,
),
topic,
)

@Bean
@Qualifier(BeanName.KAFKA_CONSUMER)
fun consumer(
@Value("\${broker.bootstrap-servers}")
bootstrapServers: String,
@Qualifier(BeanName.WORKSPACE_TOPIC)
topic: KafkaTopic<Workspace.Event.Id, Workspace.Event>,
) = BasicKafkaProducer(BasicKafkaProducer.Config(bootstrapServers), topic)
.asConsumerWithKey(Workspace.Event::id)

@Bean
@Qualifier(BeanName.EVENTS)
fun events(
@Qualifier(MainR2dbcConfig.BeanName.TX_ENV)
mainTx: TxEnv,
@Qualifier(MainR2dbcConfig.BeanName.DATABASE)
database: JooqDatabase,
) = WorkspaceEvents(database, mainTx)

@Bean
@Qualifier(BeanName.PUBLICATION_TASK)
fun publicationTask(
@Qualifier(BeanName.EVENTS)
events: WorkspaceEvents,
@Qualifier(BeanName.KAFKA_CONSUMER)
consumer: Consumer<Workspace.Event>,
) = EventPublicationTask(
consumer = consumer,
events = events,
callbacks = loggingEventCallbacks(
Slf4jLog("WorkspaceNotifyPushTask"),
),
)

@Bean
@Qualifier(BeanName.TELEGRAM_CONSUMER)
fun telegramConsumer(
telegram: TelegramBot,
@Value("\${external.service.telegram.admin-chat-id}")
adminChatId: Long,
) = TelegramConsumer<Workspace.Event>(
telegram,
TelegramChat(adminChatId),
) {
buildString {
append("New workspace event!\n")
append("From user with id ${it.producer}\n")
append("EventId: ${it.id}\n")
append("Kind: ")
append(
when (it) {
is Workspace.Feedback -> "Feedback"
is Workspace.Submission -> "Submission"
is Workspace.Comment -> "Comment"
},
)
append("\n")
append(
when (it) {
is Workspace.Feedback -> it.comment
is Workspace.Submission -> it.note
is Workspace.Comment -> it.text
},
)
}
}

@Bean
@Qualifier(BeanName.EVENT_CONSUMING_ACTOR)
fun eventConsumingActor(
@Qualifier(BeanName.KAFKA_EVENT_SOURCE)
mailbox: EventSource<Workspace.Event>,
@Qualifier(BeanName.TELEGRAM_CONSUMER)
consumer: Consumer<Workspace.Event>,
config: SpringTelegramConfig,
): Runnable {
val log = Slf4jLog("TelegramWorkspaceEventActor")
return EventConsumingActor(
mailbox = mailbox,
consumer = consumer,
callbacks = sequenced(
EventConsumingActor.Callbacks(
onStart = { log.info("Starting...") },
onSuccess = { log.info("Sent event with id ${it.id}") },
onError = { log.info("Failed: ${it.message}") },
),
slowed(config.toSlowness()),
),
)
}

object BeanName {
const val WORKSPACE_TOPIC = "workspaceTopic"
const val KAFKA_EVENT_SOURCE = "kafkaWorkspaceEventSource"
const val KAFKA_CONSUMER = "kafkaWorkspaceConsumer"
const val EVENTS = "workspaceEvents"
const val PUBLICATION_TASK = "workspacePublicationTask"
const val TELEGRAM_CONSUMER = "telegramWorkspaceConsumer"
const val EVENT_CONSUMING_ACTOR = "workspaceEventConsumingActor"
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package ru.vityaman.lms.botalka.app.spring.event.homework
import org.springframework.beans.factory.annotation.Qualifier
import org.springframework.beans.factory.annotation.Value
import org.springframework.stereotype.Component
import ru.vityaman.lms.botalka.app.spring.event.KafkaConfig
import ru.vityaman.lms.botalka.app.spring.event.KafkaTopicConfig
import ru.vityaman.lms.botalka.core.event.EventSource
import ru.vityaman.lms.botalka.core.model.Homework
import ru.vityaman.lms.botalka.storage.kafka.BasicKafkaConsumer
Expand All @@ -15,10 +15,10 @@ class SpringEventSource(
@Value("\${broker.bootstrap-servers}")
bootstrapServers: String,

@Value("\${broker.consumer.publication.group}")
@Value("\${broker.consumer.homework.group}")
groupId: String,

@Qualifier(KafkaConfig.BeanName.HOMEWORK_TOPIC)
@Qualifier(KafkaTopicConfig.BeanName.HOMEWORK_TOPIC)
topic: KafkaTopic<Homework.Id, Homework>,
) : EventSource<Homework> by
BasicKafkaConsumer(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package ru.vityaman.lms.botalka.app.spring.event.homework
import org.springframework.beans.factory.annotation.Qualifier
import org.springframework.beans.factory.annotation.Value
import org.springframework.stereotype.Component
import ru.vityaman.lms.botalka.app.spring.event.KafkaConfig
import ru.vityaman.lms.botalka.app.spring.event.KafkaTopicConfig
import ru.vityaman.lms.botalka.commons.Consumer
import ru.vityaman.lms.botalka.core.model.Homework
import ru.vityaman.lms.botalka.storage.kafka.BasicKafkaProducer
Expand All @@ -15,7 +15,7 @@ class SpringKafkaConsumer(
@Value("\${broker.bootstrap-servers}")
bootstrapServers: String,

@Qualifier(KafkaConfig.BeanName.HOMEWORK_TOPIC)
@Qualifier(KafkaTopicConfig.BeanName.HOMEWORK_TOPIC)
topic: KafkaTopic<Homework.Id, Homework>,
) : Consumer<Homework> by
BasicKafkaProducer(BasicKafkaProducer.Config(bootstrapServers), topic)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ class SpringPublicationTask(
) {
private val logic = EventPublicationTask(
consumer = consumer,
notifications = HomeworkEvents(homeworks, mainTx, clock),
events = HomeworkEvents(homeworks, mainTx, clock),
callbacks = loggingEventCallbacks(
Slf4jLog("HomeworkNotifyPushTask"),
),
Expand Down
Loading

0 comments on commit c977fd1

Please sign in to comment.