I'm trying to change Stephen Toub's ForEachAsync<T> extension method into an extension which returns a result...
Stephen's extension:
public static Task ForEachAsync<T>(this IEnumerable<T> source, int dop, Func<T, Task> body)
{
    return Task.WhenAll(
        from partition in Partitioner.Create(source).GetPartitions(dop)
        select Task.Run(async delegate {
            using (partition)
                while (partition.MoveNext())
                    await body(partition.Current);
        }));
}
My approach (not working; the tasks get executed but the result is wrong):
public static Task<TResult[]> ForEachAsync<T, TResult>(this IList<T> source, int degreeOfParalleslism, Func<T, Task<TResult>> body)
{
    return Task.WhenAll<TResult>(
        from partition in Partitioner.Create(source).GetPartitions(degreeOfParalleslism)
        select Task.Run<TResult>(async () =>
        {
            using (partition)
                while (partition.MoveNext())
                    await body(partition.Current); // when I "return await" I get good results but only one per partition
            return default(TResult);
        }));
}
I know I somehow have to return (WhenAll?) the results from the last part, but I haven't yet figured out how to do it...
Update
The result I get is just degreeOfParalleslism times null (I guess because of default(TResult)) even though all the tasks get executed. I also tried to return await body(...) and then the result was fine, but only degreeOfParalleslism number of tasks got executed.
Your LINQ query can only ever have the same number of results as the number of partitions - you're just projecting each partition into a single result.
If you don't care about the order, you just need to assemble the results of each partition into a list, then flatten them afterwards.
public static async Task<TResult[]> ExecuteInParallel<T, TResult>(this IList<T> source, int degreeOfParalleslism, Func<T, Task<TResult>> body)
{
    var lists = await Task.WhenAll<List<TResult>>(
        Partitioner.Create(source).GetPartitions(degreeOfParalleslism)
            .Select(partition => Task.Run<List<TResult>>(async () =>
            {
                var list = new List<TResult>();
                using (partition)
                {
                    while (partition.MoveNext())
                    {
                        list.Add(await body(partition.Current));
                    }
                }
                return list;
            })));
    return lists.SelectMany(list => list).ToArray();
}
(I've renamed this from ForEachAsync, as ForEach sounds imperative (suitable for the Func<T, Task> in the original) whereas this is fetching results. A foreach loop doesn't have a result - this does.)
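If the order of the results does matter, one possible variation (a sketch, not part of the answer above) is to partition the indexes of the source list instead of its items, so that each result can be written into the array slot matching its input. The method name ExecuteInParallelOrdered, the class name ParallelExtensions and the parameter spelling degreeOfParallelism are made up for this illustration, and the sketch assumes nothing modifies source while the tasks are running.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class ParallelExtensions
{
    // Hypothetical order-preserving variant of ExecuteInParallel:
    // results[i] always holds the result computed from source[i].
    public static async Task<TResult[]> ExecuteInParallelOrdered<T, TResult>(
        this IList<T> source, int degreeOfParallelism, Func<T, Task<TResult>> body)
    {
        var results = new TResult[source.Count];

        // Partition the indexes rather than the items, so every worker
        // knows which slot its result belongs in.
        var indexes = Enumerable.Range(0, source.Count);

        await Task.WhenAll(
            Partitioner.Create(indexes).GetPartitions(degreeOfParallelism)
                .Select(partition => Task.Run(async () =>
                {
                    using (partition)
                    {
                        while (partition.MoveNext())
                        {
                            int i = partition.Current;
                            // Each index is handed out exactly once, so no two
                            // workers ever write to the same array element.
                            results[i] = await body(source[i]);
                        }
                    }
                })));

        return results;
    }
}
```

Called on a list containing 1 to 5 with a body that squares its argument, this should return 1, 4, 9, 16, 25 in that order regardless of which partition finishes first, because the position in the array is fixed by the index rather than by completion time.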