Base Classes

The base classes implement the core functionality, such as streaming, multiprocessing, and other parallelization.

stt.base.BaseEar

BaseEar(silence_seconds=2, not_interrupt_words=None, listener=None, stream=False, listen_interruptions=True, logger=None)

Initializes the BaseEar class.

Parameters:

silence_seconds (float, optional): Number of seconds of silence to detect. Default: 2
not_interrupt_words (list, optional): Words that should not be treated as interruptions. Default: None
listener (object, optional): Listener object to receive the audio from. Default: None
stream (bool, optional): Whether to stream the audio or process it as a whole. Default: False
listen_interruptions (bool, optional): Whether to listen for interruptions. Default: True
logger (object, optional): Logger used for event logging. Default: None

Methods:

transcribe: Given an audio input, returns the transcription.
transcribe_stream: Transcribes audio chunks from a queue and puts the transcriptions on another queue.
listen: Records audio using record_user and returns its transcription.
interrupt_listen: Records audio while listening for interruptions and transcribes it.

Attributes:

silence_seconds
not_interrupt_words
vad
listener
stream
listen_interruptions
logger
Source code in openvoicechat/stt/base.py
def __init__(
    self,
    silence_seconds=2,
    not_interrupt_words=None,
    listener=None,
    stream=False,
    listen_interruptions=True,
    logger=None,
):
    """
    Initializes the BaseEar class.

    :param silence_seconds: Number of seconds of silence to detect. Defaults to 2.
    :type silence_seconds: float, optional
    :param not_interrupt_words: List of words that should not be considered as interruptions.
    :type not_interrupt_words: list, optional
    :param listener: Listener object to receive the audio from. Defaults to None.
    :type listener: object, optional
    :param stream: Flag indicating whether to stream the audio or process it as a whole. Defaults to False.
    :type stream: bool, optional
    :param listen_interruptions: Flag indicating whether to listen for interruptions. Defaults to True.
    :type listen_interruptions: bool, optional
    :param logger: Logger used for event logging. Defaults to None.
    :type logger: object, optional
    """

    if not_interrupt_words is None:
        not_interrupt_words = [
            "you",
            "yes",
            "yeah",
            "hmm",
        ]  # you because whisper says "you" in silence
    self.silence_seconds = silence_seconds
    self.not_interrupt_words = not_interrupt_words
    self.vad = VoiceActivityDetection()
    self.listener = listener
    self.stream = stream
    self.listen_interruptions = listen_interruptions
    self.logger = logger

silence_seconds instance-attribute

silence_seconds = silence_seconds

not_interrupt_words instance-attribute

not_interrupt_words = not_interrupt_words

vad instance-attribute

vad = VoiceActivityDetection()

listener instance-attribute

listener = listener

stream instance-attribute

stream = stream

listen_interruptions instance-attribute

listen_interruptions = listen_interruptions

logger instance-attribute

logger = logger

transcribe

transcribe(input_audio: np.ndarray) -> str

Given an audio input, return the transcription.

Parameters:

input_audio (ndarray): The audio input. Required.

Returns:

str: The transcription.

Source code in openvoicechat/stt/base.py
def transcribe(self, input_audio: np.ndarray) -> str:
    """
    Given an audio input, return the transcription
    :param input_audio:
    :return: transcription
    """
    raise NotImplementedError("This method should be implemented by the subclass")
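
Example (not from the library): a concrete Ear only has to fill in transcribe (or transcribe_stream, below). MySTTModel is a hypothetical stand-in for whatever STT backend you use.

import numpy as np

from openvoicechat.stt.base import BaseEar


class MyEar(BaseEar):
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.model = MySTTModel()  # hypothetical STT backend

    def transcribe(self, input_audio: np.ndarray) -> str:
        # input_audio arrives as fp32 PCM at 16 kHz
        # (see _sim_transcribe_stream and interrupt_listen below)
        return self.model.transcribe(input_audio)  # hypothetical call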

transcribe_stream

transcribe_stream(audio_queue: Queue, transcription_queue: Queue)

Parameters:

audio_queue (Queue): Queue containing audio chunks from the pyaudio stream. Required.
transcription_queue (Queue): Queue to put transcriptions on. Required.
Source code in openvoicechat/stt/base.py
def transcribe_stream(self, audio_queue: Queue, transcription_queue: Queue):
    """
    :param audio_queue: Queue containing audio chunks from pyaudio stream
    :param transcription_queue: Queue to put transcriptions
    """
    raise NotImplementedError("This method should be implemented by the subclass")
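
A sketch of a streaming implementation (not from the library). The chunk format (int16 PCM bytes) and the None sentinels on both queues match what _sim_transcribe_stream below feeds in and reads back; incremental_stt is a hypothetical incremental decoder.

import numpy as np
from queue import Queue

from openvoicechat.stt.base import BaseEar


class MyStreamingEar(BaseEar):
    def transcribe_stream(self, audio_queue: Queue, transcription_queue: Queue):
        while True:
            chunk = audio_queue.get()
            if chunk is None:  # end-of-audio sentinel
                break
            # chunks arrive as int16 PCM bytes; convert back to fp32
            audio = np.frombuffer(chunk, dtype=np.int16).astype(np.float32) / (1 << 15)
            text = incremental_stt.feed(audio)  # hypothetical decoder
            if text:
                transcription_queue.put(text)
        transcription_queue.put(None)  # consumers read until None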

_sim_transcribe_stream

_sim_transcribe_stream(input_audio: np.ndarray) -> str

Simulates the transcribe stream using a single audio input

Parameters:

input_audio (ndarray): fp32 numpy array of the audio. Required.

Returns:

str: The transcription.

Source code in openvoicechat/stt/base.py
def _sim_transcribe_stream(self, input_audio: np.ndarray) -> str:
    """
    Simulates the transcribe stream using a single audio input
    :param input_audio: fp32 numpy array of the audio
    :return: transcription
    """
    audio_queue = Queue()
    transcription_queue = Queue()

    input_buffer = (input_audio * (1 << 15)).astype(np.int16).tobytes()
    audio_queue.put(input_buffer)
    audio_queue.put(None)
    transcription_thread = Thread(
        target=self.transcribe_stream, args=(audio_queue, transcription_queue)
    )
    transcription_thread.start()
    transcription_thread.join()
    text = ""
    while True:
        _ = transcription_queue.get()
        if _ is None:
            break
        text += _ + " "
    return text
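
The fp32-to-int16 conversion above scales samples in [-1.0, 1.0] by 2^15. A quick sanity check with made-up values:

import numpy as np

samples = np.array([0.0, 0.5, -1.0], dtype=np.float32)
pcm = (samples * (1 << 15)).astype(np.int16)   # [0, 16384, -32768]
restored = pcm.astype(np.float32) / (1 << 15)  # [0.0, 0.5, -1.0]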

_log_event

_log_event(event: str, details: str, further: str = '')
Source code in openvoicechat/stt/base.py
def _log_event(self, event: str, details: str, further: str = ""):
    if self.logger:
        self.logger.info(
            event, extra={"details": details, "further": f'"{further}"'}
        )

_listen

_listen() -> str

Records audio using record_user and returns its transcription.

Returns:

str: The transcription.

Source code in openvoicechat/stt/base.py
def _listen(self) -> str:
    """
    records audio using record_user and returns its transcription
    :return: transcription
    """
    import pysbd

    seg = pysbd.Segmenter(language="en", clean=False)

    sentence_finished = False
    first = True
    audio = np.zeros(0, dtype=np.float32)
    n = 2  # number of times to see if the sentence ends
    while not sentence_finished and n > 0:

        new_audio = record_user(
            self.silence_seconds,
            self.vad,
            self.listener,
            started=not first,
            logger=self.logger,
        )

        audio = np.concatenate((audio, new_audio), 0)

        self._log_event("transcribing", "STT")
        text = self.transcribe(audio)
        self._log_event("transcribed", "STT", text)

        self._log_event("segmenting", "STT", text)
        first = False
        if len(seg.segment(text + " .")) > 1:
            sentence_finished = True
            self._log_event("sentence boundary detected", "STT", text)
        else:
            n -= 1
            self._log_event(
                "no sentence boundary detected",
                "STT",
                text + ". tries left: " + str(n),
            )
    return text
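
The + " ." in the segmentation call is the key trick: the appended dummy period guarantees at least one segment, so pysbd reports more than one segment only when the transcription already ends with terminal punctuation. Illustrative behaviour (outputs are what pysbd typically returns; treat them as illustrative):

import pysbd

seg = pysbd.Segmenter(language="en", clean=False)
seg.segment("so I was thinking" + " .")     # one segment  -> keep listening
seg.segment("That is all, thanks." + " .")  # two segments -> sentence finished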

_listen_stream

_listen_stream() -> str

Records audio using record_user and returns its transcription.

Returns:

str: The transcription.

Source code in openvoicechat/stt/base.py
def _listen_stream(self) -> str:
    """
    records audio using record_user and returns its transcription
    :return: transcription
    """

    audio_queue = Queue()
    transcription_queue = Queue()

    audio_thread = Thread(
        target=record_user_stream,
        args=(self.silence_seconds, self.vad, audio_queue, self.listener),
    )
    transcription_thread = Thread(
        target=self.transcribe_stream, args=(audio_queue, transcription_queue)
    )

    audio_thread.start()
    transcription_thread.start()

    text = ""
    while True:
        _ = transcription_queue.get()
        if _ is None:
            break
        text += _ + " "
    audio_thread.join()
    transcription_thread.join()
    return text

listen

listen() -> str

Records audio using record_user and returns its transcription.

Returns:

str: The transcription.

Source code in openvoicechat/stt/base.py
def listen(self) -> str:
    """
    records audio using record_user and returns its transcription
    :return: transcription
    """
    if self.stream:
        return self._listen_stream()
    else:
        return self._listen()

interrupt_listen

interrupt_listen(record_seconds=100) -> str

Records audio while listening for interruptions. If voice activity is detected, the audio is transcribed; the transcription is returned when it indicates a real interruption.

Parameters:

record_seconds: Max seconds to record for. Default: 100

Returns:

str: The transcription of the interruption, or an empty string if none occurred.

Source code in openvoicechat/stt/base.py
def interrupt_listen(self, record_seconds=100) -> str:
    """
    Records audio while listening for interruptions. If voice activity
    is detected the audio is transcribed; the transcription is returned
    when it indicates a real interruption.

    :param record_seconds: Max seconds to record for
    :return: the transcription of the interruption, or an empty string if none occurred
    """
    if not self.listen_interruptions:
        return ""  # annotated -> str, so return an empty string rather than False
    while record_seconds > 0:
        interruption_audio = record_interruption(
            self.vad, record_seconds, streamer=self.listener, logger=self.logger
        )
        # duration of interruption audio
        if interruption_audio is None:
            return ""
        else:
            duration = len(interruption_audio) / 16_000
            self._log_event(
                "transcribing interruption", "STT", f"{duration} seconds"
            )
            if self.stream:
                text = self._sim_transcribe_stream(interruption_audio)
            else:
                text = self.transcribe(interruption_audio)
            self._log_event("interruption transcribed", "STT", text)
            # remove any punctuation using re
            text = re.sub(r"[^\w\s]", "", text)
            text = text.lower()
            text = text.strip()
            if text in self.not_interrupt_words:
                self._log_event("not interruption", "STT", text)
                record_seconds -= duration
            else:
                return text

llm.base.BaseChatbot

BaseChatbot(logger=None)

Initialize the model and other things here

Methods:

run: Yields the response to the input text.
post_process: Post-processes the response before returning.
generate_response: Returns the chatbot's full response to the user input.
generate_response_stream: Streams the chatbot's response tokens into an output queue.

Attributes:

logger
Source code in openvoicechat/llm/base.py
def __init__(self, logger=None):
    """
    Initialize the model and other things here
    """
    self.logger = logger

logger instance-attribute

logger = logger

run

run(input_text: str)

Yields the response to the input text

Source code in openvoicechat/llm/base.py
def run(self, input_text: str):
    """
    Yields the response to the input text
    """
    raise NotImplementedError("This method should be implemented by the subclass")
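
run must be a generator of text chunks; generate_response and generate_response_stream both simply iterate over it. A hypothetical subclass (my_llm_client is a stand-in for your LLM backend, not a library API):

from openvoicechat.llm.base import BaseChatbot


class MyChatbot(BaseChatbot):
    def run(self, input_text: str):
        # yield text chunks as the backend produces them
        for token in my_llm_client.stream(input_text):  # hypothetical
            yield token

    def post_process(self, response: str) -> str:
        # optional hook, e.g. trim whitespace before the reply is spoken
        return response.strip()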

post_process

post_process(response: str) -> str

Post process the response before returning

Source code in openvoicechat/llm/base.py
def post_process(self, response: str) -> str:
    """
    Post process the response before returning
    """
    return response

generate_response

generate_response(input_text: str) -> str

Parameters:

input_text (str): The user input. Required.

Returns:

str: The chatbot's response.

Source code in openvoicechat/llm/base.py
def generate_response(self, input_text: str) -> str:
    """
    :param input_text: The user input
    :return: The chatbot's response.
    """
    out = self.run(input_text)
    response_text = ""
    for o in out:
        text = o
        response_text += text
    response = self.post_process(response_text)
    return response

_log_event

_log_event(event: str, details: str, further: str)
Source code in openvoicechat/llm/base.py
def _log_event(self, event: str, details: str, further: str):
    if self.logger:
        self.logger.info(event, extra={"details": details, "further": further})

generate_response_stream

generate_response_stream(input_text: str, output_queue: queue.Queue, interrupt_queue: queue.Queue) -> str

Parameters:

input_text (str): The user input. Required.
output_queue (Queue): The text output queue where the result is accumulated. Required.
interrupt_queue (Queue): Stores the transcription if an interruption occurred; used to stop generating. Required.

Returns:

str: The chatbot's response after running self.post_process.

Source code in openvoicechat/llm/base.py
def generate_response_stream(
    self, input_text: str, output_queue: queue.Queue, interrupt_queue: queue.Queue
) -> str:
    """
    :param input_text: The user input
    :param output_queue: The text output queue where the result is accumulated.
    :param interrupt_queue: The interrupt queue which stores the transcription if interruption occurred. Used to stop generating.
    :return: The chatbot's response after running self.post_process
    """
    self._log_event("llm request sent", "LLM", "")
    out = self.run(input_text)
    response_text = ""
    for text in out:
        if not interrupt_queue.empty():
            self._log_event("interruption detected", "LLM", "")
            break
        self._log_event("llm token received", "LLM", f'"{text}"')
        output_queue.put(text)
        response_text += text
    output_queue.put(None)
    self._log_event("llm post processing", "LLM", "")
    response = self.post_process(response_text)
    return response
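
A hypothetical driver for the streaming path (the library's own chat loop may wire this differently): tokens accumulate on output_queue until the None sentinel, and anything placed on interrupt_queue stops generation early.

import queue
import threading

bot = MyChatbot()  # sketch subclass from above
output_queue = queue.Queue()
interrupt_queue = queue.Queue()

worker = threading.Thread(
    target=bot.generate_response_stream,
    args=("hello there", output_queue, interrupt_queue),
)
worker.start()

while True:
    token = output_queue.get()
    if token is None:  # generate_response_stream closes the stream with None
        break
    print(token, end="", flush=True)
worker.join()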

tts.base.BaseMouth

BaseMouth(sample_rate: int, player=sd, wait=True, logger=None)

Initializes the BaseMouth class.

Parameters:

sample_rate (int): The sample rate of the audio. Required.
player: The audio player object. Default: sounddevice
wait: Whether to wait for the audio to finish playing. Default: True
logger: Logger used for event logging. Default: None

Methods:

run_tts: Synthesizes speech for the given text.
say_text: Calls run_tts and plays the audio using the player.
say: Plays the audio clips in the queue using the player. Stops if an interruption occurred.
say_multiple: Splits the text into sentences, then plays them one by one.
say_multiple_stream: Receives text from the text_queue and synthesizes speech sentence by sentence as it arrives.

Attributes:

sample_rate
interrupted
player
seg
wait
logger
Source code in openvoicechat/tts/base.py
def __init__(self, sample_rate: int, player=sd, wait=True, logger=None):
    """
    Initializes the BaseMouth class.

    :param sample_rate: The sample rate of the audio.
    :param player: The audio player object. Defaults to sounddevice.
    :param wait: Whether to wait for the audio to finish playing. Defaults to True.
    :param logger: Logger used for event logging. Defaults to None.
    """
    self.sample_rate = sample_rate
    self.interrupted = ""
    self.player = player
    self.seg = pysbd.Segmenter(language="en", clean=True)
    self.wait = wait
    self.logger = logger

sample_rate instance-attribute

sample_rate = sample_rate

interrupted instance-attribute

interrupted = ''

player instance-attribute

player = player

seg instance-attribute

seg = Segmenter(language='en', clean=True)

wait instance-attribute

wait = wait

logger instance-attribute

logger = logger

run_tts

run_tts(text: str) -> np.ndarray

Parameters:

text (str): The text to synthesize speech for. Required.

Returns:

ndarray: Audio numpy array for sounddevice.

Source code in openvoicechat/tts/base.py
def run_tts(self, text: str) -> np.ndarray:
    """
    :param text: The text to synthesize speech for
    :return: audio numpy array for sounddevice
    """
    raise NotImplementedError("This method should be implemented by the subclass")
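
As with the ear and the chatbot, a concrete Mouth only implements the synthesis hook. A hypothetical example; my_tts_model and the 22050 Hz rate are placeholders for your TTS backend:

import numpy as np

from openvoicechat.tts.base import BaseMouth


class MyMouth(BaseMouth):
    def __init__(self, **kwargs):
        # pass the sample rate your TTS model generates audio at
        super().__init__(sample_rate=22050, **kwargs)

    def run_tts(self, text: str) -> np.ndarray:
        # must return a 1-D float array the player (sounddevice by
        # default) can play at self.sample_rate
        return my_tts_model.synthesize(text)  # hypothetical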

say_text

say_text(text: str)

Calls run_tts and plays the audio using the player.

Parameters:

text (str): The text to synthesize speech for. Required.
Source code in openvoicechat/tts/base.py
def say_text(self, text: str):
    """
    calls run_tts and plays the audio using the player.
    :param text: The text to synthesize speech for
    """
    output = self.run_tts(text)
    self.player.play(output, samplerate=self.sample_rate)
    self.player.wait()

say

say(audio_queue: queue.Queue, listen_interruption_func: Callable)

Plays the audio clips in the queue using the player. Stops if an interruption occurred.

Parameters:

audio_queue (Queue): The queue where the audio is stored for playback. Required.
listen_interruption_func (Callable): Callable function from the ear class. Required.
Source code in openvoicechat/tts/base.py
def say(self, audio_queue: queue.Queue, listen_interruption_func: Callable):
    """
    Plays the audio clips in the queue using the player. Stops if an interruption occurred.
    :param audio_queue: The queue where the audio is stored for it to be played
    :param listen_interruption_func: callable function from the ear class.
    """
    self.interrupted = ""
    while True:
        output, text = audio_queue.get()
        if output is None:
            self.player.wait()  # wait for the last audio to finish
            break
        # get the duration of audio
        duration = len(output) / self.sample_rate
        self._log_event("playing audio", "TTS", f"{duration} seconds")
        self.player.play(output, samplerate=self.sample_rate)
        interruption = listen_interruption_func(duration)
        if interruption:
            self._log_event("audio interrupted", f"TTS")
            self.player.stop()
            self.interrupted = (interruption, text)
            break
        else:
            if self.wait:
                self.player.wait()  # block until this clip finishes before the next

say_multiple

say_multiple(text: str, listen_interruption_func: Callable)

Splits the text into sentences. Then plays the sentences one by one using run_tts() and say()

Parameters:

text (str): Input text to synthesize. Required.
listen_interruption_func (Callable): Callable function from the ear class. Required.
Source code in openvoicechat/tts/base.py
def say_multiple(self, text: str, listen_interruption_func: Callable):
    """
    Splits the text into sentences. Then plays the sentences one by one
    using run_tts() and say()

    :param text: Input text to synthesize
    :param listen_interruption_func: callable function from the ear class
    """
    sentences = self.seg.segment(text)
    audio_queue = queue.Queue()
    say_thread = threading.Thread(
        target=self.say, args=(audio_queue, listen_interruption_func)
    )
    say_thread.start()
    for sentence in sentences:
        output = self.run_tts(sentence)
        audio_queue.put((output, sentence))
        if self.interrupted:
            break
    audio_queue.put((None, ""))
    say_thread.join()

_handle_interruption

_handle_interruption(responses_list, interrupt_queue)
Source code in openvoicechat/tts/base.py
def _handle_interruption(self, responses_list, interrupt_queue):
    interrupt_transcription, interrupt_text = self.interrupted
    self._log_event("interruption detected", "TTS", interrupt_transcription)
    try:
        idx = responses_list.index(interrupt_text)
    except ValueError:
        # list.index raises ValueError rather than returning -1, so the
        # old `assert idx != -1` could never fire
        raise AssertionError(
            "Interrupted text not found in responses list. This should not happen. Raise an issue."
        )
    responses_list = responses_list[:idx] + ["..."]
    interrupt_queue.put(interrupt_transcription)
    return responses_list

_get_all_text

_get_all_text(text_queue)
Source code in openvoicechat/tts/base.py
def _get_all_text(self, text_queue):
    text = text_queue.get()
    while not text_queue.empty():
        new_text = text_queue.get()
        if new_text is not None:
            text += new_text
        else:
            text_queue.put(None)
            break
    return text

_log_event

_log_event(event: str, details: str, further: str = '')
Source code in openvoicechat/tts/base.py
def _log_event(self, event: str, details: str, further: str = ""):
    if self.logger:
        self.logger.info(
            event, extra={"details": details, "further": f'"{further}"'}
        )

say_multiple_stream

say_multiple_stream(text_queue: queue.Queue, listen_interruption_func: Callable, interrupt_queue: queue.Queue, audio_queue: queue.Queue = None)

Receives text from the text_queue. As soon as a complete sentence is formed, run_tts is called to synthesize its speech, which is sent to the audio_queue to be played.

Parameters:

text_queue (Queue): The queue where the llm adds the predicted tokens. Required.
listen_interruption_func (Callable): Callable function from the ear class. Required.
interrupt_queue (Queue): The queue where the interruption transcription is put when an interruption occurs. Required.
audio_queue (Queue): The queue where the audio to be played is placed. Default: None
Source code in openvoicechat/tts/base.py
def say_multiple_stream(
    self,
    text_queue: queue.Queue,
    listen_interruption_func: Callable,
    interrupt_queue: queue.Queue,
    audio_queue: queue.Queue = None,
):
    """
    Receives text from the text_queue. As soon as a complete sentence is formed, run_tts is
    called to synthesize its speech, which is sent to the audio_queue to be played.

    :param text_queue: The queue where the llm adds the predicted tokens
    :param listen_interruption_func: callable function from the ear class
    :param interrupt_queue: The queue where the interruption transcription is put when an interruption occurs.
    :param audio_queue: The queue where the audio to be played is placed

    """
    response = ""
    all_response = []
    interrupt_text_list = []

    if audio_queue is None:
        audio_queue = queue.Queue()
    say_thread = threading.Thread(
        target=self.say, args=(audio_queue, listen_interruption_func)
    )
    self._log_event("audio play thread started", "TTS")
    say_thread.start()
    text = ""

    while text is not None:
        self._log_event("getting all text", "TTS")
        text = self._get_all_text(text_queue)
        self._log_event("all text received", "TTS")

        if text is None:
            self._log_event("Stream ended", "TTS")
            sentence = response
        else:
            response += text
            self._log_event("segmenting text", "TTS", response)
            sentences = self.seg.segment(response)
            # if there are multiple sentences we split and play the first one
            if len(sentences) > 1:
                self._log_event("multiple sentences detected", "TTS")
                sentence = sentences[0]
                response = " ".join([s for s in sentences[1:] if s != "."])
            else:
                self._log_event("single sentence detected", "TTS")
                continue

        if sentence.strip() != "":
            self._log_event("cleaning sentence", "TTS")
            clean_sentence = remove_words_in_brackets_and_spaces(sentence).strip()
            if (
                clean_sentence.strip() != ""
            ):  # sentence only contains words in brackets
                self._log_event("running tts", "TTS", clean_sentence)
                output = self.run_tts(clean_sentence)
                self._log_event("tts output received", "TTS")
                audio_queue.put((output, clean_sentence))
                interrupt_text_list.append(clean_sentence)
            all_response.append(sentence)
        # if interruption occurred, handle it
        if self.interrupted:
            all_response = self._handle_interruption(
                interrupt_text_list, interrupt_queue
            )
            self.interrupted = ""
            break

    audio_queue.put((None, ""))

    say_thread.join()
    self._log_event("audio play thread ended", "TTS")
    if self.interrupted:
        all_response = self._handle_interruption(
            interrupt_text_list, interrupt_queue
        )
    text_queue.queue.clear()
    text_queue.put(" ".join(all_response))
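
Putting the three classes together: a hypothetical end-to-end loop assembled only from the methods documented above, using the MyEar, MyChatbot, and MyMouth sketches (the library may ship its own chat utility that differs):

import queue
import threading

ear = MyEar(silence_seconds=2)
bot = MyChatbot()
mouth = MyMouth()

while True:
    user_text = ear.listen()  # blocks until the user finishes a sentence

    text_queue = queue.Queue()
    interrupt_queue = queue.Queue()
    llm_thread = threading.Thread(
        target=bot.generate_response_stream,
        args=(user_text, text_queue, interrupt_queue),
    )
    llm_thread.start()

    # plays the reply sentence by sentence; ear.interrupt_listen is the
    # listen_interruption_func that lets the user talk over the bot
    mouth.say_multiple_stream(text_queue, ear.interrupt_listen, interrupt_queue)
    llm_thread.join()

    # say_multiple_stream leaves the (possibly truncated) spoken response
    # on text_queue, e.g. for appending to a chat history
    spoken_response = text_queue.get()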