
Speech-to-text utils

stt.utils

Functions:

Name                          Description
make_stream                   Open a PyAudio input stream using the module's audio settings.
record_interruption_parallel  Listen for a speech interruption while a queue signals ongoing playback.
record_interruption           Listen for a speech interruption for up to record_seconds.
record_user                   Record the user until trailing silence follows speech.
record_user_stream            Record the user, streaming raw chunks into a queue as they arrive.

Attributes:

Name      Type  Description
CHUNK     int   Frames per buffer read from the stream (2048).
FORMAT    int   PyAudio sample format (paInt16, 16-bit signed).
CHANNELS  int   Number of input channels (1, mono).
RATE      int   Sampling rate in Hz (16000).

CHUNK module-attribute

CHUNK = int(1024 * 2)

FORMAT module-attribute

FORMAT = paInt16

CHANNELS module-attribute

CHANNELS = 1

RATE module-attribute

RATE = 16000
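
At these settings, each stream.read(CHUNK) call returns 2048 frames, i.e. 2048 / 16000 = 0.128 s of audio, delivered as 4096 bytes (2 bytes per 16-bit sample). This is why the functions below assert len(data) == CHUNK * 2.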

make_stream

make_stream()
Source code in openvoicechat/stt/utils.py
def make_stream():
    # open a mono, 16-bit, 16 kHz PyAudio input stream reading CHUNK frames per buffer
    p = pyaudio.PyAudio()
    return p.open(
        format=FORMAT, channels=CHANNELS, rate=RATE, input=True, frames_per_buffer=CHUNK
    )
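
A minimal usage sketch, assuming make_stream and CHUNK are imported from openvoicechat.stt.utils and a working input device is available; the float conversion mirrors the normalization used in record_user below:

import numpy as np
from openvoicechat.stt.utils import make_stream, CHUNK

stream = make_stream()
data = stream.read(CHUNK)  # one buffer: CHUNK frames, 2 bytes per sample
samples = np.frombuffer(data, dtype=np.int16)
samples = (samples / (1 << 15)).astype(np.float32)  # scale int16 to [-1, 1)
stream.close()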

record_interruption_parallel

record_interruption_parallel(vad, listen_queue)
Source code in openvoicechat/stt/utils.py
def record_interruption_parallel(vad, listen_queue):
    # listen for an interruption until the queue yields None (end of playback)
    frames = []
    stream = make_stream()
    while True:
        a = listen_queue.get()
        if a is None:
            break
        data = stream.read(CHUNK)
        frames.append(data)
        # check the last ~2 seconds of audio for speech
        contains_speech = vad.contains_speech(frames[int(RATE / CHUNK) * -2 :])
        if contains_speech:
            stream.close()
            frames = np.frombuffer(b"".join(frames), dtype=np.int16)
            frames = frames / (1 << 15)  # normalize int16 samples to [-1, 1)
            return frames.astype(np.float32)
    stream.close()
    return None
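
The queue protocol is implied by the source: any non-None item keeps the listener reading, and None stops it. A hypothetical producer, e.g. a playback loop, could drive it as in the sketch below (the playback function is illustrative, not part of the module):

import queue
import threading
from openvoicechat.stt.utils import record_interruption_parallel
from openvoicechat.stt.vad import VoiceActivityDetection

vad = VoiceActivityDetection()
listen_queue = queue.Queue()

def playback(q):
    # hypothetical producer: tick while TTS audio is playing, then signal stop
    for _ in range(50):  # roughly 50 chunks' worth of listening
        q.put(True)      # any non-None item means "keep listening"
    q.put(None)          # playback finished: stop listening

threading.Thread(target=playback, args=(listen_queue,)).start()
interruption = record_interruption_parallel(vad, listen_queue)
if interruption is not None:
    print("user interrupted playback")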

record_interruption

record_interruption(vad, record_seconds=100, streamer=None, logger=None)
Source code in openvoicechat/stt/utils.py
def record_interruption(vad, record_seconds=100, streamer=None, logger=None):
    if logger:
        logger.info(
            "recording for interruption",
            extra={
                "details": "record_interruption",
                "further": f"{record_seconds} seconds",
            },
        )
    frames = []
    if streamer is None:
        stream = make_stream()
        # the global declaration applies to the whole function body, so the
        # streamer branch below overwrites the module-level CHUNK and RATE
        global CHUNK
        global RATE
    else:
        stream = streamer.make_stream()
        CHUNK = streamer.CHUNK
        RATE = streamer.RATE

    for _ in range(0, int(RATE / CHUNK * record_seconds)):
        data = stream.read(CHUNK)
        assert len(data) == CHUNK * 2, "chunk size does not match 2 bytes per sample"
        frames.append(data)
        # check the last ~2 seconds of audio for speech
        contains_speech = vad.contains_speech(frames[int(RATE / CHUNK) * -2 :])
        if contains_speech:
            stream.close()
            frames = np.frombuffer(b"".join(frames), dtype=np.int16)
            frames = frames / (1 << 15)  # normalize int16 samples to [-1, 1)
            return frames.astype(np.float32)
    stream.close()
    return None
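
A minimal sketch of this non-parallel variant, using the VoiceActivityDetection class documented below; it returns the captured float32 audio on interruption, or None once the timeout elapses:

from openvoicechat.stt.utils import RATE, record_interruption
from openvoicechat.stt.vad import VoiceActivityDetection

vad = VoiceActivityDetection()
audio = record_interruption(vad, record_seconds=5)
if audio is None:
    print("no interruption within 5 seconds")
else:
    print(f"interruption detected, {len(audio) / RATE:.2f} s captured")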

record_user

record_user(silence_seconds, vad, streamer=None, started=False, logger=None)
Source code in openvoicechat/stt/utils.py
def record_user(silence_seconds, vad, streamer=None, started=False, logger=None):
    frames = []

    if streamer is None:
        stream = make_stream()
        # the global declaration applies to the whole function body, so the
        # streamer branch below overwrites the module-level CHUNK and RATE
        global CHUNK
        global RATE
    else:
        stream = streamer.make_stream()
        CHUNK = streamer.CHUNK
        RATE = streamer.RATE
    one_second_iters = int(RATE / CHUNK)  # reads per second of audio
    if logger:
        logger.info(
            "user recording started",
            extra={"details": "record_user", "further": f"{silence_seconds} seconds"},
        )

    while True:
        data = stream.read(CHUNK)
        assert len(data) == CHUNK * 2, "chunk size does not match 2 bytes per sample"
        frames.append(data)
        # wait until at least silence_seconds of audio has accumulated
        if len(frames) < one_second_iters * silence_seconds:
            continue
        # check only the trailing silence_seconds window for speech
        contains_speech = vad.contains_speech(
            frames[int(-one_second_iters * silence_seconds) :]
        )
        if not started and contains_speech:
            started = True
            if logger:
                logger.info(
                    "speech detected",
                    extra={"details": "record_user", "further": ""},
                )
        # once speech has started, a silent trailing window ends the recording
        if started and contains_speech is False:
            break
    stream.close()
    if logger:
        logger.info(
            "user recording ended",
            extra={"details": "record_user", "further": ""},
        )

    # creating a np array from buffer
    frames = np.frombuffer(b"".join(frames), dtype=np.int16)

    # normalization see https://discuss.pytorch.org/t/torchaudio-load-normalization-question/71470
    frames = frames / (1 << 15)

    return frames.astype(np.float32)
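
A minimal recording sketch; record_user blocks until speech is followed by silence_seconds of silence, then returns normalized float32 audio ready for an STT model:

from openvoicechat.stt.utils import RATE, record_user
from openvoicechat.stt.vad import VoiceActivityDetection

vad = VoiceActivityDetection()
audio = record_user(silence_seconds=2, vad=vad)  # blocks until speech, then silence
print(f"captured {len(audio) / RATE:.2f} s of audio in [-1, 1)")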

record_user_stream

record_user_stream(silence_seconds, vad, audio_queue, streamer=None)
Source code in openvoicechat/stt/utils.py
def record_user_stream(silence_seconds, vad, audio_queue, streamer=None):
    frames = []

    started = False
    if streamer is None:
        stream = make_stream()
        # the global declaration applies to the whole function body, so the
        # streamer branch below overwrites the module-level CHUNK and RATE
        global CHUNK
        global RATE
    else:
        stream = streamer.make_stream()
        CHUNK = streamer.CHUNK
        RATE = streamer.RATE

    one_second_iters = int(RATE / CHUNK)  # reads per second of audio
    print("* recording")

    while True:
        data = stream.read(CHUNK)
        assert len(data) == CHUNK * 2, "chunk size does not match 2 bytes per sample"
        frames.append(data)
        audio_queue.put(data)  # stream the raw chunk to the consumer as it arrives
        # check only the trailing silence_seconds window for speech
        contains_speech = vad.contains_speech(
            frames[int(-one_second_iters * silence_seconds) :]
        )
        if not started and contains_speech:
            started = True
            print("*listening to speech*")
        # once speech has started, a silent trailing window ends the recording
        if started and contains_speech is False:
            break
    audio_queue.put(None)  # sentinel: no more audio
    stream.close()
    print("* done recording")

stt.vad.VoiceActivityDetection

VoiceActivityDetection(sampling_rate=16000)

Methods:

Name             Description
contains_speech  Return True if the given audio chunks contain speech.

Attributes:

Name           Type  Description
sampling_rate  int   Sampling rate the model expects, in Hz.
Source code in openvoicechat/stt/vad.py
def __init__(self, sampling_rate=16000):
    # load the pretrained silero-vad model and its helper utilities from torch hub
    self.model, utils = torch.hub.load(
        repo_or_dir="snakers4/silero-vad", model="silero_vad", force_reload=False
    )

    (
        self.get_speech_timestamps,
        self.save_audio,
        self.read_audio,
        self.VADIterator,
        self.collect_chunks,
    ) = utils

    self.sampling_rate = sampling_rate

sampling_rate instance-attribute

sampling_rate = sampling_rate

contains_speech

contains_speech(audio)
Source code in openvoicechat/stt/vad.py
def contains_speech(self, audio):
    # audio is a list of raw int16 byte chunks (as produced by stream.read)
    frames = np.frombuffer(b"".join(audio), dtype=np.int16)

    # normalization see https://discuss.pytorch.org/t/torchaudio-load-normalization-question/71470
    frames = frames / (1 << 15)

    audio = torch.tensor(frames.astype(np.float32))
    speech_timestamps = self.get_speech_timestamps(
        audio, self.model, sampling_rate=self.sampling_rate
    )
    return len(speech_timestamps) > 0
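
A standalone sketch of calling contains_speech directly, assuming make_stream, CHUNK, and RATE are imported from openvoicechat.stt.utils; note that it expects a list of raw int16 byte chunks, not a float array:

from openvoicechat.stt.utils import CHUNK, RATE, make_stream
from openvoicechat.stt.vad import VoiceActivityDetection

vad = VoiceActivityDetection()
stream = make_stream()
chunks = [stream.read(CHUNK) for _ in range(int(RATE / CHUNK))]  # ~1 s of audio
stream.close()
print(vad.contains_speech(chunks))  # True if silero-vad finds any speech timestamps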