diff --git a/python/deepsignal/transcription/transcriber.py b/python/deepsignal/transcription/transcriber.py
index 197deaf..2012a84 100644
--- a/python/deepsignal/transcription/transcriber.py
+++ b/python/deepsignal/transcription/transcriber.py
@@ -1,8 +1,7 @@
-""" Implements Transcriber Worker for RT cases"""
+"""Implement Transcriber worker"""
 import time
-import json
-from multiprocessing import Pipe, Process
-from multiprocessing.connection import Connection
+from multiprocessing import Queue
+from multiprocessing import Process
 
 import whisper
 import numpy as np
@@ -15,46 +14,78 @@ def transcribe(model: whisper.Whisper, chunks: list) -> dict:
     return model.transcribe(arr)
 
 
-def transcribe_worker(conn: Connection, model: whisper.Whisper):
-    """stream processing implementation"""
-    chunks = []
-    print(model.num_languages)
+def transcribe_worker(queue_in: Queue, queue_out: Queue, model: whisper.Whisper):
+    """Transcribe chunks from queue_in onto queue_out until an empty chunk arrives."""
+    chunks, prev_res, times = [], None, 0
+
+    def time_ms():
+        return time.time() * 1000
+
+    start = time_ms()
 
     while True:
-        if conn.poll():
-            chunk = conn.recv()
+        if not queue_in.empty():
+            chunk = queue_in.get()
+
+            # should stop transcription
             if chunk == b"":
                 if len(chunks) > 0:
                     result = transcribe(model, chunks)
-                    conn.send("finsihed")
+                    queue_out.put(
+                        {"result": result, "is_partial": False, "time": time_ms() - start}
+                    )
+                queue_out.put(
+                    {"result": "etp session ended", "is_partial": False, "time": time_ms() - start}
+                )
                 break
 
+            # append chunks
             chunks.append(chunk)
         else:
+
             if len(chunks) > 0:
-                conn.send(str(len(chunks)))
-                chunks.clear()
+                result = transcribe(model, chunks)
+                if prev_res == result["text"]:
+                    times += 1
+                else:
+                    times = 0
+                # Only close on non-empty text; the result dict itself never equals "".
+                close_segment = times > 2 and result["text"].strip() != ""
+
+                queue_out.put(
+                    {"result": result, "is_partial": not close_segment, "time": time_ms() - start}
+                )
+
+                if close_segment:
+                    chunks.clear()
+                    prev_res = None
+                    times = 0
+                else:
+                    prev_res = result["text"]
+
             else:
-                time.sleep(0.1)
+                time.sleep(1)
 
 
 class Transcriber:
     """Stream State Manager"""
 
-    def __init__(self, model_name="base") -> None:
-        self.conn = Pipe()
-        self.model = whisper.load_model(model_name)
+    def __init__(self, model: whisper.Whisper) -> None:
+        self.queue_in = Queue()
+        self.queue_out = Queue()
+        self.model = model
         self.worker = Process(
             target=transcribe_worker,
             args=(
-                self.conn[1],
+                self.queue_in,
+                self.queue_out,
                 self.model,
             ),
         )
 
     def send(self, chunk: bytes) -> None:
         """Send audio chunk"""
-        self.conn[0].send(chunk)
+        self.queue_in.put(chunk)
 
     def start(self) -> None:
         """Start streaming worker"""
@@ -67,9 +98,9 @@ def stop(self, timeout=3) -> None:
         self.send(b"")
         self.worker.join(timeout)
 
-    def get_result(self) -> str:
+    def get_result(self) -> list:
         """Read all results produced"""
         result = []
-        while self.conn[0].poll():
-            result.append(self.conn[0].recv())
-        return "\n".join(result)
+        while not self.queue_out.empty():
+            result.append(self.queue_out.get())
+        return result
\ No newline at end of file