1/*
2 * Licensed to the Apache Software Foundation (ASF) under one
3 * or more contributor license agreements. See the NOTICE file
4 * distributed with this work for additional information
5 * regarding copyright ownership. The ASF licenses this file
6 * to you under the Apache License, Version 2.0 (the
7 * "License"); you may not use this file except in compliance
8 * with the License. You may obtain a copy of the License at
9 *
10 *   http://www.apache.org/licenses/LICENSE-2.0
11 *
12 * Unless required by applicable law or agreed to in writing,
13 * software distributed under the License is distributed on an
14 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15 * KIND, either express or implied. See the License for the
16 * specific language governing permissions and limitations
17 * under the License.
18 */
19var http = require('http');
20var https = require('https');
21var url = require("url");
22var path = require("path");
23var fs = require("fs");
24var crypto = require("crypto");
25var log = require('./log');
26
27var MultiplexedProcessor = require('./multiplexed_processor').MultiplexedProcessor;
28
29var TBufferedTransport = require('./buffered_transport');
30var TBinaryProtocol = require('./binary_protocol');
31var InputBufferUnderrunError = require('./input_buffer_underrun_error');
32
33// WSFrame constructor and prototype
34/////////////////////////////////////////////////////////////////////
35
36/** Apache Thrift RPC Web Socket Transport
37 *  Frame layout conforming to RFC 6455 circa 12/2011
38 *
39 * Theoretical frame size limit is 4GB*4GB, however the Node Buffer
40 * limit is 1GB as of v0.10. The frame length encoding is also
41 * configured for a max of 4GB presently and needs to be adjusted
42 * if Node/Browsers become capabile of > 4GB frames.
43 *
44 *  - FIN is 1 if the message is complete
45 *  - RSV1/2/3 are always 0
46 *  - Opcode is 1(TEXT) for TJSONProtocol and 2(BIN) for TBinaryProtocol
47 *  - Mask Present bit is 1 sending to-server and 0 sending to-client
48 *  - Payload Len:
49 *        + If < 126: then represented directly
50 *        + If >=126: but within range of an unsigned 16 bit integer
51 *             then Payload Len is 126 and the two following bytes store
52 *             the length
53 *        + Else: Payload Len is 127 and the following 8 bytes store the
54 *             length as an unsigned 64 bit integer
55 *  - Masking key is a 32 bit key only present when sending to the server
56 *  - Payload follows the masking key or length
57 *
58 *     0                   1                   2                   3
59 *     0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
60 *    +-+-+-+-+-------+-+-------------+-------------------------------+
61 *    |F|R|R|R| opcode|M| Payload len |    Extended payload length    |
62 *    |I|S|S|S|  (4)  |A|     (7)     |             (16/64)           |
63 *    |N|V|V|V|       |S|             |   (if payload len==126/127)   |
64 *    | |1|2|3|       |K|             |                               |
65 *    +-+-+-+-+-------+-+-------------+ - - - - - - - - - - - - - - - +
66 *    |     Extended payload length continued, if payload len == 127  |
67 *    + - - - - - - - - - - - - - - - +-------------------------------+
68 *    |                               |Masking-key, if MASK set to 1  |
69 *    +-------------------------------+-------------------------------+
70 *    | Masking-key (continued)       |          Payload Data         |
71 *    +-------------------------------- - - - - - - - - - - - - - - - +
72 *    :                     Payload Data continued ...                :
73 *    + - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - +
74 *    |                     Payload Data continued ...                |
75 *    +---------------------------------------------------------------+
76 */
77var wsFrame = {
78  /** Encodes a WebSocket frame
79   *
80   * @param {Buffer} data - The raw data to encode
81   * @param {Buffer} mask - The mask to apply when sending to server, null for no mask
82   * @param {Boolean} binEncoding - True for binary encoding, false for text encoding
83   * @returns {Buffer} - The WebSocket frame, ready to send
84   */
85  encode: function(data, mask, binEncoding) {
86      var frame = new Buffer(wsFrame.frameSizeFromData(data, mask));
87      //Byte 0 - FIN & OPCODE
88      frame[0] = wsFrame.fin.FIN +
89          (binEncoding ? wsFrame.frameOpCodes.BIN : wsFrame.frameOpCodes.TEXT);
90      //Byte 1 or 1-3 or 1-9 - MASK FLAG & SIZE
91      var payloadOffset = 2;
92      if (data.length < 0x7E) {
93        frame[1] = data.length + (mask ? wsFrame.mask.TO_SERVER : wsFrame.mask.TO_CLIENT);
94      } else if (data.length < 0xFFFF) {
95        frame[1] = 0x7E + (mask ? wsFrame.mask.TO_SERVER : wsFrame.mask.TO_CLIENT);
96        frame.writeUInt16BE(data.length, 2, true);
97        payloadOffset = 4;
98      } else {
99        frame[1] = 0x7F + (mask ? wsFrame.mask.TO_SERVER : wsFrame.mask.TO_CLIENT);
100        frame.writeUInt32BE(0, 2, true);
101        frame.writeUInt32BE(data.length, 6, true);
102        payloadOffset = 10;
103      }
104      //MASK
105      if (mask) {
106        mask.copy(frame, payloadOffset, 0, 4);
107        payloadOffset += 4;
108      }
109      //Payload
110      data.copy(frame, payloadOffset);
111      if (mask) {
112        wsFrame.applyMask(frame.slice(payloadOffset), frame.slice(payloadOffset-4,payloadOffset));
113      }
114      return frame;
115  },
116
117  /**
118   * @class
119   * @name WSDecodeResult
120   * @property {Buffer} data - The decoded data for the first ATRPC message
121   * @property {Buffer} mask - The frame mask
122   * @property {Boolean} binEncoding - True if binary (TBinaryProtocol),
123   *                                   False if text (TJSONProtocol)
124   * @property {Buffer} nextFrame - Multiple ATRPC messages may be sent in a
125   *                                single WebSocket frame, this Buffer contains
126   *                                any bytes remaining to be decoded
127   * @property {Boolean} FIN - True is the message is complete
128   */
129
130   /** Decodes a WebSocket frame
131   *
132   * @param {Buffer} frame - The raw inbound frame, if this is a continuation
133   *                         frame it must have a mask property with the mask.
134   * @returns {WSDecodeResult} - The decoded payload
135   *
136   * @see {@link WSDecodeResult}
137   */
138  decode: function(frame) {
139      var result = {
140        data: null,
141        mask: null,
142        binEncoding: false,
143        nextFrame: null,
144        FIN: true
145      };
146
147      //Byte 0 - FIN & OPCODE
148      if (wsFrame.fin.FIN != (frame[0] & wsFrame.fin.FIN)) {
149        result.FIN = false;
150      }
151      result.binEncoding = (wsFrame.frameOpCodes.BIN == (frame[0] & wsFrame.frameOpCodes.BIN));
152      //Byte 1 or 1-3 or 1-9 - SIZE
153      var lenByte = (frame[1] & 0x0000007F);
154      var len = lenByte;
155      var dataOffset = 2;
156      if (lenByte == 0x7E) {
157        len = frame.readUInt16BE(2);
158        dataOffset = 4;
159      } else if (lenByte == 0x7F) {
160        len = frame.readUInt32BE(6);
161        dataOffset = 10;
162      }
163      //MASK
164      if (wsFrame.mask.TO_SERVER == (frame[1] & wsFrame.mask.TO_SERVER)) {
165        result.mask = new Buffer(4);
166        frame.copy(result.mask, 0, dataOffset, dataOffset + 4);
167        dataOffset += 4;
168      }
169      //Payload
170      result.data = new Buffer(len);
171      frame.copy(result.data, 0, dataOffset, dataOffset+len);
172      if (result.mask) {
173        wsFrame.applyMask(result.data, result.mask);
174      }
175      //Next Frame
176      if (frame.length > dataOffset+len) {
177        result.nextFrame = new Buffer(frame.length - (dataOffset+len));
178        frame.copy(result.nextFrame, 0, dataOffset+len, frame.length);
179      }
180      //Don't forward control frames
181      if (frame[0] & wsFrame.frameOpCodes.FINCTRL) {
182        result.data = null;
183      }
184
185      return result;
186  },
187
188  /** Masks/Unmasks data
189   *
190   * @param {Buffer} data - data to mask/unmask in place
191   * @param {Buffer} mask - the mask
192   */
193  applyMask: function(data, mask){
194    //TODO: look into xoring words at a time
195    var dataLen = data.length;
196    var maskLen = mask.length;
197    for (var i = 0; i < dataLen; i++) {
198      data[i] = data[i] ^ mask[i%maskLen];
199    }
200  },
201
202  /** Computes frame size on the wire from data to be sent
203   *
204   * @param {Buffer} data - data.length is the assumed payload size
205   * @param {Boolean} mask - true if a mask will be sent (TO_SERVER)
206   */
207  frameSizeFromData: function(data, mask) {
208    var headerSize = 10;
209    if (data.length < 0x7E) {
210      headerSize = 2;
211    } else if (data.length < 0xFFFF) {
212      headerSize = 4;
213    }
214    return headerSize + data.length + (mask ? 4 : 0);
215  },
216
217  frameOpCodes: {
218    CONT:     0x00,
219    TEXT:     0x01,
220    BIN:      0x02,
221    CTRL:     0x80
222  },
223
224  mask: {
225    TO_SERVER: 0x80,
226    TO_CLIENT: 0x00
227  },
228
229  fin: {
230    CONT: 0x00,
231    FIN: 0x80
232  }
233};
234
235
236// createWebServer constructor and options
237/////////////////////////////////////////////////////////////////////
238
239/**
240 * @class
241 * @name ServerOptions
242 * @property {array} cors - Array of CORS origin strings to permit requests from.
243 * @property {string} files - Path to serve static files from, if absent or ""
244 *                               static file service is disabled.
245 * @property {object} headers - An object hash mapping header strings to header value
246 *                              strings, these headers are transmitted in response to
247 *                              static file GET operations.
248 * @property {object} services - An object hash mapping service URI strings
249 *                               to ServiceOptions objects
250 * @property {object} tls - Node.js TLS options (see: nodejs.org/api/tls.html),
251 *                          if not present or null regular http is used,
252 *                          at least a key and a cert must be defined to use SSL/TLS
253 * @see {@link ServiceOptions}
254 */
255
256/**
257 * @class
258 * @name ServiceOptions
259 * @property {object} transport - The layered transport to use (defaults
260 *                                to TBufferedTransport).
261 * @property {object} protocol - The serialization Protocol to use (defaults to
262 *                               TBinaryProtocol).
263 * @property {object} processor - The Thrift Service class/processor generated
264 *                                by the IDL Compiler for the service (the "cls"
265 *                                key can also be used for this attribute).
266 * @property {object} handler - The handler methods for the Thrift Service.
267 */
268
269/**
270 * Create a Thrift server which can serve static files and/or one or
271 * more Thrift Services.
272 * @param {ServerOptions} options - The server configuration.
273 * @returns {object} - The Apache Thrift Web Server.
274 */
275exports.createWebServer = function(options) {
276  var baseDir = options.files;
277  var contentTypesByExtension = {
278    '.txt': 'text/plain',
279    '.html': 'text/html',
280    '.css': 'text/css',
281    '.xml': 'application/xml',
282    '.json': 'application/json',
283    '.js': 'application/javascript',
284    '.jpg': 'image/jpeg',
285    '.jpeg': 'image/jpeg',
286    '.gif': 'image/gif',
287    '.png': 'image/png',
288    '.svg': 'image/svg+xml'
289  };
290
291  //Setup all of the services
292  var services = options.services;
293  for (var uri in services) {
294    var svcObj = services[uri];
295
296    //Setup the processor
297    if (svcObj.processor instanceof MultiplexedProcessor) {
298      //Multiplex processors have pre embedded processor/handler pairs, save as is
299      svcObj.processor = svcObj.processor;
300    } else {
301      //For historical reasons Node.js supports processors passed in directly or via the
302      //  IDL Compiler generated class housing the processor. Also, the options property
303      //  for a Processor has been called both cls and processor at different times. We
304      //  support any of the four possibilities here.
305      var processor = (svcObj.processor) ? (svcObj.processor.Processor || svcObj.processor) :
306                                           (svcObj.cls.Processor || svcObj.cls);
307      //Processors can be supplied as constructed objects with handlers already embedded,
308      //  if a handler is provided we construct a new processor, if not we use the processor
309      //  object directly
310      if (svcObj.handler) {
311        svcObj.processor = new processor(svcObj.handler);
312      } else {
313        svcObj.processor = processor;
314      }
315    }
316    svcObj.transport = svcObj.transport ? svcObj.transport : TBufferedTransport;
317    svcObj.protocol = svcObj.protocol ? svcObj.protocol : TBinaryProtocol;
318  }
319
320  //Verify CORS requirements
321  function VerifyCORSAndSetHeaders(request, response) {
322    if (request.headers.origin && options.cors) {
323      if (options.cors["*"] || options.cors[request.headers.origin]) {
324        //Allow, origin allowed
325        response.setHeader("access-control-allow-origin", request.headers.origin);
326        response.setHeader("access-control-allow-methods", "GET, POST, OPTIONS");
327        response.setHeader("access-control-allow-headers", "content-type, accept");
328        response.setHeader("access-control-max-age", "60");
329        return true;
330      } else {
331        //Disallow, origin denied
332        return false;
333      }
334    }
335    //Allow, CORS is not in use
336    return true;
337  }
338
339
340  //Handle OPTIONS method (CORS)
341  ///////////////////////////////////////////////////
342  function processOptions(request, response) {
343    if (VerifyCORSAndSetHeaders(request, response)) {
344      response.writeHead("204", "No Content", {"content-length": 0});
345    } else {
346      response.writeHead("403", "Origin " + request.headers.origin + " not allowed", {});
347    }
348    response.end();
349  }
350
351
352  //Handle POST methods (TXHRTransport)
353  ///////////////////////////////////////////////////
354  function processPost(request, response) {
355    //Lookup service
356    var uri = url.parse(request.url).pathname;
357    var svc = services[uri];
358    if (!svc) {
359      response.writeHead("403", "No Apache Thrift Service at " + uri, {});
360      response.end();
361      return;
362    }
363
364    //Verify CORS requirements
365    if (!VerifyCORSAndSetHeaders(request, response)) {
366      response.writeHead("403", "Origin " + request.headers.origin + " not allowed", {});
367      response.end();
368      return;
369    }
370
371    //Process XHR payload
372    request.on('data', svc.transport.receiver(function(transportWithData) {
373      var input = new svc.protocol(transportWithData);
374      var output = new svc.protocol(new svc.transport(undefined, function(buf) {
375        try {
376          response.writeHead(200);
377          response.end(buf);
378        } catch (err) {
379          response.writeHead(500);
380          response.end();
381        }
382      }));
383
384      try {
385        svc.processor.process(input, output);
386        transportWithData.commitPosition();
387      } catch (err) {
388        if (err instanceof InputBufferUnderrunError) {
389          transportWithData.rollbackPosition();
390        } else {
391          response.writeHead(500);
392          response.end();
393        }
394      }
395    }));
396  }
397
398
399  //Handle GET methods (Static Page Server)
400  ///////////////////////////////////////////////////
401  function processGet(request, response) {
402    //Undefined or empty base directory means do not serve static files
403    if (!baseDir || "" === baseDir) {
404      response.writeHead(404);
405      response.end();
406      return;
407    }
408
409    //Verify CORS requirements
410    if (!VerifyCORSAndSetHeaders(request, response)) {
411      response.writeHead("403", "Origin " + request.headers.origin + " not allowed", {});
412      response.end();
413      return;
414    }
415
416    //Locate the file requested and send it
417    var uri = url.parse(request.url).pathname;
418    var filename = path.resolve(path.join(baseDir, uri));
419
420    //Ensure the basedir path is not able to be escaped
421    if (filename.indexOf(baseDir) != 0) {
422      response.writeHead(400, "Invalid request path", {});
423      response.end();
424      return;
425    }
426
427    fs.exists(filename, function(exists) {
428      if(!exists) {
429        response.writeHead(404);
430        response.end();
431        return;
432      }
433
434      if (fs.statSync(filename).isDirectory()) {
435        filename += '/index.html';
436      }
437
438      fs.readFile(filename, "binary", function(err, file) {
439        if (err) {
440          response.writeHead(500);
441          response.end(err + "\n");
442          return;
443        }
444        var headers = {};
445        var contentType = contentTypesByExtension[path.extname(filename)];
446        if (contentType) {
447          headers["Content-Type"] = contentType;
448        }
449        for (var k in options.headers) {
450          headers[k] = options.headers[k];
451        }
452        response.writeHead(200, headers);
453        response.write(file, "binary");
454        response.end();
455      });
456    });
457  }
458
459
460  //Handle WebSocket calls (TWebSocketTransport)
461  ///////////////////////////////////////////////////
462  function processWS(data, socket, svc, binEncoding) {
463    svc.transport.receiver(function(transportWithData) {
464      var input = new svc.protocol(transportWithData);
465      var output = new svc.protocol(new svc.transport(undefined, function(buf) {
466        try {
467          var frame = wsFrame.encode(buf, null, binEncoding);
468          socket.write(frame);
469        } catch (err) {
470          //TODO: Add better error processing
471        }
472      }));
473
474      try {
475        svc.processor.process(input, output);
476        transportWithData.commitPosition();
477      }
478      catch (err) {
479        if (err instanceof InputBufferUnderrunError) {
480          transportWithData.rollbackPosition();
481        }
482        else {
483          //TODO: Add better error processing
484        }
485      }
486    })(data);
487  }
488
489  //Create the server (HTTP or HTTPS)
490  var server = null;
491  if (options.tls) {
492    server = https.createServer(options.tls);
493  } else {
494    server = http.createServer();
495  }
496
497  //Wire up listeners for upgrade(to WebSocket) & request methods for:
498  //   - GET static files,
499  //   - POST XHR Thrift services
500  //   - OPTIONS CORS requests
501  server.on('request', function(request, response) {
502    if (request.method === 'POST') {
503      processPost(request, response);
504    } else if (request.method === 'GET') {
505      processGet(request, response);
506    } else if (request.method === 'OPTIONS') {
507      processOptions(request, response);
508    } else {
509      response.writeHead(500);
510      response.end();
511    }
512  }).on('upgrade', function(request, socket, head) {
513    //Lookup service
514    var svc;
515    try {
516      svc = services[Object.keys(services)[0]];
517    } catch(e) {
518      socket.write("HTTP/1.1 403 No Apache Thrift Service available\r\n\r\n");
519      return;
520    }
521    //Perform upgrade
522    var hash = crypto.createHash("sha1");
523    hash.update(request.headers['sec-websocket-key'] + "258EAFA5-E914-47DA-95CA-C5AB0DC85B11");
524    socket.write("HTTP/1.1 101 Switching Protocols\r\n" +
525                   "Upgrade: websocket\r\n" +
526                   "Connection: Upgrade\r\n" +
527                   "Sec-WebSocket-Accept: " + hash.digest("base64") + "\r\n" +
528                   "Sec-WebSocket-Origin: " + request.headers.origin + "\r\n" +
529                   "Sec-WebSocket-Location: ws://" + request.headers.host + request.url + "\r\n" +
530                   "\r\n");
531    //Handle WebSocket traffic
532    var data = null;
533    socket.on('data', function(frame) {
534      try {
535        while (frame) {
536          var result = wsFrame.decode(frame);
537          //Prepend any existing decoded data
538          if (data) {
539            if (result.data) {
540              var newData = new Buffer(data.length + result.data.length);
541              data.copy(newData);
542              result.data.copy(newData, data.length);
543              result.data = newData;
544            } else {
545              result.data = data;
546            }
547            data = null;
548          }
549          //If this completes a message process it
550          if (result.FIN) {
551            processWS(result.data, socket, svc, result.binEncoding);
552          } else {
553            data = result.data;
554          }
555          //Prepare next frame for decoding (if any)
556          frame = result.nextFrame;
557        }
558      } catch(e) {
559        log.error('TWebSocketTransport Exception: ' + e);
560        socket.destroy();
561      }
562    });
563  });
564
565  //Return the server
566  return server;
567};
568