248 lines
8.0 KiB
Python
Executable File

#!/usr/bin/env python3
"""
Ti-Pote Wake Word Detection Script.

Runs an OpenWakeWord model continuously, listening on the specified ALSA device.
Prints "DETECTED" to stdout when the wake word is heard. Status messages
(READY, PAUSED, RESUMED, STREAM_CLOSED, STREAM_REOPENED) are written to stderr.

Supports PAUSE/RESUME commands on stdin to temporarily stop/start listening
without reloading the model, and QUIT (or end-of-file on stdin) to exit.
When paused, the audio stream is closed so other processes (arecord) can use
the device.

Usage:
    python3 wake_word.py --model hey_jarvis --threshold 0.5 --device default --sample-rate 16000

Requirements:
    pip install openwakeword pyaudio numpy
"""
import argparse
import sys
import os
import signal
import select
import threading
import numpy as np
def _parse_args(argv=None):
    """Parse the command line; *argv* defaults to ``sys.argv[1:]`` when None."""
    parser = argparse.ArgumentParser(description='Ti-Pote Wake Word Detection')
    parser.add_argument('--model', type=str, default='hey_jarvis',
                        help='Wake word model name (default: hey_jarvis as placeholder)')
    parser.add_argument('--threshold', type=float, default=0.5,
                        help='Detection threshold (0.0-1.0)')
    parser.add_argument('--device', type=str, default='default',
                        help='ALSA audio capture device')
    parser.add_argument('--sample-rate', type=int, default=16000,
                        help='Audio sample rate in Hz')
    return parser.parse_args(argv)


def _resolve_model_path(model, pretrained_paths):
    """Return the model file to load for *model*, or None if it cannot be found.

    A pretrained model whose file name starts with *model* takes precedence;
    otherwise *model* is used directly when it is an existing file path.
    """
    for path in pretrained_paths:
        if os.path.basename(path).startswith(model):
            return path
    if os.path.isfile(model):
        return model
    return None


def _find_input_device(pa, device):
    """Map *device* to a PyAudio input device index (None = PyAudio default).

    *device* may be the literal ``'default'``, a numeric PyAudio index, or a
    string matched against device names (an ``N,M`` pair inside it is also
    tried as ``hw:N,M``). When nothing matches, the available inputs are
    listed on stderr and None is returned so the default device is used.
    """
    import re

    if device == 'default':
        return None

    # First interpretation: a plain PyAudio device index.
    try:
        idx = int(device)
        info = pa.get_device_info_by_index(idx)
        if info.get('maxInputChannels', 0) > 0:
            print(f"Using device by index: [{idx}] {info['name']}", file=sys.stderr)
            return idx
    except (ValueError, IOError):
        pass  # not a number, or no such index: fall through to name matching

    # Collect capture-capable devices once; reused for matching and listing.
    inputs = []
    for i in range(pa.get_device_count()):
        info = pa.get_device_info_by_index(i)
        if info.get('maxInputChannels', 0) > 0:
            inputs.append((i, str(info.get('name', ''))))

    hw_match = re.search(r'(\d+),(\d+)', device)
    hw_pattern = f"hw:{hw_match.group(1)},{hw_match.group(2)}" if hw_match else None
    for i, name in inputs:
        if (hw_pattern and hw_pattern in name) or device in name:
            print(f"Matched device: [{i}] {name}", file=sys.stderr)
            return i

    print(f"WARNING: Device '{device}' not found, listing available inputs:", file=sys.stderr)
    for i, name in inputs:
        print(f" [{i}] {name}", file=sys.stderr)
    print("Falling back to default device", file=sys.stderr)
    return None


