HTMLify
cursor_ops.js
Views: 6 | Author: cody
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 | 'use strict'; const buildCountCommand = require('./collection_ops').buildCountCommand; const handleCallback = require('../utils').handleCallback; const MongoError = require('../core').MongoError; const push = Array.prototype.push; const CursorState = require('../core/cursor').CursorState; /** * Get the count of documents for this cursor. * * @method * @param {Cursor} cursor The Cursor instance on which to count. * @param {boolean} [applySkipLimit=true] Specifies whether the count command apply limit and skip settings should be applied on the cursor or in the provided options. * @param {object} [options] Optional settings. See Cursor.prototype.count for a list of options. * @param {Cursor~countResultCallback} [callback] The result callback. */ function count(cursor, applySkipLimit, opts, callback) { if (applySkipLimit) { if (typeof cursor.cursorSkip() === 'number') opts.skip = cursor.cursorSkip(); if (typeof cursor.cursorLimit() === 'number') opts.limit = cursor.cursorLimit(); } // Ensure we have the right read preference inheritance if (opts.readPreference) { cursor.setReadPreference(opts.readPreference); } if ( typeof opts.maxTimeMS !== 'number' && cursor.cmd && typeof cursor.cmd.maxTimeMS === 'number' ) { opts.maxTimeMS = cursor.cmd.maxTimeMS; } let options = {}; options.skip = opts.skip; options.limit = opts.limit; options.hint = opts.hint; options.maxTimeMS = opts.maxTimeMS; // Command options.collectionName = cursor.namespace.collection; let command; try { command = buildCountCommand(cursor, cursor.cmd.query, options); } catch (err) { return callback(err); } // Set cursor server to the same as the topology cursor.server = cursor.topology.s.coreTopology; // Execute the command cursor.topology.command( cursor.namespace.withCollection('$cmd'), command, cursor.options, (err, result) => { callback(err, result ? result.result.n : null); } ); } /** * Iterates over all the documents for this cursor. See Cursor.prototype.each for more information. * * @method * @deprecated * @param {Cursor} cursor The Cursor instance on which to run. * @param {Cursor~resultCallback} callback The result callback. */ function each(cursor, callback) { if (!callback) throw MongoError.create({ message: 'callback is mandatory', driver: true }); if (cursor.isNotified()) return; if (cursor.s.state === CursorState.CLOSED || cursor.isDead()) { return handleCallback( callback, MongoError.create({ message: 'Cursor is closed', driver: true }) ); } if (cursor.s.state === CursorState.INIT) { cursor.s.state = CursorState.OPEN; } // Define function to avoid global scope escape let fn = null; // Trampoline all the entries if (cursor.bufferedCount() > 0) { while ((fn = loop(cursor, callback))) fn(cursor, callback); each(cursor, callback); } else { cursor.next((err, item) => { if (err) return handleCallback(callback, err); if (item == null) { return cursor.close({ skipKillCursors: true }, () => handleCallback(callback, null, null)); } if (handleCallback(callback, null, item) === false) return; each(cursor, callback); }); } } // Trampoline emptying the number of retrieved items // without incurring a nextTick operation function loop(cursor, callback) { // No more items we are done if (cursor.bufferedCount() === 0) return; // Get the next document cursor._next(callback); // Loop return loop; } /** * Returns an array of documents. See Cursor.prototype.toArray for more information. * * @method * @param {Cursor} cursor The Cursor instance from which to get the next document. * @param {Cursor~toArrayResultCallback} [callback] The result callback. */ function toArray(cursor, callback) { const items = []; // Reset cursor cursor.rewind(); cursor.s.state = CursorState.INIT; // Fetch all the documents const fetchDocs = () => { cursor._next((err, doc) => { if (err) { return handleCallback(callback, err); } if (doc == null) { return cursor.close({ skipKillCursors: true }, () => handleCallback(callback, null, items)); } // Add doc to items items.push(doc); // Get all buffered objects if (cursor.bufferedCount() > 0) { let docs = cursor.readBufferedDocuments(cursor.bufferedCount()); // Transform the doc if transform method added if (cursor.s.transforms && typeof cursor.s.transforms.doc === 'function') { docs = docs.map(cursor.s.transforms.doc); } push.apply(items, docs); } // Attempt a fetch fetchDocs(); }); }; fetchDocs(); } module.exports = { count, each, toArray }; |