diff --git a/apps/backend/package.json b/apps/backend/package.json index 0043b0b..dc1ce3e 100644 --- a/apps/backend/package.json +++ b/apps/backend/package.json @@ -22,6 +22,7 @@ "migration:revert": "pnpm typeorm migration:revert -d src/config/typeorm.config.ts" }, "dependencies": { + "@deepgram/sdk": "^5.0.0", "@nestjs/common": "^11.1.17", "@nestjs/config": "^4.0.3", "@nestjs/core": "^11.1.17", diff --git a/apps/backend/src/adapters/inbound/websocket/robot.gateway.ts b/apps/backend/src/adapters/inbound/websocket/robot.gateway.ts index 3438e19..888e4be 100644 --- a/apps/backend/src/adapters/inbound/websocket/robot.gateway.ts +++ b/apps/backend/src/adapters/inbound/websocket/robot.gateway.ts @@ -7,11 +7,12 @@ import { MessageBody, ConnectedSocket, } from '@nestjs/websockets'; -import { Logger } from '@nestjs/common'; +import { Inject, Logger } from '@nestjs/common'; import { JwtService } from '@nestjs/jwt'; import { Server, Socket } from 'socket.io'; import { DeviceService } from '../../../core/services/device.service'; import { JwtPayload } from '../rest/auth/strategies/jwt.strategy'; +import { IConversationPort, CONVERSATION_PORT } from '../../../core/ports/inbound/conversation.port'; interface AuthenticatedSocket extends Socket { data: { @@ -20,13 +21,11 @@ interface AuthenticatedSocket extends Socket { }; } -// Types des messages Robot → Core interface AudioChunkMessage { data: Buffer; sampleRate: number; } -// Types des messages Core → Robot type RobotState = 'listening' | 'thinking' | 'speaking' | 'idle'; @WebSocketGateway({ @@ -43,7 +42,8 @@ export class RobotGateway implements OnGatewayConnection, OnGatewayDisconnect { constructor( private readonly jwtService: JwtService, private readonly deviceService: DeviceService, - ) {} + @Inject(CONVERSATION_PORT) private readonly conversationPort: IConversationPort, + ) { } async handleConnection(client: AuthenticatedSocket) { try { @@ -71,7 +71,6 @@ export class RobotGateway implements OnGatewayConnection, OnGatewayDisconnect { return; } - // Attach device info to socket client.data.deviceId = payload.sub; client.data.homeId = payload.homeId; @@ -90,15 +89,16 @@ export class RobotGateway implements OnGatewayConnection, OnGatewayDisconnect { const deviceId = client.data?.deviceId; if (deviceId) { this.connectedDevices.delete(deviceId); + this.conversationPort.interrupt(deviceId); this.logger.log(`Device disconnected: ${deviceId}`); } } @SubscribeMessage('wake_word_detected') - handleWakeWord(@ConnectedSocket() client: AuthenticatedSocket) { + async handleWakeWord(@ConnectedSocket() client: AuthenticatedSocket) { this.logger.log(`Wake word detected on device ${client.data.deviceId}`); client.emit('status', { state: 'listening' as RobotState }); - // TODO: start STT stream + await this.conversationPort.startListening(client.data.deviceId); } @SubscribeMessage('audio_chunk') @@ -106,25 +106,29 @@ export class RobotGateway implements OnGatewayConnection, OnGatewayDisconnect { @ConnectedSocket() client: AuthenticatedSocket, @MessageBody() message: AudioChunkMessage, ) { - this.logger.debug(`Audio chunk from ${client.data.deviceId}: ${message.data?.length} bytes`); - // TODO: forward to STT service + this.conversationPort.processAudioChunk(client.data.deviceId, message.data, message.sampleRate); } @SubscribeMessage('speech_end') - handleSpeechEnd(@ConnectedSocket() client: AuthenticatedSocket) { + async handleSpeechEnd(@ConnectedSocket() client: AuthenticatedSocket) { this.logger.log(`Speech ended on device ${client.data.deviceId}`); client.emit('status', { state: 'thinking' as RobotState }); - // TODO: finalize STT, send to LLM + + const transcription = await this.conversationPort.stopListening(client.data.deviceId); + this.logger.log(`Transcription: "${transcription}"`); + + // TODO: plus tard, le ConversationService enverra la réponse LLM+TTS + client.emit('status', { state: 'idle' as RobotState }); } @SubscribeMessage('user_interrupt') handleUserInterrupt(@ConnectedSocket() client: AuthenticatedSocket) { this.logger.log(`User interrupt on device ${client.data.deviceId}`); - // TODO: stop TTS playback, switch back to listening + this.conversationPort.interrupt(client.data.deviceId); client.emit('status', { state: 'listening' as RobotState }); } - // --- Helpers pour envoyer des messages au robot --- + // --- Helpers --- sendAudioChunk(deviceId: string, chunk: Buffer) { this.connectedDevices.get(deviceId)?.emit('audio_chunk', { data: chunk }); diff --git a/apps/backend/src/adapters/outbound/stt/deepgram.adapter.ts b/apps/backend/src/adapters/outbound/stt/deepgram.adapter.ts new file mode 100644 index 0000000..818fb03 --- /dev/null +++ b/apps/backend/src/adapters/outbound/stt/deepgram.adapter.ts @@ -0,0 +1,92 @@ +import { Injectable, Logger } from '@nestjs/common'; +import { ConfigService } from '@nestjs/config'; +import { DeepgramClient } from '@deepgram/sdk'; +import { ISTTPort, TranscriptionResult } from '../../../core/ports/outbound/stt.port'; + +@Injectable() +export class DeepgramAdapter implements ISTTPort { + private readonly logger = new Logger(DeepgramAdapter.name); + private readonly deepgram: DeepgramClient; + private readonly apiKey: string; + private socket: any = null; + + constructor(private readonly configService: ConfigService) { + const apiKey = this.configService.get('DEEPGRAM_API_KEY'); + if (!apiKey) { + throw new Error('DEEPGRAM_API_KEY is not set'); + } + this.apiKey = apiKey; + this.deepgram = new DeepgramClient({ apiKey }); + } + + async startStream(onResult: (result: TranscriptionResult) => void): Promise { + const socket = await this.deepgram.listen.v1.connect({ + model: 'nova-3', + language: 'fr', + smart_format: 'true', + encoding: 'linear16', + sample_rate: 16000, + channels: 1, + interim_results: 'true', + utterance_end_ms: 1000, + vad_events: 'true', + Authorization: `Token ${this.apiKey}`, + }); + + socket.on('message', (data: any) => { + if (data.type === 'Results' && data.channel?.alternatives?.[0]) { + const alternative = data.channel.alternatives[0]; + if (alternative.transcript) { + onResult({ + text: alternative.transcript, + confidence: alternative.confidence ?? 0, + isFinal: data.is_final ?? false, + }); + } + } + }); + + socket.on('error', (error: Error) => { + this.logger.error('Deepgram error:', error); + }); + + socket.on('close', () => { + this.logger.log('Deepgram stream closed'); + }); + + socket.connect(); + await socket.waitForOpen(); + this.socket = socket; + this.logger.log('Deepgram stream opened'); + } + + sendAudio(chunk: Buffer): void { + if (!this.socket) { + this.logger.warn('No active Deepgram stream, ignoring audio chunk'); + return; + } + this.socket.sendMedia(chunk); + } + + async endStream(): Promise { + if (!this.socket) return; + this.socket.close(); + this.socket = null; + } + + async transcribe(audioChunk: Buffer, sampleRate: number): Promise { + const response = await this.deepgram.listen.v1.media.transcribeFile(audioChunk, { + model: 'nova-3', + language: 'fr', + smart_format: true, + }); + + const transcript = (response as any).results?.channels?.[0]?.alternatives?.[0]; + + return { + text: transcript?.transcript ?? '', + confidence: transcript?.confidence ?? 0, + isFinal: true, + }; + } +} diff --git a/apps/backend/src/app.module.ts b/apps/backend/src/app.module.ts index 45f340e..1ec1a8b 100644 --- a/apps/backend/src/app.module.ts +++ b/apps/backend/src/app.module.ts @@ -1,5 +1,7 @@ + import { Module } from '@nestjs/common'; import { ConfigModule, ConfigService } from '@nestjs/config'; +import { join } from 'path'; import { TypeOrmModule } from '@nestjs/typeorm'; import { JwtModule } from '@nestjs/jwt'; import { PassportModule } from '@nestjs/passport'; @@ -14,15 +16,20 @@ import { AuthService } from './core/services/auth.service'; import { UserService } from './core/services/user.service'; import { HomeService } from './core/services/home.service'; import { DeviceService } from './core/services/device.service'; +import { ConversationService } from './core/services/conversation.service'; import { JwtStrategy } from './adapters/inbound/rest/auth/strategies/jwt.strategy'; import { AuthController } from './adapters/inbound/rest/auth/auth.controller'; import { DeviceController } from './adapters/inbound/rest/device/device.controller'; import { RobotGateway } from './adapters/inbound/websocket/robot.gateway'; +import { DeepgramAdapter } from './adapters/outbound/stt/deepgram.adapter'; +import { CONVERSATION_PORT } from './core/ports/inbound/conversation.port'; +import { STT_PORT } from './core/ports/outbound/stt.port'; @Module({ imports: [ ConfigModule.forRoot({ isGlobal: true, + envFilePath: join(__dirname, '..', '..', '..', '.env'), load: [appConfig, redisConfig, authConfig], }), TypeOrmModule.forRootAsync({ @@ -41,6 +48,21 @@ import { RobotGateway } from './adapters/inbound/websocket/robot.gateway'; }), ], controllers: [AuthController, DeviceController], - providers: [AuthService, UserService, HomeService, DeviceService, JwtStrategy, RobotGateway], + providers: [ + AuthService, + UserService, + HomeService, + DeviceService, + JwtStrategy, + RobotGateway, + { + provide: CONVERSATION_PORT, + useClass: ConversationService, + }, + { + provide: STT_PORT, + useClass: DeepgramAdapter, + }, + ], }) -export class AppModule {} +export class AppModule { } diff --git a/apps/backend/src/core/ports/inbound/conversation.port.ts b/apps/backend/src/core/ports/inbound/conversation.port.ts new file mode 100644 index 0000000..0c08e8e --- /dev/null +++ b/apps/backend/src/core/ports/inbound/conversation.port.ts @@ -0,0 +1,8 @@ +export interface IConversationPort { + startListening(deviceId: string): Promise; + processAudioChunk(deviceId: string, chunk: Buffer, sampleRate: number): void; + stopListening(deviceId: string): Promise; + interrupt(deviceId: string): void; +} + +export const CONVERSATION_PORT = Symbol('CONVERSATION_PORT'); \ No newline at end of file diff --git a/apps/backend/src/core/ports/outbound/conversation.port.ts b/apps/backend/src/core/ports/outbound/conversation.port.ts new file mode 100644 index 0000000..6cf9dd1 --- /dev/null +++ b/apps/backend/src/core/ports/outbound/conversation.port.ts @@ -0,0 +1,8 @@ +export interface IConversationPort { + startListening(deviceId: string): void + processAudioChunk(deviceId: string, chunk: Buffer, sampleRate: number): void + stopListening(deviceId: string): Promise + interrupt(deviceId: string): void +} + +export const CONVERSATION_PORT = Symbol('CONVERSATION_PORT'); \ No newline at end of file diff --git a/apps/backend/src/core/ports/outbound/stt.port.ts b/apps/backend/src/core/ports/outbound/stt.port.ts index 21d35b2..1658a7d 100644 --- a/apps/backend/src/core/ports/outbound/stt.port.ts +++ b/apps/backend/src/core/ports/outbound/stt.port.ts @@ -6,8 +6,9 @@ export interface TranscriptionResult { export interface ISTTPort { transcribe(audioChunk: Buffer, sampleRate: number): Promise; - startStream(onResult: (result: TranscriptionResult) => void): void; + startStream(onResult: (result: TranscriptionResult) => void): Promise; endStream(): Promise; + sendAudio(chunk: Buffer): void; } export const STT_PORT = Symbol('STT_PORT'); diff --git a/apps/backend/src/core/services/conversation.service.ts b/apps/backend/src/core/services/conversation.service.ts new file mode 100644 index 0000000..f935927 --- /dev/null +++ b/apps/backend/src/core/services/conversation.service.ts @@ -0,0 +1,76 @@ +import { Inject, Injectable, Logger } from '@nestjs/common'; +import { IConversationPort } from '../ports/inbound/conversation.port'; +import { ISTTPort, STT_PORT, TranscriptionResult } from '../ports/outbound/stt.port'; + +interface ActiveSession { + deviceId: string; + transcription: string; +} + +@Injectable() +export class ConversationService implements IConversationPort { + private readonly logger = new Logger(ConversationService.name); + private readonly activeSessions = new Map(); + + constructor( + @Inject(STT_PORT) private readonly sttPort: ISTTPort, + ) { } + + async startListening(deviceId: string): Promise { + this.logger.log(`Start listening for device ${deviceId}`); + + const session: ActiveSession = { + deviceId, + transcription: '', + }; + + this.activeSessions.set(deviceId, session); + + await this.sttPort.startStream((result: TranscriptionResult) => { + this.logger.debug(`STT [${deviceId}]: "${result.text}" (final: ${result.isFinal}, confidence: ${result.confidence})`); + + if (result.isFinal) { + session.transcription += result.text + ' '; + } + }); + } + + processAudioChunk(deviceId: string, chunk: Buffer, sampleRate: number): void { + const session = this.activeSessions.get(deviceId); + if (!session) { + this.logger.warn(`No active session for device ${deviceId}, ignoring audio chunk`); + return; + } + + this.sttPort.sendAudio(chunk); + } + + async stopListening(deviceId: string): Promise { + const session = this.activeSessions.get(deviceId); + if (!session) { + this.logger.warn(`No active session for device ${deviceId}`); + return null; + } + + await this.sttPort.endStream(); + + const finalText = session.transcription.trim() || null; + this.activeSessions.delete(deviceId); + + this.logger.log(`Final transcription for ${deviceId}: "${finalText}"`); + + // TODO: plus tard, envoyer finalText au LLM via ILLMPort + + return finalText; + } + + interrupt(deviceId: string): void { + const session = this.activeSessions.get(deviceId); + if (!session) return; + + this.logger.log(`Interrupting session for device ${deviceId}`); + this.activeSessions.delete(deviceId); + + // TODO: plus tard, couper aussi le TTS en cours + } +} diff --git a/apps/simulator/src/App.tsx b/apps/simulator/src/App.tsx index 7749c67..5199cf3 100644 --- a/apps/simulator/src/App.tsx +++ b/apps/simulator/src/App.tsx @@ -52,6 +52,7 @@ function App() { const handleWakeWord = () => { emit('wake_word_detected'); + if (!recording) startMic(); }; const handleInterrupt = () => { diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 54e2e41..da8350f 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -10,6 +10,9 @@ importers: apps/backend: dependencies: + '@deepgram/sdk': + specifier: ^5.0.0 + version: 5.0.0 '@nestjs/common': specifier: ^11.1.17 version: 11.1.17(class-transformer@0.5.1)(class-validator@0.15.1)(reflect-metadata@0.2.2)(rxjs@7.8.2) @@ -369,6 +372,10 @@ packages: resolution: {integrity: sha512-IchNf6dN4tHoMFIn/7OE8LWZ19Y6q/67Bmf6vnGREv8RSbBVb9LPJxEcnwrcwX6ixSvaiGoomAUvu4YSxXrVgw==} engines: {node: '>=12'} + '@deepgram/sdk@5.0.0': + resolution: {integrity: sha512-x1wMiOgDGqcLEaQpQBQLTtk5mLbXbYgcBEpp7cfJIyEtqdIGgijCZH+a/esiVp+xIcTYYroTxG47RVppZOHbWw==} + engines: {node: '>=18.0.0'} + '@emnapi/core@1.9.1': resolution: {integrity: sha512-mukuNALVsoix/w1BJwFzwXBN/dHeejQtuVzcDsfOEsdpCumXb/E9j8w11h5S54tT1xhifGfbbSm/ICrObRb3KA==} @@ -4060,6 +4067,13 @@ snapshots: dependencies: '@jridgewell/trace-mapping': 0.3.9 + '@deepgram/sdk@5.0.0': + dependencies: + ws: 8.18.3 + transitivePeerDependencies: + - bufferutil + - utf-8-validate + '@emnapi/core@1.9.1': dependencies: '@emnapi/wasi-threads': 1.2.0