summaryrefslogtreecommitdiff
path: root/modules/input_handlers/pipewire_record.py
blob: 3ad295cdefec88548099a3a4e5e610d94daf347e (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
import subprocess
import os.path
import signal
from time import sleep

import whisper

from modules.input_handlers.input_handler import InputHandler

FIFO_PATH = "/tmp/hestia-listening"
RECORD_PATH = "/tmp/hestia-record.mp3"

class PipeWireRecord(InputHandler):
    def cleanup(self):
        if os.path.exists(FIFO_PATH):
            os.remove(FIFO_PATH)


    def get_input(self):
        device = PipeWireRecord.get_device()

        self.cleanup()
        os.mkfifo(FIFO_PATH)

        while True:
            with open(FIFO_PATH):
                pass
                # TODO "I'm listening"

            try:
                ps = subprocess.Popen((f"pw-record --target {device} {RECORD_PATH}",), shell=True)
                with open(FIFO_PATH):
                    print("finished")
                ps.send_signal(signal.SIGINT)
                # TODO "acknowledged"
            except:
                if "ps" in locals():
                    ps.kill()
                # TODO "error"
                # TODO exit gracefully or try to recover
                raise StopIteration

            model = whisper.load_model("base")

            audio = whisper.load_audio(RECORD_PATH)
            audio = whisper.pad_or_trim(audio)

            mel = whisper.log_mel_spectrogram(audio).to(model.device)
            options = whisper.DecodingOptions(language="en", fp16=False)
            result = whisper.decode(model, mel, options)
            result_text = result.text.replace(",", "").replace(".", "").lower()

            print(result_text)

            yield result_text

    @staticmethod
    def get_device() -> str:
        already_warned = False

        while True:
            ps = subprocess.Popen(('pw-cli ls | \\grep -Poi "(?<=node.name = \\").*mic.*(?=\\")"',), shell=True, stdout=subprocess.PIPE)
            ps.wait()

            if ps.returncode == 0:
                return ps.stdout.read().decode().strip()

            elif not already_warned:
                already_warned = True
                # TODO warn about device not found

            sleep(3)