swh:1:snp:c53ab93da1867a0ee99951a3636bca865f9194df
Raw File
Tip revision: d287461265e86f639a2e815b23a3c0dbd9601075 authored by einaros on 25 December 2011, 17:17:29 UTC
bump
Tip revision: d287461
WebSocket.js
/*!
 * ws: a node.js websocket client
 * Copyright(c) 2011 Einar Otto Stangvik <einaros@gmail.com>
 * MIT Licensed
 */

var util = require('util')
  , events = require('events')
  , http = require('http')
  , crypto = require('crypto')
  , url = require('url')
  , fs = require('fs')
  , Options = require('options')
  , Sender = require('./Sender')
  , Receiver = require('./Receiver');

/**
 * Constants
 */

var protocolPrefix = "HyBi-";
var protocolVersion = 13;

/**
 * WebSocket implementation
 */

function WebSocket(address, options) {
  if (Object.prototype.toString.call(address) == '[object Array]') {
    /**
     * Act as server client
     */

    options = new Options({
      protocolVersion: protocolVersion,
      protocol: null
    }).merge(options);

    // Exposed properties
    Object.defineProperty(this, 'protocol', {
      value: options.value.protocol,
      configurable: false,
      enumerable: true
    });
    Object.defineProperty(this, 'protocolVersion', {
      value: options.value.protocolVersion,
      configurable: false,
      enumerable: true
    });
    Object.defineProperty(this, 'upgradeReq', {
      value: address[0],
      configurable: false,
      enumerable: true
    });

    this._state = 'connecting';
    this._isServer = true;
    var self = this;
    process.nextTick(function() {
      upgrade.apply(self, address);
    });
  }
  else {
    /**
     * Act as regular client
     */

    this._isServer = false;

    var serverUrl = url.parse(address);
    if (!serverUrl.host) throw new Error('invalid url');

    options = new Options({
      origin: null,
      protocolVersion: protocolVersion,
      protocol: null
    }).merge(options);
    if (options.value.protocolVersion != 8 && options.value.protocolVersion != 13) {
      throw new Error('unsupported protocol version');
    }

    var key = new Buffer(options.value.protocolVersion + '-' + Date.now()).toString('base64');
    var shasum = crypto.createHash('sha1');
    shasum.update(key + '258EAFA5-E914-47DA-95CA-C5AB0DC85B11');
    var expectedServerKey = shasum.digest('base64');

    // node<=v0.4.x compatibility
    var isNodeV4 = false;
    var agent;
    if (/^v0\.4/.test(process.version)) {
      isNodeV4 = true;
      agent = new http.Agent({
        host: serverUrl.hostname,
        port: serverUrl.port || 80
      });
    }

    var requestOptions = {
      port: serverUrl.port || 80,
      host: serverUrl.hostname,
      headers: {
        'Connection': 'Upgrade',
        'Upgrade': 'websocket',
        'Sec-WebSocket-Version': options.value.protocolVersion,
        'Sec-WebSocket-Key': key
      }
    };
    if (options.value.protocol) {
      requestOptions.headers['Sec-WebSocket-Protocol'] = options.value.protocol;
    }
    if (isNodeV4) {
      requestOptions.path = (serverUrl.pathname || '/') + (serverUrl.search || '');
      requestOptions.agent = agent;
    }
    else requestOptions.path = serverUrl.path || '/';
    if (options.value.origin) {
      if (options.value.protocolVersion < 13) requestOptions.headers['Sec-WebSocket-Origin'] = options.value.origin;
      else requestOptions.headers['Origin'] = options.value.origin;
    }

    var req = http.request(requestOptions);
    var self = this;
    (isNodeV4 ? agent : req).on('error', function(error) {
      self.emit('error', error);
    });
    (isNodeV4 ? agent : req).on('upgrade', function(res, socket, upgradeHead) {
      if (self._state == 'disconnected') {
        // client disconnected before server accepted connection
        self.emit('close');
        socket.end();
        return;
      }
      var serverKey = res.headers['sec-websocket-accept'];
      if (typeof serverKey == 'undefined' || serverKey !== expectedServerKey) {
        self.emit('error', 'invalid server key');
        socket.end();
        return;
      }

      upgrade.call(self, res, socket, upgradeHead);
    });

    req.end();
    this._state = 'connecting';
  }

  this._socket = null;
  var realEmit = this.emit;
  this.emit = function(event) {
    if (event == 'error') delete this._queue;
    realEmit.apply(this, arguments);
  }
  Object.defineProperty(this, 'state', {
    value: this._state,
    configurable: false,
    enumerable: true
  });
}

