I am running a Parallel.For loop that initially runs once per processor, with each iteration performing a long-running operation. When a task finishes, it checks for more work and, if it finds any, calls itself again.
Here is what my code looks like:
static void Main(string[] args)
{
    Int32 numberOfProcessors = Environment.ProcessorCount;
    Parallel.For(0, numberOfProcessors, index => DoSomething(index, sqsQueueURL));
}
private async static Task DoSomething(int index, string queueURL)
{
    var receiveMessageRequest = new ReceiveMessageRequest { QueueUrl = queueURL, WaitTimeSeconds = 20, MaxNumberOfMessages = 1, VisibilityTimeout = 1200 };
    AmazonSQSClient sqsClient = new AmazonSQSClient(new AmazonSQSConfig { MaxErrorRetry = 4 });
    var receiveMessageResponse = sqsClient.ReceiveMessage(receiveMessageRequest);
    foreach (var msg in receiveMessageResponse.Messages)
    {
        PerformALongRunningTask......
        //delete the message
        DeleteMessageRequest deleteMessageRequest = new DeleteMessageRequest(queueURL, msg.ReceiptHandle);
        AmazonSQSClient sqsDeleteClient = new AmazonSQSClient();
        sqsDeleteClient.DeleteMessage(deleteMessageRequest);
        //Do it again
        DoSomething(index, queueURL);
    }
}
I am getting very unpredictable results: the program never completes all the tasks and exits before everything has finished.
What am I doing wrong here?
Shorter Code:
static Int32 TimesToLoop = 143;
static void Main(string[] args)
{
    Int32 numberOfProcessors = Environment.ProcessorCount;
    Parallel.For(0, numberOfProcessors, index => DoSomething(index));
    Console.Read();
}
private async static Task DoSomething(int index)
{
    if(TimesToLoop == 0)
    {
        return;
    }
    Console.WriteLine(index);
    Interlocked.Decrement(ref TimesToLoop);
    DoSomething(index++);
    return;
}
I see various problems at the moment:
- Parallel.For is just starting the tasks. It won't wait for them to complete. It will wait for the DoSomething method calls to return, but they're returning tasks representing the asynchronous operations, which probably won't have completed synchronously.
- DoSomething doesn't await the tasks returned by its recursive calls anyway - it almost certainly should. It might want to create a collection of all the tasks created in the foreach loop and await them all in one go, or it might want to await them immediately. We can't tell.
The simplest way of fixing the first part is probably to use Task.WaitAll instead of Parallel.For:
var tasks = Enumerable.Range(0, numberOfProcessors)
                      .Select(index => DoSomething(index, sqsQueueURL))
                      .ToArray();
Task.WaitAll(tasks);
Unlike Task.WhenAll, Task.WaitAll will block until all the specified tasks have completed. Note that this is not safe to do if any of the tasks need to continue on the thread calling WaitAll, precisely because it blocks - but if this is a console application and you're calling this from the initial thread, you'll be okay as the continuations will execute on the thread pool anyway.
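To make both points concrete, here is a minimal sketch of the shorter example with the two problems addressed: the worker awaits its own work, and the caller waits on the returned tasks with Task.WaitAll. It is only one way to do it, under some assumptions. The names ProcessAll and DoSomethingAsync, the processed counter and the Task.Delay stand-in for the long-running operation are all hypothetical, and the recursion is replaced by a loop so that no task is left unawaited.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

static class Program
{
    // Shared countdown of work items still to be claimed.
    static int remaining;
    // Hypothetical counter, added only so the result can be checked.
    static int processed;

    // Hypothetical helper: runs `workers` concurrent workers over `items` work items.
    public static int ProcessAll(int workers, int items)
    {
        remaining = items;
        processed = 0;

        // Start one worker per index and keep the returned tasks.
        Task[] tasks = Enumerable.Range(0, workers)
                                 .Select(index => DoSomethingAsync(index))
                                 .ToArray();

        // Block the calling thread until every worker has actually finished.
        Task.WaitAll(tasks);
        return processed;
    }

    static async Task DoSomethingAsync(int index)
    {
        // Interlocked.Decrement returns the new value, so each item is claimed exactly once.
        while (Interlocked.Decrement(ref remaining) >= 0)
        {
            // Stand-in (assumption) for the long-running operation. Awaiting it means
            // the returned task does not complete until the work is done.
            await Task.Delay(10);
            Interlocked.Increment(ref processed);
        }
    }

    static void Main()
    {
        int count = ProcessAll(Environment.ProcessorCount, 143);
        Console.WriteLine($"Processed {count} items");
    }
}
```

Because the decrement and the "is there any work left" check are a single atomic operation here, two workers can never claim the same item, which the separate check and decrement in the shorter example could not guarantee.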