src/microwork.js
import amqp from 'amqplib';
import uuid from 'uuid';
import sleep from './sleep';
import createLogger from './logger';
import winston from 'winston';
/**
* Core Microwork class that provides a way to create new microservice
*/
export class Microwork {
/**
* Microwork class construct
* @param {object} opts Microwork instance options
* @param {string} opts.host RabbitMQ host to use
* @param {string} opts.exchange RabbitMQ exchange to use
* @param {Number} opts.reconnectTimeout Timeout before trying to reconnect to RabbitMQ on failure
* @return {void}
* @example
* const service = new Microwork({host: 'localhost', exchange: 'test.exchange'});
*/
constructor({
host = 'localhost',
exchange = 'microwork.default.exchange',
reconnectTimeout = 5000,
loggingTransports,
}) {
// init logger
this.initLogger(loggingTransports);
// log
this.logger.debug('construct with', host, exchange);
/**
* Service unique ID
* @type {string}
*/
this.id = uuid.v4();
/**
* RabbitMQ host address
* @type {string}
*/
this.host = host;
/**
* RabbitMQ exchange name
* @type {string}
*/
this.exchange = exchange;
/**
* Active route handlers and queues
* @type {Object}
*/
this.routeHandlers = {};
/**
* Connecting indicator
* @type {Boolean}
* @private
*/
this.connecting = false;
/**
* Connection to RabbitMQ instance
* @type {Object}
* @private
*/
this.connection = undefined;
/**
* Connection to RabbitMQ instance
* @type {Object}
* @private
*/
this.channel = undefined;
/**
* Reconnect timeout reference
* @type {Number}
* @private
*/
this.reconnect = undefined;
/**
* Reconnect timeout timer stored for later usage
* @type {Number}
*/
this.reconnectTimeout = reconnectTimeout;
// init connection
this.connect().catch(this.tryReconnect.bind(this));
}
/**
* Initialize logger with new options
* @param {Object} transports Logger options, see winston.js for reference
* @return {void}
* @private
*/
initLogger(transports = []) {
if (transports.length === 0) {
// only show info in production mode
let level = process.env.NODE_ENV === 'production' ? 'info' : 'debug';
// only show erros in test mode
/* istanbul ignore if */
if (process.env.NODE_ENV === 'test') {
level = 'error';
}
transports.push(new winston.transports.Console({level}));
}
/**
* Logger
* @private
*/
this.logger = createLogger(transports);
}
tryReconnect(e) {
if (e.code === 'ECONNREFUSED' && !this.reconnect) {
this.logger.info(`Couldn't connect to rabbit, retrying in ${Math.floor(this.reconnectTimeout / 1000)}s...`);
this.connecting = false;
this.reconnect = setTimeout(() => {
this.reconnect = undefined;
this.connect(true).catch(this.tryReconnect.bind(this));
}, this.reconnectTimeout);
return;
}
this.logger.error('Error connecting:', e);
throw e;
}
/**
* Register new Microwork plugin
* @param {Object} plugin Microwork plugin object
* @return {void}
* @example
* import myMicroworkPlugin from 'my-microwork-plugin';
* microworkInstance.registerPlugin(myMicroworkPlugin);
*/
registerPlugin(plugin) {
for (const prop in plugin) {
// only apply non-existent properties
if (!this.hasOwnProperty(prop)) {
/**
* New property from plugin
* @private
*/
this[prop] = plugin[prop];
}
}
}
/**
* Initializes connection to RabbitMQ
* @param {Boolean} calledFromTimer Defines whether function was called from reconnect timer
* @return {Promise} Returns promise that can be awaited to ensure connection
* @private
*/
async connect(calledFromTimer = false) {
// if not called from timer and reconnect pending - return self after delay
if (!calledFromTimer && this.reconnect) {
return sleep(this.reconnectTimeout).then(() => this.connect());
}
// if connecting, wait a bit, then return self
if (this.connecting) {
return sleep(50).then(() => this.connect());
}
// do not do anything if already connected
if (this.connection) {
return true;
}
this.logger.debug('connecting...');
// we're connecting
this.connecting = true;
// connect
this.connection = await amqp.connect(`amqp://${this.host}`);
this.logger.debug('connected to rabbit');
// get two channels - receive and send
this.channel = await this.connection.createChannel();
this.logger.debug('got channels');
// assing topic
await this.channel.assertExchange(this.exchange, 'topic');
this.logger.debug('got exchanges');
// say we want to prefetch only 1 msg
await this.channel.prefetch(1);
this.logger.debug('prefetch set');
// we're done connecting
this.connecting = false;
}
/**
* Removes existing subscription or worker.
* If consumerTag is given only corresponding subscription will be removed.
* Otherwise, all consumers for given topic will be terminated.
* @param {string} topic Topic to remove subscription/worker from
* @param {string} consumerTag Consumer tag to unsubscribe with
* @return {Promise} Returns promise that can be awaited to ensure removal
* @example <caption>Remove one subscription with consumerTag</caption>
* await microworkInstance.unsubscribe('test.topic', 'tag');
* @example <caption>Remove all subscriptions with topic</caption>
* await microworkInstance.unsubscribe('test.topic');
*/
async unsubscribe(topic, consumerTag) {
// if we have consumerTag - only unsub from it
if (consumerTag) {
// find index
const subIndex = this.routeHandlers[topic].findIndex(it => it.consumerTag === consumerTag);
// cancel consuming
await this.channel.cancel(consumerTag);
// remove from subs
this.routeHandlers[topic].splice(subIndex, 1);
return;
}
// cancel consuming
await Promise.all(this.routeHandlers[topic].map(it => this.channel.cancel(it.consumerTag)));
// remove whole topic
delete this.routeHandlers[topic];
}
/**
* Stops the service, closes all workers/subscriptions and terminates the connection to RabbitMQ
* @return {Promise} Returns promise that can be awaited to ensure termination
* @example
* await microworkInstance.stop();
*/
async stop() {
if (!this.connection && this.reconnect) {
this.logger.debug('not connected, cleaning reconnect timeout');
clearTimeout(this.reconnect);
return;
}
// cleanup queues and routes if any are present
const paths = Object.keys(this.routeHandlers);
if (paths.length) {
await Promise.all(paths.map(path => this.unsubscribe(path)));
}
// close channel & connection
await this.channel.close();
await this.connection.close();
}
/**
* Send given data to the specified topic
* @param {string} topic Topic to send data to
* @param {Any} data Data to send
* @param {Object} opts Publish options for RabbitMQ
* @return {Promise} Returns promise that can be awaited to ensure termination
* @example
* await microworkInstance.send('test.topic', 'test');
* await microworkInstance.send('test.topic', {json: 'works too'});
*/
async send(topic, data = '', opts = {}) {
// wait for connection
await this.connect();
// send
this.logger.debug('sending to', topic, 'data:', data);
this.channel.publish(this.exchange, topic, new Buffer(JSON.stringify(data)), opts);
}
/**
* Create subscription to given topic that will pass all incoming messages to given handler
* @param {string} topic Topic to subscribe to
* @param {Function} handler Handler function that will get all incoming messages
* @param {Object} queueConfig Queue config to pass to RabbitMQ
* @param {Object} consumeConfig Consume config to pass to RabbitMQ
* @param {Object} config Config for subscriber (e.g. wether to auto-ack messages)
* @return {string} Consumer tag that can be used for more precise unsubscribe action
* @example <caption>Simple subscribe usage</caption>
* await microworkInstance.subscribe('test.topic', (msg, reply) => {
* if (msg === 'ping') {
* reply('test.reply', 'pong');
* }
* });
* @example <caption>Subscribe with custom RabbitMQ options</caption>
* await microworkInstance.subscribe('test.topic', (msg, reply) => {
* if (msg === 'ping') {
* reply('test.reply', 'pong');
* }
* }, {durable: true, autoDelete: true, exclusive: true});
* @example <caption>Subscribe without auto-ack</caption>
* await microworkInstance.subscribe('test.topic', (msg, reply, ack, nack) => {
* if (msg === 'ping') {
* ack();
* reply('test.reply', 'pong');
* } else {
* nack();
* }
* }, {}, {}, {ack: false});
*/
async subscribe(
topic,
handler,
queueConfig = {},
consumeConfig = {},
config = {}
) {
// merge queueConfig with defaults
queueConfig = {
durable: true,
autoDelete: true,
...queueConfig,
};
// merge consumeConfig with defaults
consumeConfig = {
noAck: false,
...consumeConfig,
};
// merge config with defaults
config = {
ack: true,
...config,
};
// wait for connection
await this.connect();
// get queue
this.logger.debug('adding worker for:', topic);
const {queue} = await this.channel.assertQueue(`microwork-${topic}-queue`, queueConfig);
await this.channel.bindQueue(queue, this.exchange, topic);
this.logger.debug('bound queue...');
// consume if needed
this.logger.debug('initiating consuming...');
// listen for messages
const {consumerTag} = await this.channel.consume(queue, data => {
if (!data) {
return;
}
const msg = JSON.parse(data.content.toString());
// ack
if (config.ack) {
this.channel.ack(data);
}
// pass to handler
const reply = this.send.bind(this);
const ack = this.channel.ack.bind(this.channel, data);
const nack = this.channel.nack.bind(this.channel, data);
handler(msg, reply, ack, nack);
}, consumeConfig);
// push to cleanup
if (!this.routeHandlers[topic]) {
this.routeHandlers[topic] = [];
}
this.routeHandlers[topic].push({
queue,
consumerTag,
});
this.logger.info('worker inited, consuming...');
return consumerTag;
}
}