Worker

The Worker class polls for jobs from a queue, locks them, and executes a processor function. It handles retries, backoff, lock renewal, stalled job detection, and repeat scheduling.

typescript
import { Worker } from '@conveyor/core';

Constructor

typescript
new Worker<T = unknown>(
  queueName: string,
  processor: ProcessorFn<T> | BatchProcessorFn<T>,
  options: WorkerOptions
)

| Parameter | Type | Description |
| --- | --- | --- |
| queueName | string | The queue name to process jobs from |
| processor | ProcessorFn<T> \| BatchProcessorFn<T> | The function that processes each job (or batch) |
| options | WorkerOptions | Worker configuration |

WorkerOptions

| Option | Type | Default | Description |
| --- | --- | --- | --- |
| store | StoreInterface | (required) | The store backend to use |
| concurrency | number | 1 | Max concurrent jobs on this worker |
| maxGlobalConcurrency | number | -- | Max concurrent active jobs across ALL workers |
| limiter | LimiterOptions | -- | Rate limiter: { max, duration } |
| lockDuration | number | 30_000 | Lock duration in ms |
| stalledInterval | number | 30_000 | Stalled check interval in ms |
| autoStart | boolean | true | Start polling immediately on construction |
| lifo | boolean | false | Fetch most recently added job first |
| batch | BatchOptions | -- | Batch processing config: { size } |
| group | GroupWorkerOptions | -- | Per-group concurrency and rate limiting |

Processor Types

Single Job Processor

Processes one job at a time (per concurrency slot).

typescript
type ProcessorFn<T> = (job: Job<T>, signal: AbortSignal) => Promise<unknown>;

The signal parameter is an AbortSignal that is aborted when the job is cancelled or the worker is closing. Check signal.aborted, or register an 'abort' listener, to stop work gracefully.

typescript
const worker = new Worker<EmailPayload>('emails', async (job, signal) => {
  console.log(`Sending to ${job.data.to}`);
  await sendEmail(job.data, { signal });
  return { sent: true };
}, { store, concurrency: 5 });
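
The cancellation guidance above can be sketched as follows. This is a minimal illustration, assuming the work can be split into discrete steps; processInSteps and demoSteps are hypothetical names, and a locally created AbortController stands in for the signal the worker would pass to the processor.

```typescript
// Hypothetical helper: runs steps in order and stops early once the signal aborts.
// Resolves to the number of steps that completed.
async function processInSteps(
  steps: Array<() => Promise<void>>,
  signal: AbortSignal,
): Promise<number> {
  let completed = 0;
  for (const step of steps) {
    // Check before each step so cancellation takes effect between units of work.
    if (signal.aborted) break;
    await step();
    completed++;
  }
  return completed;
}

// A local controller stands in for the signal supplied by the worker.
const controller = new AbortController();
const demoSteps: Array<() => Promise<void>> = [
  async () => {},
  async () => { controller.abort(); }, // simulate cancellation during step 2
  async () => {},                      // never runs
];
const completedSteps = processInSteps(demoSteps, controller.signal);
```

Here completedSteps resolves to 2: the third step is skipped because the signal was aborted before it started.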

Batch Processor

Processes multiple jobs in a single call. Requires the batch option.

typescript
type BatchProcessorFn<T> = (
  jobs: Job<T>[],
  signal: AbortSignal,
) => Promise<BatchResult[]>;

Each element in the returned array corresponds to the job at the same index:

typescript
type BatchResult =
  | { status: 'completed'; value?: unknown }
  | { status: 'failed'; error: Error };

typescript
const worker = new Worker<EmailPayload>('emails', async (jobs) => {
  const results = await sendBulkEmails(jobs.map((j) => j.data));
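  // Return one BatchResult per job, in the same order as `jobs`.
  // Building each variant separately keeps the discriminated union well-typed.
  return results.map((r) =>
    r.ok
      ? { status: 'completed' as const, value: r.data }
      : { status: 'failed' as const, error: new Error(r.message) }
  );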
}, {
  store,
  batch: { size: 20 },
  concurrency: 3,
});
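
When each job in a batch is processed by its own promise, the index-aligned result array described above can be built from Promise.allSettled output. The sketch below is one possible approach, not part of the library; toBatchResults is a hypothetical helper, and BatchResult is copied from the definition above so the block is self-contained.

```typescript
// Copied from the BatchResult definition above.
type BatchResult =
  | { status: 'completed'; value?: unknown }
  | { status: 'failed'; error: Error };

// Hypothetical helper: converts Promise.allSettled output into BatchResult[],
// preserving index order so result i corresponds to job i.
function toBatchResults(settled: PromiseSettledResult<unknown>[]): BatchResult[] {
  return settled.map((s): BatchResult =>
    s.status === 'fulfilled'
      ? { status: 'completed', value: s.value }
      : {
          status: 'failed',
          // Rejection reasons are not guaranteed to be Error instances.
          error: s.reason instanceof Error ? s.reason : new Error(String(s.reason)),
        },
  );
}

const sampleResults = toBatchResults([
  { status: 'fulfilled', value: 42 },
  { status: 'rejected', reason: 'smtp timeout' },
]);
```

Inside a batch processor this would typically be called as toBatchResults(await Promise.allSettled(jobs.map(handleOne))), where handleOne is whatever per-job function the application defines.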

Properties

PropertyTypeDescription
queueNamestringThe queue name this worker processes (readonly)
idstringUnique worker identifier, e.g. "worker-a1b2c3d4" (readonly)
eventsEventBusEvent bus for worker-level events (readonly)

Methods

on

Register an event handler on the worker's event bus.

typescript
on(event: QueueEventType, handler: (data: unknown) => void): void

This is a convenience shortcut for worker.events.on(event, handler).

See EventBus for the full list of events.

typescript
worker.on('completed', (data) => {
  const { job, result } = data as { job: Job; result: unknown };
  console.log(`Job ${job.id} completed:`, result);
});

worker.on('failed', (data) => {
  const { job, error } = data as { job: Job; error: Error };
  console.error(`Job ${job.id} failed:`, error.message);
});

worker.on('error', (error) => {
  console.error('Worker error:', error);
});
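
Because handlers receive data as unknown, a type guard can replace the as casts used above. This is a sketch only: the FailedPayload shape is inferred from the 'failed' handler example, the string type of job.id is an assumption, and isFailedPayload is a hypothetical helper.

```typescript
// Assumed payload shape, inferred from the 'failed' handler example.
interface FailedPayload {
  job: { id: string }; // assumption: job ids are strings
  error: Error;
}

// Hypothetical type guard: narrows unknown event data to FailedPayload.
function isFailedPayload(data: unknown): data is FailedPayload {
  if (typeof data !== 'object' || data === null) return false;
  const candidate = data as { job?: unknown; error?: unknown };
  if (typeof candidate.job !== 'object' || candidate.job === null) return false;
  const job = candidate.job as { id?: unknown };
  return typeof job.id === 'string' && candidate.error instanceof Error;
}

const validPayload = isFailedPayload({ job: { id: '1' }, error: new Error('boom') });
const invalidPayload = isFailedPayload('not an event');
```

A handler can then start with if (!isFailedPayload(data)) return; and access data.job.id and data.error.message without casting.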

pause

Pause the worker. Active jobs continue, but no new jobs are fetched.

typescript
pause(): void

resume

Resume a paused worker.

typescript
resume(): void

close

Gracefully shut down the worker. Waits for active jobs to complete, stops polling, and clears all timers.

typescript
async close(): Promise<void>

typescript
// Graceful shutdown
process.on('SIGTERM', async () => {
  await worker.close();
  await store.disconnect();
});
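
Since close() waits for active jobs, a long-running job can delay shutdown. One way to bound the wait is to race close() against a timer, as in the sketch below. closeWithTimeout is a hypothetical helper, not a library API, and fakeWorker is a stand-in object exposing the same close(): Promise<void> signature.

```typescript
// Hypothetical helper: resolves to 'closed' if close() finishes in time,
// otherwise to 'timeout'. It does not force-stop anything on timeout.
async function closeWithTimeout(
  target: { close(): Promise<void> },
  timeoutMs: number,
): Promise<'closed' | 'timeout'> {
  let timer: ReturnType<typeof setTimeout> | undefined;
  const timeout = new Promise<'timeout'>((resolve) => {
    timer = setTimeout(() => resolve('timeout'), timeoutMs);
  });
  try {
    return await Promise.race([
      target.close().then(() => 'closed' as const),
      timeout,
    ]);
  } finally {
    // Clear the timer so it does not keep the process alive after close() wins.
    clearTimeout(timer);
  }
}

// Stand-in for a worker whose close() resolves immediately.
const fakeWorker = { close: async (): Promise<void> => {} };
const shutdownOutcome = closeWithTimeout(fakeWorker, 50);
```

On 'timeout' the caller decides what to do next, for example logging the stuck jobs and exiting; jobs left active would later be picked up by stalled job detection once their locks expire.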

Concurrency and Rate Limiting

Per-Worker Concurrency

The concurrency option controls how many jobs this worker processes simultaneously.

typescript
const worker = new Worker('tasks', processor, {
  store,
  concurrency: 10,
});

Global Concurrency

The maxGlobalConcurrency option limits active jobs across all workers sharing the same store. The store enforces this limit atomically.

typescript
const worker = new Worker('tasks', processor, {
  store,
  concurrency: 5,
  maxGlobalConcurrency: 20,
});

Rate Limiting

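The limiter option applies a sliding-window rate limit of at most max jobs per duration milliseconds. The limit is local to this worker and is not shared with other workers.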

typescript
const worker = new Worker('api-calls', processor, {
  store,
  limiter: { max: 100, duration: 60_000 }, // 100 jobs per minute
});
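
To illustrate what a sliding-window limit with { max, duration } means, here is a small standalone limiter. It is a sketch of the general technique under the assumption that the window counts job starts; it is not the library's implementation, and SlidingWindowLimiter and tryAcquire are hypothetical names.

```typescript
// Illustrative sliding-window limiter; not the library's implementation.
class SlidingWindowLimiter {
  private timestamps: number[] = [];
  private readonly max: number;
  private readonly duration: number;

  constructor(max: number, duration: number) {
    this.max = max;
    this.duration = duration;
  }

  // Returns true (and records the start) if a job may start at time `now` in ms.
  tryAcquire(now: number): boolean {
    // Keep only starts inside the window (now - duration, now].
    const cutoff = now - this.duration;
    this.timestamps = this.timestamps.filter((t) => t > cutoff);
    if (this.timestamps.length >= this.max) return false;
    this.timestamps.push(now);
    return true;
  }
}

const demoLimiter = new SlidingWindowLimiter(2, 1000); // 2 jobs per second
const decisions = [0, 100, 200, 1050, 1150].map((t) => demoLimiter.tryAcquire(t));
// decisions: [true, true, false, true, true]
```

The third request is rejected because two starts already fall inside the last second; by t = 1050 the start at t = 0 has left the window, so capacity is available again.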

Group Options

Per-group concurrency and rate limiting, applied when jobs have a group.id.

typescript
const worker = new Worker('tasks', processor, {
  store,
  group: {
    concurrency: 2, // max 2 active per group
    limiter: { max: 5, duration: 1000 }, // 5 per second per group
  },
});

Stalled Job Detection

The worker periodically checks for stalled jobs: active jobs whose lock has expired, typically because the processing worker crashed or hung. Stalled jobs are automatically re-enqueued in the waiting state.

Configure detection with stalledInterval (default: 30 seconds) and lockDuration (default: 30 seconds). The worker renews each lock every lockDuration / 2 milliseconds, which is every 15 seconds with the default settings.
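
The timing relationship can be worked through with the default values. The sketch below assumes each renewal extends the lock to the renewal time plus lockDuration, and that a job counts as stalled once its lock expiry has passed; isStalled and the variable names are hypothetical.

```typescript
const lockDuration = 30_000;         // default from WorkerOptions, in ms
const renewEvery = lockDuration / 2; // 15_000 ms between renewals

// Hypothetical check: a job is stalled once its lock expiry is in the past.
function isStalled(lockExpiresAt: number, now: number): boolean {
  return now >= lockExpiresAt;
}

// Assumption: a renewal at time t moves the expiry to t + lockDuration.
const lastRenewal = renewEvery;                 // worker renewed once at t = 15_000
const expiresAt = lastRenewal + lockDuration;   // lock valid until t = 45_000

const stalledWhileHealthy = isStalled(expiresAt, 30_000); // next renewal is due now
const stalledAfterCrash = isStalled(expiresAt, 60_000);   // no renewal since t = 15_000
```

Renewing at half the lock duration leaves a full renewal interval of slack, so a single late renewal does not cause a healthy worker's job to be treated as stalled.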

Retry and Backoff

Retry behavior is configured per-job via JobOptions. The worker automatically retries failed jobs according to the configured strategy.

typescript
await queue.add('task', data, {
  attempts: 5,
  backoff: { type: 'exponential', delay: 1000 },
});

See BackoffOptions for all backoff strategies.
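
For intuition about the exponential example above, the following sketch computes retry delays using the common formula delay * 2^(attemptsMade - 1). This formula and the reading of attempts as the total number of tries are assumptions; the library's exact calculation may differ, and exponentialDelay is a hypothetical function.

```typescript
// Assumed formula: baseDelay * 2^(attemptsMade - 1). The library may differ.
function exponentialDelay(baseDelay: number, attemptsMade: number): number {
  return baseDelay * 2 ** (attemptsMade - 1);
}

// Assuming attempts: 5 means five total tries, there are at most four retries.
const retryDelays = [1, 2, 3, 4].map((n) => exponentialDelay(1000, n));
// retryDelays: [1000, 2000, 4000, 8000]
```

Under these assumptions, a job that keeps failing waits 1 s, 2 s, 4 s, and 8 s between tries before it is marked as failed.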

Released under the MIT License.