/**
 * Inherits from EventEmitter.
 */

util.inherits(WebSocket, events.EventEmitter);

/**
 * Gracefully closes the connection, after sending a description message to the server
 *
 * @param {Object} data to be sent to the server
 * @api public
 */

WebSocket.prototype.close = function(code, data) {
  if (this._state == 'closing') return;
  if (this._state == 'connecting') {
    this._state = 'disconnected';
    return;
  }
  if (this._state != 'connected') throw new Error('not connected');
  try {
    this._state = 'closing';
    this._closeCode = code;
    this._closeMessage = data;
    var mask = !this._isServer;
    this._sender.close(code, data, mask);
    this.terminate();
  }
  catch (e) {
    this.emit('error', e);
  }
}

/**
 * Sends a ping
 *
 * @param {Object} data to be sent to the server
 * @param {Object} Members - mask: boolean, binary: boolean
 * @api public
 */

WebSocket.prototype.ping = function(data, options) {
  if (this._state != 'connected') throw new Error('not connected');
  options = options || {};
  if (typeof options.mask == 'undefined') options.mask = !this._isServer;
  this._sender.ping(data, options);
}

/**
 * Sends a pong
 *
 * @param {Object} data to be sent to the server
 * @param {Object} Members - mask: boolean, binary: boolean
 * @api public
 */

WebSocket.prototype.pong = function(data, options) {
  if (this._state != 'connected') throw new Error('not connected');
  options = options || {};
  if (typeof options.mask == 'undefined') options.mask = !this._isServer;
  this._sender.pong(data, options);
}

/**
 * Sends a piece of data
 *
 * @param {Object} data to be sent to the server
 * @param {Object} Members - mask: boolean, binary: boolean
 * @param {function} Optional callback which is executed after the send completes
 * @api public
 */

WebSocket.prototype.send = function(data, options, cb) {
  if (typeof options == 'function') {
    cb = options;
    options = {};
  }
  if (this._state != 'connected') {
    if (typeof cb == 'function') cb(new Error('not connected'));
    else throw new Error('not connected');
    return;
  }
  if (!data) data = '';
  if (this._queue) {
    var self = this;
    this._queue.push(function() { self.send(data, options, cb); });
    return;
  }
  options = options || {};
  options.fin = true;
  if (typeof options.mask == 'undefined') options.mask = !this._isServer;
  if (data instanceof fs.ReadStream) {
    startQueue(this);
    var self = this;
    sendStream(this, data, options, function(error) {
      process.nextTick(function() { executeQueueSends(self); });
      if (typeof cb == 'function') cb(error);
    });
  }
  else this._sender.send(data, options, cb);
}

/**
 * Streams data through calls to a user supplied function
 *
 * @param {Object} Members - mask: boolean, binary: boolean
 * @param {function} 'function (error, send)' which is executed on successive ticks of which send is 'function (data, final)'.
 * @api public
 */

WebSocket.prototype.stream = function(options, cb) {
  if (typeof options == 'function') {
    cb = options;
    options = {};
  }
  if (typeof cb != 'function') throw new Error('callback must be provided');
  if (this._state != 'connected') {
    if (typeof cb == 'function') cb(new Error('not connected'));
    else throw new Error('not connected');
    return;
  }
  if (this._queue) {
    var self = this;
    this._queue.push(function() { self.stream(options, cb); });
    return;
  }
  options = options || {};
  if (typeof options.mask == 'undefined') options.mask = !this._isServer;
  startQueue(this);
  var self = this;
  var send = function(data, final) {
    try {
      if (self._state != 'connected') throw new Error('not connected');
      options.fin = final === true;
      self._sender.send(data, options);
      if (!final) process.nextTick(cb.bind(null, null, send));
      else executeQueueSends(self);
    }
    catch (e) {
      if (typeof cb == 'function') cb(e);
      else self.emit('error', e);
    }
  }
  process.nextTick(cb.bind(null, null, send));
}

