var util = require('util');
var events = require('events').EventEmitter;
/**
 * Job queue that launches queued jobs with bounded concurrency and reports
 * its life cycle through EventEmitter events.
 *
 * Events emitted: 'start', 'jobStart' (args), 'jobEnd' (args), 'end',
 * 'sleep', 'continu', 'pause' (ms since the pause began), 'unpause'.
 *
 * Can be called with or without `new`.
 *
 * @param {Object} [options]
 * @param {number} [options.maxConcurrency=10] maximum number of jobs in flight
 * @param {number} [options.interval] delay in ms between bursts; when set,
 *        the queue waits for a full burst to drain, sleeps, then continues
 */
var qjob = function(options) {
    if (!(this instanceof qjob)) {
        return new qjob(options);
    }

    this.maxConcurrency = 10;
    this.jobsRunning = 0;
    this.jobsDone = 0;
    this.jobsTotal = 0;
    this.timeStart = undefined; // set on the first run()
    this.jobId = 0;
    this.jobsList = [];
    this.paused = false;
    this.pausedId = null;       // id of the 'pause' ticker, null when not ticking
    this.lastPause = 0;
    this.interval = null;       // null disables burst mode
    this.stopAdding = false;    // burst mode: a full burst is draining
    this.sleeping = false;      // burst mode: waiting for the interval to elapse
    this.aborting = false;

    if (options) {
        this.maxConcurrency = options.maxConcurrency || this.maxConcurrency;
        this.interval = options.interval || this.interval;
    }

    events.call(this);
};
util.inherits(qjob, events);

/**
 * Set the maximum number of jobs allowed to run at the same time.
 *
 * @param {number} max
 */
qjob.prototype.setConcurrency = function(max) {
    this.maxConcurrency = max;
};

/**
 * Set the delay between bursts of jobs.
 *
 * @param {number|null} delay delay in ms; null disables burst mode
 */
qjob.prototype.setInterval = function(delay) {
    this.interval = delay;
};

/**
 * Append a job to the queue. The job is not started until run() is called.
 *
 * @param {Function} job called as job(args, next); it must call next()
 *        when it has finished
 * @param {Object} [args] passed to the job; a `_jobId` property is added to
 *        it when the job starts. Defaults to a fresh object so that jobs
 *        added without arguments can still be tagged.
 */
qjob.prototype.add = function(job, args) {
    this.jobsList.push([job, args == null ? {} : args]);
    this.jobsTotal++;
};

/**
 * Burst-mode gate used by run().
 *
 * When an interval is configured and a full burst has been launched
 * (`stopAdding`), no new job may start until every running job has finished
 * and the interval has elapsed. Once the burst has drained, this emits
 * 'sleep', waits `interval` ms, emits 'continu' and calls run() again.
 *
 * @returns {boolean} true when run() must not launch jobs right now
 */
qjob.prototype.sleepDueToInterval = function() {
    var self = this;

    if (self.interval === null) {
        return false;
    }
    if (self.sleeping) {
        return true;
    }
    if (!self.stopAdding) {
        return false;
    }
    if (self.jobsRunning > 0) {
        // the current burst is still draining
        return true;
    }

    self.sleeping = true;
    self.emit('sleep');
    setTimeout(function() {
        self.stopAdding = false;
        self.sleeping = false;
        self.emit('continu');
        self.run();
    }, self.interval);
    return true;
};

/*
 * Start one queue entry ([job, args]) on `queue`: update the counters, tag
 * the args with a job id, emit 'jobStart' and schedule the job itself.
 */
function launchJob(queue, entry) {
    var job = entry[0];
    var args = entry[1];

    queue.jobsRunning++;
    args._jobId = queue.jobId++;
    queue.emit('jobStart', args);

    // the job is started from a timer, not synchronously inside run()
    setTimeout(function() {
        job(args, queue.next.bind(queue, args));
    }, 1);
}

