Applications can be single-threaded or multithreaded. A single-threaded application performs all of its work on one thread, so each operation must run to completion before the next one can start. This approach doesn’t provide much system throughput (a measure of the amount of work done in unit time).
In contrast, a multithreaded application is one in which multiple threads reside in memory at the same point in time, with one or more of them executing. In such applications, a thread needn’t wait for the executing thread to finish before it is scheduled; threads are preempted and scheduled by the operating system according to their priorities. So, you can expect more throughput from such applications than from single-threaded ones. Moreover, single-threaded applications cannot take advantage of today’s multi-core systems. To let you leverage the benefits of multi-core systems, support for parallel programming has been included in the .NET Framework.
Microsoft’s Parallel LINQ (PLINQ) is a parallel execution engine for LINQ queries. It is a part of the managed concurrency library called the Parallel Extensions Library (previously known as Parallel Framework Extensions or PFX). This article explores parallel programming concepts and PLINQ, discusses the new features and enhancements in the Parallel Extensions Library, and shows you how to implement declarative data parallelism in your applications using PLINQ. It also discusses how you can handle concurrency exceptions, the guidelines and best practices for using PLINQ in enterprise applications, and tips for improving the performance of PLINQ queries.
Prerequisites
To work with the code examples illustrated in this article, you should have one of the following installed in your system:
- Visual Studio 2008 with the Parallel Extensions Library
- Visual Studio 2010
Concurrency and Parallelism - How Do They Differ?
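Concurrency and parallelism are two distinct concepts. Let’s consider a scenario in which two tasks, TaskA and TaskB, are to be executed concurrently. TaskA and TaskB are concurrent if both are in progress during the same period of time, whether the system interleaves their execution on one processor or runs them simultaneously on different processors.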
Note that concurrent execution of a program is possible on a single processor but parallelism is only possible on multi-core, multi-processor or distributed systems. The reason is that a single processor can at the same point in time execute one thread only. Incidentally, a thread is the path of execution within a process. In essence, parallelism is the ability to execute tasks simultaneously on different processors. This is constrained by the fact that if you need threads to be executed in parallel, you should have multi-core, multi-processor or distributed systems to execute them.
Two of the most important types of parallelism are data parallelism and task parallelism. While the former refers to the ability to distribute data across different parallel computing nodes and have threads in parallel execute the same instruction on different chunks of data, the latter is the ability to execute tasks in parallel where each task may use the same or different data. In other words, task parallelism enables you to execute tasks asynchronously.
Parallel Extensions
Parallel Extensions (previously known as Parallel Framework Extensions or PFX) is a managed concurrency library that supports imperative and declarative, data and task parallelism. It is a lightweight library that allows LINQ developers to leverage the benefits of multi-core systems with full support for all .NET query operators and minimal impact on the existing LINQ model. The Parallel Extensions library comprises a set of APIs that can be used to take advantage of multiple cores in your system and implement data parallelism and task parallelism in your applications.
The Parallel Extensions library is comprised of:
- Task Parallel Library (TPL)
- Parallel LINQ (PLINQ)
The Task Parallel Library
The Task Parallel Library is a set of APIs in the System.Threading and System.Threading.Tasks namespaces in .NET Framework 4 that helps you implement parallelism and concurrency in your applications.
The Parallel class is a part of the TPL and is contained in the System.Threading.Tasks namespace. This class contains the static methods For, ForEach and Invoke. While you can use Parallel.For and Parallel.ForEach to parallelize loops and implement imperative data parallelism in your applications, Parallel.Invoke enables you to implement task parallelism with ease.
Consider the following “for” loop that stores the squares of the integers from 1 to 25 in an integer array:
int[] integerList = new int[25];
for (int i = 0; i < integerList.Length; i++)
{
    integerList[i] = (i + 1) * (i + 1);
}
To parallelize the “for” loop using the TPL, all you have to do is use the Parallel.For method of the Parallel class as shown below:
Parallel.For(0, integerList.Length, i =>
integerList[i] = (i + 1) * (i + 1));
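To try the Parallel.For call above in isolation, a minimal console sketch might look like the following. The ParallelForDemo class and the Squares method are names assumed for this illustration; they are not part of the TPL.

```csharp
using System;
using System.Threading.Tasks;

static class ParallelForDemo
{
    // Hypothetical helper: fills an array so that element i holds (i + 1) squared.
    public static int[] Squares(int count)
    {
        int[] result = new int[count];
        // Each iteration writes only to its own slot, so no locking is required.
        Parallel.For(0, count, i => result[i] = (i + 1) * (i + 1));
        return result;
    }

    static void Main()
    {
        int[] squares = Squares(25);
        Console.WriteLine(string.Join(", ", squares));
    }
}
```

The loop body is safe to run in parallel because no two iterations touch the same array element. A body that updates shared state would need synchronization.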
The IsPrime() method below accepts an integer and checks to see if the number is prime, i.e., if the number is only divisible by itself and 1. If the number is prime, the method returns true, false otherwise.
private static bool IsPrime(int number)
{
if (number < 2)
return false;
int limit = (int)Math.Sqrt(number);
for (int i = 2; i <= limit; i++)
if (number % i == 0)
return false;
return true;
}
Listing 1 makes use of the IsPrime() method to display all prime numbers from 1 to 999.
Now, to parallelize the “for” loop, you just need to make a call to the Parallel.For() method. Listing 2 shows how this can be accomplished.
The Parallel.ForEach() method is a multithreaded implementation of the foreach loop. It enables you to iterate over a set of data.
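As a rough sketch of Parallel.ForEach, the example below processes a collection of strings and records the length of each one. The ParallelForEachDemo class, the WordLengths method, and the sample words are assumptions made for illustration. A ConcurrentDictionary is used because the loop body may run on several threads at once.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

static class ParallelForEachDemo
{
    // Hypothetical helper: maps each word to its length.
    public static IDictionary<string, int> WordLengths(IEnumerable<string> words)
    {
        // ConcurrentDictionary tolerates writes from multiple threads.
        var lengths = new ConcurrentDictionary<string, int>();
        Parallel.ForEach(words, word => lengths[word] = word.Length);
        return lengths;
    }

    static void Main()
    {
        string[] words = { "task", "parallel", "library" };
        foreach (KeyValuePair<string, int> pair in WordLengths(words))
        {
            // The iteration order of the dictionary is not specified.
            Console.WriteLine(pair.Key + ": " + pair.Value);
        }
    }
}
```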
When parallelizing loops, you should consider parallelizing only the outer loops and not the inner ones unless the outer loop’s range is small and the inner loop has a large range so as to provide enough parallelism.
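The guideline above can be sketched with a nested loop over a two-dimensional array: the outer loop over rows is parallelized, while the inner loop over columns stays sequential. The NestedLoopDemo class and the RowSums method are hypothetical names used only for this example.

```csharp
using System;
using System.Threading.Tasks;

static class NestedLoopDemo
{
    // Hypothetical helper: computes the sum of each row of a matrix.
    public static long[] RowSums(int[,] matrix)
    {
        int rows = matrix.GetLength(0);
        int cols = matrix.GetLength(1);
        long[] sums = new long[rows];

        // Parallelize the outer loop only; each row is an independent unit of work.
        Parallel.For(0, rows, r =>
        {
            long sum = 0;
            // The inner loop runs sequentially within one parallel iteration.
            for (int c = 0; c < cols; c++)
            {
                sum += matrix[r, c];
            }
            sums[r] = sum;
        });
        return sums;
    }

    static void Main()
    {
        int[,] matrix = { { 1, 2, 3 }, { 4, 5, 6 } };
        Console.WriteLine(string.Join(", ", RowSums(matrix)));
    }
}
```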
You can use the Parallel.Invoke() method to implement task parallelism in your applications. Suppose you have three tasks, TaskA, TaskB and TaskC that you would like to be executed in parallel. Let these three tasks be executed by the methods MethodA, MethodB and MethodC respectively. You can use the Parallel.Invoke() static method to execute these tasks in parallel as illustrated in the code snippet below:
Parallel.Invoke(
() => MethodA(),
() => MethodB(),
() => MethodC());
Parallel LINQ
Parallel LINQ, or PLINQ as it is commonly called, is a parallel execution engine that runs on top of the managed environment of the .NET Framework and is used to execute LINQ queries on multi-core systems. It accepts LINQ to Objects or LINQ to XML queries and runs them on multiple processors in your system.
Language Integrated Query (or LINQ as it is commonly called) adds query capabilities to the .NET languages and provides a uniform way to query in-memory objects, XML, and relational data. LINQ runs on top of the managed environment of the .NET Framework and was first made available as part of .NET Framework 3.5 and Visual Studio 2008.
Under the covers, PLINQ works on the principle of partitioning: it breaks the input data into pieces and distributes these pieces across the processing cores of the system. Each core processes its piece, and the partial results are then merged to form the result set. You can use PLINQ to build high-performance, scalable applications that take advantage of parallel hardware.
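To make the partition, process, and merge steps concrete, the sketch below performs them by hand with the TPL. It illustrates the general principle only and is not PLINQ's internal implementation. The PartitionMergeDemo class and the SumOfSquares method are assumed names.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

static class PartitionMergeDemo
{
    // Hypothetical helper: sums the squares of the input by partitioning the index range.
    public static long SumOfSquares(int[] data)
    {
        long total = 0;
        Parallel.ForEach(
            // Partition: split the index range into [from, to) chunks.
            Partitioner.Create(0, data.Length),
            // Each worker starts with its own empty subtotal.
            () => 0L,
            // Process: accumulate one chunk into the worker's subtotal.
            (range, loopState, subtotal) =>
            {
                for (int i = range.Item1; i < range.Item2; i++)
                {
                    subtotal += (long)data[i] * data[i];
                }
                return subtotal;
            },
            // Merge: combine each worker's subtotal into the final result.
            subtotal => Interlocked.Add(ref total, subtotal));
        return total;
    }

    static void Main()
    {
        int[] data = Enumerable.Range(1, 10).ToArray();
        Console.WriteLine(SumOfSquares(data)); // prints 385
    }
}
```

With PLINQ, the same computation reduces to a single declarative query, and the partitioning and merging are handled by the engine.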
In their MSDN article, “Running Queries On Multi-Core Processors” Joe Duffy and Ed Essey mention: “PLINQ is a query execution engine that accepts any LINQ-to-Objects or LINQ-to-XML query and automatically utilizes multiple processors or cores for execution when they are available.” Reference: http://msdn.microsoft.com/en-us/magazine/cc163329.aspx
Declarative Data Parallelism Using PLINQ
PLINQ supports declarative data parallelism in your applications through the ParallelEnumerable and ParallelQuery classes, both of which belong to the System.Linq namespace. ParallelQuery represents a sequence that is processed in parallel, and ParallelEnumerable defines the PLINQ query operators as extension methods, including AsParallel and AsSequential, which switch a query between parallel and sequential execution.
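As a small sketch of how AsParallel and AsSequential can be combined in .NET Framework 4, the query below filters in parallel and then switches back to sequential execution for the final, inexpensive step. The AsSequentialDemo class and the FirstPrimes method are names assumed for this example. AsOrdered(), which appears later in this article, is used so that the source order is retained.

```csharp
using System;
using System.Linq;

static class AsSequentialDemo
{
    static bool IsPrime(int number)
    {
        if (number < 2)
            return false;
        int limit = (int)Math.Sqrt(number);
        for (int i = 2; i <= limit; i++)
            if (number % i == 0)
                return false;
        return true;
    }

    // Hypothetical helper: returns the first 'count' primes not exceeding upperBound.
    public static int[] FirstPrimes(int upperBound, int count)
    {
        return Enumerable.Range(1, upperBound)
            .AsParallel()            // run the expensive filter in parallel
            .AsOrdered()             // keep the results in source order
            .Where(n => IsPrime(n))
            .AsSequential()          // continue with ordinary LINQ to Objects
            .Take(count)
            .ToArray();
    }

    static void Main()
    {
        Console.WriteLine(string.Join(", ", FirstPrimes(1000, 5))); // prints 2, 3, 5, 7, 11
    }
}
```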
To parallelize your existing LINQ to Objects or LINQ to XML queries, all you have to do is:
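- Make sure your application references the assembly that contains PLINQ. In .NET Framework 4 this is System.Core.dll, which is referenced by default; preview releases shipped PLINQ in a separate assembly such as System.Concurrency.dll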
- Make a call to the System.Linq.ParallelEnumerable.AsParallel() extension method on the data
The AsParallel() extension method is defined in the ParallelEnumerable class of the System.Linq namespace as shown below:
public static class ParallelEnumerable
{
    public static ParallelQuery<TSource> AsParallel<TSource>(
        this IEnumerable<TSource> source);
    // Other standard query operators
}
In the preview releases of Parallel Extensions, AsParallel was an overloaded extension method that could accept the degree of parallelism and a ParallelQueryOptions value (None or PreserveOrdering) as parameters. In .NET Framework 4, these settings are applied with separate methods on the parallel query: WithDegreeOfParallelism() sets the degree of parallelism, which is the maximum number of concurrently executing tasks used to process the query, and AsOrdered() preserves the ordering of the source.
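In .NET Framework 4, the degree of parallelism discussed above can be capped with the WithDegreeOfParallelism() method. The following is a minimal sketch; the DegreeOfParallelismDemo class, the EvenSquares method, and the chosen limit of two workers are assumptions made for illustration.

```csharp
using System;
using System.Linq;

static class DegreeOfParallelismDemo
{
    // Hypothetical helper: squares the even numbers using at most maxWorkers concurrent tasks.
    public static int[] EvenSquares(int[] source, int maxWorkers)
    {
        return source.AsParallel()
                     .AsOrdered()                          // keep source order in the result
                     .WithDegreeOfParallelism(maxWorkers)  // upper bound on concurrent tasks
                     .Where(n => n % 2 == 0)
                     .Select(n => n * n)
                     .ToArray();
    }

    static void Main()
    {
        int[] numbers = Enumerable.Range(1, 10).ToArray();
        Console.WriteLine(string.Join(", ", EvenSquares(numbers, 2))); // prints 4, 16, 36, 64, 100
    }
}
```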
Consider the following code snippet that uses LINQ to display integers stored in an integer list:
int[] numList = new int[25];
for (int i = 0; i < numList.Count(); i++)
{
numList[i] = i + 1;
}
var result = (from n in numList select n);
foreach (var x in result)
{
Console.WriteLine(x);
}
To convert the query to PLINQ, all you have to do is call the AsParallel() method on the data source as shown here:
var result = (from n in numList.AsParallel() select n);
PLINQ would parallelize the query based on the number of processing cores available in the system. The AsParallel() method would return an object of type ParallelQuery<int>. This example, though not a great illustration of how you can benefit from using PLINQ in your applications, still could help you understand how to parallelize your queries using PLINQ.
The ForAll extension method in PLINQ is defined on ParallelEnumerable and works on any data of type ParallelQuery<T>. It can be used to process data returned as a result of execution of a parallel query. The following piece of code shows how ForAll() can be used:
String[] data = new String[26];
// Fill the array with the letters "A" through "Z".
for (int index = 0; index < data.Length; index++)
{
    data[index] = ((char)('A' + index)).ToString();
}
var result = from x in data.AsParallel() select x;
result.ForAll(q => Console.WriteLine(q));
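Because ForAll invokes its delegate on multiple threads, any state the delegate shares must be thread-safe. The sketch below collects results in a ConcurrentBag for that reason. The ForAllDemo class and the ToLowerAll method are hypothetical names used only for this example.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;

static class ForAllDemo
{
    // Hypothetical helper: lower-cases every item and gathers the results.
    public static ConcurrentBag<string> ToLowerAll(string[] items)
    {
        // ConcurrentBag accepts Add calls from several threads at once.
        var results = new ConcurrentBag<string>();
        items.AsParallel().ForAll(item => results.Add(item.ToLowerInvariant()));
        return results;
    }

    static void Main()
    {
        string[] letters = Enumerable.Range(0, 26)
                                     .Select(i => ((char)('A' + i)).ToString())
                                     .ToArray();
        ConcurrentBag<string> lower = ToLowerAll(letters);
        Console.WriteLine(lower.Count); // prints 26
    }
}
```

The bag does not keep the items in source order, so sort the results if the order matters.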
Listing 3 is the PLINQ version of Listing 2 - it shows how you can display prime numbers from 1 to 999 using PLINQ.
To find the time elapsed to execute the parallel query, you can make use of the Stopwatch class. Listing 4 shows how you can do this.
Analyzing PLINQ Performance
In this section I’ll write a program that illustrates the performance difference between a LINQ query and a PLINQ query. The program finds the prime numbers between 1 and 999,999 using both LINQ and PLINQ.
The GetTimeElasped() method shown below returns the time taken to execute an action.
private static TimeSpan GetTimeElasped(Action action)
{
Stopwatch stopWatch = new Stopwatch();
stopWatch.Start();
action();
return stopWatch.Elapsed;
}
The following code snippet calls the IsPrime() method from two queries: first a sequential LINQ query (labeled "Concurrent Execution" in the output) and then the equivalent PLINQ query. The prime numbers are stored in a Dictionary. This example doesn’t display the primes, as you’ve already seen how to do that. Here we only need to compare the time taken by the two approaches.
for (int index = 0; index < 5; index++)
{
Console.WriteLine("Concurrent Execution: " +
GetTimeElasped(delegate
{
var dictionary = (from i in
Enumerable.Range(0, 999999)
where IsPrime(i)
select i).ToDictionary(i =>
i);
}));
Console.WriteLine("Parallel Execution: " +
GetTimeElasped(delegate
{
var dict = (from i in Enumerable.Range(0,
999999).AsParallel()
where IsPrime(i)
select i).ToDictionary(i => i);
}));
}
The Visual Studio 2010 IDE now comes with excellent support for debugging, profiling and analyzing applications that make use of parallel programming. You now have the Parallel Stacks Window that gives you the call stack information of all running threads and tasks at any given point in time and the Parallel Tasks Window that gives you all related information about a task, i.e., the task ID, the entry point, and the thread to which the task has been assigned, state information and the current location of the task.
Listing 5 shows the complete source code of this program. When you execute the program, the output would look similar to Figure 1. As you can see, the PLINQ version of the query runs much faster.
![Figure 1: Illustrating that a parallel query runs faster.](https://codemag.com/Article/Image/112071/Kanjilal_Figure 1.tif)
Preserving the Order of Data
Consider the following code snippet that uses PLINQ to compute the squares of the integers from 1 to 25:
int[] numList = new int[25];
for (int i = 1; i <= numList.Count(); i++)
{
numList[i - 1] = i;
}
int[] resultSet = (from x in
numList.AsParallel()
select x * x).ToArray();
for (int index = 0; index < resultSet.Length;
index++)
{
Console.WriteLine(resultSet[index]);
}
When this piece of code is executed, you might expect it to display the squares of the integers from 1 to 25 in ascending order. However, because PLINQ executes the query in parallel, the order of the results is not guaranteed. Preview releases of the library let you pass an ordering option such as QueryOptions.PreserveOrdering to the AsParallel() method. In .NET Framework 4, you preserve the ordering by calling the AsOrdered() method on the parallel query instead, as shown in the following code snippet:
int[] numList = new int[25];
for (int i = 1; i <= numList.Count(); i++)
{
    numList[i - 1] = i;
}
int[] resultSet = (from x in
                   numList.AsParallel().AsOrdered()
                   select x * x).ToArray();
for (int index = 0; index < resultSet.Length;
     index++)
{
    Console.WriteLine(resultSet[index]);
}
When you call AsOrdered() on the parallel query, the ordering of the resultant data is preserved. However, preserving the order is costly in terms of performance, because PLINQ has to do extra work to return the results in source order.
Merge Options
PLINQ splits the input data into multiple partitions, each of which is processed on a separate thread, and then merges the partial results into a single output sequence. You can use the WithMergeOptions<TSource> method to tell PLINQ how to buffer the results during this merge. Here’s an example:
var result = from p in
primeNumbers.AsParallel().AsOrdered()
.WithMergeOptions(ParallelMergeOptions.NotBuffered)
where IsPrime(p)
select p;
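The ParallelMergeOptions enumeration has four members:
- Default (currently equivalent to AutoBuffered)
- NotBuffered (each result is yielded as soon as it has been computed)
- AutoBuffered (results are buffered in chunks whose size is chosen by the system)
- FullyBuffered (all results are buffered before any of them is yielded)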
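The following minimal sketch shows a merge option applied to a query whose results are consumed with foreach. With NotBuffered, the consumer can begin receiving primes before the whole range has been processed. The MergeOptionsDemo class and the PrimesUpTo method are assumed names for this illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

static class MergeOptionsDemo
{
    static bool IsPrime(int number)
    {
        if (number < 2)
            return false;
        int limit = (int)Math.Sqrt(number);
        for (int i = 2; i <= limit; i++)
            if (number % i == 0)
                return false;
        return true;
    }

    // Hypothetical helper: returns the primes in 1..upperBound in ascending order.
    public static List<int> PrimesUpTo(int upperBound)
    {
        ParallelQuery<int> query = Enumerable.Range(1, upperBound)
            .AsParallel()
            .AsOrdered()
            // Hand each result to the consumer as soon as it is available.
            .WithMergeOptions(ParallelMergeOptions.NotBuffered)
            .Where(n => IsPrime(n));

        var primes = new List<int>();
        // foreach consumes the merged output on the calling thread.
        foreach (int prime in query)
        {
            primes.Add(prime);
        }
        return primes;
    }

    static void Main()
    {
        Console.WriteLine(string.Join(", ", PrimesUpTo(20))); // prints 2, 3, 5, 7, 11, 13, 17, 19
    }
}
```

The merge option changes when results become available to the consumer, not which results are produced.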
Handling Exceptions
Exceptions are errors that occur at runtime. Because a parallel query can fail on several threads at once, PLINQ collects the exceptions thrown while the query executes and rethrows them wrapped in a single System.AggregateException. To retrieve the details of the individual exceptions, iterate over the InnerExceptions property of this class; the InnerException property returns only the first of them.
Here’s an example that illustrates this:
try
{
    ProcessData(somedata);
}
catch (AggregateException exceptionInstance)
{
    // Report every exception raised by the parallel operation.
    foreach (Exception inner in exceptionInstance.InnerExceptions)
    {
        Console.WriteLine(inner.Message);
    }
}
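ProcessData in the snippet above is a placeholder. As a runnable sketch under assumed names (the AggregateExceptionDemo class and the CollectErrors method are hypothetical), the query below throws for negative input values and gathers the messages of the exceptions that PLINQ reports.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

static class AggregateExceptionDemo
{
    // Hypothetical helper: runs a parallel query and returns the error messages it raised.
    public static List<string> CollectErrors(int[] data)
    {
        var messages = new List<string>();
        try
        {
            data.AsParallel().ForAll(n =>
            {
                if (n < 0)
                    throw new ArgumentException("Negative value: " + n);
            });
        }
        catch (AggregateException aggregate)
        {
            // Flatten() unwraps any nested AggregateException instances.
            foreach (Exception inner in aggregate.Flatten().InnerExceptions)
            {
                messages.Add(inner.Message);
            }
        }
        return messages;
    }

    static void Main()
    {
        foreach (string message in CollectErrors(new[] { 1, -2, 3 }))
        {
            Console.WriteLine(message);
        }
    }
}
```

When several elements fail, the number of inner exceptions can vary from run to run, because PLINQ stops processing once it observes a failure.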
Summary
Multi-core processors are now standard in today’s computers. Parallel LINQ (PLINQ) is a part of the Parallel FX extensions for Microsoft .NET that helps you leverage the multiple cores in today’s systems without having to change much of your LINQ queries or worry about how they are executed behind the scenes. PLINQ provides a programming model that enables LINQ developers to take advantage of parallel hardware to build high-performance applications that can scale.
In this article we discussed the concepts of concurrency and parallelism and how PLINQ can be used to parallelize queries and take advantage of today’s computer systems.
Listing 1: Program to display prime numbers
using System;
using System.Threading.Tasks;
using System.Collections;
namespace PLINQ
{
class Program
{
static void Main(string[] args)
{
ArrayList primeNumbers = new ArrayList();
for (int i = 0; i < 999; i++)
{
if (IsPrime(i))
primeNumbers.Add(i);
}
for (int index = 0; index < primeNumbers.Count; index++)
{
Console.WriteLine(primeNumbers[index]);
}
Console.Read();
}
private static bool IsPrime(int number)
{
if (number < 2)
return false;
int limit = (int)Math.Sqrt(number);
for (int i = 2; i <= limit; i++)
if (number % i == 0)
return false;
return true;
}
}
}
Listing 2: Using Parallel.For() to parallelize the loop
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
namespace PLINQ
{
    class Program
    {
        static void Main(string[] args)
        {
            List<int> primeNumbers = new List<int>();
            // List<T>.Add is not thread-safe, so guard it with a lock.
            object syncRoot = new object();
            Parallel.For(0, 999, i =>
            {
                if (IsPrime(i))
                {
                    lock (syncRoot)
                    {
                        primeNumbers.Add(i);
                    }
                }
            });
            // Iterations complete in no particular order; sort before display.
            primeNumbers.Sort();
            for (int index = 0; index < primeNumbers.Count; index++)
            {
                Console.WriteLine(primeNumbers[index]);
            }
            Console.Read();
        }
        private static bool IsPrime(int number)
        {
            if (number < 2)
                return false;
            int limit = (int)Math.Sqrt(number);
            for (int i = 2; i <= limit; i++)
                if (number % i == 0)
                    return false;
            return true;
        }
    }
}
Listing 3: Using PLINQ to display prime numbers
using System;
using System.Linq;
using System.Threading.Tasks;
using System.Collections;
namespace PLINQ
{
class Program
{
static void Main(string[] args)
{
int[] primeNumbers = new int[1000];
for (int i = 2; i < primeNumbers.Length; i++)
primeNumbers[i - 2] = i;
var result = from p in
primeNumbers.AsParallel().AsOrdered()
where IsPrime(p) select p;
foreach (var v in result)
{
Console.WriteLine(v);
}
Console.Read();
}
private static bool IsPrime(int number)
{
if (number < 2)
return false;
int limit = (int)Math.Sqrt(number);
for (int i = 2; i <= limit; i++)
if (number % i == 0)
return false;
return true;
}
}
}
Listing 4: Retrieving the time elapsed in executing the PLINQ query
using System;
using System.Linq;
using System.Threading.Tasks;
using System.Collections;
using System.Diagnostics;
namespace PLINQ
{
class Program
{
static void Main(string[] args)
{
int[] primeNumbers = new int[1000];
for (int i = 2; i < primeNumbers.Length; i++)
primeNumbers[i - 2] = i;
Stopwatch stopWatch = new Stopwatch();
stopWatch.Start();
var result = from p in primeNumbers.AsParallel()
where IsPrime(p)
select p;
foreach (var v in result)
{
Console.WriteLine(v);
}
stopWatch.Stop();
Console.WriteLine("Time elapsed: {0}",
stopWatch.Elapsed);
Console.Read();
}
private static bool IsPrime(int number)
{
if (number < 2)
return false;
int limit = (int)Math.Sqrt(number);
for (int i = 2; i <= limit; i++)
if (number % i == 0)
return false;
return true;
}
}
}
Listing 5: Checking the difference in time for executing a query sequentially and in parallel
using System;
using System.Linq;
using System.Diagnostics;
namespace PLINQ
{
class Program
{
static void Main(string[] args)
{
for (int index = 0; index < 5; index++)
{
Console.WriteLine("Concurrent Execution: " +
GetTimeElasped(delegate
{
var dictionary = (from i in Enumerable.Range(0,
999999) where IsPrime(i)
select i).ToDictionary(i => i);
}));
Console.WriteLine("Parallel Execution: " +
GetTimeElasped(delegate
{
var dict = (from i in Enumerable.Range(0,
999999).AsParallel()
where IsPrime(i)
select i).ToDictionary(i => i);
}));
}
Console.Read();
}
private static bool IsPrime(int number)
{
if (number < 2)
return false;
int limit = (int)Math.Sqrt(number);
for (int i = 2; i <= limit; i++)
if (number % i == 0)
return false;
return true;
}
private static TimeSpan GetTimeElasped(Action action)
{
Stopwatch stopWatch = new Stopwatch();
stopWatch.Start();
action();
return stopWatch.Elapsed;
}
}
}