newspaint

Documenting Problems That Were Difficult To Find The Answer To

Thread Pool Pattern for Node.JS using Q.js Promises

I wanted to emulate the Thread Pool Pattern in Node.JS using Q.js promises.

Of course Node.JS does not give the user multiple threads to work with, which is why Q.js is used in the first place – to make concurrent programming easier using the event/callback model that Node.JS implements.

Given an array of functions which return promises, I wanted a pattern function that provided a customisable number (n) of workers. These workers would run asynchronously, each processing one function from the list at a time, and when the returned promise settled the worker would fetch the next function from the list – until no functions were left to process. When the last worker finished processing, the promise returned by the pattern function to the caller would be resolved. Usually there would be many tasks and a limited number of workers.

Why would I want to do this? Let’s say I have a hundred or a thousand URLs I want to fetch. If I put them all into a list and called Q.allSettled then Node.JS would attempt to fetch them all at the same time – this could result in excessive delays, a peak in network traffic, or might be plain impolite to the target webserver. Instead it may be better to download a maximum of 4 web pages at any one time.

For simplicity’s sake here is an example. Let’s create a function that returns a promise:

// delayms() - hands back a promise that is fulfilled roughly `ms` milliseconds from now
//   - ms - how long to wait, in milliseconds, before the promise is fulfilled
//   - the fulfilment value is the string "Finished after <ms>ms"
function delayms( ms ) {
  var pending = Q.defer();

  // fired by the timer once the delay has elapsed
  var finish = function () {
    pending.resolve( "Finished after " + ms + "ms" );
  };

  setTimeout( finish, ms );
  return pending.promise;
}

Now let’s create a list of functions that return a promise (we could call these functions “tasks”). It’s important that the array contains functions, not the result of a function that returns a promise (i.e. a promise).

For example:

// build the list of tasks: every entry is a function which, when called,
// returns a promise (the entry is NOT itself a promise)
var tasklist = [ ];
var item = 0;
while ( item < 12 ) {
  tasklist.push(
    function () {
      console.log( "Going to wait 1 second..." );
      return delayms( 1000 ); // the promise is the task's return value
    }
  );
  item++;
}

// finish with a task that throws instead of returning a promise
tasklist.push(
  function () {
    throw new Error( "Deliberate error!" );
  }
);

Now we have an array of tasks to process. How about we process them all – but a maximum of 3 running at any one time? We would want to do something like:

var Q = require('./q.js'); // load Q.js library

// Driver: log a start message, run every task in tasklist with at most 3
// running at any one time, then print the collected results. Any rejection
// raised along the chain is reported by the fail() handler, and done() ends
// the chain so nothing is silently swallowed.
Q.fcall(
  // call a function, this function resolves immediately
  function () {
    console.log( "Starting" );
  }
).then(
  // do the list of tasks, max concurrency of 3
  function () {
    // return the promise so the next step waits for the whole list;
    // the array of task functions is named tasklist
    return workermodel( 3, tasklist );
  }
).then(
  // output the list of results
  function ( results ) {
    console.log( "End: " + JSON.stringify( results, null, 2 ) );
  }
).fail(
  function ( reason ) {
    console.error( "Error: " + reason );
  }
).done();

Well all we need now is the workermodel() function to run through our list of tasks:

