2026-04-14 02:14:54 +02:00

334 lines
12 KiB
Python
Executable File

#!/usr/bin/env python3
"""
Ti-Pote Wake Word Detection Script.
Runs OpenWakeWord continuously and prints "DETECTED" to stdout when
the wake word is heard.
Two input modes:
1. --input alsa (default, legacy)
Opens an ALSA capture device via PyAudio. Control commands
(PAUSE/RESUME/RESET/QUIT) are read from stdin by default; while paused the
capture device is released and re-opened on resume.
2. --input stdin
Reads raw S16 mono PCM audio from stdin (fd 0). This is used when
the Raspberry Pi is just an orchestrator and the microphone lives
on the ESP32 — the Node client forwards AUDIO_UP frames into this
script's stdin. Control commands are read from a separate file
descriptor given by --control-fd (use 3 in this mode; it must not be
fd 0, which carries the audio).
Control commands (one per line, uppercase):
PAUSE — stop emitting DETECTED events (audio keeps flowing so
we don't overflow the pipe, but predictions are ignored).
RESUME — resume emitting and reset the model buffer.
RESET — reset the model buffer without touching the pause flag.
QUIT — exit cleanly.
Usage (ALSA):
python3 wake_word.py --model hey_jarvis --device default
Usage (stdin / ESP32 backend):
python3 wake_word.py --model hey_jarvis --input stdin --control-fd 3
"""
import argparse
import os
import signal
import sys
import threading
import time
import numpy as np
# Number of samples handed to the model per prediction step. Byte buffers in
# this file are sized as CHUNK_SAMPLES * 2 because samples are 16-bit.
CHUNK_SAMPLES = 1280 # ≈ 80 ms @ 16 kHz (OpenWakeWord's preferred size)
def load_model(model_name: str):
    """Resolve *model_name* to a model file and return a loaded OpenWakeWord model.

    Resolution order:
      1. the first bundled pretrained model whose file name starts with
         *model_name*;
      2. *model_name* itself, when it is the path of an existing file.

    Every failure (package missing, model not found, load error) is reported
    on stderr and terminates the process with exit status 1.
    """
    try:
        from openwakeword.model import Model
    except ImportError:
        print("ERROR: openwakeword not installed. Run: pip install openwakeword",
              file=sys.stderr)
        sys.exit(1)
    import openwakeword

    bundled = openwakeword.get_pretrained_model_paths()

    # Prefix match against the bundled models first.
    resolved = None
    for candidate in bundled:
        if os.path.basename(candidate).startswith(model_name):
            resolved = candidate
            break

    # Otherwise accept an explicit path to a model file.
    if resolved is None and os.path.isfile(model_name):
        resolved = model_name

    if resolved is None:
        print(f"ERROR: model '{model_name}' not found", file=sys.stderr)
        # List what is available so the user can correct the name.
        for candidate in bundled:
            print(f" - {os.path.basename(candidate)}", file=sys.stderr)
        sys.exit(1)

    print(f"Loading wake word model: {model_name}...", file=sys.stderr)
    try:
        model = Model(wakeword_model_paths=[resolved])
    except Exception as e:
        print(f"ERROR loading model '{model_name}': {e}", file=sys.stderr)
        sys.exit(1)
    return model
class State:
    """Flags shared by the audio loop and the control-command thread."""

    def __init__(self):
        # Guards the flags below when both threads touch them.
        self.lock = threading.Lock()
        # Cleared to make the loops wind down.
        self.running = True
        # While set, detections are not emitted.
        self.paused = False
        # Set to ask the audio loop to reset the model's buffers.
        self.reset_requested = False
