summaryrefslogtreecommitdiff
path: root/external/rx/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/FromEvent.cs
blob: 82c43483b133d5adfd3196f6cef0349715fbb159 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.

#if !NO_PERF
using System;
using System.Diagnostics;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive.Subjects;

//
// BREAKING CHANGE v2 > v1.x - FromEvent[Pattern] now has an implicit SubscribeOn and Publish operation.
//
// The free-threaded nature of Rx is key to the performance characteristics of the event processing
// pipeline. However, in places where we bridge with the external world, this sometimes has negative
// effects due to thread-affine operations involved. The FromEvent[Pattern] bridges are one such
// place where we reach out to add and remove operations on events.
//
// Consider the following piece of code, assuming Rx v1.x usage:
//
//   var txt = Observable.FromEventPattern(txtInput, "TextChanged");
//   var res = from term in txt
//             from word in svc.Lookup(term).TakeUntil(txt)
//             select word;
//
// This code is flawed for various reasons. Seasoned Rx developers will immediately suggest usage of
// the Publish operator to share the side-effects of subscribing to the txt sequence, resulting in
// only one subscription to the event:
//
//   var txt = Observable.FromEventPattern(txtInput, "TextChanged");
//   var res = txt.Publish(txt_ => from term in txt_
//                                 from word in svc.Lookup(term).TakeUntil(txt_)
//                                 select word);
//
// Customers are typically confused as to why FromEvent[Pattern] causes multiple handlers to be added
// to the underlying event. This is in contrast with other From* bridges which involve the use of a
// subject (e.g. FromAsyncPattern, FromAsync, and ToObservable on Task<T>).
//
// But there are more issues with the code fragment above. Upon completion of the svc.Lookup(term)
// sequence, TakeUntil will unsubscribe from both sequences, causing the unsubscription to happen in
// the context of the source's OnCompleted, which may be the thread pool. Some thread-affine events
// don't quite like this. In UI frameworks like WPF and Silverlight, this turns out to be not much of
// a problem typically, but it's merely an accident things work out. From an e-mail conversation with
// the WPF/SL/Jupiter experts:
//
//   "Unfortunately, as I expected, it’s confusing, and implementation details are showing through.
//    The bottom line is that event add/remove should always be done on the right thread.
//    
//    Where events are implemented with compiler-generated code, i.e. MultiCastDelegate, the add/remove
//    will be thread safe/agile.  Where events are implemented in custom code, across Wpf/SL/WP/Jupiter,
//    the add/remove are expected to happen on the Dispatcher thread.
//    
//    Jupiter actually has the consistent story here, where all the event add/remove implementations do
//    the thread check.  It should still be a “wrong thread” error, though, not an AV.
//    
//    In SL there’s a mix of core events (which do the thread check) and framework events (which use
//    compiler-generated event implementations).  So you get an exception if you unhook Button.Loaded
//    from off thread, but you don’t get an exception if you unhook Button.Click.
//    
//    In WPF there’s a similar mix (some events are compiler-generated and some use the EventHandlerStore).
//    But I don’t see any thread safety or thread check in the EventHandlerStore.  So while it works, IIUC,
//    it should have race conditions and corruptions."
//
// Starting with "Jupiter" (Windows XAML aka "Metro"), checks are added to ensure the add and remove
// operations for UI events are called from the UI thread. As a result, the dictionary suggest sample
// code shown above starts to fail. A possible fix is to use SubscribeOnDispatcher:
//
//   var txt = Observable.FromEventPattern(txtInput, "TextChanged").SubscribeOnDispatcher();
//   var res = from term in txt
//             from word in svc.Lookup(term).TakeUntil(txt)
//             select word;
//
// This fix has two problems:
//
// 1. Customers often don't quite understand the difference between ObserveOn and SubscribeOn. In fact,
//    we've given guidance that use of the latter is typically indicative of a misunderstanding, and
//    is used rarely. Also, the fragment above would likely be extended with some UI binding code where
//    one needs to use ObserveOnDispatcher, so the combination of both becomes even more confusing.
//
// 2. There's a subtle race condition now. Upon receiving a new term from the txt sequence, SelectMany's
//    invocation of the result selector involves TakeUntil subscribing to txt again. However, the use
//    of SubscribeOnDispatcher means the subscription is now happening asynchronously, leaving a time
//    gap between returning from Subscribe and doing the += on the underlying event:
//
//                    (Subscription of TakeUntil to txt)
//                                     |
//                                     v
//        txt            --------------------------------------------------------------
//                                     |
//                                     +-----...----+  (SubscribeOnDispatcher's post of Subscribe)
//                                                  |
//        TextChanged    ------"re"---------"rea"-------------"reac"-----"react"----...
//                                                  ^
//                                                  |
//                                    (where += on the event happens)
//
//    While this problem is rare and sometimes gets mitigated by accident because code is posting back
//    to e.g. the UI message loop, it's extremely hard to debug when things go wrong.
//
// In order to fix this behavior such that code has the expected behavior, we do two things in Rx v2.0:
//
// - To solve the cross-thread add/remove handler operations and make them single-thread affine, we
//   now do an implicit SubscribeOn with the SynchronizationContext.Current retrieved eagerly upon
//   calling FromEvent[Pattern]. This goes hand-in-hand with a recommendation:
//
//      "Always call FromEvent[Pattern] in a place where you'd normally write += and -= operations
//       yourself. Don't inline the creation of a FromEvent[Pattern] object inside a query."
//
//   This recommendation helps to keep code clean (bridging operations are moved outside queries) and
//   ensures the captured SynchronizationContext is the least surprising one. E.g. in the sample code
//   above, the whole query likely lives in a button_Click handler or so.
//
// - To solve the time gap issue, we now add implicit Publish behavior with ref-counted behavior. In
//   other words, the new FromEvent[Pattern] is pretty much the same as:
//
//          Observable_v2.FromEvent[Pattern](<args>)
//      ==
//          Observable_v1.FromEvent[Pattern](<args>).SubscribeOn(SynchronizationContext.Current)
//                                                  .Publish()
//                                                  .RefCount()
//
// Overloads of FromEvent[Pattern] allow specifying the scheduler used for the SubscribeOn operation
// that's taking place internally. When omitted, a SynchronizationContextScheduler will be supplied
// if a current SynchronizationContext is found. If no current SynchronizationContext is found, the
// default scheduler is the immediate scheduler, falling back to the free-threaded behavior we had
// before in v1.x. (See GetSchedulerForCurrentContext in QueryLanguage.Events.cs).
//
// Notice a time gap can still occur at the point of the first subscription to the event sequence,
// or when the ref count fell back to zero. In cases of nested uses of the sequence (such as in the
// running example here), this is fine because the top-level subscription is kept alive for the whole
// duration. In other cases, there's already a race condition between the underlying event and the
// observable wrapper (assuming events are hot). For cold events that have side-effects upon add and
// remove handler operations, use of Observable.Create is recommended. This should be rather rare,
// as most events follow the typical MulticastDelegate implementation pattern:
//
//    public event EventHandler<BarEventArgs> Bar;
//
//    protected void OnBar(int value)
//    {
//        var bar = Bar;
//        if (bar != null)
//            bar(this, new BarEventArgs(value));
//    }
//
// In here, there's already a race between the user hooking up an event handler through the += add
// operation and the event producer (possibly on a different thread) calling OnBar. It's also worth
// pointing out that this race condition is mitigated by a check in SynchronizationContextScheduler
// causing synchronous execution in case the caller is already on the target SynchronizationContext.
// This situation is common when using FromEvent[Pattern] immediately after declaring it, e.g. in
// the context of a UI event handler.
//
// Finally, notice we can't simply connect the event to a Subject<T> upon a FromEvent[Pattern] call,
// because this would make it impossible to get rid of this one event handler (unless we expose some
// other means of resource maintenance, e.g. by making the returned object implement IDisposable).
// Also, this would cause the event producer to see the event's delegate in a non-null state all the
// time, causing event argument objects to be newed up, possibly sending those into a zero-observer
// subject (which is opaque to the event producer). Not to mention that the subject would always be
// rooted by the target event (even when the FromEvent[Pattern] observable wrapper is unreachable).
//
namespace System.Reactive.Linq.Observαble
{
    //
    // Event producer for events whose handlers of type TDelegate deliver a single TEventArgs value.
    // The add/remove handler actions and the scheduler are forwarded to ClassicEventProducer; this
    // class only decides how an Action<TEventArgs> is turned into a TDelegate.
    //
    class FromEvent<TDelegate, TEventArgs> : ClassicEventProducer<TDelegate, TEventArgs>
    {
        //
        // Optional conversion from the observer-facing action to the event's delegate type.
        // Stays null when the constructor without a conversion function is used.
        //
        private readonly Func<Action<TEventArgs>, TDelegate> _conversion;

