
Queue

The Queue class is the main entry point for adding and managing jobs. It delegates all storage operations to the configured StoreInterface.

typescript
import { Queue } from '@conveyor/core';

Constructor

typescript
new Queue<T = unknown>(name: string, options: QueueOptions)

| Parameter | Type | Description |
| --- | --- | --- |
| name | string | The queue name (e.g. "emails", "tasks") |
| options.store | StoreInterface | The store backend to use |
| options.defaultJobOptions | Partial<JobOptions> | Default options applied to every job added to this queue |

typescript
const queue = new Queue<EmailPayload>('emails', {
  store,
  defaultJobOptions: {
    attempts: 3,
    backoff: { type: 'exponential', delay: 1000 },
    removeOnComplete: true,
  },
});
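
Per-job options passed to add override these queue-level defaults. The sketch below shows one way that resolution could work. It assumes a shallow merge; `SketchJobOptions` and `resolveJobOptions` are hypothetical names, not part of the library, and the library's actual merge strategy may differ.

```typescript
// Hypothetical subset of JobOptions, limited to the fields used in the example above.
interface SketchJobOptions {
  attempts?: number;
  backoff?: { type: string; delay: number };
  removeOnComplete?: boolean;
}

// Assumption: a shallow merge in which per-job options win over queue defaults.
function resolveJobOptions(
  defaults: SketchJobOptions,
  perJob: SketchJobOptions = {},
): SketchJobOptions {
  return { ...defaults, ...perJob };
}

const queueDefaults: SketchJobOptions = {
  attempts: 3,
  backoff: { type: 'exponential', delay: 1000 },
  removeOnComplete: true,
};

const resolved = resolveJobOptions(queueDefaults, { attempts: 5 });
console.log(resolved.attempts); // 5 (per-job value)
console.log(resolved.removeOnComplete); // true (inherited default)
```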

Properties

| Property | Type | Description |
| --- | --- | --- |
| name | string | The queue name (readonly) |
| events | EventBus | Event bus for queue-level events |

Methods

add

Add a single job to the queue.

typescript
async add(name: string, data: T, opts?: JobOptions): Promise<Job<T>>

| Parameter | Type | Description |
| --- | --- | --- |
| name | string | The job name (e.g. "send-email") |
| data | T | The job payload |
| opts | JobOptions | Optional per-job options (overrides defaultJobOptions) |

Returns the created Job. If deduplication is configured and a matching job exists, returns the existing job instead.

typescript
const job = await queue.add('send-welcome', { to: 'user@example.com' });
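
The "returns the existing job" behaviour can be pictured with a small in-memory stand-in. This is only a sketch: `DedupSketch`, `SketchJob`, and the `dedupKey` argument are hypothetical, since this section does not show how deduplication is configured or how a match is determined.

```typescript
// Hypothetical stand-ins: the real Job type and the way deduplication is
// configured are not shown in this section.
interface SketchJob<T> {
  id: string;
  name: string;
  data: T;
}

class DedupSketch<T> {
  private byKey = new Map<string, SketchJob<T>>();
  private nextId = 1;

  // `dedupKey` is an assumed option name used only for illustration.
  add(name: string, data: T, dedupKey?: string): SketchJob<T> {
    if (dedupKey !== undefined) {
      const existing = this.byKey.get(dedupKey);
      if (existing) return existing; // matching job exists: return it unchanged
    }
    const job: SketchJob<T> = { id: String(this.nextId++), name, data };
    if (dedupKey !== undefined) this.byKey.set(dedupKey, job);
    return job;
  }
}

const dedupQueue = new DedupSketch<{ to: string }>();
const firstJob = dedupQueue.add('send-welcome', { to: 'a@example.com' }, 'welcome:a');
const secondJob = dedupQueue.add('send-welcome', { to: 'a@example.com' }, 'welcome:a');
console.log(firstJob === secondJob); // true: the second call returned the existing job
```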

addBulk

Add multiple jobs at once. Deduplication is applied to each job individually.

typescript
async addBulk(
  jobs: Array<{ name: string; data: T; opts?: JobOptions }>
): Promise<Job<T>[]>

typescript
const jobs = await queue.addBulk([
  { name: 'send-welcome', data: { to: 'user1@example.com' } },
  { name: 'send-welcome', data: { to: 'user2@example.com' } },
]);

schedule

Schedule a job to run after a delay, given in milliseconds or as a human-readable string.

typescript
schedule(
  delay: ScheduleDelay | number,
  name: string,
  data: T,
  opts?: JobOptions
): Promise<Job<T>>

| Parameter | Type | Description |
| --- | --- | --- |
| delay | ScheduleDelay \| number | Delay in ms or human-readable string (e.g. "5s", "in 10 minutes") |
| name | string | The job name |
| data | T | The job payload |
| opts | JobOptions | Optional job options |

typescript
await queue.schedule('in 10 minutes', 'send-reminder', { to: 'user@example.com' });
await queue.schedule(5000, 'quick-task', { url: '/process' });
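
To make the two delay forms concrete, here is a rough sketch of how a human-readable delay might be normalized to milliseconds. `parseDelay` and its unit table are hypothetical; the grammar the library actually accepts for ScheduleDelay is not specified in this section, so the sketch covers only the shapes that appear in the examples.

```typescript
// Hypothetical unit table: only the units needed for the documented examples.
const UNIT_MS = new Map<string, number>([
  ['ms', 1],
  ['s', 1_000], ['second', 1_000], ['seconds', 1_000],
  ['m', 60_000], ['minute', 60_000], ['minutes', 60_000],
  ['h', 3_600_000], ['hour', 3_600_000], ['hours', 3_600_000],
]);

function parseDelay(delay: string | number): number {
  if (typeof delay === 'number') return delay; // already milliseconds

  // Optional leading "in", then an amount, optional whitespace, then a unit.
  const match = /^(?:in\s+)?(\d+)\s*([a-z]+)$/i.exec(delay.trim());
  if (!match) throw new Error(`Unrecognized delay: ${delay}`);

  const [, amount = '', unitName = ''] = match;
  const unitMs = UNIT_MS.get(unitName.toLowerCase());
  if (unitMs === undefined) throw new Error(`Unknown unit: ${unitName}`);

  return Number(amount) * unitMs;
}

console.log(parseDelay('in 10 minutes')); // 600000
console.log(parseDelay('5s')); // 5000
console.log(parseDelay(5000)); // 5000
```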

now

Add a job for immediate execution (no delay).

typescript
now(name: string, data: T, opts?: JobOptions): Promise<Job<T>>

typescript
await queue.now('notification', { to: 'user@example.com', subject: 'Alert' });

every

Add a recurring job that repeats at a fixed interval.

typescript
every(
  interval: Delay,
  name: string,
  data: T,
  opts?: JobOptions
): Promise<Job<T>>

| Parameter | Type | Description |
| --- | --- | --- |
| interval | Delay | Repeat interval in ms or human-readable string (e.g. "2 hours") |

typescript
await queue.every('2 hours', 'cleanup', { type: 'temp-files' });
await queue.every(60_000, 'health-check', { url: '/status' });

cron

Add a cron-scheduled recurring job.

typescript
cron(
  cronExpr: string,
  name: string,
  data: T,
  opts?: JobOptions
): Promise<Job<T>>

| Parameter | Type | Description |
| --- | --- | --- |
| cronExpr | string | A cron expression (5, 6, or 7 fields) |

typescript
// Every day at 9:00 AM
await queue.cron('0 9 * * *', 'daily-report', { type: 'summary' });

// Every Monday at 8:30 AM in New York timezone
await queue.cron('30 8 * * 1', 'weekly-digest', { type: 'weekly' }, {
  repeat: { tz: 'America/New_York' },
});
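
Since cronExpr may have 5, 6, or 7 fields, a quick field count can catch malformed expressions before they reach the queue. The helper below is a hypothetical pre-check, not part of the library; it only counts whitespace-separated fields and does not validate what each field contains.

```typescript
// Count whitespace-separated fields in a cron expression.
function cronFieldCount(expr: string): number {
  const trimmed = expr.trim();
  return trimmed === '' ? 0 : trimmed.split(/\s+/).length;
}

// True when the expression has one of the documented field counts (5, 6, or 7).
function hasSupportedFieldCount(expr: string): boolean {
  const count = cronFieldCount(expr);
  return count >= 5 && count <= 7;
}

console.log(cronFieldCount('0 9 * * *')); // 5
console.log(hasSupportedFieldCount('30 8 * * 1')); // true
console.log(hasSupportedFieldCount('* * *')); // false
```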

pause

Pause the queue. When paused, no new jobs will be processed by workers.

typescript
async pause(opts?: PauseOptions): Promise<void>

| Parameter | Type | Description |
| --- | --- | --- |
| opts.jobName | string | If provided, only pause jobs with this name |

typescript
// Pause all jobs
await queue.pause();

// Pause only "send-email" jobs
await queue.pause({ jobName: 'send-email' });

resume

Resume the queue (or a specific job name) after pausing.

typescript
async resume(opts?: PauseOptions): Promise<void>

typescript
await queue.resume();
await queue.resume({ jobName: 'send-email' });
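
One way to picture queue-wide versus per-name pausing is a global flag plus a set of paused job names that a worker consults before picking up a job. `PauseStateSketch` is a hypothetical model, and it assumes that resume() without a job name leaves name-specific pauses in place; the library may behave differently.

```typescript
// Hypothetical pause state; the real store may track this differently.
class PauseStateSketch {
  private allPaused = false;
  private pausedNames = new Set<string>();

  pause(opts?: { jobName?: string }): void {
    if (opts?.jobName) this.pausedNames.add(opts.jobName);
    else this.allPaused = true;
  }

  // Assumption: a queue-wide resume does not clear name-specific pauses.
  resume(opts?: { jobName?: string }): void {
    if (opts?.jobName) this.pausedNames.delete(opts.jobName);
    else this.allPaused = false;
  }

  // A worker would call this before processing a job with the given name.
  canProcess(jobName: string): boolean {
    return !this.allPaused && !this.pausedNames.has(jobName);
  }
}

const pauseState = new PauseStateSketch();
pauseState.pause({ jobName: 'send-email' });
console.log(pauseState.canProcess('send-email')); // false
console.log(pauseState.canProcess('cleanup')); // true
```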

drain

Remove all waiting and delayed jobs from the queue.

typescript
async drain(): Promise<void>

clean

Remove jobs in a given state that are older than a grace period.

typescript
clean(state: JobState, grace: number): Promise<number>

| Parameter | Type | Description |
| --- | --- | --- |
| state | JobState | The state to clean (e.g. "completed", "failed") |
| grace | number | Grace period in ms. Jobs older than this are removed |

Returns the number of jobs removed.

typescript
// Remove completed jobs older than 1 hour
const removed = await queue.clean('completed', 3_600_000);
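
The grace period amounts to a cutoff timestamp: anything in the target state that is older than now minus grace is eligible for removal. The sketch below illustrates that arithmetic. `FinishedJobSketch` and its `finishedAt` field are hypothetical; which timestamp the library compares against is not stated in this section.

```typescript
// Hypothetical job record; `finishedAt` is an assumed timestamp in ms.
interface FinishedJobSketch {
  id: string;
  state: string;
  finishedAt: number;
}

// Select jobs in `state` whose timestamp is older than the grace cutoff.
function selectForClean(
  jobs: readonly FinishedJobSketch[],
  state: string,
  graceMs: number,
  now: number = Date.now(),
): FinishedJobSketch[] {
  const cutoff = now - graceMs;
  return jobs.filter((job) => job.state === state && job.finishedAt < cutoff);
}

const nowMs = 10_000_000;
const sampleJobs: FinishedJobSketch[] = [
  { id: 'a', state: 'completed', finishedAt: nowMs - 7_200_000 }, // 2 hours old
  { id: 'b', state: 'completed', finishedAt: nowMs - 60_000 }, // 1 minute old
  { id: 'c', state: 'failed', finishedAt: nowMs - 7_200_000 }, // wrong state
];

const toRemove = selectForClean(sampleJobs, 'completed', 3_600_000, nowMs);
console.log(toRemove.map((job) => job.id)); // [ 'a' ]
```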

count

Count jobs in a given state.

typescript
count(state: JobState): Promise<number>

typescript
const waiting = await queue.count('waiting');
const failed = await queue.count('failed');

getJob

Retrieve a job by its ID.

typescript
async getJob(jobId: string): Promise<Job<T> | null>

Returns the Job, or null if not found.

getJobs

List jobs in a given state with pagination.

typescript
async getJobs(state: JobState, start?: number, end?: number): Promise<Job<T>[]>

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| state | JobState | | The state to filter by |
| start | number | 0 | Pagination offset (0-based) |
| end | number | 100 | Pagination end (exclusive) |
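
A 0-based start with an exclusive end is the same convention as Array.prototype.slice, so paging can be reasoned about with a plain array. The `page` helper and the generated job IDs below are hypothetical stand-ins used only to show the index arithmetic.

```typescript
// Mirror of the documented defaults: start = 0, end = 100 (exclusive).
function page<T>(items: readonly T[], start = 0, end = 100): T[] {
  return items.slice(start, end);
}

// 250 placeholder IDs standing in for jobs in a single state.
const jobIds = Array.from({ length: 250 }, (_, i) => `job-${i}`);

console.log(page(jobIds).length); // 100 (items 0 through 99)
console.log(page(jobIds, 100, 200)[0]); // 'job-100'
console.log(page(jobIds, 200, 300).length); // 50 (only 250 items exist)
```

To walk every page, advance both start and end by the page size until a call returns fewer items than requested.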

observe

Create a JobObservable to track a job's lifecycle and optionally cancel it.

typescript
observe(jobId: string): JobObservable<T>

typescript
const observable = queue.observe(job.id);
observable.subscribe({
  onCompleted: (job, result) => console.log('Done!', result),
  onFailed: (job, error) => console.error('Failed:', error),
});

close

Close the queue and remove all event listeners. After calling close(), all methods will throw.

typescript
close(): Promise<void>

The queue also supports Symbol.asyncDispose for use with await using:

typescript
await using queue = new Queue('emails', { store });
// queue.close() is called automatically when the scope exits
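
Where await using is not available (it needs TypeScript 5.2 or later and matching runtime support), a try/finally wrapper gives a similar guarantee. The sketch below uses a hypothetical `ClosableQueueSketch` in place of the real Queue, and a hypothetical `withQueue` helper, to show both the cleanup pattern and the "methods throw after close()" rule.

```typescript
// Hypothetical stand-in for Queue: tracks only the closed flag.
class ClosableQueueSketch {
  private closed = false;

  add(name: string): string {
    if (this.closed) throw new Error('Queue is closed');
    return name;
  }

  async close(): Promise<void> {
    this.closed = true; // a real queue would also remove event listeners here
  }
}

// Run `fn` with a queue and always close it afterwards, even if `fn` throws.
async function withQueue<R>(
  fn: (queue: ClosableQueueSketch) => Promise<R>,
): Promise<R> {
  const queue = new ClosableQueueSketch();
  try {
    return await fn(queue);
  } finally {
    await queue.close();
  }
}

// Usage: the queue is closed once the callback settles.
void withQueue(async (queue) => queue.add('send-welcome')).then((name) => {
  console.log(name); // 'send-welcome'
});
```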

Released under the MIT License.