def start_control_reader(state: State, fd: int):
    """Start a daemon thread that applies control commands read from *fd*.

    Commands are read one per line and matched case-insensitively after
    stripping whitespace:

      PAUSE   set ``state.paused`` (no-op if already paused)
      RESUME  clear ``state.paused`` and request a model reset (no-op if
              not paused)
      RESET   request a model reset
      QUIT    clear ``state.running`` and stop reading

    Anything else is ignored. The thread also stops on EOF or on a read
    error; a read error is reported on stderr so a dead control channel is
    visible in the logs.

    Args:
        state: shared flags; mutated under ``state.lock``.
        fd: file descriptor to read commands from.

    Returns:
        The started ``threading.Thread``, or ``None`` when *fd* could not be
        opened (the error is printed to stderr).
    """
    try:
        control = os.fdopen(fd, 'r', buffering=1)
    except OSError as e:
        print(f"ERROR opening control fd {fd}: {e}", file=sys.stderr)
        return None

    def reader():
        while state.running:
            try:
                line = control.readline()
            except Exception as e:
                # Report instead of dying silently: without this the parent
                # has no way to tell that PAUSE/RESUME stopped working.
                print(f"ERROR reading control fd {fd}: {e}",
                      file=sys.stderr, flush=True)
                break
            if not line:
                # EOF: the writer closed its end.
                break
            cmd = line.strip().upper()
            with state.lock:
                if cmd == 'PAUSE' and not state.paused:
                    state.paused = True
                    print("PAUSED", file=sys.stderr, flush=True)
                elif cmd == 'RESUME' and state.paused:
                    state.paused = False
                    # Start from clean buffers after a pause.
                    state.reset_requested = True
                    print("RESUMED", file=sys.stderr, flush=True)
                elif cmd == 'RESET':
                    state.reset_requested = True
                elif cmd == 'QUIT':
                    state.running = False
                    break

    thread = threading.Thread(target=reader, daemon=True)
    thread.start()
    return thread
def run_predict_loop(oww_model, read_chunk, state: State, threshold: float):
    """Pull audio chunks, feed the model, and print DETECTED on a hit.

    Prints ``READY`` to stderr once, then loops until ``read_chunk()``
    returns ``None``, ``state.running`` becomes false, or a
    ``KeyboardInterrupt`` arrives.

    Per chunk:
      * a pending ``state.reset_requested`` is honoured (model reset, flag
        cleared);
      * while ``state.paused`` is set the chunk is dropped without running
        inference, so the source keeps draining but no CPU is spent on
        results that would be discarded;
      * otherwise the chunk is interpreted as int16 samples and passed to
        ``oww_model.predict``. For each entry of
        ``oww_model.prediction_buffer`` the latest score is logged to stderr
        when above 0.05, and the first score above *threshold* prints
        ``DETECTED`` to stdout, resets the model and ends the scan for this
        chunk.

    Args:
        oww_model: object providing ``predict(array)``, ``reset()`` and a
            ``prediction_buffer`` mapping of name -> score sequence.
        read_chunk: zero-argument callable returning a bytes-like chunk, or
            ``None`` to stop.
        state: shared flags, read/written under ``state.lock``.
        threshold: score above which a detection is emitted.
    """
    print("READY", file=sys.stderr, flush=True)
    try:
        while state.running:
            audio_data = read_chunk()
            if audio_data is None:
                # EOF / error; exit cleanly
                break

            # Handle reset and pause in one critical section, right before
            # inference, so a RESUME (which clears `paused` and requests a
            # reset together) that arrived during the blocking read takes
            # effect before this chunk is scored.
            with state.lock:
                if state.reset_requested:
                    oww_model.reset()
                    state.reset_requested = False
                paused = state.paused
            if paused:
                # Keep draining but skip inference: nothing is emitted while
                # paused and the model is reset on resume anyway.
                continue

            samples = np.frombuffer(audio_data, dtype=np.int16)
            oww_model.predict(samples)

            for name, score in oww_model.prediction_buffer.items():
                if len(score) == 0:
                    continue
                s = score[-1]
                if s > 0.05:
                    print(f"SCORE: {name}={s:.3f}", file=sys.stderr, flush=True)
                if s > threshold:
                    print("DETECTED", flush=True)
                    # Clear the buffers so the same utterance does not
                    # trigger again on the next chunk.
                    oww_model.reset()
                    break
    except KeyboardInterrupt:
        pass
