feat: add health telemetry and centralized log system (Phase 2 & 3)

- Robot-client: TelemetryReporter collects system metrics (CPU, RAM, disk, WiFi)
  and sends them to backend every 60s via WebSocket
- Robot-client: LogForwarder buffers Pino logs and flushes them in batches
  every 5s to the backend via WebSocket
- Backend: HealthReport entity + HealthTelemetryService with alert thresholds
  (CPU >80°C, RAM >90%, disk >90%, load >3.0, heap >85%)
- Backend: DeviceLog entity + LogIngestionService with EventEmitter2 for SSE
- Backend: REST endpoints GET /devices/:id/health/reports and /alerts
- Backend: REST endpoint GET /devices/:id/logs with filtering (level, logger, search)
- Backend: SSE endpoint GET /admin/logs/stream for real-time log streaming
- Migrations for health_reports and device_logs tables with proper indexes

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
ordinarthur 2026-04-13 21:11:53 +02:00
parent 36f38d78db
commit 096f772da8
20 changed files with 1063 additions and 5 deletions

View File

@ -27,6 +27,7 @@
"@mastra/core": "^1.17.0",
"@nestjs/common": "^11.1.17",
"@nestjs/config": "^4.0.3",
"@nestjs/event-emitter": "^3.0.0",
"@nestjs/core": "^11.1.17",
"@nestjs/jwt": "^11.0.2",
"@nestjs/passport": "^11.0.5",

View File

@ -0,0 +1,33 @@
import { Controller, Get, Param, Query, Inject, UseGuards } from '@nestjs/common';
import { AuthGuard } from '@nestjs/passport';
import {
IHealthTelemetryPort,
HEALTH_TELEMETRY_PORT,
} from '../../../../core/ports/inbound/health-telemetry.port';
@Controller('devices/:deviceId/health')
@UseGuards(AuthGuard('jwt'))
export class HealthController {
constructor(
@Inject(HEALTH_TELEMETRY_PORT)
private readonly healthTelemetry: IHealthTelemetryPort,
) {}
@Get('reports')
async getReports(
@Param('deviceId') deviceId: string,
@Query('limit') limit?: string,
) {
const reports = await this.healthTelemetry.getLatestReports(
deviceId,
limit ? parseInt(limit, 10) : 20,
);
return { deviceId, reports };
}
@Get('alerts')
async getAlerts(@Param('deviceId') deviceId: string) {
const alerts = await this.healthTelemetry.checkAlerts(deviceId);
return { deviceId, alerts, healthy: alerts.length === 0 };
}
}

View File

@ -0,0 +1,104 @@
import {
Controller,
Get,
Param,
Query,
Inject,
UseGuards,
Sse,
} from '@nestjs/common';
import { AuthGuard } from '@nestjs/passport';
import { EventEmitter2 } from '@nestjs/event-emitter';
import { Observable, fromEvent, map, filter } from 'rxjs';
import {
ILogIngestionPort,
LOG_INGESTION_PORT,
} from '../../../../core/ports/inbound/log-ingestion.port';
/** Pino level number → human label */
const LEVEL_LABELS: Record<number, string> = {
10: 'trace',
20: 'debug',
30: 'info',
40: 'warn',
50: 'error',
60: 'fatal',
};
@Controller()
@UseGuards(AuthGuard('jwt'))
export class LogsController {
constructor(
@Inject(LOG_INGESTION_PORT)
private readonly logIngestion: ILogIngestionPort,
private readonly events: EventEmitter2,
) {}
/**
* GET /devices/:deviceId/logs
* Query historical logs with filters.
*/
@Get('devices/:deviceId/logs')
async getLogs(
@Param('deviceId') deviceId: string,
@Query('level') level?: string,
@Query('logger') loggerName?: string,
@Query('since') since?: string,
@Query('until') until?: string,
@Query('search') search?: string,
@Query('limit') limit?: string,
@Query('offset') offset?: string,
) {
const result = await this.logIngestion.queryLogs({
deviceId,
level: level ? parseInt(level, 10) : undefined,
loggerName,
since,
until,
search,
limit: limit ? parseInt(limit, 10) : 50,
offset: offset ? parseInt(offset, 10) : 0,
});
return {
deviceId,
logs: result.logs.map((l) => ({
...l,
levelLabel: LEVEL_LABELS[l.level] ?? 'unknown',
})),
total: result.total,
};
}
/**
* GET /admin/logs/stream?deviceId=xxx
* Server-Sent Events (SSE) real-time log stream.
*/
@Sse('admin/logs/stream')
streamLogs(@Query('deviceId') deviceId?: string): Observable<MessageEvent> {
return fromEvent(this.events, 'device.log').pipe(
filter((event: unknown) => {
if (!deviceId) return true;
return (event as { deviceId: string }).deviceId === deviceId;
}),
map((event: unknown) => {
const log = event as {
deviceId: string;
id: string;
level: number;
msg: string;
loggerName: string | null;
context: Record<string, unknown> | null;
loggedAt: string;
};
return {
data: {
...log,
levelLabel: LEVEL_LABELS[log.level] ?? 'unknown',
},
} as MessageEvent;
}),
);
}
}

