Job Scheduling

How to Schedule a Job

Inject IScheduleService using the ScheduleServiceKey DI token, then call scheduleAt or scheduleRepeat.

One-shot job (fires once at a specific instant)

ts
import { Inject, Injectable } from '@nestjs/common';
import {
  IScheduleService,
  IScheduleOneShotSpec,
  ScheduleServiceKey,
} from '@api/modules/schedule/domain/services/schedule.service.interface';

@Injectable()
export class ReminderService {
  constructor(
    @Inject(ScheduleServiceKey)
    private readonly scheduleService: IScheduleService,
  ) {}

  async scheduleTaskReminder(userId: string, runAt: Date): Promise<void> {
    const spec: IScheduleOneShotSpec = {
      topic: 'notifications.task-reminder',
      runAt,                              // must be a future UTC instant
      timezone: 'America/Mexico_City',    // IANA string — echoed in the event
      payload: { userId, message: 'Your task is due soon' },
      metadata: { ownerId: userId },
      // optional: retry: { attempts: 3, backoff: { type: 'fixed', delay: 5_000 } },
    };

    await this.scheduleService.scheduleAt(spec);
  }
}

Recurring job (cron or interval)

ts
await this.scheduleService.scheduleRepeat({
  topic: 'ai.agent-heartbeat',
  timezone: 'UTC',
  payload: { agentId: 'agent-xyz' },
  repeat: { type: 'cron', expression: '0 9 * * 1-5', timezone: 'America/Mexico_City' },
  metadata: { correlationId: newUUID() },
});

For fixed-interval jobs, use { type: 'interval', everyMs: 60_000 }.

How to Listen for a Fired Job

Use @OnEvent from @nestjs/event-emitter. Both schedule.arrived (all jobs) and schedule.<topic>.arrived (per-topic alias) are emitted on every fire.

ts
import { Injectable } from '@nestjs/common';
import { OnEvent } from '@nestjs/event-emitter';
import {
  SCHEDULE_ARRIVED_EVENT,
  IScheduleArrivedEvent,
  scheduleArrivedEventName,
} from '@api/modules/schedule/domain/events/schedule-arrived.event.interface';

@Injectable()
export class TaskReminderListener {
  // Listens to all schedule firings (uniform event)
  @OnEvent(SCHEDULE_ARRIVED_EVENT)
  async onAnySchedule(event: IScheduleArrivedEvent): Promise<void> {
    if (event.topic !== 'notifications.task-reminder') return;
    // handle...
  }

  // Listens only to this specific topic (alias event)
  @OnEvent(scheduleArrivedEventName('notifications.task-reminder'))
  async onTaskReminder(event: IScheduleArrivedEvent): Promise<void> {
    const { userPayload, metadata, timezone, firedAt } = event;
    // handle reminder...
  }
}

Listeners must be idempotent. Under at-least-once semantics, the same event may arrive more than once (crash + replay, retry). Use event.scheduledJobId as a deduplication key if needed.
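The guard can be as simple as a set keyed on that id. A minimal in-memory sketch (illustrative only; an in-memory set does not survive restarts, so real code would likely back this with a persistent store or a unique-constraint insert):

```ts
// Minimal in-memory dedup guard keyed on scheduledJobId.
const seenJobIds = new Set<string>();

function handleOnce(scheduledJobId: string, handler: () => void): boolean {
  if (seenJobIds.has(scheduledJobId)) return false; // duplicate delivery: skip
  seenJobIds.add(scheduledJobId);
  handler();
  return true;
}
```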

If no listener is registered for a topic, the job transitions to completed — the scheduler does not require subscribers.

Cancellation and Rescheduling

ts
// Cancel a pending or active job (idempotent — no error if already cancelled or completed)
await this.scheduleService.cancel(jobId);

// Replace with a new spec — returns a new IScheduledJob with a new id
const newJob = await this.scheduleService.reschedule(jobId, newSpec);

// Query
const job = await this.scheduleService.getById(jobId);
const page = await this.scheduleService.list({
  ownerId: userId,
  status: ['pending', 'active'],
  limit: 20,
});

Authorization Boundary

The Schedule module is internal — it has no HTTP controllers and performs no authentication checks. Metadata fields (ownerId, tenantId, correlationId, clientRequestId) are for audit, filtering, and tracing. They are NOT used for access control.

Callers must authorize the operation before calling IScheduleService. When exposing scheduling actions via HTTP, authorization guards and principal extraction belong in the controller layer, not in the service.
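As a sketch of that boundary, the caller might run an ownership check before delegating to the service. The Principal shape and assertOwns helper below are hypothetical, not part of the module:

```ts
// Hypothetical ownership check performed by the caller, before any
// IScheduleService call; nothing inside the Schedule module enforces this.
interface Principal {
  userId: string;
  roles: string[];
}

function assertOwns(principal: Principal, ownerId: string): void {
  // metadata.ownerId is audit data only, so the caller compares it here
  if (principal.userId !== ownerId && !principal.roles.includes('admin')) {
    throw new Error('FORBIDDEN');
  }
}
```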

Architecture: Dual-Store

The module uses two stores for durability:

Postgres (schedule_jobs)  ←  Source of truth for state, history, list(), getById()
BullMQ / Redis            ←  Execution substrate — timing, delay, cron evaluation, retry

Write order for scheduleAt / scheduleRepeat:

  1. Begin Postgres transaction.
  2. Insert schedule_jobs row (status = 'pending').
  3. Enqueue BullMQ job (inside the transaction callback).
  4. Commit. If the BullMQ enqueue throws, the transaction rolls back — no orphan rows.
  5. In the rare case where BullMQ accepted the job but the Postgres commit did not succeed, a compensating delete of the BullMQ entry is attempted; the reconciler heals any remaining drift on the next boot.
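The ordering guarantee above can be sketched with in-memory stand-ins for the Postgres transaction and the BullMQ queue. runInTransaction, rows, and queue are illustrative names, not the module's:

```ts
// In-memory stand-ins: `rows` plays the schedule_jobs table, `queue` plays BullMQ.
const rows: { id: string; status: string }[] = [];
const queue: { jobId: string }[] = [];

// Steps 1 and 4: a toy transaction that rolls back the insert if the callback throws.
async function runInTransaction(work: () => Promise<void>): Promise<void> {
  const snapshot = rows.length;
  try {
    await work();            // insert + enqueue run inside the callback
  } catch (err) {
    rows.length = snapshot;  // rollback: the pending row disappears
    throw err;
  }
  // commit is a no-op for the in-memory stand-in
}

async function scheduleAt(jobId: string, enqueueFails = false): Promise<void> {
  await runInTransaction(async () => {
    rows.push({ id: jobId, status: 'pending' });      // step 2: insert row
    if (enqueueFails) throw new Error('redis down');  // enqueue failure...
    queue.push({ jobId });                            // step 3: enqueue in BullMQ
  });                                                 // step 4: commit (or rollback)
}
```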

Boot reconciler (ScheduleReconcilerService.onApplicationBootstrap):

  1. Queries all rows with status IN ('pending', 'active').
  2. For each row, checks whether the corresponding BullMQ entry exists.
  3. Missing entries are re-enqueued. If runAt is already in the past, the job fires immediately (delay = 0).
  4. Logs a summary: { scanned, healed, skipped }.
  5. Runs once per process start — not continuously.

This means a Redis flush is self-healing on the next restart.
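The healing pass can be sketched as a pure function. JobRow, queueHas, and enqueue are illustrative stand-ins for the Postgres query and the BullMQ lookups:

```ts
interface JobRow {
  id: string;
  status: 'pending' | 'active';
  runAt: Date;
}

// One reconciliation pass: re-enqueue rows whose BullMQ entry is missing.
function reconcile(
  jobs: JobRow[],
  queueHas: (id: string) => boolean,
  enqueue: (id: string, delayMs: number) => void,
  now: Date = new Date(),
): { scanned: number; healed: number; skipped: number } {
  let healed = 0;
  let skipped = 0;
  for (const job of jobs) {
    if (queueHas(job.id)) {
      skipped++;  // queue entry exists, nothing to do
      continue;
    }
    // Past-due jobs fire immediately: the delay clamps to 0.
    const delayMs = Math.max(0, job.runAt.getTime() - now.getTime());
    enqueue(job.id, delayMs);
    healed++;
  }
  return { scanned: jobs.length, healed, skipped };
}
```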

In-Process Worker

The BullMQ worker runs inside the same Node.js process as the API (single-VPS assumption). Events are delivered via @nestjs/event-emitter (in-process). Scaling to multiple API instances requires replacing the event publisher with a Redis Pub/Sub bridge — the IScheduleEventPublisherPort abstraction makes this a one-adapter swap.
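A sketch of that port boundary follows; the interface name comes from the module, but the exact method signature here is an assumption:

```ts
import { EventEmitter } from 'node:events';

// Assumed shape of the publisher port; the real interface may differ.
interface IScheduleEventPublisherPort {
  publish(eventName: string, event: unknown): void;
}

// Today's adapter: deliver in-process, matching the in-process emitter setup.
class InProcessPublisher implements IScheduleEventPublisherPort {
  constructor(private readonly emitter: EventEmitter) {}

  publish(eventName: string, event: unknown): void {
    this.emitter.emit(eventName, event);
  }
}

// A multi-instance deployment would swap in a Redis Pub/Sub adapter that
// implements the same interface, publishing to a channel instead.
```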

Idempotency (clientRequestId)

If a clientRequestId is supplied, it must be globally unique across all jobs in all statuses. Passing a duplicate returns SCHEDULE_CLIENT_REQUEST_ID_IN_USE — there is no "return existing" path. Always mint a fresh UUIDv7 for each scheduling intent.

Timezone Model

  • Every schedule creation requires a valid IANA timezone string.
  • runAt and all stored timestamps are UTC (timestamptz). The timezone string is stored verbatim.
  • For cron jobs, the expression is evaluated in the given timezone by BullMQ's native tz support.
  • The timezone is echoed in every IScheduleArrivedEvent. Listeners that need to format the time for display must convert using this field.
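For the display conversion in the last point, a listener might format the UTC instant with Intl.DateTimeFormat. The sample values below are made up; firedAt and timezone mirror the event fields:

```ts
// A fired event delivers a UTC instant plus the IANA timezone it was scheduled in.
const firedAt = new Date('2024-05-06T15:00:00Z'); // stored and delivered as UTC
const timezone = 'America/Mexico_City';           // echoed IANA string

// Convert only at the display edge, using the echoed timezone.
const formatted = new Intl.DateTimeFormat('en-US', {
  timeZone: timezone,
  dateStyle: 'medium',
  timeStyle: 'short',
}).format(firedAt);
// Mexico City has been fixed at UTC-6 since DST was abolished there in 2022,
// so 15:00Z renders as 9:00 AM local time.
```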