# ─────────────────────────────────────────────────────────────────
# ALSA input (legacy backend)
# ─────────────────────────────────────────────────────────────────
def run_alsa_mode(args, oww_model, state: State):
    """Capture audio through PyAudio and run the detection loop on it.

    Device selection from ``args.device``:
      * ``'default'``: no explicit index is passed to PyAudio;
      * an integer: used as a device index when that device has input
        channels;
      * otherwise (or when the index did not qualify): the first input
        device whose name contains ``hw:<card>,<dev>`` (when the argument
        contains ``<digits>,<digits>``) or contains the argument itself.
    When a non-default device cannot be matched, a warning is printed and
    no explicit index is passed to PyAudio.

    While ``state.paused`` is set the capture stream is closed so the
    device is released; it is re-opened on the first read after the pause
    ends. A failed read or failed re-open is logged and retried on the next
    call rather than aborting the loop. A failure of the initial open still
    propagates to the caller.

    Exits the process with status 1 when PyAudio is not installed.
    """
    import re
    try:
        import pyaudio
    except ImportError:
        print("ERROR: pyaudio not installed. Run: pip install pyaudio",
              file=sys.stderr)
        sys.exit(1)

    pa = pyaudio.PyAudio()
    # Placeholder returned whenever no real audio is available, so the
    # predict loop keeps ticking (and keeps checking `state.running`).
    silence = b'\x00' * (CHUNK_SAMPLES * 2)
    device_index = None
    handle = None

    def resolve_device_index():
        """Map args.device to a PyAudio input device index, or None."""
        if args.device == 'default':
            return None

        # 1) Numeric argument: treat it as a device index.
        try:
            idx = int(args.device)
            info = pa.get_device_info_by_index(idx)
            if info.get('maxInputChannels', 0) > 0:
                print(f"Using device by index: [{idx}] {info['name']}",
                      file=sys.stderr)
                return idx
        except (ValueError, OSError):
            # Not a number, or no such index: fall through to name matching.
            pass

        # 2) Name matching, accepting ALSA-style "<card>,<dev>" notation.
        hw_match = re.search(r'(\d+),(\d+)', args.device)
        hw_pattern = (f"hw:{hw_match.group(1)},{hw_match.group(2)}"
                      if hw_match else None)
        for i in range(pa.get_device_count()):
            info = pa.get_device_info_by_index(i)
            if info.get('maxInputChannels', 0) <= 0:
                continue
            name = str(info.get('name', ''))
            if (hw_pattern and hw_pattern in name) or args.device in name:
                print(f"Matched device: [{i}] {name}", file=sys.stderr)
                return i

        # Make the fallback visible instead of silently using another mic.
        print(f"WARNING: device '{args.device}' not found; "
              "using the default input device", file=sys.stderr)
        return None

    def open_stream():
        nonlocal handle
        handle = pa.open(
            format=pyaudio.paInt16,
            channels=1,
            rate=args.sample_rate,
            input=True,
            frames_per_buffer=CHUNK_SAMPLES,
            input_device_index=device_index,
        )

    def close_stream():
        nonlocal handle
        if handle is None:
            return
        try:
            handle.stop_stream()
            handle.close()
        except Exception:
            # Best effort: the stream may already be broken, and we are
            # discarding it either way.
            pass
        handle = None

    def read_chunk():
        with state.lock:
            is_paused = state.paused

        # In ALSA mode, pausing means physically releasing the device.
        if is_paused:
            if handle is not None:
                close_stream()
                print("STREAM_CLOSED", file=sys.stderr, flush=True)
            time.sleep(0.1)
            return silence

        if handle is None:
            try:
                open_stream()
            except Exception as e:
                # The device may still be busy or unplugged; retry on the
                # next call instead of letting the exception kill the loop.
                print(f"Audio open error: {e}", file=sys.stderr, flush=True)
                time.sleep(0.5)
                return silence
            oww_model.reset()
            print("STREAM_REOPENED", file=sys.stderr, flush=True)

        try:
            return handle.read(CHUNK_SAMPLES, exception_on_overflow=False)
        except Exception as e:
            print(f"Audio read error: {e}", file=sys.stderr)
            # Drop the stream; the next call re-opens it.
            close_stream()
            time.sleep(0.5)
            return silence

    try:
        # Inside the try so PyAudio is terminated even when device lookup
        # or the initial open fails.
        device_index = resolve_device_index()
        open_stream()
        run_predict_loop(oww_model, read_chunk, state, args.threshold)
    finally:
        close_stream()
        pa.terminate()
        print("Wake word detection stopped", file=sys.stderr)
