summaryrefslogtreecommitdiff
path: root/external/rx/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable.Imperative.cs
blob: 2124e628852143a289777b3f12f7355978cede94 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.

using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Threading;

#if !NO_TPL
using System.Threading.Tasks;
#endif

namespace System.Reactive.Linq
{
    public static partial class Observable
    {
        #region + ForEachAsync +

#if !NO_TPL
        /// <summary>
        /// Invokes an action for each element in the observable sequence, and returns a Task object that will get signaled when the sequence terminates.
        /// </summary>
        /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
        /// <param name="source">Source sequence.</param>
        /// <param name="onNext">Action to invoke for each element in the observable sequence.</param>
        /// <returns>Task that signals the termination of the sequence.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="onNext"/> is null.</exception>
        /// <remarks>This operator is especially useful in conjunction with the asynchronous programming features introduced in C# 5.0 and Visual Basic 11.</remarks>
        public static Task ForEachAsync<TSource>(this IObservable<TSource> source, Action<TSource> onNext)
        {
            if (source == null)
                throw new ArgumentNullException("source");
            if (onNext == null)
                throw new ArgumentNullException("onNext");

            return s_impl.ForEachAsync<TSource>(source, onNext);
        }

        /// <summary>
        /// Invokes an action for each element in the observable sequence, and returns a Task object that will get signaled when the sequence terminates.
        /// The loop can be quit prematurely by setting the specified cancellation token.
        /// </summary>
        /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
        /// <param name="source">Source sequence.</param>
        /// <param name="onNext">Action to invoke for each element in the observable sequence.</param>
        /// <param name="cancellationToken">Cancellation token used to stop the loop.</param>
        /// <returns>Task that signals the termination of the sequence.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="onNext"/> is null.</exception>
        /// <remarks>This operator is especially useful in conjunction with the asynchronous programming features introduced in C# 5.0 and Visual Basic 11.</remarks>
        public static Task ForEachAsync<TSource>(this IObservable<TSource> source, Action<TSource> onNext, CancellationToken cancellationToken)
        {
            if (source == null)
                throw new ArgumentNullException("source");
            if (onNext == null)
                throw new ArgumentNullException("onNext");

            return s_impl.ForEachAsync<TSource>(source, onNext, cancellationToken);
        }

        /// <summary>
        /// Invokes an action for each element in the observable sequence, incorporating the element's index, and returns a Task object that will get signaled when the sequence terminates.
        /// </summary>
        /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
        /// <param name="source">Source sequence.</param>
        /// <param name="onNext">Action to invoke for each element in the observable sequence.</param>
        /// <returns>Task that signals the termination of the sequence.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="onNext"/> is null.</exception>
        /// <remarks>This operator is especially useful in conjunction with the asynchronous programming features introduced in C# 5.0 and Visual Basic 11.</remarks>
        public static Task ForEachAsync<TSource>(this IObservable<TSource> source, Action<TSource, int> onNext)
        {
            if (source == null)
                throw new ArgumentNullException("source");
            if (onNext == null)
                throw new ArgumentNullException("onNext");

            return s_impl.ForEachAsync<TSource>(source, onNext);
        }

        /// <summary>
        /// Invokes an action for each element in the observable sequence, incorporating the element's index, and returns a Task object that will get signaled when the sequence terminates.
        /// The loop can be quit prematurely by setting the specified cancellation token.
        /// </summary>
        /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
        /// <param name="source">Source sequence.</param>
        /// <param name="onNext">Action to invoke for each element in the observable sequence.</param>
        /// <param name="cancellationToken">Cancellation token used to stop the loop.</param>
        /// <returns>Task that signals the termination of the sequence.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="onNext"/> is null.</exception>
        /// <remarks>This operator is especially useful in conjunction with the asynchronous programming features introduced in C# 5.0 and Visual Basic 11.</remarks>
        public static Task ForEachAsync<TSource>(this IObservable<TSource> source, Action<TSource, int> onNext, CancellationToken cancellationToken)
        {
            if (source == null)
                throw new ArgumentNullException("source");
            if (onNext == null)
                throw new ArgumentNullException("onNext");

            return s_impl.ForEachAsync<TSource>(source, onNext, cancellationToken);
        }
#endif

        #endregion

        #region + Case +

        /// <summary>
        /// Uses <paramref name="selector"/> to determine which source in <paramref name="sources"/> to return, choosing <paramref name="defaultSource"/> if no match is found.
        /// </summary>
        /// <typeparam name="TValue">The type of the value returned by the selector function, used to look up the resulting source.</typeparam>
        /// <typeparam name="TResult">The type of the elements in the result sequence.</typeparam>
        /// <param name="selector">Selector function invoked to determine the source to lookup in the <paramref name="sources"/> dictionary.</param>
        /// <param name="sources">Dictionary of sources to select from based on the <paramref name="selector"/> invocation result.</param>
        /// <param name="defaultSource">Default source to select in case no matching source in <paramref name="sources"/> is found.</param>
        /// <returns>The observable sequence retrieved from the <paramref name="sources"/> dictionary based on the <paramref name="selector"/> invocation result, or <paramref name="defaultSource"/> if no match is found.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="selector"/> or <paramref name="sources"/> or <paramref name="defaultSource"/> is null.</exception>
        public static IObservable<TResult> Case<TValue, TResult>(Func<TValue> selector, IDictionary<TValue, IObservable<TResult>> sources, IObservable<TResult> defaultSource)
        {
            if (selector == null)
                throw new ArgumentNullException("selector");
            if (sources == null)
                throw new ArgumentNullException("sources");
            if (defaultSource == null)
                throw new ArgumentNullException("defaultSource");

            return s_impl.Case<TValue, TResult>(selector, sources, defaultSource);
        }