/**
 * Immediately shuts down the connection
 *
 * @api public
 */

WebSocket.prototype.terminate = function() {
  if (this._socket) {
    this._socket.end();
    this._socket = null;
  }
  else if (this._state == 'connecting') {
    this._state = 'disconnected';
  }
};

/**
 * Emulates the Browser based WebSocket interface.
 *
 * @see http://dev.w3.org/html5/websockets/#the-websocket-interface
 * @api public
 */

['open', 'error', 'close', 'message'].forEach(function(method) {
  Object.defineProperty(WebSocket.prototype, 'on' + method, {
    /**
     * Returns the current listener
     *
     * @returns {Mixed} the set function or undefined
     * @api public
     */

    get: function get() {
      var listener = this.listeners(method)[0];
      return listener ? (listener._listener ? listener._listener : listener) : undefined;
    },

    /**
     * Start listening for events
     *
     * @param {Function} listener the listener
     * @returns {Mixed} the set function or undefined
     * @api public
     */

    set: function set(listener) {
      this.removeAllListeners(method);

      if (typeof listener === 'function') {
        // Special case for messages as we need to wrap the response here to
        // emulate a WebSocket event response.
        if (method === 'message') {
          function message (data) {
            listener.call(this, { data: data });
          }

          // store a reference so we can return the origional function again
          message._listener = listener;
          this.on(method, message);
        } else {
          this.on(method, listener);
        }
      }
    }
  });
});

module.exports = WebSocket;

/**
 * Entirely private apis,
 * which may or may not be bound to a sepcific WebSocket instance.
 */

function upgrade(res, socket, upgradeHead) {
  this._socket = socket;
  socket.setTimeout(0);
  socket.setNoDelay(true);
  var self = this;
  socket.on('end', function() {
    if (self._state == 'disconnected') return;
    self._state = 'disconnected';
    self.emit('close', self._closeCode || 1000, self._closeMessage || '');
  });
  socket.on('close', function() {
    if (self._state == 'disconnected') return;
    self._state = 'disconnected';
    self.emit('close', self._closeCode || 1000, self._closeMessage || '');
  });

  var receiver = new Receiver();
  socket.on('data', function (data) {
    receiver.add(data);
  });
  receiver.on('text', function (data, flags) {
    flags = flags || {};
    self.emit('message', data, flags);
  });
  receiver.on('binary', function (data, flags) {
    flags = flags || {};
    flags.binary = true;
    self.emit('message', data, flags);
  });
  receiver.on('ping', function(data, flags) {
    flags = flags || {};
    self.pong(data, {mask: !self._isServer, binary: flags.binary === true});
    self.emit('ping', data, flags);
  });
  receiver.on('close', function(code, data, flags) {
    flags = flags || {};
    self.close(code, data, {mask: !self._isServer});
  });
  receiver.on('error', function(reason, errorCode) {
    // close the connection when the receiver reports a HyBi error code
    if (typeof errorCode !== 'undefined') {
      self.close(errorCode, '', {mask: !self._isServer});
    }
    self.emit('error', reason, errorCode);
  });

  this._sender = new Sender(socket);
  this._sender.on('error', function(error) {
    self.emit('error', error);
  });
  this._state = 'connected';
  this.emit('open');

  if (upgradeHead && upgradeHead.length > 0) receiver.add(upgradeHead);
}

function startQueue(instance) {
  instance._queue = instance._queue || [];
}

function executeQueueSends(instance) {
  var queue = instance._queue;
  if (typeof queue == 'undefined') return;
  delete instance._queue;
  for (var i = 0, l = queue.length; i < l; ++i) {
    queue[i]();
  }
}

function sendStream(instance, stream, options, cb) {
  stream.on('data', function(data) {
    if (instance._state != 'connected') {
      if (typeof cb == 'function') cb(new Error('not connected'));
      else instance.emit('error', new Error('not connected'));
      return;
    }
    options.fin = false;
    instance._sender.send(data, options);
  });
  stream.on('end', function() {
    if (instance._state != 'connected') {
      if (typeof cb == 'function') cb(new Error('not connected'));
      else instance.emit('error', new Error('not connected'));
      return;
    }
    options.fin = true;
    instance._sender.send(null, options);
    if (typeof cb == 'function') cb(null);
  });
}
back to top