qjobs.js 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236
  1. var util = require('util');
  2. var events = require('events').EventEmitter;
  3. var qjob = function(options) {
  4. if(false === (this instanceof qjob)) {
  5. return new qjob(options);
  6. }
  7. this.maxConcurrency = 10;
  8. this.jobsRunning = 0;
  9. this.jobsDone = 0;
  10. this.jobsTotal = 0;
  11. this.timeStart;
  12. this.jobId = 0;
  13. this.jobsList = [];
  14. this.paused = false;
  15. this.pausedId = null;
  16. this.lastPause = 0;
  17. this.interval = null;
  18. this.stopAdding = false;
  19. this.sleeping = false;
  20. this.aborting = false;
  21. if (options) {
  22. this.maxConcurrency = options.maxConcurrency || this.maxConcurrency;
  23. this.interval = options.interval || this.interval;
  24. }
  25. events.call(this);
  26. };
  27. util.inherits(qjob, events);
  28. /*
  29. * helper to set max concurrency
  30. */
  31. qjob.prototype.setConcurrency = function(max) {
  32. this.maxConcurrency = max;
  33. }
  34. /*
  35. * helper to set delay between rafales
  36. */
  37. qjob.prototype.setInterval = function(delay) {
  38. this.interval = delay;
  39. }
  40. /*
  41. * add some jobs in the queue
  42. */
  43. qjob.prototype.add = function(job,args) {
  44. var self = this;
  45. self.jobsList.push([job,args]);
  46. self.jobsTotal++;
  47. }
  48. /*
  49. *
  50. */
  51. qjob.prototype.sleepDueToInterval = function() {
  52. var self = this;
  53. if (this.interval === null) {
  54. return;
  55. }
  56. if (this.sleeping) {
  57. return true;
  58. }
  59. if (this.stopAdding) {
  60. if (this.jobsRunning > 0) {
  61. //console.log('waiting for '+jobsRunning+' jobs to finish');
  62. return true;
  63. }
  64. //console.log('waiting for '+rafaleDelay+' ms');
  65. this.sleeping = true;
  66. self.emit('sleep');
  67. setTimeout(function() {
  68. this.stopAdding = false;
  69. this.sleeping = false;
  70. self.emit('continu');
  71. self.run();
  72. }.bind(self),this.interval);
  73. return true;
  74. }
  75. if (this.jobsRunning + 1 == this.maxConcurrency) {
  76. //console.log('max concurrent jobs reached');
  77. this.stopAdding = true;
  78. return true;
  79. }
  80. }
  81. /*
  82. * run the queue
  83. */
  84. qjob.prototype.run = function() {
  85. var self = this;
  86. // first launch, let's emit start event
  87. if (this.jobsDone == 0) {
  88. self.emit('start');
  89. this.timeStart = Date.now();
  90. }
  91. if (self.sleepDueToInterval()) return;
  92. if (self.aborting) {
  93. this.jobsList = [];
  94. }
  95. // while queue is empty and number of job running
  96. // concurrently are less than max job running,
  97. // then launch the next job
  98. while (this.jobsList.length && this.jobsRunning < this.maxConcurrency) {
  99. // get the next job and
  100. // remove it from the queue
  101. var job = self.jobsList.shift();
  102. // increment number of job running
  103. self.jobsRunning++;
  104. // fetch args for the job
  105. var args = job[1];
  106. // add jobId in args
  107. args._jobId = this.jobId++;
  108. // emit jobStart event
  109. self.emit('jobStart',args);
  110. // run the job
  111. setTimeout(function() {
  112. this.j(this.args,self.next.bind(self,this.args));
  113. }.bind({j:job[0],args:args}),1);
  114. }
  115. // all jobs done ? emit end event
  116. if (this.jobsList.length == 0 && this.jobsRunning == 0) {
  117. self.emit('end');
  118. }
  119. }
  120. /*
  121. * a task has been terminated,
  122. * so 'next()' has been called
  123. */
  124. qjob.prototype.next = function(args) {
  125. var self = this;
  126. // update counters
  127. this.jobsRunning--;
  128. this.jobsDone++;
  129. // emit 'jobEnd' event
  130. self.emit('jobEnd',args);
  131. // if queue has been set to pause
  132. // then do nothing
  133. if (this.paused) return;
  134. // else, execute run() function
  135. self.run();
  136. }
  137. /*
  138. * You can 'pause' jobs.
  139. * it will not pause running jobs, but
  140. * it will stop launching pending jobs
  141. * until paused = false
  142. */
  143. qjob.prototype.pause = function(status) {
  144. var self = this;
  145. this.paused = status;
  146. if (!this.paused && this.pausedId) {
  147. clearInterval(this.pausedId);
  148. self.emit('unpause');
  149. this.run();
  150. }
  151. if (this.paused && !this.pausedId) {
  152. self.lastPause = Date.now();
  153. this.pausedId = setInterval(function() {
  154. var since = Date.now() - self.lastPause;
  155. self.emit('pause',since);
  156. },1000);
  157. return;
  158. }
  159. }
  160. qjob.prototype.stats = function() {
  161. var now = Date.now();
  162. var o = {};
  163. o._timeStart = this.timeStart || 'N/A';
  164. o._timeElapsed = (now - this.timeStart) || 'N/A';
  165. o._jobsTotal = this.jobsTotal;
  166. o._jobsRunning = this.jobsRunning;
  167. o._jobsDone = this.jobsDone;
  168. o._progress = Math.floor((this.jobsDone/this.jobsTotal)*100);
  169. o._concurrency = this.maxConcurrency;
  170. if (this.paused) {
  171. o._status = 'Paused';
  172. return o;
  173. }
  174. if (o._timeElapsed == 'N/A') {
  175. o._status = 'Starting';
  176. return o;
  177. }
  178. if (this.jobsTotal == this.jobsDone) {
  179. o._status = 'Finished';
  180. return o;
  181. }
  182. o._status = 'Running';
  183. return o;
  184. }
  185. qjob.prototype.abort = function() {
  186. this.aborting = true;
  187. }
  188. module.exports = qjob;