PFx : adoptez le réflexe
Nous sommes beaucoup à en être persuadé : le Parallel Framework (PFx) va prendre de plus en plus d’importance dans les développements futurs.
Cependant, les démos sont souvent déconnectées de la réalité. C’est vrai les démos de fractales sur une machine avec 10000 cœurs s’est l’idéal pour une démo sur la parallélisation cependant force est de constater que dans la vraie vie, les fractales, je n’en ai jamais eu besoin et je ne pense pas être le seul.
Mon idée était donc de réaliser un exemple un peu plus inspiré de mon travail quotidien. Aussi ais-je fait un exemple qui tourne autour de… l’Entity Framework. 
Prenons un exemple très simple : une base avec une table Customers avec potentiellement beaucoup de row (100 000 dans mon cas ce qui n’a rien de monstrueux), une application WPF avec une DataGrid pour afficher les clients. Soit dit en passant, la grille WPF est beaucoup plus performante que la grille Winforms pour afficher le résultat puisqu’elle ne charge que les éléments visibles.
La classe ObjectQuery implémentant IListSource, pas de problème. (la propriété Customers est de type ObjectSet<Customer> qui hérite de ObjectQuery). Maintenant ajoutons des controls pour pouvoir filtrer les résultats (par exemple, un TextBox par colonne de type string). Dans l’exemple, nous n’utiliserons pas le pattern MVVM afin d’éviter de complexifié inutilement l’exemple. Bien entendu, on peut changer le DataSource de la grille afin de ne plus prendre l’ObjectSet Customers mais Customers.Where(…). Le résultat sera également un ObjectQuery donc pas de soucis. Sauf que… cela implique une nouvelle requête en base. Il vaut mieux éviter d’exécuter une nouvelle requête en base à chaque fois que la valeur d’un des control de filtre change. En effet, lors de l’ouverture de la Window, tous les clients ont déjà été récupérés.
En plus du problème de requête superflue, cela pose un autre problème : les nouveaux clients pas encore persistés en base ne seront plus présents dans la grille dès lors qu’un filtre sera appliqué (et ceci de façon irréversible tant qu’on n’appelle pas la méthode SaveChanges du contexte). A l’inverse, les clients supprimés seront présents tant que la suppression ne sera pas persistée. En effet, avec EF, on ne récupère que les données présentes en base. Pour contourner cela, on pourrait persister systématiquement les ajouts et suppressions et inclure le tout dans une transaction afin de pouvoir annuler les modifications cependant cette solution n’est vraiment pas idéale.
Ce qu’il faudrait c’est un système qui permette de travailler sur le cache de l’ObjectContext une fois les entités chargées. Pour cela, on peut utiliser la propriété ObjectStateManager sauf que… en travaillant sur l’ObjectStateManager, on aura qu’un IEnumerable<Customer>, il n’est donc plus possible d’automatiser l’ajout et la suppression.
Afin de contourner cela, j’ai décidé de créer une nouvelle classe :
public class ObjectSetDataSource<T> : IList<T>, ICollection<T>, IEnumerable<T>, IList, ICollection, IEnumerable, INotifyCollectionChanged where T : class, new()
{
[…]
private IEnumerable<T> AllEntities
{
get
{
if (_isLoaded == false)
{
foreach (var e in ObjectSet)
yield return e;
_isLoaded = true;
}
else
{
foreach (var e in ObjectSet.Context.ObjectStateManager.GetObjectStateEntries(EntityState.Added | EntityState.Modified | EntityState.Unchanged).Select(ose => ose.Entity).OfType<T>())
yield return e;
}
}
}
private void CreateEntitiesList()
{
_entities = null;
if (_allEntities == null)
_allEntities = AllEntities;
if (Predicate == null)
{
_entities = _allEntities.ToList();
return;
}
_entities = _allEntities.Where(Predicate).ToList();
}
public Func<T, bool> Predicate
{
get { return _predicate; }
set
{
_predicate = value;
CreateEntitiesList();
OnCollectionChanged(NotifyCollectionChangedAction.Reset);
}
}
}
Ok ça marche mais maintenant imaginons que l’on fasse le filtre à l’aide du TextChanged comme ceci :
private void lastNameTB_TextChanged(object sender, TextChangedEventArgs e)
{
Filter();
}
private void firstNameTB_TextChanged(object sender, TextChangedEventArgs e)
{
Filter();
}
private void Filter()
{
string lastName = lastNameTB.Text;
string firstName = firstNameTB.Text;
_customers.Predicate = c =>
{
int i = 0;
while (i < 100000) i++;
return c.LastName.StartsWith(lastName) && c.FirstName.StartsWith(firstName);
};
}
Notez au passage la boucle destiné à ralentir le traitement afin de bien observer ce qui se passe.
On va très vite observer des problèmes de performances. Imaginons que l’on veuille récupérer les clients dont le nom commence par “MEZ”.
Que va-t-il se passer ?
Premier problème : lorsque l’on va taper MEZ, à moins de faire un copier coller, on va probablement taper M puis E puis Z. Or avec notre traitement séquentiel on va d’abord récupérer la liste des clients dont le nom commence par M puis ceux dont le nom commence par ME et enfin ceux dont le nom commence par MEZ. Le problème vient du fait qu’il n’est pas possible d’annuler le traitement car l’évènement TextChanged ne sera pas déclenché tant qu’on n’aura pas fini. L’autre problème vient du fait que notre code n’a utilisé qu’un seul cœur de notre processeur et donc 50% du CPU dans le cas de mon dual core :
Le fait de paralléliser notre code va nous permettre de résoudre ce deuxième problème :
Pour utiliser 100% du CPU, il faudrait exécuter notre requête LINQ sur plusieurs threads. En effet, le filtrage des clients est indépendant entre chaque client. Cependant, hors de question de créer un thread par client ! D’où l’intérêt des tâches et dans le cas précis de PLINQ. Voici donc le code que je propose :
public class ObjectSetDataSource<T> : IList<T>, ICollection<T>, IEnumerable<T>, IList, ICollection, IEnumerable, INotifyCollectionChanged where T : class, new()
{
[…]
private void CreateEntitiesList()
{
_entities = null;
if (_allEntities == null)
_allEntities = AllEntities;
if (Predicate == null)
{
_entities = _allEntities.ToList();
return;
}
try
{
_entities = _allEntities.AsParallel.Where(Predicate).ToList();
}
catch (OperationCanceledException)
{
}
}
public Func<T, bool> Predicate
{
get { return _predicate; }
set
{
_predicate = value;
CreateEntitiesList();
OnCollectionChanged(NotifyCollectionChangedAction.Reset);
}
}
}
L’utilisation du CPU montre une réelle amélioration :
Tout ça uniquement grâce à l’extension method AsParallel, dur de faire plus simple !
Du coup notre requête LINQ ne va plus s’exécuter sur un seul cœur mais sur les n cœurs de l’ordinateur. L’extension method AsParallel va répartir le traitement en divisant la source de données en n (2 dans le cas de mon dual core). Il faut noter que du fait de l’exécution sur deux threads, le résultat (sans OrderBy) ne se retrouve pas forcément dans le même ordre que dans notre application mono-threadée mais dans notre cas, cela n’a aucune importance.
Cependant il reste un problème : il faudrait pouvoir annuler le calcul de la liste si le filtre change afin de ne pas avoir à calculer la liste pour M et ME quand le filtre souhaité est MEZ. Bien entendu, PFx propose tout ce qu’il faut pour cela :
public class ObjectSetDataSource<T> : IList<T>, ICollection<T>, IEnumerable<T>, IList, ICollection, IEnumerable, INotifyCollectionChanged where T : class, new()
{
[…]
private void CreateEntitiesList(cancellationToken = null)
{
_entities = null;
if (_allEntities == null)
_allEntities = AllEntities;
if (Predicate == null)
{
_entities = _allEntities.ToList();
return;
}
try
{
_entities = _allEntities.AsParallel().WithCancellation(cancellationToken.Token).Where(Predicate).ToList();
}
catch (OperationCanceledException)
{
}
}
public Func<T, bool> Predicate
{
get { return _predicate; }
set
{
_predicate = value;
if (_cancellationToken != null)
_cancellationToken.Cancel();
_cancellationToken = new CancellationTokenSource();
var _cancellationToken = _cancellationToken
new Task(t =>
{
CreateEntitiesList();
if (cancellationToken.IsCancellationRequested)
return;
var app = Application.Current;
if (app != null)
app.Dispatcher.BeginInvoke((Action)(() => OnCollectionChanged(NotifyCollectionChangedAction.Reset)));
}, cancellationToken).Start();
}
}
}
L’utilisation du CPU montre une nouvelle amélioration significative :
L’extension method WithCancellation permet d’annuler la requête LINQ durant son exécution.
L’idée de créer la tâche dans la propriété Predicate est de ne pas être obligé d’attendre la fin du traitement avec M comme filtre puis ME pour finalement filtrer par MEZ. Pour cela, nous nous exécutons le calcul de la liste en asynchrone. Cela permettra aussi de ne pas figer l’UI.
Enfin, la méthode BeginInvoke permet de déclencher l’évènement CollectionChanged dans le thread principal ce qui est obligatoire afin d’éviter l’exception “This type of CollectionView does not support changes to its SourceCollection from a thread different from the Dispatcher thread”.
Cet exemple, très courant, nous montre rapidement l’intérêt de PFx. Attention cependant, tous les algorithmes ne sont pas parralélisables. De plus, la parralélisation d’un code ajoute de la complexité et implique de bien maîtriser la programmation parallèle. Cependant si vous avez du temps pour vous former, je vous recommande très très très vivement de vous penchez sur PFx et la programmation parallèle en général. En effet, comme je le précisais au tout début, le Parallel Framework (PFx) va prendre de plus en plus d’importance dans les développements futurs.
Vous pouvez télécharger la classe complète ici.
Ce post vous a plu ? Ajoutez le dans vos favoris pour ne pas perdre de temps à le retrouver le jour où vous en aurez besoin :