var Rx = require('./dist/rx.all'); require('./dist/rx.sorting'); require('./dist/rx.testing'); // Add specific Node functions var EventEmitter = require('events').EventEmitter, Observable = Rx.Observable; Rx.Node = { /** * @deprecated Use Rx.Observable.fromCallback from rx.async.js instead. * * Converts a callback function to an observable sequence. * * @param {Function} func Function to convert to an asynchronous function. * @param {Mixed} [context] The context for the func parameter to be executed. If not specified, defaults to undefined. * @param {Function} [selector] A selector which takes the arguments from the event handler to produce a single item to yield on next. * @returns {Function} Asynchronous function. */ fromCallback: function (func, context, selector) { return Observable.fromCallback(func, context, selector); }, /** * @deprecated Use Rx.Observable.fromNodeCallback from rx.async.js instead. * * Converts a Node.js callback style function to an observable sequence. This must be in function (err, ...) format. * * @param {Function} func The function to call * @param {Mixed} [context] The context for the func parameter to be executed. If not specified, defaults to undefined. * @param {Function} [selector] A selector which takes the arguments from the event handler to produce a single item to yield on next. * @returns {Function} An async function which when applied, returns an observable sequence with the callback arguments as an array. */ fromNodeCallback: function (func, context, selector) { return Observable.fromNodeCallback(func, context, selector); }, /** * @deprecated Use Rx.Observable.fromNodeCallback from rx.async.js instead. * * Handles an event from the given EventEmitter as an observable sequence. * * @param {EventEmitter} eventEmitter The EventEmitter to subscribe to the given event. * @param {String} eventName The event name to subscribe * @param {Function} [selector] A selector which takes the arguments from the event handler to produce a single item to yield on next. * @returns {Observable} An observable sequence generated from the named event from the given EventEmitter. The data will be returned as an array of arguments to the handler. */ fromEvent: function (eventEmitter, eventName, selector) { return Observable.fromEvent(eventEmitter, eventName, selector); }, /** * Converts the given observable sequence to an event emitter with the given event name. * The errors are handled on the 'error' event and completion on the 'end' event. * @param {Observable} observable The observable sequence to convert to an EventEmitter. * @param {String} eventName The event name to emit onNext calls. * @returns {EventEmitter} An EventEmitter which emits the given eventName for each onNext call in addition to 'error' and 'end' events. * You must call publish in order to invoke the subscription on the Observable sequuence. */ toEventEmitter: function (observable, eventName, selector) { var e = new EventEmitter(); // Used to publish the events from the observable e.publish = function () { e.subscription = observable.subscribe( function (x) { var result = x; if (selector) { try { result = selector(x); } catch (e) { e.emit('error', e); return; } } e.emit(eventName, result); }, function (err) { e.emit('error', err); }, function () { e.emit('end'); }); }; return e; }, /** * Converts a flowing stream to an Observable sequence. * @param {Stream} stream A stream to convert to a observable sequence. * @param {String} [finishEventName] Event that notifies about closed stream. ("end" by default) * @returns {Observable} An observable sequence which fires on each 'data' event as well as handling 'error' and finish events like `end` or `finish`. */ fromStream: function (stream, finishEventName) { stream.pause(); finishEventName || (finishEventName = 'end'); return Observable.create(function (observer) { function dataHandler (data) { observer.onNext(data); } function errorHandler (err) { observer.onError(err); } function endHandler () { observer.onCompleted(); } stream.addListener('data', dataHandler); stream.addListener('error', errorHandler); stream.addListener(finishEventName, endHandler); stream.resume(); return function () { stream.removeListener('data', dataHandler); stream.removeListener('error', errorHandler); stream.removeListener(finishEventName, endHandler); }; }).publish().refCount(); }, /** * Converts a flowing readable stream to an Observable sequence. * @param {Stream} stream A stream to convert to a observable sequence. * @returns {Observable} An observable sequence which fires on each 'data' event as well as handling 'error' and 'end' events. */ fromReadableStream: function (stream) { return this.fromStream(stream, 'end'); }, /** * Converts a flowing writeable stream to an Observable sequence. * @param {Stream} stream A stream to convert to a observable sequence. * @returns {Observable} An observable sequence which fires on each 'data' event as well as handling 'error' and 'finish' events. */ fromWritableStream: function (stream) { return this.fromStream(stream, 'finish'); }, /** * Converts a flowing transform stream to an Observable sequence. * @param {Stream} stream A stream to convert to a observable sequence. * @returns {Observable} An observable sequence which fires on each 'data' event as well as handling 'error' and 'finish' events. */ fromTransformStream: function (stream) { return this.fromStream(stream, 'finish'); }, /** * Writes an observable sequence to a stream * @param {Observable} observable Observable sequence to write to a stream. * @param {Stream} stream The stream to write to. * @param {String} [encoding] The encoding of the item to write. * @returns {Disposable} The subscription handle. */ writeToStream: function (observable, stream, encoding) { var source = observable.pausableBuffered(); function onDrain() { source.resume(); } stream.addListener('drain', onDrain); return source.subscribe( function (x) { !stream.write(String(x), encoding) && source.pause(); }, function (err) { stream.emit('error', err); }, function () { // Hack check because STDIO is not closable !stream._isStdio && stream.end(); stream.removeListener('drain', onDrain); }); source.resume(); } }; /** * Pipes the existing Observable sequence into a Node.js Stream. * @param {Stream} dest The destination Node.js stream. * @returns {Stream} The destination stream. */ Rx.Observable.prototype.pipe = function (dest) { var source = this.pausableBuffered(); function onDrain() { source.resume(); } dest.addListener('drain', onDrain); source.subscribe( function (x) { !dest.write(String(x)) && source.pause(); }, function (err) { dest.emit('error', err); }, function () { // Hack check because STDIO is not closable !dest._isStdio && dest.end(); dest.removeListener('drain', onDrain); }); source.resume(); return dest; }; module.exports = Rx;