swh:1:snp:c53ab93da1867a0ee99951a3636bca865f9194df
Tip revision: e0a0c3a70a146683b4d50ae6f479e3b8210e3ffe authored by einaros on 13 November 2011, 20:32:10 UTC
sequence tested and implemented for long running sends. bump.
sequence tested and implemented for long running sends. bump.
Tip revision: e0a0c3a
WebSocket.js
/*!
* WebSocket
* Copyright(c) 2011 Einar Otto Stangvik <einaros@gmail.com>
* MIT Licensed
*/
var util = require('util')
, events = require('events')
, http = require('http')
, crypto = require('crypto')
, url = require('url')
, fs = require('fs')
, Sender = require('./Sender')
, Receiver = require('./Receiver');
/**
* Constants
*/
var protocolPrefix = "HyBi-";
var protocolVersion = 13;
/**
* WebSocket implementation
*/
function WebSocket(address, options) {
var serverUrl = url.parse(address);
if (!serverUrl.host) throw 'invalid url';
options = options || {};
options.origin = options.origin || null;
options.protocolVersion = options.protocolVersion || protocolVersion;
if (options.protocolVersion != 8 && options.protocolVersion != 13) {
throw 'unsupported protocol version';
}
var key = new Buffer(protocolPrefix + options.protocolVersion).toString('base64');
var shasum = crypto.createHash('sha1');
shasum.update(key + '258EAFA5-E914-47DA-95CA-C5AB0DC85B11');
var expectedServerKey = shasum.digest('base64');
var requestOptions = {
port: serverUrl.port || 80,
host: serverUrl.hostname,
path: serverUrl.path || '/',
headers: {
'Connection': 'Upgrade',
'Upgrade': 'websocket',
'Sec-WebSocket-Version': options.protocolVersion,
'Sec-WebSocket-Key': key
}
};
if (options.origin) {
if (options.protocolVersion < 13) requestOptions.headers['Sec-WebSocket-Origin'] = options.origin;
else requestOptions.headers.origin = options.origin;
}
var req = http.request(requestOptions);
req.end();
this._socket = null;
this._state = 'connecting';
var self = this;
req.on('upgrade', function(res, socket, upgradeHead) {
if (self._state == 'disconnected') {
// client disconnected before server accepted connection
self.emit('disconnected');
socket.end();
return;
}
var serverKey = res.headers['sec-websocket-accept'];
if (typeof serverKey == 'undefined' || serverKey !== expectedServerKey) {
self.emit('error', 'invalid server key');
socket.end();
return;
}
self._socket = socket;
socket.setTimeout(0);
socket.setNoDelay(true);
socket.on('close', function() {
self._state = 'disconnected';
self.emit('disconnected');
});
var receiver = new Receiver();
socket.on('data', function (data) {
receiver.add(data);
});
self._sender = new Sender(socket);
receiver.on('text', function (data, flags) {
flags = flags || {};
self.emit('data', data, flags);
});
receiver.on('binary', function (data, flags) {
flags = flags || {};
flags.binary = true;
self.emit('data', data, flags);
});
self._state = 'connected';
self.emit('connected');
});
var realEmit = this.emit;
this.emit = function(event) {
if (event == 'error') {
delete this._queue;
this.terminate();
this._state = 'disconnected';
}
realEmit.apply(this, arguments);
}
}
/**
* Inherits from EventEmitter.
*/
util.inherits(WebSocket, events.EventEmitter);
/**
* Gracefully closes the connection, after sending a description message to the server
*
* @param {Object} data to be sent to the server
* @param {Object} Members - mask: boolean, binary: boolean
* @api public
*/
WebSocket.prototype.close = function(data, options) {
if (this._state != 'connected') throw 'not connected';
try {
this._sender.close(data, options);
this.terminate();
}
catch (e) {
this.emit('error', e);
}
}
/**
* Sends a ping
*
* @param {Object} data to be sent to the server
* @param {Object} Members - mask: boolean, binary: boolean
* @api public
*/
WebSocket.prototype.ping = function(data, options) {
if (this._state != 'connected') throw 'not connected';
try {
this._sender.ping(data, options);
}
catch (e) {
this.emit('error', e);
}
}
/**
* Sends a pong
*
* @param {Object} data to be sent to the server
* @param {Object} Members - mask: boolean, binary: boolean
* @api public
*/
WebSocket.prototype.pong = function(data, options) {
if (this._state != 'connected') throw 'not connected';
try {
this._sender.pong(data, options);
}
catch (e) {
this.emit('error', e);
}
}
/**
* Sends a piece of data
*
* @param {Object} data to be sent to the server
* @param {Object} Members - mask: boolean, binary: boolean
* @param {function} Optional callback which is executed after the send completes
* @api public
*/
WebSocket.prototype.send = function(data, options, cb) {
if (this._state != 'connected') throw 'not connected';
if (!data) throw 'cannot send empty data';
if (this._queue) {
this._queue.push(this.send.bind(this, data, options, cb));
return;
}
if (typeof options === 'function') {
cb = options;
options = {};
}
options = options || {};
options.fin = true;
if (data instanceof fs.ReadStream) {
startQueue(this);
var self = this;
sendStream(this, data, options, function(error) {
if (typeof cb === 'function') {
cb(error);
return;
}
executeQueueSends(self);
});
}
else {
try {
this._sender.send(data, options, cb);
}
catch (e) {
this.emit('error', e);
}
}
}
/**
* Streams data through calls to a user supplied function
*
* @param {Object} Members - mask: boolean, binary: boolean
* @param {function} 'function (error, send)' which is executed on successive ticks,
* of which send is 'function (data, final)'.
* @api public
*/
WebSocket.prototype.stream = function(options, cb) {
if (this._state != 'connected') throw 'not connected';
if (this._queue) {
this._queue.push(this.stream.bind(this, options, cb));
return;
}
if (typeof options === 'function') {
cb = options;
options = {};
}
if (typeof cb != 'function') throw 'callback must be provided';
startQueue(this);
var self = this;
var send = function(data, final) {
try {
if (self._state != 'connected') throw 'not connected';
options.fin = final === true;
self._sender.send(data, options);
if (!final) process.nextTick(cb.bind(null, null, send));
else executeQueueSends(self);
}
catch (e) {
if (typeof cb == 'function') cb(e);
else self.emit('error', e);
}
}
process.nextTick(cb.bind(null, null, send));
}
/**
* Immediately shuts down the connection
*
* @api public
*/
WebSocket.prototype.terminate = function() {
if (this._socket) {
this._socket.end();
this._socket = null;
}
else if (this._state == 'connecting') {
this._state = 'disconnected';
}
}
module.exports = WebSocket;
/**
* Entirely private apis,
* which may or may not be bound to a sepcific WebSocket instance.
*/
function startQueue(instance) {
instance._queue = instance._queue || [];
}
function executeQueueSends(instance) {
try {
var queue = instance._queue;
if (typeof queue == 'undefined') return;
delete instance._queue;
queue.forEach(function(method) { method(); });
}
catch (e) {
instance.emit('error', e);
}
}
function sendStream(self, stream, options, cb) {
stream.on('data', function(data) {
try {
if (self._state != 'connected') throw 'not connected';
options.fin = false;
self._sender.send(data, options);
}
catch (e) {
if (typeof cb == 'function') cb(e);
else self.emit('error', e);
}
});
stream.on('end', function() {
try {
options.fin = true;
self._sender.send(null, options);
if (typeof cb === 'function') cb(null);
}
catch (e) {
if (typeof cb == 'function') cb(e);
else self.emit('error', e);
}
});
}