import { Queue, Worker, type Processor } from 'bullmq' import { redisConnection, queueNames, type QueueName } from '#config/queue' import logger from '@adonisjs/core/services/logger' /** * Wrappers BullMQ partagés. Chaque queue a 1 instance Queue (producer) * et N workers (consumers) avec le bon handler. * * V1 : on garde tout en mémoire process — workers et HTTP partagent le * même Node. Quand le volume justifie le coût, on extrait les workers * dans un Deployment K3s séparé (cf. backend.md §13.4). */ const queues = new Map() const workers: Worker[] = [] export function getQueue(name: QueueName): Queue { let q = queues.get(name) if (!q) { q = new Queue(name, { connection: redisConnection }) queues.set(name, q) } return q } export type JobHandler = Processor /** * Enregistre un Worker BullMQ sur une queue. Démarre tout de suite. * Appelé par start/queue.ts au boot pour câbler les handlers. */ export function registerWorker(name: QueueName, handler: JobHandler): Worker { const worker = new Worker(name, handler, { connection: redisConnection, concurrency: 5, }) worker.on('failed', (job, err) => { logger.error({ err, queue: name, jobId: job?.id }, 'job failed') }) worker.on('completed', (job) => { logger.info({ queue: name, jobId: job.id }, 'job completed') }) workers.push(worker) return worker } /** * Stoppe proprement tous les workers + queues. Appelé au shutdown du * process via Adonis terminating hook. */ export async function shutdownQueue(): Promise { await Promise.all(workers.map((w) => w.close())) await Promise.all(Array.from(queues.values()).map((q) => q.close())) } /** * Liste des noms de queue (re-export du config pour ne pas exposer la * connection Redis ailleurs dans l'app). */ export const QUEUES = queueNames