// workermodel() - returns a promise that resolves after task list processes with max concurrency
//   - workers - maximum concurrency or number of workers; coerced to a whole
//               number, raised to 1 if it is smaller than 1 or not a number,
//               and capped at the number of tasks
//   - tasklist - array of functions that return a promise
//   - resolves with an array holding one entry per task, in tasklist order:
//       { state: "fulfilled", value: ... }  or  { state: "rejected", reason: ... }
//     a task that throws, or whose promise rejects, is recorded as "rejected";
//     it never rejects the promise returned by workermodel()
//   - debug - when true, progress messages are written to stderr
var debug = true;
function workermodel( workers, tasklist ) {
  var taskidx = 0;     // index of the next task that no worker has claimed yet
  var workersfree = 0; // number of workers that found the list exhausted

  // if no tasks to perform return a completed promise with empty array
  if ( tasklist.length === 0 ) {
    return Q.fcall( function () { return []; } );
  }

  var deferred = Q.defer();

  // Normalise the worker count. Completion is detected by comparing
  // workersfree with the pool size, so a zero, negative, fractional or NaN
  // size would leave the returned promise pending forever.
  var poolsize = Math.floor( Number( workers ) );
  if ( !( poolsize >= 1 ) ) {
    poolsize = 1;
  }

  // if less tasks than workers, limit workers to task size
  if ( tasklist.length < poolsize ) {
    poolsize = tasklist.length;
  }

  // results will go into this array - one slot per task, not per worker
  var resultsarray = new Array( tasklist.length );

  // write a timestamped progress line to stderr when debugging is enabled
  function trace( message ) {
    if ( debug )
      console.error( new Date() + " " + message );
  }

  // run task number idx on behalf of the given worker and, once it has
  // settled, record the outcome and let that worker look for more work
  function startTask( workeridx, idx ) {
    trace( "task[" + idx + "/" + tasklist.length + "] assigned to worker[" + workeridx + "]" );

    // Q.fcall() turns a synchronous throw inside the task into a rejection.
    // The handlers deliberately return nothing: returning the next task's
    // promise would chain every later task onto this discarded promise and
    // leave a later rejection without a handler.
    Q.fcall(
      function () {
        return tasklist[idx]();
      }
    ).then(
      function ( value ) {
        trace( "task[" + idx + "] resolved on worker[" + workeridx + "]" );
        resultsarray[idx] = { state: "fulfilled", value: value };
        fetchNextTask( workeridx );
      },
      function ( reason ) {
        trace( "task[" + idx + "] rejected on worker[" + workeridx + "]" );
        resultsarray[idx] = { state: "rejected", reason: reason };
        fetchNextTask( workeridx );
      }
    );
  }

  // give the worker the next unclaimed task, or retire the worker when the
  // list is exhausted; the last worker to retire resolves the promise
  function fetchNextTask( workeridx ) {
    trace( "getnext task[" + taskidx + "] for worker[" + workeridx + "]" );

    if ( taskidx < tasklist.length ) {
      startTask( workeridx, taskidx++ );
      return;
    }

    workersfree++;
    if ( workersfree === poolsize ) {
      trace( "workermodel RESOLVE" );
      deferred.resolve( resultsarray );
    } else {
      trace( "no more work but " + ( poolsize - workersfree ) + " workers busy" );
    }
  }

  // start workers - each one claims its first task straight away
  for ( var workeridx = 0; workeridx < poolsize; workeridx++ ) {
    startTask( workeridx, taskidx++ );
  }

  trace( "RETURNING PROMISE" );
  return deferred.promise;
}

Like Q.allSettled(), this function always resolves. When it does, the value is an array of objects, one for each function in the tasklist provided and in the same order. Each object contains a “state” field of “fulfilled” or “rejected”; if “fulfilled”, the “value” field will contain the result, otherwise the “reason” field will contain the error.

What is the output of this script (with debugging turned on)?

