Redis Example
This example demonstrates Conveyor with RedisStore — the same queue and worker API as the PostgreSQL example, backed by Redis via redis@^5 (node-redis v5). It is the natural starting point for teams migrating from BullMQ who want to keep their Redis infrastructure.
Prerequisites
A running Redis 7+ instance. Use the provided Docker Compose file or your own Redis:
```bash
# Using Docker Compose (from the repo root)
docker compose up -d redis

# Or point at your own Redis (defaults to redis://localhost:6379 if unset)
export REDIS_URL="redis://localhost:6379"
```
Run the example:
```bash
# Deno
deno run --allow-all examples/redis/main.ts

# Node.js
node --experimental-strip-types examples/redis/main.ts

# Bun
bun run examples/redis/main.ts
```
Full Source (Annotated)
Store Setup
```ts
import { Queue, Worker } from '@conveyor/core';
import { RedisStore } from '@conveyor/store-redis';
import process from 'node:process';

const redisUrl = process.env.REDIS_URL ?? 'redis://localhost:6379';
const store = new RedisStore({ url: redisUrl });
await store.connect();
console.log('Connected to Redis at', redisUrl);
```
The example reads `REDIS_URL` via `node:process` so the same script runs unchanged on Deno, Node.js, and Bun. RedisStore needs no schema or migrations: Redis is schemaless. On `connect()` the store opens two connections (main client + subscriber), preloads its Lua scripts via `SCRIPT LOAD`, and writes a `conveyor:<queue>:schema` marker for future forward-compatibility.
Key Redis features:
- Lua scripts for atomic multi-step operations (fetch + lock + rate-limit + group cap, delayed promotion, flow parent completion).
- Pub/Sub on `conveyor:events` for real-time cross-process event delivery. Best-effort: subscribers disconnected at publish time miss the message.
- Hash-tag-safe keys (`{prefix:queueName}:…`) so a future Redis Cluster upgrade requires no data migration.
- `SET NX PX` dedup reservation so two concurrent saves with the same key resolve to a single winner.
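The dedup reservation can be illustrated with a small in-memory stand-in for Redis `SET … NX PX` semantics. This is a sketch of the pattern, not the store's actual code; the helper name and key format are hypothetical:

```ts
// In-memory stand-in for: SET key value NX PX <ttl>
// Returns 'OK' when the key was newly set, null when a live entry exists.
const kv = new Map<string, { value: string; expiresAt: number }>();

function setNxPx(key: string, value: string, ttlMs: number): 'OK' | null {
  const entry = kv.get(key);
  const now = Date.now();
  if (entry && entry.expiresAt > now) return null; // reservation already held
  kv.set(key, { value, expiresAt: now + ttlMs });
  return 'OK';
}

// Two concurrent saves with the same dedup key: exactly one wins.
const dedupKey = 'conveyor:{emails}:dedup:welcome-user-1'; // hypothetical key shape
const first = setNxPx(dedupKey, 'job-a', 60_000);
const second = setNxPx(dedupKey, 'job-b', 60_000);

console.log(first, second); // 'OK' null — only the first save reserved the key
```

Because Redis executes `SET NX` atomically, the real store gets the same single-winner guarantee without any client-side locking.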
Define Queue with Default Options
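As context for the `backoff` option used in this section: with exponential backoff, the nth retry conventionally waits `delay * 2^(n - 1)` milliseconds (this is the BullMQ convention; it is assumed here, so check the Conveyor docs for the exact formula):

```ts
// Retry delay for exponential backoff with a fixed base (assumed formula).
function backoffDelay(baseMs: number, attempt: number): number {
  return baseMs * 2 ** (attempt - 1);
}

// With attempts: 3 and delay: 1000, the two retries would wait:
console.log(backoffDelay(1000, 1)); // 1000 ms before the first retry
console.log(backoffDelay(1000, 2)); // 2000 ms before the second retry
```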
```ts
interface EmailPayload {
  to: string;
  subject: string;
}

const queue = new Queue<EmailPayload>('emails', {
  store,
  defaultJobOptions: {
    attempts: 3,
    backoff: { type: 'exponential', delay: 1000 },
    removeOnComplete: true,
  },
});
```
Create Worker
```ts
const worker = new Worker<EmailPayload>(
  'emails',
  async (job) => {
    console.log(`[${job.id}] Sending "${job.data.subject}" to ${job.data.to}`);
    await job.updateProgress(50);

    // Simulate email sending
    await new Promise((r) => setTimeout(r, 300));

    await job.updateProgress(100);
    return { sent: true, at: new Date().toISOString() };
  },
  {
    store,
    concurrency: 5,
    lockDuration: 30_000,
  },
);
```
Multiple workers across different processes share the same Redis and lease jobs atomically via the `fetchNextJob.lua` script.
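The lease semantics can be modeled in a few lines: a job is claimable only if its lock has expired, and the claim extends the lock by `lockDuration`. The real check-and-claim runs as a single Lua script inside Redis, so two workers can never both pass the check; this in-memory model (hypothetical names) only illustrates the rule:

```ts
interface LeasedJob {
  id: string;
  lockedUntil: number; // epoch ms; 0 = unlocked
}

// Claim the first unlocked (or lock-expired) job and lease it for lockDuration ms.
function fetchNextJob(jobs: LeasedJob[], now: number, lockDuration: number): LeasedJob | null {
  for (const job of jobs) {
    if (job.lockedUntil <= now) {
      job.lockedUntil = now + lockDuration; // lease the job
      return job;
    }
  }
  return null; // everything is currently locked by other workers
}

const jobs: LeasedJob[] = [{ id: 'a', lockedUntil: 0 }];
const now = Date.now();
const mine = fetchNextJob(jobs, now, 30_000); // first worker leases job 'a'
const yours = fetchNextJob(jobs, now, 30_000); // second worker finds nothing

console.log(mine?.id, yours); // 'a' null
```

A worker that dies without completing its job simply lets the lock expire, after which any other worker can lease the job again.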
Events
```ts
worker.on('completed', (data: unknown) => {
  const { result } = data as { job: unknown; result: unknown };
  console.log(' -> completed:', result);
});

worker.on('failed', (data: unknown) => {
  const { error } = data as { job: unknown; error: Error };
  console.error(' -> failed:', error.message);
});
```
Events fan out synchronously to in-process subscribers and are also published to the `conveyor:events` Pub/Sub channel for cross-process delivery. A per-instance originId tag ensures messages that echo back from Redis to their source are skipped, so no event fires twice in its originating process.
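The originId echo filter boils down to tagging each published message with the publisher's instance id and dropping incoming messages that carry your own id. A sketch with hypothetical names (the store's actual message shape may differ):

```ts
import { randomUUID } from 'node:crypto';

interface BusMessage {
  originId: string; // id of the publishing store instance
  event: string;
}

// Each store instance gets a unique id at construction time. Locally emitted
// events are already delivered in-process, so the Pub/Sub echo of our own
// publish must be dropped to avoid double-firing.
const originId = randomUUID();

function shouldDeliver(msg: BusMessage): boolean {
  return msg.originId !== originId;
}

console.log(shouldDeliver({ originId, event: 'completed' })); // false: our own echo
console.log(shouldDeliver({ originId: randomUUID(), event: 'completed' })); // true: another process
```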
Adding Jobs and Cron Scheduling
```ts
// Standard jobs
await queue.add('welcome', {
  to: '[email protected]',
  subject: 'Welcome to Conveyor!',
});

await queue.add('notification', {
  to: '[email protected]',
  subject: 'New notification',
});

// Cron-scheduled job: daily report at 9 AM
await queue.cron('0 9 * * *', 'daily-report', {
  to: '[email protected]',
  subject: 'Daily Report',
});
```
Graceful Shutdown
```ts
console.log('Shutting down...');
await worker.close(); // Stop polling, wait for active jobs to finish
await queue.close(); // Close queue
await store.disconnect(); // QUIT subscriber + main client
console.log('Done!');
```
`store.disconnect()` is idempotent. A disconnected RedisStore cannot be reconnected; construct a new one.
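Idempotent disconnect typically comes down to a one-way flag: the second call is a no-op, and because the flag is never reset, a fresh store must be constructed to reconnect. An illustrative sketch (not the actual RedisStore code):

```ts
class ClosableStore {
  private closed = false;
  quitCalls = 0; // visible for the example; counts how often we actually QUIT

  async disconnect(): Promise<void> {
    if (this.closed) return; // second and later calls: no-op
    this.closed = true;
    this.quitCalls++; // stands in for QUITting the subscriber + main client
  }
}

const closable = new ClosableStore();
await closable.disconnect();
await closable.disconnect(); // safe: does nothing
console.log(closable.quitCalls); // 1
```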
Multi-Process Setup
Every process runs its own RedisStore against the same Redis instance:
Producer process:
```ts
const store = new RedisStore({ url: redisUrl });
await store.connect();

const queue = new Queue('emails', { store });
await queue.add('send', { to: '[email protected]', subject: 'Hello' });
```
Worker process (can run multiple instances):
```ts
const store = new RedisStore({ url: redisUrl });
await store.connect();

const worker = new Worker('emails', processor, {
  store,
  concurrency: 10,
  maxGlobalConcurrency: 50, // Limit across all worker processes
});
```
Events published by one process reach all others via Pub/Sub. Leasing is atomic across processes thanks to the Lua scripts.
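The relationship between `concurrency` and `maxGlobalConcurrency` can be pictured as two caps: each worker stops at its local limit, and the store additionally refuses leases once the queue-wide active count hits the global limit. A sketch of that rule (an illustrative model, not the store's scheduling code):

```ts
// How many more jobs one worker may lease right now, given its local active
// count, the queue-wide active count, and both limits.
function leasableNow(
  localActive: number,
  globalActive: number,
  concurrency: number,
  maxGlobalConcurrency: number,
): number {
  const localRoom = concurrency - localActive;
  const globalRoom = maxGlobalConcurrency - globalActive;
  return Math.max(0, Math.min(localRoom, globalRoom));
}

console.log(leasableNow(4, 20, 10, 50)); // 6: the local limit binds
console.log(leasableNow(2, 48, 10, 50)); // 2: the global limit binds
console.log(leasableNow(0, 50, 10, 50)); // 0: queue-wide cap reached
```

With five worker processes at `concurrency: 10`, the global cap of 50 means all of them can run flat out; a sixth process would only get leases when others are idle.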
Deno Permissions
Minimal flag set for a Deno runtime:

- `--allow-net=<redis-host>:<port>`: TCP connection to Redis (plus the TLS endpoint if you use `rediss://`).
- `--allow-read`: the store reads Lua scripts from the installed package directory (typically `~/.cache/deno/`) on `connect()`.
- `--allow-env=REDIS_URL`: only if you read the URL from the environment (as this example does).

`-A` during development covers all three.
Caveats
- Persistence is your responsibility. Redis without AOF or RDB is lossy. Tune to your RPO needs, same as any Redis application.
- Pub/Sub is fire-and-forget. Subscribers that were disconnected during a publish miss the message.
- Priority and group round-robin are deferred. See the Redis store guide for the full v1 caveats list.
See Also
- Redis store guide
- Migrating from BullMQ
- PostgreSQL example for a side-by-side comparison
