Skip to content

Commit

Permalink
addtranscriber
Browse files Browse the repository at this point in the history
  • Loading branch information
dimastatz committed May 6, 2024
1 parent ddb3c63 commit 6393f86
Showing 1 changed file with 53 additions and 22 deletions.
75 changes: 53 additions & 22 deletions python/deepsignal/transcription/transcriber.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
""" Implements Transcriber Worker for RT cases"""
"""Implemet Transcriber worker"""
import time
import json
from multiprocessing import Pipe, Process
from multiprocessing.connection import Connection
from multiprocessing import Queue
from multiprocessing import Process

import whisper
import numpy as np
Expand All @@ -15,46 +14,78 @@ def transcribe(model: whisper.Whisper, chunks: list) -> dict:
return model.transcribe(arr)


def transcribe_worker(queue_in: Queue, queue_out: Queue, model: whisper.Whisper):
    """Consume audio chunks from ``queue_in`` and publish transcripts to ``queue_out``.

    Incoming chunks are appended to a local buffer. Whenever ``queue_in`` is
    found empty while the buffer is not, the whole buffer is transcribed and
    the result is published. A segment is closed (buffer cleared) once the
    transcribed text has repeated unchanged more than twice and is non-empty.

    Every message put on ``queue_out`` is a dict with the keys:

    * ``"result"`` - the value returned by ``transcribe`` (or the final
      end-of-session string),
    * ``"is_partial"`` - ``True`` while the current segment is still open,
    * ``"time"`` - milliseconds elapsed since the worker started.

    Receiving ``b""`` on ``queue_in`` is the stop signal: any buffered chunks
    are transcribed one last time, an end-of-session message is published and
    the loop exits.

    Args:
        queue_in: queue the audio chunks (and the ``b""`` stop marker) are read from.
        queue_out: queue the result messages are written to.
        model: model object handed to ``transcribe`` together with the buffer.
    """
    print(model.num_languages)
    chunks, prev_text, repeats = [], None, 0

    def time_ms():
        return time.time() * 1000

    start = time_ms()

    def publish(result, is_partial):
        # Single place that builds the outgoing message so all three
        # publishing sites stay in the same shape.
        queue_out.put(
            {"result": result, "is_partial": is_partial, "time": time_ms() - start}
        )

    while True:
        if not queue_in.empty():
            chunk = queue_in.get()

            # should stop transcription
            if chunk == b"":
                if chunks:
                    publish(transcribe(model, chunks), False)
                publish("etp session ended", False)
                break

            # append chunks
            chunks.append(chunk)
        elif chunks:
            result = transcribe(model, chunks)
            text = result["text"]
            repeats = repeats + 1 if text == prev_text else 0

            # Compare the transcribed *text* with "", not the result dict:
            # a dict never equals a string, so the previous check was always
            # true and empty transcriptions could close a segment.
            close_segment = repeats > 2 and text != ""

            publish(result, not close_segment)

            if close_segment:
                # Start a fresh segment.
                chunks.clear()
                prev_text = None
                repeats = 0
            else:
                prev_text = text
        else:
            # Nothing buffered and nothing queued: back off before polling again.
            time.sleep(1)


class Transcriber:
"""Stream State Manager"""

def __init__(self, model: whisper.Whisper) -> None:
    """Set up the input/output queues and the worker process.

    The worker process is only constructed here; it is not started.

    Args:
        model: model object passed through to ``transcribe_worker``.
    """
    inbound, outbound = Queue(), Queue()
    self.queue_in = inbound
    self.queue_out = outbound
    self.model = model
    # The worker receives the same two queues this object keeps, plus the model.
    worker_args = (inbound, outbound, model)
    self.worker = Process(target=transcribe_worker, args=worker_args)

def send(self, chunk: bytes) -> None:
"""Send audio chunk"""
self.conn[0].send(chunk)
self.queue_in.put(chunk)

def start(self) -> None:
"""Start streaming worker"""
Expand All @@ -67,9 +98,9 @@ def stop(self, timeout=3) -> None:
self.send(b"")
self.worker.join(timeout)

def get_result(self) -> str:
def get_result(self) -> list:
"""Read all results produced"""
result = []
while self.conn[0].poll():
result.append(self.conn[0].recv())
return "\n".join(result)
while not self.queue_out.empty():
result.append(self.queue_out.get())
return result

0 comments on commit 6393f86

Please sign in to comment.