Starting
Sat Sep 12 2015 19:48:29 GMT+0100 (BST) task[0/13] assigned to worker[0]
Sat Sep 12 2015 19:48:29 GMT+0100 (BST) task[1/13] assigned to worker[1]
Sat Sep 12 2015 19:48:29 GMT+0100 (BST) task[2/13] assigned to worker[2]
Sat Sep 12 2015 19:48:29 GMT+0100 (BST) RETURNING PROMISE
Going to wait 1 second...
Going to wait 1 second...
Going to wait 1 second...
Sat Sep 12 2015 19:48:30 GMT+0100 (BST) task[0] resolved on worker[0]
Sat Sep 12 2015 19:48:30 GMT+0100 (BST) getnext task[3] for worker[0]
Sat Sep 12 2015 19:48:30 GMT+0100 (BST) task[3/13] assigned to worker[0]
Sat Sep 12 2015 19:48:30 GMT+0100 (BST) task[1] resolved on worker[1]
Sat Sep 12 2015 19:48:30 GMT+0100 (BST) getnext task[4] for worker[1]
Sat Sep 12 2015 19:48:30 GMT+0100 (BST) task[4/13] assigned to worker[1]
Sat Sep 12 2015 19:48:30 GMT+0100 (BST) task[2] resolved on worker[2]
Sat Sep 12 2015 19:48:30 GMT+0100 (BST) getnext task[5] for worker[2]
Sat Sep 12 2015 19:48:30 GMT+0100 (BST) task[5/13] assigned to worker[2]
Going to wait 1 second...
Going to wait 1 second...
Going to wait 1 second...
Sat Sep 12 2015 19:48:31 GMT+0100 (BST) task[3] resolved on worker[0]
Sat Sep 12 2015 19:48:31 GMT+0100 (BST) getnext task[6] for worker[0]
Sat Sep 12 2015 19:48:31 GMT+0100 (BST) task[6/13] assigned to worker[0]
Sat Sep 12 2015 19:48:31 GMT+0100 (BST) task[4] resolved on worker[1]
Sat Sep 12 2015 19:48:31 GMT+0100 (BST) getnext task[7] for worker[1]
Sat Sep 12 2015 19:48:31 GMT+0100 (BST) task[7/13] assigned to worker[1]
Sat Sep 12 2015 19:48:31 GMT+0100 (BST) task[5] resolved on worker[2]
Sat Sep 12 2015 19:48:31 GMT+0100 (BST) getnext task[8] for worker[2]
Sat Sep 12 2015 19:48:31 GMT+0100 (BST) task[8/13] assigned to worker[2]
Going to wait 1 second...
Going to wait 1 second...
Going to wait 1 second...
Sat Sep 12 2015 19:48:32 GMT+0100 (BST) task[6] resolved on worker[0]
Sat Sep 12 2015 19:48:32 GMT+0100 (BST) getnext task[9] for worker[0]
Sat Sep 12 2015 19:48:32 GMT+0100 (BST) task[9/13] assigned to worker[0]
Sat Sep 12 2015 19:48:32 GMT+0100 (BST) task[7] resolved on worker[1]
Sat Sep 12 2015 19:48:32 GMT+0100 (BST) getnext task[10] for worker[1]
Sat Sep 12 2015 19:48:32 GMT+0100 (BST) task[10/13] assigned to worker[1]
Sat Sep 12 2015 19:48:32 GMT+0100 (BST) task[8] resolved on worker[2]
Sat Sep 12 2015 19:48:32 GMT+0100 (BST) getnext task[11] for worker[2]
Sat Sep 12 2015 19:48:32 GMT+0100 (BST) task[11/13] assigned to worker[2]
Going to wait 1 second...
Going to wait 1 second...
Going to wait 1 second...
Sat Sep 12 2015 19:48:33 GMT+0100 (BST) task[9] resolved on worker[0]
Sat Sep 12 2015 19:48:33 GMT+0100 (BST) getnext task[12] for worker[0]
Sat Sep 12 2015 19:48:33 GMT+0100 (BST) task[12/13] assigned to worker[0]
Sat Sep 12 2015 19:48:33 GMT+0100 (BST) task[10] resolved on worker[1]
Sat Sep 12 2015 19:48:33 GMT+0100 (BST) getnext task[13] for worker[1]
Sat Sep 12 2015 19:48:33 GMT+0100 (BST) no more work but 2 workers busy
Sat Sep 12 2015 19:48:33 GMT+0100 (BST) task[11] resolved on worker[2]
Sat Sep 12 2015 19:48:33 GMT+0100 (BST) getnext task[13] for worker[2]
Sat Sep 12 2015 19:48:33 GMT+0100 (BST) no more work but 1 workers busy
Sat Sep 12 2015 19:48:33 GMT+0100 (BST) task[12] rejected on worker[0]
Sat Sep 12 2015 19:48:33 GMT+0100 (BST) getnext task[13] for worker[0]
Sat Sep 12 2015 19:48:33 GMT+0100 (BST) workermodel RESOLVE
End: [
  {
    "state": "fulfilled",
    "value": "Finished after 1000ms"
  },
  {
    "state": "fulfilled",
    "value": "Finished after 1000ms"
  },
  {
    "state": "fulfilled",
    "value": "Finished after 1000ms"
  },
  {
    "state": "fulfilled",
    "value": "Finished after 1000ms"
  },
  {
    "state": "fulfilled",
    "value": "Finished after 1000ms"
  },
  {
    "state": "fulfilled",
    "value": "Finished after 1000ms"
  },
  {
    "state": "fulfilled",
    "value": "Finished after 1000ms"
  },
  {
    "state": "fulfilled",
    "value": "Finished after 1000ms"
  },
  {
    "state": "fulfilled",
    "value": "Finished after 1000ms"
  },
  {
    "state": "fulfilled",
    "value": "Finished after 1000ms"
  },
  {
    "state": "fulfilled",
    "value": "Finished after 1000ms"
  },
  {
    "state": "fulfilled",
    "value": "Finished after 1000ms"
  },
  {
    "state": "rejected",
    "reason": {}
  }
]

All of the above is my own work. It is free for others to use.

See Also

  • The bluebird JavaScript promises library has a concurrency option which would appear to be similar in functionality

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out / Change )

Twitter picture

You are commenting using your Twitter account. Log Out / Change )

Facebook photo

You are commenting using your Facebook account. Log Out / Change )

Google+ photo

You are commenting using your Google+ account. Log Out / Change )

Connecting to %s

%d bloggers like this: