HTMLify
message_stream.js
Views: 8 | Author: cody
'use strict';

const Duplex = require('stream').Duplex;
const BufferList = require('bl');
const MongoParseError = require('../core/error').MongoParseError;
const decompress = require('../core/wireprotocol/compression').decompress;
const Response = require('../core/connection/commands').Response;
const BinMsg = require('../core/connection/msg').BinMsg;
const MongoError = require('../core/error').MongoError;
const OP_COMPRESSED = require('../core/wireprotocol/shared').opcodes.OP_COMPRESSED;
const OP_MSG = require('../core/wireprotocol/shared').opcodes.OP_MSG;
const MESSAGE_HEADER_SIZE = require('../core/wireprotocol/shared').MESSAGE_HEADER_SIZE;
const COMPRESSION_DETAILS_SIZE = require('../core/wireprotocol/shared').COMPRESSION_DETAILS_SIZE;
const opcodes = require('../core/wireprotocol/shared').opcodes;
const compress = require('../core/wireprotocol/compression').compress;
const compressorIDs = require('../core/wireprotocol/compression').compressorIDs;
const uncompressibleCommands = require('../core/wireprotocol/compression').uncompressibleCommands;
const Msg = require('../core/connection/msg').Msg;

// Upper bound on a single incoming message when `options.maxBsonMessageSize` is not given (64 MiB).
const kDefaultMaxBsonMessageSize = 1024 * 1024 * 16 * 4;
// Private key under which each stream keeps its buffer of not-yet-parsed incoming bytes.
const kBuffer = Symbol('buffer');

/**
 * A duplex stream that is capable of reading and writing raw wire protocol messages, with
 * support for optional compression.
 *
 * Bytes written to the stream are accumulated and parsed into messages, each of which is
 * announced with a `'message'` event. Outgoing commands passed to `writeCommand` are
 * serialized and pushed to the readable side of the stream.
 */
class MessageStream extends Duplex {
  /**
   * @param {object} [options] Also forwarded unchanged to the `Duplex` constructor
   * @param {object} [options.bson] Handed to every response object built from incoming data
   * @param {number} [options.maxBsonMessageSize] Largest accepted incoming message, in bytes
   */
  constructor(options) {
    options = options || {};
    super(options);

    this.bson = options.bson;
    this.maxBsonMessageSize = options.maxBsonMessageSize || kDefaultMaxBsonMessageSize;

    this[kBuffer] = new BufferList();
  }

  /**
   * Writable-side hook: buffers the chunk and parses every complete message now available.
   * `callback` is invoked once parsing stops, with an error if the data was malformed.
   */
  _write(chunk, _, callback) {
    const buffer = this[kBuffer];
    buffer.append(chunk);

    processIncomingData(this, callback);
  }

  _read(/* size */) {
    // NOTE: This implementation is empty because we explicitly push data to be read
    //       when `writeCommand` is called.
    return;
  }

  /**
   * Serializes `command` and pushes the resulting bytes to the readable side of the stream.
   *
   * When `operationDescription.agreedCompressor` is set and the command is allowed to be
   * compressed, the body is compressed and wrapped in an OP_COMPRESSED message; otherwise the
   * command is pushed as produced by `command.toBin()`.
   *
   * @param {object} command Object exposing `toBin()` and `requestId`
   * @param {object} [operationDescription] Supplies `agreedCompressor`, and `cb`, which is
   *   called with the error if compression fails. The whole object is also passed to
   *   `compress` as its `options`.
   */
  writeCommand(command, operationDescription) {
    // TODO: agreed compressor should live in `StreamDescription`
    const shouldCompress = operationDescription && !!operationDescription.agreedCompressor;
    if (!shouldCompress || !canCompress(command)) {
      const data = command.toBin();
      this.push(Array.isArray(data) ? Buffer.concat(data) : data);
      return;
    }

    // otherwise, compress the message
    // `toBin()` is treated the same way as on the uncompressed path above: it may return either
    // a list of buffers or a single buffer.
    const serializedCommand = command.toBin();
    const concatenatedOriginalCommandBuffer = Array.isArray(serializedCommand)
      ? Buffer.concat(serializedCommand)
      : serializedCommand;
    const messageToBeCompressed = concatenatedOriginalCommandBuffer.slice(MESSAGE_HEADER_SIZE);

    // Extract information needed for OP_COMPRESSED from the uncompressed message
    const originalCommandOpCode = concatenatedOriginalCommandBuffer.readInt32LE(12);

    // Compress the message body
    compress({ options: operationDescription }, messageToBeCompressed, (err, compressedMessage) => {
      if (err) {
        operationDescription.cb(err, null);
        return;
      }

      // Create the msgHeader of OP_COMPRESSED
      const msgHeader = Buffer.alloc(MESSAGE_HEADER_SIZE);
      msgHeader.writeInt32LE(
        MESSAGE_HEADER_SIZE + COMPRESSION_DETAILS_SIZE + compressedMessage.length,
        0
      ); // messageLength
      msgHeader.writeInt32LE(command.requestId, 4); // requestID
      msgHeader.writeInt32LE(0, 8); // responseTo (zero)
      msgHeader.writeInt32LE(opcodes.OP_COMPRESSED, 12); // opCode

      // Create the compression details of OP_COMPRESSED
      const compressionDetails = Buffer.alloc(COMPRESSION_DETAILS_SIZE);
      compressionDetails.writeInt32LE(originalCommandOpCode, 0); // originalOpcode
      compressionDetails.writeInt32LE(messageToBeCompressed.length, 4); // Size of the uncompressed compressedMessage, excluding the MsgHeader
      compressionDetails.writeUInt8(compressorIDs[operationDescription.agreedCompressor], 8); // compressorID

      this.push(Buffer.concat([msgHeader, compressionDetails, compressedMessage]));
    });
  }
}

