summaryrefslogtreecommitdiff
path: root/external/rx/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/QueryLanguage.StandardSequenceOperators.cs
diff options
context:
space:
mode:
Diffstat (limited to 'external/rx/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/QueryLanguage.StandardSequenceOperators.cs')
-rw-r--r--external/rx/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/QueryLanguage.StandardSequenceOperators.cs212
1 files changed, 177 insertions, 35 deletions
diff --git a/external/rx/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/QueryLanguage.StandardSequenceOperators.cs b/external/rx/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/QueryLanguage.StandardSequenceOperators.cs
index 0089f04b89..41ceb9ce8f 100644
--- a/external/rx/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/QueryLanguage.StandardSequenceOperators.cs
+++ b/external/rx/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/QueryLanguage.StandardSequenceOperators.cs
@@ -1,13 +1,10 @@
// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.
using System.Collections.Generic;
-using System.Linq;
using System.Reactive.Concurrency;
-using System.Reactive.Disposables;
-using System.Reactive.Subjects;
+
#if !NO_TPL
-using System.Reactive.Threading.Tasks;
using System.Threading;
using System.Threading.Tasks;
#endif
@@ -15,7 +12,7 @@ using System.Threading.Tasks;
namespace System.Reactive.Linq
{
#if !NO_PERF
- using Observαble;
+ using ObservableImpl;
#endif
internal partial class QueryLanguage
@@ -156,30 +153,50 @@ namespace System.Reactive.Linq
public virtual IObservable<IGroupedObservable<TKey, TElement>> GroupBy<TSource, TKey, TElement>(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector)
{
- return GroupBy_<TSource, TKey, TElement>(source, keySelector, elementSelector, EqualityComparer<TKey>.Default);
+ return GroupBy_<TSource, TKey, TElement>(source, keySelector, elementSelector, null, EqualityComparer<TKey>.Default);
}
public virtual IObservable<IGroupedObservable<TKey, TSource>> GroupBy<TSource, TKey>(IObservable<TSource> source, Func<TSource, TKey> keySelector, IEqualityComparer<TKey> comparer)
{
- return GroupBy_<TSource, TKey, TSource>(source, keySelector, x => x, comparer);
+ return GroupBy_<TSource, TKey, TSource>(source, keySelector, x => x, null, comparer);
}
public virtual IObservable<IGroupedObservable<TKey, TSource>> GroupBy<TSource, TKey>(IObservable<TSource> source, Func<TSource, TKey> keySelector)
{
- return GroupBy_<TSource, TKey, TSource>(source, keySelector, x => x, EqualityComparer<TKey>.Default);
+ return GroupBy_<TSource, TKey, TSource>(source, keySelector, x => x, null, EqualityComparer<TKey>.Default);
}
public virtual IObservable<IGroupedObservable<TKey, TElement>> GroupBy<TSource, TKey, TElement>(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, IEqualityComparer<TKey> comparer)
{
- return GroupBy_<TSource, TKey, TElement>(source, keySelector, elementSelector, comparer);
+ return GroupBy_<TSource, TKey, TElement>(source, keySelector, elementSelector, null, comparer);
+ }
+
+ public virtual IObservable<IGroupedObservable<TKey, TElement>> GroupBy<TSource, TKey, TElement>(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, int capacity)
+ {
+ return GroupBy_<TSource, TKey, TElement>(source, keySelector, elementSelector, capacity, EqualityComparer<TKey>.Default);
+ }
+
+ public virtual IObservable<IGroupedObservable<TKey, TSource>> GroupBy<TSource, TKey>(IObservable<TSource> source, Func<TSource, TKey> keySelector, int capacity, IEqualityComparer<TKey> comparer)
+ {
+ return GroupBy_<TSource, TKey, TSource>(source, keySelector, x => x, capacity, comparer);
+ }
+
+ public virtual IObservable<IGroupedObservable<TKey, TSource>> GroupBy<TSource, TKey>(IObservable<TSource> source, Func<TSource, TKey> keySelector, int capacity)
+ {
+ return GroupBy_<TSource, TKey, TSource>(source, keySelector, x => x, capacity, EqualityComparer<TKey>.Default);
+ }
+
+ public virtual IObservable<IGroupedObservable<TKey, TElement>> GroupBy<TSource, TKey, TElement>(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, int capacity, IEqualityComparer<TKey> comparer)
+ {
+ return GroupBy_<TSource, TKey, TElement>(source, keySelector, elementSelector, capacity, comparer);
}
- private static IObservable<IGroupedObservable<TKey, TElement>> GroupBy_<TSource, TKey, TElement>(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, IEqualityComparer<TKey> comparer)
+ private static IObservable<IGroupedObservable<TKey, TElement>> GroupBy_<TSource, TKey, TElement>(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, int? capacity, IEqualityComparer<TKey> comparer)
{
#if !NO_PERF
- return new GroupBy<TSource, TKey, TElement>(source, keySelector, elementSelector, comparer);
+ return new GroupBy<TSource, TKey, TElement>(source, keySelector, elementSelector, capacity, comparer);
#else
- return GroupByUntil_<TSource, TKey, TElement, Unit>(source, keySelector, elementSelector, _ => Observable.Never<Unit>(), comparer);
+ return GroupByUntil_<TSource, TKey, TElement, Unit>(source, keySelector, elementSelector, _ => Observable.Never<Unit>(), capacity, comparer);
#endif
}
@@ -189,32 +206,54 @@ namespace System.Reactive.Linq
public virtual IObservable<IGroupedObservable<TKey, TElement>> GroupByUntil<TSource, TKey, TElement, TDuration>(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, Func<IGroupedObservable<TKey, TElement>, IObservable<TDuration>> durationSelector, IEqualityComparer<TKey> comparer)
{
- return GroupByUntil_<TSource, TKey, TElement, TDuration>(source, keySelector, elementSelector, durationSelector, comparer);
+ return GroupByUntil_<TSource, TKey, TElement, TDuration>(source, keySelector, elementSelector, durationSelector, null, comparer);
}
public virtual IObservable<IGroupedObservable<TKey, TElement>> GroupByUntil<TSource, TKey, TElement, TDuration>(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, Func<IGroupedObservable<TKey, TElement>, IObservable<TDuration>> durationSelector)
{
- return GroupByUntil_<TSource, TKey, TElement, TDuration>(source, keySelector, elementSelector, durationSelector, EqualityComparer<TKey>.Default);
+ return GroupByUntil_<TSource, TKey, TElement, TDuration>(source, keySelector, elementSelector, durationSelector, null, EqualityComparer<TKey>.Default);
}
public virtual IObservable<IGroupedObservable<TKey, TSource>> GroupByUntil<TSource, TKey, TDuration>(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<IGroupedObservable<TKey, TSource>, IObservable<TDuration>> durationSelector, IEqualityComparer<TKey> comparer)
{
- return GroupByUntil_<TSource, TKey, TSource, TDuration>(source, keySelector, x => x, durationSelector, comparer);
+ return GroupByUntil_<TSource, TKey, TSource, TDuration>(source, keySelector, x => x, durationSelector, null, comparer);
}
public virtual IObservable<IGroupedObservable<TKey, TSource>> GroupByUntil<TSource, TKey, TDuration>(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<IGroupedObservable<TKey, TSource>, IObservable<TDuration>> durationSelector)
{
- return GroupByUntil_<TSource, TKey, TSource, TDuration>(source, keySelector, x => x, durationSelector, EqualityComparer<TKey>.Default);
+ return GroupByUntil_<TSource, TKey, TSource, TDuration>(source, keySelector, x => x, durationSelector, null, EqualityComparer<TKey>.Default);
+ }
+
+ public virtual IObservable<IGroupedObservable<TKey, TElement>> GroupByUntil<TSource, TKey, TElement, TDuration>(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, Func<IGroupedObservable<TKey, TElement>, IObservable<TDuration>> durationSelector, int capacity, IEqualityComparer<TKey> comparer)
+ {
+ return GroupByUntil_<TSource, TKey, TElement, TDuration>(source, keySelector, elementSelector, durationSelector, capacity, comparer);
}
- private static IObservable<IGroupedObservable<TKey, TElement>> GroupByUntil_<TSource, TKey, TElement, TDuration>(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, Func<IGroupedObservable<TKey, TElement>, IObservable<TDuration>> durationSelector, IEqualityComparer<TKey> comparer)
+ public virtual IObservable<IGroupedObservable<TKey, TElement>> GroupByUntil<TSource, TKey, TElement, TDuration>(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, Func<IGroupedObservable<TKey, TElement>, IObservable<TDuration>> durationSelector, int capacity)
+ {
+ return GroupByUntil_<TSource, TKey, TElement, TDuration>(source, keySelector, elementSelector, durationSelector, capacity, EqualityComparer<TKey>.Default);
+ }
+
+ public virtual IObservable<IGroupedObservable<TKey, TSource>> GroupByUntil<TSource, TKey, TDuration>(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<IGroupedObservable<TKey, TSource>, IObservable<TDuration>> durationSelector, int capacity, IEqualityComparer<TKey> comparer)
+ {
+ return GroupByUntil_<TSource, TKey, TSource, TDuration>(source, keySelector, x => x, durationSelector, capacity, comparer);
+ }
+
+ public virtual IObservable<IGroupedObservable<TKey, TSource>> GroupByUntil<TSource, TKey, TDuration>(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<IGroupedObservable<TKey, TSource>, IObservable<TDuration>> durationSelector, int capacity)
+ {
+ return GroupByUntil_<TSource, TKey, TSource, TDuration>(source, keySelector, x => x, durationSelector, capacity, EqualityComparer<TKey>.Default);
+ }
+
+ private static IObservable<IGroupedObservable<TKey, TElement>> GroupByUntil_<TSource, TKey, TElement, TDuration>(IObservable<TSource> source, Func<TSource, TKey> keySelector, Func<TSource, TElement> elementSelector, Func<IGroupedObservable<TKey, TElement>, IObservable<TDuration>> durationSelector, int? capacity, IEqualityComparer<TKey> comparer)
{
#if !NO_PERF
- return new GroupByUntil<TSource, TKey, TElement, TDuration>(source, keySelector, elementSelector, durationSelector, comparer);
+ return new GroupByUntil<TSource, TKey, TElement, TDuration>(source, keySelector, elementSelector, durationSelector, capacity, comparer);
#else
return new AnonymousObservable<IGroupedObservable<TKey, TElement>>(observer =>
{
- var map = new Dictionary<TKey, ISubject<TElement>>(comparer);
+ var map = capacity.HasValue
+ ? new Dictionary<TKey, ISubject<TElement>>(capacity.Value, comparer)
+ : new Dictionary<TKey, ISubject<TElement>>(comparer);
var groupDisposable = new CompositeDisposable();
var refCountDisposable = new RefCountDisposable(groupDisposable);
@@ -733,7 +772,7 @@ namespace System.Reactive.Linq
#if !NO_PERF
var select = source as Select<TSource>;
if (select != null)
- return select.Ω(selector);
+ return select.Omega(selector);
return new Select<TSource, TResult>(source, selector);
#else
@@ -853,6 +892,15 @@ namespace System.Reactive.Linq
#endif
}
+ public virtual IObservable<TResult> SelectMany<TSource, TResult>(IObservable<TSource> source, Func<TSource, int, Task<TResult>> selector)
+ {
+#if !NO_PERF
+ return new SelectMany<TSource, TResult>(source, (x, i, token) => selector(x, i));
+#else
+ return SelectMany_<TSource, TResult>(source, (x, i) => selector(x, i).ToObservable());
+#endif
+ }
+
public virtual IObservable<TResult> SelectMany<TSource, TResult>(IObservable<TSource> source, Func<TSource, CancellationToken, Task<TResult>> selector)
{
#if !NO_PERF
@@ -861,6 +909,15 @@ namespace System.Reactive.Linq
return SelectMany_<TSource, TResult>(source, x => FromAsync(ct => selector(x, ct)));
#endif
}
+
+ public virtual IObservable<TResult> SelectMany<TSource, TResult>(IObservable<TSource> source, Func<TSource, int, CancellationToken, Task<TResult>> selector)
+ {
+#if !NO_PERF
+ return new SelectMany<TSource, TResult>(source, selector);
+#else
+ return SelectMany_<TSource, TResult>(source, (x, i) => FromAsync(ct => selector(x, i, ct)));
+#endif
+ }
#endif
public virtual IObservable<TResult> SelectMany<TSource, TCollection, TResult>(IObservable<TSource> source, Func<TSource, IObservable<TCollection>> collectionSelector, Func<TSource, TCollection, TResult> resultSelector)
@@ -883,6 +940,15 @@ namespace System.Reactive.Linq
#endif
}
+ public virtual IObservable<TResult> SelectMany<TSource, TTaskResult, TResult>(IObservable<TSource> source, Func<TSource, int, Task<TTaskResult>> taskSelector, Func<TSource, int, TTaskResult, TResult> resultSelector)
+ {
+#if !NO_PERF
+ return new SelectMany<TSource, TTaskResult, TResult>(source, (x, i, token) => taskSelector(x, i), resultSelector);
+#else
+ return SelectMany_<TSource, TTaskResult, TResult>(source, (x, i) => taskSelector(x, i).ToObservable(), (x, i, t, _) => resultSelector(x, i, t));
+#endif
+ }
+
public virtual IObservable<TResult> SelectMany<TSource, TTaskResult, TResult>(IObservable<TSource> source, Func<TSource, CancellationToken, Task<TTaskResult>> taskSelector, Func<TSource, TTaskResult, TResult> resultSelector)
{
#if !NO_PERF
@@ -891,6 +957,15 @@ namespace System.Reactive.Linq
return SelectMany_<TSource, TTaskResult, TResult>(source, x => FromAsync(ct => taskSelector(x, ct)), resultSelector);
#endif
}
+
+ public virtual IObservable<TResult> SelectMany<TSource, TTaskResult, TResult>(IObservable<TSource> source, Func<TSource, int, CancellationToken, Task<TTaskResult>> taskSelector, Func<TSource, int, TTaskResult, TResult> resultSelector)
+ {
+#if !NO_PERF
+ return new SelectMany<TSource, TTaskResult, TResult>(source, taskSelector, resultSelector);
+#else
+ return SelectMany_<TSource, TTaskResult, TResult>(source, (x, i) => FromAsync(ct => taskSelector(x, i, ct)), (x, i, t, _) => resultSelector(x, i, t));
+#endif
+ }
#endif
private static IObservable<TResult> SelectMany_<TSource, TResult>(IObservable<TSource> source, Func<TSource, IObservable<TResult>> selector)
@@ -901,7 +976,7 @@ namespace System.Reactive.Linq
return source.Select(selector).Merge();
#endif
}
-
+
private static IObservable<TResult> SelectMany_<TSource, TResult>(IObservable<TSource> source, Func<TSource, int, IObservable<TResult>> selector)
{
#if !NO_PERF
@@ -925,7 +1000,7 @@ namespace System.Reactive.Linq
#if !NO_PERF
return new SelectMany<TSource, TCollection, TResult>(source, collectionSelector, resultSelector);
#else
- return SelectMany_<TSource, TResult>(source, x => collectionSelector(x).Select(y => resultSelector(x, y)));
+ return SelectMany_<TSource, TResult>(source, (x, i) => collectionSelector(x, i).Select((y, i2) => resultSelector(x, i, y, i2)));
#endif
}
@@ -946,19 +1021,23 @@ namespace System.Reactive.Linq
#endif
}
- public virtual IObservable<TResult> SelectMany<TSource, TResult>(IObservable<TSource> source, Func<TSource, int, IObservable<TResult>> onNext, Func<Exception, int, IObservable<TResult>> onError, Func<int, IObservable<TResult>> onCompleted)
+ public virtual IObservable<TResult> SelectMany<TSource, TResult>(IObservable<TSource> source, Func<TSource, int, IObservable<TResult>> onNext, Func<Exception, IObservable<TResult>> onError, Func<IObservable<TResult>> onCompleted)
{
#if !NO_PERF
return new SelectMany<TSource, TResult>(source, onNext, onError, onCompleted);
#else
- return source.Materialize().SelectMany(notification =>
+ return Defer(() =>
{
- if (notification.Kind == NotificationKind.OnNext)
- return onNext(notification.Value);
- else if (notification.Kind == NotificationKind.OnError)
- return onError(notification.Exception);
- else
- return onCompleted();
+ var index = 0;
+ return source.Materialize().SelectMany(notification =>
+ {
+ if (notification.Kind == NotificationKind.OnNext)
+ return onNext(notification.Value, checked(index++));
+ else if (notification.Kind == NotificationKind.OnError)
+ return onError(notification.Exception);
+ else
+ return onCompleted();
+ });
});
#endif
}
@@ -977,7 +1056,7 @@ namespace System.Reactive.Linq
#if !NO_PERF
return new SelectMany<TSource, TResult>(source, selector);
#else
- return SelectMany_<TSource, TResult, TResult>(source, selector, (_, x) => x);
+ return SelectMany_<TSource, TResult, TResult>(source, selector, (_, __, x, ___) => x);
#endif
}
@@ -986,6 +1065,11 @@ namespace System.Reactive.Linq
return SelectMany_<TSource, TCollection, TResult>(source, collectionSelector, resultSelector);
}
+ public virtual IObservable<TResult> SelectMany<TSource, TCollection, TResult>(IObservable<TSource> source, Func<TSource, int, IEnumerable<TCollection>> collectionSelector, Func<TSource, int, TCollection, int, TResult> resultSelector)
+ {
+ return SelectMany_<TSource, TCollection, TResult>(source, collectionSelector, resultSelector);
+ }
+
private static IObservable<TResult> SelectMany_<TSource, TCollection, TResult>(IObservable<TSource> source, Func<TSource, IEnumerable<TCollection>> collectionSelector, Func<TSource, TCollection, TResult> resultSelector)
{
#if !NO_PERF
@@ -1045,9 +1129,67 @@ namespace System.Reactive.Linq
#endif
}
- public virtual IObservable<TResult> SelectMany<TSource, TCollection, TResult>(IObservable<TSource> source, Func<TSource, int, IEnumerable<TCollection>> collectionSelector, Func<TSource, int, TCollection, int, TResult> resultSelector)
+ private static IObservable<TResult> SelectMany_<TSource, TCollection, TResult>(IObservable<TSource> source, Func<TSource, int, IEnumerable<TCollection>> collectionSelector, Func<TSource, int, TCollection, int, TResult> resultSelector)
{
+#if !NO_PERF
return new SelectMany<TSource, TCollection, TResult>(source, collectionSelector, resultSelector);
+#else
+ return new AnonymousObservable<TResult>(observer =>
+ {
+ var index = 0;
+
+ return source.Subscribe(
+ x =>
+ {
+ var xs = default(IEnumerable<TCollection>);
+ try
+ {
+ xs = collectionSelector(x, checked(index++));
+ }
+ catch (Exception exception)
+ {
+ observer.OnError(exception);
+ return;
+ }
+
+ var e = xs.GetEnumerator();
+
+ try
+ {
+ var eIndex = 0;
+ var hasNext = true;
+ while (hasNext)
+ {
+ hasNext = false;
+ var current = default(TResult);
+
+ try
+ {
+ hasNext = e.MoveNext();
+ if (hasNext)
+ current = resultSelector(x, index, e.Current, checked(eIndex++));
+ }
+ catch (Exception exception)
+ {
+ observer.OnError(exception);
+ return;
+ }
+
+ if (hasNext)
+ observer.OnNext(current);
+ }
+ }
+ finally
+ {
+ if (e != null)
+ e.Dispose();
+ }
+ },
+ observer.OnError,
+ observer.OnCompleted
+ )
+ });
+#endif
}
#endregion
@@ -1059,7 +1201,7 @@ namespace System.Reactive.Linq
#if !NO_PERF
var skip = source as Skip<TSource>;
if (skip != null && skip._scheduler == null)
- return skip.Ω(count);
+ return skip.Omega(count);
return new Skip<TSource>(source, count);
#else
@@ -1156,7 +1298,7 @@ namespace System.Reactive.Linq
{
var take = source as Take<TSource>;
if (take != null && take._scheduler == null)
- return take.Ω(count);
+ return take.Omega(count);
return new Take<TSource>(source, count);
}
@@ -1248,7 +1390,7 @@ namespace System.Reactive.Linq
#if !NO_PERF
var where = source as Where<TSource>;
if (where != null)
- return where.Ω(predicate);
+ return where.Omega(predicate);
return new Where<TSource>(source, predicate);
#else