#!/usr/bin/env python3
"""
Ti-Pote Wake Word Detection Script.

Runs OpenWakeWord continuously and prints "DETECTED" to stdout when the wake
word is heard.

Two input modes:

  1. --input alsa (default, legacy)
     Opens an ALSA capture device via PyAudio. PAUSE/RESUME/QUIT commands
     are read from stdin.

  2. --input stdin
     Reads raw S16 mono PCM audio from stdin (fd 0). This is used when the
     Raspberry Pi is just an orchestrator and the microphone lives on the
     ESP32 — the Node client forwards AUDIO_UP frames into this script's
     stdin. Control commands are read from a separate file descriptor
     specified by --control-fd (default: 3).

Control commands (one per line, uppercase):

  PAUSE  — stop emitting DETECTED events (audio keeps flowing so we don't
           overflow the pipe, but predictions are ignored).
  RESUME — resume emitting and reset the model buffer.
  RESET  — reset the model buffer without touching the pause flag.
  QUIT   — exit cleanly.

Usage (ALSA):
  python3 wake_word.py --model hey_jarvis --device default

Usage (stdin / ESP32 backend):
  python3 wake_word.py --model hey_jarvis --input stdin --control-fd 3
"""

import argparse
import os
import signal
import sys
import threading
import time

import numpy as np

CHUNK_SAMPLES = 1280  # ≈ 80 ms @ 16 kHz (OpenWakeWord's preferred size)


def load_model(model_name: str):
    """Load an OpenWakeWord model by pretrained name prefix or by file path.

    `model_name` is first matched against the basenames of the bundled
    pretrained models (prefix match); if none matches it is treated as a path
    to a model file. On any failure an ERROR line is written to stderr and the
    process exits with status 1.
    """
    try:
        from openwakeword.model import Model
    except ImportError:
        print("ERROR: openwakeword not installed. Run: pip install openwakeword",
              file=sys.stderr)
        sys.exit(1)

    import openwakeword

    pretrained = openwakeword.get_pretrained_model_paths()
    model_path = next(
        (p for p in pretrained if os.path.basename(p).startswith(model_name)),
        None,
    )
    if model_path is None:
        if os.path.isfile(model_name):
            model_path = model_name
        else:
            print(f"ERROR: model '{model_name}' not found", file=sys.stderr)
            # List what is available so the user can correct the name.
            for p in pretrained:
                print(f"  - {os.path.basename(p)}", file=sys.stderr)
            sys.exit(1)

    print(f"Loading wake word model: {model_name}...", file=sys.stderr)
    try:
        return Model(wakeword_model_paths=[model_path])
    except Exception as e:
        print(f"ERROR loading model '{model_name}': {e}", file=sys.stderr)
        sys.exit(1)


class State:
    """Shared mutable state between the audio and control threads."""

    def __init__(self):
        # True while DETECTED events must be suppressed.
        self.paused = False
        # Cleared by QUIT or a signal to make the loops exit.
        self.running = True
        # Set by RESUME/RESET; consumed (and cleared) by the predict loop.
        self.reset_requested = False
        # Guards `paused` and `reset_requested` across threads.
        self.lock = threading.Lock()


def start_control_reader(state: State, fd: int):
    """Start a background thread that reads PAUSE/RESUME/RESET/QUIT commands.

    Commands are read line by line from file descriptor `fd`; they are
    stripped and upper-cased before matching. The thread stops on EOF, on a
    read error, or after QUIT.

    Returns the started daemon thread, or None if `fd` could not be opened
    (an ERROR line is written to stderr in that case).
    """
    try:
        f = os.fdopen(fd, 'r', buffering=1)
    except (OSError, ValueError) as e:
        # ValueError covers a negative descriptor, OSError a closed/invalid one.
        print(f"ERROR opening control fd {fd}: {e}", file=sys.stderr)
        return None

    def reader():
        while state.running:
            try:
                line = f.readline()
            except (OSError, ValueError) as e:
                # ValueError also covers undecodable bytes and a closed file.
                print(f"Control read error: {e}", file=sys.stderr)
                break
            if not line:
                break  # EOF: the controlling side closed the pipe.

            cmd = line.strip().upper()
            with state.lock:
                if cmd == 'PAUSE' and not state.paused:
                    state.paused = True
                    print("PAUSED", file=sys.stderr, flush=True)
                elif cmd == 'RESUME' and state.paused:
                    state.paused = False
                    # Drop whatever the model accumulated while paused.
                    state.reset_requested = True
                    print("RESUMED", file=sys.stderr, flush=True)
                elif cmd == 'RESET':
                    state.reset_requested = True
                elif cmd == 'QUIT':
                    state.running = False
                    break

    t = threading.Thread(target=reader, daemon=True)
    t.start()
    return t