        //
        // Creates a producer without a conversion function; GetHandler falls back to building
        // the delegate through ReflectionUtils.CreateDelegate.
        //
        public FromEvent(Action<TDelegate> addHandler, Action<TDelegate> removeHandler, IScheduler scheduler)
            : base(addHandler, removeHandler, scheduler)
        {
        }

        //
        // Creates a producer that uses the given conversion function to build the event handler.
        //
        public FromEvent(Func<Action<TEventArgs>, TDelegate> conversion, Action<TDelegate> addHandler, Action<TDelegate> removeHandler, IScheduler scheduler)
            : base(addHandler, removeHandler, scheduler)
        {
            _conversion = conversion;
        }

        //
        // Produces the delegate to attach to the event for the given onNext action.
        //
        protected override TDelegate GetHandler(Action<TEventArgs> onNext)
        {
            //
            // A supplied conversion function always wins.
            //
            if (_conversion != null)
            {
                return _conversion(onNext);
            }

            //
            // No conversion function: bind the action's Invoke method to a TDelegate instead.
            //
            var invokeMethod = typeof(Action<TEventArgs>).GetMethod("Invoke");
            return ReflectionUtils.CreateDelegate<TDelegate>(onNext, invokeMethod);
        }
    }

    //
    // Base class for event-backed observable sequences. All observers of one producer instance
    // share a single Session, which holds one handler on the underlying event and fans the events
    // out through a subject. The add handler call and the disposal of the remove handler are both
    // dispatched through the scheduler passed to the constructor.
    //
    abstract class EventProducer<TDelegate, TArgs> : Producer<TArgs>
    {
        // Scheduler used for the add handler call and for disposing the remove handler.
        private readonly IScheduler _scheduler;

