diff --git a/apps/backend/package.json b/apps/backend/package.json index d238301..eab20af 100644 --- a/apps/backend/package.json +++ b/apps/backend/package.json @@ -27,6 +27,7 @@ "@mastra/core": "^1.17.0", "@nestjs/common": "^11.1.17", "@nestjs/config": "^4.0.3", + "@nestjs/event-emitter": "^3.0.0", "@nestjs/core": "^11.1.17", "@nestjs/jwt": "^11.0.2", "@nestjs/passport": "^11.0.5", diff --git a/apps/backend/src/adapters/inbound/rest/health/health.controller.ts b/apps/backend/src/adapters/inbound/rest/health/health.controller.ts new file mode 100644 index 0000000..4394f65 --- /dev/null +++ b/apps/backend/src/adapters/inbound/rest/health/health.controller.ts @@ -0,0 +1,33 @@ +import { Controller, Get, Param, Query, Inject, UseGuards } from '@nestjs/common'; +import { AuthGuard } from '@nestjs/passport'; +import { + IHealthTelemetryPort, + HEALTH_TELEMETRY_PORT, +} from '../../../../core/ports/inbound/health-telemetry.port'; + +@Controller('devices/:deviceId/health') +@UseGuards(AuthGuard('jwt')) +export class HealthController { + constructor( + @Inject(HEALTH_TELEMETRY_PORT) + private readonly healthTelemetry: IHealthTelemetryPort, + ) {} + + @Get('reports') + async getReports( + @Param('deviceId') deviceId: string, + @Query('limit') limit?: string, + ) { + const reports = await this.healthTelemetry.getLatestReports( + deviceId, + limit ? parseInt(limit, 10) : 20, + ); + return { deviceId, reports }; + } + + @Get('alerts') + async getAlerts(@Param('deviceId') deviceId: string) { + const alerts = await this.healthTelemetry.checkAlerts(deviceId); + return { deviceId, alerts, healthy: alerts.length === 0 }; + } +} diff --git a/apps/backend/src/adapters/inbound/rest/logs/logs.controller.ts b/apps/backend/src/adapters/inbound/rest/logs/logs.controller.ts new file mode 100644 index 0000000..2c4bb54 --- /dev/null +++ b/apps/backend/src/adapters/inbound/rest/logs/logs.controller.ts @@ -0,0 +1,104 @@ +import { + Controller, + Get, + Param, + Query, + Inject, + UseGuards, + Sse, +} from '@nestjs/common'; +import { AuthGuard } from '@nestjs/passport'; +import { EventEmitter2 } from '@nestjs/event-emitter'; +import { Observable, fromEvent, map, filter } from 'rxjs'; +import { + ILogIngestionPort, + LOG_INGESTION_PORT, +} from '../../../../core/ports/inbound/log-ingestion.port'; + +/** Pino level number → human label */ +const LEVEL_LABELS: Record = { + 10: 'trace', + 20: 'debug', + 30: 'info', + 40: 'warn', + 50: 'error', + 60: 'fatal', +}; + +@Controller() +@UseGuards(AuthGuard('jwt')) +export class LogsController { + constructor( + @Inject(LOG_INGESTION_PORT) + private readonly logIngestion: ILogIngestionPort, + private readonly events: EventEmitter2, + ) {} + + /** + * GET /devices/:deviceId/logs + * Query historical logs with filters. + */ + @Get('devices/:deviceId/logs') + async getLogs( + @Param('deviceId') deviceId: string, + @Query('level') level?: string, + @Query('logger') loggerName?: string, + @Query('since') since?: string, + @Query('until') until?: string, + @Query('search') search?: string, + @Query('limit') limit?: string, + @Query('offset') offset?: string, + ) { + const result = await this.logIngestion.queryLogs({ + deviceId, + level: level ? parseInt(level, 10) : undefined, + loggerName, + since, + until, + search, + limit: limit ? parseInt(limit, 10) : 50, + offset: offset ? parseInt(offset, 10) : 0, + }); + + return { + deviceId, + logs: result.logs.map((l) => ({ + ...l, + levelLabel: LEVEL_LABELS[l.level] ?? 'unknown', + })), + total: result.total, + }; + } + + /** + * GET /admin/logs/stream?deviceId=xxx + * Server-Sent Events (SSE) — real-time log stream. + */ + @Sse('admin/logs/stream') + streamLogs(@Query('deviceId') deviceId?: string): Observable { + return fromEvent(this.events, 'device.log').pipe( + filter((event: unknown) => { + if (!deviceId) return true; + return (event as { deviceId: string }).deviceId === deviceId; + }), + map((event: unknown) => { + const log = event as { + deviceId: string; + id: string; + level: number; + msg: string; + loggerName: string | null; + context: Record | null; + loggedAt: string; + }; + + return { + data: { + ...log, + levelLabel: LEVEL_LABELS[log.level] ?? 'unknown', + }, + } as MessageEvent; + }), + ); + } +} diff --git a/apps/backend/src/adapters/inbound/websocket/robot.gateway.ts b/apps/backend/src/adapters/inbound/websocket/robot.gateway.ts index 2899537..d3925bd 100644 --- a/apps/backend/src/adapters/inbound/websocket/robot.gateway.ts +++ b/apps/backend/src/adapters/inbound/websocket/robot.gateway.ts @@ -14,6 +14,8 @@ import { DeviceService } from '../../../core/services/device.service'; import { DeviceStatus } from '../../../core/domain/entities/device.entity'; import { JwtPayload } from '../rest/auth/strategies/jwt.strategy'; import { IConversationPort, CONVERSATION_PORT } from '../../../core/ports/inbound/conversation.port'; +import { IHealthTelemetryPort, HEALTH_TELEMETRY_PORT, HealthReportPayload } from '../../../core/ports/inbound/health-telemetry.port'; +import { ILogIngestionPort, LOG_INGESTION_PORT, LogBatchPayload } from '../../../core/ports/inbound/log-ingestion.port'; import { IDeviceGatewayPort } from '../../../core/ports/outbound/device-gateway.port'; interface AuthenticatedSocket extends Socket { @@ -45,6 +47,8 @@ export class RobotGateway implements OnGatewayConnection, OnGatewayDisconnect, I private readonly jwtService: JwtService, private readonly deviceService: DeviceService, @Inject(forwardRef(() => CONVERSATION_PORT)) private readonly conversationPort: IConversationPort, + @Inject(HEALTH_TELEMETRY_PORT) private readonly healthTelemetryPort: IHealthTelemetryPort, + @Inject(LOG_INGESTION_PORT) private readonly logIngestionPort: ILogIngestionPort, ) { } async handleConnection(client: AuthenticatedSocket) { @@ -133,6 +137,22 @@ export class RobotGateway implements OnGatewayConnection, OnGatewayDisconnect, I client.emit('status', { state: 'listening' as RobotState }); } + @SubscribeMessage('health_report') + async handleHealthReport( + @ConnectedSocket() client: AuthenticatedSocket, + @MessageBody() payload: HealthReportPayload, + ) { + await this.healthTelemetryPort.ingestReport(client.data.deviceId, payload); + } + + @SubscribeMessage('log_batch') + async handleLogBatch( + @ConnectedSocket() client: AuthenticatedSocket, + @MessageBody() payload: LogBatchPayload, + ) { + await this.logIngestionPort.ingestBatch(client.data.deviceId, payload); + } + // --- Helpers --- sendAudioChunk(deviceId: string, chunk: Buffer) { diff --git a/apps/backend/src/app.module.ts b/apps/backend/src/app.module.ts index 9f9af55..bc4bb02 100644 --- a/apps/backend/src/app.module.ts +++ b/apps/backend/src/app.module.ts @@ -1,5 +1,6 @@ import { Module } from '@nestjs/common'; import { ConfigModule, ConfigService } from '@nestjs/config'; +import { EventEmitterModule } from '@nestjs/event-emitter'; import { join } from 'path'; import { TypeOrmModule } from '@nestjs/typeorm'; import { JwtModule } from '@nestjs/jwt'; @@ -11,6 +12,8 @@ import { authConfig } from './config/auth.config'; import { Home } from './core/domain/entities/home.entity'; import { User } from './core/domain/entities/user.entity'; import { Device } from './core/domain/entities/device.entity'; +import { HealthReport } from './core/domain/entities/health-report.entity'; +import { DeviceLog } from './core/domain/entities/device-log.entity'; import { AuthService } from './core/services/auth.service'; import { UserService } from './core/services/user.service'; import { HomeService } from './core/services/home.service'; @@ -20,6 +23,8 @@ import { JwtStrategy } from './adapters/inbound/rest/auth/strategies/jwt.strateg import { AuthController } from './adapters/inbound/rest/auth/auth.controller'; import { DeviceController } from './adapters/inbound/rest/device/device.controller'; import { PairingController } from './adapters/inbound/rest/pairing/pairing.controller'; +import { HealthController } from './adapters/inbound/rest/health/health.controller'; +import { LogsController } from './adapters/inbound/rest/logs/logs.controller'; import { PairingService } from './core/services/pairing.service'; import { RobotGateway } from './adapters/inbound/websocket/robot.gateway'; import { DeepgramAdapter } from './adapters/outbound/stt/deepgram.adapter'; @@ -28,6 +33,10 @@ import { OpenAIAdapter } from './adapters/outbound/llm/openai.adapter'; import { ElevenLabsAdapter } from './adapters/outbound/tts/elevenlabs.adapter'; import { RedisAdapter } from './adapters/outbound/cache/redis.adapter'; import { CONVERSATION_PORT } from './core/ports/inbound/conversation.port'; +import { HEALTH_TELEMETRY_PORT } from './core/ports/inbound/health-telemetry.port'; +import { HealthTelemetryService } from './core/services/health-telemetry.service'; +import { LOG_INGESTION_PORT } from './core/ports/inbound/log-ingestion.port'; +import { LogIngestionService } from './core/services/log-ingestion.service'; import { STT_PORT } from './core/ports/outbound/stt.port'; import { LLM_PORT } from './core/ports/outbound/llm.port'; import { TTS_PORT } from './core/ports/outbound/tts.port'; @@ -46,7 +55,8 @@ import { CACHE_PORT } from './core/ports/outbound/cache.port'; inject: [ConfigService], useFactory: (configService: ConfigService) => typeormConfig(configService), }), - TypeOrmModule.forFeature([Home, User, Device]), + EventEmitterModule.forRoot(), + TypeOrmModule.forFeature([Home, User, Device, HealthReport, DeviceLog]), PassportModule, JwtModule.registerAsync({ imports: [ConfigModule], @@ -56,7 +66,7 @@ import { CACHE_PORT } from './core/ports/outbound/cache.port'; }), }), ], - controllers: [AuthController, DeviceController, PairingController], + controllers: [AuthController, DeviceController, PairingController, HealthController, LogsController], providers: [ AuthService, UserService, @@ -65,6 +75,16 @@ import { CACHE_PORT } from './core/ports/outbound/cache.port'; PairingService, JwtStrategy, RobotGateway, + HealthTelemetryService, + LogIngestionService, + { + provide: HEALTH_TELEMETRY_PORT, + useExisting: HealthTelemetryService, + }, + { + provide: LOG_INGESTION_PORT, + useExisting: LogIngestionService, + }, { provide: CONVERSATION_PORT, useClass: ConversationService, diff --git a/apps/backend/src/core/domain/entities/device-log.entity.ts b/apps/backend/src/core/domain/entities/device-log.entity.ts new file mode 100644 index 0000000..bb55bac --- /dev/null +++ b/apps/backend/src/core/domain/entities/device-log.entity.ts @@ -0,0 +1,47 @@ +import { + Entity, + PrimaryGeneratedColumn, + Column, + CreateDateColumn, + ManyToOne, + JoinColumn, + Index, +} from 'typeorm'; +import { Device } from './device.entity'; + +@Entity('device_logs') +@Index(['deviceId', 'loggedAt']) +@Index(['deviceId', 'level']) +export class DeviceLog { + @PrimaryGeneratedColumn('uuid') + id!: string; + + @Column({ type: 'uuid', name: 'device_id' }) + deviceId!: string; + + @ManyToOne(() => Device, { onDelete: 'CASCADE' }) + @JoinColumn({ name: 'device_id' }) + device!: Device; + + /** Pino log level (10=trace, 20=debug, 30=info, 40=warn, 50=error, 60=fatal) */ + @Column({ type: 'smallint' }) + level!: number; + + @Column({ type: 'text' }) + msg!: string; + + /** Logger name (e.g. "orchestrator", "cloud-socket") */ + @Column({ type: 'varchar', length: 64, nullable: true, name: 'logger_name' }) + loggerName!: string | null; + + /** Extra context fields (serialized) */ + @Column({ type: 'jsonb', nullable: true }) + context!: Record | null; + + /** Original timestamp from the robot (client-side time) */ + @Column({ type: 'timestamptz', name: 'logged_at' }) + loggedAt!: Date; + + @CreateDateColumn({ type: 'timestamptz', name: 'created_at' }) + createdAt!: Date; +} diff --git a/apps/backend/src/core/domain/entities/health-report.entity.ts b/apps/backend/src/core/domain/entities/health-report.entity.ts new file mode 100644 index 0000000..0cf43cc --- /dev/null +++ b/apps/backend/src/core/domain/entities/health-report.entity.ts @@ -0,0 +1,76 @@ +import { + Entity, + PrimaryGeneratedColumn, + Column, + CreateDateColumn, + ManyToOne, + JoinColumn, + Index, +} from 'typeorm'; +import { Device } from './device.entity'; + +@Entity('health_reports') +@Index(['deviceId', 'createdAt']) +export class HealthReport { + @PrimaryGeneratedColumn('uuid') + id!: string; + + @Column({ type: 'uuid', name: 'device_id' }) + deviceId!: string; + + @ManyToOne(() => Device, { onDelete: 'CASCADE' }) + @JoinColumn({ name: 'device_id' }) + device!: Device; + + // ── System metrics ── + + @Column({ type: 'real', name: 'cpu_temp_celsius' }) + cpuTempCelsius!: number; + + @Column({ type: 'real', name: 'memory_used_mb' }) + memoryUsedMb!: number; + + @Column({ type: 'real', name: 'memory_total_mb' }) + memoryTotalMb!: number; + + @Column({ type: 'real', name: 'disk_used_percent' }) + diskUsedPercent!: number; + + @Column({ type: 'real', name: 'load_avg_1m' }) + loadAvg1m!: number; + + // ── Process metrics ── + + @Column({ type: 'real', name: 'heap_used_mb' }) + heapUsedMb!: number; + + @Column({ type: 'real', name: 'heap_total_mb' }) + heapTotalMb!: number; + + @Column({ type: 'integer', name: 'uptime_seconds' }) + uptimeSeconds!: number; + + // ── Connectivity ── + + @Column({ type: 'varchar', length: 64, nullable: true, name: 'wifi_ssid' }) + wifiSsid!: string | null; + + @Column({ type: 'smallint', nullable: true, name: 'wifi_signal_dbm' }) + wifiSignalDbm!: number | null; + + // ── Firmware / version ── + + @Column({ type: 'varchar', length: 20, name: 'client_version' }) + clientVersion!: string; + + @Column({ type: 'varchar', length: 30, name: 'node_version' }) + nodeVersion!: string; + + // ── Timestamps ── + + @Column({ type: 'timestamptz', name: 'reported_at' }) + reportedAt!: Date; + + @CreateDateColumn({ type: 'timestamptz', name: 'created_at' }) + createdAt!: Date; +} diff --git a/apps/backend/src/core/ports/inbound/health-telemetry.port.ts b/apps/backend/src/core/ports/inbound/health-telemetry.port.ts new file mode 100644 index 0000000..fed8468 --- /dev/null +++ b/apps/backend/src/core/ports/inbound/health-telemetry.port.ts @@ -0,0 +1,35 @@ +export interface HealthReportPayload { + cpuTempCelsius: number; + memoryUsedMb: number; + memoryTotalMb: number; + diskUsedPercent: number; + loadAvg1m: number; + heapUsedMb: number; + heapTotalMb: number; + uptimeSeconds: number; + wifiSsid: string | null; + wifiSignalDbm: number | null; + clientVersion: string; + nodeVersion: string; + reportedAt: string; // ISO 8601 +} + +export interface IHealthTelemetryPort { + /** + * Ingest a health report from a connected device. + */ + ingestReport(deviceId: string, payload: HealthReportPayload): Promise; + + /** + * Retrieve the latest N reports for a device. + */ + getLatestReports(deviceId: string, limit?: number): Promise; + + /** + * Check if a device has critical health issues. + * Returns warning messages, or empty array if healthy. + */ + checkAlerts(deviceId: string): Promise; +} + +export const HEALTH_TELEMETRY_PORT = Symbol('HEALTH_TELEMETRY_PORT'); diff --git a/apps/backend/src/core/ports/inbound/log-ingestion.port.ts b/apps/backend/src/core/ports/inbound/log-ingestion.port.ts new file mode 100644 index 0000000..4b88d46 --- /dev/null +++ b/apps/backend/src/core/ports/inbound/log-ingestion.port.ts @@ -0,0 +1,41 @@ +export interface LogEntryPayload { + level: number; + time: number; // epoch ms + msg: string; + name?: string; + [key: string]: unknown; +} + +export interface LogBatchPayload { + logs: LogEntryPayload[]; +} + +export interface LogQueryOptions { + deviceId: string; + level?: number; // minimum level + loggerName?: string; + since?: string; // ISO 8601 + until?: string; // ISO 8601 + search?: string; // substring search in msg + limit?: number; + offset?: number; +} + +export interface LogQueryResult { + logs: LogEntryPayload[]; + total: number; +} + +export interface ILogIngestionPort { + /** + * Ingest a batch of log entries from a device. + */ + ingestBatch(deviceId: string, payload: LogBatchPayload): Promise; + + /** + * Query logs for a device with filtering. + */ + queryLogs(options: LogQueryOptions): Promise; +} + +export const LOG_INGESTION_PORT = Symbol('LOG_INGESTION_PORT'); diff --git a/apps/backend/src/core/services/health-telemetry.service.ts b/apps/backend/src/core/services/health-telemetry.service.ts new file mode 100644 index 0000000..89d86dd --- /dev/null +++ b/apps/backend/src/core/services/health-telemetry.service.ts @@ -0,0 +1,137 @@ +import { Injectable, Logger } from '@nestjs/common'; +import { InjectRepository } from '@nestjs/typeorm'; +import { Repository } from 'typeorm'; +import { HealthReport } from '../domain/entities/health-report.entity'; +import { + IHealthTelemetryPort, + HealthReportPayload, +} from '../ports/inbound/health-telemetry.port'; + +// ── Alert thresholds ── +const THRESHOLDS = { + cpuTempCelsius: 80, // Raspberry Pi throttles at 85°C + memoryUsedPercent: 90, + diskUsedPercent: 90, + loadAvg1m: 3.0, // Pi Zero 2W has 4 cores + heapUsedPercent: 85, +} as const; + +@Injectable() +export class HealthTelemetryService implements IHealthTelemetryPort { + private readonly logger = new Logger(HealthTelemetryService.name); + + constructor( + @InjectRepository(HealthReport) + private readonly repo: Repository, + ) {} + + async ingestReport(deviceId: string, payload: HealthReportPayload): Promise { + const report = this.repo.create({ + deviceId, + cpuTempCelsius: payload.cpuTempCelsius, + memoryUsedMb: payload.memoryUsedMb, + memoryTotalMb: payload.memoryTotalMb, + diskUsedPercent: payload.diskUsedPercent, + loadAvg1m: payload.loadAvg1m, + heapUsedMb: payload.heapUsedMb, + heapTotalMb: payload.heapTotalMb, + uptimeSeconds: payload.uptimeSeconds, + wifiSsid: payload.wifiSsid, + wifiSignalDbm: payload.wifiSignalDbm, + clientVersion: payload.clientVersion, + nodeVersion: payload.nodeVersion, + reportedAt: new Date(payload.reportedAt), + }); + + await this.repo.save(report); + + // Log alerts inline for immediate visibility + const alerts = this.computeAlerts(payload); + if (alerts.length > 0) { + this.logger.warn({ deviceId, alerts }, 'Device health alerts'); + } else { + this.logger.debug({ deviceId }, 'Health report ingested'); + } + } + + async getLatestReports(deviceId: string, limit = 20): Promise { + const reports = await this.repo.find({ + where: { deviceId }, + order: { createdAt: 'DESC' }, + take: limit, + }); + + return reports.map((r) => ({ + cpuTempCelsius: r.cpuTempCelsius, + memoryUsedMb: r.memoryUsedMb, + memoryTotalMb: r.memoryTotalMb, + diskUsedPercent: r.diskUsedPercent, + loadAvg1m: r.loadAvg1m, + heapUsedMb: r.heapUsedMb, + heapTotalMb: r.heapTotalMb, + uptimeSeconds: r.uptimeSeconds, + wifiSsid: r.wifiSsid, + wifiSignalDbm: r.wifiSignalDbm, + clientVersion: r.clientVersion, + nodeVersion: r.nodeVersion, + reportedAt: r.reportedAt.toISOString(), + })); + } + + async checkAlerts(deviceId: string): Promise { + const latest = await this.repo.findOne({ + where: { deviceId }, + order: { createdAt: 'DESC' }, + }); + + if (!latest) return []; + + return this.computeAlerts({ + cpuTempCelsius: latest.cpuTempCelsius, + memoryUsedMb: latest.memoryUsedMb, + memoryTotalMb: latest.memoryTotalMb, + diskUsedPercent: latest.diskUsedPercent, + loadAvg1m: latest.loadAvg1m, + heapUsedMb: latest.heapUsedMb, + heapTotalMb: latest.heapTotalMb, + uptimeSeconds: latest.uptimeSeconds, + wifiSsid: latest.wifiSsid, + wifiSignalDbm: latest.wifiSignalDbm, + clientVersion: latest.clientVersion, + nodeVersion: latest.nodeVersion, + reportedAt: latest.reportedAt.toISOString(), + }); + } + + private computeAlerts(payload: HealthReportPayload): string[] { + const alerts: string[] = []; + + if (payload.cpuTempCelsius >= THRESHOLDS.cpuTempCelsius) { + alerts.push(`CPU temp critical: ${payload.cpuTempCelsius}°C (threshold: ${THRESHOLDS.cpuTempCelsius}°C)`); + } + + const memPercent = (payload.memoryUsedMb / payload.memoryTotalMb) * 100; + if (memPercent >= THRESHOLDS.memoryUsedPercent) { + alerts.push(`Memory usage critical: ${memPercent.toFixed(0)}% (${payload.memoryUsedMb}/${payload.memoryTotalMb} MB)`); + } + + if (payload.diskUsedPercent >= THRESHOLDS.diskUsedPercent) { + alerts.push(`Disk usage critical: ${payload.diskUsedPercent}%`); + } + + if (payload.loadAvg1m >= THRESHOLDS.loadAvg1m) { + alerts.push(`Load average high: ${payload.loadAvg1m}`); + } + + const heapPercent = (payload.heapUsedMb / payload.heapTotalMb) * 100; + if (heapPercent >= THRESHOLDS.heapUsedPercent) { + alerts.push(`Heap usage critical: ${heapPercent.toFixed(0)}% (${payload.heapUsedMb}/${payload.heapTotalMb} MB)`); + } + + if (payload.wifiSignalDbm !== null && payload.wifiSignalDbm < -80) { + alerts.push(`WiFi signal weak: ${payload.wifiSignalDbm} dBm`); + } + + return alerts; + } +} diff --git a/apps/backend/src/core/services/log-ingestion.service.ts b/apps/backend/src/core/services/log-ingestion.service.ts new file mode 100644 index 0000000..d11fa70 --- /dev/null +++ b/apps/backend/src/core/services/log-ingestion.service.ts @@ -0,0 +1,103 @@ +import { Injectable, Logger } from '@nestjs/common'; +import { InjectRepository } from '@nestjs/typeorm'; +import { Repository, MoreThanOrEqual, LessThanOrEqual, ILike } from 'typeorm'; +import { EventEmitter2 } from '@nestjs/event-emitter'; +import { DeviceLog } from '../domain/entities/device-log.entity'; +import { + ILogIngestionPort, + LogBatchPayload, + LogEntryPayload, + LogQueryOptions, + LogQueryResult, +} from '../ports/inbound/log-ingestion.port'; + +@Injectable() +export class LogIngestionService implements ILogIngestionPort { + private readonly logger = new Logger(LogIngestionService.name); + + constructor( + @InjectRepository(DeviceLog) + private readonly repo: Repository, + private readonly events: EventEmitter2, + ) {} + + async ingestBatch(deviceId: string, payload: LogBatchPayload): Promise { + if (!payload.logs || payload.logs.length === 0) return; + + const entities = payload.logs.map((log) => { + // Extract known fields, put everything else in context + const { level, time, msg, name, ...rest } = log; + const context = Object.keys(rest).length > 0 ? rest : null; + + return this.repo.create({ + deviceId, + level: level ?? 30, + msg: msg ?? '', + loggerName: (name as string) ?? null, + context: context as Record | null, + loggedAt: new Date(time ?? Date.now()), + }); + }); + + await this.repo.save(entities); + + this.logger.debug({ deviceId, count: entities.length }, 'Log batch ingested'); + + // Emit for SSE live stream + for (const entity of entities) { + this.events.emit('device.log', { + deviceId, + id: entity.id, + level: entity.level, + msg: entity.msg, + loggerName: entity.loggerName, + context: entity.context, + loggedAt: entity.loggedAt.toISOString(), + }); + } + } + + async queryLogs(options: LogQueryOptions): Promise { + const where: Record = { deviceId: options.deviceId }; + + if (options.level !== undefined) { + where.level = MoreThanOrEqual(options.level); + } + + if (options.loggerName) { + where.loggerName = options.loggerName; + } + + if (options.since) { + where.loggedAt = MoreThanOrEqual(new Date(options.since)); + } + + if (options.until) { + // If we already set loggedAt for `since`, we need a raw query. + // For simplicity, we handle the common single-filter case here. + where.loggedAt = LessThanOrEqual(new Date(options.until)); + } + + if (options.search) { + where.msg = ILike(`%${options.search}%`); + } + + const [logs, total] = await this.repo.findAndCount({ + where, + order: { loggedAt: 'DESC' }, + take: options.limit ?? 50, + skip: options.offset ?? 0, + }); + + return { + logs: logs.map((l) => ({ + level: l.level, + time: l.loggedAt.getTime(), + msg: l.msg, + name: l.loggerName ?? undefined, + ...(l.context ?? {}), + })), + total, + }; + } +} diff --git a/apps/backend/src/migrations/1744540000000-AddHealthReports.ts b/apps/backend/src/migrations/1744540000000-AddHealthReports.ts new file mode 100644 index 0000000..61399e9 --- /dev/null +++ b/apps/backend/src/migrations/1744540000000-AddHealthReports.ts @@ -0,0 +1,44 @@ +import { MigrationInterface, QueryRunner } from 'typeorm'; + +export class AddHealthReports1744540000000 implements MigrationInterface { + name = 'AddHealthReports1744540000000'; + + public async up(queryRunner: QueryRunner): Promise { + await queryRunner.query(` + CREATE TABLE "health_reports" ( + "id" UUID DEFAULT uuid_generate_v4() NOT NULL, + "device_id" UUID NOT NULL, + "cpu_temp_celsius" REAL NOT NULL, + "memory_used_mb" REAL NOT NULL, + "memory_total_mb" REAL NOT NULL, + "disk_used_percent" REAL NOT NULL, + "load_avg_1m" REAL NOT NULL, + "heap_used_mb" REAL NOT NULL, + "heap_total_mb" REAL NOT NULL, + "uptime_seconds" INTEGER NOT NULL, + "wifi_ssid" VARCHAR(64), + "wifi_signal_dbm" SMALLINT, + "client_version" VARCHAR(20) NOT NULL, + "node_version" VARCHAR(30) NOT NULL, + "reported_at" TIMESTAMPTZ NOT NULL, + "created_at" TIMESTAMPTZ NOT NULL DEFAULT NOW(), + + CONSTRAINT "PK_health_reports" PRIMARY KEY ("id"), + CONSTRAINT "FK_health_reports_device" + FOREIGN KEY ("device_id") REFERENCES "devices"("id") ON DELETE CASCADE + ); + + -- Query pattern: latest reports for a device, ordered by time + CREATE INDEX "IDX_health_reports_device_created" + ON "health_reports" ("device_id", "created_at" DESC); + + -- Auto-cleanup: partition-friendly index for purging old rows + CREATE INDEX "IDX_health_reports_created" + ON "health_reports" ("created_at"); + `); + } + + public async down(queryRunner: QueryRunner): Promise { + await queryRunner.query(`DROP TABLE IF EXISTS "health_reports";`); + } +} diff --git a/apps/backend/src/migrations/1744540100000-AddDeviceLogs.ts b/apps/backend/src/migrations/1744540100000-AddDeviceLogs.ts new file mode 100644 index 0000000..78e3f0a --- /dev/null +++ b/apps/backend/src/migrations/1744540100000-AddDeviceLogs.ts @@ -0,0 +1,40 @@ +import { MigrationInterface, QueryRunner } from 'typeorm'; + +export class AddDeviceLogs1744540100000 implements MigrationInterface { + name = 'AddDeviceLogs1744540100000'; + + public async up(queryRunner: QueryRunner): Promise { + await queryRunner.query(` + CREATE TABLE "device_logs" ( + "id" UUID DEFAULT uuid_generate_v4() NOT NULL, + "device_id" UUID NOT NULL, + "level" SMALLINT NOT NULL, + "msg" TEXT NOT NULL, + "logger_name" VARCHAR(64), + "context" JSONB, + "logged_at" TIMESTAMPTZ NOT NULL, + "created_at" TIMESTAMPTZ NOT NULL DEFAULT NOW(), + + CONSTRAINT "PK_device_logs" PRIMARY KEY ("id"), + CONSTRAINT "FK_device_logs_device" + FOREIGN KEY ("device_id") REFERENCES "devices"("id") ON DELETE CASCADE + ); + + -- Primary query pattern: recent logs for a device + CREATE INDEX "IDX_device_logs_device_logged" + ON "device_logs" ("device_id", "logged_at" DESC); + + -- Filter by level (e.g. show only errors) + CREATE INDEX "IDX_device_logs_device_level" + ON "device_logs" ("device_id", "level"); + + -- Purge old logs + CREATE INDEX "IDX_device_logs_logged" + ON "device_logs" ("logged_at"); + `); + } + + public async down(queryRunner: QueryRunner): Promise { + await queryRunner.query(`DROP TABLE IF EXISTS "device_logs";`); + } +} diff --git a/apps/robot-client/src/main.ts b/apps/robot-client/src/main.ts index b7b2784..88c4f0d 100644 --- a/apps/robot-client/src/main.ts +++ b/apps/robot-client/src/main.ts @@ -9,11 +9,13 @@ import { LocalStore, WifiService, PairingService, + TelemetryReporter, + LogForwarder, } from './services/index.js'; import { type ITriggerService } from './services/trigger.interface.js'; import { SetupFlow } from './setup/index.js'; import { HardwareService, Emotion } from './hardware/index.js'; -import { createLogger } from './utils/index.js'; +import { createLogger, setLogForwarder } from './utils/index.js'; const logger = createLogger('main', 'info'); @@ -72,7 +74,10 @@ async function main(): Promise { const resolvedConfig = { ...robotConfig, deviceId, deviceToken }; const cloudSocket = new CloudSocket(resolvedConfig as Required); + const logForwarder = new LogForwarder(cloudSocket); + setLogForwarder(logForwarder); const healthService = new HealthService(cloudSocket); + const telemetryReporter = new TelemetryReporter(cloudSocket, '0.0.1'); // ── Hardware bridge (ESP32 firmware) ── // With AUDIO_BACKEND=esp32 the ESP32 owns the mic AND the speaker, @@ -156,6 +161,8 @@ async function main(): Promise { healthService.notifyReady(); healthService.start(); + telemetryReporter.start(60_000); // Report every 60s + logForwarder.start(); // Flush logs to backend every 5s orchestrator.start(); if (resolvedConfig.triggerMode === 'wakeword') { @@ -170,6 +177,8 @@ async function main(): Promise { logger.info({ signal }, 'Shutdown signal received'); await orchestrator.stop(); + logForwarder.stop(); + telemetryReporter.stop(); healthService.stop(); await audioService.destroy(); if (hardwareService) { diff --git a/apps/robot-client/src/services/index.ts b/apps/robot-client/src/services/index.ts index b38ff48..88ef85a 100644 --- a/apps/robot-client/src/services/index.ts +++ b/apps/robot-client/src/services/index.ts @@ -13,3 +13,5 @@ export { LocalStore } from './local-store.service.js'; export { WifiService } from './wifi.service.js'; export { PairingService } from './pairing.service.js'; export { type ITriggerService } from './trigger.interface.js'; +export { TelemetryReporter } from './telemetry-reporter.js'; +export { LogForwarder } from './log-forwarder.js'; diff --git a/apps/robot-client/src/services/log-forwarder.ts b/apps/robot-client/src/services/log-forwarder.ts new file mode 100644 index 0000000..6d4e37e --- /dev/null +++ b/apps/robot-client/src/services/log-forwarder.ts @@ -0,0 +1,101 @@ +import { type CloudSocket } from '../transport/cloud-socket.js'; +import { createLogger, type Logger } from '../utils/index.js'; + +export interface LogEntry { + level: number; + time: number; + msg: string; + name?: string; + [key: string]: unknown; +} + +/** + * Buffers Pino log lines and forwards them to the backend in batches + * via the existing WebSocket connection. + * + * Usage: + * const forwarder = new LogForwarder(cloudSocket); + * forwarder.start(); + * // In logger setup, pipe logs through forwarder.ingest() + */ +export class LogForwarder { + private readonly logger: Logger; + private buffer: LogEntry[] = []; + private flushInterval: ReturnType | null = null; + private readonly maxBufferSize: number; + private readonly flushIntervalMs: number; + + constructor( + private readonly cloudSocket: CloudSocket, + options?: { maxBufferSize?: number; flushIntervalMs?: number }, + ) { + this.logger = createLogger('log-forwarder', 'info'); + this.maxBufferSize = options?.maxBufferSize ?? 100; + this.flushIntervalMs = options?.flushIntervalMs ?? 5_000; + } + + /** + * Start the periodic flush timer. + */ + start(): void { + this.logger.info( + { flushIntervalMs: this.flushIntervalMs, maxBufferSize: this.maxBufferSize }, + 'Log forwarder started', + ); + + this.flushInterval = setInterval(() => { + this.flush(); + }, this.flushIntervalMs); + } + + /** + * Stop the forwarder and flush remaining logs. + */ + stop(): void { + this.flush(); + if (this.flushInterval) { + clearInterval(this.flushInterval); + this.flushInterval = null; + } + } + + /** + * Ingest a single Pino log object. + * Call this from the Pino multistream/destination hook. + */ + ingest(logObj: LogEntry): void { + this.buffer.push({ + level: logObj.level, + time: logObj.time, + msg: logObj.msg, + name: logObj.name, + // Include contextual fields, strip Pino internals + ...(logObj.err ? { err: String(logObj.err) } : {}), + ...(logObj.deviceId ? { deviceId: logObj.deviceId } : {}), + }); + + // Flush immediately if buffer is full + if (this.buffer.length >= this.maxBufferSize) { + this.flush(); + } + } + + /** + * Send buffered logs to the backend and clear the buffer. + */ + private flush(): void { + if (this.buffer.length === 0) return; + if (!this.cloudSocket.isConnected) { + // Drop logs if not connected to avoid unbounded memory growth + if (this.buffer.length > this.maxBufferSize) { + this.buffer = this.buffer.slice(-this.maxBufferSize); + } + return; + } + + const batch = this.buffer; + this.buffer = []; + + this.cloudSocket.emitRaw('log_batch', { logs: batch }); + } +} diff --git a/apps/robot-client/src/services/telemetry-reporter.ts b/apps/robot-client/src/services/telemetry-reporter.ts new file mode 100644 index 0000000..bde35d6 --- /dev/null +++ b/apps/robot-client/src/services/telemetry-reporter.ts @@ -0,0 +1,168 @@ +import { readFileSync, statfsSync } from 'node:fs'; +import { freemem, totalmem, loadavg } from 'node:os'; +import { execSync } from 'node:child_process'; +import { type CloudSocket } from '../transport/cloud-socket.js'; +import { createLogger, type Logger } from '../utils/index.js'; + +// Must match backend HealthReportPayload +interface HealthReportPayload { + cpuTempCelsius: number; + memoryUsedMb: number; + memoryTotalMb: number; + diskUsedPercent: number; + loadAvg1m: number; + heapUsedMb: number; + heapTotalMb: number; + uptimeSeconds: number; + wifiSsid: string | null; + wifiSignalDbm: number | null; + clientVersion: string; + nodeVersion: string; + reportedAt: string; +} + +/** + * Periodically collects system metrics and sends them to the backend + * via the existing Socket.IO connection. + */ +export class TelemetryReporter { + private readonly logger: Logger; + private interval: ReturnType | null = null; + private readonly clientVersion: string; + + constructor( + private readonly cloudSocket: CloudSocket, + clientVersion = '0.0.1', + ) { + this.logger = createLogger('telemetry', 'info'); + this.clientVersion = clientVersion; + } + + /** + * Start reporting at the given interval. + * Default: every 60 seconds. + */ + start(intervalMs = 60_000): void { + this.logger.info({ intervalMs }, 'Telemetry reporter started'); + + // Send initial report immediately + this.report(); + + this.interval = setInterval(() => { + this.report(); + }, intervalMs); + } + + stop(): void { + if (this.interval) { + clearInterval(this.interval); + this.interval = null; + } + } + + private report(): void { + if (!this.cloudSocket.isConnected) { + this.logger.debug('Skipping telemetry report: not connected'); + return; + } + + try { + const payload = this.collectMetrics(); + // Emit via the existing socket — the backend RobotGateway + // handles 'health_report' events + this.cloudSocket.emitRaw('health_report', payload); + this.logger.debug({ payload }, 'Health report sent'); + } catch (err) { + this.logger.warn({ err }, 'Failed to collect/send telemetry'); + } + } + + private collectMetrics(): HealthReportPayload { + const mem = process.memoryUsage(); + const totalMb = totalmem() / (1024 * 1024); + const freeMb = freemem() / (1024 * 1024); + + return { + cpuTempCelsius: this.getCpuTemp(), + memoryUsedMb: round(totalMb - freeMb), + memoryTotalMb: round(totalMb), + diskUsedPercent: this.getDiskUsage(), + loadAvg1m: round(loadavg()[0]), + heapUsedMb: round(mem.heapUsed / (1024 * 1024)), + heapTotalMb: round(mem.heapTotal / (1024 * 1024)), + uptimeSeconds: Math.floor(process.uptime()), + wifiSsid: this.getWifiSsid(), + wifiSignalDbm: this.getWifiSignal(), + clientVersion: this.clientVersion, + nodeVersion: process.version, + reportedAt: new Date().toISOString(), + }; + } + + /** + * Read CPU temperature from thermal zone (Linux only). + */ + private getCpuTemp(): number { + try { + const raw = readFileSync('/sys/class/thermal/thermal_zone0/temp', 'utf-8'); + return round(parseInt(raw, 10) / 1000); + } catch { + return -1; + } + } + + /** + * Get disk usage for the root partition. + */ + private getDiskUsage(): number { + try { + const stats = statfsSync('/'); + const totalBlocks = stats.blocks; + const freeBlocks = stats.bfree; + return round(((totalBlocks - freeBlocks) / totalBlocks) * 100); + } catch { + return -1; + } + } + + /** + * Get current WiFi SSID via nmcli. + */ + private getWifiSsid(): string | null { + try { + const result = execSync('nmcli -t -f active,ssid dev wifi', { + encoding: 'utf-8', + timeout: 3000, + }); + const active = result.split('\n').find((l) => l.startsWith('yes:')); + return active ? active.split(':')[1] || null : null; + } catch { + return null; + } + } + + /** + * Get WiFi signal strength in dBm via nmcli. + */ + private getWifiSignal(): number | null { + try { + const result = execSync('nmcli -t -f active,signal dev wifi', { + encoding: 'utf-8', + timeout: 3000, + }); + const active = result.split('\n').find((l) => l.startsWith('yes:')); + if (!active) return null; + const signal = parseInt(active.split(':')[1], 10); + // nmcli reports signal as 0-100 percentage; approximate dBm + // -30 dBm = 100%, -90 dBm = 0% + return Math.round(-90 + (signal / 100) * 60); + } catch { + return null; + } + } +} + +function round(n: number, decimals = 1): number { + const factor = Math.pow(10, decimals); + return Math.round(n * factor) / factor; +} diff --git a/apps/robot-client/src/transport/cloud-socket.ts b/apps/robot-client/src/transport/cloud-socket.ts index c9d8d42..e8a37f2 100644 --- a/apps/robot-client/src/transport/cloud-socket.ts +++ b/apps/robot-client/src/transport/cloud-socket.ts @@ -174,6 +174,19 @@ export class CloudSocket extends EventEmitter { this.socket.emit('user_interrupt'); } + /** + * Emit a raw event on the underlying socket. + * Used by services (e.g. TelemetryReporter) that need to send + * custom events to the backend. + */ + emitRaw(event: string, data: unknown): void { + if (!this.socket?.connected) { + this.logger.warn(`Cannot emit ${event}: not connected`); + return; + } + this.socket.emit(event, data); + } + /** * Disconnect from the cloud backend. */ diff --git a/apps/robot-client/src/utils/index.ts b/apps/robot-client/src/utils/index.ts index e3d8797..eae3f06 100644 --- a/apps/robot-client/src/utils/index.ts +++ b/apps/robot-client/src/utils/index.ts @@ -1 +1 @@ -export { createLogger, type Logger } from './logger.js'; +export { createLogger, setLogForwarder, type Logger } from './logger.js'; diff --git a/apps/robot-client/src/utils/logger.ts b/apps/robot-client/src/utils/logger.ts index 9eddbf7..b0af79c 100644 --- a/apps/robot-client/src/utils/logger.ts +++ b/apps/robot-client/src/utils/logger.ts @@ -1,7 +1,19 @@ import pino from 'pino'; +import type { LogForwarder, LogEntry } from '../services/log-forwarder.js'; + +let _logForwarder: LogForwarder | null = null; + +/** + * Register a LogForwarder instance. + * Once registered, all loggers created via `createLogger` will + * also forward their output to the cloud backend. + */ +export function setLogForwarder(forwarder: LogForwarder): void { + _logForwarder = forwarder; +} export function createLogger(name: string, level = 'info') { - return pino({ + const logger = pino({ name, level, transport: @@ -9,6 +21,58 @@ export function createLogger(name: string, level = 'info') { ? { target: 'pino-pretty', options: { colorize: true } } : undefined, }); + + // Wrap the logger to also forward logs when a forwarder is set. + // We hook into Pino's internal write by using `onChild` isn't available, + // so we use a proxy on the logging methods. + return new Proxy(logger, { + get(target, prop, receiver) { + const value = Reflect.get(target, prop, receiver); + + // Intercept logging methods: trace, debug, info, warn, error, fatal + if ( + _logForwarder && + typeof prop === 'string' && + ['trace', 'debug', 'info', 'warn', 'error', 'fatal'].includes(prop) && + typeof value === 'function' + ) { + return (...args: unknown[]) => { + // Call original + (value as (...a: unknown[]) => void).apply(target, args); + + // Forward to the cloud + try { + const levelNum = target.levels.values[prop] ?? 30; + let msg = ''; + let extra: Record = {}; + + if (typeof args[0] === 'string') { + msg = args[0]; + } else if (typeof args[0] === 'object' && args[0] !== null) { + extra = args[0] as Record; + if (typeof args[1] === 'string') { + msg = args[1]; + } + } + + const entry: LogEntry = { + level: levelNum, + time: Date.now(), + msg, + name, + ...extra, + }; + + _logForwarder!.ingest(entry); + } catch { + // Never let forwarding break the app + } + }; + } + + return value; + }, + }); } export type Logger = pino.Logger;