HTMLify
collection_ops.js
Views: 6 | Author: cody
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306 307 308 309 310 311 312 313 314 315 316 317 318 319 320 321 322 323 324 325 326 327 328 329 330 331 332 333 334 335 336 337 338 339 340 341 342 343 344 345 346 347 348 349 350 351 352 353 354 355 356 357 358 359 360 361 362 363 364 365 366 367 368 369 370 371 372 373 374 | 'use strict'; const applyWriteConcern = require('../utils').applyWriteConcern; const Code = require('../core').BSON.Code; const createIndexDb = require('./db_ops').createIndex; const decorateWithCollation = require('../utils').decorateWithCollation; const decorateWithReadConcern = require('../utils').decorateWithReadConcern; const ensureIndexDb = require('./db_ops').ensureIndex; const evaluate = require('./db_ops').evaluate; const executeCommand = require('./db_ops').executeCommand; const resolveReadPreference = require('../utils').resolveReadPreference; const handleCallback = require('../utils').handleCallback; const indexInformationDb = require('./db_ops').indexInformation; const Long = require('../core').BSON.Long; const MongoError = require('../core').MongoError; const ReadPreference = require('../core').ReadPreference; const toError = require('../utils').toError; const insertDocuments = require('./common_functions').insertDocuments; const updateDocuments = require('./common_functions').updateDocuments; /** * Group function helper * @ignore */ // var groupFunction = function () { // var c = db[ns].find(condition); // var map = new Map(); // var reduce_function = reduce; // // while (c.hasNext()) { // var obj = c.next(); // var key = {}; // // for (var i = 0, len = keys.length; i < len; ++i) { // var k = keys[i]; // key[k] = obj[k]; // } // // var aggObj = map.get(key); // // if (aggObj == null) { // var newObj = Object.extend({}, key); // aggObj = Object.extend(newObj, initial); // map.put(key, aggObj); // } // // reduce_function(obj, aggObj); // } // // return { "result": map.values() }; // }.toString(); const groupFunction = 'function () {\nvar c = db[ns].find(condition);\nvar map = new Map();\nvar reduce_function = reduce;\n\nwhile (c.hasNext()) {\nvar obj = c.next();\nvar key = {};\n\nfor (var i = 0, len = keys.length; i < len; ++i) {\nvar k = keys[i];\nkey[k] = obj[k];\n}\n\nvar aggObj = map.get(key);\n\nif (aggObj == null) {\nvar newObj = Object.extend({}, key);\naggObj = Object.extend(newObj, initial);\nmap.put(key, aggObj);\n}\n\nreduce_function(obj, aggObj);\n}\n\nreturn { "result": map.values() };\n}'; // Check the update operation to ensure it has atomic operators. function checkForAtomicOperators(update) { if (Array.isArray(update)) { return update.reduce((err, u) => err || checkForAtomicOperators(u), null); } const keys = Object.keys(update); // same errors as the server would give for update doc lacking atomic operators if (keys.length === 0) { return toError('The update operation document must contain at least one atomic operator.'); } if (keys[0][0] !== '$') { return toError('the update operation document must contain atomic operators.'); } } /** * Create an index on the db and collection. * * @method * @param {Collection} a Collection instance. * @param {(string|object)} fieldOrSpec Defines the index. * @param {object} [options] Optional settings. See Collection.prototype.createIndex for a list of options. * @param {Collection~resultCallback} [callback] The command result callback */ function createIndex(coll, fieldOrSpec, options, callback) { createIndexDb(coll.s.db, coll.collectionName, fieldOrSpec, options, callback); } /** * Create multiple indexes in the collection. This method is only supported for * MongoDB 2.6 or higher. Earlier version of MongoDB will throw a command not supported * error. Index specifications are defined at http://docs.mongodb.org/manual/reference/command/createIndexes/. * * @method * @param {Collection} a Collection instance. * @param {array} indexSpecs An array of index specifications to be created * @param {Object} [options] Optional settings. See Collection.prototype.createIndexes for a list of options. * @param {Collection~resultCallback} [callback] The command result callback */ function createIndexes(coll, indexSpecs, options, callback) { const capabilities = coll.s.topology.capabilities(); // Ensure we generate the correct name if the parameter is not set for (let i = 0; i < indexSpecs.length; i++) { if (indexSpecs[i].name == null) { const keys = []; // Did the user pass in a collation, check if our write server supports it if (indexSpecs[i].collation && capabilities && !capabilities.commandsTakeCollation) { return callback(new MongoError('server/primary/mongos does not support collation')); } for (let name in indexSpecs[i].key) { keys.push(`${name}_${indexSpecs[i].key[name]}`); } // Set the name indexSpecs[i].name = keys.join('_'); } } options = Object.assign({}, options, { readPreference: ReadPreference.PRIMARY }); // Execute the index executeCommand( coll.s.db, { createIndexes: coll.collectionName, indexes: indexSpecs }, options, callback ); } /** * Ensure that an index exists. If the index does not exist, this function creates it. * * @method * @param {Collection} a Collection instance. * @param {(string|object)} fieldOrSpec Defines the index. * @param {object} [options] Optional settings. See Collection.prototype.ensureIndex for a list of options. * @param {Collection~resultCallback} [callback] The command result callback */ function ensureIndex(coll, fieldOrSpec, options, callback) { ensureIndexDb(coll.s.db, coll.collectionName, fieldOrSpec, options, callback); } /** * Run a group command across a collection. * * @method * @param {Collection} a Collection instance. * @param {(object|array|function|code)} keys An object, array or function expressing the keys to group by. * @param {object} condition An optional condition that must be true for a row to be considered. * @param {object} initial Initial value of the aggregation counter object. * @param {(function|Code)} reduce The reduce function aggregates (reduces) the objects iterated * @param {(function|Code)} finalize An optional function to be run on each item in the result set just before the item is returned. * @param {boolean} command Specify if you wish to run using the internal group command or using eval, default is true. * @param {object} [options] Optional settings. See Collection.prototype.group for a list of options. * @param {Collection~resultCallback} [callback] The command result callback * @deprecated MongoDB 3.6 or higher will no longer support the group command. We recommend rewriting using the aggregation framework. */ function group(coll, keys, condition, initial, reduce, finalize, command, options, callback) { // Execute using the command if (command) { const reduceFunction = reduce && reduce._bsontype === 'Code' ? reduce : new Code(reduce); const selector = { group: { ns: coll.collectionName, $reduce: reduceFunction, cond: condition, initial: initial, out: 'inline' } }; // if finalize is defined if (finalize != null) selector.group['finalize'] = finalize; // Set up group selector if ('function' === typeof keys || (keys && keys._bsontype === 'Code')) { selector.group.$keyf = keys && keys._bsontype === 'Code' ? keys : new Code(keys); } else { const hash = {}; keys.forEach(key => { hash[key] = 1; }); selector.group.key = hash; } options = Object.assign({}, options); // Ensure we have the right read preference inheritance options.readPreference = resolveReadPreference(coll, options); // Do we have a readConcern specified decorateWithReadConcern(selector, coll, options); // Have we specified collation try { decorateWithCollation(selector, coll, options); } catch (err) { return callback(err, null); } // Execute command executeCommand(coll.s.db, selector, options, (err, result) => { if (err) return handleCallback(callback, err, null); handleCallback(callback, null, result.retval); }); } else { // Create execution scope const scope = reduce != null && reduce._bsontype === 'Code' ? reduce.scope : {}; scope.ns = coll.collectionName; scope.keys = keys; scope.condition = condition; scope.initial = initial; // Pass in the function text to execute within mongodb. const groupfn = groupFunction.replace(/ reduce;/, reduce.toString() + ';'); evaluate(coll.s.db, new Code(groupfn, scope), null, options, (err, results) => { if (err) return handleCallback(callback, err, null); handleCallback(callback, null, results.result || results); }); } } /** * Retrieve all the indexes on the collection. * * @method * @param {Collection} a Collection instance. * @param {Object} [options] Optional settings. See Collection.prototype.indexes for a list of options. * @param {Collection~resultCallback} [callback] The command result callback */ function indexes(coll, options, callback) { options = Object.assign({}, { full: true }, options); indexInformationDb(coll.s.db, coll.collectionName, options, callback); } /** * Check if one or more indexes exist on the collection. This fails on the first index that doesn't exist. * * @method * @param {Collection} a Collection instance. * @param {(string|array)} indexes One or more index names to check. * @param {Object} [options] Optional settings. See Collection.prototype.indexExists for a list of options. * @param {Collection~resultCallback} [callback] The command result callback */ function indexExists(coll, indexes, options, callback) { indexInformation(coll, options, (err, indexInformation) => { // If we have an error return if (err != null) return handleCallback(callback, err, null); // Let's check for the index names if (!Array.isArray(indexes)) return handleCallback(callback, null, indexInformation[indexes] != null); // Check in list of indexes for (let i = 0; i < indexes.length; i++) { if (indexInformation[indexes[i]] == null) { return handleCallback(callback, null, false); } } // All keys found return true return handleCallback(callback, null, true); }); } /** * Retrieve this collection's index info. * * @method * @param {Collection} a Collection instance. * @param {object} [options] Optional settings. See Collection.prototype.indexInformation for a list of options. * @param {Collection~resultCallback} [callback] The command result callback */ function indexInformation(coll, options, callback) { indexInformationDb(coll.s.db, coll.collectionName, options, callback); } /** * Return N parallel cursors for a collection to allow parallel reading of the entire collection. There are * no ordering guarantees for returned results. * * @method * @param {Collection} a Collection instance. * @param {object} [options] Optional settings. See Collection.prototype.parallelCollectionScan for a list of options. * @param {Collection~parallelCollectionScanCallback} [callback] The command result callback */ function parallelCollectionScan(coll, options, callback) { // Create command object const commandObject = { parallelCollectionScan: coll.collectionName, numCursors: options.numCursors }; // Do we have a readConcern specified decorateWithReadConcern(commandObject, coll, options); // Store the raw value const raw = options.raw; delete options['raw']; // Execute the command executeCommand(coll.s.db, commandObject, options, (err, result) => { if (err) return handleCallback(callback, err, null); if (result == null) return handleCallback( callback, new Error('no result returned for parallelCollectionScan'), null ); options = Object.assign({ explicitlyIgnoreSession: true }, options); const cursors = []; // Add the raw back to the option if (raw) options.raw = raw; // Create command cursors for each item for (let i = 0; i < result.cursors.length; i++) { const rawId = result.cursors[i].cursor.id; // Convert cursorId to Long if needed const cursorId = typeof rawId === 'number' ? Long.fromNumber(rawId) : rawId; // Add a command cursor cursors.push(coll.s.topology.cursor(coll.namespace, cursorId, options)); } handleCallback(callback, null, cursors); }); } /** * Save a document. * * @method * @param {Collection} a Collection instance. * @param {object} doc Document to save * @param {object} [options] Optional settings. See Collection.prototype.save for a list of options. * @param {Collection~writeOpCallback} [callback] The command result callback * @deprecated use insertOne, insertMany, updateOne or updateMany */ function save(coll, doc, options, callback) { // Get the write concern options const finalOptions = applyWriteConcern( Object.assign({}, options), { db: coll.s.db, collection: coll }, options ); // Establish if we need to perform an insert or update if (doc._id != null) { finalOptions.upsert = true; return updateDocuments(coll, { _id: doc._id }, doc, finalOptions, callback); } // Insert the document insertDocuments(coll, [doc], finalOptions, (err, result) => { if (callback == null) return; if (doc == null) return handleCallback(callback, null, null); if (err) return handleCallback(callback, err, null); handleCallback(callback, null, result); }); } module.exports = { checkForAtomicOperators, createIndex, createIndexes, ensureIndex, group, indexes, indexExists, indexInformation, parallelCollectionScan, save }; |