diff --git a/backend/app/AppComponents.scala b/backend/app/AppComponents.scala index 97281c13..247f7c14 100644 --- a/backend/app/AppComponents.scala +++ b/backend/app/AppComponents.scala @@ -85,7 +85,7 @@ class AppComponents(context: Context, config: Config) // data storage services val ingestStorage = S3IngestStorage(s3Client, config.s3.buckets.ingestion, config.s3.buckets.deadLetter).valueOr(failure => throw new Exception(failure.msg)) val blobStorage = S3ObjectStorage(s3Client, config.s3.buckets.collections).valueOr(failure => throw new Exception(failure.msg)) - val transcriptionStorage = S3ObjectStorage(s3Client, config.s3.buckets.transcription).valueOr(failure => throw new Exception(failure.msg)) + val transcriptStorage = S3ObjectStorage(s3Client, config.s3.buckets.transcription).valueOr(failure => throw new Exception(failure.msg)) val postgresClient = config.postgres match { case Some(postgresConfig) => new PostgresClientImpl(postgresConfig) @@ -155,7 +155,7 @@ class AppComponents(context: Context, config: Config) val transcriptionExtractor = if (config.worker.useExternalExtractors) { - new ExternalTranscriptionExtractor(esResources, config.transcribe, blobStorage, transcriptionStorage, sqsClient) + new ExternalTranscriptionExtractor(esResources, config.transcribe, blobStorage, transcriptStorage, sqsClient) } else { new TranscriptionExtractor(esResources, scratchSpace, config.transcribe) } @@ -224,7 +224,7 @@ class AppComponents(context: Context, config: Config) applicationLifecycle.addStopHook(() => workerScheduler.stop()) // external extractor - val externalWorker = new ExternalTranscriptionWorker(manifest, sqsClient, config.transcribe, transcriptionStorage, esResources) + val externalWorker = new ExternalTranscriptionWorker(manifest, sqsClient, config.transcribe, transcriptStorage, esResources) val externalWorkerScheduler = new ExternalWorkerScheduler(actorSystem, externalWorker, config.worker.interval)(workerExecutionContext) externalWorkerScheduler.start() applicationLifecycle.addStopHook(() => externalWorkerScheduler.stop()) diff --git a/backend/app/extraction/ExternalTranscriptionExtractor.scala b/backend/app/extraction/ExternalTranscriptionExtractor.scala index f821da10..ee2fbfba 100644 --- a/backend/app/extraction/ExternalTranscriptionExtractor.scala +++ b/backend/app/extraction/ExternalTranscriptionExtractor.scala @@ -144,7 +144,7 @@ class ExternalTranscriptionExtractor(index: Index, transcribeConfig: TranscribeC transcriptionJob.flatMap { job => { try { - logger.info(s"sending message to Transcription Service Queue with message attribute") + logger.info(s"sending message to Transcription Service Queue") val sendMessageCommand = new SendMessageRequest() .withQueueUrl(transcribeConfig.transcriptionServiceQueueUrl) diff --git a/backend/app/extraction/ExternalTranscriptionWorker.scala b/backend/app/extraction/ExternalTranscriptionWorker.scala index 8899347d..ba38f238 100644 --- a/backend/app/extraction/ExternalTranscriptionWorker.scala +++ b/backend/app/extraction/ExternalTranscriptionWorker.scala @@ -31,9 +31,9 @@ class ExternalTranscriptionWorker(manifest: WorkerManifest, amazonSQSClient: Ama if (messages.size() > 0) logger.info(s"retrieved ${messages.size()} messages from queue Transcription Output Queue") else - logger.info("No message found") + logger.info("No sqs message found") - messages.asScala.toList.foldLeft(0) { (completed, message) => + val messagesCompleted = messages.asScala.toList.foldLeft(0) { (completed, message) => getMessageAttribute(message) match { case Right(messageAttributes) => handleMessage(message, messageAttributes, completed) @@ -42,6 +42,10 @@ class ExternalTranscriptionWorker(manifest: WorkerManifest, amazonSQSClient: Ama completed } } + + logger.info(s"${messagesCompleted} out of ${messages.size()} number of messages successfully completed") + + messagesCompleted } @@ -64,7 +68,9 @@ class ExternalTranscriptionWorker(manifest: WorkerManifest, amazonSQSClient: Ama completed + 1 case Left(failure) => logger.error(s"failed to process sqs message", failure.toThrowable) + // TODO make this constant if (messageAttributes.receiveCount > 2) { + // TODO make extractor name a constant markAsFailure(new Uri(messageAttributes.messageGroupId), "ExternalTranscriptionExtractor", failure.msg) } completed