588 lines
21 KiB
JavaScript
588 lines
21 KiB
JavaScript
// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
|
|
|
|
;(function (factory) {
|
|
var objectTypes = {
|
|
'boolean': false,
|
|
'function': true,
|
|
'object': true,
|
|
'number': false,
|
|
'string': false,
|
|
'undefined': false
|
|
};
|
|
|
|
var root = (objectTypes[typeof window] && window) || this,
|
|
freeExports = objectTypes[typeof exports] && exports && !exports.nodeType && exports,
|
|
freeModule = objectTypes[typeof module] && module && !module.nodeType && module,
|
|
moduleExports = freeModule && freeModule.exports === freeExports && freeExports,
|
|
freeGlobal = objectTypes[typeof global] && global;
|
|
|
|
if (freeGlobal && (freeGlobal.global === freeGlobal || freeGlobal.window === freeGlobal)) {
|
|
root = freeGlobal;
|
|
}
|
|
|
|
// Because of build optimizers
|
|
if (typeof define === 'function' && define.amd) {
|
|
define(['rx'], function (Rx, exports) {
|
|
return factory(root, exports, Rx);
|
|
});
|
|
} else if (typeof module === 'object' && module && module.exports === freeExports) {
|
|
module.exports = factory(root, module.exports, require('./rx'));
|
|
} else {
|
|
root.Rx = factory(root, {}, root.Rx);
|
|
}
|
|
}.call(this, function (root, exp, Rx, undefined) {
|
|
|
|
// Aliases
|
|
var Observable = Rx.Observable,
|
|
observableProto = Observable.prototype,
|
|
AnonymousObservable = Rx.AnonymousObservable,
|
|
observableConcat = Observable.concat,
|
|
observableDefer = Observable.defer,
|
|
observableEmpty = Observable.empty,
|
|
disposableEmpty = Rx.Disposable.empty,
|
|
CompositeDisposable = Rx.CompositeDisposable,
|
|
SerialDisposable = Rx.SerialDisposable,
|
|
SingleAssignmentDisposable = Rx.SingleAssignmentDisposable,
|
|
Enumerator = Rx.internals.Enumerator,
|
|
Enumerable = Rx.internals.Enumerable,
|
|
enumerableOf = Enumerable.of,
|
|
immediateScheduler = Rx.Scheduler.immediate,
|
|
currentThreadScheduler = Rx.Scheduler.currentThread,
|
|
slice = Array.prototype.slice,
|
|
AsyncSubject = Rx.AsyncSubject,
|
|
Observer = Rx.Observer,
|
|
inherits = Rx.internals.inherits,
|
|
bindCallback = Rx.internals.bindCallback,
|
|
addProperties = Rx.internals.addProperties,
|
|
helpers = Rx.helpers,
|
|
noop = helpers.noop,
|
|
isPromise = helpers.isPromise,
|
|
isScheduler = helpers.isScheduler,
|
|
observableFromPromise = Observable.fromPromise;
|
|
|
|
// Utilities
|
|
function argsOrArray(args, idx) {
|
|
return args.length === 1 && Array.isArray(args[idx]) ?
|
|
args[idx] :
|
|
slice.call(args);
|
|
}
|
|
|
|
// Shim in iterator support
|
|
var $iterator$ = (typeof Symbol === 'function' && Symbol.iterator) ||
|
|
'_es6shim_iterator_';
|
|
// Bug for mozilla version
|
|
if (root.Set && typeof new root.Set()['@@iterator'] === 'function') {
|
|
$iterator$ = '@@iterator';
|
|
}
|
|
|
|
var doneEnumerator = Rx.doneEnumerator = { done: true, value: undefined };
|
|
|
|
var isIterable = Rx.helpers.isIterable = function (o) {
|
|
return o[$iterator$] !== undefined;
|
|
}
|
|
|
|
var isArrayLike = Rx.helpers.isArrayLike = function (o) {
|
|
return o && o.length !== undefined;
|
|
}
|
|
|
|
Rx.helpers.iterator = $iterator$;
|
|
|
|
function enumerableWhile(condition, source) {
|
|
return new Enumerable(function () {
|
|
return new Enumerator(function () {
|
|
return condition() ?
|
|
{ done: false, value: source } :
|
|
{ done: true, value: undefined };
|
|
});
|
|
});
|
|
}
|
|
|
|
/**
|
|
* Returns an observable sequence that is the result of invoking the selector on the source sequence, without sharing subscriptions.
|
|
* This operator allows for a fluent style of writing queries that use the same sequence multiple times.
|
|
*
|
|
* @param {Function} selector Selector function which can use the source sequence as many times as needed, without sharing subscriptions to the source sequence.
|
|
* @returns {Observable} An observable sequence that contains the elements of a sequence produced by multicasting the source sequence within a selector function.
|
|
*/
|
|
observableProto.letBind = observableProto['let'] = function (func) {
|
|
return func(this);
|
|
};
|
|
|
|
/**
|
|
* Determines whether an observable collection contains values. There is an alias for this method called 'ifThen' for browsers <IE9
|
|
*
|
|
* @example
|
|
* 1 - res = Rx.Observable.if(condition, obs1);
|
|
* 2 - res = Rx.Observable.if(condition, obs1, obs2);
|
|
* 3 - res = Rx.Observable.if(condition, obs1, scheduler);
|
|
* @param {Function} condition The condition which determines if the thenSource or elseSource will be run.
|
|
* @param {Observable} thenSource The observable sequence or Promise that will be run if the condition function returns true.
|
|
* @param {Observable} [elseSource] The observable sequence or Promise that will be run if the condition function returns false. If this is not provided, it defaults to Rx.Observabe.Empty with the specified scheduler.
|
|
* @returns {Observable} An observable sequence which is either the thenSource or elseSource.
|
|
*/
|
|
Observable['if'] = Observable.ifThen = function (condition, thenSource, elseSourceOrScheduler) {
|
|
return observableDefer(function () {
|
|
elseSourceOrScheduler || (elseSourceOrScheduler = observableEmpty());
|
|
|
|
isPromise(thenSource) && (thenSource = observableFromPromise(thenSource));
|
|
isPromise(elseSourceOrScheduler) && (elseSourceOrScheduler = observableFromPromise(elseSourceOrScheduler));
|
|
|
|
// Assume a scheduler for empty only
|
|
typeof elseSourceOrScheduler.now === 'function' && (elseSourceOrScheduler = observableEmpty(elseSourceOrScheduler));
|
|
return condition() ? thenSource : elseSourceOrScheduler;
|
|
});
|
|
};
|
|
|
|
/**
|
|
* Concatenates the observable sequences obtained by running the specified result selector for each element in source.
|
|
* There is an alias for this method called 'forIn' for browsers <IE9
|
|
* @param {Array} sources An array of values to turn into an observable sequence.
|
|
* @param {Function} resultSelector A function to apply to each item in the sources array to turn it into an observable sequence.
|
|
* @returns {Observable} An observable sequence from the concatenated observable sequences.
|
|
*/
|
|
Observable['for'] = Observable.forIn = function (sources, resultSelector, thisArg) {
|
|
return enumerableOf(sources, resultSelector, thisArg).concat();
|
|
};
|
|
|
|
/**
|
|
* Repeats source as long as condition holds emulating a while loop.
|
|
* There is an alias for this method called 'whileDo' for browsers <IE9
|
|
*
|
|
* @param {Function} condition The condition which determines if the source will be repeated.
|
|
* @param {Observable} source The observable sequence that will be run if the condition function returns true.
|
|
* @returns {Observable} An observable sequence which is repeated as long as the condition holds.
|
|
*/
|
|
var observableWhileDo = Observable['while'] = Observable.whileDo = function (condition, source) {
|
|
isPromise(source) && (source = observableFromPromise(source));
|
|
return enumerableWhile(condition, source).concat();
|
|
};
|
|
|
|
/**
|
|
* Repeats source as long as condition holds emulating a do while loop.
|
|
*
|
|
* @param {Function} condition The condition which determines if the source will be repeated.
|
|
* @param {Observable} source The observable sequence that will be run if the condition function returns true.
|
|
* @returns {Observable} An observable sequence which is repeated as long as the condition holds.
|
|
*/
|
|
observableProto.doWhile = function (condition) {
|
|
return observableConcat([this, observableWhileDo(condition, this)]);
|
|
};
|
|
|
|
/**
|
|
* Uses selector to determine which source in sources to use.
|
|
* There is an alias 'switchCase' for browsers <IE9.
|
|
*
|
|
* @example
|
|
* 1 - res = Rx.Observable.case(selector, { '1': obs1, '2': obs2 });
|
|
* 1 - res = Rx.Observable.case(selector, { '1': obs1, '2': obs2 }, obs0);
|
|
* 1 - res = Rx.Observable.case(selector, { '1': obs1, '2': obs2 }, scheduler);
|
|
*
|
|
* @param {Function} selector The function which extracts the value for to test in a case statement.
|
|
* @param {Array} sources A object which has keys which correspond to the case statement labels.
|
|
* @param {Observable} [elseSource] The observable sequence or Promise that will be run if the sources are not matched. If this is not provided, it defaults to Rx.Observabe.empty with the specified scheduler.
|
|
*
|
|
* @returns {Observable} An observable sequence which is determined by a case statement.
|
|
*/
|
|
Observable['case'] = Observable.switchCase = function (selector, sources, defaultSourceOrScheduler) {
|
|
return observableDefer(function () {
|
|
isPromise(defaultSourceOrScheduler) && (defaultSourceOrScheduler = observableFromPromise(defaultSourceOrScheduler));
|
|
defaultSourceOrScheduler || (defaultSourceOrScheduler = observableEmpty());
|
|
|
|
typeof defaultSourceOrScheduler.now === 'function' && (defaultSourceOrScheduler = observableEmpty(defaultSourceOrScheduler));
|
|
|
|
var result = sources[selector()];
|
|
isPromise(result) && (result = observableFromPromise(result));
|
|
|
|
return result || defaultSourceOrScheduler;
|
|
});
|
|
};
|
|
|
|
/**
|
|
* Expands an observable sequence by recursively invoking selector.
|
|
*
|
|
* @param {Function} selector Selector function to invoke for each produced element, resulting in another sequence to which the selector will be invoked recursively again.
|
|
* @param {Scheduler} [scheduler] Scheduler on which to perform the expansion. If not provided, this defaults to the current thread scheduler.
|
|
* @returns {Observable} An observable sequence containing all the elements produced by the recursive expansion.
|
|
*/
|
|
observableProto.expand = function (selector, scheduler) {
|
|
isScheduler(scheduler) || (scheduler = immediateScheduler);
|
|
var source = this;
|
|
return new AnonymousObservable(function (observer) {
|
|
var q = [],
|
|
m = new SerialDisposable(),
|
|
d = new CompositeDisposable(m),
|
|
activeCount = 0,
|
|
isAcquired = false;
|
|
|
|
var ensureActive = function () {
|
|
var isOwner = false;
|
|
if (q.length > 0) {
|
|
isOwner = !isAcquired;
|
|
isAcquired = true;
|
|
}
|
|
if (isOwner) {
|
|
m.setDisposable(scheduler.scheduleRecursive(function (self) {
|
|
var work;
|
|
if (q.length > 0) {
|
|
work = q.shift();
|
|
} else {
|
|
isAcquired = false;
|
|
return;
|
|
}
|
|
var m1 = new SingleAssignmentDisposable();
|
|
d.add(m1);
|
|
m1.setDisposable(work.subscribe(function (x) {
|
|
observer.onNext(x);
|
|
var result = null;
|
|
try {
|
|
result = selector(x);
|
|
} catch (e) {
|
|
observer.onError(e);
|
|
}
|
|
q.push(result);
|
|
activeCount++;
|
|
ensureActive();
|
|
}, observer.onError.bind(observer), function () {
|
|
d.remove(m1);
|
|
activeCount--;
|
|
if (activeCount === 0) {
|
|
observer.onCompleted();
|
|
}
|
|
}));
|
|
self();
|
|
}));
|
|
}
|
|
};
|
|
|
|
q.push(source);
|
|
activeCount++;
|
|
ensureActive();
|
|
return d;
|
|
}, this);
|
|
};
|
|
|
|
/**
|
|
* Runs all observable sequences in parallel and collect their last elements.
|
|
*
|
|
* @example
|
|
* 1 - res = Rx.Observable.forkJoin([obs1, obs2]);
|
|
* 1 - res = Rx.Observable.forkJoin(obs1, obs2, ...);
|
|
* @returns {Observable} An observable sequence with an array collecting the last elements of all the input sequences.
|
|
*/
|
|
Observable.forkJoin = function () {
|
|
var allSources = argsOrArray(arguments, 0);
|
|
return new AnonymousObservable(function (subscriber) {
|
|
var count = allSources.length;
|
|
if (count === 0) {
|
|
subscriber.onCompleted();
|
|
return disposableEmpty;
|
|
}
|
|
var group = new CompositeDisposable(),
|
|
finished = false,
|
|
hasResults = new Array(count),
|
|
hasCompleted = new Array(count),
|
|
results = new Array(count);
|
|
|
|
for (var idx = 0; idx < count; idx++) {
|
|
(function (i) {
|
|
var source = allSources[i];
|
|
isPromise(source) && (source = observableFromPromise(source));
|
|
group.add(
|
|
source.subscribe(
|
|
function (value) {
|
|
if (!finished) {
|
|
hasResults[i] = true;
|
|
results[i] = value;
|
|
}
|
|
},
|
|
function (e) {
|
|
finished = true;
|
|
subscriber.onError(e);
|
|
group.dispose();
|
|
},
|
|
function () {
|
|
if (!finished) {
|
|
if (!hasResults[i]) {
|
|
subscriber.onCompleted();
|
|
return;
|
|
}
|
|
hasCompleted[i] = true;
|
|
for (var ix = 0; ix < count; ix++) {
|
|
if (!hasCompleted[ix]) { return; }
|
|
}
|
|
finished = true;
|
|
subscriber.onNext(results);
|
|
subscriber.onCompleted();
|
|
}
|
|
}));
|
|
})(idx);
|
|
}
|
|
|
|
return group;
|
|
});
|
|
};
|
|
|
|
/**
|
|
* Runs two observable sequences in parallel and combines their last elemenets.
|
|
*
|
|
* @param {Observable} second Second observable sequence.
|
|
* @param {Function} resultSelector Result selector function to invoke with the last elements of both sequences.
|
|
* @returns {Observable} An observable sequence with the result of calling the selector function with the last elements of both input sequences.
|
|
*/
|
|
observableProto.forkJoin = function (second, resultSelector) {
|
|
var first = this;
|
|
|
|
return new AnonymousObservable(function (observer) {
|
|
var leftStopped = false, rightStopped = false,
|
|
hasLeft = false, hasRight = false,
|
|
lastLeft, lastRight,
|
|
leftSubscription = new SingleAssignmentDisposable(), rightSubscription = new SingleAssignmentDisposable();
|
|
|
|
isPromise(second) && (second = observableFromPromise(second));
|
|
|
|
leftSubscription.setDisposable(
|
|
first.subscribe(function (left) {
|
|
hasLeft = true;
|
|
lastLeft = left;
|
|
}, function (err) {
|
|
rightSubscription.dispose();
|
|
observer.onError(err);
|
|
}, function () {
|
|
leftStopped = true;
|
|
if (rightStopped) {
|
|
if (!hasLeft) {
|
|
observer.onCompleted();
|
|
} else if (!hasRight) {
|
|
observer.onCompleted();
|
|
} else {
|
|
var result;
|
|
try {
|
|
result = resultSelector(lastLeft, lastRight);
|
|
} catch (e) {
|
|
observer.onError(e);
|
|
return;
|
|
}
|
|
observer.onNext(result);
|
|
observer.onCompleted();
|
|
}
|
|
}
|
|
})
|
|
);
|
|
|
|
rightSubscription.setDisposable(
|
|
second.subscribe(function (right) {
|
|
hasRight = true;
|
|
lastRight = right;
|
|
}, function (err) {
|
|
leftSubscription.dispose();
|
|
observer.onError(err);
|
|
}, function () {
|
|
rightStopped = true;
|
|
if (leftStopped) {
|
|
if (!hasLeft) {
|
|
observer.onCompleted();
|
|
} else if (!hasRight) {
|
|
observer.onCompleted();
|
|
} else {
|
|
var result;
|
|
try {
|
|
result = resultSelector(lastLeft, lastRight);
|
|
} catch (e) {
|
|
observer.onError(e);
|
|
return;
|
|
}
|
|
observer.onNext(result);
|
|
observer.onCompleted();
|
|
}
|
|
}
|
|
})
|
|
);
|
|
|
|
return new CompositeDisposable(leftSubscription, rightSubscription);
|
|
}, first);
|
|
};
|
|
|
|
/**
|
|
* Comonadic bind operator.
|
|
* @param {Function} selector A transform function to apply to each element.
|
|
* @param {Object} scheduler Scheduler used to execute the operation. If not specified, defaults to the ImmediateScheduler.
|
|
* @returns {Observable} An observable sequence which results from the comonadic bind operation.
|
|
*/
|
|
observableProto.manySelect = function (selector, scheduler) {
|
|
isScheduler(scheduler) || (scheduler = immediateScheduler);
|
|
var source = this;
|
|
return observableDefer(function () {
|
|
var chain;
|
|
|
|
return source
|
|
.map(function (x) {
|
|
var curr = new ChainObservable(x);
|
|
|
|
chain && chain.onNext(x);
|
|
chain = curr;
|
|
|
|
return curr;
|
|
})
|
|
.tap(
|
|
noop,
|
|
function (e) { chain && chain.onError(e); },
|
|
function () { chain && chain.onCompleted(); }
|
|
)
|
|
.observeOn(scheduler)
|
|
.map(selector);
|
|
}, source);
|
|
};
|
|
|
|
var ChainObservable = (function (__super__) {
|
|
|
|
function subscribe (observer) {
|
|
var self = this, g = new CompositeDisposable();
|
|
g.add(currentThreadScheduler.schedule(function () {
|
|
observer.onNext(self.head);
|
|
g.add(self.tail.mergeAll().subscribe(observer));
|
|
}));
|
|
|
|
return g;
|
|
}
|
|
|
|
inherits(ChainObservable, __super__);
|
|
|
|
function ChainObservable(head) {
|
|
__super__.call(this, subscribe);
|
|
this.head = head;
|
|
this.tail = new AsyncSubject();
|
|
}
|
|
|
|
addProperties(ChainObservable.prototype, Observer, {
|
|
onCompleted: function () {
|
|
this.onNext(Observable.empty());
|
|
},
|
|
onError: function (e) {
|
|
this.onNext(Observable.throwException(e));
|
|
},
|
|
onNext: function (v) {
|
|
this.tail.onNext(v);
|
|
this.tail.onCompleted();
|
|
}
|
|
});
|
|
|
|
return ChainObservable;
|
|
|
|
}(Observable));
|
|
|
|
/*
|
|
* Performs a exclusive waiting for the first to finish before subscribing to another observable.
|
|
* Observables that come in between subscriptions will be dropped on the floor.
|
|
* @returns {Observable} A exclusive observable with only the results that happen when subscribed.
|
|
*/
|
|
observableProto.exclusive = function () {
|
|
var sources = this;
|
|
return new AnonymousObservable(function (observer) {
|
|
var hasCurrent = false,
|
|
isStopped = false,
|
|
m = new SingleAssignmentDisposable(),
|
|
g = new CompositeDisposable();
|
|
|
|
g.add(m);
|
|
|
|
m.setDisposable(sources.subscribe(
|
|
function (innerSource) {
|
|
if (!hasCurrent) {
|
|
hasCurrent = true;
|
|
|
|
isPromise(innerSource) && (innerSource = observableFromPromise(innerSource));
|
|
|
|
var innerSubscription = new SingleAssignmentDisposable();
|
|
g.add(innerSubscription);
|
|
|
|
innerSubscription.setDisposable(innerSource.subscribe(
|
|
observer.onNext.bind(observer),
|
|
observer.onError.bind(observer),
|
|
function () {
|
|
g.remove(innerSubscription);
|
|
hasCurrent = false;
|
|
if (isStopped && g.length === 1) {
|
|
observer.onCompleted();
|
|
}
|
|
}));
|
|
}
|
|
},
|
|
observer.onError.bind(observer),
|
|
function () {
|
|
isStopped = true;
|
|
if (!hasCurrent && g.length === 1) {
|
|
observer.onCompleted();
|
|
}
|
|
}));
|
|
|
|
return g;
|
|
}, this);
|
|
};
|
|
|
|
/*
|
|
* Performs a exclusive map waiting for the first to finish before subscribing to another observable.
|
|
* Observables that come in between subscriptions will be dropped on the floor.
|
|
* @param {Function} selector Selector to invoke for every item in the current subscription.
|
|
* @param {Any} [thisArg] An optional context to invoke with the selector parameter.
|
|
* @returns {Observable} An exclusive observable with only the results that happen when subscribed.
|
|
*/
|
|
observableProto.exclusiveMap = function (selector, thisArg) {
|
|
var sources = this,
|
|
selectorFunc = bindCallback(selector, thisArg, 3);
|
|
return new AnonymousObservable(function (observer) {
|
|
var index = 0,
|
|
hasCurrent = false,
|
|
isStopped = true,
|
|
m = new SingleAssignmentDisposable(),
|
|
g = new CompositeDisposable();
|
|
|
|
g.add(m);
|
|
|
|
m.setDisposable(sources.subscribe(
|
|
function (innerSource) {
|
|
|
|
if (!hasCurrent) {
|
|
hasCurrent = true;
|
|
|
|
innerSubscription = new SingleAssignmentDisposable();
|
|
g.add(innerSubscription);
|
|
|
|
isPromise(innerSource) && (innerSource = observableFromPromise(innerSource));
|
|
|
|
innerSubscription.setDisposable(innerSource.subscribe(
|
|
function (x) {
|
|
var result;
|
|
try {
|
|
result = selectorFunc(x, index++, innerSource);
|
|
} catch (e) {
|
|
observer.onError(e);
|
|
return;
|
|
}
|
|
|
|
observer.onNext(result);
|
|
},
|
|
function (e) { observer.onError(e); },
|
|
function () {
|
|
g.remove(innerSubscription);
|
|
hasCurrent = false;
|
|
|
|
if (isStopped && g.length === 1) {
|
|
observer.onCompleted();
|
|
}
|
|
}));
|
|
}
|
|
},
|
|
function (e) { observer.onError(e); },
|
|
function () {
|
|
isStopped = true;
|
|
if (g.length === 1 && !hasCurrent) {
|
|
observer.onCompleted();
|
|
}
|
|
}));
|
|
return g;
|
|
}, this);
|
|
};
|
|
|
|
return Rx;
|
|
}));
|