#!/usr/bin/env python3 """ Ti-Pote Wake Word Detection Script. Runs OpenWakeWord model continuously, listening on the specified ALSA device. Prints "DETECTED" to stdout when the wake word is heard. Supports PAUSE/RESUME commands on stdin to temporarily stop/start listening without reloading the model. When paused, the audio stream is closed so other processes (arecord) can use the device. Usage: python3 wake_word.py --model hey_jarvis --threshold 0.5 --device default --sample-rate 16000 Requirements: pip install openwakeword pyaudio numpy """ import argparse import sys import os import signal import select import threading import numpy as np def main(): parser = argparse.ArgumentParser(description='Ti-Pote Wake Word Detection') parser.add_argument('--model', type=str, default='hey_jarvis', help='Wake word model name (default: hey_jarvis as placeholder)') parser.add_argument('--threshold', type=float, default=0.5, help='Detection threshold (0.0-1.0)') parser.add_argument('--device', type=str, default='default', help='ALSA audio capture device') parser.add_argument('--sample-rate', type=int, default=16000, help='Audio sample rate in Hz') args = parser.parse_args() try: from openwakeword.model import Model except ImportError: print("ERROR: openwakeword not installed. Run: pip install openwakeword", file=sys.stderr) sys.exit(1) try: import pyaudio except ImportError: print("ERROR: pyaudio not installed. Run: pip install pyaudio", file=sys.stderr) sys.exit(1) # ── Load the wake word model (one time only) ── print(f"Loading wake word model: {args.model}...", file=sys.stderr) import openwakeword pretrained_paths = openwakeword.get_pretrained_model_paths() model_path = None for p in pretrained_paths: basename = os.path.basename(p) if basename.startswith(args.model): model_path = p break if model_path is None: if os.path.isfile(args.model): model_path = args.model else: print(f"ERROR: model '{args.model}' not found in pretrained models", file=sys.stderr) print(f"Available models:", file=sys.stderr) for p in pretrained_paths: print(f" - {os.path.basename(p)}", file=sys.stderr) sys.exit(1) print(f"Resolved model path: {model_path}", file=sys.stderr) try: oww_model = Model(wakeword_model_paths=[model_path]) except Exception as e: print(f"ERROR loading model '{args.model}': {e}", file=sys.stderr) sys.exit(1) print(f"Wake word model loaded: {args.model}", file=sys.stderr) print(f"Threshold: {args.threshold}", file=sys.stderr) print(f"Listening on device: {args.device}", file=sys.stderr) # ── Initialize PyAudio ── pa = pyaudio.PyAudio() # Find the device index import re device_index = None if args.device != 'default': try: idx = int(args.device) info = pa.get_device_info_by_index(idx) if info.get('maxInputChannels', 0) > 0: device_index = idx print(f"Using device by index: [{idx}] {info['name']}", file=sys.stderr) except (ValueError, IOError): pass if device_index is None: hw_match = re.search(r'(\d+),(\d+)', args.device) hw_pattern = f"hw:{hw_match.group(1)},{hw_match.group(2)}" if hw_match else None for i in range(pa.get_device_count()): info = pa.get_device_info_by_index(i) if info.get('maxInputChannels', 0) <= 0: continue name = str(info.get('name', '')) if (hw_pattern and hw_pattern in name) or args.device in name: device_index = i print(f"Matched device: [{i}] {name}", file=sys.stderr) break if device_index is None: print(f"WARNING: Device '{args.device}' not found, listing available inputs:", file=sys.stderr) for i in range(pa.get_device_count()): info = pa.get_device_info_by_index(i) if info.get('maxInputChannels', 0) > 0: print(f" [{i}] {info['name']}", file=sys.stderr) print("Falling back to default device", file=sys.stderr) # ── Audio stream helpers ── chunk_size = 1280 # ~80ms at 16kHz (OpenWakeWord expects this) stream = None def open_stream(): nonlocal stream stream = pa.open( format=pyaudio.paInt16, channels=1, rate=args.sample_rate, input=True, frames_per_buffer=chunk_size, input_device_index=device_index, ) def close_stream(): nonlocal stream if stream is not None: try: stream.stop_stream() stream.close() except Exception: pass stream = None # ── Stdin command reader (PAUSE / RESUME) ── paused = False running = True lock = threading.Lock() def stdin_reader(): nonlocal paused, running while running: try: line = sys.stdin.readline() if not line: # EOF running = False break cmd = line.strip().upper() with lock: if cmd == 'PAUSE': if not paused: paused = True print("PAUSED", file=sys.stderr, flush=True) elif cmd == 'RESUME': if paused: paused = False print("RESUMED", file=sys.stderr, flush=True) elif cmd == 'QUIT': running = False break except Exception: break stdin_thread = threading.Thread(target=stdin_reader, daemon=True) stdin_thread.start() # ── Signal handling ── def handle_signal(sig, frame): nonlocal running running = False signal.signal(signal.SIGTERM, handle_signal) signal.signal(signal.SIGINT, handle_signal) # ── Main loop ── open_stream() print("READY", file=sys.stderr, flush=True) try: while running: with lock: is_paused = paused if is_paused: # Close the audio stream so arecord can use the device if stream is not None: close_stream() print("STREAM_CLOSED", file=sys.stderr, flush=True) # Wait a bit before checking again import time time.sleep(0.1) continue # Reopen stream if it was closed (after resume) if stream is None: open_stream() oww_model.reset() print("STREAM_REOPENED", file=sys.stderr, flush=True) try: audio_data = stream.read(chunk_size, exception_on_overflow=False) except Exception as e: print(f"Audio read error: {e}", file=sys.stderr) close_stream() import time time.sleep(0.5) continue audio_array = np.frombuffer(audio_data, dtype=np.int16) oww_model.predict(audio_array) for model_name, score in oww_model.prediction_buffer.items(): if len(score) > 0 and score[-1] > args.threshold: print("DETECTED", flush=True) oww_model.reset() break except KeyboardInterrupt: pass finally: close_stream() pa.terminate() print("Wake word detection stopped", file=sys.stderr) if __name__ == '__main__': main()