Dashboard Temp Share Shortlinks Frames API

HTMLify

eachAsync.js
Views: 6 | Author: cody
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
'use strict';

/*!
 * Module dependencies.
 */

const promiseOrCallback = require('../promiseOrCallback');

/**
 * Execute `fn` for every document in the cursor. If `fn` returns a promise,
 * will wait for the promise to resolve before iterating on to the next one.
 * Returns a promise that resolves when done.
 *
 * @param {Function} next the thunk to call to get the next document
 * @param {Function} fn
 * @param {Object} options
 * @param {Function} [callback] executed when all docs have been processed
 * @return {Promise}
 * @api public
 * @method eachAsync
 */

module.exports = function eachAsync(next, fn, options, callback) {
  const parallel = options.parallel || 1;
  const enqueue = asyncQueue();

  return promiseOrCallback(callback, cb => {
    iterate(cb);
  });

  function iterate(finalCallback) {
    let drained = false;
    let handleResultsInProgress = 0;
    let currentDocumentIndex = 0;

    let error = null;
    for (let i = 0; i < parallel; ++i) {
      enqueue(fetch);
    }

    function fetch(done) {
      if (drained || error) {
        return done();
      }

      next(function(err, doc) {
        if (drained || error != null) {
          return done();
        }
        if (err != null) {
          error = err;
          finalCallback(err);
          return done();
        }
        if (doc == null) {
          drained = true;
          if (handleResultsInProgress <= 0) {
            finalCallback(null);
          }
          return done();
        }

        ++handleResultsInProgress;

        // Kick off the subsequent `next()` before handling the result, but
        // make sure we know that we still have a result to handle re: #8422
        process.nextTick(() => done());

        handleNextResult(doc, currentDocumentIndex++, function(err) {
          --handleResultsInProgress;
          if (err != null) {
            error = err;
            return finalCallback(err);
          }
          if (drained && handleResultsInProgress <= 0) {
            return finalCallback(null);
          }

          setTimeout(() => enqueue(fetch), 0);
        });
      });
    }
  }

  function handleNextResult(doc, i, callback) {
    const promise = fn(doc, i);
    if (promise && typeof promise.then === 'function') {
      promise.then(
        function() { callback(null); },
        function(error) { callback(error || new Error('`eachAsync()` promise rejected without error')); });
    } else {
      callback(null);
    }
  }
};

// `next()` can only execute one at a time, so make sure we always execute
// `next()` in series, while still allowing multiple `fn()` instances to run
// in parallel.
function asyncQueue() {
  const _queue = [];
  let inProgress = null;
  let id = 0;

  return function enqueue(fn) {
    if (_queue.length === 0 && inProgress == null) {
      inProgress = id++;
      return fn(_step);
    }
    _queue.push(fn);
  };

  function _step() {
    inProgress = null;
    if (_queue.length > 0) {
      inProgress = id++;
      const fn = _queue.shift();
      fn(_step);
    }
  }
}