Publié mercredi 25 avril 2012 12:06 par Groc

Reactive Extensions : Consommer des services avec Rx Partie 1, créer une source observable

Pour créer une source obsevable sans implémenter IObservable à la main, il y a deux principales solutions :

- utiliser la méthode Observable.Create<T> pour créer une séquence cold;

- utiliser l’instance d’une classe qui implémente ISubject<T>, et qui implémentera donc aussi IObservable<T> et IObserver<T> pour créer une séquence hot.

 

Une séquence cold signifie qu’elle est passive, c’est à dire qu’elle n’est exécutée que lorsque quelqu’un s’y abonne. Dans le cas d’un Observable.Create, la func que l’on passe en paramètre à create sera appelée à chaque subscribe. C’est typiquement ce genre de flux que l’on utilisera pour une requête asynchrone vers un web service.

Cette func reçoit en paramètre un observer et doit renvoyer une action qui sera éxécutée lors du désabonnement.

class Program { static IObservable<int> GetColdStream() { return Observable.Create<int>(observer => { observer.OnNext(1); observer.OnNext(2); observer.OnCompleted(); return () => { Console.WriteLine("unsubscribed"); }; }); } static void ConsumeStream(IObservable<int> observable) { var disposable = observable.Subscribe(n => { Console.WriteLine(n); }, exp => { }, () => { Console.WriteLine("completed"); }); disposable.Dispose(); } static void Main(string[] args) { var coldStream = GetColdStream(); ConsumeStream(coldStream); ConsumeStream(coldStream); Console.ReadLine(); } }

donne

1 2 completed unsubscribed 1 2 completed unsubscribed

 

A l’inverse une séquence hot est active, c’est à dire qu’elle vit sa vie et qu’un subscribe agit un peu comme l’abonnement à un évènement et on ne sera notifié que de ce qui se passe après s’être abonné (sauf cas particuliers ci dessous). C’est le genre de flux que l’on utiliserait pour réagir à des situations “continues” du type mouvement de la souris ou traffic UDP.

Un moyen simple de mettre votre propre séquence hot en place est d’utiliser un Subject. En fait, il y a plusieurs implémentations de ISubject<T> dans l’espace de noms System.Reactive.Subjects, mais elles ont toutes un comportement différent. Ainsi il ne faut surtout pas tomber dans  le piège du bien mal nommé AsyncSubject<T>

 

D’abord, Subject<T>, c’est l’implémentation basique de ISubject<T>. On ne sera notifié que de l’arrivée de nouveaux éléments après avoir fait un subscribe.

static void Main(string[] args) { var subject = new Subject<int>(); subject.OnNext(0); subject.Subscribe(n => { Console.WriteLine(n); }, error => { Console.WriteLine("error"); }, () => { Console.WriteLine("completed"); }); subject.OnNext(1); subject.OnNext(2); subject.OnCompleted(); Console.ReadLine(); }

… donne en sortie …

1 2 completed

ReplaySubject<T> notifiera automatiquement tout nouveau subscriber de l’intégralité des éléments qui sont passés dans le flux, y compris ceux antérieurs au subscribe.

static void Main(string[] args) { var subject = new ReplaySubject<int>(); subject.OnNext(0); subject.Subscribe(n => { Console.WriteLine(n); }, error => { Console.WriteLine("error"); }, () => { Console.WriteLine("completed"); }); subject.OnNext(1); subject.OnNext(2); subject.OnCompleted(); Console.ReadLine(); }

… donne en sortie …

0 1 2 completed

Un peu dans le même principe, BehaviorSubject<T> va conserver la dernière valeur ou une valeur par défaut (qu’il faut obligatoirement passer au constructeur).

static void Main(string[] args) { var subject = new BehaviorSubject<int>(-1); subject.Subscribe(n => { Console.WriteLine(n); }, error => { Console.WriteLine("error"); }, () => { Console.WriteLine("completed"); }); subject.OnNext(1); subject.OnNext(2); subject.OnCompleted(); Console.ReadLine(); }

… donne …

-1 1 2 completed

Enfin, AsyncSubject<T> n’a rien d’asynchrone : il ne retournera qu’une seule valeur, la dernière passée au flux, et la retournera que suite à un appel à OnCompleted();

static void Main(string[] args) { var subject = new AsyncSubject<int>(); subject.OnNext(0); subject.Subscribe(n => { Console.WriteLine(n); }, error => { Console.WriteLine("error"); }, () => { Console.WriteLine("completed"); }); subject.OnNext(1); subject.OnNext(2); subject.OnCompleted(); Console.ReadLine(); }

… donne …

2 completed

Maintenant que nous savons comment créer une source, la prochaine étape sera d’exposer le résultat d’un service au travers de cette source.

Ce post vous a plu ? Ajoutez le dans vos favoris pour ne pas perdre de temps à le retrouver le jour où vous en aurez besoin :

Classé sous ,

Les 10 derniers blogs postés

- Intégration Yammer et SharePoint Online (Office 365), étape 1 … par Le blog de Patrick [MVP SharePoint] le 06-12-2013, 17:37

- [Dynamics CRM] Ajouter les dossiers de CRM au dossier Favoris d’Outlook par Christine Dubois le 06-10-2013, 15:50

- Visual Studio 2013 par Etienne Margraff le 06-04-2013, 10:26

- Configurer la collation SQL Server pour SharePoint par Blog de Jérémy Jeanson le 06-03-2013, 19:48

- Etendre le Team Web Access de TFS 2012 – Step 1: Création du plugin par Philippe Didiergeorges Aka Philess le 06-03-2013, 07:30

- Livre Blanc : Développer des applications NUI par Fathi Bellahcene le 06-01-2013, 11:35

- [Dynamics CRM 2011] Copier une vue d'entité par Christine Dubois le 05-29-2013, 13:20

- [Conf’SharePoint 2013] Mes présentations… par Le blog de Patrick [MVP SharePoint] le 05-28-2013, 09:04

- [wpdev] Storage bug in MediaLibrary.SavePicture par Kévin Gosse le 05-26-2013, 19:08

- VMMap en mode instrumentation sur système 64bit : attention à la plateforme cible du build .NET par CoqBlog le 05-25-2013, 22:25