Skip to main content

Redis Queue

Here is an example of a Redis-backed Bull queue.


npm install bull

There are two files for the Redis Bull queue: one is the producer and the other is the worker.


const Queue = require('bull');
const config = require('../config/config');
const { saveNotification, sendSignUpEmail } = require('./redisQueueWorker');

const redisConfig = {
redis: {
port: config.redis.port,
password: config.redis.pass,

const notificationCreateQueue = new Queue('notificationCreateQueue', redisConfig);
const sendSignupEmailQueue = new Queue('sendSignupEmail', redisConfig);


/**
 * Enqueue a notification job on `notificationCreateQueue`.
 *
 * The promise from `add` is returned so callers can await the enqueue and
 * handle a failure, instead of the rejection being lost as a floating promise.
 *
 * @param {object} data - Job payload passed unchanged to `notificationCreateQueue.add`.
 * @returns {Promise<*>} Settles with the outcome of `notificationCreateQueue.add(data)`
 *   (per Bull's documentation, the created job).
 */
const subscribeToNotificationQueue = async (data) => notificationCreateQueue.add(data);
/**
 * Enqueue a sign-up email job on `sendSignupEmailQueue`.
 *
 * The promise from `add` is returned so callers can await the enqueue and
 * handle a failure, instead of the rejection being lost as a floating promise.
 *
 * @param {object} data - Job payload passed unchanged to `sendSignupEmailQueue.add`.
 * @returns {Promise<*>} Settles with the outcome of `sendSignupEmailQueue.add(data)`
 *   (per Bull's documentation, the created job).
 */
const subscribeToSignupEmail = async (data) => sendSignupEmailQueue.add(data);

module.exports = {


const Job = require('bull');
const logger = require('../config/logger');
const { notificationService, moviesService } = require('../services');
const { tokenService, postmarkService } = require('../services');

const settings = {
accessKeyID: process.env.S3_KEY,
secretAccessKey: process.env.S3_SECRET,
region: process.env.S3_REGION,
s3bucket: process.env.S3_BUCKET,
folderName: process.env.S3_FOLDER,

const saveNotification = (Job) => {
try {
} catch (error) {'Worker error - Create notification : ', error);

const sendSignUpEmail = async (Job) => {
try {
// Send email
const verifyEmailToken = await tokenService.generateVerifyEmailToken(;
await postmarkService.sendVerificationEmail(, verifyEmailToken);
} catch (error) {'Worker error - Send signup email : ', error);

module.exports = {

How to use

Import the modules exported

const { subscribeToNotificationQueue } = require('../utils/redisQueueProducer');

Call the above imported function

const notification = {
type: 'movies',
type_id: movie._id,
message: `Movie ${} is created successfully.`,

This will trigger the function in the producer file, which then processes the action.

Parameters to pass

const notification = {
user_id: '', // id of the user who need to get the notification
type: '', // type of notification (movies / users / employee) ie, the url of those sections
type_id: '', // id of that particular section(movie / user / employee)
message: '', // The message you want to display

Basic usage

const Queue = require('bull');

const videoQueue = new Queue('video transcoding', 'redis://');
const audioQueue = new Queue('audio transcoding', { redis: { port: 6379, host: '', password: 'foobared' } }); // Specify Redis connection using object
const imageQueue = new Queue('image transcoding');
const pdfQueue = new Queue('pdf transcoding');

videoQueue.process(function (job, done) {

// contains the custom data passed when the job was created
// contains id of this job.

// transcode video asynchronously and report progress

// call done when finished

// or give an error if error
done(new Error('error transcoding'));

// or pass it a result
done(null, { framerate: 29.5 /* etc... */ });

// If the job throws an unhandled exception it is also handled correctly
throw new Error('some unexpected error');

audioQueue.process(function (job, done) {
// transcode audio asynchronously and report progress

// call done when finished

// or give an error if error
done(new Error('error transcoding'));

// or pass it a result
done(null, { samplerate: 48000 /* etc... */ });

// If the job throws an unhandled exception it is also handled correctly
throw new Error('some unexpected error');

imageQueue.process(function (job, done) {
// transcode image asynchronously and report progress

// call done when finished

// or give an error if error
done(new Error('error transcoding'));

// or pass it a result
done(null, { width: 1280, height: 720 /* etc... */ });

// If the job throws an unhandled exception it is also handled correctly
throw new Error('some unexpected error');

pdfQueue.process(function (job) {
// Processors can also return promises instead of using the done callback
return pdfAsyncProcessor();

videoQueue.add({ video: '' });
audioQueue.add({ audio: '' });
imageQueue.add({ image: '' });

Using Promises

Alternatively, you can return promises instead of using the done callback:

videoQueue.process(function (job) { // don't forget to remove the done callback!
// Simply return a promise
return fetchVideo(;

// Handles promise rejection
return Promise.reject(new Error('error transcoding'));

// Passes the value the promise is resolved with to the "completed" event
return Promise.resolve({ framerate: 29.5 /* etc... */ });

// If the job throws an unhandled exception it is also handled correctly
throw new Error('some unexpected error');
// same as
return Promise.reject(new Error('some unexpected error'));

Repeated jobs

A job can be added to a queue and processed repeatedly according to a cron specification:

  paymentsQueue.process(function (job) {
// Check payments

// Repeat payment job once every day at 3:15 (am)
paymentsQueue.add(paymentsData, { repeat: { cron: '15 3 * * *' } });

Pause / Resume

A queue can be paused and resumed globally (pass true to pause processing for just this worker):

queue.pause().then(function () {
// queue is paused now

queue.resume().then(function () {
// queue is resumed now


A queue emits some useful events, for example...

.on('completed', function (job, result) {
// Job completed with output result!