Queue
The Queue class is the main entry point for adding and managing jobs. It delegates all storage operations to the configured StoreInterface.
import { Queue } from '@conveyor/core';

Constructor
new Queue<T = unknown>(name: string, options: QueueOptions)

| Parameter | Type | Description |
|---|---|---|
| name | string | The queue name (e.g. "emails", "tasks") |
| options.store | StoreInterface | The store backend to use |
| options.defaultJobOptions | Partial<JobOptions> | Default options applied to every job added to this queue |
const queue = new Queue<EmailPayload>('emails', {
  store,
  defaultJobOptions: {
    attempts: 3,
    backoff: { type: 'exponential', delay: 1000 },
    removeOnComplete: true,
  },
});

Properties
| Property | Type | Description |
|---|---|---|
| name | string | The queue name (readonly) |
| events | EventBus | Event bus for queue-level events |

Methods
add
Add a single job to the queue.
async add(name: string, data: T, opts?: JobOptions): Promise<Job<T>>

| Parameter | Type | Description |
|---|---|---|
| name | string | The job name (e.g. "send-email") |
| data | T | The job payload |
| opts | JobOptions | Optional per-job options (overrides defaultJobOptions) |

Returns the created Job. If deduplication is configured and a matching job exists, returns the existing job instead.
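
The override relationship between opts and defaultJobOptions can be pictured as a merge in which per-job values win. The sketch below is not the library's implementation: the SketchJobOptions type and the resolveOptions helper are hypothetical names, and a shallow, key-by-key merge is an assumption, since this page does not say how nested options such as backoff are combined.

```typescript
// Hypothetical, trimmed-down option shape for illustration only;
// the real JobOptions type may contain more fields.
interface SketchJobOptions {
  attempts?: number;
  removeOnComplete?: boolean;
  delay?: number;
}

// Assumption: per-job options override queue defaults key by key (shallow merge).
function resolveOptions(
  defaults: SketchJobOptions,
  perJob: SketchJobOptions = {},
): SketchJobOptions {
  return { ...defaults, ...perJob };
}

const resolved = resolveOptions(
  { attempts: 3, removeOnComplete: true }, // queue-level defaults
  { attempts: 5 },                         // per-job override
);

// attempts comes from the per-job options, removeOnComplete from the defaults.
console.log(resolved);
```
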
const job = await queue.add('send-welcome', { to: '[email protected]' });

addBulk
Add multiple jobs at once. Deduplication is applied per-job.
async addBulk(
  jobs: Array<{ name: string; data: T; opts?: JobOptions }>
): Promise<Job<T>[]>

const jobs = await queue.addBulk([
  { name: 'send-welcome', data: { to: '[email protected]' } },
  { name: 'send-welcome', data: { to: '[email protected]' } },
]);

schedule
Schedule a job with a human-readable delay.
schedule(
  delay: ScheduleDelay | number,
  name: string,
  data: T,
  opts?: JobOptions
): Promise<Job<T>>

| Parameter | Type | Description |
|---|---|---|
| delay | ScheduleDelay \| number | Delay in ms or human-readable string (e.g. "5s", "in 10 minutes") |
| name | string | The job name |
| data | T | The job payload |
| opts | JobOptions | Optional job options |

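
To make the two accepted delay forms concrete, here is a minimal sketch of how a numeric or human-readable delay might be normalized to milliseconds. The toMilliseconds function and the UNIT_MS table are hypothetical, and the grammar they accept (an optional "in", a number, then a unit) is an assumption based only on the examples on this page. The formats the library actually accepts may differ.

```typescript
// Hypothetical unit table; the real parser's vocabulary is not documented here.
const UNIT_MS: { [unit: string]: number | undefined } = {
  ms: 1,
  s: 1_000,
  second: 1_000,
  m: 60_000,
  minute: 60_000,
  h: 3_600_000,
  hour: 3_600_000,
};

function toMilliseconds(delay: string | number): number {
  // Numbers are taken as milliseconds already.
  if (typeof delay === 'number') return delay;

  // Optional leading "in", then a number, then a unit such as "s" or "minutes".
  const match = /^(?:in\s+)?(\d+(?:\.\d+)?)\s*([a-z]+)$/i.exec(delay.trim());
  if (!match) throw new Error(`Unrecognized delay: ${delay}`);

  const [, amount = '', rawUnit = ''] = match;
  const unit = rawUnit.toLowerCase();
  // Accept plural names ("minutes") by falling back to the singular form.
  const factor = UNIT_MS[unit] ?? UNIT_MS[unit.replace(/s$/, '')];
  if (factor === undefined) throw new Error(`Unknown unit: ${rawUnit}`);

  return Number(amount) * factor;
}

console.log(toMilliseconds('5s'));
console.log(toMilliseconds('in 10 minutes'));
```
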
await queue.schedule('in 10 minutes', 'send-reminder', { to: '[email protected]' });
await queue.schedule(5000, 'quick-task', { url: '/process' });

now
Add a job for immediate execution (no delay).
now(name: string, data: T, opts?: JobOptions): Promise<Job<T>>

await queue.now('notification', { to: '[email protected]', subject: 'Alert' });

every
Add a recurring job that repeats at a fixed interval.
every(
  interval: Delay,
  name: string,
  data: T,
  opts?: JobOptions
): Promise<Job<T>>

| Parameter | Type | Description |
|---|---|---|
| interval | Delay | Repeat interval in ms or human-readable string (e.g. "2 hours") |

await queue.every('2 hours', 'cleanup', { type: 'temp-files' });
await queue.every(60_000, 'health-check', { url: '/status' });

cron
Add a cron-scheduled recurring job.
cron(
  cronExpr: string,
  name: string,
  data: T,
  opts?: JobOptions
): Promise<Job<T>>

| Parameter | Type | Description |
|---|---|---|
| cronExpr | string | A cron expression (5, 6, or 7 fields) |

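
Since the expression may have 5, 6, or 7 fields, a caller might want a cheap shape check before handing user input to cron. The helper below, hasSupportedFieldCount, is a hypothetical name and not part of the library. It only counts whitespace-separated fields and does not validate the values inside them.

```typescript
// Checks the number of whitespace-separated fields only;
// it does not validate ranges, names, or special characters.
function hasSupportedFieldCount(cronExpr: string): boolean {
  const fields = cronExpr.trim().split(/\s+/);
  return fields.length >= 5 && fields.length <= 7;
}

console.log(hasSupportedFieldCount('0 9 * * *'));    // five fields
console.log(hasSupportedFieldCount('0 30 8 * * 1')); // six fields
console.log(hasSupportedFieldCount('* * *'));        // three fields
```
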
// Every day at 9:00 AM
await queue.cron('0 9 * * *', 'daily-report', { type: 'summary' });
// Every Monday at 8:30 AM in New York timezone
await queue.cron('30 8 * * 1', 'weekly-digest', { type: 'weekly' }, {
  repeat: { tz: 'America/New_York' },
});

pause
Pause the queue. When paused, no new jobs will be processed by workers.
async pause(opts?: PauseOptions): Promise<void>

| Parameter | Type | Description |
|---|---|---|
| opts.jobName | string | If provided, only pause jobs with this name |

// Pause all jobs
await queue.pause();
// Pause only "send-email" jobs
await queue.pause({ jobName: 'send-email' });

resume
Resume the queue (or a specific job name) after pausing.
async resume(opts?: PauseOptions): Promise<void>

await queue.resume();
await queue.resume({ jobName: 'send-email' });

drain
Remove all waiting and delayed jobs from the queue.
async drain(): Promise<void>

clean
Remove jobs in a given state that are older than a grace period.
clean(state: JobState, grace: number): Promise<number>

| Parameter | Type | Description |
|---|---|---|
| state | JobState | The state to clean (e.g. "completed", "failed") |
| grace | number | Grace period in ms. Jobs older than this are removed |

Returns the number of jobs removed.
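
The grace-period rule can be illustrated with an in-memory sketch. Everything here is hypothetical: the SketchJob shape, the finishedAt field used as the reference timestamp, and the cleanSketch helper are assumptions made for illustration, since this page does not say which timestamp the store compares against.

```typescript
// Hypothetical record shape; the real Job type is not shown in this section.
interface SketchJob {
  id: string;
  state: 'completed' | 'failed' | 'waiting';
  finishedAt: number; // epoch ms; assumed to be the timestamp the age is measured from
}

// Removes jobs in `state` whose age exceeds `grace` ms, and reports how many were removed.
function cleanSketch(
  jobs: SketchJob[],
  state: SketchJob['state'],
  grace: number,
  now: number,
): { kept: SketchJob[]; removed: number } {
  const kept = jobs.filter(
    (job) => !(job.state === state && now - job.finishedAt > grace),
  );
  return { kept, removed: jobs.length - kept.length };
}

const now = 10_000_000;
const sample: SketchJob[] = [
  { id: 'a', state: 'completed', finishedAt: 1_000_000 }, // older than one hour
  { id: 'b', state: 'completed', finishedAt: 9_000_000 }, // within the grace period
  { id: 'c', state: 'failed', finishedAt: 0 },            // different state, untouched
];

const outcome = cleanSketch(sample, 'completed', 3_600_000, now);
console.log(outcome.removed, outcome.kept.map((job) => job.id));
```
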
// Remove completed jobs older than 1 hour
const removed = await queue.clean('completed', 3_600_000);

count
Count jobs in a given state.
count(state: JobState): Promise<number>

const waiting = await queue.count('waiting');
const failed = await queue.count('failed');

getJob
Retrieve a job by its ID.
async getJob(jobId: string): Promise<Job<T> | null>

Returns the Job, or null if not found.

getJobs
List jobs in a given state with pagination.
async getJobs(state: JobState, start?: number, end?: number): Promise<Job<T>[]>

| Parameter | Type | Default | Description |
|---|---|---|---|
| state | JobState | | The state to filter by |
| start | number | 0 | Pagination offset (0-based) |
| end | number | 100 | Pagination end (exclusive) |

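
A 0-based start with an exclusive end follows the same convention as Array.prototype.slice, so the window can be reasoned about with a plain array. The page helper below is a hypothetical stand-in that mirrors the documented defaults. It is not how the store performs the query.

```typescript
// Mirrors the documented defaults: start = 0, end = 100 (exclusive).
function page<Item>(items: readonly Item[], start = 0, end = 100): Item[] {
  return items.slice(start, end);
}

const ids = ['a', 'b', 'c', 'd', 'e'];

// Items at indexes 1 and 2 are returned; index 3 is excluded.
const secondAndThird = page(ids, 1, 3);
console.log(secondAndThird); // [ 'b', 'c' ]

// With the defaults, up to the first 100 items are returned.
console.log(page(ids).length); // 5
```

To fetch consecutive pages of size n under this convention, the next call would use start + n and end + n.
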
observe
Create a JobObservable to track a job's lifecycle and optionally cancel it.
observe(jobId: string): JobObservable<T>

const observable = queue.observe(job.id);
observable.subscribe({
  onCompleted: (job, result) => console.log('Done!', result),
  onFailed: (job, error) => console.error('Failed:', error),
});

close
Close the queue and remove all event listeners. After calling close(), all methods will throw.
close(): Promise<void>

The queue also supports Symbol.asyncDispose for use with await using:
await using queue = new Queue('emails', { store });
// queue.close() called automatically when scope exits
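
The await using syntax requires TypeScript 5.2 or later and a runtime (or polyfill) that provides Symbol.asyncDispose. Where that is not available, the same guarantee can be approximated with try/finally. The sketch below uses a hypothetical StubQueue class and withQueue helper so that it runs on its own. With a real queue, only the close() call would carry over.

```typescript
// Stand-in for a queue; only close() matters for this sketch.
class StubQueue {
  closed = false;

  async close(): Promise<void> {
    this.closed = true;
  }
}

// Runs `work` and always closes the queue afterwards, whether work succeeds or throws.
async function withQueue<Result>(
  queue: StubQueue,
  work: (queue: StubQueue) => Promise<Result>,
): Promise<Result> {
  try {
    return await work(queue);
  } finally {
    await queue.close();
  }
}

const stub = new StubQueue();
const done = withQueue(stub, async () => 'ok').then((result) => {
  console.log(result, stub.closed);
  return result;
});
```
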