Skip to content

Commit

Permalink
improve nameing and logging
Browse files Browse the repository at this point in the history
  • Loading branch information
marjisound committed Oct 4, 2024
1 parent 6696bbe commit 748da57
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 6 deletions.
6 changes: 3 additions & 3 deletions backend/app/AppComponents.scala
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ class AppComponents(context: Context, config: Config)
// data storage services
val ingestStorage = S3IngestStorage(s3Client, config.s3.buckets.ingestion, config.s3.buckets.deadLetter).valueOr(failure => throw new Exception(failure.msg))
val blobStorage = S3ObjectStorage(s3Client, config.s3.buckets.collections).valueOr(failure => throw new Exception(failure.msg))
val transcriptionStorage = S3ObjectStorage(s3Client, config.s3.buckets.transcription).valueOr(failure => throw new Exception(failure.msg))
val transcriptStorage = S3ObjectStorage(s3Client, config.s3.buckets.transcription).valueOr(failure => throw new Exception(failure.msg))

val postgresClient = config.postgres match {
case Some(postgresConfig) => new PostgresClientImpl(postgresConfig)
Expand Down Expand Up @@ -155,7 +155,7 @@ class AppComponents(context: Context, config: Config)


val transcriptionExtractor = if (config.worker.useExternalExtractors) {
new ExternalTranscriptionExtractor(esResources, config.transcribe, blobStorage, transcriptionStorage, sqsClient)
new ExternalTranscriptionExtractor(esResources, config.transcribe, blobStorage, transcriptStorage, sqsClient)
} else {
new TranscriptionExtractor(esResources, scratchSpace, config.transcribe)
}
Expand Down Expand Up @@ -224,7 +224,7 @@ class AppComponents(context: Context, config: Config)
applicationLifecycle.addStopHook(() => workerScheduler.stop())

// external extractor
val externalWorker = new ExternalTranscriptionWorker(manifest, sqsClient, config.transcribe, transcriptionStorage, esResources)
val externalWorker = new ExternalTranscriptionWorker(manifest, sqsClient, config.transcribe, transcriptStorage, esResources)
val externalWorkerScheduler = new ExternalWorkerScheduler(actorSystem, externalWorker, config.worker.interval)(workerExecutionContext)
externalWorkerScheduler.start()
applicationLifecycle.addStopHook(() => externalWorkerScheduler.stop())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ class ExternalTranscriptionExtractor(index: Index, transcribeConfig: TranscribeC
transcriptionJob.flatMap {
job => {
try {
logger.info(s"sending message to Transcription Service Queue with message attribute")
logger.info(s"sending message to Transcription Service Queue")

val sendMessageCommand = new SendMessageRequest()
.withQueueUrl(transcribeConfig.transcriptionServiceQueueUrl)
Expand Down
10 changes: 8 additions & 2 deletions backend/app/extraction/ExternalTranscriptionWorker.scala
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@ class ExternalTranscriptionWorker(manifest: WorkerManifest, amazonSQSClient: Ama
if (messages.size() > 0)
logger.info(s"retrieved ${messages.size()} messages from queue Transcription Output Queue")
else
logger.info("No message found")
logger.info("No sqs message found")

messages.asScala.toList.foldLeft(0) { (completed, message) =>
val messagesCompleted = messages.asScala.toList.foldLeft(0) { (completed, message) =>
getMessageAttribute(message) match {
case Right(messageAttributes) =>
handleMessage(message, messageAttributes, completed)
Expand All @@ -42,6 +42,10 @@ class ExternalTranscriptionWorker(manifest: WorkerManifest, amazonSQSClient: Ama
completed
}
}

logger.info(s"${messagesCompleted} out of ${messages.size()} number of messages successfully completed")

messagesCompleted
}


Expand All @@ -64,7 +68,9 @@ class ExternalTranscriptionWorker(manifest: WorkerManifest, amazonSQSClient: Ama
completed + 1
case Left(failure) =>
logger.error(s"failed to process sqs message", failure.toThrowable)
// TODO make this constant
if (messageAttributes.receiveCount > 2) {
// TODO make extractor name a constant
markAsFailure(new Uri(messageAttributes.messageGroupId), "ExternalTranscriptionExtractor", failure.msg)
}
completed
Expand Down

0 comments on commit 748da57

Please sign in to comment.