Jobs and Events
SPFN provides a robust background job system powered by pg-boss, integrated with a decoupled event system for building scalable applications.
Overview
The job/event system follows the same fluent API pattern as routes:
typescript// Route pattern route.get('/users/:id') .input({ params: Type.Object({ id: Type.String() }) }) .handler(async (c) => { ... }); // Job pattern (similar fluent API) job('send-email') .input(Type.Object({ to: Type.String() })) .options({ retryLimit: 3 }) .handler(async (input) => { ... });
Job Types
1. Standard Job
Jobs that are triggered programmatically:
typescriptimport { job, defineJobRouter } from '@spfn/core/job'; import { Type } from '@sinclair/typebox'; export const sendWelcomeEmail = job('send-welcome-email') .input(Type.Object({ userId: Type.String(), email: Type.String(), })) .options({ retryLimit: 3, retryDelay: 5000, }) .handler(async (input) => { await emailService.send(input.email, 'Welcome!'); }); // Trigger the job await sendWelcomeEmail.send({ userId: '123', email: 'user@example.com' });
2. Cron Job
Scheduled jobs that run periodically:
typescriptexport const dailyReport = job('daily-report') .cron('0 9 * * *') // Every day at 9 AM .handler(async () => { await reportService.generateDaily(); }); export const weeklyCleanup = job('weekly-cleanup') .cron('0 0 * * 0') // Every Sunday at midnight .handler(async () => { await cleanupService.run(); });
3. RunOnce Job
Jobs that run once when the server starts:
typescriptexport const initCache = job('init-cache') .runOnce() .handler(async () => { await cache.warmup(); }); export const seedDatabase = job('seed-database') .runOnce() .handler(async () => { await database.seed(); });
4. Event-Driven Job
Jobs that subscribe to events (decoupled architecture):
typescriptimport { defineEvent } from '@spfn/core/event'; // Define event export const userCreated = defineEvent('user.created', Type.Object({ userId: Type.String(), email: Type.String(), })); // Job subscribes to event export const onUserCreated = job('on-user-created') .on(userCreated) // Input type inferred from event .handler(async (payload) => { // payload is typed as { userId: string, email: string } await notificationService.notifyTeam(payload.userId); }); // Emit event (triggers all subscribed jobs) await userCreated.emit({ userId: '123', email: 'user@example.com' });
Events
Defining Events
typescriptimport { defineEvent } from '@spfn/core/event'; import { Type } from '@sinclair/typebox'; // Event with typed payload export const orderPlaced = defineEvent('order.placed', Type.Object({ orderId: Type.String(), userId: Type.String(), total: Type.Number(), })); // Event without payload export const systemHealthCheck = defineEvent('system.health-check');
Subscribing to Events
typescript// In-memory subscription (same process) const unsubscribe = orderPlaced.subscribe((payload) => { console.log('Order placed:', payload.orderId); }); // Unsubscribe when done unsubscribe(); // Unsubscribe all handlers orderPlaced.unsubscribeAll();
Multi-Instance Support
For applications running multiple instances, use cache-based pub/sub:
typescriptimport { createCache } from '@spfn/core/cache'; const cache = createCache({ url: process.env.REDIS_URL }); // Enable cache-based pub/sub await userCreated.useCache(cache); // Now events are broadcast to all instances await userCreated.emit({ userId: '123', email: 'user@example.com' });
Job Router
Group jobs together for registration:
typescriptimport { defineJobRouter } from '@spfn/core/job'; // Flat structure export const jobRouter = defineJobRouter({ sendWelcomeEmail, dailyReport, initCache, onUserCreated, }); // Nested structure export const jobRouter = defineJobRouter({ email: defineJobRouter({ sendWelcome: sendWelcomeEmailJob, sendReset: sendResetPasswordJob, }), reports: defineJobRouter({ daily: dailyReportJob, weekly: weeklyReportJob, }), });
Server Configuration
Register jobs with the server:
typescriptimport { defineServerConfig } from '@spfn/core/server'; import { appRouter } from './routes'; import { jobRouter } from './jobs'; export default defineServerConfig() .routes(appRouter) .jobs(jobRouter, { connectionString: process.env.DATABASE_URL!, schema: 'spfn_queue', clearOnStart: process.env.NODE_ENV === 'development', }) .build();
Job Options
Configure job behavior:
typescriptjob('important-task') .input(Type.Object({ id: Type.String() })) .options({ // Retry configuration retryLimit: 5, // Max retry attempts (default: 3) retryDelay: 5000, // Delay between retries in ms (default: 1000) // Timeout expireInSeconds: 600, // Job expires after 10 minutes (default: 300) // Priority priority: 10, // Higher = more important (default: 0) // Singleton singletonKey: 'unique', // Only one job with this key can exist // Retention retentionSeconds: 86400, // Keep completed jobs for 1 day (default: 604800) }) .handler(async (input) => { ... });
Send Options
Configure per-invocation options:
typescript// Delay execution await sendEmail.send( { to: 'user@example.com' }, { startAfter: 60 } // Start after 60 seconds ); // Schedule for specific time await sendEmail.send( { to: 'user@example.com' }, { startAfter: new Date('2024-12-25T00:00:00Z') } ); // Singleton key for this invocation await sendEmail.send( { to: 'user@example.com' }, { singletonKey: 'user-123-welcome' } ); // Priority override await sendEmail.send( { to: 'user@example.com' }, { priority: 100 } );
Testing
Direct Execution
Use run() for synchronous testing:
typescriptimport { sendWelcomeEmail } from './jobs'; describe('sendWelcomeEmail', () => { it('should send welcome email', async () => { // Run synchronously (bypasses pg-boss queue) await sendWelcomeEmail.run({ userId: '123', email: 'test@example.com', }); expect(emailService.send).toHaveBeenCalled(); }); });
Event Testing
typescriptimport { userCreated } from './events'; describe('userCreated event', () => { it('should trigger handlers', async () => { const handler = vi.fn(); userCreated.subscribe(handler); await userCreated.emit({ userId: '123', email: 'test@example.com' }); expect(handler).toHaveBeenCalledWith({ userId: '123', email: 'test@example.com', }); }); });
Type Inference
Extract types from job and event definitions:
typescriptimport type { InferJobInput } from '@spfn/core/job'; import type { InferEventPayload } from '@spfn/core/event'; // Infer job input type type SendEmailInput = InferJobInput<typeof sendWelcomeEmail>; // { userId: string, email: string } // Infer event payload type type UserCreatedPayload = InferEventPayload<typeof userCreated>; // { userId: string, email: string }
Architecture
┌─────────────────────────────────────────────────────────────────┐
│ Application │
├─────────────────────────────────────────────────────────────────┤
│ Event.emit() ──┬──> In-memory handlers (same process) │
│ │ │
│ ├──> Cache pub/sub (multi-instance) │
│ │ │
│ └──> Job queues (pg-boss) │
│ │ │
│ ▼ │
│ pg-boss worker │
│ │ │
│ ▼ │
│ Job.handler() │
└─────────────────────────────────────────────────────────────────┘
Best Practices
-
Use events for decoupling: Instead of directly calling services, emit events and let jobs handle the work.
-
Configure appropriate retries: Set
retryLimitandretryDelaybased on the operation's characteristics. -
Use singleton keys: Prevent duplicate job execution with
singletonKey. -
Set reasonable timeouts: Configure
expireInSecondsto prevent stuck jobs. -
Clear on start in development: Use
clearOnStart: truein development to start fresh. -
Use
run()for testing: Bypass the queue for unit tests with synchronous execution.