        /// <summary>
        /// Uses <paramref name="selector"/> to determine which source in <paramref name="sources"/> to return, choosing an empty sequence on the specified scheduler if no match is found.
        /// </summary>
        /// <typeparam name="TValue">The type of the value returned by the selector function, used to look up the resulting source.</typeparam>
        /// <typeparam name="TResult">The type of the elements in the result sequence.</typeparam>
        /// <param name="selector">Selector function invoked to determine the source to lookup in the <paramref name="sources"/> dictionary.</param>
        /// <param name="sources">Dictionary of sources to select from based on the <paramref name="selector"/> invocation result.</param>
        /// <param name="scheduler">Scheduler to generate an empty sequence on in case no matching source in <paramref name="sources"/> is found.</param>
        /// <returns>The observable sequence retrieved from the <paramref name="sources"/> dictionary based on the <paramref name="selector"/> invocation result, or an empty sequence if no match is found.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="selector"/> or <paramref name="sources"/> or <paramref name="scheduler"/> is null.</exception>
        public static IObservable<TResult> Case<TValue, TResult>(Func<TValue> selector, IDictionary<TValue, IObservable<TResult>> sources, IScheduler scheduler)
        {
            if (selector == null)
                throw new ArgumentNullException("selector");
            if (sources == null)
                throw new ArgumentNullException("sources");
            if (scheduler == null)
                throw new ArgumentNullException("scheduler");

            return s_impl.Case<TValue, TResult>(selector, sources, scheduler);
        }

        /// <summary>
        /// Uses <paramref name="selector"/> to determine which source in <paramref name="sources"/> to return, choosing an empty sequence if no match is found.
        /// </summary>
        /// <typeparam name="TValue">The type of the value returned by the selector function, used to look up the resulting source.</typeparam>
        /// <typeparam name="TResult">The type of the elements in the result sequence.</typeparam>
        /// <param name="selector">Selector function invoked to determine the source to lookup in the <paramref name="sources"/> dictionary.</param>
        /// <param name="sources">Dictionary of sources to select from based on the <paramref name="selector"/> invocation result.</param>
        /// <returns>The observable sequence retrieved from the <paramref name="sources"/> dictionary based on the <paramref name="selector"/> invocation result, or an empty sequence if no match is found.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="selector"/> or <paramref name="sources"/> is null.</exception>
        public static IObservable<TResult> Case<TValue, TResult>(Func<TValue> selector, IDictionary<TValue, IObservable<TResult>> sources)
        {
            if (selector == null)
                throw new ArgumentNullException("selector");
            if (sources == null)
                throw new ArgumentNullException("sources");

            return s_impl.Case<TValue, TResult>(selector, sources);
        }

        #endregion

        #region + DoWhile +

        /// <summary>
        /// Repeats the given <paramref name="source"/> as long as the specified <paramref name="condition"/> holds, where the <paramref name="condition"/> is evaluated after each repeated <paramref name="source"/> completed.
        /// </summary>
        /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
        /// <param name="source">Source to repeat as long as the <paramref name="condition"/> function evaluates to true.</param>
        /// <param name="condition">Condition that will be evaluated upon the completion of an iteration through the <paramref name="source"/>, to determine whether repetition of the source is required.</param>
        /// <returns>The observable sequence obtained by concatenating the <paramref name="source"/> sequence as long as the <paramref name="condition"/> holds.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="condition"/> is null.</exception>
        public static IObservable<TSource> DoWhile<TSource>(this IObservable<TSource> source, Func<bool> condition)
        {
            if (source == null)
                throw new ArgumentNullException("source");
            if (condition == null)
                throw new ArgumentNullException("condition");

            return s_impl.DoWhile<TSource>(source, condition);
        }

        #endregion

        #region + For +

        /// <summary>
        /// Concatenates the observable sequences obtained by running the <paramref name="resultSelector"/> for each element in the given enumerable <paramref name="source"/>.
        /// </summary>
        /// <typeparam name="TSource">The type of the elements in the enumerable source sequence.</typeparam>
        /// <typeparam name="TResult">The type of the elements in the observable result sequence.</typeparam>
        /// <param name="source">Enumerable source for which each element will be mapped onto an observable source that will be concatenated in the result sequence.</param>
        /// <param name="resultSelector">Function to select an observable source for each element in the <paramref name="source"/>.</param>
        /// <returns>The observable sequence obtained by concatenating the sources returned by <paramref name="resultSelector"/> for each element in the <paramref name="source"/>.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="source"/> or <paramref name="resultSelector"/> is null.</exception>
        public static IObservable<TResult> For<TSource, TResult>(IEnumerable<TSource> source, Func<TSource, IObservable<TResult>> resultSelector)
        {
            if (source == null)
                throw new ArgumentNullException("source");
            if (resultSelector == null)
                throw new ArgumentNullException("resultSelector");

            return s_impl.For<TSource, TResult>(source, resultSelector);
        }