def run_predict_loop(oww_model, read_chunk, state: State, threshold: float):
    """
    Shared loop: pull a chunk from `read_chunk()`, feed the model, optionally
    emit DETECTED. Exits when `read_chunk()` returns None or state.running
    is False.

    `read_chunk` must return a bytes-like buffer of int16 samples, or None.
    "READY" is written to stderr once before the first chunk is read.
    """
    print("READY", file=sys.stderr, flush=True)
    try:
        while state.running:
            with state.lock:
                if state.reset_requested:
                    oww_model.reset()
                    state.reset_requested = False

            audio_data = read_chunk()
            if audio_data is None:
                # EOF / error; exit cleanly
                break

            audio_array = np.frombuffer(audio_data, dtype=np.int16)
            oww_model.predict(audio_array)

            with state.lock:
                if state.paused:
                    # Keep draining but don't emit detections.
                    continue

            for score in oww_model.prediction_buffer.values():
                if len(score) > 0 and score[-1] > threshold:
                    print("DETECTED", flush=True)
                    # Reset so the same utterance is not reported twice.
                    oww_model.reset()
                    break
    except KeyboardInterrupt:
        pass


# ─────────────────────────────────────────────────────────────────
# ALSA input (legacy backend)
# ─────────────────────────────────────────────────────────────────

def run_alsa_mode(args, oww_model, state: State):
    """Capture audio through PyAudio and run the predict loop on it.

    `args.device` selects the capture device: 'default', a numeric PyAudio
    index, or a string matched against device names (an "N,M" pair is also
    tried as "hw:N,M"). While paused, the capture stream is closed so the
    device is released, and it is re-opened on resume.
    """
    import re
    try:
        import pyaudio
    except ImportError:
        print("ERROR: pyaudio not installed. Run: pip install pyaudio",
              file=sys.stderr)
        sys.exit(1)

    pa = pyaudio.PyAudio()

    device_index = None
    if args.device != 'default':
        # First interpretation: a plain PyAudio device index.
        try:
            idx = int(args.device)
            info = pa.get_device_info_by_index(idx)
            if info.get('maxInputChannels', 0) > 0:
                device_index = idx
                print(f"Using device by index: [{idx}] {info['name']}",
                      file=sys.stderr)
        except (ValueError, IOError):
            pass

        # Second interpretation: match against the device names.
        if device_index is None:
            hw_match = re.search(r'(\d+),(\d+)', args.device)
            hw_pattern = (
                f"hw:{hw_match.group(1)},{hw_match.group(2)}" if hw_match else None
            )
            for i in range(pa.get_device_count()):
                info = pa.get_device_info_by_index(i)
                if info.get('maxInputChannels', 0) <= 0:
                    continue
                name = str(info.get('name', ''))
                if (hw_pattern and hw_pattern in name) or args.device in name:
                    device_index = i
                    print(f"Matched device: [{i}] {name}", file=sys.stderr)
                    break

        if device_index is None:
            # Make the fallback visible instead of silently using another mic.
            print(f"WARNING: no input device matching '{args.device}'; "
                  "using system default", file=sys.stderr)

    silence = b'\x00' * (CHUNK_SAMPLES * 2)
    handle = None

    def open_stream():
        nonlocal handle
        handle = pa.open(
            format=pyaudio.paInt16,
            channels=1,
            rate=args.sample_rate,
            input=True,
            frames_per_buffer=CHUNK_SAMPLES,
            input_device_index=device_index,
        )

    def close_stream():
        nonlocal handle
        h = handle
        if h is not None:
            try:
                h.stop_stream()
                h.close()
            except Exception:
                # Best effort: the stream may already be in a broken state.
                pass
            handle = None

    def read_chunk():
        with state.lock:
            is_paused = state.paused

        # In ALSA mode, pausing means physically releasing the device.
        if is_paused:
            if handle is not None:
                close_stream()
                print("STREAM_CLOSED", file=sys.stderr, flush=True)
            time.sleep(0.1)
            # Dummy silence; the predict loop discards its result while paused.
            return silence

        if handle is None:
            try:
                open_stream()
            except Exception as e:
                # The device may still be held by whoever we paused for, or be
                # temporarily gone; keep the loop alive and retry next call.
                print(f"Audio open error: {e}", file=sys.stderr)
                time.sleep(0.5)
                return silence
            oww_model.reset()
            print("STREAM_REOPENED", file=sys.stderr, flush=True)

        try:
            return handle.read(CHUNK_SAMPLES, exception_on_overflow=False)
        except Exception as e:
            print(f"Audio read error: {e}", file=sys.stderr)
            close_stream()  # forces a re-open on the next call
            time.sleep(0.5)
            return silence

    try:
        # Inside the try so PortAudio is terminated even if this open fails.
        open_stream()
        run_predict_loop(oww_model, read_chunk, state, args.threshold)
    finally:
        close_stream()
        pa.terminate()
        print("Wake word detection stopped", file=sys.stderr)


