// Source: root/external/rx/Rx/NET/Source/System.Reactive.Linq/Reactive/Linq/Observable/Next.cs
// Blob: 2d4ec45f51c727dea57731587d3740758c650f84
// Copyright (c) Microsoft Open Technologies, Inc. All rights reserved. See License.txt in the project root for license information.

#if !NO_PERF
using System;
using System.Collections.Generic;
using System.Reactive.Threading;
using System.Threading;

namespace System.Reactive.Linq.Observαble
{
    /// <summary>
    /// Push-to-pull adapter whose sink only records an element when a consumer is already
    /// blocked in <c>TryMoveNext</c>; elements pushed while nobody is waiting are dropped.
    /// </summary>
    /// <typeparam name="TSource">Element type of the source sequence.</typeparam>
    class Next<TSource> : PushToPullAdapter<TSource, TSource>
    {
        /// <summary>
        /// Creates the adapter over the given source sequence.
        /// </summary>
        /// <param name="source">Observable sequence handed to the base adapter.</param>
        public Next(IObservable<TSource> source)
            : base(source)
        {
        }

        /// <summary>
        /// Builds the sink that bridges observer callbacks to pull-style enumeration.
        /// </summary>
        /// <param name="subscription">Subscription resource passed through to the sink.</param>
        /// <returns>A new sink instance bound to <paramref name="subscription"/>.</returns>
        protected override PushToPullSink<TSource, TSource> Run(IDisposable subscription)
        {
            var sink = new _(subscription);
            return sink;
        }

        /// <summary>
        /// Sink that blocks the puller on a semaphore until the observer side publishes a
        /// notification.
        /// </summary>
        class _ : PushToPullSink<TSource, TSource>
        {
            // Guards _waiting and every write to the notification fields below.
            private readonly object _gate;

            // Released by the observer callbacks (while holding _gate) to wake a blocked puller.
#if !NO_CDS
            private readonly SemaphoreSlim _semaphore;
#else
            private readonly Semaphore _semaphore;
#endif

            // Set by TryMoveNext; cleared by the next observer callback.
            private bool _waiting;

            // Last notification published to the puller.
            private NotificationKind _kind;
            private TSource _value;
            private Exception _error;

            /// <summary>
            /// Initializes the gate and a semaphore with initial count 0 and maximum count 1.
            /// </summary>
            /// <param name="subscription">Subscription resource passed to the base sink.</param>
            public _(IDisposable subscription)
                : base(subscription)
            {
#if !NO_CDS
                _semaphore = new SemaphoreSlim(0, 1);
#else
                _semaphore = new Semaphore(0, 1);
#endif
                _gate = new object();
            }

            /// <summary>
            /// Publishes <paramref name="value"/> to a waiting puller; ignored when nobody waits.
            /// </summary>
            /// <param name="value">Element pushed by the source.</param>
            public override void OnNext(TSource value)
            {
                lock (_gate)
                {
                    if (_waiting)
                    {
                        // Publish the fields before the semaphore is released by the helper.
                        _value = value;
                        _kind = NotificationKind.OnNext;
                    }

                    ReleaseWaiter();
                }
            }

            /// <summary>
            /// Disposes the sink, records the error and wakes a waiting puller.
            /// </summary>
            /// <param name="error">Exception reported by the source.</param>
            public override void OnError(Exception error)
            {
                base.Dispose();

                lock (_gate)
                {
                    //
                    // BREAKING CHANGE v2 > v1.x - Next doesn't block indefinitely when it reaches the end.
                    // The terminal state is recorded even when no puller is waiting, so a later
                    // TryMoveNext call sees it and skips the wait.
                    //
                    _error = error;
                    _kind = NotificationKind.OnError;

                    ReleaseWaiter();
                }
            }

            /// <summary>
            /// Disposes the sink, records completion and wakes a waiting puller.
            /// </summary>
            public override void OnCompleted()
            {
                base.Dispose();

                lock (_gate)
                {
                    //
                    // BREAKING CHANGE v2 > v1.x - Next doesn't block indefinitely when it reaches the end.
                    // The terminal state is recorded even when no puller is waiting, so a later
                    // TryMoveNext call sees it and skips the wait.
                    //
                    _kind = NotificationKind.OnCompleted;

                    ReleaseWaiter();
                }
            }

            /// <summary>
            /// Wakes the puller if one is waiting and clears the waiting flag.
            /// Callers must hold <c>_gate</c>.
            /// </summary>
            private void ReleaseWaiter()
            {
                if (_waiting)
                {
                    _semaphore.Release();
                }

                _waiting = false;
            }

            /// <summary>
            /// Blocks until the observer side publishes a notification, unless a terminal
            /// notification was already recorded, and then reports that notification.
            /// </summary>
            /// <param name="current">
            /// The published element when the result is <c>true</c>; otherwise the default value.
            /// </param>
            /// <returns><c>true</c> for an element; <c>false</c> otherwise.</returns>
            public override bool TryMoveNext(out TSource current)
            {
                bool mustWait;

                lock (_gate)
                {
                    _waiting = true;

                    //
                    // BREAKING CHANGE v2 > v1.x - Next doesn't block indefinitely when it reaches the end.
                    // Only wait while no terminal notification has been recorded.
                    //
                    mustWait = _kind == NotificationKind.OnNext;
                }

                if (mustWait)
                {
#if !NO_CDS
                    _semaphore.Wait();
#else
                    _semaphore.WaitOne();
#endif
                }

                //
                // The fields are read outside the lock. The design assumes no concurrent calls
                // to TryMoveNext (per general guidance on usage of IEnumerable<T>). OnNext only
                // writes _value and _kind while _waiting is set, and _waiting is cleared by every
                // observer callback and set again only by the next TryMoveNext call. The terminal
                // callbacks update _kind (and _error) regardless of _waiting.
                //
                // Visibility of the writes relies on the acquire/release behavior of the
                // semaphore rather than the lock: the observer callbacks still hold the lock
                // when they release the semaphore.
                //
                var kind = _kind;

                if (kind == NotificationKind.OnNext)
                {
                    current = _value;
                    return true;
                }

                if (kind == NotificationKind.OnError)
                {
                    // Throw() is declared outside this file; presumably it rethrows the
                    // stored error. If it returns, the call reports end-of-sequence below.
                    _error.Throw();
                }

                current = default(TSource);
                return false;
            }
        }
    }
}
#endif