Migrations :
- relance_tasks (uuid id, organization_id FK CASCADE [scope direct sans join], invoice_id FK CASCADE, plan_step_id FK RESTRICT, send_at, status ENUM scheduled/sent/cancelled/failed, sent_at, queue_job_id pour cancel via BullMQ.remove). Indexes (org,status), (invoice_id), (send_at).
- checkin_tasks (uuid id, org_id, invoice_id, send_at, token_hash unique [SHA-256 du HMAC, TTL 24h], status ENUM scheduled/sent/answered/expired, answer 'paid'|'still_pending'). Pas encore branché — flow check-in arrivera dans un commit séparé (cf. backend.md §13.3).
Schema rules : status enums + answer typés.
Models RelanceTask + CheckinTask avec belongsTo Invoice / PlanStep.
Service relance_scheduler.ts :
- scheduleRelancesForInvoice(invoice) : pour chaque step du plan, calcule sendAt = dueDate + offsetDays. Si sendAt < now (facture importée en retard), on programme à `now + 1min` plutôt que skip — l'utilisateur "rattrape" une dette de relance, l'envoi immédiat est cohérent. Crée la RelanceTask + enqueue BullMQ avec delay, retry 5x exponential, jobId = `relance:<taskId>` pour idempotency. Cancelle les tasks scheduled existantes avant de re-programmer (gestion changement de plan).
- cancelFutureRelances(invoiceId, trx) : appelé par mark-paid pour stopper la chaîne.
Service queue.ts :
- getQueue(name) singleton lazy par queue
- registerWorker(name, handler) avec concurrency 5, log failed/completed
- shutdownQueue() pour le terminating hook Adonis
start/queue.ts (preload) : registerWorker('relances', sendRelanceJob) seulement quand `app.getEnvironment() === 'web'` (pas en tests/REPL — pas de connexion Redis pendant Japa).
Job send_relance_job.ts :
- Idempotent : si task.status !== 'scheduled', no-op
- Hook critique : si invoice paid/cancelled entre-temps, task.status = cancelled
- Mise en demeure (step.requiresManualValidation) : on n'envoie PAS, on log un activity_event 'warning_drafted' (cf. CLAUDE.md → Principes : validation manuelle obligatoire)
- Sinon : sendRelanceEmail + task.status=sent + invoice.rubisEarned+1 + organizations.rubis_count+1 + activity_event 'relance_sent'. Si invoice.status='pending', passe en 'in_relance' (sortie de l'état silencieux).
Service mail_dispatcher.ts : sendRelanceEmail interpole step.subject/body via mini moteur Mustache-like (renderTemplate, services/template.ts) avec {{client.name}}/{{numero}}/{{amount}}/{{dueDate}}/{{signature}}, puis @adonisjs/mail.use(MAIL_DRIVER) → Mailpit en dev, Resend en prod. Texte brut V1.
Triggers branchés :
- InvoicesController.store : si planId, scheduleRelancesForInvoice après création
- ImportBatchesController.validateDraft : pareil
- InvoicesController.markPaid : cancelFutureRelances dans la même tx que le paiement
#jobs/* ajouté aux imports package.json. Adonisrc preload start/queue.ts.
Bruno : doc 05-Invoices/04 Create maj avec instructions pour tester l'envoi immédiat (dueDate dans le passé → relance à now+1min → email visible dans Mailpit http://localhost:8025).
61 lines
1.9 KiB
TypeScript
61 lines
1.9 KiB
TypeScript
import { Queue, Worker, type Processor } from 'bullmq'
|
|
import { redisConnection, queueNames, type QueueName } from '#config/queue'
|
|
import logger from '@adonisjs/core/services/logger'
|
|
|
|
/**
|
|
* Wrappers BullMQ partagés. Chaque queue a 1 instance Queue (producer)
|
|
* et N workers (consumers) avec le bon handler.
|
|
*
|
|
* V1 : on garde tout en mémoire process — workers et HTTP partagent le
|
|
* même Node. Quand le volume justifie le coût, on extrait les workers
|
|
* dans un Deployment K3s séparé (cf. backend.md §13.4).
|
|
*/
|
|
|
|
const queues = new Map<QueueName, Queue>()
|
|
const workers: Worker[] = []
|
|
|
|
export function getQueue(name: QueueName): Queue {
|
|
let q = queues.get(name)
|
|
if (!q) {
|
|
q = new Queue(name, { connection: redisConnection })
|
|
queues.set(name, q)
|
|
}
|
|
return q
|
|
}
|
|
|
|
export type JobHandler<T = unknown> = Processor<T>
|
|
|
|
/**
|
|
* Enregistre un Worker BullMQ sur une queue. Démarre tout de suite.
|
|
* Appelé par start/queue.ts au boot pour câbler les handlers.
|
|
*/
|
|
export function registerWorker<T = unknown>(name: QueueName, handler: JobHandler<T>): Worker {
|
|
const worker = new Worker<T>(name, handler, {
|
|
connection: redisConnection,
|
|
concurrency: 5,
|
|
})
|
|
worker.on('failed', (job, err) => {
|
|
logger.error({ err, queue: name, jobId: job?.id }, 'job failed')
|
|
})
|
|
worker.on('completed', (job) => {
|
|
logger.info({ queue: name, jobId: job.id }, 'job completed')
|
|
})
|
|
workers.push(worker)
|
|
return worker
|
|
}
|
|
|
|
/**
|
|
* Stoppe proprement tous les workers + queues. Appelé au shutdown du
|
|
* process via Adonis terminating hook.
|
|
*/
|
|
export async function shutdownQueue(): Promise<void> {
|
|
await Promise.all(workers.map((w) => w.close()))
|
|
await Promise.all(Array.from(queues.values()).map((q) => q.close()))
|
|
}
|
|
|
|
/**
|
|
* Liste des noms de queue (re-export du config pour ne pas exposer la
|
|
* connection Redis ailleurs dans l'app).
|
|
*/
|
|
export const QUEUES = queueNames
|