Une mauvaise utilisation de rx lors de l’écriture d’une couche d’accès à des services peut conduire à des cas embarassants avec des erreurs mal gérées, des appels qui ne partent lorsqu’ils le devraient, et même des résultats incorrects … le tout nuisant fortement à la qualité de votre application.
Afin d’éviter que vous tapiez bêtement sur la techno, je vous ai préparé un petit meddley de trucs à pas faire vs trucs à faire.
Pour commencer, les subscribe imbriqués, c’est vilain. Par exemple, je veux faire une requête sur une url de service, je crée donc une source observable dans laquelle je vais créer ma requête puis je récupère la réponse avec un FromAsyncPattern sur lequel je vais faire un subscribe afin de traiter le résultat et renvoyer mes items à l’observer.
public IObservable<Player> GetPlayers()
{
return Observable.Create<Player>(observer =>
{
try
{
var request = WebRequest.CreateHttp("http://localhost/WcfService1/Service1.svc/Players");
Observable.FromAsyncPattern<WebResponse>(request.BeginGetResponse, request.EndGetResponse)()
.Subscribe(response =>
{
using (var responseStream = response.GetResponseStream())
{
var dataContractJsonSerializer = new DataContractJsonSerializer(typeof(List<Player>));
var players = dataContractJsonSerializer.ReadObject(responseStream) as List<Player>;
foreach (var player in players)
observer.OnNext(player);
observer.OnCompleted();
}
});
}
catch (Exception e)
{
observer.OnError(e);
}
return () => { };
});
}
Ce qu’il faut savoir, c’est que si une erreur se produit dans la callback du subscribe ci dessus, elle ne sera pas récupérée automatiquement par le onError de l’observer. Essayer l’exemple ci dessous où je fais volontairement un throw…
return Observable.Create<Player>(observer =>
{
try
{
var request = WebRequest.CreateHttp("http://localhost/WcfService1/Service1.svc/Players");
Observable.FromAsyncPattern<WebResponse>(request.BeginGetResponse, request.EndGetResponse)()
.Subscribe(response =>
{
throw new Exception("test !");
using (var responseStream = response.GetResponseStream())
{
var dataContractJsonSerializer = new DataContractJsonSerializer(typeof(List<Player>));
var players = dataContractJsonSerializer.ReadObject(responseStream) as List<Player>;
foreach (var player in players)
observer.OnNext(player);
observer.OnCompleted();
}
});
}
catch (Exception e)
{
observer.OnError(e);
}
return () => { };
});
BIM