        // Lock guarding the _session field and the ref count kept inside the session.
        private readonly object _gate;

        public EventProducer(IScheduler scheduler)
        {
            _scheduler = scheduler;
            _gate = new object();
        }

        //
        // Converts the given onNext action into a handler of the event's delegate type. The
        // session passes in the OnNext method of its subject.
        //
        protected abstract TDelegate GetHandler(Action<TArgs> onNext);

        //
        // Attaches the handler to the underlying event. The returned disposable is stored by the
        // session and disposed (via the scheduler) once the last observer disconnects.
        //
        protected abstract IDisposable AddHandler(TDelegate handler);

        // Current session, or null when there is none. Read and written under the _gate lock.
        private Session _session;

        //
        // Connects the observer to the current session, creating a session first if there is
        // none. The cancel and setSink parameters are not used by this implementation.
        //
        protected override IDisposable Run(IObserver<TArgs> observer, IDisposable cancel, Action<IDisposable> setSink)
        {
            var connection = default(IDisposable);

            lock (_gate)
            {
                //
                // A session object holds on to a single handler to the underlying event, feeding
                // into a subject. It also ref counts the number of connections to the subject.
                //
                // When the ref count goes back to zero, the event handler is unregistered, and
                // the session will reach out to reset the _session field to null under the _gate
                // lock. Future subscriptions will cause a new session to be created.
                //
                if (_session == null)
                    _session = new Session(this);

                connection = _session.Connect(observer);
            }

            return connection;
        }

        //
        // Pairs one subject with one event handler registration and counts the observers
        // connected to the subject.
        //
        class Session
        {
            private readonly EventProducer<TDelegate, TArgs> _parent;
            private readonly Subject<TArgs> _subject;

            // Created in Initialize; receives the disposable returned by the parent's AddHandler.
            private SingleAssignmentDisposable _removeHandler;

            // Number of connected observers; modified only while holding _parent._gate.
            private int _count;

            public Session(EventProducer<TDelegate, TArgs> parent)
            {
                _parent = parent;
                _subject = new Subject<TArgs>();
            }