# ─────────────────────────────────────────────────────────────────
# Stdin input (ESP32 backend)
# ─────────────────────────────────────────────────────────────────
def run_stdin_mode(args, oww_model, state: State):
    """Run the detection loop on raw PCM read from stdin (fd 0).

    Input is consumed in chunks of CHUNK_SAMPLES 16-bit samples, i.e.
    CHUNK_SAMPLES * 2 bytes. Only ``args.threshold`` is read from *args*
    here. The loop ends on EOF, on a read error, or once ``state.running``
    is cleared; a trailing partial chunk is discarded.
    """
    print("Listening on stdin for raw S16LE mono PCM", file=sys.stderr)

    frame_size = CHUNK_SAMPLES * 2  # bytes per chunk (2 bytes per sample)
    source = sys.stdin.buffer
    pending = bytearray()  # bytes received but not yet handed out

    def read_chunk():
        """Return the next full chunk as bytes, or None to stop the loop."""
        while True:
            missing = frame_size - len(pending)
            if missing <= 0:
                break
            if not state.running:
                # Shutdown requested before a full chunk was assembled.
                return None
            try:
                received = source.read(missing)
            except Exception as e:
                print(f"stdin read error: {e}", file=sys.stderr)
                return None
            if not received:
                # EOF
                return None
            pending.extend(received)

        frame = bytes(pending[:frame_size])
        del pending[:frame_size]
        return frame

    try:
        run_predict_loop(oww_model, read_chunk, state, args.threshold)
    finally:
        print("Wake word detection stopped", file=sys.stderr)
# ─────────────────────────────────────────────────────────────────
# Entrypoint
# ─────────────────────────────────────────────────────────────────
def main():
    """Parse the command line, load the model and run the selected input mode.

    ``--control-fd`` defaults per input mode: 0 (stdin) with ``--input
    alsa`` and 3 with ``--input stdin``, because in stdin mode fd 0 carries
    the audio stream and cannot also carry control commands. Explicitly
    combining ``--input stdin`` with ``--control-fd 0`` is rejected with a
    usage error (exit status 2).

    SIGTERM and SIGINT clear ``state.running`` so the loops can wind down.
    """
    parser = argparse.ArgumentParser(description='Ti-Pote Wake Word Detection')
    parser.add_argument('--model', type=str, default='hey_jarvis')
    parser.add_argument('--threshold', type=float, default=0.5)
    parser.add_argument('--input', type=str, choices=['alsa', 'stdin'], default='alsa',
                        help="Audio source. 'alsa' opens PyAudio, 'stdin' reads from fd 0.")
    parser.add_argument('--device', type=str, default='default',
                        help='ALSA audio capture device (only used with --input alsa).')
    parser.add_argument('--control-fd', type=int, default=None,
                        help='File descriptor to read control commands from. '
                             'Default: 0 (stdin) with --input alsa, '
                             '3 with --input stdin.')
    parser.add_argument('--sample-rate', type=int, default=16000)
    args = parser.parse_args()

    # Resolve the control fd before anything else touches stdin.
    if args.control_fd is None:
        args.control_fd = 3 if args.input == 'stdin' else 0
    elif args.input == 'stdin' and args.control_fd == 0:
        parser.error('--control-fd 0 cannot be used with --input stdin: '
                     'fd 0 carries the audio stream')

    state = State()

    def handle_signal(_sig, _frame):
        state.running = False

    signal.signal(signal.SIGTERM, handle_signal)
    signal.signal(signal.SIGINT, handle_signal)

    oww_model = load_model(args.model)
    print(f"Wake word model loaded: {args.model}", file=sys.stderr)
    print(f"Threshold: {args.threshold}", file=sys.stderr)

    start_control_reader(state, args.control_fd)

    if args.input == 'stdin':
        run_stdin_mode(args, oww_model, state)
    else:
        print(f"Listening on device: {args.device}", file=sys.stderr)
        run_alsa_mode(args, oww_model, state)
# Run the detector only when executed as a script, not when imported.
if __name__ == "__main__":
    main()