diff options
Diffstat (limited to 'mcs/class/corlib/System.Threading.Tasks')
7 files changed, 145 insertions, 38 deletions
diff --git a/mcs/class/corlib/System.Threading.Tasks/Task.cs b/mcs/class/corlib/System.Threading.Tasks/Task.cs index 62aa14fddc..796b863336 100644 --- a/mcs/class/corlib/System.Threading.Tasks/Task.cs +++ b/mcs/class/corlib/System.Threading.Tasks/Task.cs @@ -49,8 +49,8 @@ namespace System.Threading.Tasks // parent is the outer task in which this task is created readonly Task parent; - // contAncestor is the Task on which this continuation was setup - readonly Task contAncestor; + // A reference to a Task on which this continuation is attached to + Task contAncestor; static int id = -1; static readonly TaskFactory defaultFactory = new TaskFactory (); @@ -182,6 +182,9 @@ namespace System.Threading.Tasks if (IsContinuation) throw new InvalidOperationException ("Start may not be called on a continuation task"); + if (IsPromise) + throw new InvalidOperationException ("Start may not be called on a promise-style task"); + SetupScheduler (scheduler); Schedule (); } @@ -208,6 +211,9 @@ namespace System.Threading.Tasks if (IsContinuation) throw new InvalidOperationException ("RunSynchronously may not be called on a continuation task"); + if (IsPromise) + throw new InvalidOperationException ("RunSynchronously may not be called on a promise-style task"); + RunSynchronouslyCore (scheduler); } @@ -220,11 +226,13 @@ namespace System.Threading.Tasks if (scheduler.RunInline (this, false)) return; } catch (Exception inner) { - throw new TaskSchedulerException (inner); + var ex = new TaskSchedulerException (inner); + TrySetException (new AggregateException (ex), false, true); + throw ex; } Schedule (); - Wait (); + WaitCore (Timeout.Infinite, CancellationToken.None, false); } #endregion @@ -330,18 +338,28 @@ namespace System.Threading.Tasks ContinueWith (new TaskContinuation (continuation, options)); } - internal void ContinueWith (IContinuation continuation) + internal bool ContinueWith (IContinuation continuation, bool canExecuteInline = true) { if (IsCompleted) { + if (!canExecuteInline) + return false; + continuation.Execute (); - return; + return true; } continuations.Add (continuation); // Retry in case completion was achieved but event adding was too late - if (IsCompleted && continuations.Remove (continuation)) + if (IsCompleted) { + continuations.Remove (continuation); + if (!canExecuteInline) + return false; + continuation.Execute (); + } + + return true; } internal void RemoveContinuation (IContinuation continuation) @@ -438,7 +456,7 @@ namespace System.Threading.Tasks return true; } - internal bool TrySetException (AggregateException aggregate) + internal bool TrySetException (AggregateException aggregate, bool cancellation, bool observed) { if (IsCompleted) return false; @@ -450,8 +468,19 @@ namespace System.Threading.Tasks return false; } - - HandleGenericException (aggregate); + + if (cancellation) { + ExceptionSlot.Exception = aggregate; + Thread.MemoryBarrier (); + + CancelReal (); + } else { + HandleGenericException (aggregate); + } + + if (observed) + exSlot.Observed = true; + return true; } @@ -500,7 +529,9 @@ namespace System.Threading.Tasks void InnerInvoke () { if (IsContinuation) { - invoker.Invoke (contAncestor, state, this); + var ancestor = contAncestor; + contAncestor = null; + invoker.Invoke (ancestor, state, this); } else { invoker.Invoke (this, state, this); } @@ -639,7 +670,7 @@ namespace System.Threading.Tasks if (millisecondsTimeout < -1) throw new ArgumentOutOfRangeException ("millisecondsTimeout"); - bool result = WaitCore (millisecondsTimeout, cancellationToken); + bool result = WaitCore (millisecondsTimeout, cancellationToken, true); if (IsCanceled) throw new AggregateException (new TaskCanceledException (this)); @@ -651,13 +682,13 @@ namespace System.Threading.Tasks return result; } - internal bool WaitCore (int millisecondsTimeout, CancellationToken cancellationToken) + internal bool WaitCore (int millisecondsTimeout, CancellationToken cancellationToken, bool runInline) { if (IsCompleted) return true; // If the task is ready to be run and we were supposed to wait on it indefinitely without cancellation, just run it - if (Status == TaskStatus.WaitingToRun && millisecondsTimeout == Timeout.Infinite && scheduler != null && !cancellationToken.CanBeCanceled) + if (runInline && Status == TaskStatus.WaitingToRun && millisecondsTimeout == Timeout.Infinite && scheduler != null && !cancellationToken.CanBeCanceled) scheduler.RunInline (this, true); bool result = true; @@ -957,11 +988,23 @@ namespace System.Threading.Tasks if (millisecondsDelay < -1) throw new ArgumentOutOfRangeException ("millisecondsDelay"); - var task = new Task (TaskActionInvoker.Delay, millisecondsDelay, cancellationToken, TaskCreationOptions.None, null, TaskConstants.Finished); + if (cancellationToken.IsCancellationRequested) + return TaskConstants.Canceled; + + var task = new Task (TaskActionInvoker.Empty, null, cancellationToken, TaskCreationOptions.None, null, null); task.SetupScheduler (TaskScheduler.Default); - - if (millisecondsDelay != Timeout.Infinite) - task.scheduler.QueueTask (task); + + if (millisecondsDelay != Timeout.Infinite) { + var timer = new Timer (delegate (object state) { + var t = (Task) state; + if (t.Status == TaskStatus.WaitingForActivation) { + t.Status = TaskStatus.Running; + t.Finish (); + } + }, task, millisecondsDelay, -1); + + task.ContinueWith (new DisposeContinuation (timer)); + } return task; } @@ -1068,6 +1111,9 @@ namespace System.Threading.Tasks internal static Task<TResult[]> WhenAllCore<TResult> (IList<Task<TResult>> tasks) { + if (tasks.Count == 0) + return FromResult(new TResult[0]); + foreach (var t in tasks) { if (t == null) throw new ArgumentException ("tasks", "the tasks argument contains a null element"); @@ -1228,7 +1274,7 @@ namespace System.Threading.Tasks public AggregateException Exception { get { - if (exSlot == null) + if (exSlot == null || !IsFaulted) return null; exSlot.Observed = true; return exSlot.Exception; @@ -1269,7 +1315,7 @@ namespace System.Threading.Tasks } } - TaskExceptionSlot ExceptionSlot { + internal TaskExceptionSlot ExceptionSlot { get { if (exSlot != null) return exSlot; @@ -1314,6 +1360,12 @@ namespace System.Threading.Tasks } } + bool IsPromise { + get { + return invoker == TaskActionInvoker.Promise; + } + } + internal Task ContinuationAncestor { get { return contAncestor; diff --git a/mcs/class/corlib/System.Threading.Tasks/TaskActionInvoker.cs b/mcs/class/corlib/System.Threading.Tasks/TaskActionInvoker.cs index 513b09968b..72a486f7a8 100644 --- a/mcs/class/corlib/System.Threading.Tasks/TaskActionInvoker.cs +++ b/mcs/class/corlib/System.Threading.Tasks/TaskActionInvoker.cs @@ -35,6 +35,7 @@ namespace System.Threading.Tasks abstract class TaskActionInvoker { public static readonly TaskActionInvoker Empty = new EmptyTaskActionInvoker (); + public static readonly TaskActionInvoker Promise = new EmptyTaskActionInvoker (); public static readonly TaskActionInvoker Delay = new DelayTaskInvoker (); sealed class EmptyTaskActionInvoker : TaskActionInvoker @@ -290,12 +291,10 @@ namespace System.Threading.Tasks sealed class FuncTaskSelected<TResult> : TaskActionInvoker { readonly Func<Task, TResult> action; - readonly Task[] tasks; - public FuncTaskSelected (Func<Task, TResult> action, Task[] tasks) + public FuncTaskSelected (Func<Task, TResult> action) { this.action = action; - this.tasks = tasks; } public override Delegate Action { @@ -306,8 +305,8 @@ namespace System.Threading.Tasks public override void Invoke (Task owner, object state, Task context) { - var result = ((Task<int>) owner).Result; - ((Task<TResult>) context).Result = action (tasks[result]); + var result = ((Task<Task>) owner).Result; + ((Task<TResult>) context).Result = action (result); } } @@ -492,9 +491,9 @@ namespace System.Threading.Tasks return new ActionTaskSelected (action); } - public static TaskActionInvoker Create<TResult> (Func<Task, TResult> action, Task[] tasks) + public static TaskActionInvoker CreateSelected<TResult> (Func<Task, TResult> action) { - return new FuncTaskSelected<TResult> (action, tasks); + return new FuncTaskSelected<TResult> (action); } #endregion diff --git a/mcs/class/corlib/System.Threading.Tasks/TaskCompletionSource.cs b/mcs/class/corlib/System.Threading.Tasks/TaskCompletionSource.cs index f4b3a3b81d..f0bc902310 100644 --- a/mcs/class/corlib/System.Threading.Tasks/TaskCompletionSource.cs +++ b/mcs/class/corlib/System.Threading.Tasks/TaskCompletionSource.cs @@ -113,7 +113,7 @@ namespace System.Threading.Tasks if (aggregate.InnerExceptions.Count == 0) throw new ArgumentNullException ("exceptions"); - return source.TrySetException (aggregate); + return source.TrySetException (aggregate, false, false); } public bool TrySetResult (TResult result) diff --git a/mcs/class/corlib/System.Threading.Tasks/TaskContinuation.cs b/mcs/class/corlib/System.Threading.Tasks/TaskContinuation.cs index 8189df744b..12eaa8be19 100644 --- a/mcs/class/corlib/System.Threading.Tasks/TaskContinuation.cs +++ b/mcs/class/corlib/System.Threading.Tasks/TaskContinuation.cs @@ -110,18 +110,41 @@ namespace System.Threading.Tasks } } - class ActionContinuation : IContinuation + class AwaiterActionContinuation : IContinuation { readonly Action action; - public ActionContinuation (Action action) + public AwaiterActionContinuation (Action action) { this.action = action; } public void Execute () { - action (); + // + // Continuation can be inlined only when the current context allows it. This is different to awaiter setup + // because the context where the awaiter task is set to completed can be anywhere (due to TaskCompletionSource) + // + if ((SynchronizationContext.Current == null || SynchronizationContext.Current.GetType () == typeof (SynchronizationContext)) && TaskScheduler.IsDefault) { + action (); + } else { + ThreadPool.UnsafeQueueUserWorkItem (l => ((Action) l) (), action); + } + } + } + + class SchedulerAwaitContinuation : IContinuation + { + readonly Task task; + + public SchedulerAwaitContinuation (Task task) + { + this.task = task; + } + + public void Execute () + { + task.RunSynchronouslyCore (task.scheduler); } } @@ -179,7 +202,7 @@ namespace System.Threading.Tasks } if (exceptions != null) { - owner.TrySetException (new AggregateException (exceptions)); + owner.TrySetException (new AggregateException (exceptions), false, false); return; } @@ -239,7 +262,7 @@ namespace System.Threading.Tasks } if (exceptions != null) { - owner.TrySetException (new AggregateException (exceptions)); + owner.TrySetException (new AggregateException (exceptions), false, false); return; } @@ -316,6 +339,7 @@ namespace System.Threading.Tasks sealed class CountdownContinuation : IContinuation, IDisposable { readonly CountdownEvent evt; + bool disposed; public CountdownContinuation (int initialCount) { @@ -330,12 +354,33 @@ namespace System.Threading.Tasks public void Dispose () { + disposed = true; + Thread.MemoryBarrier (); + evt.Dispose (); } public void Execute () { - evt.Signal (); + // Guard against possible race when continuation is disposed and some tasks may still + // execute it (removal was late and the execution is slower than the Dispose thread) + if (!disposed) + evt.Signal (); + } + } + + sealed class DisposeContinuation : IContinuation + { + readonly IDisposable instance; + + public DisposeContinuation (IDisposable instance) + { + this.instance = instance; + } + + public void Execute () + { + instance.Dispose (); } } } diff --git a/mcs/class/corlib/System.Threading.Tasks/TaskFactory.cs b/mcs/class/corlib/System.Threading.Tasks/TaskFactory.cs index 9330fd40a5..a558c622c5 100644 --- a/mcs/class/corlib/System.Threading.Tasks/TaskFactory.cs +++ b/mcs/class/corlib/System.Threading.Tasks/TaskFactory.cs @@ -214,9 +214,14 @@ namespace System.Threading.Tasks TaskCreationOptions creationOptions, TaskScheduler scheduler) { - Task<TResult> t = new Task<TResult> (function, state, cancellationToken, creationOptions); - t.Start (scheduler); + var t = new Task<TResult> (function, state, cancellationToken, creationOptions); + // + // Don't start cancelled task it would throw an exception + // + if (!t.IsCompleted) + t.Start (scheduler); + return t; } #endregion @@ -305,7 +310,7 @@ namespace System.Threading.Tasks { CheckContinueArguments (tasks, continuationFunction, continuationOptions, scheduler); - var cont = Task.WhenAnyCore (tasks).ContinueWith<TResult> (TaskActionInvoker.Create (continuationFunction, tasks), cancellationToken, continuationOptions, scheduler); + var cont = Task.WhenAnyCore (tasks).ContinueWith<TResult> (TaskActionInvoker.CreateSelected (continuationFunction), cancellationToken, continuationOptions, scheduler); return cont; } diff --git a/mcs/class/corlib/System.Threading.Tasks/TaskScheduler.cs b/mcs/class/corlib/System.Threading.Tasks/TaskScheduler.cs index 618c185084..a4031c0a50 100644 --- a/mcs/class/corlib/System.Threading.Tasks/TaskScheduler.cs +++ b/mcs/class/corlib/System.Threading.Tasks/TaskScheduler.cs @@ -100,6 +100,12 @@ namespace System.Threading.Tasks return id; } } + + internal static bool IsDefault { + get { + return currentScheduler == null || currentScheduler == defaultScheduler; + } + } public virtual int MaximumConcurrencyLevel { get { diff --git a/mcs/class/corlib/System.Threading.Tasks/TpScheduler.cs b/mcs/class/corlib/System.Threading.Tasks/TpScheduler.cs index c38a566e2c..e1c9a6842c 100644 --- a/mcs/class/corlib/System.Threading.Tasks/TpScheduler.cs +++ b/mcs/class/corlib/System.Threading.Tasks/TpScheduler.cs @@ -50,7 +50,7 @@ namespace System.Threading.Tasks return; } - ThreadPool.UnsafeQueueUserWorkItem (callback, task); + ThreadPool.QueueWorkItem (callback, task); } static void TaskExecuterCallback (object obj) |