diff options
Diffstat (limited to 'external/rx/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/QueryLanguage.StandardSequenceOperators.cs')
-rw-r--r-- | external/rx/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/QueryLanguage.StandardSequenceOperators.cs | 212 |
1 files changed, 177 insertions, 35 deletions
diff --git a/external/rx/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/QueryLanguage.StandardSequenceOperators.cs b/external/rx/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/QueryLanguage.StandardSequenceOperators.cs index 0089f04b89..41ceb9ce8f 100644 --- a/external/rx/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/QueryLanguage.StandardSequenceOperators.cs +++ b/external/rx/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/QueryLanguage.StandardSequenceOperators.cs @@ -1,13 +1,10 @@ // Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information. using System.Collections.Generic; -using System.Linq; using System.Reactive.Concurrency; -using System.Reactive.Disposables; -using System.Reactive.Subjects; + #if !NO_TPL -using System.Reactive.Threading.Tasks; using System.Threading; using System.Threading.Tasks; #endif @@ -15,7 +12,7 @@ using System.Threading.Tasks; namespace System.Reactive.Linq { #if !NO_PERF - using Observαble; + using ObservableImpl; #endif internal partial class QueryLanguage @@ -156,30 +153,50 @@ namespace System.Reactive.Linq public virtual IObservable<IGroupedObservable<TKey, TElement>> GroupBy<TSource, TKey, TElement>(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector) { - return GroupBy_<TSource, TKey, TElement>(source, keySelector, elementSelector, EqualityComparer<TKey>.Default); + return GroupBy_<TSource, TKey, TElement>(source, keySelector, elementSelector, null, EqualityComparer<TKey>.Default); } public virtual IObservable<IGroupedObservable<TKey, TSource>> GroupBy<TSource, TKey>(IObservable<TSource> source, Func<TSource, TKey> keySelector, IEqualityComparer<TKey> comparer) { - return GroupBy_<TSource, TKey, TSource>(source, keySelector, x => x, comparer); + return GroupBy_<TSource, TKey, TSource>(source, keySelector, x => x, null, comparer); } public virtual IObservable<IGroupedObservable<TKey, TSource>> GroupBy<TSource, TKey>(IObservable<TSource> source, Func<TSource, TKey> keySelector) { - return GroupBy_<TSource, TKey, TSource>(source, keySelector, x => x, EqualityComparer<TKey>.Default); + return GroupBy_<TSource, TKey, TSource>(source, keySelector, x => x, null, EqualityComparer<TKey>.Default); } public virtual IObservable<IGroupedObservable<TKey, TElement>> GroupBy<TSource, TKey, TElement>(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, IEqualityComparer<TKey> comparer) { - return GroupBy_<TSource, TKey, TElement>(source, keySelector, elementSelector, comparer); + return GroupBy_<TSource, TKey, TElement>(source, keySelector, elementSelector, null, comparer); + } + + public virtual IObservable<IGroupedObservable<TKey, TElement>> GroupBy<TSource, TKey, TElement>(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, int capacity) + { + return GroupBy_<TSource, TKey, TElement>(source, keySelector, elementSelector, capacity, EqualityComparer<TKey>.Default); + } + + public virtual IObservable<IGroupedObservable<TKey, TSource>> GroupBy<TSource, TKey>(IObservable<TSource> source, Func<TSource, TKey> keySelector, int capacity, IEqualityComparer<TKey> comparer) + { + return GroupBy_<TSource, TKey, TSource>(source, keySelector, x => x, capacity, comparer); + } + + public virtual IObservable<IGroupedObservable<TKey, TSource>> GroupBy<TSource, TKey>(IObservable<TSource> source, Func<TSource, TKey> keySelector, int capacity) + { + return GroupBy_<TSource, TKey, TSource>(source, keySelector, x => x, capacity, EqualityComparer<TKey>.Default); + } + + public virtual IObservable<IGroupedObservable<TKey, TElement>> GroupBy<TSource, TKey, TElement>(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, int capacity, IEqualityComparer<TKey> comparer) + { + return GroupBy_<TSource, TKey, TElement>(source, keySelector, elementSelector, capacity, comparer); } - private static IObservable<IGroupedObservable<TKey, TElement>> GroupBy_<TSource, TKey, TElement>(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, IEqualityComparer<TKey> comparer) + private static IObservable<IGroupedObservable<TKey, TElement>> GroupBy_<TSource, TKey, TElement>(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, int? capacity, IEqualityComparer<TKey> comparer) { #if !NO_PERF - return new GroupBy<TSource, TKey, TElement>(source, keySelector, elementSelector, comparer); + return new GroupBy<TSource, TKey, TElement>(source, keySelector, elementSelector, capacity, comparer); #else - return GroupByUntil_<TSource, TKey, TElement, Unit>(source, keySelector, elementSelector, _ => Observable.Never<Unit>(), comparer); + return GroupByUntil_<TSource, TKey, TElement, Unit>(source, keySelector, elementSelector, _ => Observable.Never<Unit>(), capacity, comparer); #endif } @@ -189,32 +206,54 @@ namespace System.Reactive.Linq public virtual IObservable<IGroupedObservable<TKey, TElement>> GroupByUntil<TSource, TKey, TElement, TDuration>(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, Func<IGroupedObservable<TKey, TElement>, IObservable<TDuration>> durationSelector, IEqualityComparer<TKey> comparer) { - return GroupByUntil_<TSource, TKey, TElement, TDuration>(source, keySelector, elementSelector, durationSelector, comparer); + return GroupByUntil_<TSource, TKey, TElement, TDuration>(source, keySelector, elementSelector, durationSelector, null, comparer); } public virtual IObservable<IGroupedObservable<TKey, TElement>> GroupByUntil<TSource, TKey, TElement, TDuration>(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, Func<IGroupedObservable<TKey, TElement>, IObservable<TDuration>> durationSelector) { - return GroupByUntil_<TSource, TKey, TElement, TDuration>(source, keySelector, elementSelector, durationSelector, EqualityComparer<TKey>.Default); + return GroupByUntil_<TSource, TKey, TElement, TDuration>(source, keySelector, elementSelector, durationSelector, null, EqualityComparer<TKey>.Default); } public virtual IObservable<IGroupedObservable<TKey, TSource>> GroupByUntil<TSource, TKey, TDuration>(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<IGroupedObservable<TKey, TSource>, IObservable<TDuration>> durationSelector, IEqualityComparer<TKey> comparer) { - return GroupByUntil_<TSource, TKey, TSource, TDuration>(source, keySelector, x => x, durationSelector, comparer); + return GroupByUntil_<TSource, TKey, TSource, TDuration>(source, keySelector, x => x, durationSelector, null, comparer); } public virtual IObservable<IGroupedObservable<TKey, TSource>> GroupByUntil<TSource, TKey, TDuration>(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<IGroupedObservable<TKey, TSource>, IObservable<TDuration>> durationSelector) { - return GroupByUntil_<TSource, TKey, TSource, TDuration>(source, keySelector, x => x, durationSelector, EqualityComparer<TKey>.Default); + return GroupByUntil_<TSource, TKey, TSource, TDuration>(source, keySelector, x => x, durationSelector, null, EqualityComparer<TKey>.Default); + } + + public virtual IObservable<IGroupedObservable<TKey, TElement>> GroupByUntil<TSource, TKey, TElement, TDuration>(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, Func<IGroupedObservable<TKey, TElement>, IObservable<TDuration>> durationSelector, int capacity, IEqualityComparer<TKey> comparer) + { + return GroupByUntil_<TSource, TKey, TElement, TDuration>(source, keySelector, elementSelector, durationSelector, capacity, comparer); } - private static IObservable<IGroupedObservable<TKey, TElement>> GroupByUntil_<TSource, TKey, TElement, TDuration>(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, Func<IGroupedObservable<TKey, TElement>, IObservable<TDuration>> durationSelector, IEqualityComparer<TKey> comparer) + public virtual IObservable<IGroupedObservable<TKey, TElement>> GroupByUntil<TSource, TKey, TElement, TDuration>(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, Func<IGroupedObservable<TKey, TElement>, IObservable<TDuration>> durationSelector, int capacity) + { + return GroupByUntil_<TSource, TKey, TElement, TDuration>(source, keySelector, elementSelector, durationSelector, capacity, EqualityComparer<TKey>.Default); + } + + public virtual IObservable<IGroupedObservable<TKey, TSource>> GroupByUntil<TSource, TKey, TDuration>(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<IGroupedObservable<TKey, TSource>, IObservable<TDuration>> durationSelector, int capacity, IEqualityComparer<TKey> comparer) + { + return GroupByUntil_<TSource, TKey, TSource, TDuration>(source, keySelector, x => x, durationSelector, capacity, comparer); + } + + public virtual IObservable<IGroupedObservable<TKey, TSource>> GroupByUntil<TSource, TKey, TDuration>(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<IGroupedObservable<TKey, TSource>, IObservable<TDuration>> durationSelector, int capacity) + { + return GroupByUntil_<TSource, TKey, TSource, TDuration>(source, keySelector, x => x, durationSelector, capacity, EqualityComparer<TKey>.Default); + } + + private static IObservable<IGroupedObservable<TKey, TElement>> GroupByUntil_<TSource, TKey, TElement, TDuration>(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, Func<IGroupedObservable<TKey, TElement>, IObservable<TDuration>> durationSelector, int? capacity, IEqualityComparer<TKey> comparer) { #if !NO_PERF - return new GroupByUntil<TSource, TKey, TElement, TDuration>(source, keySelector, elementSelector, durationSelector, comparer); + return new GroupByUntil<TSource, TKey, TElement, TDuration>(source, keySelector, elementSelector, durationSelector, capacity, comparer); #else return new AnonymousObservable<IGroupedObservable<TKey, TElement>>(observer => { - var map = new Dictionary<TKey, ISubject<TElement>>(comparer); + var map = capacity.HasValue + ? new Dictionary<TKey, ISubject<TElement>>(capacity.Value, comparer) + : new Dictionary<TKey, ISubject<TElement>>(comparer); var groupDisposable = new CompositeDisposable(); var refCountDisposable = new RefCountDisposable(groupDisposable); @@ -733,7 +772,7 @@ namespace System.Reactive.Linq #if !NO_PERF var select = source as Select<TSource>; if (select != null) - return select.Ω(selector); + return select.Omega(selector); return new Select<TSource, TResult>(source, selector); #else @@ -853,6 +892,15 @@ namespace System.Reactive.Linq #endif } + public virtual IObservable<TResult> SelectMany<TSource, TResult>(IObservable<TSource> source, Func<TSource, int, Task<TResult>> selector) + { +#if !NO_PERF + return new SelectMany<TSource, TResult>(source, (x, i, token) => selector(x, i)); +#else + return SelectMany_<TSource, TResult>(source, (x, i) => selector(x, i).ToObservable()); +#endif + } + public virtual IObservable<TResult> SelectMany<TSource, TResult>(IObservable<TSource> source, Func<TSource, CancellationToken, Task<TResult>> selector) { #if !NO_PERF @@ -861,6 +909,15 @@ namespace System.Reactive.Linq return SelectMany_<TSource, TResult>(source, x => FromAsync(ct => selector(x, ct))); #endif } + + public virtual IObservable<TResult> SelectMany<TSource, TResult>(IObservable<TSource> source, Func<TSource, int, CancellationToken, Task<TResult>> selector) + { +#if !NO_PERF + return new SelectMany<TSource, TResult>(source, selector); +#else + return SelectMany_<TSource, TResult>(source, (x, i) => FromAsync(ct => selector(x, i, ct))); +#endif + } #endif public virtual IObservable<TResult> SelectMany<TSource, TCollection, TResult>(IObservable<TSource> source, Func<TSource, IObservable<TCollection>> collectionSelector, Func<TSource, TCollection, TResult> resultSelector) @@ -883,6 +940,15 @@ namespace System.Reactive.Linq #endif } + public virtual IObservable<TResult> SelectMany<TSource, TTaskResult, TResult>(IObservable<TSource> source, Func<TSource, int, Task<TTaskResult>> taskSelector, Func<TSource, int, TTaskResult, TResult> resultSelector) + { +#if !NO_PERF + return new SelectMany<TSource, TTaskResult, TResult>(source, (x, i, token) => taskSelector(x, i), resultSelector); +#else + return SelectMany_<TSource, TTaskResult, TResult>(source, (x, i) => taskSelector(x, i).ToObservable(), (x, i, t, _) => resultSelector(x, i, t)); +#endif + } + public virtual IObservable<TResult> SelectMany<TSource, TTaskResult, TResult>(IObservable<TSource> source, Func<TSource, CancellationToken, Task<TTaskResult>> taskSelector, Func<TSource, TTaskResult, TResult> resultSelector) { #if !NO_PERF @@ -891,6 +957,15 @@ namespace System.Reactive.Linq return SelectMany_<TSource, TTaskResult, TResult>(source, x => FromAsync(ct => taskSelector(x, ct)), resultSelector); #endif } + + public virtual IObservable<TResult> SelectMany<TSource, TTaskResult, TResult>(IObservable<TSource> source, Func<TSource, int, CancellationToken, Task<TTaskResult>> taskSelector, Func<TSource, int, TTaskResult, TResult> resultSelector) + { +#if !NO_PERF + return new SelectMany<TSource, TTaskResult, TResult>(source, taskSelector, resultSelector); +#else + return SelectMany_<TSource, TTaskResult, TResult>(source, (x, i) => FromAsync(ct => taskSelector(x, i, ct)), (x, i, t, _) => resultSelector(x, i, t)); +#endif + } #endif private static IObservable<TResult> SelectMany_<TSource, TResult>(IObservable<TSource> source, Func<TSource, IObservable<TResult>> selector) @@ -901,7 +976,7 @@ namespace System.Reactive.Linq return source.Select(selector).Merge(); #endif } - + private static IObservable<TResult> SelectMany_<TSource, TResult>(IObservable<TSource> source, Func<TSource, int, IObservable<TResult>> selector) { #if !NO_PERF @@ -925,7 +1000,7 @@ namespace System.Reactive.Linq #if !NO_PERF return new SelectMany<TSource, TCollection, TResult>(source, collectionSelector, resultSelector); #else - return SelectMany_<TSource, TResult>(source, x => collectionSelector(x).Select(y => resultSelector(x, y))); + return SelectMany_<TSource, TResult>(source, (x, i) => collectionSelector(x, i).Select((y, i2) => resultSelector(x, i, y, i2))); #endif } @@ -946,19 +1021,23 @@ namespace System.Reactive.Linq #endif } - public virtual IObservable<TResult> SelectMany<TSource, TResult>(IObservable<TSource> source, Func<TSource, int, IObservable<TResult>> onNext, Func<Exception, int, IObservable<TResult>> onError, Func<int, IObservable<TResult>> onCompleted) + public virtual IObservable<TResult> SelectMany<TSource, TResult>(IObservable<TSource> source, Func<TSource, int, IObservable<TResult>> onNext, Func<Exception, IObservable<TResult>> onError, Func<IObservable<TResult>> onCompleted) { #if !NO_PERF return new SelectMany<TSource, TResult>(source, onNext, onError, onCompleted); #else - return source.Materialize().SelectMany(notification => + return Defer(() => { - if (notification.Kind == NotificationKind.OnNext) - return onNext(notification.Value); - else if (notification.Kind == NotificationKind.OnError) - return onError(notification.Exception); - else - return onCompleted(); + var index = 0; + return source.Materialize().SelectMany(notification => + { + if (notification.Kind == NotificationKind.OnNext) + return onNext(notification.Value, checked(index++)); + else if (notification.Kind == NotificationKind.OnError) + return onError(notification.Exception); + else + return onCompleted(); + }); }); #endif } @@ -977,7 +1056,7 @@ namespace System.Reactive.Linq #if !NO_PERF return new SelectMany<TSource, TResult>(source, selector); #else - return SelectMany_<TSource, TResult, TResult>(source, selector, (_, x) => x); + return SelectMany_<TSource, TResult, TResult>(source, selector, (_, __, x, ___) => x); #endif } @@ -986,6 +1065,11 @@ namespace System.Reactive.Linq return SelectMany_<TSource, TCollection, TResult>(source, collectionSelector, resultSelector); } + public virtual IObservable<TResult> SelectMany<TSource, TCollection, TResult>(IObservable<TSource> source, Func<TSource, int, IEnumerable<TCollection>> collectionSelector, Func<TSource, int, TCollection, int, TResult> resultSelector) + { + return SelectMany_<TSource, TCollection, TResult>(source, collectionSelector, resultSelector); + } + private static IObservable<TResult> SelectMany_<TSource, TCollection, TResult>(IObservable<TSource> source, Func<TSource, IEnumerable<TCollection>> collectionSelector, Func<TSource, TCollection, TResult> resultSelector) { #if !NO_PERF @@ -1045,9 +1129,67 @@ namespace System.Reactive.Linq #endif } - public virtual IObservable<TResult> SelectMany<TSource, TCollection, TResult>(IObservable<TSource> source, Func<TSource, int, IEnumerable<TCollection>> collectionSelector, Func<TSource, int, TCollection, int, TResult> resultSelector) + private static IObservable<TResult> SelectMany_<TSource, TCollection, TResult>(IObservable<TSource> source, Func<TSource, int, IEnumerable<TCollection>> collectionSelector, Func<TSource, int, TCollection, int, TResult> resultSelector) { +#if !NO_PERF return new SelectMany<TSource, TCollection, TResult>(source, collectionSelector, resultSelector); +#else + return new AnonymousObservable<TResult>(observer => + { + var index = 0; + + return source.Subscribe( + x => + { + var xs = default(IEnumerable<TCollection>); + try + { + xs = collectionSelector(x, checked(index++)); + } + catch (Exception exception) + { + observer.OnError(exception); + return; + } + + var e = xs.GetEnumerator(); + + try + { + var eIndex = 0; + var hasNext = true; + while (hasNext) + { + hasNext = false; + var current = default(TResult); + + try + { + hasNext = e.MoveNext(); + if (hasNext) + current = resultSelector(x, index, e.Current, checked(eIndex++)); + } + catch (Exception exception) + { + observer.OnError(exception); + return; + } + + if (hasNext) + observer.OnNext(current); + } + } + finally + { + if (e != null) + e.Dispose(); + } + }, + observer.OnError, + observer.OnCompleted + ) + }); +#endif } #endregion @@ -1059,7 +1201,7 @@ namespace System.Reactive.Linq #if !NO_PERF var skip = source as Skip<TSource>; if (skip != null && skip._scheduler == null) - return skip.Ω(count); + return skip.Omega(count); return new Skip<TSource>(source, count); #else @@ -1156,7 +1298,7 @@ namespace System.Reactive.Linq { var take = source as Take<TSource>; if (take != null && take._scheduler == null) - return take.Ω(count); + return take.Omega(count); return new Take<TSource>(source, count); } @@ -1248,7 +1390,7 @@ namespace System.Reactive.Linq #if !NO_PERF var where = source as Where<TSource>; if (where != null) - return where.Ω(predicate); + return where.Omega(predicate); return new Where<TSource>(source, predicate); #else |