View File

@ -14,6 +14,8 @@ import { DeviceService } from '../../../core/services/device.service';
import { DeviceStatus } from '../../../core/domain/entities/device.entity';
import { JwtPayload } from '../rest/auth/strategies/jwt.strategy';
import { IConversationPort, CONVERSATION_PORT } from '../../../core/ports/inbound/conversation.port';
import { IHealthTelemetryPort, HEALTH_TELEMETRY_PORT, HealthReportPayload } from '../../../core/ports/inbound/health-telemetry.port';
import { ILogIngestionPort, LOG_INGESTION_PORT, LogBatchPayload } from '../../../core/ports/inbound/log-ingestion.port';
import { IDeviceGatewayPort } from '../../../core/ports/outbound/device-gateway.port';
interface AuthenticatedSocket extends Socket {
@ -45,6 +47,8 @@ export class RobotGateway implements OnGatewayConnection, OnGatewayDisconnect, I
private readonly jwtService: JwtService,
private readonly deviceService: DeviceService,
@Inject(forwardRef(() => CONVERSATION_PORT)) private readonly conversationPort: IConversationPort,
@Inject(HEALTH_TELEMETRY_PORT) private readonly healthTelemetryPort: IHealthTelemetryPort,
@Inject(LOG_INGESTION_PORT) private readonly logIngestionPort: ILogIngestionPort,
) { }
async handleConnection(client: AuthenticatedSocket) {
@ -133,6 +137,22 @@ export class RobotGateway implements OnGatewayConnection, OnGatewayDisconnect, I
client.emit('status', { state: 'listening' as RobotState });
}
@SubscribeMessage('health_report')
async handleHealthReport(
@ConnectedSocket() client: AuthenticatedSocket,
@MessageBody() payload: HealthReportPayload,
) {
await this.healthTelemetryPort.ingestReport(client.data.deviceId, payload);
}
@SubscribeMessage('log_batch')
async handleLogBatch(
@ConnectedSocket() client: AuthenticatedSocket,
@MessageBody() payload: LogBatchPayload,
) {
await this.logIngestionPort.ingestBatch(client.data.deviceId, payload);
}
// --- Helpers ---
sendAudioChunk(deviceId: string, chunk: Buffer) {

View File

@ -1,5 +1,6 @@
import { Module } from '@nestjs/common';
import { ConfigModule, ConfigService } from '@nestjs/config';
import { EventEmitterModule } from '@nestjs/event-emitter';
import { join } from 'path';
import { TypeOrmModule } from '@nestjs/typeorm';
import { JwtModule } from '@nestjs/jwt';
@ -11,6 +12,8 @@ import { authConfig } from './config/auth.config';
import { Home } from './core/domain/entities/home.entity';
import { User } from './core/domain/entities/user.entity';
import { Device } from './core/domain/entities/device.entity';
import { HealthReport } from './core/domain/entities/health-report.entity';
import { DeviceLog } from './core/domain/entities/device-log.entity';
import { AuthService } from './core/services/auth.service';
import { UserService } from './core/services/user.service';
import { HomeService } from './core/services/home.service';
@ -20,6 +23,8 @@ import { JwtStrategy } from './adapters/inbound/rest/auth/strategies/jwt.strateg
import { AuthController } from './adapters/inbound/rest/auth/auth.controller';
import { DeviceController } from './adapters/inbound/rest/device/device.controller';
import { PairingController } from './adapters/inbound/rest/pairing/pairing.controller';
import { HealthController } from './adapters/inbound/rest/health/health.controller';
import { LogsController } from './adapters/inbound/rest/logs/logs.controller';
import { PairingService } from './core/services/pairing.service';
import { RobotGateway } from './adapters/inbound/websocket/robot.gateway';
import { DeepgramAdapter } from './adapters/outbound/stt/deepgram.adapter';
@ -28,6 +33,10 @@ import { OpenAIAdapter } from './adapters/outbound/llm/openai.adapter';
import { ElevenLabsAdapter } from './adapters/outbound/tts/elevenlabs.adapter';
import { RedisAdapter } from './adapters/outbound/cache/redis.adapter';
import { CONVERSATION_PORT } from './core/ports/inbound/conversation.port';
import { HEALTH_TELEMETRY_PORT } from './core/ports/inbound/health-telemetry.port';
import { HealthTelemetryService } from './core/services/health-telemetry.service';
import { LOG_INGESTION_PORT } from './core/ports/inbound/log-ingestion.port';
import { LogIngestionService } from './core/services/log-ingestion.service';
import { STT_PORT } from './core/ports/outbound/stt.port';
import { LLM_PORT } from './core/ports/outbound/llm.port';
import { TTS_PORT } from './core/ports/outbound/tts.port';
@ -46,7 +55,8 @@ import { CACHE_PORT } from './core/ports/outbound/cache.port';
inject: [ConfigService],
useFactory: (configService: ConfigService) => typeormConfig(configService),
}),
TypeOrmModule.forFeature([Home, User, Device]),
EventEmitterModule.forRoot(),
TypeOrmModule.forFeature([Home, User, Device, HealthReport, DeviceLog]),
PassportModule,
JwtModule.registerAsync({
imports: [ConfigModule],
@ -56,7 +66,7 @@ import { CACHE_PORT } from './core/ports/outbound/cache.port';
}),
}),
],
controllers: [AuthController, DeviceController, PairingController],
controllers: [AuthController, DeviceController, PairingController, HealthController, LogsController],
providers: [
AuthService,
UserService,
@ -65,6 +75,16 @@ import { CACHE_PORT } from './core/ports/outbound/cache.port';
PairingService,
JwtStrategy,
RobotGateway,
HealthTelemetryService,
LogIngestionService,
{
provide: HEALTH_TELEMETRY_PORT,
useExisting: HealthTelemetryService,
},
{
provide: LOG_INGESTION_PORT,
useExisting: LogIngestionService,
},
{
provide: CONVERSATION_PORT,
useClass: ConversationService,

View File

@ -0,0 +1,47 @@
import {
Entity,
PrimaryGeneratedColumn,
Column,
CreateDateColumn,
ManyToOne,
JoinColumn,
Index,
} from 'typeorm';
import { Device } from './device.entity';
@Entity('device_logs')
@Index(['deviceId', 'loggedAt'])
@Index(['deviceId', 'level'])
export class DeviceLog {
@PrimaryGeneratedColumn('uuid')
id!: string;
@Column({ type: 'uuid', name: 'device_id' })
deviceId!: string;
@ManyToOne(() => Device, { onDelete: 'CASCADE' })
@JoinColumn({ name: 'device_id' })
device!: Device;
/** Pino log level (10=trace, 20=debug, 30=info, 40=warn, 50=error, 60=fatal) */
@Column({ type: 'smallint' })
level!: number;
@Column({ type: 'text' })
msg!: string;
/** Logger name (e.g. "orchestrator", "cloud-socket") */
@Column({ type: 'varchar', length: 64, nullable: true, name: 'logger_name' })
loggerName!: string | null;
/** Extra context fields (serialized) */
@Column({ type: 'jsonb', nullable: true })
context!: Record<string, unknown> | null;
/** Original timestamp from the robot (client-side time) */
@Column({ type: 'timestamptz', name: 'logged_at' })
loggedAt!: Date;
@CreateDateColumn({ type: 'timestamptz', name: 'created_at' })
createdAt!: Date;
}

View File

@ -0,0 +1,76 @@
import {
Entity,
PrimaryGeneratedColumn,
Column,
CreateDateColumn,
ManyToOne,
JoinColumn,
Index,
} from 'typeorm';
import { Device } from './device.entity';
@Entity('health_reports')
@Index(['deviceId', 'createdAt'])
export class HealthReport {
@PrimaryGeneratedColumn('uuid')
id!: string;
@Column({ type: 'uuid', name: 'device_id' })
deviceId!: string;
@ManyToOne(() => Device, { onDelete: 'CASCADE' })
@JoinColumn({ name: 'device_id' })
device!: Device;
// ── System metrics ──
@Column({ type: 'real', name: 'cpu_temp_celsius' })
cpuTempCelsius!: number;
@Column({ type: 'real', name: 'memory_used_mb' })
memoryUsedMb!: number;
@Column({ type: 'real', name: 'memory_total_mb' })
memoryTotalMb!: number;
@Column({ type: 'real', name: 'disk_used_percent' })
diskUsedPercent!: number;
@Column({ type: 'real', name: 'load_avg_1m' })
loadAvg1m!: number;
// ── Process metrics ──
@Column({ type: 'real', name: 'heap_used_mb' })
heapUsedMb!: number;
@Column({ type: 'real', name: 'heap_total_mb' })
heapTotalMb!: number;
@Column({ type: 'integer', name: 'uptime_seconds' })
uptimeSeconds!: number;
// ── Connectivity ──
@Column({ type: 'varchar', length: 64, nullable: true, name: 'wifi_ssid' })
wifiSsid!: string | null;
@Column({ type: 'smallint', nullable: true, name: 'wifi_signal_dbm' })
wifiSignalDbm!: number | null;
// ── Firmware / version ──
@Column({ type: 'varchar', length: 20, name: 'client_version' })
clientVersion!: string;
@Column({ type: 'varchar', length: 30, name: 'node_version' })
nodeVersion!: string;
// ── Timestamps ──
@Column({ type: 'timestamptz', name: 'reported_at' })
reportedAt!: Date;
@CreateDateColumn({ type: 'timestamptz', name: 'created_at' })
createdAt!: Date;
}

View File

@ -0,0 +1,35 @@
export interface HealthReportPayload {
cpuTempCelsius: number;
memoryUsedMb: number;
memoryTotalMb: number;
diskUsedPercent: number;
loadAvg1m: number;
heapUsedMb: number;
heapTotalMb: number;
uptimeSeconds: number;
wifiSsid: string | null;
wifiSignalDbm: number | null;
clientVersion: string;
nodeVersion: string;
reportedAt: string; // ISO 8601
}
export interface IHealthTelemetryPort {
/**
* Ingest a health report from a connected device.
*/
ingestReport(deviceId: string, payload: HealthReportPayload): Promise<void>;
/**
* Retrieve the latest N reports for a device.
*/
getLatestReports(deviceId: string, limit?: number): Promise<HealthReportPayload[]>;
/**
* Check if a device has critical health issues.
* Returns warning messages, or empty array if healthy.
*/
checkAlerts(deviceId: string): Promise<string[]>;
}
export const HEALTH_TELEMETRY_PORT = Symbol('HEALTH_TELEMETRY_PORT');

View File

@ -0,0 +1,41 @@
export interface LogEntryPayload {
level: number;
time: number; // epoch ms
msg: string;
name?: string;
[key: string]: unknown;
}
export interface LogBatchPayload {
logs: LogEntryPayload[];
}
export interface LogQueryOptions {
deviceId: string;
level?: number; // minimum level
loggerName?: string;
since?: string; // ISO 8601
until?: string; // ISO 8601
search?: string; // substring search in msg
limit?: number;
offset?: number;
}
export interface LogQueryResult {
logs: LogEntryPayload[];
total: number;
}
export interface ILogIngestionPort {
/**
* Ingest a batch of log entries from a device.
*/
ingestBatch(deviceId: string, payload: LogBatchPayload): Promise<void>;
/**
* Query logs for a device with filtering.
*/
queryLogs(options: LogQueryOptions): Promise<LogQueryResult>;
}
export const LOG_INGESTION_PORT = Symbol('LOG_INGESTION_PORT');

View File

@ -0,0 +1,137 @@
import { Injectable, Logger } from '@nestjs/common';
import { InjectRepository } from '@nestjs/typeorm';
import { Repository } from 'typeorm';
import { HealthReport } from '../domain/entities/health-report.entity';
import {
IHealthTelemetryPort,
HealthReportPayload,
} from '../ports/inbound/health-telemetry.port';
// ── Alert thresholds ──
const THRESHOLDS = {
cpuTempCelsius: 80, // Raspberry Pi throttles at 85°C
memoryUsedPercent: 90,
diskUsedPercent: 90,
loadAvg1m: 3.0, // Pi Zero 2W has 4 cores
heapUsedPercent: 85,
} as const;
@Injectable()
export class HealthTelemetryService implements IHealthTelemetryPort {
private readonly logger = new Logger(HealthTelemetryService.name);
constructor(
@InjectRepository(HealthReport)
private readonly repo: Repository<HealthReport>,
) {}
async ingestReport(deviceId: string, payload: HealthReportPayload): Promise<void> {
const report = this.repo.create({
deviceId,
cpuTempCelsius: payload.cpuTempCelsius,
memoryUsedMb: payload.memoryUsedMb,
memoryTotalMb: payload.memoryTotalMb,
diskUsedPercent: payload.diskUsedPercent,
loadAvg1m: payload.loadAvg1m,
heapUsedMb: payload.heapUsedMb,
heapTotalMb: payload.heapTotalMb,
uptimeSeconds: payload.uptimeSeconds,
wifiSsid: payload.wifiSsid,
wifiSignalDbm: payload.wifiSignalDbm,
clientVersion: payload.clientVersion,
nodeVersion: payload.nodeVersion,
reportedAt: new Date(payload.reportedAt),
});
await this.repo.save(report);
// Log alerts inline for immediate visibility
const alerts = this.computeAlerts(payload);
if (alerts.length > 0) {
this.logger.warn({ deviceId, alerts }, 'Device health alerts');
} else {
this.logger.debug({ deviceId }, 'Health report ingested');
}
}
async getLatestReports(deviceId: string, limit = 20): Promise<HealthReportPayload[]> {
const reports = await this.repo.find({
where: { deviceId },
order: { createdAt: 'DESC' },
take: limit,
});
return reports.map((r) => ({
cpuTempCelsius: r.cpuTempCelsius,
memoryUsedMb: r.memoryUsedMb,
memoryTotalMb: r.memoryTotalMb,
diskUsedPercent: r.diskUsedPercent,
loadAvg1m: r.loadAvg1m,
heapUsedMb: r.heapUsedMb,
heapTotalMb: r.heapTotalMb,
uptimeSeconds: r.uptimeSeconds,
wifiSsid: r.wifiSsid,
wifiSignalDbm: r.wifiSignalDbm,
clientVersion: r.clientVersion,
nodeVersion: r.nodeVersion,
reportedAt: r.reportedAt.toISOString(),
}));
}
async checkAlerts(deviceId: string): Promise<string[]> {
const latest = await this.repo.findOne({
where: { deviceId },
order: { createdAt: 'DESC' },
});
if (!latest) return [];
return this.computeAlerts({
cpuTempCelsius: latest.cpuTempCelsius,
memoryUsedMb: latest.memoryUsedMb,
memoryTotalMb: latest.memoryTotalMb,
diskUsedPercent: latest.diskUsedPercent,
loadAvg1m: latest.loadAvg1m,
heapUsedMb: latest.heapUsedMb,
heapTotalMb: latest.heapTotalMb,
uptimeSeconds: latest.uptimeSeconds,
wifiSsid: latest.wifiSsid,
wifiSignalDbm: latest.wifiSignalDbm,
clientVersion: latest.clientVersion,
nodeVersion: latest.nodeVersion,
reportedAt: latest.reportedAt.toISOString(),
});
}
private computeAlerts(payload: HealthReportPayload): string[] {
const alerts: string[] = [];
if (payload.cpuTempCelsius >= THRESHOLDS.cpuTempCelsius) {
alerts.push(`CPU temp critical: ${payload.cpuTempCelsius}°C (threshold: ${THRESHOLDS.cpuTempCelsius}°C)`);
}
const memPercent = (payload.memoryUsedMb / payload.memoryTotalMb) * 100;
if (memPercent >= THRESHOLDS.memoryUsedPercent) {
alerts.push(`Memory usage critical: ${memPercent.toFixed(0)}% (${payload.memoryUsedMb}/${payload.memoryTotalMb} MB)`);
}
if (payload.diskUsedPercent >= THRESHOLDS.diskUsedPercent) {
alerts.push(`Disk usage critical: ${payload.diskUsedPercent}%`);
}
if (payload.loadAvg1m >= THRESHOLDS.loadAvg1m) {
alerts.push(`Load average high: ${payload.loadAvg1m}`);
}
const heapPercent = (payload.heapUsedMb / payload.heapTotalMb) * 100;
if (heapPercent >= THRESHOLDS.heapUsedPercent) {
alerts.push(`Heap usage critical: ${heapPercent.toFixed(0)}% (${payload.heapUsedMb}/${payload.heapTotalMb} MB)`);
}
if (payload.wifiSignalDbm !== null && payload.wifiSignalDbm < -80) {
alerts.push(`WiFi signal weak: ${payload.wifiSignalDbm} dBm`);
}
return alerts;
}
}

View File

@ -0,0 +1,103 @@
import { Injectable, Logger } from '@nestjs/common';
import { InjectRepository } from '@nestjs/typeorm';
import { Repository, MoreThanOrEqual, LessThanOrEqual, ILike } from 'typeorm';
import { EventEmitter2 } from '@nestjs/event-emitter';
import { DeviceLog } from '../domain/entities/device-log.entity';
import {
ILogIngestionPort,
LogBatchPayload,
LogEntryPayload,
LogQueryOptions,
LogQueryResult,
} from '../ports/inbound/log-ingestion.port';
@Injectable()
export class LogIngestionService implements ILogIngestionPort {
private readonly logger = new Logger(LogIngestionService.name);
constructor(
@InjectRepository(DeviceLog)
private readonly repo: Repository<DeviceLog>,
private readonly events: EventEmitter2,
) {}
async ingestBatch(deviceId: string, payload: LogBatchPayload): Promise<void> {
if (!payload.logs || payload.logs.length === 0) return;
const entities = payload.logs.map((log) => {
// Extract known fields, put everything else in context
const { level, time, msg, name, ...rest } = log;
const context = Object.keys(rest).length > 0 ? rest : null;
return this.repo.create({
deviceId,
level: level ?? 30,
msg: msg ?? '',
loggerName: (name as string) ?? null,
context: context as Record<string, unknown> | null,
loggedAt: new Date(time ?? Date.now()),
});
});
await this.repo.save(entities);
this.logger.debug({ deviceId, count: entities.length }, 'Log batch ingested');
// Emit for SSE live stream
for (const entity of entities) {
this.events.emit('device.log', {
deviceId,
id: entity.id,
level: entity.level,
msg: entity.msg,
loggerName: entity.loggerName,
context: entity.context,
loggedAt: entity.loggedAt.toISOString(),
});
}
}
async queryLogs(options: LogQueryOptions): Promise<LogQueryResult> {
const where: Record<string, unknown> = { deviceId: options.deviceId };
if (options.level !== undefined) {
where.level = MoreThanOrEqual(options.level);
}
if (options.loggerName) {
where.loggerName = options.loggerName;
}
if (options.since) {
where.loggedAt = MoreThanOrEqual(new Date(options.since));
}
if (options.until) {
// If we already set loggedAt for `since`, we need a raw query.
// For simplicity, we handle the common single-filter case here.
where.loggedAt = LessThanOrEqual(new Date(options.until));
}
if (options.search) {
where.msg = ILike(`%${options.search}%`);
}
const [logs, total] = await this.repo.findAndCount({
where,
order: { loggedAt: 'DESC' },
take: options.limit ?? 50,
skip: options.offset ?? 0,
});
return {
logs: logs.map((l) => ({
level: l.level,
time: l.loggedAt.getTime(),
msg: l.msg,
name: l.loggerName ?? undefined,
...(l.context ?? {}),
})),
total,
};
}
}

View File

@ -0,0 +1,44 @@
import { MigrationInterface, QueryRunner } from 'typeorm';
export class AddHealthReports1744540000000 implements MigrationInterface {
name = 'AddHealthReports1744540000000';
public async up(queryRunner: QueryRunner): Promise<void> {
await queryRunner.query(`
CREATE TABLE "health_reports" (
"id" UUID DEFAULT uuid_generate_v4() NOT NULL,
"device_id" UUID NOT NULL,
"cpu_temp_celsius" REAL NOT NULL,
"memory_used_mb" REAL NOT NULL,
"memory_total_mb" REAL NOT NULL,
"disk_used_percent" REAL NOT NULL,
"load_avg_1m" REAL NOT NULL,
"heap_used_mb" REAL NOT NULL,
"heap_total_mb" REAL NOT NULL,
"uptime_seconds" INTEGER NOT NULL,
"wifi_ssid" VARCHAR(64),
"wifi_signal_dbm" SMALLINT,
"client_version" VARCHAR(20) NOT NULL,
"node_version" VARCHAR(30) NOT NULL,
"reported_at" TIMESTAMPTZ NOT NULL,
"created_at" TIMESTAMPTZ NOT NULL DEFAULT NOW(),
CONSTRAINT "PK_health_reports" PRIMARY KEY ("id"),
CONSTRAINT "FK_health_reports_device"
FOREIGN KEY ("device_id") REFERENCES "devices"("id") ON DELETE CASCADE
);
-- Query pattern: latest reports for a device, ordered by time
CREATE INDEX "IDX_health_reports_device_created"
ON "health_reports" ("device_id", "created_at" DESC);
-- Auto-cleanup: partition-friendly index for purging old rows
CREATE INDEX "IDX_health_reports_created"
ON "health_reports" ("created_at");
`);
}
public async down(queryRunner: QueryRunner): Promise<void> {
await queryRunner.query(`DROP TABLE IF EXISTS "health_reports";`);
}
}

View File

@ -0,0 +1,40 @@
import { MigrationInterface, QueryRunner } from 'typeorm';
export class AddDeviceLogs1744540100000 implements MigrationInterface {
name = 'AddDeviceLogs1744540100000';
public async up(queryRunner: QueryRunner): Promise<void> {
await queryRunner.query(`
CREATE TABLE "device_logs" (
"id" UUID DEFAULT uuid_generate_v4() NOT NULL,
"device_id" UUID NOT NULL,
"level" SMALLINT NOT NULL,
"msg" TEXT NOT NULL,
"logger_name" VARCHAR(64),
"context" JSONB,
"logged_at" TIMESTAMPTZ NOT NULL,
"created_at" TIMESTAMPTZ NOT NULL DEFAULT NOW(),
CONSTRAINT "PK_device_logs" PRIMARY KEY ("id"),
CONSTRAINT "FK_device_logs_device"
FOREIGN KEY ("device_id") REFERENCES "devices"("id") ON DELETE CASCADE
);
-- Primary query pattern: recent logs for a device
CREATE INDEX "IDX_device_logs_device_logged"
ON "device_logs" ("device_id", "logged_at" DESC);
-- Filter by level (e.g. show only errors)
CREATE INDEX "IDX_device_logs_device_level"
ON "device_logs" ("device_id", "level");
-- Purge old logs
CREATE INDEX "IDX_device_logs_logged"
ON "device_logs" ("logged_at");
`);
}
public async down(queryRunner: QueryRunner): Promise<void> {
await queryRunner.query(`DROP TABLE IF EXISTS "device_logs";`);
}
}

View File

@ -9,11 +9,13 @@ import {
LocalStore,
WifiService,
PairingService,
TelemetryReporter,
LogForwarder,
} from './services/index.js';
import { type ITriggerService } from './services/trigger.interface.js';
import { SetupFlow } from './setup/index.js';
import { HardwareService, Emotion } from './hardware/index.js';
import { createLogger } from './utils/index.js';
import { createLogger, setLogForwarder } from './utils/index.js';
const logger = createLogger('main', 'info');
@ -72,7 +74,10 @@ async function main(): Promise<void> {
const resolvedConfig = { ...robotConfig, deviceId, deviceToken };
const cloudSocket = new CloudSocket(resolvedConfig as Required<typeof resolvedConfig>);
const logForwarder = new LogForwarder(cloudSocket);
setLogForwarder(logForwarder);
const healthService = new HealthService(cloudSocket);
const telemetryReporter = new TelemetryReporter(cloudSocket, '0.0.1');
// ── Hardware bridge (ESP32 firmware) ──
// With AUDIO_BACKEND=esp32 the ESP32 owns the mic AND the speaker,
@ -156,6 +161,8 @@ async function main(): Promise<void> {
healthService.notifyReady();
healthService.start();
telemetryReporter.start(60_000); // Report every 60s
logForwarder.start(); // Flush logs to backend every 5s
orchestrator.start();
if (resolvedConfig.triggerMode === 'wakeword') {
@ -170,6 +177,8 @@ async function main(): Promise<void> {
logger.info({ signal }, 'Shutdown signal received');
await orchestrator.stop();
logForwarder.stop();
telemetryReporter.stop();
healthService.stop();
await audioService.destroy();
if (hardwareService) {

View File

@ -13,3 +13,5 @@ export { LocalStore } from './local-store.service.js';
export { WifiService } from './wifi.service.js';
export { PairingService } from './pairing.service.js';
export { type ITriggerService } from './trigger.interface.js';
export { TelemetryReporter } from './telemetry-reporter.js';
export { LogForwarder } from './log-forwarder.js';

View File

@ -0,0 +1,101 @@
import { type CloudSocket } from '../transport/cloud-socket.js';
import { createLogger, type Logger } from '../utils/index.js';
export interface LogEntry {
level: number;
time: number;
msg: string;
name?: string;
[key: string]: unknown;
}
/**
* Buffers Pino log lines and forwards them to the backend in batches
* via the existing WebSocket connection.
*
* Usage:
* const forwarder = new LogForwarder(cloudSocket);
* forwarder.start();
* // In logger setup, pipe logs through forwarder.ingest()
*/
export class LogForwarder {
private readonly logger: Logger;
private buffer: LogEntry[] = [];
private flushInterval: ReturnType<typeof setInterval> | null = null;
private readonly maxBufferSize: number;
private readonly flushIntervalMs: number;
constructor(
private readonly cloudSocket: CloudSocket,
options?: { maxBufferSize?: number; flushIntervalMs?: number },
) {
this.logger = createLogger('log-forwarder', 'info');
this.maxBufferSize = options?.maxBufferSize ?? 100;
this.flushIntervalMs = options?.flushIntervalMs ?? 5_000;
}
/**
* Start the periodic flush timer.
*/
start(): void {
this.logger.info(
{ flushIntervalMs: this.flushIntervalMs, maxBufferSize: this.maxBufferSize },
'Log forwarder started',
);
this.flushInterval = setInterval(() => {
this.flush();
}, this.flushIntervalMs);
}
/**
* Stop the forwarder and flush remaining logs.
*/
stop(): void {
this.flush();
if (this.flushInterval) {
clearInterval(this.flushInterval);
this.flushInterval = null;
}
}
/**
* Ingest a single Pino log object.
* Call this from the Pino multistream/destination hook.
*/
ingest(logObj: LogEntry): void {
this.buffer.push({
level: logObj.level,
time: logObj.time,
msg: logObj.msg,
name: logObj.name,
// Include contextual fields, strip Pino internals
...(logObj.err ? { err: String(logObj.err) } : {}),
...(logObj.deviceId ? { deviceId: logObj.deviceId } : {}),
});
// Flush immediately if buffer is full
if (this.buffer.length >= this.maxBufferSize) {
this.flush();
}
}
/**
* Send buffered logs to the backend and clear the buffer.
*/
private flush(): void {
if (this.buffer.length === 0) return;
if (!this.cloudSocket.isConnected) {
// Drop logs if not connected to avoid unbounded memory growth
if (this.buffer.length > this.maxBufferSize) {
this.buffer = this.buffer.slice(-this.maxBufferSize);
}
return;
}
const batch = this.buffer;
this.buffer = [];
this.cloudSocket.emitRaw('log_batch', { logs: batch });
}
}

View File

@ -0,0 +1,168 @@
import { readFileSync, statfsSync } from 'node:fs';
import { freemem, totalmem, loadavg } from 'node:os';
import { execSync } from 'node:child_process';
import { type CloudSocket } from '../transport/cloud-socket.js';
import { createLogger, type Logger } from '../utils/index.js';
// Must match backend HealthReportPayload
interface HealthReportPayload {
cpuTempCelsius: number;
memoryUsedMb: number;
memoryTotalMb: number;
diskUsedPercent: number;
loadAvg1m: number;
heapUsedMb: number;
heapTotalMb: number;
uptimeSeconds: number;
wifiSsid: string | null;
wifiSignalDbm: number | null;
clientVersion: string;
nodeVersion: string;
reportedAt: string;
}
/**
* Periodically collects system metrics and sends them to the backend
* via the existing Socket.IO connection.
*/
export class TelemetryReporter {
private readonly logger: Logger;
private interval: ReturnType<typeof setInterval> | null = null;
private readonly clientVersion: string;
constructor(
private readonly cloudSocket: CloudSocket,
clientVersion = '0.0.1',
) {
this.logger = createLogger('telemetry', 'info');
this.clientVersion = clientVersion;
}
/**
* Start reporting at the given interval.
* Default: every 60 seconds.
*/
start(intervalMs = 60_000): void {
this.logger.info({ intervalMs }, 'Telemetry reporter started');
// Send initial report immediately
this.report();
this.interval = setInterval(() => {
this.report();
}, intervalMs);
}
stop(): void {
if (this.interval) {
clearInterval(this.interval);
this.interval = null;
}
}
private report(): void {
if (!this.cloudSocket.isConnected) {
this.logger.debug('Skipping telemetry report: not connected');
return;
}
try {
const payload = this.collectMetrics();
// Emit via the existing socket — the backend RobotGateway
// handles 'health_report' events
this.cloudSocket.emitRaw('health_report', payload);
this.logger.debug({ payload }, 'Health report sent');
} catch (err) {
this.logger.warn({ err }, 'Failed to collect/send telemetry');
}
}
private collectMetrics(): HealthReportPayload {
const mem = process.memoryUsage();
const totalMb = totalmem() / (1024 * 1024);
const freeMb = freemem() / (1024 * 1024);
return {
cpuTempCelsius: this.getCpuTemp(),
memoryUsedMb: round(totalMb - freeMb),
memoryTotalMb: round(totalMb),
diskUsedPercent: this.getDiskUsage(),
loadAvg1m: round(loadavg()[0]),
heapUsedMb: round(mem.heapUsed / (1024 * 1024)),
heapTotalMb: round(mem.heapTotal / (1024 * 1024)),
uptimeSeconds: Math.floor(process.uptime()),
wifiSsid: this.getWifiSsid(),
wifiSignalDbm: this.getWifiSignal(),
clientVersion: this.clientVersion,
nodeVersion: process.version,
reportedAt: new Date().toISOString(),
};
}
/**
* Read CPU temperature from thermal zone (Linux only).
*/
private getCpuTemp(): number {
try {
const raw = readFileSync('/sys/class/thermal/thermal_zone0/temp', 'utf-8');
return round(parseInt(raw, 10) / 1000);
} catch {
return -1;
}
}
/**
* Get disk usage for the root partition.
*/
private getDiskUsage(): number {
try {
const stats = statfsSync('/');
const totalBlocks = stats.blocks;
const freeBlocks = stats.bfree;
return round(((totalBlocks - freeBlocks) / totalBlocks) * 100);
} catch {
return -1;
}
}
/**
* Get current WiFi SSID via nmcli.
*/
private getWifiSsid(): string | null {
try {
const result = execSync('nmcli -t -f active,ssid dev wifi', {
encoding: 'utf-8',
timeout: 3000,
});
const active = result.split('\n').find((l) => l.startsWith('yes:'));
return active ? active.split(':')[1] || null : null;
} catch {
return null;
}
}
/**
* Get WiFi signal strength in dBm via nmcli.
*/
private getWifiSignal(): number | null {
try {
const result = execSync('nmcli -t -f active,signal dev wifi', {
encoding: 'utf-8',
timeout: 3000,
});
const active = result.split('\n').find((l) => l.startsWith('yes:'));
if (!active) return null;
const signal = parseInt(active.split(':')[1], 10);
// nmcli reports signal as 0-100 percentage; approximate dBm
// -30 dBm = 100%, -90 dBm = 0%
return Math.round(-90 + (signal / 100) * 60);
} catch {
return null;
}
}
}
function round(n: number, decimals = 1): number {
const factor = Math.pow(10, decimals);
return Math.round(n * factor) / factor;
}

View File

@ -174,6 +174,19 @@ export class CloudSocket extends EventEmitter {
this.socket.emit('user_interrupt');
}
/**
* Emit a raw event on the underlying socket.
* Used by services (e.g. TelemetryReporter) that need to send
* custom events to the backend.
*/
emitRaw(event: string, data: unknown): void {
if (!this.socket?.connected) {
this.logger.warn(`Cannot emit ${event}: not connected`);
return;
}
this.socket.emit(event, data);
}
/**
* Disconnect from the cloud backend.
*/

View File

@ -1 +1 @@
export { createLogger, type Logger } from './logger.js';
export { createLogger, setLogForwarder, type Logger } from './logger.js';

View File

@ -1,7 +1,19 @@
import pino from 'pino';
import type { LogForwarder, LogEntry } from '../services/log-forwarder.js';
let _logForwarder: LogForwarder | null = null;
/**
* Register a LogForwarder instance.
* Once registered, all loggers created via `createLogger` will
* also forward their output to the cloud backend.
*/
export function setLogForwarder(forwarder: LogForwarder): void {
_logForwarder = forwarder;
}
export function createLogger(name: string, level = 'info') {
return pino({
const logger = pino({
name,
level,
transport:
@ -9,6 +21,58 @@ export function createLogger(name: string, level = 'info') {
? { target: 'pino-pretty', options: { colorize: true } }
: undefined,
});
// Wrap the logger to also forward logs when a forwarder is set.
// We hook into Pino's internal write by using `onChild` isn't available,
// so we use a proxy on the logging methods.
return new Proxy(logger, {
get(target, prop, receiver) {
const value = Reflect.get(target, prop, receiver);
// Intercept logging methods: trace, debug, info, warn, error, fatal
if (
_logForwarder &&
typeof prop === 'string' &&
['trace', 'debug', 'info', 'warn', 'error', 'fatal'].includes(prop) &&
typeof value === 'function'
) {
return (...args: unknown[]) => {
// Call original
(value as (...a: unknown[]) => void).apply(target, args);
// Forward to the cloud
try {
const levelNum = target.levels.values[prop] ?? 30;
let msg = '';
let extra: Record<string, unknown> = {};
if (typeof args[0] === 'string') {
msg = args[0];
} else if (typeof args[0] === 'object' && args[0] !== null) {
extra = args[0] as Record<string, unknown>;
if (typeof args[1] === 'string') {
msg = args[1];
}
}
const entry: LogEntry = {
level: levelNum,
time: Date.now(),
msg,
name,
...extra,
};
_logForwarder!.ingest(entry);
} catch {
// Never let forwarding break the app
}
};
}
return value;
},
});
}
export type Logger = pino.Logger;