            //
            // Subscribes the observer to the subject and, for the first connection, kicks off
            // the handler registration. Returns a disposable that disconnects the observer and
            // schedules handler removal when the count drops back to zero.
            //
            public IDisposable Connect(IObserver<TArgs> observer)
            {
                /*
                 * CALLERS - Ensure this is called under the lock!
                 * 
                lock (_parent._gate) */
                {
                    //
                    // We connect the given observer to the subject first, before performing any kind
                    // of initialization which will register an event handler. This is done to ensure
                    // we don't have a time gap between adding the handler and connecting the user's
                    // subject, e.g. when the ImmediateScheduler is used.
                    //
                    // [OK] Use of unsafe Subscribe: called on a known subject implementation.
                    //
                    var connection = _subject.Subscribe/*Unsafe*/(observer);

                    if (++_count == 1)
                    {
                        try
                        {
                            Initialize();
                        }
                        catch (Exception exception)
                        {
                            //
                            // Initialization failed: undo the ref count bump, detach the observer
                            // from the subject, and report the error to this observer directly.
                            //
                            // NOTE(review): Initialize assigns _removeHandler before it calls
                            // GetHandler and Schedule. If either of those throws, this session
                            // stays cached in _parent._session with _removeHandler already set,
                            // so the next first connection would hit the Debug.Assert in
                            // Initialize. Confirm whether this path needs a reset.
                            //
                            --_count;
                            connection.Dispose();

                            observer.OnError(exception);
                            return Disposable.Empty;
                        }
                    }

                    return Disposable.Create(() =>
                    {
                        //
                        // The observer is detached from the subject before the lock is taken;
                        // only the ref count update and the session reset happen under the lock.
                        //
                        connection.Dispose();

                        lock (_parent._gate)
                        {
                            if (--_count == 0)
                            {
                                //
                                // Last observer gone: schedule removal of the event handler and
                                // drop the session so the next subscription creates a new one.
                                //
                                _parent._scheduler.Schedule(_removeHandler.Dispose);
                                _parent._session = null;
                            }
                        }
                    });
                }
            }

            //
            // Creates the holder for the remove handler, builds the event handler delegate, and
            // schedules the add handler call on the parent's scheduler.
            //
            private void Initialize()
            {
                /*
                 * CALLERS - Ensure this is called under the lock!
                 * 
                lock (_parent._gate) */
                {
                    //
                    // When the ref count goes to zero, no-one should be able to perform operations on
                    // the session object anymore, because it gets nulled out.
                    //
                    Debug.Assert(_removeHandler == null);
                    _removeHandler = new SingleAssignmentDisposable();

                    //
                    // Conversion code is supposed to be a pure function and shouldn't be run on the
                    // scheduler, but the add handler call should. Notice the scheduler can be the
                    // ImmediateScheduler, causing synchronous invocation. This is the default when
                    // no SynchronizationContext is found (see QueryLanguage.Events.cs and search for
                    // the GetSchedulerForCurrentContext method).
                    //
                    var onNext = _parent.GetHandler(_subject.OnNext);
                    _parent._scheduler.Schedule(onNext, AddHandler);
                }
            }

            //
            // Scheduled action that attaches the handler to the event. The self parameter is not
            // used, and Disposable.Empty is returned on both the success and the failure path.
            //
            private IDisposable AddHandler(IScheduler self, TDelegate onNext)
            {
                var removeHandler = default(IDisposable);
                try
                {
                    removeHandler = _parent.AddHandler(onNext);
                }
                catch (Exception exception)
                {
                    //
                    // A failing add handler call is reported through the subject, so it reaches
                    // every observer connected to the subject rather than a single one.
                    //
                    _subject.OnError(exception);
                    return Disposable.Empty;
                }

                //
                // We don't propagate the exception to the OnError channel upon Dispose. This is
                // not possible at this stage, because we've already auto-detached in the base
                // class Producer implementation. Even if we would switch the OnError and auto-
                // detach calls, it wouldn't work because the remove handler logic is scheduled
                // on the given scheduler, causing asynchrony. We can't block waiting for the
                // remove handler to run on the scheduler.
                //
                _removeHandler.Disposable = removeHandler;

                return Disposable.Empty;
            }
        }
    }

    //
    // Event producer for events exposed through a pair of add/remove handler actions. Implements
    // AddHandler by invoking the add action and handing back a disposable that invokes the remove
    // action with the same handler.
    //
    abstract class ClassicEventProducer<TDelegate, TArgs> : EventProducer<TDelegate, TArgs>
    {
        // Action that attaches a handler to the underlying event.
        private readonly Action<TDelegate> _addHandler;

        // Action that detaches a previously attached handler from the underlying event.
        private readonly Action<TDelegate> _removeHandler;

        //
        // Stores the add/remove handler actions and passes the scheduler on to the base class.
        //
        public ClassicEventProducer(Action<TDelegate> addHandler, Action<TDelegate> removeHandler, IScheduler scheduler)
            : base(scheduler)
        {
            _removeHandler = removeHandler;
            _addHandler = addHandler;
        }

        //
        // Attaches the handler right away; the returned disposable detaches that same handler.
        //
        protected override IDisposable AddHandler(TDelegate handler)
        {
            _addHandler(handler);

            Action detach = () => _removeHandler(handler);
            return Disposable.Create(detach);
        }
    }
}
#endif