Operations ↔ Schedule Integration
The Operations module integrates with the Schedule Module to fire deferred in-process notifications when an event or task starts or ends. The integration is best-effort: a scheduling failure never rolls back the underlying domain operation (create/update/delete). All scheduling errors are logged but swallowed at the command-handler boundary.
What Triggers a Schedule
Every command that mutates an Event or Task calls INotificationRuleScheduler after persisting the aggregate. The scheduler decides which rules to create, update, or cancel based on the subject's current and previous dates.
| Command | Scheduler call |
|---|---|
| `CreateEventCommand` | `onSubjectCreated(event)` |
| `UpdateEventCommand` | `onSubjectDatesChanged(event, prevDates)` |
| `DeleteEventCommand` | `onSubjectDeleted(SubjectType.Event, id)` |
| `CreateTaskCommand` | `onSubjectCreated(task)` |
| `UpdateTaskCommand` | `onSubjectDatesChanged(task, prevDates)` |
| `DeleteTaskCommand` | `onSubjectDeleted(SubjectType.Task, id)` |
Date presence governs rule creation
onSubjectCreated iterates over [start, end] kinds. For each kind, it resolves the anchor date (e.g., event.startDate for start, task.finishedAt for end). If the anchor is null, no rule is created for that kind.
This means a task created with no startedAt / finishedAt produces zero notification rules. Rules are created only when the anchor date is available.
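The date-presence rule can be illustrated with a minimal sketch. The `DatedSubject` shape and the `kindsWithAnchor` helper are hypothetical names used only for illustration; the real scheduler resolves anchors from the event or task aggregate.

```ts
type RuleKind = 'start' | 'end';

// Hypothetical minimal shape: only the two anchor dates matter here.
interface DatedSubject {
  startAnchor: Date | null; // e.g. event.startDate or task.startedAt
  endAnchor: Date | null;   // e.g. the event's end date or task.finishedAt
}

// Returns the kinds for which a rule would be created:
// a kind is skipped whenever its anchor date is null.
function kindsWithAnchor(subject: DatedSubject): RuleKind[] {
  const anchors: Record<RuleKind, Date | null> = {
    start: subject.startAnchor,
    end: subject.endAnchor,
  };
  const kinds: RuleKind[] = ['start', 'end'];
  return kinds.filter((kind) => anchors[kind] !== null);
}
```

Under these assumptions, a subject with both anchors set to `null` yields an empty list, which matches the "zero notification rules" case above.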
How Rules Are Persisted
Each scheduled notification is represented by a NotificationRule domain entity stored in operations.notification_rules. A single subject (event or task) may have up to two rules: one for start and one for end.
NotificationRule fields
| Field | Type | Description |
|---|---|---|
| `id` | UUIDv7 | Auto-generated primary key |
| `subjectType` | `'event' \| 'task'` | The type of the originating subject |
| `subjectId` | UUID | ID of the originating event or task |
| `kind` | `'start' \| 'end'` | Whether this rule targets the start or end date |
| `rule` | JSONB | Discriminated union (`{ type: 'at_subject_date', anchor: 'start' \| 'end' }`) |
| `scheduleId` | `UUID \| null` | ID of the corresponding ScheduledJob in the Schedule module |
| `syncStatus` | `'pending' \| 'synced' \| 'failed'` | Current sync state with the Schedule module |
| `syncError` | `text \| null` | Last error message if sync failed |
| `channels` | `varchar[]` | Notification channels (default: `['email']`) |
| `active` | boolean | Whether the rule is still active (soft-deleted rules have `active = false`) |
| `deletedAt` | `timestamptz \| null` | Soft-delete timestamp |
Write sequence for a new rule
1. `NotificationRule.new(...)`: creates the entity in `Pending` state (`scheduleId = null`).
2. `repo.save(rule)`: persists the row; the ID is now available for the Schedule payload.
3. `scheduleService.scheduleAt(spec)`: submits the job to the Schedule module.
   - If scheduling succeeds: `rule.markSynced(job.id)` → `repo.save(rule)`, which updates `scheduleId` and sets `syncStatus = 'synced'`.
   - If scheduling fails: `rule.markFailed(error.message)` → `repo.save(rule)`, which records `syncStatus = 'failed'` with the error message.
This two-save pattern is intentional: the first save ensures the rule row exists (with its ID) before calling the external schedule service. A later reconciler (not yet implemented) can use syncStatus = 'failed' or syncStatus = 'pending' rows to retry.
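As a rough illustration of the two-save pattern, the sketch below uses plain objects instead of the real `NotificationRule` entity. The `RuleRow`, `RuleRepoSketch`, and `JobSchedulerSketch` types and the shape of the `scheduleAt` argument are assumptions made for this example, not the module's actual interfaces.

```ts
type SyncStatus = 'pending' | 'synced' | 'failed';

// Hypothetical stand-in for the NotificationRule entity.
interface RuleRow {
  id: string;
  scheduleId: string | null;
  syncStatus: SyncStatus;
  syncError: string | null;
}

// Hypothetical ports; the real repository and schedule service may differ.
interface RuleRepoSketch {
  save(rule: RuleRow): Promise<void>;
}
interface JobSchedulerSketch {
  scheduleAt(spec: { ruleId: string; runAt: Date }): Promise<{ id: string }>;
}

async function createAndSchedule(
  id: string,
  runAt: Date,
  repo: RuleRepoSketch,
  scheduler: JobSchedulerSketch,
): Promise<RuleRow> {
  const rule: RuleRow = { id, scheduleId: null, syncStatus: 'pending', syncError: null };
  await repo.save(rule); // first save: the row exists before the external call

  try {
    const job = await scheduler.scheduleAt({ ruleId: rule.id, runAt });
    rule.scheduleId = job.id; // equivalent of markSynced(job.id)
    rule.syncStatus = 'synced';
    rule.syncError = null;
  } catch (err) {
    rule.scheduleId = null; // equivalent of markFailed(error.message)
    rule.syncStatus = 'failed';
    rule.syncError = err instanceof Error ? err.message : String(err);
  }

  await repo.save(rule); // second save: records the sync outcome
  return rule;
}
```

In this sketch a scheduling failure never propagates to the caller; it only changes what the second save records.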
The syncStatus Lifecycle
┌─────────────────────────────────┐
│ │
new rule ──► Pending ──► scheduleAt() OK ──► Synced
│
└────► scheduleAt() fails ──► Failed
│
┌─────────────────────────────────┘
│
(retry path — not yet implemented)
▼
Pending (markPending) ──► scheduleAt() ──► Synced / Failed

State transitions:
| Method | From | To | Side-effect |
|---|---|---|---|
| `NotificationRule.new()` | n/a | Pending | `scheduleId = null`, `syncError = null` |
| `markSynced(jobId)` | any | Synced | `scheduleId = jobId`, `syncError = null` |
| `markFailed(reason)` | any | Failed | `scheduleId = null`, `syncError = reason` |
| `markPending()` | any | Pending | `syncError = null` (for retry flows) |
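The transition table can be read as a small state holder. The class below is a sketch under that reading; `RuleSyncSketch` is a hypothetical name, and the real `NotificationRule` entity carries more fields and may enforce additional invariants.

```ts
type SyncState = 'pending' | 'synced' | 'failed';

// Hypothetical, reduced model of the sync-related fields of a rule.
class RuleSyncSketch {
  status: SyncState = 'pending'; // state right after NotificationRule.new()
  scheduleId: string | null = null;
  syncError: string | null = null;

  markSynced(jobId: string): void {
    this.status = 'synced';
    this.scheduleId = jobId;
    this.syncError = null;
  }

  markFailed(reason: string): void {
    this.status = 'failed';
    this.scheduleId = null;
    this.syncError = reason;
  }

  // Per the table, only syncError is cleared; scheduleId is left untouched.
  markPending(): void {
    this.status = 'pending';
    this.syncError = null;
  }
}
```

Because every method accepts any source state, the sketch does not guard against unusual sequences such as `markSynced` followed by `markFailed`.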
Topic Naming
Schedule jobs created by this module use the topic pattern:
`operations.{subjectType}.{kind}`

Examples:

- `operations.event.start`: fires when an event's start date arrives
- `operations.event.end`: fires when an event's end date arrives
- `operations.task.start`: fires when a task's start date arrives
- `operations.task.end`: fires when a task's end date arrives
Topics are emitted by the Schedule module as schedule.arrived events. Future notification consumers should listen on these topic-specific aliases.
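A consumer or producer could derive and filter these topics along the following lines. `buildTopic` and `isOperationsTopic` are hypothetical helper names used for illustration; they are not taken from the module's code.

```ts
type TopicSubjectType = 'event' | 'task';
type TopicKind = 'start' | 'end';

// Builds a topic following the operations.{subjectType}.{kind} pattern.
function buildTopic(subjectType: TopicSubjectType, kind: TopicKind): string {
  return `operations.${subjectType}.${kind}`;
}

// Prefix check a listener might use to ignore topics owned by other modules.
function isOperationsTopic(topic: string): boolean {
  return topic.startsWith('operations.');
}
```

For example, `buildTopic('task', 'end')` produces `operations.task.end`, and a topic that does not begin with `operations.` is rejected by the prefix check.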
Payload Contract (ID-based, no snapshot)
The payload stored in the scheduled job contains only stable identifiers — no mutable domain data:
```ts
{
  subjectType: 'event' | 'task',   // type of the originating subject
  subjectId: string,               // UUID of the event or task
  notificationRuleId: string,      // UUID of the NotificationRule row
  kind: 'start' | 'end',           // which anchor this job represents
}
```

When the job fires, consumers must fetch the current state of the subject from the database using `subjectId`. This ensures the payload is always valid even if the subject was updated after the job was scheduled.
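Since the payload arrives as untyped job data, a consumer will usually validate it before use. The following is one possible hand-written validator; `OperationsJobPayload` and `parseJobPayload` are hypothetical names, and the real listener may rely on a schema library instead.

```ts
type PayloadSubjectType = 'event' | 'task';
type PayloadKind = 'start' | 'end';

interface OperationsJobPayload {
  subjectType: PayloadSubjectType;
  subjectId: string;
  notificationRuleId: string;
  kind: PayloadKind;
}

function isSubjectType(v: unknown): v is PayloadSubjectType {
  return v === 'event' || v === 'task';
}

function isKind(v: unknown): v is PayloadKind {
  return v === 'start' || v === 'end';
}

// Returns the typed payload, or null when any field is missing or malformed.
function parseJobPayload(raw: unknown): OperationsJobPayload | null {
  if (typeof raw !== 'object' || raw === null) return null;
  const fields = raw as Record<string, unknown>;
  const subjectType = fields['subjectType'];
  const subjectId = fields['subjectId'];
  const notificationRuleId = fields['notificationRuleId'];
  const kind = fields['kind'];
  if (!isSubjectType(subjectType) || !isKind(kind)) return null;
  if (typeof subjectId !== 'string' || typeof notificationRuleId !== 'string') return null;
  // Only the four identifiers are copied; any extra fields are dropped.
  return { subjectType, subjectId, notificationRuleId, kind };
}
```

Returning `null` rather than throwing lets the caller log an `invalid-payload` style skip without triggering a retry; that choice is an assumption of this sketch.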
All-Day Events
For all-day events (isAllDay = true), the runAt instant is set to 12:00:00 UTC on the anchor date:
`runAt = Date.UTC(year, month, day, 12, 0, 0)`

For non-all-day events and tasks, `runAt` equals the anchor date as-is (already a UTC instant).
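The `runAt` computation can be sketched as a small pure function. The name `computeRunAt` is hypothetical, and reading the calendar date from the anchor's UTC components is an assumption of this example. Note that `Date.UTC` expects a zero-based month, which is what `getUTCMonth()` returns.

```ts
// Sketch: derive the instant at which the schedule job should fire.
function computeRunAt(anchor: Date, isAllDay: boolean): Date {
  if (!isAllDay) {
    // Timed events and tasks fire exactly at the anchor instant.
    return anchor;
  }
  // All-day events fire at 12:00:00 UTC on the anchor's calendar date.
  return new Date(
    Date.UTC(anchor.getUTCFullYear(), anchor.getUTCMonth(), anchor.getUTCDate(), 12, 0, 0),
  );
}
```

With an all-day anchor of `2024-03-10T00:00:00Z`, this sketch yields `2024-03-10T12:00:00.000Z`.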
Date-Change Reconciliation
When onSubjectDatesChanged is called, the scheduler reconciles the existing rules for that subject:
| Scenario | Action |
|---|---|
| Existing rule + anchor date removed (null) | Cancel the schedule job + soft-delete the rule |
| No existing rule + anchor date newly added | Create a new rule and schedule job |
| Existing rule + anchor date changed | Reschedule the existing job (or create a fresh one if scheduleId is null); mark synced/failed |
| Existing rule + anchor date unchanged | No-op (skipped) |
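The reconciliation table can be expressed as a pure decision function, sketched below for a single kind. `decideAction` and the action names are hypothetical, and combinations the table does not list (for example, no rule and no date) are treated as no-ops here, which is an assumption.

```ts
type ReconcileAction = 'cancel-and-soft-delete' | 'create' | 'reschedule' | 'noop';

// Decides what to do for one kind ('start' or 'end') of one subject.
function decideAction(
  hasRule: boolean,
  prevDate: Date | null,
  nextDate: Date | null,
): ReconcileAction {
  const prevMs = prevDate === null ? null : prevDate.getTime();
  const nextMs = nextDate === null ? null : nextDate.getTime();

  if (hasRule) {
    if (nextMs === null) return 'cancel-and-soft-delete'; // anchor removed
    return prevMs !== nextMs ? 'reschedule' : 'noop';     // changed vs unchanged
  }
  // No existing rule: create only when the anchor date was newly added.
  return prevMs === null && nextMs !== null ? 'create' : 'noop';
}
```

Dates are compared by their millisecond value rather than by reference, so two distinct `Date` objects for the same instant count as unchanged.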
Best-Effort Resilience Model
The scheduler integration is best-effort. Command handlers call scheduler.onSubjectCreated(...) (etc.) with .catch(...) — any exception that escapes NotificationRuleSchedulerService is logged to the request's LogContext under scheduler_errors but never rethrown. The domain operation (create/update/delete) always succeeds regardless of scheduling outcome.
Internal to NotificationRuleSchedulerService, individual rule failures are also isolated: if scheduling one rule fails, the others are still attempted.
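Both isolation layers can be sketched as follows. The function names, the `RequestLogSketch` shape, and the choice to await the scheduler call are assumptions for illustration; only the `scheduler_errors` key comes from the description above.

```ts
// Hypothetical stand-in for the request's LogContext.
interface RequestLogSketch {
  scheduler_errors: string[];
}

function messageOf(err: unknown): string {
  return err instanceof Error ? err.message : String(err);
}

// Inner layer: one failing rule does not stop the remaining ones.
async function scheduleEachKind(
  kinds: string[],
  scheduleOne: (kind: string) => Promise<void>,
  log: RequestLogSketch,
): Promise<void> {
  for (const kind of kinds) {
    try {
      await scheduleOne(kind);
    } catch (err) {
      log.scheduler_errors.push(`${kind}: ${messageOf(err)}`);
    }
  }
}

// Outer layer: the command handler persists first, then swallows any
// scheduler error so the domain operation still succeeds.
async function persistThenNotify(
  persist: () => Promise<string>,
  notifyScheduler: (id: string) => Promise<void>,
  log: RequestLogSketch,
): Promise<string> {
  const id = await persist();
  await notifyScheduler(id).catch((err: unknown) => {
    log.scheduler_errors.push(messageOf(err));
  });
  return id;
}
```

In this sketch the caller of `persistThenNotify` always receives the persisted ID, and scheduling problems are visible only through the log.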
Current Limitations
- UTC-only: The `timezone` field in all schedule jobs is hardcoded to `'UTC'`. Organization or user timezone preferences are not considered yet.
- No custom rules yet: Only `at_subject_date` rules are supported. Relative rules (e.g., "24 hours before") are not implemented.
- No notifications module integration: this limitation has been resolved; see Email Delivery on Schedule Arrival below.
- No reconciler: `NotificationRule` rows with `syncStatus = 'failed'` or `syncStatus = 'pending'` are not automatically retried. A reconciler job (similar to `ScheduleReconcilerService`) is planned.
- No public API: `NotificationRule` is an internal aggregate. There are no HTTP endpoints for listing, updating, or deleting rules directly.
- Single channel: `channels` defaults to `['email']`. Multi-channel delivery is not implemented.
Email Delivery on Schedule Arrival
When a scheduled job fires, the Schedule module emits a schedule.arrived in-process event (via EventEmitter2). ScheduleArrivedOperationsListener — registered in OperationsModule — intercepts that event and delivers the notification over SMTP.
End-to-end flow
BullMQ Worker (schedule queue)
→ ScheduleProcessor._processJob
→ publisher.emitArrived (awaited)
→ EventEmitter2.emitAsync('schedule.arrived', ev) (awaited)
→ ScheduleArrivedOperationsListener.handle(ev)
1. topic filter — only operations.* topics proceed
2. payload parse — validates { subjectType, subjectId, notificationRuleId, kind }
3. rule lookup — findByIdIncludingDeleted (soft-deleted rules are still readable)
4. rule guards — canceled / inactive / already-delivered → skip + log
5. subject hydration — OperationsSubjectHydrator resolves title, whenLabel, recipients
6. recipient check — empty → skip + warn
7. sendEmail — INotificationSender → template render → SMTP per recipient
8. markSent — notificationRuleRepo.markSent(id, new Date())
← throw on SMTP/persist failure → BullMQ retry

Idempotency guard via lastSentAt
NotificationRule has a nullable lastSentAt: Date | null column (last_sent_at timestamptz NULL in operations.notification_rules). Before calling sendEmail, the listener checks:
```ts
if (rule.lastSentAt !== null) {
  // log 'already-delivered' and return (no duplicate send)
}
```

After a successful send, `notificationRuleRepo.markSent(id, new Date())` persists the timestamp. Because BullMQ delivers each job at least once, this guard prevents duplicate emails on retry.
Soft-delete semantics
The listener calls findByIdIncludingDeleted — not findById — so it can distinguish three states:
| State | Condition | Action |
|---|---|---|
| Missing | null returned | WARN log rule-missing, return |
| Canceled | deletedAt !== null | INFO log rule-canceled, return gracefully |
| Active | deletedAt === null | Proceed with delivery |
Soft-deleted rules are never retried — the listener exits cleanly instead of throwing, so BullMQ does not re-queue the job.
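The rule guards can be collected into a single function that returns a skip reason. This is a sketch; `RuleSnapshot` and `skipReasonFor` are hypothetical names, and the order of the checks after the missing-rule case is an assumption based on the flow listed earlier.

```ts
type RuleSkipReason = 'rule-missing' | 'rule-canceled' | 'rule-inactive' | 'already-delivered';

// Hypothetical read model holding only the fields the guards inspect.
interface RuleSnapshot {
  deletedAt: Date | null;
  active: boolean;
  lastSentAt: Date | null;
}

// Returns the reason to skip delivery, or null when delivery should proceed.
function skipReasonFor(rule: RuleSnapshot | null): RuleSkipReason | null {
  if (rule === null) return 'rule-missing';
  if (rule.deletedAt !== null) return 'rule-canceled';
  if (!rule.active) return 'rule-inactive';
  if (rule.lastSentAt !== null) return 'already-delivered';
  return null;
}
```

Every non-null result corresponds to a clean return from the listener rather than a thrown error, so none of these cases causes a retry.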
Recipient resolution
OperationsSubjectHydrator resolves recipients differently per subject type:
| Subject type | Recipients | Deduplication |
|---|---|---|
| event | creator + participants | by email (case-insensitive) |
| task | creator + relatedUsers | by email (case-insensitive) |
If the resolved set is empty (e.g., all users lack an email), the listener logs no-recipients and returns without sending.
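Case-insensitive deduplication by email can be sketched as below. `UserRef` and `resolveRecipients` are hypothetical names, and keeping the first-seen spelling of each address is an assumption of this example rather than documented hydrator behavior.

```ts
// Hypothetical user shape: only the email matters for recipient resolution.
interface UserRef {
  email: string | null;
}

// Returns unique, non-empty email addresses, compared case-insensitively.
function resolveRecipients(users: UserRef[]): string[] {
  const seen = new Set<string>();
  const recipients: string[] = [];
  for (const user of users) {
    if (user.email === null) continue; // users without an email are skipped
    const trimmed = user.email.trim();
    const key = trimmed.toLowerCase();
    if (key === '' || seen.has(key)) continue;
    seen.add(key);
    recipients.push(trimmed); // keep the first-seen spelling
  }
  return recipients;
}
```

An empty result from this function corresponds to the `no-recipients` case, in which nothing is sent.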
Retry semantics via BullMQ
The listener wraps the rule-lookup-through-markSent pipeline in try/catch. If sendEmail or markSent throws, the error is logged and rethrown. This propagates up through EventEmitter2.emitAsync → emitArrived → ScheduleProcessor._processJob, which catches and rethrows, causing BullMQ to re-queue the job for retry according to the queue's backoff policy.
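The idempotency guard (`lastSentAt`) covers any retry or redelivery that happens after `markSent` has persisted the timestamp: the next attempt detects `lastSentAt !== null` and skips gracefully. It does not cover a partial success in which the email was sent but `markSent` failed. In that case `lastSentAt` is still null, so the retry passes the guard and may send the email again.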
Observability: structured log events
| Log key | Level | Attributes |
|---|---|---|
| `notifications.operations.skipped` | warn / info | `reason`, `notificationRuleId`, contextual fields |
| `notifications.operations.sent` | info | `recipientCount`, `notificationRuleId`, `subjectType`, `subjectId`, `kind` |
| `notifications.operations.failed` | error | `errorName`, `errorMessage`, `notificationRuleId` |
| `notifications.sender.sent` | info | `templateKey`, `recipient` |
| `notifications.sender.failed` | error | `templateKey`, `recipient`, `provider` |
Reason codes for skipped:
| Code | Meaning |
|---|---|
| `invalid-payload` | BullMQ job payload failed validation |
| `rule-missing` | No NotificationRule row found for the given ID |
| `rule-canceled` | Rule row exists but `deletedAt` is set |
| `rule-inactive` | Rule `active = false` |
| `already-delivered` | `lastSentAt` is non-null (idempotency guard) |
| `subject-missing` | Hydrator returned null (event/task deleted) |
| `no-recipients` | Recipient list resolved to zero entries |
Migration: last_sent_at column
Before deploying this feature, the following migration must be applied:
```sql
ALTER TABLE operations.notification_rules
  ADD COLUMN last_sent_at timestamptz NULL;
```

Generate the TypeORM migration file with:

```bash
pnpm --filter api typeorm migration:generate \
  apps/api/src/modules/operations/infrastructure/migrations/AddNotificationRuleLastSentAt
```

Then run migrations before the new version goes live. See the migration runbook for step-by-step operator instructions.
Cross-reference
- Operations Alerts (Notifications Module): `INotificationSender` port, template registry, and SMTP adapter.
Cross-Links
- Schedule Module: the scheduling infrastructure this integration relies on.
- Job Scheduling: how `scheduleAt`, `cancel`, and `reschedule` work.
- Retry and Failure: retry semantics for scheduled jobs.