Operations ↔ Schedule Integration

The Operations module integrates with the Schedule Module to fire deferred in-process notifications when an event or task starts or ends. The integration is best-effort: a scheduling failure never rolls back the underlying domain operation (create/update/delete). All scheduling errors are logged but swallowed at the command-handler boundary.

What Triggers a Schedule

Every command that mutates an Event or Task calls INotificationRuleScheduler after persisting the aggregate. The scheduler decides which rules to create, update, or cancel based on the subject's current and previous dates.

| Command | Scheduler call |
| --- | --- |
| CreateEventCommand | onSubjectCreated(event) |
| UpdateEventCommand | onSubjectDatesChanged(event, prevDates) |
| DeleteEventCommand | onSubjectDeleted(SubjectType.Event, id) |
| CreateTaskCommand | onSubjectCreated(task) |
| UpdateTaskCommand | onSubjectDatesChanged(task, prevDates) |
| DeleteTaskCommand | onSubjectDeleted(SubjectType.Task, id) |

Date presence governs rule creation

onSubjectCreated iterates over [start, end] kinds. For each kind, it resolves the anchor date (e.g., event.startDate for start, task.finishedAt for end). If the anchor is null, no rule is created for that kind.

This means a task created with no startedAt / finishedAt produces zero notification rules. Rules are created only when the anchor date is available.
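
The anchor check described above can be sketched as follows. This is a minimal illustration, not the module's actual code: the `Subject` shapes and the `anchorFor` and `kindsToSchedule` helpers are hypothetical, and the event's `endDate` field name is an assumption (the text above names only `event.startDate`, `task.startedAt`, and `task.finishedAt`).

```typescript
type Kind = 'start' | 'end';

// Hypothetical, simplified subject shapes; the real aggregates carry more fields.
// `endDate` on events is an assumed field name.
type Subject =
  | { type: 'event'; startDate: Date | null; endDate: Date | null }
  | { type: 'task'; startedAt: Date | null; finishedAt: Date | null };

// Resolve the anchor date for one kind, or null when the subject has no such date.
function anchorFor(subject: Subject, kind: Kind): Date | null {
  if (subject.type === 'event') {
    return kind === 'start' ? subject.startDate : subject.endDate;
  }
  return kind === 'start' ? subject.startedAt : subject.finishedAt;
}

// Return the kinds that would receive a rule: only those with a non-null anchor.
function kindsToSchedule(subject: Subject): Kind[] {
  const kinds: Kind[] = ['start', 'end'];
  return kinds.filter((kind) => anchorFor(subject, kind) !== null);
}
```

With this sketch, a task whose `startedAt` and `finishedAt` are both null yields an empty list, which corresponds to zero notification rules.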

How Rules Are Persisted

Each scheduled notification is represented by a NotificationRule domain entity stored in operations.notification_rules. A single subject (event or task) may have up to two rules: one for start and one for end.

NotificationRule fields

| Field | Type | Description |
| --- | --- | --- |
| id | UUIDv7 | Auto-generated primary key |
| subjectType | 'event' \| 'task' | The type of the originating subject |
| subjectId | UUID | ID of the originating event or task |
| kind | 'start' \| 'end' | Whether this rule targets the start or end date |
| rule | JSONB | Discriminated union ({ type: 'at_subject_date', anchor: 'start' \| 'end' }) |
| scheduleId | UUID \| null | ID of the corresponding ScheduledJob in the Schedule module |
| syncStatus | 'pending' \| 'synced' \| 'failed' | Current sync state with the Schedule module |
| syncError | text \| null | Last error message if sync failed |
| channels | varchar[] | Notification channels (default: ['email']) |
| active | boolean | Whether the rule is still active (soft-deleted rules have active = false) |
| deletedAt | timestamptz \| null | Soft-delete timestamp |

Write sequence for a new rule

  1. NotificationRule.new(...) — creates entity in Pending state (scheduleId = null).
  2. repo.save(rule) — persists the row; the ID is now available for the Schedule payload.
  3. scheduleService.scheduleAt(spec) — submits the job to the Schedule module.
  4. If scheduling succeeds: rule.markSynced(job.id) → repo.save(rule), which stores scheduleId and sets syncStatus = 'synced'.
  5. If scheduling fails: rule.markFailed(error.message) → repo.save(rule), which records syncStatus = 'failed' with the error message.

This two-save pattern is intentional: the first save ensures the rule row exists (with its ID) before calling the external schedule service. A later reconciler (not yet implemented) can use syncStatus = 'failed' or syncStatus = 'pending' rows to retry.
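
The write sequence above can be sketched as a single function. This is an illustration under stated assumptions rather than the real service: `RuleRow`, `RuleRepo`, `ScheduleService`, the `scheduleAt` spec shape, and `createAndSchedule` are hypothetical stand-ins, and the rule is modeled as a plain object instead of the `NotificationRule` entity.

```typescript
type SyncStatus = 'pending' | 'synced' | 'failed';

// Simplified stand-in for the NotificationRule entity (hypothetical shape).
interface RuleRow {
  id: string;
  scheduleId: string | null;
  syncStatus: SyncStatus;
  syncError: string | null;
}

// Hypothetical ports for the repository and the Schedule module client.
interface RuleRepo {
  save(rule: RuleRow): Promise<void>;
}
interface ScheduleService {
  scheduleAt(spec: { runAt: Date; payload: { notificationRuleId: string } }): Promise<{ id: string }>;
}

async function createAndSchedule(
  repo: RuleRepo,
  scheduleService: ScheduleService,
  ruleId: string,
  runAt: Date,
): Promise<RuleRow> {
  // Steps 1-2: persist the rule in Pending state so its ID exists before scheduling.
  const rule: RuleRow = { id: ruleId, scheduleId: null, syncStatus: 'pending', syncError: null };
  await repo.save(rule);

  try {
    // Step 3: submit the job; the payload references the already-persisted rule ID.
    const job = await scheduleService.scheduleAt({ runAt, payload: { notificationRuleId: rule.id } });
    // Step 4: record the job ID and mark the rule as synced.
    rule.scheduleId = job.id;
    rule.syncStatus = 'synced';
    rule.syncError = null;
  } catch (error) {
    // Step 5: record the failure instead of propagating it.
    rule.scheduleId = null;
    rule.syncStatus = 'failed';
    rule.syncError = error instanceof Error ? error.message : String(error);
  }

  // Second save: persist the outcome of the scheduling attempt.
  await repo.save(rule);
  return rule;
}
```

Because the first save happens before `scheduleAt`, a crash between the two saves leaves a row with syncStatus = 'pending', which is exactly the kind of row a future reconciler could pick up.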

The syncStatus Lifecycle

  new rule ──► Pending ──┬──► scheduleAt() OK    ──► Synced
                         └──► scheduleAt() fails ──► Failed

  (retry path, not yet implemented)

  any state ──► markPending() ──► Pending ──► scheduleAt() ──► Synced / Failed

State transitions:

| Method | From | To | Side-effect |
| --- | --- | --- | --- |
| NotificationRule.new() | (none) | Pending | scheduleId = null, syncError = null |
| markSynced(jobId) | any | Synced | scheduleId = jobId, syncError = null |
| markFailed(reason) | any | Failed | scheduleId = null, syncError = reason |
| markPending() | any | Pending | syncError = null (for retry flows) |
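
The transitions listed above can be modeled with a small class. This is a sketch only: `NotificationRuleSketch` is a hypothetical name, it tracks just the three sync-related fields, and it assumes that `markPending()` leaves `scheduleId` untouched because the transitions list no other side-effect for it.

```typescript
type SyncStatus = 'pending' | 'synced' | 'failed';

// Minimal model of the sync-related state of a rule (hypothetical class).
class NotificationRuleSketch {
  // A new rule starts in Pending with no job and no error.
  scheduleId: string | null = null;
  syncStatus: SyncStatus = 'pending';
  syncError: string | null = null;

  // any -> Synced: remember the job ID and clear any previous error.
  markSynced(jobId: string): void {
    this.scheduleId = jobId;
    this.syncStatus = 'synced';
    this.syncError = null;
  }

  // any -> Failed: drop the job ID and keep the failure reason.
  markFailed(reason: string): void {
    this.scheduleId = null;
    this.syncStatus = 'failed';
    this.syncError = reason;
  }

  // any -> Pending: clear the error so a retry flow can start fresh.
  // Assumption: scheduleId is left as-is.
  markPending(): void {
    this.syncStatus = 'pending';
    this.syncError = null;
  }
}
```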

Topic Naming

Schedule jobs created by this module use the topic pattern:

operations.{subjectType}.{kind}

Examples:

  • operations.event.start — fires when an event's start date arrives
  • operations.event.end — fires when an event's end date arrives
  • operations.task.start — fires when a task's start date arrives
  • operations.task.end — fires when a task's end date arrives

Topics are emitted by the Schedule module as schedule.arrived events. Future notification consumers should listen on these topic-specific aliases.
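
As an illustration of the naming pattern, the topic can be derived from the two discriminators. The helper names `topicFor` and `isOperationsTopic` are hypothetical and are not taken from the module.

```typescript
type SubjectType = 'event' | 'task';
type Kind = 'start' | 'end';

// Build a topic following the operations.{subjectType}.{kind} pattern.
function topicFor(subjectType: SubjectType, kind: Kind): string {
  return `operations.${subjectType}.${kind}`;
}

// Prefix check a consumer could use to ignore topics owned by other modules.
function isOperationsTopic(topic: string): boolean {
  return topic.startsWith('operations.');
}
```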

Payload Contract (ID-based, no snapshot)

The payload stored in the scheduled job contains only stable identifiers — no mutable domain data:

```ts
{
  subjectType: 'event' | 'task',  // type of the originating subject
  subjectId: string,              // UUID of the event or task
  notificationRuleId: string,     // UUID of the NotificationRule row
  kind: 'start' | 'end',          // which anchor this job represents
}
```

When the job fires, consumers must fetch the current state of the subject from the database using subjectId. This ensures the payload is always valid even if the subject was updated after the job was scheduled.
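
A consumer would typically validate the payload before using it. The following is a minimal sketch of such a check: `OperationsJobPayload` and `parsePayload` are hypothetical names, and the IDs are only checked to be strings (UUID format validation is left out).

```typescript
type SubjectType = 'event' | 'task';
type Kind = 'start' | 'end';

// Hypothetical name for the payload shape described above.
interface OperationsJobPayload {
  subjectType: SubjectType;
  subjectId: string;
  notificationRuleId: string;
  kind: Kind;
}

function isSubjectType(value: unknown): value is SubjectType {
  return value === 'event' || value === 'task';
}

function isKind(value: unknown): value is Kind {
  return value === 'start' || value === 'end';
}

// Return a typed payload, or null when the raw value does not match the contract.
function parsePayload(raw: unknown): OperationsJobPayload | null {
  if (typeof raw !== 'object' || raw === null) return null;
  const candidate = raw as Record<string, unknown>;
  const { subjectType, subjectId, notificationRuleId, kind } = candidate;
  if (!isSubjectType(subjectType) || !isKind(kind)) return null;
  if (typeof subjectId !== 'string' || typeof notificationRuleId !== 'string') return null;
  // Only identifiers are returned; current subject data must be loaded by subjectId.
  return { subjectType, subjectId, notificationRuleId, kind };
}
```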

All-Day Events

For all-day events (isAllDay = true), the runAt instant is set to 12:00:00 UTC on the anchor date:

runAt = Date.UTC(year, month, day, 12, 0, 0)

For non-all-day events and tasks, runAt equals the anchor date as-is (already a UTC instant).
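
The rule can be written as a small helper. This sketch assumes the anchor's calendar date is read from its UTC components; `computeRunAt` is a hypothetical name. Note that `Date.UTC` takes a zero-based month and returns epoch milliseconds, so the result is wrapped in a `Date`.

```typescript
// Compute the instant at which the job should run.
function computeRunAt(anchor: Date, isAllDay: boolean): Date {
  if (!isAllDay) {
    // Timed subjects: the anchor is already a UTC instant.
    return anchor;
  }
  // All-day subjects: noon UTC on the anchor's calendar date
  // (assumption: the date is taken from the UTC components).
  return new Date(
    Date.UTC(anchor.getUTCFullYear(), anchor.getUTCMonth(), anchor.getUTCDate(), 12, 0, 0),
  );
}
```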

Date-Change Reconciliation

When onSubjectDatesChanged is called, the scheduler reconciles the existing rules for that subject:

| Scenario | Action |
| --- | --- |
| Existing rule + anchor date removed (null) | Cancel the schedule job + soft-delete the rule |
| No existing rule + anchor date newly added | Create a new rule and schedule job |
| Existing rule + anchor date changed | Reschedule the existing job (or create a fresh one if scheduleId is null); mark synced/failed |
| Existing rule + anchor date unchanged | No-op (skipped) |
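
The reconciliation scenarios can be expressed as a pure decision function for a single kind. This is a sketch: `ReconcileAction` and `decideAction` are hypothetical names, and the two cases the scenarios do not list (no rule with no date, and no rule with an unchanged non-null date) are handled by assumption as a no-op and a create respectively.

```typescript
type ReconcileAction = 'cancel-and-soft-delete' | 'create' | 'reschedule' | 'noop';

// Decide what to do for one kind (start or end) of one subject.
function decideAction(
  hasExistingRule: boolean,
  prevAnchor: Date | null,
  nextAnchor: Date | null,
): ReconcileAction {
  if (hasExistingRule) {
    // Anchor removed: cancel the job and soft-delete the rule.
    if (nextAnchor === null) return 'cancel-and-soft-delete';
    // Compare instants, not object identity.
    const unchanged = prevAnchor !== null && prevAnchor.getTime() === nextAnchor.getTime();
    return unchanged ? 'noop' : 'reschedule';
  }
  // No rule yet. Assumption: any non-null anchor leads to a new rule,
  // and a missing anchor means there is nothing to do.
  return nextAnchor === null ? 'noop' : 'create';
}
```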

Best-Effort Resilience Model

The scheduler integration is best-effort. Command handlers call scheduler.onSubjectCreated(...) (etc.) with .catch(...) — any exception that escapes NotificationRuleSchedulerService is logged to the request's LogContext under scheduler_errors but never rethrown. The domain operation (create/update/delete) always succeeds regardless of scheduling outcome.

Internal to NotificationRuleSchedulerService, individual rule failures are also isolated: if scheduling one rule fails, the others are still attempted.
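
Both layers of isolation can be sketched as follows. The names `scheduleAll` and `handleCreate`, the callback parameters, and the `schedulerErrors` array (standing in for the `scheduler_errors` entry of the request's LogContext) are assumptions made for illustration.

```typescript
type Outcome = { ruleId: string; ok: boolean; error?: string };

// Inner layer: attempt every rule; one failure does not stop the others.
async function scheduleAll(
  ruleIds: string[],
  scheduleOne: (ruleId: string) => Promise<void>,
): Promise<Outcome[]> {
  const outcomes: Outcome[] = [];
  for (const ruleId of ruleIds) {
    try {
      await scheduleOne(ruleId);
      outcomes.push({ ruleId, ok: true });
    } catch (error) {
      const message = error instanceof Error ? error.message : String(error);
      outcomes.push({ ruleId, ok: false, error: message });
    }
  }
  return outcomes;
}

// Outer layer: the command handler persists first, then calls the scheduler
// with .catch(...) so that a scheduler failure is recorded but never rethrown.
async function handleCreate(
  persist: () => Promise<string>,
  notifyScheduler: (subjectId: string) => Promise<void>,
  schedulerErrors: string[],
): Promise<string> {
  const subjectId = await persist();
  await notifyScheduler(subjectId).catch((error: unknown) => {
    schedulerErrors.push(error instanceof Error ? error.message : String(error));
  });
  // The domain operation succeeds regardless of the scheduling outcome.
  return subjectId;
}
```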

Current Limitations

  • UTC-only: The timezone field in all schedule jobs is hardcoded to 'UTC'. Organization or user timezone preferences are not considered yet.
  • No custom rules yet: Only at_subject_date rules are supported. Relative rules (e.g., "24 hours before") are not implemented.
  • Notifications module integration (resolved): Scheduled jobs are now delivered by email, so this is no longer a limitation. See Email Delivery on Schedule Arrival below.
  • No reconciler: NotificationRule rows with syncStatus = 'failed' or syncStatus = 'pending' are not automatically retried. A reconciler job (similar to ScheduleReconcilerService) is planned.
  • No public API: NotificationRule is an internal aggregate. There are no HTTP endpoints for listing, updating, or deleting rules directly.
  • Single channel: channels defaults to ['email']. Multi-channel delivery is not implemented.

Email Delivery on Schedule Arrival

When a scheduled job fires, the Schedule module emits a schedule.arrived in-process event (via EventEmitter2). ScheduleArrivedOperationsListener — registered in OperationsModule — intercepts that event and delivers the notification over SMTP.

End-to-end flow

BullMQ Worker (schedule queue)
  → ScheduleProcessor._processJob
    → publisher.emitArrived (awaited)
      → EventEmitter2.emitAsync('schedule.arrived', ev) (awaited)
        → ScheduleArrivedOperationsListener.handle(ev)
            1. topic filter          — only operations.* topics proceed
            2. payload parse         — validates { subjectType, subjectId, notificationRuleId, kind }
            3. rule lookup           — findByIdIncludingDeleted (soft-deleted rules are still readable)
            4. rule guards           — canceled / inactive / already-delivered → skip + log
            5. subject hydration     — OperationsSubjectHydrator resolves title, whenLabel, recipients
            6. recipient check       — empty → skip + warn
            7. sendEmail             — INotificationSender → template render → SMTP per recipient
            8. markSent              — notificationRuleRepo.markSent(id, new Date())
  ← throw on SMTP/persist failure → BullMQ retry

Idempotency guard via lastSentAt

NotificationRule has a nullable lastSentAt: Date | null column (last_sent_at timestamptz NULL in operations.notification_rules). Before calling sendEmail, the listener checks:

```ts
if (rule.lastSentAt !== null) {
  // log 'already-delivered' and return; no duplicate send
}
```

After a successful send, notificationRuleRepo.markSent(id, new Date()) persists the timestamp. Because BullMQ delivers each job at least once, this guard prevents duplicate emails on retry.

Soft-delete semantics

The listener calls findByIdIncludingDeleted — not findById — so it can distinguish three states:

| State | Condition | Action |
| --- | --- | --- |
| Missing | null returned | WARN log rule-missing, return |
| Canceled | deletedAt !== null | INFO log rule-canceled, return gracefully |
| Active | deletedAt === null | Proceed with delivery |

Soft-deleted rules are never retried — the listener exits cleanly instead of throwing, so BullMQ does not re-queue the job.
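
The rule guards can be sketched as one function that returns a skip reason or null. `RuleSnapshot` and `checkRule` are hypothetical names, and the order in which the guards run (canceled, then inactive, then already delivered) is an assumption based on the order in which the flow lists them.

```typescript
type SkipReason = 'rule-missing' | 'rule-canceled' | 'rule-inactive' | 'already-delivered';

// Only the fields the guards need (hypothetical shape).
interface RuleSnapshot {
  deletedAt: Date | null;
  active: boolean;
  lastSentAt: Date | null;
}

// Return the reason to skip delivery, or null when delivery should proceed.
// Every skip is a clean return, so the job is not retried.
function checkRule(rule: RuleSnapshot | null): SkipReason | null {
  if (rule === null) return 'rule-missing';
  if (rule.deletedAt !== null) return 'rule-canceled';
  if (!rule.active) return 'rule-inactive';
  if (rule.lastSentAt !== null) return 'already-delivered';
  return null;
}
```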

Recipient resolution

OperationsSubjectHydrator resolves recipients differently per subject type:

Subject typeRecipientsDeduplication
eventcreator + participantsby email (case-insensitive)
taskcreator + relatedUsersby email (case-insensitive)

If the resolved set is empty (e.g., all users lack an email), the listener logs no-recipients and returns without sending.
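
Case-insensitive deduplication by email can be sketched as below. `dedupeByEmail` is a hypothetical helper; trimming whitespace and keeping the first spelling encountered are assumptions, not documented behavior of OperationsSubjectHydrator.

```typescript
// Collect unique recipient emails, comparing case-insensitively.
// Users without an email are skipped.
function dedupeByEmail(users: Array<{ email: string | null }>): string[] {
  const seen = new Set<string>();
  const recipients: string[] = [];
  for (const user of users) {
    if (user.email === null) continue;
    const email = user.email.trim();
    const key = email.toLowerCase();
    if (key === '' || seen.has(key)) continue;
    seen.add(key);
    // Keep the first spelling encountered for this address.
    recipients.push(email);
  }
  return recipients;
}
```

An empty result corresponds to the no-recipients case, in which the listener returns without sending.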

Retry semantics via BullMQ

The listener wraps the pipeline, from rule lookup through markSent, in try/catch. If sendEmail or markSent throws, the error is logged and rethrown. It then propagates up through EventEmitter2.emitAsync → emitArrived → ScheduleProcessor._processJob, which catches and rethrows, causing BullMQ to re-queue the job for retry according to the queue's backoff policy.

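The idempotency guard (lastSentAt) covers retries that run after markSent has persisted the timestamp: the next attempt detects lastSentAt !== null and skips gracefully. It does not cover a partial success in which the email was sent but markSent failed. In that case lastSentAt is still null, so the retry cannot detect the earlier send and may deliver a duplicate.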

Observability: structured log events

| Log key | Level | Attributes |
| --- | --- | --- |
| notifications.operations.skipped | warn / info | reason, notificationRuleId, contextual fields |
| notifications.operations.sent | info | recipientCount, notificationRuleId, subjectType, subjectId, kind |
| notifications.operations.failed | error | errorName, errorMessage, notificationRuleId |
| notifications.sender.sent | info | templateKey, recipient |
| notifications.sender.failed | error | templateKey, recipient, provider |

Reason codes for skipped:

| Code | Meaning |
| --- | --- |
| invalid-payload | BullMQ job payload failed validation |
| rule-missing | No NotificationRule row found for the given ID |
| rule-canceled | Rule row exists but deletedAt is set |
| rule-inactive | Rule active = false |
| already-delivered | lastSentAt is non-null (idempotency guard) |
| subject-missing | Hydrator returned null (event/task deleted) |
| no-recipients | Recipient list resolved to zero entries |

Migration: last_sent_at column

Before deploying this feature, the following migration must be applied:

```sql
ALTER TABLE operations.notification_rules
  ADD COLUMN last_sent_at timestamptz NULL;
```

Generate the TypeORM migration file with:

```bash
pnpm --filter api typeorm migration:generate \
  apps/api/src/modules/operations/infrastructure/migrations/AddNotificationRuleLastSentAt
```

Then run migrations before the new version goes live. See the migration runbook for step-by-step operator instructions.

Cross-reference