        #endregion

        #region + If +

        /// <summary>
        /// If the specified <paramref name="condition"/> evaluates true, select the <paramref name="thenSource"/> sequence. Otherwise, select the <paramref name="elseSource"/> sequence.
        /// </summary>
        /// <typeparam name="TResult">The type of the elements in the result sequence.</typeparam>
        /// <param name="condition">Condition evaluated to decide which sequence to return.</param>
        /// <param name="thenSource">Sequence returned in case <paramref name="condition"/> evaluates true.</param>
        /// <param name="elseSource">Sequence returned in case <paramref name="condition"/> evaluates false.</param>
        /// <returns><paramref name="thenSource"/> if <paramref name="condition"/> evaluates true; <paramref name="elseSource"/> otherwise.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="condition"/> or <paramref name="thenSource"/> or <paramref name="elseSource"/> is null.</exception>
        public static IObservable<TResult> If<TResult>(Func<bool> condition, IObservable<TResult> thenSource, IObservable<TResult> elseSource)
        {
            if (condition == null)
                throw new ArgumentNullException("condition");
            if (thenSource == null)
                throw new ArgumentNullException("thenSource");
            if (elseSource == null)
                throw new ArgumentNullException("elseSource");

            return s_impl.If<TResult>(condition, thenSource, elseSource);
        }

        /// <summary>
        /// If the specified <paramref name="condition"/> evaluates true, select the <paramref name="thenSource"/> sequence. Otherwise, return an empty sequence.
        /// </summary>
        /// <typeparam name="TResult">The type of the elements in the result sequence.</typeparam>
        /// <param name="condition">Condition evaluated to decide which sequence to return.</param>
        /// <param name="thenSource">Sequence returned in case <paramref name="condition"/> evaluates true.</param>
        /// <returns><paramref name="thenSource"/> if <paramref name="condition"/> evaluates true; an empty sequence otherwise.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="condition"/> or <paramref name="thenSource"/> is null.</exception>
        public static IObservable<TResult> If<TResult>(Func<bool> condition, IObservable<TResult> thenSource)
        {
            if (condition == null)
                throw new ArgumentNullException("condition");
            if (thenSource == null)
                throw new ArgumentNullException("thenSource");

            return s_impl.If<TResult>(condition, thenSource);
        }

        /// <summary>
        /// If the specified <paramref name="condition"/> evaluates true, select the <paramref name="thenSource"/> sequence. Otherwise, return an empty sequence generated on the specified scheduler.
        /// </summary>
        /// <typeparam name="TResult">The type of the elements in the result sequence.</typeparam>
        /// <param name="condition">Condition evaluated to decide which sequence to return.</param>
        /// <param name="thenSource">Sequence returned in case <paramref name="condition"/> evaluates true.</param>
        /// <param name="scheduler">Scheduler to generate an empty sequence on in case <paramref name="condition"/> evaluates false.</param>
        /// <returns><paramref name="thenSource"/> if <paramref name="condition"/> evaluates true; an empty sequence otherwise.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="condition"/> or <paramref name="thenSource"/> or <paramref name="scheduler"/> is null.</exception>
        public static IObservable<TResult> If<TResult>(Func<bool> condition, IObservable<TResult> thenSource, IScheduler scheduler)
        {
            if (condition == null)
                throw new ArgumentNullException("condition");
            if (thenSource == null)
                throw new ArgumentNullException("thenSource");
            if (scheduler == null)
                throw new ArgumentNullException("scheduler");

            return s_impl.If<TResult>(condition, thenSource, scheduler);
        }

        #endregion

        #region + While +

        /// <summary>
        /// Repeats the given <paramref name="source"/> as long as the specified <paramref name="condition"/> holds, where the <paramref name="condition"/> is evaluated before each repeated <paramref name="source"/> is subscribed to.
        /// </summary>
        /// <typeparam name="TSource">The type of the elements in the source sequence.</typeparam>
        /// <param name="source">Source to repeat as long as the <paramref name="condition"/> function evaluates to true.</param>
        /// <param name="condition">Condition that will be evaluated before subscription to the <paramref name="source"/>, to determine whether repetition of the source is required.</param>
        /// <returns>The observable sequence obtained by concatenating the <paramref name="source"/> sequence as long as the <paramref name="condition"/> holds.</returns>
        /// <exception cref="ArgumentNullException"><paramref name="condition"/> or <paramref name="source"/> is null.</exception>
        public static IObservable<TSource> While<TSource>(Func<bool> condition, IObservable<TSource> source)
        {
            if (condition == null)
                throw new ArgumentNullException("condition");
            if (source == null)
                throw new ArgumentNullException("source");

            return s_impl.While<TSource>(condition, source);
        }

        #endregion
    }
}