System.Threading.Tasks & Parallelism in .NET 4.0

Parallel

Alas, all my hope & dreams & promises of a regular blog post, dashed... oh well, here's one now.

I've been playing with the System.Threading.Tasks namespace over the last few hours and it's quite neat.

We'll be rolling out some new software in the next few months at work which Processes SMS messages from Customers. In the past we had fudged together our own Multi-Threading/Multi-Pipeline code to try and get messages through the system as quickly as possible but it was fairly bloated to say the least. Enter the new Task and Parallel classes in .NET 4.0

In our system, we take a collection of MyObjects from a DataSource. (usually DB or MSMQ). Each MyObject represents an instruction sent in by a customer so the time it takes to process a single MyObject can vary greatly depending on what part of the system the instruction needs to interact with.

It doesn't make much sense from a UX point-of-view to process these in a linear sequential order waiting for one to complete before the next starts... e.g. if message A took 9 seconds to process & messages B-J took one second each, you would ideally want to process B-J on a seperate thread and get them out in Paralell. Enter the Parallel.ForEach<T> & Parallel.Invoke methods. These allow you to pass in a collection of Input Objects & a delegate to process them, and the CLR will handle all the multi thread/multi core messiness for you under the hood.

It comes with the usual abort handling code so that if you need to break out of the parallelism at some point (maybe due to some critical exception), you can. In our case this is necessary so we can requeue any messages that haven't yet been processed, if the Windows Service OnStop function is called.

public class MyObject
{
    public int MoID {get;set;}
    public bool ProcessedSuccessfully { get; set; }
    public MyObject(int moID)
    {
        MoID = moID;
        ProcessedSuccessfully = false;
    }
}

class Program
{
    public static void Main(string[] args)
    {
        //Using PARALELL
        //Simulate obtaining 50 objects from some datasource
        var inputs = Enumerable
            .Range(1, 50)
            .Select(i =&gt; new MyObject(i))
            .ToList();

        //Process Them in Paralell
        var po = new ParallelOptions() {
            MaxDegreeOfParallelism = 50
        };
        var pmR = Parallel.ForEach<MyObject>(inputs
            , po, ProcessMessage);

        //Once Processing is complete
        Console.WriteLine("Processing Done");

        //Check for any we didn't process / maybe due to internal
        //exceptions requesting that the processing stop
        // and do something with them, like requeue them for later.
        var fails = inputs
            .Where(i =&gt; !i.ProcessedSuccessfully);
        var bqR = Parallel.ForEach<MyObject>(fails, po, PutBackOnQueue);

        //Using TASK
        Console.ReadKey();
    }

    public static void ProcessMessage(MyObject message
        , ParallelLoopState pls, long l)
    {
        //Simulate a reason to stop on Message 25
        if (message.MoID < 25 && pls != null)
            pls.Break();

        Console.WriteLine("Starting ID: {0:0000}", message.MoID);

        Thread.Sleep(1000);

        message.ProcessedSuccessfully = true;
        Console.WriteLine("Stopping ID: {0:0000}", message.MoID);
    }

    public static void PutBackOnQueue(MyObject message)
    {
        Console.WriteLine("Queueing ID: {0:0000}", message.MoID);
    }
}

Eoin Campbell

Eoin Campbell

Eoin Campbell
Dad, Husband, Coder, Architect, Nerd, Runner, Photographer, Gamer. I work primarily on the Microsoft .NET & Azure Stack for ChannelSight

CPU Spikes in Azure App Services

Working with Azure App Services and plans which have different CPU utilization profiles Continue reading

Building BuyIrish.com

Published on November 05, 2020

Data Partitioning Strategy in Cosmos DB

Published on June 05, 2018