# ─────────────────────────────────────────────────────────────────
# Stdin input (ESP32 backend)
# ─────────────────────────────────────────────────────────────────

def run_stdin_mode(args, oww_model, state: State):
    """
    Audio bytes arrive on stdin (fd 0), 16-bit signed LE mono at
    `args.sample_rate`. We block until a full CHUNK_SAMPLES chunk is
    available and hand it to the model.

    A trailing partial chunk at EOF is dropped.
    """
    print("Listening on stdin for raw S16LE mono PCM", file=sys.stderr)

    chunk_bytes = CHUNK_SAMPLES * 2
    stdin = sys.stdin.buffer
    buf = bytearray()

    def read_chunk():
        # Keep reading until we have a full chunk or hit EOF.
        while len(buf) < chunk_bytes and state.running:
            try:
                data = stdin.read(chunk_bytes - len(buf))
            except (OSError, ValueError) as e:
                print(f"stdin read error: {e}", file=sys.stderr)
                return None
            if not data:
                return None
            buf.extend(data)

        if len(buf) < chunk_bytes:
            # Stopped (state.running cleared) before a full chunk arrived.
            return None

        chunk = bytes(buf[:chunk_bytes])
        del buf[:chunk_bytes]
        return chunk

    try:
        run_predict_loop(oww_model, read_chunk, state, args.threshold)
    finally:
        print("Wake word detection stopped", file=sys.stderr)


# ─────────────────────────────────────────────────────────────────
# Entrypoint
# ─────────────────────────────────────────────────────────────────

def resolve_control_fd(input_mode: str, control_fd):
    """Return the control file descriptor to use for `input_mode`.

    When `control_fd` is None the default depends on the mode: 0 (stdin) for
    'alsa', 3 for 'stdin'. Raises ValueError if fd 0 is requested together
    with 'stdin' input, because fd 0 then carries the audio stream and a
    control reader on it would consume PCM data.
    """
    if control_fd is None:
        return 3 if input_mode == 'stdin' else 0
    if input_mode == 'stdin' and control_fd == 0:
        raise ValueError(
            "--control-fd 0 cannot be used with --input stdin "
            "(fd 0 carries the audio stream)"
        )
    return control_fd


def main():
    """Parse arguments, load the model and run the selected input backend."""
    parser = argparse.ArgumentParser(description='Ti-Pote Wake Word Detection')
    parser.add_argument('--model', type=str, default='hey_jarvis')
    parser.add_argument('--threshold', type=float, default=0.5)
    parser.add_argument('--input', type=str, choices=['alsa', 'stdin'], default='alsa',
                        help="Audio source. 'alsa' opens PyAudio, 'stdin' reads from fd 0.")
    parser.add_argument('--device', type=str, default='default',
                        help='ALSA audio capture device (only used with --input alsa).')
    parser.add_argument('--control-fd', type=int, default=None,
                        help='File descriptor to read control commands from. '
                             'Defaults to 0 (stdin) with --input alsa and to 3 '
                             'with --input stdin.')
    parser.add_argument('--sample-rate', type=int, default=16000)
    args = parser.parse_args()

    try:
        args.control_fd = resolve_control_fd(args.input, args.control_fd)
    except ValueError as e:
        parser.error(str(e))

    state = State()

    def handle_signal(_sig, _frame):
        # Only a flag is set: taking state.lock here could deadlock if the
        # main thread already holds it.
        # NOTE(review): in stdin mode a read blocked on an idle pipe is not
        # woken up by this, so shutdown waits for the next audio bytes or EOF.
        state.running = False

    signal.signal(signal.SIGTERM, handle_signal)
    signal.signal(signal.SIGINT, handle_signal)

    oww_model = load_model(args.model)
    print(f"Wake word model loaded: {args.model}", file=sys.stderr)
    print(f"Threshold: {args.threshold}", file=sys.stderr)

    start_control_reader(state, args.control_fd)

    if args.input == 'stdin':
        run_stdin_mode(args, oww_model, state)
    else:
        print(f"Listening on device: {args.device}", file=sys.stderr)
        run_alsa_mode(args, oww_model, state)


if __name__ == '__main__':
    main()