Job Scheduling
How to Schedule a Job
Inject IScheduleService using the ScheduleServiceKey DI token, then call scheduleAt or scheduleRepeat.
One-shot job (fires once at a specific instant)
ts
import { Inject, Injectable } from '@nestjs/common';
import {
  IScheduleService,
  IScheduleOneShotSpec,
  ScheduleServiceKey,
} from '@api/modules/schedule/domain/services/schedule.service.interface';

@Injectable()
export class ReminderService {
  constructor(
    @Inject(ScheduleServiceKey)
    private readonly scheduleService: IScheduleService,
  ) {}

  async scheduleTaskReminder(userId: string, runAt: Date): Promise<void> {
    const spec: IScheduleOneShotSpec = {
      topic: 'notifications.task-reminder',
      runAt, // must be a future UTC instant
      timezone: 'America/Mexico_City', // IANA string; echoed in the event
      payload: { userId, message: 'Your task is due soon' },
      metadata: { ownerId: userId },
      // optional: retry: { attempts: 3, backoff: { type: 'fixed', delay: 5_000 } },
    };
    await this.scheduleService.scheduleAt(spec);
  }
}
Recurring job (cron or interval)
ts
await this.scheduleService.scheduleRepeat({
  topic: 'ai.agent-heartbeat',
  timezone: 'UTC',
  payload: { agentId: 'agent-xyz' },
  repeat: { type: 'cron', expression: '0 9 * * 1-5', timezone: 'America/Mexico_City' },
  metadata: { correlationId: newUUID() },
});
For fixed-interval jobs, use { type: 'interval', everyMs: 60_000 }.
How to Listen for a Fired Job
Use @OnEvent from @nestjs/event-emitter. Both schedule.arrived (all jobs) and schedule.<topic>.arrived (per-topic alias) are emitted on every fire.
ts
import { Injectable } from '@nestjs/common';
import { OnEvent } from '@nestjs/event-emitter';
import {
  SCHEDULE_ARRIVED_EVENT,
  IScheduleArrivedEvent,
  scheduleArrivedEventName,
} from '@api/modules/schedule/domain/events/schedule-arrived.event.interface';

@Injectable()
export class TaskReminderListener {
  // Listens to all schedule firings (uniform event)
  @OnEvent(SCHEDULE_ARRIVED_EVENT)
  async onAnySchedule(event: IScheduleArrivedEvent): Promise<void> {
    if (event.topic !== 'notifications.task-reminder') return;
    // handle...
  }

  // Listens only to this specific topic (alias event)
  @OnEvent(scheduleArrivedEventName('notifications.task-reminder'))
  async onTaskReminder(event: IScheduleArrivedEvent): Promise<void> {
    const { userPayload, metadata, timezone, firedAt } = event;
    // handle reminder...
  }
}
Listeners must be idempotent. Under at-least-once semantics, the same event may arrive more than once (crash + replay, retry). Use event.scheduledJobId as a deduplication key if needed.
If no listener is registered for a topic, the job transitions to completed — the scheduler does not require subscribers.
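As a minimal sketch of the deduplication idea, the wrapper below remembers which scheduledJobId values it has already handled and skips repeats. The ArrivedEvent shape, the dedupeByJobId helper, and the in-memory record are illustrative assumptions and not part of the module. A real deployment would likely need a durable store so the guard survives restarts, and recurring jobs would need a per-fire component in the key, which this document does not specify.

```typescript
// Hypothetical minimal event shape; only scheduledJobId and topic are taken from the docs.
interface ArrivedEvent {
  scheduledJobId: string;
  topic: string;
}

// Wraps a handler so each scheduledJobId is processed at most once per process.
// Assumption: an in-memory record is enough for illustration; it is lost on restart.
function dedupeByJobId(
  handler: (e: ArrivedEvent) => void,
): (e: ArrivedEvent) => boolean {
  const seen: Record<string, true> = {};
  return (e: ArrivedEvent): boolean => {
    if (Object.prototype.hasOwnProperty.call(seen, e.scheduledJobId)) {
      return false; // duplicate delivery: skip
    }
    handler(e);
    // Mark as seen only after the handler returned, so a failed attempt can be retried.
    seen[e.scheduledJobId] = true;
    return true;
  };
}

// Usage: the second delivery of the same one-shot job is ignored.
const handled: string[] = [];
const onReminder = dedupeByJobId((e) => {
  handled.push(e.scheduledJobId);
});
const firstDelivery = onReminder({ scheduledJobId: 'job-1', topic: 'notifications.task-reminder' });
const replayedDelivery = onReminder({ scheduledJobId: 'job-1', topic: 'notifications.task-reminder' });
```

The id is recorded after the handler returns, so a handler that throws is not treated as done and a retry can still run it.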
Cancellation and Rescheduling
ts
// Cancel a pending or active job (idempotent: no error if already cancelled or completed)
await this.scheduleService.cancel(jobId);

// Replace with a new spec; returns a new IScheduledJob with a new id
const newJob = await this.scheduleService.reschedule(jobId, newSpec);

// Query
const job = await this.scheduleService.getById(jobId);
const page = await this.scheduleService.list({
  ownerId: userId,
  status: ['pending', 'active'],
  limit: 20,
});
Authorization Boundary
The Schedule module is internal — it has no HTTP controllers and performs no authentication checks. Metadata fields (ownerId, tenantId, correlationId, clientRequestId) are for audit, filtering, and tracing. They are NOT used for access control.
Callers must authorize the operation before calling IScheduleService. When exposing scheduling actions via HTTP, authorization guards and principal extraction belong in the controller layer, not in the service.
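The following sketch shows one way a caller might authorize before delegating to the scheduler. Principal, JobOwnerLookup, SchedulerLike, ForbiddenError, and the "admin or owner" rule are all hypothetical names and policy chosen for illustration; the real IScheduleService has a richer API, and the actual policy is up to the calling layer.

```typescript
// Hypothetical shapes for illustration only.
interface Principal {
  userId: string;
  roles: string[];
}

interface SchedulerLike {
  cancel(jobId: string): void;
}

interface JobOwnerLookup {
  ownerOf(jobId: string): string | undefined;
}

class ForbiddenError extends Error {}

// Caller-side check: decide whether the principal may act, then delegate.
// The scheduler itself never inspects the principal.
function cancelJobAs(
  principal: Principal,
  jobId: string,
  owners: JobOwnerLookup,
  scheduler: SchedulerLike,
): void {
  const isAdmin = principal.roles.some((role) => role === 'admin');
  const isOwner = owners.ownerOf(jobId) === principal.userId;
  if (!isAdmin && !isOwner) {
    throw new ForbiddenError(`user ${principal.userId} may not cancel job ${jobId}`);
  }
  scheduler.cancel(jobId);
}

// Usage with in-memory stand-ins.
const cancelled: string[] = [];
const fakeScheduler: SchedulerLike = { cancel: (id) => { cancelled.push(id); } };
const fakeOwners: JobOwnerLookup = {
  ownerOf: (id) => (id === 'job-1' ? 'alice' : undefined),
};

cancelJobAs({ userId: 'alice', roles: [] }, 'job-1', fakeOwners, fakeScheduler);

let bobRejected = false;
try {
  cancelJobAs({ userId: 'bob', roles: [] }, 'job-1', fakeOwners, fakeScheduler);
} catch (err) {
  bobRejected = true; // rejected before the scheduler was called
}
```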
Architecture: Dual-Store
The module uses two stores for durability:
Postgres (schedule_jobs) ← Source of truth for state, history, list(), getById()
BullMQ / Redis ← Execution substrate: timing, delay, cron evaluation, retry
Write order for scheduleAt / scheduleRepeat:
- Begin Postgres transaction.
- Insert schedule_jobs row (status = 'pending').
- Enqueue BullMQ job (inside the transaction callback).
- Commit. If the BullMQ enqueue throws, the transaction rolls back, leaving no orphan rows.
- The rare case where commit succeeds but BullMQ had already accepted: a compensating delete is attempted, then the reconciler heals on next boot.
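The write order above can be sketched with in-memory stand-ins. FakePostgres, FakeQueue, and scheduleJob are assumptions made for illustration and are not the module's real classes; the real code is asynchronous and talks to Postgres and BullMQ. The sketch only shows why an enqueue failure inside the transaction callback leaves no row behind.

```typescript
// In-memory stand-ins (illustrative assumptions, not the module's real classes).
interface JobRow {
  id: string;
  status: 'pending';
}

class FakePostgres {
  rows: JobRow[] = [];

  // Runs the callback against a staged copy and commits only if it returns normally.
  transaction(work: (staged: JobRow[]) => void): void {
    const staged = this.rows.slice(); // "BEGIN"
    work(staged); // a throw here skips the commit ("ROLLBACK")
    this.rows = staged; // "COMMIT"
  }
}

class FakeQueue {
  enqueued: string[] = [];
  failNext = false;

  add(jobId: string): void {
    if (this.failNext) {
      this.failNext = false;
      throw new Error('queue unavailable');
    }
    this.enqueued.push(jobId);
  }
}

// Mirrors the documented order: insert the row, enqueue inside the transaction, commit.
function scheduleJob(db: FakePostgres, queue: FakeQueue, id: string): void {
  db.transaction((staged) => {
    staged.push({ id, status: 'pending' }); // insert schedule_jobs row
    queue.add(id); // enqueue; throwing aborts the transaction
  });
}

const fakeDb = new FakePostgres();
const fakeQueue = new FakeQueue();
scheduleJob(fakeDb, fakeQueue, 'job-1'); // row committed and job enqueued

fakeQueue.failNext = true;
let enqueueFailed = false;
try {
  scheduleJob(fakeDb, fakeQueue, 'job-2');
} catch (err) {
  enqueueFailed = true; // enqueue threw, so no row for job-2 was committed
}
```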
Boot reconciler (ScheduleReconcilerService.onApplicationBootstrap):
- Queries all rows with status IN ('pending', 'active').
- For each row, checks whether the corresponding BullMQ entry exists.
- Missing entries are re-enqueued. If runAt is already in the past, the job fires immediately (delay = 0).
- Logs a summary: { scanned, healed, skipped }.
- Runs once per process start, not continuously.
As a result, jobs lost in a Redis flush are re-enqueued on the next restart.
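A minimal sketch of the reconciliation pass, assuming simplified shapes: ReconcileRow, the enqueuedDelays record standing in for BullMQ state, and the reading of skipped as "entry already present" are all assumptions, since the document only names the statuses and the summary keys.

```typescript
// Hypothetical row shape; statuses mirror the ones mentioned in this document.
interface ReconcileRow {
  id: string;
  status: 'pending' | 'active' | 'completed' | 'cancelled';
  runAt: Date;
}

interface ReconcileSummary {
  scanned: number;
  healed: number;
  skipped: number;
}

// enqueuedDelays maps job id to delay in ms and stands in for the queue's state.
function reconcile(
  rows: ReconcileRow[],
  enqueuedDelays: Record<string, number>,
  now: Date,
): ReconcileSummary {
  const summary: ReconcileSummary = { scanned: 0, healed: 0, skipped: 0 };
  for (const row of rows) {
    if (row.status !== 'pending' && row.status !== 'active') continue; // out of scope
    summary.scanned += 1;
    if (Object.prototype.hasOwnProperty.call(enqueuedDelays, row.id)) {
      summary.skipped += 1; // queue entry still exists, nothing to heal
      continue;
    }
    // Overdue jobs get delay 0 so they fire immediately.
    enqueuedDelays[row.id] = Math.max(0, row.runAt.getTime() - now.getTime());
    summary.healed += 1;
  }
  return summary;
}

// Usage: job-a survived, job-b is overdue, job-c is due in one minute, job-d is finished.
const bootTime = new Date('2024-01-01T00:00:00Z');
const queueState: Record<string, number> = { 'job-a': 1000 };
const bootSummary = reconcile(
  [
    { id: 'job-a', status: 'pending', runAt: new Date('2024-01-01T00:00:01Z') },
    { id: 'job-b', status: 'pending', runAt: new Date('2023-12-31T23:00:00Z') },
    { id: 'job-c', status: 'active', runAt: new Date('2024-01-01T00:01:00Z') },
    { id: 'job-d', status: 'completed', runAt: new Date('2023-12-01T00:00:00Z') },
  ],
  queueState,
  bootTime,
);
```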
In-Process Worker
The BullMQ worker runs inside the same Node.js process as the API (single-VPS assumption). Events are delivered via @nestjs/event-emitter (in-process). Scaling to multiple API instances requires replacing the event publisher with a Redis Pub/Sub bridge — the IScheduleEventPublisherPort abstraction makes this a one-adapter swap.
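To illustrate the adapter idea, here is a sketch of an in-process publisher behind a small port. The real IScheduleEventPublisherPort signature is not shown in this document, so EventPublisherPortSketch, FiredEvent, and InProcessPublisher are hypothetical; only the two event names (schedule.arrived and schedule.<topic>.arrived) come from the text above.

```typescript
// Hypothetical event and port shapes for illustration.
interface FiredEvent {
  topic: string;
  scheduledJobId: string;
}

type FiredListener = (e: FiredEvent) => void;

interface EventPublisherPortSketch {
  publish(e: FiredEvent): void;
}

// In-process adapter: fans each fire out to the uniform event and the per-topic alias.
class InProcessPublisher implements EventPublisherPortSketch {
  private listeners: Record<string, FiredListener[]> = {};

  on(eventName: string, listener: FiredListener): void {
    const existing = this.listeners[eventName] || [];
    existing.push(listener);
    this.listeners[eventName] = existing;
  }

  publish(e: FiredEvent): void {
    const eventNames = ['schedule.arrived', `schedule.${e.topic}.arrived`];
    for (const eventName of eventNames) {
      for (const listener of this.listeners[eventName] || []) {
        listener(e);
      }
    }
  }
}

// Usage: one fire reaches both the catch-all listener and the topic listener.
const publisher = new InProcessPublisher();
let uniformCount = 0;
let topicCount = 0;
publisher.on('schedule.arrived', () => { uniformCount += 1; });
publisher.on('schedule.ai.agent-heartbeat.arrived', () => { topicCount += 1; });
publisher.publish({ topic: 'ai.agent-heartbeat', scheduledJobId: 'job-1' });
publisher.publish({ topic: 'notifications.task-reminder', scheduledJobId: 'job-2' });
```

Code that depends only on the port would not need to change if a second adapter implementing the same interface forwarded events to another transport.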
Idempotency (clientRequestId)
If a clientRequestId is supplied, it must be globally unique across all jobs in all statuses. Passing a duplicate returns SCHEDULE_CLIENT_REQUEST_ID_IN_USE — there is no "return existing" path. Always mint a fresh UUIDv7 for each scheduling intent.
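The rule can be pictured with a small in-memory registry. ClientRequestIdRegistry and the choice to throw an Error carrying the code are assumptions made for this sketch; the document states only that a duplicate yields SCHEDULE_CLIENT_REQUEST_ID_IN_USE, not how the error is surfaced.

```typescript
const SCHEDULE_CLIENT_REQUEST_ID_IN_USE = 'SCHEDULE_CLIENT_REQUEST_ID_IN_USE';

// Hypothetical registry; the real module presumably enforces this in its job store.
class ClientRequestIdRegistry {
  private used: Record<string, true> = {};

  // Claims the id or throws. There is deliberately no "return existing job" branch,
  // and an id stays reserved regardless of the job's later status.
  claim(clientRequestId: string): void {
    if (Object.prototype.hasOwnProperty.call(this.used, clientRequestId)) {
      throw new Error(SCHEDULE_CLIENT_REQUEST_ID_IN_USE);
    }
    this.used[clientRequestId] = true;
  }
}

// Usage: reusing an id for a second scheduling intent is rejected.
const registry = new ClientRequestIdRegistry();
registry.claim('request-id-1');

let duplicateCode = '';
try {
  registry.claim('request-id-1');
} catch (err) {
  duplicateCode = err instanceof Error ? err.message : String(err);
}
```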
Timezone Model
- Every schedule creation requires a valid IANA timezone string.
- runAt and all stored timestamps are UTC (timestamptz). The timezone string is stored verbatim.
- For cron jobs, the expression is evaluated in the given timezone by BullMQ's native tz support.
- The timezone is echoed in every IScheduleArrivedEvent. Listeners that need to format the time for display must convert using this field.
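For display formatting, a listener could convert the UTC instant with the standard Intl.DateTimeFormat API, as in this sketch. formatInZone is a hypothetical helper, and it assumes firedAt is available as a Date; if the real event carries an ISO string, it would need to be parsed first.

```typescript
// Formats a UTC instant as wall-clock time in the IANA zone echoed by the event.
function formatInZone(firedAt: Date, timezone: string): string {
  return new Intl.DateTimeFormat('en-US', {
    timeZone: timezone,
    hour: '2-digit',
    minute: '2-digit',
    hour12: false,
  }).format(firedAt);
}

// Mexico City is UTC-6 in January, so 15:00 UTC is 09:00 local wall-clock time.
const displayTime = formatInZone(new Date('2024-01-15T15:00:00Z'), 'America/Mexico_City');
```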