PostgreSQL Example
This example demonstrates Conveyor with PgStore for production use. It covers PostgreSQL setup, cron scheduling, and graceful shutdown.
Prerequisites
A running PostgreSQL instance. Use the provided Docker Compose file or your own database:
# Using Docker Compose (from the repo root)
docker-compose up -d
# Or set your own connection string
export PG_URL="postgres://user:pass@localhost:5432/mydb"Run the example:
deno run --allow-all examples/with-pg/main.tsFull Source (Annotated)
Store Setup
import { Queue, Worker } from '@conveyor/core';
import { PgStore } from '@conveyor/store-pg';
const pgUrl = Deno.env.get('PG_URL');
if (!pgUrl) {
console.error('Missing PG_URL environment variable');
Deno.exit(1);
}
const store = new PgStore({ connection: pgUrl });
await store.connect();
console.log('Connected to PostgreSQL');PgStore accepts a PostgreSQL connection string. On connect(), it automatically runs migrations to create the conveyor_jobs, conveyor_paused, and conveyor_migrations tables.
Key PostgreSQL features:
FOR UPDATE SKIP LOCKEDfor atomic job fetching -- workers never double-process jobs.LISTEN/NOTIFYfor real-time event delivery across processes.- JSONB for job data and options storage.
- TIMESTAMPTZ for timezone-aware timestamps.
Define Queue with Default Options
interface EmailPayload {
to: string;
subject: string;
}
const queue = new Queue<EmailPayload>('emails', {
store,
defaultJobOptions: {
attempts: 3,
backoff: { type: 'exponential', delay: 1000 },
removeOnComplete: true,
},
});Create Worker
const worker = new Worker<EmailPayload>(
'emails',
async (job) => {
console.log(`[${job.id}] Sending "${job.data.subject}" to ${job.data.to}`);
await job.updateProgress(50);
// Simulate email sending
await new Promise((r) => setTimeout(r, 300));
await job.updateProgress(100);
return { sent: true, at: new Date().toISOString() };
},
{
store,
concurrency: 5,
lockDuration: 30_000,
},
);With PostgreSQL, you can run multiple workers across different processes. The FOR UPDATE SKIP LOCKED locking mechanism ensures each job is processed exactly once.
Events
worker.on('completed', (data: unknown) => {
const { result } = data as { job: unknown; result: unknown };
console.log(' -> completed:', result);
});
worker.on('failed', (data: unknown) => {
const { error } = data as { job: unknown; error: Error };
console.error(' -> failed:', error.message);
});Adding Jobs and Cron Scheduling
// Standard jobs
await queue.add('welcome', {
to: '[email protected]',
subject: 'Welcome to Conveyor!',
});
await queue.add('notification', {
to: '[email protected]',
subject: 'New notification',
});
// Cron-scheduled job: daily report at 9 AM
await queue.cron('0 9 * * *', 'daily-report', {
to: '[email protected]',
subject: 'Daily Report',
});Cron expressions support 5, 6, or 7 fields. You can specify a timezone with the repeat.tz option:
await queue.cron('0 9 * * *', 'daily-report', data, {
repeat: { tz: 'America/New_York' },
});Graceful Shutdown
console.log('Shutting down...');
await worker.close(); // Stop polling, wait for active jobs to finish
await queue.close(); // Close queue
await store.disconnect(); // Close PostgreSQL connection
console.log('Done!');In a production application, wire this to process signals:
const shutdown = async () => {
console.log('Received shutdown signal');
await worker.close();
await queue.close();
await store.disconnect();
process.exit(0);
};
process.on('SIGTERM', shutdown);
process.on('SIGINT', shutdown);Multi-Process Setup
One of PostgreSQL's key advantages is multi-process support. You can run separate queue producers and workers:
Producer process:
const store = new PgStore({ connection: pgUrl });
await store.connect();
const queue = new Queue('emails', { store });
await queue.add('send', { to: '[email protected]', subject: 'Hello' });Worker process (can run multiple instances):
const store = new PgStore({ connection: pgUrl });
await store.connect();
const worker = new Worker('emails', processor, {
store,
concurrency: 10,
maxGlobalConcurrency: 50, // Limit across all worker processes
});The maxGlobalConcurrency option ensures that no more than 50 jobs are active across all worker instances, regardless of how many are running.