Mon exception n’est pas catchée, l’application crash bêtement …
Deuxième raison qui devrait vous convraincre de ne pas imbriquer des subscribe : la complexité du code. Dans l’exemple précédent, je me contentais de créer une requête pour d’un seul FromAsyncPattern pour récupérer la réponse. Si maintenant je veux faire un appel en POST à mon service, je vais devoir faire un autre appel à FromAsyncPattern afin de récupérer le stream de ma requête. Je vais donc écrire quelque chose du genre…
public IObservable<Player> GetPlayers()
{
return Observable.Create<Player>(observer =>
{
try
{
var request = WebRequest.CreateHttp("http://localhost/WcfService1/Service1.svc/Players");
request.Method = "POST";
Observable.FromAsyncPattern<Stream>(request.BeginGetRequestStream, request.EndGetRequestStream)()
.Subscribe(stream =>
{
var bytes = Encoding.UTF8.GetBytes("mes données post");
stream.Write(bytes, 0, bytes.Length);
stream.Close();
Observable.FromAsyncPattern<WebResponse>(request.BeginGetResponse, request.EndGetResponse)()
.Subscribe(response =>
{
using (var responseStream = response.GetResponseStream())
{
var dataContractJsonSerializer = new DataContractJsonSerializer(typeof(List<Player>));
var players = dataContractJsonSerializer.ReadObject(responseStream) as List<Player>;
foreach (var player in players)
observer.OnNext(player);
observer.OnCompleted();
}
});
});
}
catch (Exception e)
{
observer.OnError(e);
}
return () => { };
});
On commence déjà à avoir quelque chose d’illisible. Sachant que si je veux gérer un peu plus proprement mes erreurs et le cas où je dois disposer mes “sous requêtes” car la principale se fait disposer, je me retrouve avec quelque chose du genre…
public IObservable<Player> GetPlayers()
{
IDisposable getRequestStream = null;
IDisposable getResponse = null;
return Observable.Create<Player>(observer =>
{
try
{
var request = WebRequest.CreateHttp("http://localhost/WcfService1/Service1.svc/Players");
request.Method = "POST";
getRequestStream =
Observable.FromAsyncPattern<Stream>(request.BeginGetRequestStream, request.EndGetRequestStream)()
.Subscribe(stream =>
{
try
{
var bytes = Encoding.UTF8.GetBytes("mes données post");
stream.Write(bytes, 0, bytes.Length);
stream.Close();
}
catch (Exception e)
{
observer.OnError(e);
}
getResponse =
Observable.FromAsyncPattern<WebResponse>(request.BeginGetResponse, request.EndGetResponse)()
.Subscribe(response =>
{
try
{
using (var responseStream = response.GetResponseStream())
{
var dataContractJsonSerializer = new DataContractJsonSerializer(typeof(List<Player>));
var players = dataContractJsonSerializer.ReadObject(responseStream) as List<Player>;
foreach (var player in players)
observer.OnNext(player);
observer.OnCompleted();
}
}
catch (Exception e)
{
observer.OnError(e);
}
}, e =>
{
observer.OnError(e);
});
}, e =>
{
observer.OnError(e);
});
}
catch (Exception e)
{
observer.OnError(e);
}
return () =>
{
if (getRequestStream != null)
getRequestStream.Dispose();
if (getResponse != null)
getResponse.Dispose();
};
});
}
C’est totalement imbuvable et inmaintenable. Et le pauvre développeur qui va passer après vous ne va pas comprendre ce qui a motivé votre choix de Rx. Et franchement, avec un code comme ça, même vous vous devriez remettre en cause votre choix de rx.
Donc, on oublie les subscribe imbriqués, et à la place on va plutôt chainer les appels Rx. Exemple ci dessous, grâce au meilleur des opérateurs Linq, le SelectMany, je peux créer ma requête, récupérer son stream, y écrire des choses, puis récupérer la réponse et enchainer avec un autre Select pour parser mon résultat (cf mon article de blog précédent).
Si une erreur est levée, elle sera remontée correctement à votre observer. Si vous faîtes un dispose sur votre subscription, tout sera proprement arrêté. Et surtout, le code est beaucoup plus concis et lisible.
return Observable.Create<WebRequest>(observer =>
{
try
{
var r = (HttpWebRequest)WebRequest.CreateHttp(url);
r.Method = "POST";
observer.OnNext(r);
observer.OnCompleted();
}
catch (Exception e)
{
observer.OnError(e);
}
return () => { };
})
.SubscribeOn(Scheduler.ThreadPool)
.SelectMany(r => Observable.FromAsyncPattern<Stream>(r.BeginGetRequestStream, r.EndGetRequestStream)(),
(r, s) => new { request = r, stream = s })
.Do(a =>
{
var bytes = Encoding.UTF8.GetBytes("mes données post");
a.stream.Write(bytes, 0, bytes.Length);
a.stream.Close();
})
.Select(a => a.request)
.SelectMany(r => Observable.FromAsyncPattern<WebResponse>(r.BeginGetResponse, r.EndGetResponse)());
De la même manière, c’est grâce au select many que je vais pouvoir combiner des résultats de mon service. Par exemple, j’ai une méthode GetPlayers qui doit me renvoyer une liste de joueurs, sans score. Une méthode GetPlayerRanks qui me renvoie les scores d’un joueur donné. Je veux faire une méthode GetPlayersWithRanks qui doit faire un appel à GetPlayers puis pour chaque joueur récupérer ses scores et me renvoyer des objets Player correctement hydratés.
La magie réside dans le sélecteur que l’on peut passer comme deuxième argument au selectmany.
public class PlayerService
{
public IObservable<Player> GetPlayers()
{
return ObservableExtensions.GetResponseWithRetries("http://ipv4.fiddler/WcfService1/Service1.svc/Players")
.SelectMany(response =>
{
using (var responseStream = response.GetResponseStream())
{
var dataContractJsonSerializer = new DataContractJsonSerializer(typeof(List<Player>));
return dataContractJsonSerializer.ReadObject(responseStream) as List<Player>;
}
});
}
public IObservable<List<Rank>> GetPlayerRanks(int playerId)
{
return ObservableExtensions.GetResponseWithRetries("http://ipv4.fiddler/WcfService1/Service1.svc/Ranks/" + playerId)
.Select(response =>
{
using (var responseStream = response.GetResponseStream())
{
var dataContractJsonSerializer = new DataContractJsonSerializer(typeof(List<Rank>));
return dataContractJsonSerializer.ReadObject(responseStream) as List<Rank>;
}
});
}
public IObservable<Player> GetPlayersWithRanks()
{
return GetPlayers()
.SelectMany(p => GetPlayerRanks(p.Id), (p, r) => new { player = p, ranks = r })
.Do(a => a.player.Ranks = a.ranks)
.Select(a => a.player);
}
}
Avec Fiddler qui reste ouvert dans un coin, je peux vérifier qu’il y a bien le bon nombre de requêtes qui passent.

L’autre gros point a éviter (sauf dans certains cas particuliers mais j’y reviendrais dans un prochain post), c’est l’utilisation de subject plutôt que d’un Observable.Create. Par exemple, testez la méthode GetPlayers ci dessous.
public IObservable<Player> GetPlayers()
{
var subject = new Subject<Player>();
var request = WebRequest.CreateHttp("http://ipv4.fiddler/WcfService1/Service1.svc/Players");
Observable.FromAsyncPattern<WebResponse>(request.BeginGetResponse, request.EndGetResponse)()
.Subscribe(response =>
{
using (var responseStream = response.GetResponseStream())
{
var dataContractJsonSerializer = new DataContractJsonSerializer(typeof(List<Player>));
var players = dataContractJsonSerializer.ReadObject(responseStream) as List<Player>;
foreach (var player in players)
subject.OnNext(player);
subject.OnCompleted();
}
});
return subject;
}
Là où vous consommez les données, gardez uniquement l’appel à la méthode GetPlayers et commentez le subscribe. Executez le projet et jetez un coup d’oeil à Fiddler.
La requête va partir alors qu’aucun observer ne s’est abonné à la source observable ! ! !
var observable = service.GetPlayers();
//observable.Subscribe(player =>
// {
// }, error =>
// {
// });
Si l’abonnement se fait à posteriori, et que les résultats du service avaient été obtenus avant, alors ils seraient perdus. Il faudrait alors commencer à essayer de contourner le problème avec un ReplaySubject par exemple. mais le comportement ne serait absolument pas logique si jamais on avait souhaité combiner n requête avec un merge par exemple.
Dans la majorité des cas, un observable.create suffira et vous simplifiera la vie lors de la création de votre couche d’accès. Pour bien comprendre la différence entre les deux, je vous renvoie vers ce précédent post : http://blogs.developpeur.org/leo/archive/2012/04/25/reactive-extensions-consommer-des-services-avec-rx-partie-1-cr-er-une-source-observable.aspx
En espérant que cet article vous aidera à créer des applications plus solides et plus réactives grâce à rx !
A bientôt
Ce post vous a plu ? Ajoutez le dans vos favoris pour ne pas perdre de temps à le retrouver le jour où vous en aurez besoin :