summaryrefslogtreecommitdiff
path: root/mcs/class/corlib/System.Threading.Tasks
diff options
context:
space:
mode:
Diffstat (limited to 'mcs/class/corlib/System.Threading.Tasks')
-rw-r--r--mcs/class/corlib/System.Threading.Tasks/Task.cs92
-rw-r--r--mcs/class/corlib/System.Threading.Tasks/TaskActionInvoker.cs13
-rw-r--r--mcs/class/corlib/System.Threading.Tasks/TaskCompletionSource.cs2
-rw-r--r--mcs/class/corlib/System.Threading.Tasks/TaskContinuation.cs57
-rw-r--r--mcs/class/corlib/System.Threading.Tasks/TaskFactory.cs11
-rw-r--r--mcs/class/corlib/System.Threading.Tasks/TaskScheduler.cs6
-rw-r--r--mcs/class/corlib/System.Threading.Tasks/TpScheduler.cs2
7 files changed, 145 insertions, 38 deletions
diff --git a/mcs/class/corlib/System.Threading.Tasks/Task.cs b/mcs/class/corlib/System.Threading.Tasks/Task.cs
index 62aa14fddc..796b863336 100644
--- a/mcs/class/corlib/System.Threading.Tasks/Task.cs
+++ b/mcs/class/corlib/System.Threading.Tasks/Task.cs
@@ -49,8 +49,8 @@ namespace System.Threading.Tasks
// parent is the outer task in which this task is created
readonly Task parent;
- // contAncestor is the Task on which this continuation was setup
- readonly Task contAncestor;
+ // A reference to a Task on which this continuation is attached to
+ Task contAncestor;
static int id = -1;
static readonly TaskFactory defaultFactory = new TaskFactory ();
@@ -182,6 +182,9 @@ namespace System.Threading.Tasks
if (IsContinuation)
throw new InvalidOperationException ("Start may not be called on a continuation task");
+ if (IsPromise)
+ throw new InvalidOperationException ("Start may not be called on a promise-style task");
+
SetupScheduler (scheduler);
Schedule ();
}
@@ -208,6 +211,9 @@ namespace System.Threading.Tasks
if (IsContinuation)
throw new InvalidOperationException ("RunSynchronously may not be called on a continuation task");
+ if (IsPromise)
+ throw new InvalidOperationException ("RunSynchronously may not be called on a promise-style task");
+
RunSynchronouslyCore (scheduler);
}
@@ -220,11 +226,13 @@ namespace System.Threading.Tasks
if (scheduler.RunInline (this, false))
return;
} catch (Exception inner) {
- throw new TaskSchedulerException (inner);
+ var ex = new TaskSchedulerException (inner);
+ TrySetException (new AggregateException (ex), false, true);
+ throw ex;
}
Schedule ();
- Wait ();
+ WaitCore (Timeout.Infinite, CancellationToken.None, false);
}
#endregion
@@ -330,18 +338,28 @@ namespace System.Threading.Tasks
ContinueWith (new TaskContinuation (continuation, options));
}
- internal void ContinueWith (IContinuation continuation)
+ internal bool ContinueWith (IContinuation continuation, bool canExecuteInline = true)
{
if (IsCompleted) {
+ if (!canExecuteInline)
+ return false;
+
continuation.Execute ();
- return;
+ return true;
}
continuations.Add (continuation);
// Retry in case completion was achieved but event adding was too late
- if (IsCompleted && continuations.Remove (continuation))
+ if (IsCompleted) {
+ continuations.Remove (continuation);
+ if (!canExecuteInline)
+ return false;
+
continuation.Execute ();
+ }
+
+ return true;
}
internal void RemoveContinuation (IContinuation continuation)
@@ -438,7 +456,7 @@ namespace System.Threading.Tasks
return true;
}
- internal bool TrySetException (AggregateException aggregate)
+ internal bool TrySetException (AggregateException aggregate, bool cancellation, bool observed)
{
if (IsCompleted)
return false;
@@ -450,8 +468,19 @@ namespace System.Threading.Tasks
return false;
}
-
- HandleGenericException (aggregate);
+
+ if (cancellation) {
+ ExceptionSlot.Exception = aggregate;
+ Thread.MemoryBarrier ();
+
+ CancelReal ();
+ } else {
+ HandleGenericException (aggregate);
+ }
+
+ if (observed)
+ exSlot.Observed = true;
+
return true;
}
@@ -500,7 +529,9 @@ namespace System.Threading.Tasks
void InnerInvoke ()
{
if (IsContinuation) {
- invoker.Invoke (contAncestor, state, this);
+ var ancestor = contAncestor;
+ contAncestor = null;
+ invoker.Invoke (ancestor, state, this);
} else {
invoker.Invoke (this, state, this);
}
@@ -639,7 +670,7 @@ namespace System.Threading.Tasks
if (millisecondsTimeout < -1)
throw new ArgumentOutOfRangeException ("millisecondsTimeout");
- bool result = WaitCore (millisecondsTimeout, cancellationToken);
+ bool result = WaitCore (millisecondsTimeout, cancellationToken, true);
if (IsCanceled)
throw new AggregateException (new TaskCanceledException (this));
@@ -651,13 +682,13 @@ namespace System.Threading.Tasks
return result;
}
- internal bool WaitCore (int millisecondsTimeout, CancellationToken cancellationToken)
+ internal bool WaitCore (int millisecondsTimeout, CancellationToken cancellationToken, bool runInline)
{
if (IsCompleted)
return true;
// If the task is ready to be run and we were supposed to wait on it indefinitely without cancellation, just run it
- if (Status == TaskStatus.WaitingToRun && millisecondsTimeout == Timeout.Infinite && scheduler != null && !cancellationToken.CanBeCanceled)
+ if (runInline && Status == TaskStatus.WaitingToRun && millisecondsTimeout == Timeout.Infinite && scheduler != null && !cancellationToken.CanBeCanceled)
scheduler.RunInline (this, true);
bool result = true;
@@ -957,11 +988,23 @@ namespace System.Threading.Tasks
if (millisecondsDelay < -1)
throw new ArgumentOutOfRangeException ("millisecondsDelay");
- var task = new Task (TaskActionInvoker.Delay, millisecondsDelay, cancellationToken, TaskCreationOptions.None, null, TaskConstants.Finished);
+ if (cancellationToken.IsCancellationRequested)
+ return TaskConstants.Canceled;
+
+ var task = new Task (TaskActionInvoker.Empty, null, cancellationToken, TaskCreationOptions.None, null, null);
task.SetupScheduler (TaskScheduler.Default);
-
- if (millisecondsDelay != Timeout.Infinite)
- task.scheduler.QueueTask (task);
+
+ if (millisecondsDelay != Timeout.Infinite) {
+ var timer = new Timer (delegate (object state) {
+ var t = (Task) state;
+ if (t.Status == TaskStatus.WaitingForActivation) {
+ t.Status = TaskStatus.Running;
+ t.Finish ();
+ }
+ }, task, millisecondsDelay, -1);
+
+ task.ContinueWith (new DisposeContinuation (timer));
+ }
return task;
}
@@ -1068,6 +1111,9 @@ namespace System.Threading.Tasks
internal static Task<TResult[]> WhenAllCore<TResult> (IList<Task<TResult>> tasks)
{
+ if (tasks.Count == 0)
+ return FromResult(new TResult[0]);
+
foreach (var t in tasks) {
if (t == null)
throw new ArgumentException ("tasks", "the tasks argument contains a null element");
@@ -1228,7 +1274,7 @@ namespace System.Threading.Tasks
public AggregateException Exception {
get {
- if (exSlot == null)
+ if (exSlot == null || !IsFaulted)
return null;
exSlot.Observed = true;
return exSlot.Exception;
@@ -1269,7 +1315,7 @@ namespace System.Threading.Tasks
}
}
- TaskExceptionSlot ExceptionSlot {
+ internal TaskExceptionSlot ExceptionSlot {
get {
if (exSlot != null)
return exSlot;
@@ -1314,6 +1360,12 @@ namespace System.Threading.Tasks
}
}
+ bool IsPromise {
+ get {
+ return invoker == TaskActionInvoker.Promise;
+ }
+ }
+
internal Task ContinuationAncestor {
get {
return contAncestor;
diff --git a/mcs/class/corlib/System.Threading.Tasks/TaskActionInvoker.cs b/mcs/class/corlib/System.Threading.Tasks/TaskActionInvoker.cs
index 513b09968b..72a486f7a8 100644
--- a/mcs/class/corlib/System.Threading.Tasks/TaskActionInvoker.cs
+++ b/mcs/class/corlib/System.Threading.Tasks/TaskActionInvoker.cs
@@ -35,6 +35,7 @@ namespace System.Threading.Tasks
abstract class TaskActionInvoker
{
public static readonly TaskActionInvoker Empty = new EmptyTaskActionInvoker ();
+ public static readonly TaskActionInvoker Promise = new EmptyTaskActionInvoker ();
public static readonly TaskActionInvoker Delay = new DelayTaskInvoker ();
sealed class EmptyTaskActionInvoker : TaskActionInvoker
@@ -290,12 +291,10 @@ namespace System.Threading.Tasks
sealed class FuncTaskSelected<TResult> : TaskActionInvoker
{
readonly Func<Task, TResult> action;
- readonly Task[] tasks;
- public FuncTaskSelected (Func<Task, TResult> action, Task[] tasks)
+ public FuncTaskSelected (Func<Task, TResult> action)
{
this.action = action;
- this.tasks = tasks;
}
public override Delegate Action {
@@ -306,8 +305,8 @@ namespace System.Threading.Tasks
public override void Invoke (Task owner, object state, Task context)
{
- var result = ((Task<int>) owner).Result;
- ((Task<TResult>) context).Result = action (tasks[result]);
+ var result = ((Task<Task>) owner).Result;
+ ((Task<TResult>) context).Result = action (result);
}
}
@@ -492,9 +491,9 @@ namespace System.Threading.Tasks
return new ActionTaskSelected (action);
}
- public static TaskActionInvoker Create<TResult> (Func<Task, TResult> action, Task[] tasks)
+ public static TaskActionInvoker CreateSelected<TResult> (Func<Task, TResult> action)
{
- return new FuncTaskSelected<TResult> (action, tasks);
+ return new FuncTaskSelected<TResult> (action);
}
#endregion
diff --git a/mcs/class/corlib/System.Threading.Tasks/TaskCompletionSource.cs b/mcs/class/corlib/System.Threading.Tasks/TaskCompletionSource.cs
index f4b3a3b81d..f0bc902310 100644
--- a/mcs/class/corlib/System.Threading.Tasks/TaskCompletionSource.cs
+++ b/mcs/class/corlib/System.Threading.Tasks/TaskCompletionSource.cs
@@ -113,7 +113,7 @@ namespace System.Threading.Tasks
if (aggregate.InnerExceptions.Count == 0)
throw new ArgumentNullException ("exceptions");
- return source.TrySetException (aggregate);
+ return source.TrySetException (aggregate, false, false);
}
public bool TrySetResult (TResult result)
diff --git a/mcs/class/corlib/System.Threading.Tasks/TaskContinuation.cs b/mcs/class/corlib/System.Threading.Tasks/TaskContinuation.cs
index 8189df744b..12eaa8be19 100644
--- a/mcs/class/corlib/System.Threading.Tasks/TaskContinuation.cs
+++ b/mcs/class/corlib/System.Threading.Tasks/TaskContinuation.cs
@@ -110,18 +110,41 @@ namespace System.Threading.Tasks
}
}
- class ActionContinuation : IContinuation
+ class AwaiterActionContinuation : IContinuation
{
readonly Action action;
- public ActionContinuation (Action action)
+ public AwaiterActionContinuation (Action action)
{
this.action = action;
}
public void Execute ()
{
- action ();
+ //
+ // Continuation can be inlined only when the current context allows it. This is different to awaiter setup
+ // because the context where the awaiter task is set to completed can be anywhere (due to TaskCompletionSource)
+ //
+ if ((SynchronizationContext.Current == null || SynchronizationContext.Current.GetType () == typeof (SynchronizationContext)) && TaskScheduler.IsDefault) {
+ action ();
+ } else {
+ ThreadPool.UnsafeQueueUserWorkItem (l => ((Action) l) (), action);
+ }
+ }
+ }
+
+ class SchedulerAwaitContinuation : IContinuation
+ {
+ readonly Task task;
+
+ public SchedulerAwaitContinuation (Task task)
+ {
+ this.task = task;
+ }
+
+ public void Execute ()
+ {
+ task.RunSynchronouslyCore (task.scheduler);
}
}
@@ -179,7 +202,7 @@ namespace System.Threading.Tasks
}
if (exceptions != null) {
- owner.TrySetException (new AggregateException (exceptions));
+ owner.TrySetException (new AggregateException (exceptions), false, false);
return;
}
@@ -239,7 +262,7 @@ namespace System.Threading.Tasks
}
if (exceptions != null) {
- owner.TrySetException (new AggregateException (exceptions));
+ owner.TrySetException (new AggregateException (exceptions), false, false);
return;
}
@@ -316,6 +339,7 @@ namespace System.Threading.Tasks
sealed class CountdownContinuation : IContinuation, IDisposable
{
readonly CountdownEvent evt;
+ bool disposed;
public CountdownContinuation (int initialCount)
{
@@ -330,12 +354,33 @@ namespace System.Threading.Tasks
public void Dispose ()
{
+ disposed = true;
+ Thread.MemoryBarrier ();
+
evt.Dispose ();
}
public void Execute ()
{
- evt.Signal ();
+ // Guard against possible race when continuation is disposed and some tasks may still
+ // execute it (removal was late and the execution is slower than the Dispose thread)
+ if (!disposed)
+ evt.Signal ();
+ }
+ }
+
+ sealed class DisposeContinuation : IContinuation
+ {
+ readonly IDisposable instance;
+
+ public DisposeContinuation (IDisposable instance)
+ {
+ this.instance = instance;
+ }
+
+ public void Execute ()
+ {
+ instance.Dispose ();
}
}
}
diff --git a/mcs/class/corlib/System.Threading.Tasks/TaskFactory.cs b/mcs/class/corlib/System.Threading.Tasks/TaskFactory.cs
index 9330fd40a5..a558c622c5 100644
--- a/mcs/class/corlib/System.Threading.Tasks/TaskFactory.cs
+++ b/mcs/class/corlib/System.Threading.Tasks/TaskFactory.cs
@@ -214,9 +214,14 @@ namespace System.Threading.Tasks
TaskCreationOptions creationOptions,
TaskScheduler scheduler)
{
- Task<TResult> t = new Task<TResult> (function, state, cancellationToken, creationOptions);
- t.Start (scheduler);
+ var t = new Task<TResult> (function, state, cancellationToken, creationOptions);
+ //
+ // Don't start cancelled task it would throw an exception
+ //
+ if (!t.IsCompleted)
+ t.Start (scheduler);
+
return t;
}
#endregion
@@ -305,7 +310,7 @@ namespace System.Threading.Tasks
{
CheckContinueArguments (tasks, continuationFunction, continuationOptions, scheduler);
- var cont = Task.WhenAnyCore (tasks).ContinueWith<TResult> (TaskActionInvoker.Create (continuationFunction, tasks), cancellationToken, continuationOptions, scheduler);
+ var cont = Task.WhenAnyCore (tasks).ContinueWith<TResult> (TaskActionInvoker.CreateSelected (continuationFunction), cancellationToken, continuationOptions, scheduler);
return cont;
}
diff --git a/mcs/class/corlib/System.Threading.Tasks/TaskScheduler.cs b/mcs/class/corlib/System.Threading.Tasks/TaskScheduler.cs
index 618c185084..a4031c0a50 100644
--- a/mcs/class/corlib/System.Threading.Tasks/TaskScheduler.cs
+++ b/mcs/class/corlib/System.Threading.Tasks/TaskScheduler.cs
@@ -100,6 +100,12 @@ namespace System.Threading.Tasks
return id;
}
}
+
+ internal static bool IsDefault {
+ get {
+ return currentScheduler == null || currentScheduler == defaultScheduler;
+ }
+ }
public virtual int MaximumConcurrencyLevel {
get {
diff --git a/mcs/class/corlib/System.Threading.Tasks/TpScheduler.cs b/mcs/class/corlib/System.Threading.Tasks/TpScheduler.cs
index c38a566e2c..e1c9a6842c 100644
--- a/mcs/class/corlib/System.Threading.Tasks/TpScheduler.cs
+++ b/mcs/class/corlib/System.Threading.Tasks/TpScheduler.cs
@@ -50,7 +50,7 @@ namespace System.Threading.Tasks
return;
}
- ThreadPool.UnsafeQueueUserWorkItem (callback, task);
+ ThreadPool.QueueWorkItem (callback, task);
}
static void TaskExecuterCallback (object obj)