StoreInterface
The StoreInterface is the contract that all storage backends must implement. The core Queue and Worker classes only interact with this interface, making it possible to swap backends by changing a single line of configuration.
import type { StoreInterface } from '@conveyor/shared';Available Implementations
| Store | Package | Backend |
|---|---|---|
MemoryStore | @conveyor/store-memory | In-memory Map + mutex. Ideal for testing. |
PgStore | @conveyor/store-pg | PostgreSQL. Uses FOR UPDATE SKIP LOCKED for locking, LISTEN/NOTIFY for events. |
SqliteStore | @conveyor/store-sqlite-node | SQLite via node:sqlite. WAL mode, BEGIN IMMEDIATE for locking, polling for events. |
SqliteStore | @conveyor/store-sqlite-bun | SQLite for Bun runtime. |
SqliteStore | @conveyor/store-sqlite-deno | SQLite for Deno runtime. |
Connection Lifecycle
connect
Initialize the store connection and run migrations if configured.
connect(): Promise<void>Must be called before any other store operations. Migrations are run automatically unless autoMigrate: false is set.
disconnect
Close the store connection and release resources.
disconnect(): Promise<void>Job CRUD
saveJob
Save a job to the store.
saveJob(queueName: string, job: Omit<JobData, 'id'>): Promise<string>Returns the generated job ID.
saveBulk
Save multiple jobs in a single batch.
saveBulk(queueName: string, jobs: Omit<JobData, 'id'>[]): Promise<string[]>Returns an array of job IDs in the same order as the input.
getJob
Retrieve a job by ID.
getJob(queueName: string, jobId: string): Promise<JobData | null>updateJob
Update specific fields of a job.
updateJob(queueName: string, jobId: string, updates: Partial<JobData>): Promise<void>removeJob
Remove a job from the store.
removeJob(queueName: string, jobId: string): Promise<void>Deduplication
findByDeduplicationKey
Find an active job matching a deduplication key (respecting TTL).
findByDeduplicationKey(queueName: string, key: string): Promise<JobData | null>Returns the matching job, or null if none found or TTL expired.
Job Fetching and Locking
fetchNextJob
Atomically fetch and lock the next available job for processing.
fetchNextJob(
queueName: string,
workerId: string,
lockDuration: number,
opts?: FetchOptions
): Promise<JobData | null>| Parameter | Type | Description |
|---|---|---|
queueName | string | The queue to fetch from |
workerId | string | The worker claiming the job |
lockDuration | number | How long to hold the lock (ms) |
opts.lifo | boolean | Fetch most recently added job first |
opts.jobName | string | Filter by job name |
opts.groupConcurrency | number | Max concurrent active jobs per group |
opts.excludeGroups | string[] | Group IDs to exclude (e.g. rate-limited groups) |
extendLock
Extend the lock on an active job.
extendLock(queueName: string, jobId: string, duration: number): Promise<boolean>Returns true if the lock was extended, false if the job is no longer active.
releaseLock
Release the lock on a job.
releaseLock(queueName: string, jobId: string): Promise<void>Queries
getActiveCount
Count currently active (processing) jobs.
getActiveCount(queueName: string): Promise<number>listJobs
List jobs in a given state with pagination.
listJobs(
queueName: string,
state: JobState,
start?: number,
end?: number
): Promise<JobData[]>countJobs
Count jobs in a given state.
countJobs(queueName: string, state: JobState): Promise<number>getNextDelayedTimestamp
Get the earliest delayed job timestamp.
getNextDelayedTimestamp(queueName: string): Promise<number | null>Delayed Job Promotion
promoteDelayedJobs
Promote delayed jobs whose delayUntil has passed.
promoteDelayedJobs(queueName: string, timestamp: number): Promise<number>Returns the number of promoted jobs.
Pause / Resume
pauseJobName
Pause processing of a specific job name (or "__all__" for global pause).
pauseJobName(queueName: string, jobName: string): Promise<void>resumeJobName
Resume processing of a specific job name.
resumeJobName(queueName: string, jobName: string): Promise<void>getPausedJobNames
Get the list of currently paused job names.
getPausedJobNames(queueName: string): Promise<string[]>Stalled Job Detection
getStalledJobs
Detect stalled jobs -- active jobs whose lock has expired.
getStalledJobs(queueName: string, stalledThreshold: number): Promise<JobData[]>Cleanup
clean
Remove old jobs in a terminal state older than a grace period.
clean(queueName: string, state: JobState, grace: number): Promise<number>drain
Remove all waiting and delayed jobs from a queue.
drain(queueName: string): Promise<void>Pub/Sub
subscribe
Subscribe to store events for a queue.
subscribe(queueName: string, callback: (event: StoreEvent) => void): voidunsubscribe
Unsubscribe from store events.
unsubscribe(queueName: string, callback?: (event: StoreEvent) => void): voidpublish
Publish an event through the store's pub/sub mechanism.
publish(event: StoreEvent): Promise<void>Flow Operations
saveFlow
Save an entire flow tree atomically (children + parent in one transaction).
saveFlow(
jobs: Array<{ queueName: string; job: Omit<JobData, 'id'> }>
): Promise<string[]>notifyChildCompleted
Called when a child completes; decrements parent's pending counter. If counter reaches 0, transitions parent to 'waiting'.
notifyChildCompleted(parentQueueName: string, parentId: string): Promise<JobState>failParentOnChildFailure
Called when a child fails with 'fail' policy; marks parent as failed.
failParentOnChildFailure(
parentQueueName: string,
parentId: string,
reason: string
): Promise<boolean>getChildrenJobs
Get all children of a parent job.
getChildrenJobs(parentQueueName: string, parentId: string): Promise<JobData[]>Group Operations
getGroupActiveCount
Count active jobs in a specific group.
getGroupActiveCount(queueName: string, groupId: string): Promise<number>getWaitingGroupCount
Count waiting jobs in a specific group.
getWaitingGroupCount(queueName: string, groupId: string): Promise<number>Implementing a Custom Store
To implement a custom store, create a class that implements the full StoreInterface:
import type { StoreInterface, StoreOptions } from '@conveyor/shared';
export class MyCustomStore implements StoreInterface {
constructor(options: StoreOptions & {/* your options */}) {
// ...
}
async connect(): Promise<void> {/* ... */}
async disconnect(): Promise<void> {/* ... */}
// ... implement all methods
}Key implementation considerations:
- Atomicity:
fetchNextJobmust atomically select and lock a job to prevent double-processing. - Locking: Use database-level locking (e.g.
FOR UPDATE SKIP LOCKEDin PostgreSQL,BEGIN IMMEDIATEin SQLite). - Pub/Sub: Events enable cross-process coordination. Use native mechanisms (LISTEN/NOTIFY, polling) where available.
- Migrations: Store schemas should be auto-migrated on
connect()with a versioned migration table.
