本文介绍了信号量块,尽管它不是满的的处理方法,对大家解决问题具有一定的参考价值,需要的朋友们下面随着小编来一起学习吧!
问题描述
我目前正在尝试优化一个旧的、编写得非常糟糕的类,它处理大量数据,因此很容易花费数小时来运行一组数据。收集数据已经花费了很多时间,这是我在这里试图改进的地方。我知道这是相当难闻的代码,但这只是一个测试,如果这能改善什么的话,所以请只关注这个问题:
我尝试了SemaphoreSlim
和Semaphore
以减少并发运行的任务数量。我的数据集将生成大约70个任务,这可能会导致线程匮乏和整体性能下降。至少,它的反应变得不那么灵敏了。因此,我尝试将其保持在5个任务的同时,以获得更好的总体吞吐量。
现在,当我尝试等待我的任务进入Sempahore时,它会阻塞(使用aWait Also块的细长信号量),但它永远不会进入,即使信号量没有满。此代码位于异步方法内部,作为轻微的上下文提示。
Semaphore throttle = new Semaphore(0, 5);
try
{
foreach (var folder in folders)
{
// Wait in case there are already 5 tasks running to reduce thread starvation
collectionTasks.Add(Task.Run( () =>
{
// ReSharper disable once AccessToDisposedClosure
throttle.WaitOne();
return GetGapProfiles(folder.Value, progress, token);
}, token).ContinueWith(
t =>
{
// ReSharper disable once AccessToDisposedClosure
throttle.Release();
return t.Result;
}, TaskContinuationOptions.None));
}
// When all are loaded concat all results into one collection
await Task.WhenAll(collectionTasks);
}
catch (Exception ex)
{
Log.Error(ex, "Failed to collect profiles.");
}
finally
{
throttle.Dispose();
}
我就是不明白为什么这个会阻塞,永远不会进入GetGapProfiles
。有谁能解释一下这个吗?
推荐答案
public static class perTaskThrottle
{
/// <summary>
/// Run multiple tasks in parallel - up to concurrentTasks tasks may run at any one time
/// </summary>
/// <typeparam name="TInput"></typeparam>
/// <typeparam name="TResult"></typeparam>
/// <param name="sourceItems"></param>
/// <param name="func"></param>
/// <param name="concurrentTasks"></param>
/// <returns></returns>
public static Task<IDictionary<TInput, TResult>> ForEachAsyncThrottled<TInput, TResult>(
this IEnumerable<TInput> sourceItems,
Func<TInput, Task<TResult>> func,
int concurrentTasks = 1)
{
return ForEachAsyncThrottled(sourceItems, func, CancellationToken.None, concurrentTasks);
}
/// <summary>
/// Run multiple tasks in parallel - up to concurrentTasks tasks may run at any one time
/// </summary>
/// <typeparam name="TInput"></typeparam>
/// <typeparam name="TResult"></typeparam>
/// <param name="sourceItems"></param>
/// <param name="func"></param>
/// <param name="token"></param>
/// <param name="concurrentTasks"></param>
/// <returns></returns>
public static async Task<IDictionary<TInput, TResult>> ForEachAsyncThrottled<TInput, TResult>(
this IEnumerable<TInput> sourceItems,
Func<TInput, Task<TResult>> func,
CancellationToken token,
int concurrentTasks = 1)
{
var result = new ConcurrentDictionary<TInput, TResult>();
var tasksList = new List<Task>();
using (var semaphoreSlim = new SemaphoreSlim(concurrentTasks))
{
foreach (var item in sourceItems)
{
token.ThrowIfCancellationRequested();
// if there are already concurrentTasks tasks executing, pause until one has completed ( semaphoreSlim.Release() )
await semaphoreSlim.WaitAsync(perTimeSpanHelper.Forever, token).ConfigureAwait(false);
token.ThrowIfCancellationRequested();
Action<Task<TResult>> okContinuation = async task =>
{
// the task has already completed if status is CompletedOk, but using await once more is safer than using task.Result
var taskResult = await task;
result[item] = taskResult;
};
// ReSharper disable once AccessToDisposedClosure
Action<Task> allContinuation = task => semaphoreSlim.Release();
tasksList.Add(func.Invoke(item)
.ContinueWith(okContinuation, TaskContinuationOptions.OnlyOnRanToCompletion)
.ContinueWith(allContinuation, token));
token.ThrowIfCancellationRequested();
}
if (!token.IsCancellationRequested)
{
await Task.WhenAll(tasksList).ConfigureAwait(false);
}
}
return result;
}
}
所以在您的情况下,您可以使用
var results = folders.ForEachAsyncThrottled( (f) => GetGapProfiles(f.Value), token, 5);
这篇关于信号量块,尽管它不是满的的文章就介绍到这了,希望我们推荐的答案对大家有所帮助,也希望大家多多支持编程学习网!
本站部分内容来源互联网,如果有图片或者内容侵犯您的权益请联系我们删除!