Monday, 14 January 2013

Early exit of parallel loops

This short article presents different techniques for exiting parallel loops early.

Cancelling a PLINQ loop


The following code demonstrates how to cancel a PLINQ query while it is being enumerated.


            //Create a cancellation token source 
            var cts = new CancellationTokenSource();
 
            var nums = Enumerable.Range(0, 100);

            //Support cancellation token source 
            var result = nums.AsParallel().WithCancellation(cts.Token).Select(n => Math.Pow(n, 2));

            //create a task that enumerates the PLINQ query above 
            var enumTask = Task.Factory.StartNew(() =>
            {
                try
                {
                    foreach (var r in result)
                    {
                        Console.WriteLine("Got result: {0}", r);
                        Thread.Sleep(100); //slow things a bit down 
                    }
                }
                catch (OperationCanceledException oce)
                {
                    Console.WriteLine("Caught exception of type: {0}, message: {1}", oce.GetType().Name, oce.Message);
                }
            }); 
          
            //create a cancelling task 
            var cancellingTask = Task.Factory.StartNew(() =>
            {
                Thread.Sleep(500);
                cts.Cancel(); 
            });

            //Wait for both tasks 
            Task.WaitAll(enumTask, cancellingTask);  

            //Wait for user input before exiting 
            Console.WriteLine("Press any key to continue ..");
            Console.ReadKey(); 

Note that calling Cancel on the token source does not guarantee that no further iterations run afterwards; how many do depends on how many were already started in parallel. In PLINQ, use the WithCancellation method to supply the cancellation token that cancels the query while it is enumerated (executed). Also note that cancelling a PLINQ query this way throws an OperationCanceledException, which must be caught in a try-catch block.

Breaking a Parallel For or Parallel Foreach loop

A Parallel.For or Parallel.ForEach loop can be broken with the Break method of the ParallelLoopState object passed to the loop body. This means we must use one of the overloads of Parallel.For and Parallel.ForEach (on the Parallel class in TPL) whose lambda also receives a ParallelLoopState. Let's investigate the Break() method in a Parallel.ForEach loop:

            var nums = Enumerable.Range(0, 10000); 

            var parallelLoopResult = Parallel.ForEach(nums, 
           (int n, ParallelLoopState state) => {
                var item = Math.Pow(n, 2); 
                Console.WriteLine("Got item {0}", item);
                if (item > 1000)
                    state.Break();
            });

            Console.WriteLine(parallelLoopResult.IsCompleted);
            Console.WriteLine(parallelLoopResult.LowestBreakIteration.Value);

            //Wait for user input before exiting 
            Console.WriteLine("Press any key to continue ..");
            Console.ReadKey(); 

In the code above, note that in addition to the current element that Parallel.ForEach passes to the lambda, the lambda also receives the ParallelLoopState. Both Parallel.For and Parallel.ForEach return a ParallelLoopResult. From this result it is possible to tell whether the loop ran to completion (IsCompleted) and, if Break was called, the lowest iteration that called it (LowestBreakIteration).
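To make the meaning of LowestBreakIteration more concrete, here is a minimal sketch. The class BreakDemo and its Run method are hypothetical names invented for this illustration. It relies on the documented behaviour of Break: iterations below the breaking index are still executed, while higher iterations that have not started yet may be skipped.

```csharp
using System;
using System.Linq;
using System.Threading.Tasks;

public static class BreakDemo
{
    // Runs a loop over [0, count) that calls Break() at index breakAt
    // and records which indices were actually executed.
    public static (long? lowestBreak, bool[] executed) Run(int count, int breakAt)
    {
        var executed = new bool[count];
        ParallelLoopResult result = Parallel.For(0, count, (i, state) =>
        {
            executed[i] = true; // each slot is written by exactly one iteration
            if (i == breakAt)
                state.Break();
        });
        return (result.LowestBreakIteration, executed);
    }

    public static void Main()
    {
        var (lowestBreak, executed) = Run(10000, 500);
        Console.WriteLine("LowestBreakIteration: {0}", lowestBreak);
        Console.WriteLine("All lower iterations ran: {0}", executed.Take(500).All(x => x));
        // How many higher iterations also ran varies from run to run.
        Console.WriteLine("Iterations executed in total: {0}", executed.Count(x => x));
    }
}
```

The total number of executed iterations differs between runs, because iterations above the break index that were already in flight are allowed to finish.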

Stopping a parallel loop


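Let's also look at stopping a Parallel.For loop with the Stop method. Unlike Break, Stop asks the loop to cease as soon as possible and gives no guarantee that iterations with a lower index have been executed.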

            var parallelLoopResult = Parallel.For(0, 10000, (int n, ParallelLoopState state) =>
            {
                var item = Math.Pow(n, 2);
                Console.WriteLine("Got item {0}", item);
                if (item > 1000000)
                    state.Stop();
            });

            Console.WriteLine(parallelLoopResult.IsCompleted);
            Console.WriteLine(parallelLoopResult.LowestBreakIteration.HasValue);

            //Wait for user input before exiting 
            Console.WriteLine("Press any key to continue ..");
            Console.ReadKey(); 

To summarize: a PLINQ query is cancelled by passing a CancellationToken from a CancellationTokenSource to the WithCancellation method. To exit a Parallel.For or Parallel.ForEach loop early, call Stop() or Break() on the ParallelLoopState. The ParallelLoopResult returned by the loop describes how it ended through IsCompleted and LowestBreakIteration. The latter is a nullable long; it has no value unless Break was called. If IsCompleted is false, either Stop or Break was called: when LowestBreakIteration has no value it was Stop, and when it has a value it was Break. If IsCompleted is true and LowestBreakIteration has no value, the loop ran to completion in the ordinary manner and neither Stop nor Break was called on the ParallelLoopState.
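The rules in the summary can be collected in a small helper. This is only a sketch; the class LoopOutcome and its Describe methods are hypothetical names, not part of TPL.

```csharp
using System;
using System.Threading.Tasks;

public static class LoopOutcome
{
    // Maps the two ParallelLoopResult properties to a readable outcome.
    public static string Describe(bool isCompleted, long? lowestBreakIteration)
    {
        if (isCompleted)
            return "Completed";
        return lowestBreakIteration.HasValue
            ? "Break at iteration " + lowestBreakIteration.Value
            : "Stopped";
    }

    public static string Describe(ParallelLoopResult result)
    {
        return Describe(result.IsCompleted, result.LowestBreakIteration);
    }

    public static void Main()
    {
        // An ordinary loop, a loop that calls Stop, and a loop that calls Break.
        Console.WriteLine(Describe(Parallel.For(0, 1000, i => { })));
        Console.WriteLine(Describe(Parallel.For(0, 1000, (i, s) => { if (i == 10) s.Stop(); })));
        Console.WriteLine(Describe(Parallel.For(0, 1000, (i, s) => { if (i == 10) s.Break(); })));
    }
}
```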

Saturday, 12 January 2013

ConcurrentQueue in TPL

This article presents some simple code that uses the ConcurrentQueue collection from the System.Collections.Concurrent namespace, which was introduced alongside TPL. The collections available in this namespace are the following:
  • ConcurrentBag
  • ConcurrentQueue
  • ConcurrentStack
  • ConcurrentDictionary
  • BlockingCollection
This article will focus on the ConcurrentQueue collection. This class is much like Queue, but it is thread-safe, so several threads can add and remove items concurrently without external locking. To add items to the ConcurrentQueue, use the Enqueue method. To remove items, use the TryDequeue method. If it returns true, the out parameter contains the item removed from the queue; if the queue was empty, it returns false. As with the ordinary Queue, items are added and removed in First-In First-Out (FIFO) order. The next code shows how to use the ConcurrentQueue:

 class Program
    {

        static void Main(string[] args)
        {

            ConcurrentQueue<int> queue = new ConcurrentQueue<int>(); 
            foreach (var i in Enumerable.Range(0, 1000))
            {
                queue.Enqueue(i); 
            }

            Parallel.ForEach(queue, new ParallelOptions { 
                MaxDegreeOfParallelism = System.Environment.ProcessorCount },
                q =>
            {
                int currentItem;
                if (queue.TryDequeue(out currentItem))
                    Console.WriteLine("Got item: {0}", currentItem);
            }); 

            Console.WriteLine("Press any key to continue ...");
            Console.ReadKey(); 

        }
    }

The code above uses the Enqueue and TryDequeue methods of the ConcurrentQueue class. Note the use of the out parameter and the fact that TryDequeue returns a boolean flag that shows whether the dequeueing was successful. Parallel.ForEach enumerates a snapshot of the queue here only to drive one TryDequeue call per item; the lambda parameter q itself is not used. There is also a method called TryPeek, which returns the item at the front of the queue without removing it. ConcurrentDictionary is also used often, and BlockingCollection is commonly used to implement Producer-Consumer patterns in a parallel world. ConcurrentStack and ConcurrentBag are useful in many situations as well. Of all the concurrent classes, ConcurrentBag looks the simplest to use. The nice part of these classes is that we do not have to think much about locking access to them, since synchronization is already implemented. In fact, do not lock access to these collections at all: one only has to understand each collection and call the correct methods to interact with it. Problems around synchronization and parallel access are already resolved.
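As a complement to the sample above, the following sketch shows TryPeek next to TryDequeue, and an alternative way to empty the queue: a fixed number of worker tasks that each loop on TryDequeue until it returns false. The class QueueDrainDemo and the method DrainAndSum are hypothetical names made up for this illustration.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class QueueDrainDemo
{
    // Empties the queue from several worker tasks and returns the sum
    // of all dequeued items.
    public static long DrainAndSum(ConcurrentQueue<int> queue, int workerCount)
    {
        long total = 0;
        Task[] workers = Enumerable.Range(0, workerCount)
            .Select(_ => Task.Factory.StartNew(() =>
            {
                long local = 0;
                int item;
                // TryDequeue returns false once the queue is empty.
                while (queue.TryDequeue(out item))
                    local += item;
                Interlocked.Add(ref total, local); // merge the per-worker sum safely
            }))
            .ToArray();
        Task.WaitAll(workers);
        return total;
    }

    public static void Main()
    {
        var queue = new ConcurrentQueue<int>(Enumerable.Range(0, 1000));

        int front;
        if (queue.TryPeek(out front)) // inspects the front item without removing it
            Console.WriteLine("Front item: {0}, count is still {1}", front, queue.Count);

        Console.WriteLine("Sum of dequeued items: {0}", DrainAndSum(queue, 4));
        Console.WriteLine("Queue empty: {0}", queue.IsEmpty);
    }
}
```

Each worker keeps its own running sum and merges it once at the end, so the only shared state besides the queue is a single Interlocked.Add per worker.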

Continuing tasks in TPL

This article will present code where a task is continued with another task in TPL. Let's look at some code for this:

  class Program
    {
        static void Main(string[] args)
        {
            var task = Task.Factory.StartNew(() =>
            {
                Console.WriteLine("Running the first task ...");
                Thread.Sleep(2000);
                Console.WriteLine("First task done");
            }).ContinueWith((antecedent) =>
             {
                 Console.WriteLine("Continuing the second task ...");
                 Thread.Sleep(3000);
                 Console.WriteLine("Second task done");
             }, TaskContinuationOptions.NotOnFaulted | TaskContinuationOptions.NotOnCanceled);

            task.Wait(); 

            Console.WriteLine("Press any key to continue ...");
            Console.ReadKey(); 

        }
    }

A task is started with Task.Factory.StartNew; the parameterless lambda passed to it contains the work of the first task. The ContinueWith method is then called on that task to schedule the next one. I have specified TaskContinuationOptions here with NotOnFaulted and NotOnCanceled; this is one of several overloads, and the options can be left out. Another often used option is OnlyOnFaulted, which runs the continuation only when the antecedent task threw an exception. Finally, let's look at some code that returns data and passes it to the continuing task. This makes it possible to partition the work much like a pipeline; this kind of parallelism is called dataflow parallelism.
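Since OnlyOnFaulted is mentioned but not shown, here is a minimal sketch of an error-handling continuation. The class FaultedContinuationDemo and the method RunWithErrorHandler are hypothetical names chosen for this example. When the first task does not fault, the continuation is cancelled instead of run, and waiting on it throws an AggregateException wrapping a TaskCanceledException, which the sketch swallows.

```csharp
using System;
using System.Threading.Tasks;

public static class FaultedContinuationDemo
{
    // Runs 'work' as a task and returns the message of the exception it threw,
    // or null when it completed without faulting.
    public static string RunWithErrorHandler(Action work)
    {
        string handled = null;
        Task first = Task.Factory.StartNew(work);

        // Scheduled only if 'first' ends in the Faulted state.
        Task onError = first.ContinueWith(
            antecedent => { handled = antecedent.Exception.InnerException.Message; },
            TaskContinuationOptions.OnlyOnFaulted);

        try
        {
            onError.Wait();
        }
        catch (AggregateException ex) when (ex.InnerException is TaskCanceledException)
        {
            // 'first' did not fault, so the continuation was cancelled.
        }
        return handled;
    }

    public static void Main()
    {
        Console.WriteLine("Handled: {0}",
            RunWithErrorHandler(() => { throw new InvalidOperationException("boom"); }));
        Console.WriteLine("Handled: {0}",
            RunWithErrorHandler(() => { }) ?? "(nothing to handle)");
    }
}
```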

 class Program
    {
        static void Main(string[] args)
        {
            var task = Task.Factory.StartNew(() =>
            {
                Console.WriteLine("Running the first task ...");
                Thread.Sleep(2000);
                Console.WriteLine("First task done, returning some result");
                return 42;
            }).ContinueWith((antecedent) =>
             {
                 Console.WriteLine("Continuing the second task ...");
                 Thread.Sleep(3000);
                 Console.WriteLine("Got result from first task: {0}", antecedent.Result);
                 Console.WriteLine("Second task done");
                 return antecedent.Result * 2; 
             }).ContinueWith((antecedent) => {
                 Console.WriteLine("Continuing the third task ...");
                 Thread.Sleep(3000);
                 Console.WriteLine("Got result from second task: {0}", antecedent.Result);
                 Console.WriteLine("Third task done");
                 return antecedent.Result * 2; 
             });  

            task.Wait();

            Console.WriteLine("Final result: {0}", task.Result);

            Console.WriteLine("Press any key to continue ...");
            Console.ReadKey();

        }
    }

In the code above, a task is not only continued with another task, but then also with a third. An arbitrary number of tasks can be chained in this way. The result of the previous task is propagated by reading the Result property of the antecedent task (the previous task). This makes it possible to partition a problem into stages: a pipeline of tasks that each do their own processing to create the final result, much like how a factory works. The propagated values do not have to be value types such as int; reference types (arbitrarily complex classes) work as well. This shows that TPL helps you compose the total amount of work into pipeline stages in a simple way. Keep in mind that there are more optimized ways of doing this, following a more advanced Producer-Consumer approach, if one wishes to implement a true pipeline with TPL.

Simple child tasks in TPL

This short article shows the steps needed to create a child task in the Task Parallel Library (TPL). To create a child task, simply pass TaskCreationOptions.AttachedToParent when creating a task inside another task. The simple console application below shows this:

    var nums = ParallelEnumerable.Range(0, 10000);

    var parent = Task.Factory.StartNew(() =>
            {
                Console.WriteLine("Parent task starting");
                Thread.Sleep(2000);
                Console.WriteLine("Parent task done");

                   Task.Factory.StartNew(() =>
                    {
                        Console.WriteLine("Child task starting");
                        Thread.Sleep(5000);
                        Console.WriteLine("Child task done");
                    
                    }, TaskCreationOptions.AttachedToParent);               

            });

      parent.Wait(); 

      Console.WriteLine("Press any key to continue ...");
      Console.ReadKey(); 
                     
The console app code above waits for the parent task and does not continue to the next line until both the parent task and its child tasks have finished. A task can have multiple child tasks, but only the child tasks created with TaskCreationOptions.AttachedToParent are waited upon. Also note that the tasks are created with Task.Factory.StartNew. This is the preferred way to create new tasks; one can also instantiate a Task with the new keyword, but then you have to remember to start the task yourself. Task.Factory.StartNew not only instantiates a new task but also schedules it immediately. In the next short article, the ContinueWith construct in TPL will be presented. This construct allows one to pipeline tasks, one after another, which is known as dataflow parallelism.
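The difference between an attached and a detached child can be observed with a small experiment. This is a sketch under stated assumptions: the class ChildTaskDemo and its method are hypothetical names, and the detached case relies on timing (the child sleeps for half a second), so it illustrates the behaviour rather than proving it.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class ChildTaskDemo
{
    // Reports whether the child had already finished at the moment
    // parent.Wait() returned.
    public static bool ChildFinishedWhenParentWaitReturns(TaskCreationOptions childOptions)
    {
        bool childDone = false;
        Task child = null;

        Task parent = Task.Factory.StartNew(() =>
        {
            child = Task.Factory.StartNew(() =>
            {
                Thread.Sleep(500);
                childDone = true;
            }, childOptions);
        });

        parent.Wait();            // also waits for children that are attached
        bool result = childDone;
        child.Wait();             // do not leave a detached child running
        return result;
    }

    public static void Main()
    {
        Console.WriteLine("Attached child finished:  {0}",
            ChildFinishedWhenParentWaitReturns(TaskCreationOptions.AttachedToParent));
        Console.WriteLine("Detached child finished:  {0}",
            ChildFinishedWhenParentWaitReturns(TaskCreationOptions.None));
    }
}
```

With AttachedToParent the first line reports True. With TaskCreationOptions.None the parent normally finishes long before the child, so the second line is expected to report False.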

Sunday, 6 January 2013

Map and Reduce in PLinq

This article presents some code that applies a Map and a Reduce transformation to an input array of a given type, using PLINQ. For developers familiar with LINQ, PLINQ is the parallel version of LINQ. First the source code will be presented, then the code will be explained.

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

namespace TestParallelMap
{
    class Program
    {

        static void Main(string[] args)
        {
            var nums = Enumerable.Range(0, 1001).ToArray(); 
            Func<int,TaggedResult<int,double>> mapFunction = n => new TaggedResult<int,double>(n, Math.Pow(n, 2)); 
            var result = ParallelMap(nums, mapFunction);

            foreach (var r in result)
            {
                Console.WriteLine("Got result {0} for the num {1}", r.OutputValue, r.InputtedValue);
            }

            Console.WriteLine("Press the any key to continue ...");
            Console.ReadKey();

            var resultSecond = ParallelReduce(nums, 0, (first, second) => first + second);

            Console.WriteLine("Got result {0}", resultSecond);  
         
            Console.WriteLine("Press the any key to continue ...");
            Console.ReadKey();

        }

        private static TValue ParallelReduce<TValue>(TValue[] source, TValue seedValue, Func<TValue, TValue, TValue> reduceFunction)
        {
            return source.AsParallel().AsOrdered().Aggregate(
                seedValue,
                (localResult, nextValue) => reduceFunction(localResult, nextValue),
                (overallResult, localResult) => reduceFunction(overallResult, localResult), 
                 overallResult => overallResult); 
                  
        }

        private static TOutput[] ParallelMap<TInput, TOutput>(TInput[] source, Func<TInput, TOutput> mapFunction) 
        {
            return source.AsParallel().AsOrdered().Select(x => mapFunction(x)).ToArray(); 
        }

        private class TaggedResult<TInput, TOutput>
        {
            public TaggedResult(TInput inputtedValue, TOutput outputtedValue)
            {
                InputtedValue = inputtedValue;
                OutputValue = outputtedValue; 
            }

            public TInput InputtedValue { get; set; }

            public TOutput OutputValue { get; set; }
        }

    }
}


The source code presented above is a simple console application. It is written in C# and uses PLINQ to process the input array. The Map method transforms each value in the input array with a provided map function. The source code uses a Func delegate to pass in the method that is executed to produce each item. The generic arguments control the type of the resulting array:


        private static TOutput[] ParallelMap<TInput, TOutput>(TInput[] source, Func<TInput, TOutput> mapFunction) 
        {
            return source.AsParallel().AsOrdered().Select(x => mapFunction(x)).ToArray(); 
        }

Note that PLINQ is very similar to LINQ; the main difference is the call to AsParallel(). Also note that AsOrdered() means that the ordering of the input is maintained in the output array. If the order does not matter, there is a performance gain in omitting the call to AsOrdered(). Further, note that the Select method of PLINQ performs a projection, and that projection is the map step. The Reduce method is a bit more complicated. It uses PLINQ's Aggregate method, which is a bit harder to understand. Let's repeat the source code:

  private static TValue ParallelReduce<TValue>(TValue[] source, TValue seedValue, Func<TValue, TValue, TValue> reduceFunction)
        {
            return source.AsParallel().AsOrdered().Aggregate(
                seedValue,
                (localResult, nextValue) => reduceFunction(localResult, nextValue),
                (overallResult, localResult) => reduceFunction(overallResult, localResult), 
                 overallResult => overallResult); 
                  
        }


The PLINQ Aggregate method is a versatile and powerful method when you want to process an entire array and reduce it to a single value. In our case the reduction is just a summation, but it could be anything, for example a statistical calculation. The first argument is the seed value. The second is a lambda expression that performs the stepwise part of the reduction: it folds the next element into the local result of one partition. The third is a lambda expression that merges a partition's local result into the overall result. The fourth is a simple lambda that transforms the overall result into the returned value; here it passes the value through unchanged. Note that the seed initializes the accumulator of every partition, so it should be the identity value of the reduction (0 for a sum); any other seed would be counted once per partition. There are more patterns that are a bit more complicated, like MapReduce (which combines map and reduce), but this serves as a relatively gentle introduction to the functionality that PLINQ offers. It is impressive that all the thread handling is taken care of automatically by PLINQ. With relatively simple code, Map and Reduce can express a multitude of different transformations, powered by PLINQ and generics.
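As an example of a reduction other than a plain sum, the sketch below computes an arithmetic mean with the same four-argument Aggregate overload. The class ParallelStats and its Mean method are hypothetical names for this illustration, and the code assumes a compiler with tuple support (C# 7 or later). The accumulator is a (Sum, Count) pair, and the fourth lambda finally does some work: it divides the sum by the count.

```csharp
using System;
using System.Linq;

public static class ParallelStats
{
    public static double Mean(int[] source)
    {
        return source.AsParallel().Aggregate(
            // Seed: an empty (Sum, Count) pair, the identity for this reduction.
            (Sum: 0L, Count: 0),
            // Stepwise part: fold the next element into a partition's local pair.
            (acc, next) => (acc.Sum + next, acc.Count + 1),
            // Merge part: combine a partition's local pair into the overall pair.
            (total, local) => (total.Sum + local.Sum, total.Count + local.Count),
            // Final part: turn the overall pair into the mean.
            total => total.Count == 0 ? double.NaN : (double)total.Sum / total.Count);
    }

    public static void Main()
    {
        var nums = Enumerable.Range(0, 1001).ToArray();
        Console.WriteLine("Mean: {0}", Mean(nums));
    }
}
```

AsOrdered() is left out here because addition does not depend on the order in which the partitions are merged.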