Redis Queue
Here is an example of Redis Bull queue. :::
Installation
npm install bull
There are two files for the Redis bull, One is Producer and the other is worker.
Producer
const Queue = require('bull');
const config = require('../config/config');
const { saveNotification, sendSignUpEmail } = require('./redisQueueWorker');
const redisConfig = {
redis: {
port: config.redis.port,
host: config.redis.host,
password: config.redis.pass,
},
};
const notificationCreateQueue = new Queue('notificationCreateQueue', redisConfig);
const sendSignupEmailQueue = new Queue('sendSignupEmail', redisConfig);
notificationCreateQueue.process(saveNotification);
sendSignupEmailQueue.process(sendSignUpEmail);
const subscribeToNotificationQueue = async (data) => { notificationCreateQueue.add(data); };
const subscribeToSignupEmail = async (data) => { sendSignupEmailQueue.add(data); }
module.exports = {
subscribeToNotificationQueue,
subscribeToSignupEmail,
};
Worker
const Job = require('bull');
const logger = require('../config/logger');
const { notificationService, moviesService } = require('../services');
const { tokenService, postmarkService } = require('../services');
const settings = {
accessKeyID: process.env.S3_KEY,
secretAccessKey: process.env.S3_SECRET,
region: process.env.S3_REGION,
s3bucket: process.env.S3_BUCKET,
folderName: process.env.S3_FOLDER,
};
const saveNotification = (Job) => {
try {
notificationService.createNotification(Job.data);
} catch (error) {
logger.info('Worker error - Create notification : ', error);
}
};
const sendSignUpEmail = async (Job) => {
try {
// Send email
const verifyEmailToken = await tokenService.generateVerifyEmailToken(Job.data);
await postmarkService.sendVerificationEmail(Job.data.email, verifyEmailToken);
} catch (error) {
logger.info('Worker error - Send signup email : ', error);
}
};
module.exports = {
saveNotification,
sendSignUpEmail,
};
How to use
Import the modules exported
const { subscribeToNotificationQueue } = require('../utils/redisQueueProducer');
Call the above imported function
const notification = {
user_id: req.user.id,
type: 'movies',
type_id: movie._id,
message: `Movie ${insertUser.name} is created successfully.`,
};
subscribeToNotificationQueue(notification);
This will trigger the function in the Producer file and it Process the actions.
Parameters need to pass
const notification = {
user_id: '', // id of the user who need to get the notification
type: '', // type of notification (movies / users / employee) ie, the url of those sections
type_id: '', // id of that particular section(movie / user / employee)
message: '', // The message you want to display
};
Basic usage
const Queue = require('bull');
const videoQueue = new Queue('video transcoding', 'redis://127.0.0.1:6379');
const audioQueue = new Queue('audio transcoding', { redis: { port: 6379, host: '127.0.0.1', password: 'foobared' } }); // Specify Redis connection using object
const imageQueue = new Queue('image transcoding');
const pdfQueue = new Queue('pdf transcoding');
videoQueue.process(function (job, done) {
// job.data contains the custom data passed when the job was created
// job.id contains id of this job.
// transcode video asynchronously and report progress
job.progress(42);
// call done when finished
done();
// or give an error if error
done(new Error('error transcoding'));
// or pass it a result
done(null, { framerate: 29.5 /* etc... */ });
// If the job throws an unhandled exception it is also handled correctly
throw new Error('some unexpected error');
});
audioQueue.process(function (job, done) {
// transcode audio asynchronously and report progress
job.progress(42);
// call done when finished
done();
// or give an error if error
done(new Error('error transcoding'));
// or pass it a result
done(null, { samplerate: 48000 /* etc... */ });
// If the job throws an unhandled exception it is also handled correctly
throw new Error('some unexpected error');
});
imageQueue.process(function (job, done) {
// transcode image asynchronously and report progress
job.progress(42);
// call done when finished
done();
// or give an error if error
done(new Error('error transcoding'));
// or pass it a result
done(null, { width: 1280, height: 720 /* etc... */ });
// If the job throws an unhandled exception it is also handled correctly
throw new Error('some unexpected error');
});
pdfQueue.process(function (job) {
// Processors can also return promises instead of using the done callback
return pdfAsyncProcessor();
});
videoQueue.add({ video: 'http://example.com/video1.mov' });
audioQueue.add({ audio: 'http://example.com/audio1.mp3' });
imageQueue.add({ image: 'http://example.com/image1.tiff' });
Using Promises
Alternatively, you can use return promises instead of using the done callback:
videoQueue.process(function (job) { // don't forget to remove the done callback!
// Simply return a promise
return fetchVideo(job.data.url).then(transcodeVideo);
// Handles promise rejection
return Promise.reject(new Error('error transcoding'));
// Passes the value the promise is resolved with to the "completed" event
return Promise.resolve({ framerate: 29.5 /* etc... */ });
// If the job throws an unhandled exception it is also handled correctly
throw new Error('some unexpected error');
// same as
return Promise.reject(new Error('some unexpected error'));
});
Repeated jobs
A job can be added to a queue and processed repeatedly according to a cron specification:
paymentsQueue.process(function (job) {
// Check payments
});
// Repeat payment job once every day at 3:15 (am)
paymentsQueue.add(paymentsData, { repeat: { cron: '15 3 * * *' } });
Pause / Resume
A queue can be paused and resumed globally (pass true to pause processing for just this worker):
queue.pause().then(function () {
// queue is paused now
});
queue.resume().then(function () {
// queue is resumed now
})
Events
A queue emits some useful events, for example...
.on('completed', function (job, result) {
// Job completed with output result!
})