/**
 * Reports whether a command may be sent compressed.
 *
 * The command name is taken to be the first key of the command document (`command.command`
 * for `Msg` instances, `command.query` otherwise).
 *
 * @param {object} command The command about to be written
 * @returns {boolean} `true` unless the command name is listed in `uncompressibleCommands`
 */
function canCompress(command) {
  const commandDoc = command instanceof Msg ? command.command : command.query;
  const commandName = Object.keys(commandDoc)[0];
  return !uncompressibleCommands.has(commandName);
}

/**
 * Parses complete wire protocol messages out of the stream's buffer, emitting a `'message'`
 * event for each one, until fewer bytes than a full message remain.
 *
 * `callback` is invoked exactly once per call chain: without arguments when parsing has to
 * wait for more data, or with an error when the buffered data is malformed.
 *
 * @param {MessageStream} stream The stream whose buffered bytes are parsed
 * @param {Function} callback The `_write` completion callback
 */
function processIncomingData(stream, callback) {
  const buffer = stream[kBuffer];
  if (buffer.length < 4) {
    // not even the length prefix has arrived yet
    callback();
    return;
  }

  const sizeOfMessage = buffer.readInt32LE(0);
  if (sizeOfMessage < 0) {
    callback(new MongoParseError(`Invalid message size: ${sizeOfMessage}`));
    return;
  }

  // The declared length includes the header itself. Anything shorter cannot be parsed, and
  // reading the header fields below would throw a RangeError instead of reporting the problem
  // through `callback`.
  if (sizeOfMessage < MESSAGE_HEADER_SIZE) {
    callback(
      new MongoParseError(
        `Invalid message size: ${sizeOfMessage}, min allowed: ${MESSAGE_HEADER_SIZE}`
      )
    );
    return;
  }

  if (sizeOfMessage > stream.maxBsonMessageSize) {
    callback(
      new MongoParseError(
        `Invalid message size: ${sizeOfMessage}, max allowed: ${stream.maxBsonMessageSize}`
      )
    );
    return;
  }

  if (sizeOfMessage > buffer.length) {
    // the message is still incomplete; wait for the next chunk
    callback();
    return;
  }

  const message = buffer.slice(0, sizeOfMessage);
  buffer.consume(sizeOfMessage);

  const messageHeader = {
    length: message.readInt32LE(0),
    requestId: message.readInt32LE(4),
    responseTo: message.readInt32LE(8),
    opCode: message.readInt32LE(12)
  };

  let ResponseType = messageHeader.opCode === OP_MSG ? BinMsg : Response;
  // NOTE: `responseOptions` is not assigned by the MessageStream constructor; it is read from
  // the stream as-is and may be undefined.
  const responseOptions = stream.responseOptions;
  if (messageHeader.opCode !== OP_COMPRESSED) {
    const messageBody = message.slice(MESSAGE_HEADER_SIZE);
    stream.emit(
      'message',
      new ResponseType(stream.bson, message, messageHeader, messageBody, responseOptions)
    );

    if (buffer.length >= 4) {
      processIncomingData(stream, callback);
    } else {
      callback();
    }

    return;
  }

  // The compression details read below occupy 9 bytes after the header: originalOpcode (4),
  // uncompressedSize (4) and compressorID (1). Reject truncated messages up front rather than
  // letting the reads throw.
  if (sizeOfMessage < MESSAGE_HEADER_SIZE + 9) {
    callback(new MongoParseError(`Invalid compressed message size: ${sizeOfMessage}`));
    return;
  }

  messageHeader.fromCompressed = true;
  messageHeader.opCode = message.readInt32LE(MESSAGE_HEADER_SIZE);
  messageHeader.length = message.readInt32LE(MESSAGE_HEADER_SIZE + 4);
  const compressorID = message[MESSAGE_HEADER_SIZE + 8];
  const compressedBuffer = message.slice(MESSAGE_HEADER_SIZE + 9);

  // recalculate based on wrapped opcode
  ResponseType = messageHeader.opCode === OP_MSG ? BinMsg : Response;

  decompress(compressorID, compressedBuffer, (err, messageBody) => {
    if (err) {
      callback(err);
      return;
    }

    // `messageHeader.length` now holds the uncompressed size announced by the sender
    if (messageBody.length !== messageHeader.length) {
      callback(
        new MongoError(
          'Decompressing a compressed message from the server failed. The message is corrupt.'
        )
      );

      return;
    }

    stream.emit(
      'message',
      new ResponseType(stream.bson, message, messageHeader, messageBody, responseOptions)
    );

    if (buffer.length >= 4) {
      processIncomingData(stream, callback);
    } else {
      callback();
    }
  });
}

module.exports = MessageStream;