/**
 * Launch as many pending jobs as the concurrency limit allows.
 *
 * Emits 'start' (once, on the first call) and 'end' when the queue is empty
 * and nothing is running. Called again automatically each time a job ends.
 */
qjob.prototype.run = function() {
    var self = this;

    // first launch only: record the start time, then announce it
    if (self.timeStart === undefined) {
        self.timeStart = Date.now();
        self.emit('start');
    }

    if (self.sleepDueToInterval()) {
        return;
    }

    // an aborted queue drops everything that has not started yet
    if (self.aborting) {
        self.jobsList = [];
    }

    while (self.jobsList.length && self.jobsRunning < self.maxConcurrency) {
        launchJob(self, self.jobsList.shift());
    }

    // burst mode: once a full burst is in flight, hold further launches
    // until it has drained and the interval has elapsed
    if (self.interval !== null && self.jobsRunning >= self.maxConcurrency) {
        self.stopAdding = true;
    }

    if (self.jobsList.length === 0 && self.jobsRunning === 0) {
        self.emit('end');
    }
};

/**
 * Completion callback handed to every job (pre-bound to that job's args).
 * Updates the counters, emits 'jobEnd' and, unless the queue is paused,
 * calls run() to launch the next pending jobs.
 *
 * @param {Object} args the args object of the job that just finished
 */
qjob.prototype.next = function(args) {
    this.jobsRunning--;
    this.jobsDone++;
    this.emit('jobEnd', args);

    if (this.paused) {
        return;
    }
    this.run();
};

/**
 * Pause or resume the queue. Running jobs are not interrupted; pausing only
 * stops pending jobs from being launched.
 *
 * While paused, 'pause' is emitted every second with the number of ms
 * elapsed since the pause began. Resuming emits 'unpause' and calls run().
 *
 * @param {boolean} status true to pause, false to resume
 */
qjob.prototype.pause = function(status) {
    var self = this;
    self.paused = status;

    if (!self.paused && self.pausedId) {
        clearInterval(self.pausedId);
        // forget the timer so that a later pause(true) starts a new ticker
        self.pausedId = null;
        self.emit('unpause');
        self.run();
        return;
    }

    if (self.paused && !self.pausedId) {
        self.lastPause = Date.now();
        self.pausedId = setInterval(function() {
            self.emit('pause', Date.now() - self.lastPause);
        }, 1000);
    }
};

/**
 * Snapshot of the queue counters.
 *
 * @returns {Object} `_timeStart` and `_timeElapsed` (ms, or 'N/A' before the
 *          first run), `_jobsTotal`, `_jobsRunning`, `_jobsDone`,
 *          `_progress` (0-100), `_concurrency` and `_status`
 *          ('Paused', 'Starting', 'Finished' or 'Running')
 */
qjob.prototype.stats = function() {
    var started = this.timeStart !== undefined;
    var o = {};

    o._timeStart = started ? this.timeStart : 'N/A';
    o._timeElapsed = started ? Date.now() - this.timeStart : 'N/A';
    o._jobsTotal = this.jobsTotal;
    o._jobsRunning = this.jobsRunning;
    o._jobsDone = this.jobsDone;
    // an empty queue has made no progress (avoids 0/0 -> NaN)
    o._progress = this.jobsTotal === 0
        ? 0
        : Math.floor((this.jobsDone / this.jobsTotal) * 100);
    o._concurrency = this.maxConcurrency;

    if (this.paused) {
        o._status = 'Paused';
    } else if (!started) {
        o._status = 'Starting';
    } else if (this.jobsTotal == this.jobsDone) {
        o._status = 'Finished';
    } else {
        o._status = 'Running';
    }
    return o;
};

/**
 * Ask the queue to abort: pending jobs are discarded the next time run()
 * gets past the burst-mode gate. Jobs already running are left to finish.
 */
qjob.prototype.abort = function() {
    this.aborting = true;
};
/*
 * the queue constructor is the module's single export
 */
module.exports = qjob;