Skip to main content

Redis Queue

Here is an example of Redis Bull queue. :::

Installation

npm install bull

There are two files for the Redis bull, One is Producer and the other is worker.

Producer

const Queue = require('bull');
const config = require('../config/config');
const { saveNotification, sendSignUpEmail } = require('./redisQueueWorker');

const redisConfig = {
redis: {
port: config.redis.port,
host: config.redis.host,
password: config.redis.pass,
},
};

const notificationCreateQueue = new Queue('notificationCreateQueue', redisConfig);
const sendSignupEmailQueue = new Queue('sendSignupEmail', redisConfig);

notificationCreateQueue.process(saveNotification);
sendSignupEmailQueue.process(sendSignUpEmail);

const subscribeToNotificationQueue = async (data) => { notificationCreateQueue.add(data); };
const subscribeToSignupEmail = async (data) => { sendSignupEmailQueue.add(data); }

module.exports = {
subscribeToNotificationQueue,
subscribeToSignupEmail,
};

Worker

const Job = require('bull');
const logger = require('../config/logger');
const { notificationService, moviesService } = require('../services');
const { tokenService, postmarkService } = require('../services');

const settings = {
accessKeyID: process.env.S3_KEY,
secretAccessKey: process.env.S3_SECRET,
region: process.env.S3_REGION,
s3bucket: process.env.S3_BUCKET,
folderName: process.env.S3_FOLDER,
};

const saveNotification = (Job) => {
try {
notificationService.createNotification(Job.data);
} catch (error) {
logger.info('Worker error - Create notification : ', error);
}
};

const sendSignUpEmail = async (Job) => {
try {
// Send email
const verifyEmailToken = await tokenService.generateVerifyEmailToken(Job.data);
await postmarkService.sendVerificationEmail(Job.data.email, verifyEmailToken);
} catch (error) {
logger.info('Worker error - Send signup email : ', error);
}
};

module.exports = {
saveNotification,
sendSignUpEmail,
};

How to use

Import the modules exported

const { subscribeToNotificationQueue } = require('../utils/redisQueueProducer');

Call the above imported function

const notification = {
user_id: req.user.id,
type: 'movies',
type_id: movie._id,
message: `Movie ${insertUser.name} is created successfully.`,
};
subscribeToNotificationQueue(notification);

This will trigger the function in the Producer file and it Process the actions.

Parameters need to pass

const notification = {
user_id: '', // id of the user who need to get the notification
type: '', // type of notification (movies / users / employee) ie, the url of those sections
type_id: '', // id of that particular section(movie / user / employee)
message: '', // The message you want to display
};

Basic usage

const Queue = require('bull');

const videoQueue = new Queue('video transcoding', 'redis://127.0.0.1:6379');
const audioQueue = new Queue('audio transcoding', { redis: { port: 6379, host: '127.0.0.1', password: 'foobared' } }); // Specify Redis connection using object
const imageQueue = new Queue('image transcoding');
const pdfQueue = new Queue('pdf transcoding');

videoQueue.process(function (job, done) {

// job.data contains the custom data passed when the job was created
// job.id contains id of this job.

// transcode video asynchronously and report progress
job.progress(42);

// call done when finished
done();

// or give an error if error
done(new Error('error transcoding'));

// or pass it a result
done(null, { framerate: 29.5 /* etc... */ });

// If the job throws an unhandled exception it is also handled correctly
throw new Error('some unexpected error');
});

audioQueue.process(function (job, done) {
// transcode audio asynchronously and report progress
job.progress(42);

// call done when finished
done();

// or give an error if error
done(new Error('error transcoding'));

// or pass it a result
done(null, { samplerate: 48000 /* etc... */ });

// If the job throws an unhandled exception it is also handled correctly
throw new Error('some unexpected error');
});

imageQueue.process(function (job, done) {
// transcode image asynchronously and report progress
job.progress(42);

// call done when finished
done();

// or give an error if error
done(new Error('error transcoding'));

// or pass it a result
done(null, { width: 1280, height: 720 /* etc... */ });

// If the job throws an unhandled exception it is also handled correctly
throw new Error('some unexpected error');
});

pdfQueue.process(function (job) {
// Processors can also return promises instead of using the done callback
return pdfAsyncProcessor();
});

videoQueue.add({ video: 'http://example.com/video1.mov' });
audioQueue.add({ audio: 'http://example.com/audio1.mp3' });
imageQueue.add({ image: 'http://example.com/image1.tiff' });

Using Promises

Alternatively, you can use return promises instead of using the done callback:

videoQueue.process(function (job) { // don't forget to remove the done callback!
// Simply return a promise
return fetchVideo(job.data.url).then(transcodeVideo);

// Handles promise rejection
return Promise.reject(new Error('error transcoding'));

// Passes the value the promise is resolved with to the "completed" event
return Promise.resolve({ framerate: 29.5 /* etc... */ });

// If the job throws an unhandled exception it is also handled correctly
throw new Error('some unexpected error');
// same as
return Promise.reject(new Error('some unexpected error'));
});

Repeated jobs

A job can be added to a queue and processed repeatedly according to a cron specification:

  paymentsQueue.process(function (job) {
// Check payments
});

// Repeat payment job once every day at 3:15 (am)
paymentsQueue.add(paymentsData, { repeat: { cron: '15 3 * * *' } });

Pause / Resume

A queue can be paused and resumed globally (pass true to pause processing for just this worker):

queue.pause().then(function () {
// queue is paused now
});

queue.resume().then(function () {
// queue is resumed now
})

Events

A queue emits some useful events, for example...

.on('completed', function (job, result) {
// Job completed with output result!
})