Basic Example (MemoryStore)
This walkthrough demonstrates the core Conveyor API using the in-memory store. It covers creating a queue, processing jobs with a worker, listening to events, scheduling, and deduplication.
Run this example with:
deno run --allow-all examples/basic/main.tsFull Source (Annotated)
Setup: Store and Imports
import { Queue, Worker } from '@conveyor/core';
import { MemoryStore } from '@conveyor/store-memory';
// Create a shared store instance.
// All queues and workers that need to communicate must share the same store.
const store = new MemoryStore();
await store.connect();The MemoryStore requires no configuration. Data lives in memory and is lost when the process exits. This makes it ideal for testing and prototyping.
Define a Typed Queue
interface EmailPayload {
to: string;
subject: string;
body: string;
}
const emailQueue = new Queue<EmailPayload>('emails', {
store,
defaultJobOptions: {
attempts: 3, // Retry up to 3 times
backoff: { type: 'exponential', delay: 1000 }, // 1s, 2s, 4s between retries
removeOnComplete: true, // Auto-remove completed jobs
},
});The generic parameter <EmailPayload> provides type safety for all add() calls on this queue. The defaultJobOptions apply to every job unless overridden per-job.
Create a Worker
const worker = new Worker<EmailPayload>(
'emails', // Must match the queue name
async (job) => {
console.log(`Sending email to ${job.data.to}: "${job.data.subject}"`);
await job.updateProgress(50); // Report progress (0-100)
// Simulate email sending
await new Promise((r) => setTimeout(r, 500));
await job.updateProgress(100);
console.log(`Email sent to ${job.data.to}`);
return { sent: true, timestamp: new Date().toISOString() }; // Return value
},
{
store,
concurrency: 3, // Process up to 3 jobs simultaneously
lockDuration: 30_000, // 30-second lock (renewed automatically)
},
);The worker starts polling immediately (unless autoStart: false). The processor function receives a Job instance. The return value is stored as job.returnvalue.
Listen to Events
worker.on('completed', (data: unknown) => {
const { result } = data as { job: unknown; result: unknown };
console.log('Job completed:', result);
});
worker.on('failed', (data: unknown) => {
const { error } = data as { job: unknown; error: Error };
console.error('Job failed:', error.message);
});See EventBus for all available events.
Add Jobs
// Regular add -- job goes to "waiting" state immediately
await emailQueue.add('welcome', {
to: '[email protected]',
subject: 'Welcome!',
body: 'Welcome to Conveyor',
});Convenience Methods
// now() -- explicit "no delay" (same as add without delay option)
await emailQueue.now('notification', {
to: '[email protected]',
subject: 'New notification',
body: 'You have a new message',
});
// schedule() -- human-readable delay
await emailQueue.schedule('2s', 'reminder', {
to: '[email protected]',
subject: 'Reminder',
body: "Don't forget!",
});
// every() -- recurring job at fixed interval
await emailQueue.every('3s', 'digest', {
to: '[email protected]',
subject: 'Daily digest',
body: 'Here is your digest',
});The schedule() method accepts human-readable strings like "2s", "10 minutes", or "in 1 hour". The every() method creates a recurring job that re-enqueues itself after each execution.
Deduplication
// First add creates the job
await emailQueue.add('alert', {
to: '[email protected]',
subject: 'System alert',
body: 'CPU usage high',
}, { deduplication: { key: 'cpu-alert' } });
// Second add with same dedup key returns the existing job
const deduped = await emailQueue.add('alert', {
to: '[email protected]',
subject: 'System alert (duplicate)',
body: 'CPU usage high again',
}, { deduplication: { key: 'cpu-alert' } });
console.log(`Dedup test: second add returned existing job ${deduped.id}`);Deduplication prevents duplicate jobs. You can use a custom key or set hash: true to automatically hash the payload.
Cleanup
await worker.close(); // Stop processing, wait for active jobs
await emailQueue.close(); // Close the queue
await store.disconnect(); // Release store resourcesAlways close workers before queues, and queues before the store. The worker.close() call waits for any active jobs to finish.