def main(argv=None):
    """Run the wake word detector until QUIT, stdin EOF, SIGTERM or SIGINT.

    Args:
        argv: Optional list of command-line arguments. When None (the
            default) the arguments are taken from ``sys.argv[1:]``.

    Protocol:
        stdout: ``DETECTED`` each time a score exceeds ``--threshold``.
        stderr: ``READY``, ``PAUSED``, ``RESUMED``, ``STREAM_CLOSED``,
            ``STREAM_REOPENED`` plus diagnostics.
        stdin:  ``PAUSE``, ``RESUME``, ``QUIT`` (case-insensitive, one per line).

    Raises:
        SystemExit: with code 1 when a dependency is missing, the model
            cannot be resolved or loaded, or the audio stream cannot be
            opened at start-up (argparse itself exits with 0 / 2).
    """
    import time

    args = _parse_args(argv)

    try:
        import openwakeword
        from openwakeword.model import Model
    except ImportError:
        print("ERROR: openwakeword not installed. Run: pip install openwakeword", file=sys.stderr)
        sys.exit(1)
    try:
        import pyaudio
    except ImportError:
        print("ERROR: pyaudio not installed. Run: pip install pyaudio", file=sys.stderr)
        sys.exit(1)

    # ── Load the wake word model (one time only) ──
    print(f"Loading wake word model: {args.model}...", file=sys.stderr)
    pretrained_paths = openwakeword.get_pretrained_model_paths()
    model_path = _resolve_model_path(args.model, pretrained_paths)
    if model_path is None:
        print(f"ERROR: model '{args.model}' not found in pretrained models", file=sys.stderr)
        print("Available models:", file=sys.stderr)
        for p in pretrained_paths:
            print(f" - {os.path.basename(p)}", file=sys.stderr)
        sys.exit(1)
    print(f"Resolved model path: {model_path}", file=sys.stderr)

    try:
        oww_model = Model(wakeword_model_paths=[model_path])
    except Exception as e:
        print(f"ERROR loading model '{args.model}': {e}", file=sys.stderr)
        sys.exit(1)
    print(f"Wake word model loaded: {args.model}", file=sys.stderr)
    print(f"Threshold: {args.threshold}", file=sys.stderr)
    print(f"Listening on device: {args.device}", file=sys.stderr)
    if args.sample_rate != 16000:
        # The chunk size below is sized for 16 kHz input.
        print(f"WARNING: OpenWakeWord expects 16000 Hz audio; got --sample-rate {args.sample_rate}",
              file=sys.stderr)

    # ── Initialize PyAudio ──
    pa = pyaudio.PyAudio()
    device_index = _find_input_device(pa, args.device)

    # ── Audio stream helpers ──
    chunk_size = 1280  # ~80ms at 16kHz (OpenWakeWord expects this)
    stream = None

    def open_stream():
        """Open the capture stream; on failure ``stream`` stays None."""
        nonlocal stream
        stream = pa.open(
            format=pyaudio.paInt16,
            channels=1,
            rate=args.sample_rate,
            input=True,
            frames_per_buffer=chunk_size,
            input_device_index=device_index,
        )

    def close_stream():
        """Best-effort release of the capture stream so the device is freed."""
        nonlocal stream
        if stream is None:
            return
        # Attempt both steps independently: a failing stop must not prevent
        # close(), otherwise the device would stay held by this process.
        for release in (stream.stop_stream, stream.close):
            try:
                release()
            except Exception as e:
                print(f"Audio close error: {e}", file=sys.stderr)
        stream = None

    # ── Stdin command reader (PAUSE / RESUME / QUIT) ──
    paused = False
    running = True
    lock = threading.Lock()

    def stdin_reader():
        """Read commands from stdin and update the paused/running flags."""
        nonlocal paused, running
        while running:
            try:
                line = sys.stdin.readline()
                if not line:  # EOF
                    running = False
                    break
                cmd = line.strip().upper()
                with lock:
                    if cmd == 'PAUSE':
                        if not paused:
                            paused = True
                            print("PAUSED", file=sys.stderr, flush=True)
                    elif cmd == 'RESUME':
                        if paused:
                            paused = False
                            print("RESUMED", file=sys.stderr, flush=True)
                    elif cmd == 'QUIT':
                        running = False
                        break
            except Exception as e:
                # Detection keeps running, but stdin control is lost: say so.
                print(f"stdin reader stopped: {e}", file=sys.stderr)
                break

    stdin_thread = threading.Thread(target=stdin_reader, daemon=True)
    stdin_thread.start()

    # ── Signal handling ──
    def handle_signal(sig, frame):
        """Request a clean shutdown of the main loop."""
        nonlocal running
        running = False

    signal.signal(signal.SIGTERM, handle_signal)
    signal.signal(signal.SIGINT, handle_signal)

    # ── Main loop ──
    try:
        # Opened inside the try so pa.terminate() runs even if this fails.
        try:
            open_stream()
        except OSError as e:
            print(f"ERROR opening audio stream: {e}", file=sys.stderr)
            sys.exit(1)
        print("READY", file=sys.stderr, flush=True)

        while running:
            with lock:
                is_paused = paused

            if is_paused:
                # Close the audio stream so arecord can use the device
                if stream is not None:
                    close_stream()
                    print("STREAM_CLOSED", file=sys.stderr, flush=True)
                # Wait a bit before checking again
                time.sleep(0.1)
                continue

            # Reopen stream if it was closed (after resume or a read error)
            if stream is None:
                try:
                    open_stream()
                except OSError as e:
                    # The device may still be held by another process right
                    # after RESUME; retry instead of crashing the detector.
                    print(f"Audio open error: {e}", file=sys.stderr, flush=True)
                    time.sleep(0.5)
                    continue
                oww_model.reset()
                print("STREAM_REOPENED", file=sys.stderr, flush=True)

            try:
                audio_data = stream.read(chunk_size, exception_on_overflow=False)
            except Exception as e:
                print(f"Audio read error: {e}", file=sys.stderr)
                close_stream()
                time.sleep(0.5)
                continue

            audio_array = np.frombuffer(audio_data, dtype=np.int16)
            oww_model.predict(audio_array)

            # One DETECTED per frame is enough, whichever model fired.
            if any(len(scores) > 0 and scores[-1] > args.threshold
                   for scores in oww_model.prediction_buffer.values()):
                print("DETECTED", flush=True)
                oww_model.reset()
    except KeyboardInterrupt:
        pass
    finally:
        close_stream()
        pa.terminate()
        print("Wake word detection stopped", file=sys.stderr)
if __name__ == '__main__':
    # Start the detector only when executed as a script, not when imported.
    main()