Dans cet article, je vais montrer un exemple d’utilisation de rx pour la construction d’une couche de service dans une application wp7. Pour l’exemple, j’utilise un service qui expose en json une liste de joueurs et une méthode pour récupérer le score d’un joueur.
Basiquement, dans ce genre de scénario, avec ou sans rx, la première étape consiste à créer une web request.
var request = (HttpWebRequest)WebRequest.Create("http://localhost/WcfService1/Service1.svc/Players");
Le premier opérateur qui nous sera utile est celui qui gère automatiquement le pattern async utilisé par les webrequest. Ainsi, la méthode ci dessous me retournera une séquence observable de webresponse.
Observable.FromAsyncPattern<WebResponse>(request.BeginGetResponse, request.EndGetResponse)()
A partir de là, je peux utiliser des opérateurs de projection tels que Select ou SelectMany afin d’extraire une liste d’objets depuis le flux json de la réponse. Si une exception était levée à ce stade, elle serait automatiquement captée par l’observer.
Ainsi, je peux écrire la première méthode de ma couche de service.
public IObservable<Player> GetPlayers()
{
var request = (HttpWebRequest)WebRequest.Create("http://localhost/WcfService1/Service1.svc/Players");
return Observable.FromAsyncPattern<WebResponse>(request.BeginGetResponse, request.EndGetResponse)()
.SelectMany(response =>
{
using (var responseStream = response.GetResponseStream())
{
var dataContractJsonSerializer = new DataContractJsonSerializer(typeof(List<Player>));
return dataContractJsonSerializer.ReadObject(responseStream) as List<Player>;
}
});
}
Rx nous propose des opérateurs de base pour gérer automatiquement timeout et politique de retry. Ainsi, dans l’exemple ci dessous, en cas de congestion réseau, si le temps d’execution de la requête dépasse 20 secondes, une exception sera levée, et un nouvel essai aura lieu, et ainsi de suite 3 fois avant que l’exception soit remontée à mon observer.
public IObservable<Player> GetPlayers()
{
var request = (HttpWebRequest)WebRequest.Create("http://localhost/WcfService1/Service1.svc/Players");
return Observable.FromAsyncPattern<WebResponse>(request.BeginGetResponse, request.EndGetResponse)()
.Timeout(TimeSpan.FromSeconds(20))
.Retry(3)
.SelectMany(response =>
{
using (var responseStream = response.GetResponseStream())
{
var dataContractJsonSerializer = new DataContractJsonSerializer(typeof(List<Player>));
return dataContractJsonSerializer.ReadObject(responseStream) as List<Player>;
}
});
}
Enfin, dans mon view model, mon observer recevra simplement les objets player un par un, et je pourrais gérer propement mon erreur.
playerService.GetPlayers()
.Subscribe(player =>
{
_players.Add(player);
}, error =>
{
});
Mamheureusement, si on execute l’application à ce stade, l’exception suivante apparaît.
En fait, le traitement effectué par la méthode FromAsyncPattern est déporté sur le thread pool. A partir de là, les opérations qui suivent sont executées sur le même thread, y compris les callbacks de l’observer.
Pour preuve, ajoutons quelques infos de debug …
private void Button_Click(object sender, RoutedEventArgs e)
{
Debug.WriteLine(Thread.CurrentThread.ManagedThreadId);
var playerService = new PlayerService();
playerService.GetPlayers()
.Subscribe(player =>
{
Debug.WriteLine(Thread.CurrentThread.ManagedThreadId);
_players.Add(player);
}, error =>
{
});
}
public IObservable<Player> GetPlayers()
{
var request = (HttpWebRequest)WebRequest.Create("http://localhost/WcfService1/Service1.svc/Players");
return Observable.FromAsyncPattern<WebResponse>(request.BeginGetResponse, request.EndGetResponse)()
.Timeout(TimeSpan.FromSeconds(20))
.Retry(3)
.SelectMany(response =>
{
Debug.WriteLine(Thread.CurrentThread.ManagedThreadId);
using (var responseStream = response.GetResponseStream())
{
var dataContractJsonSerializer = new DataContractJsonSerializer(typeof(List<Player>));
return dataContractJsonSerializer.ReadObject(responseStream) as List<Player>;
}
});
}
Rx offre la possibilité de refaire passer le traitement sur un autre scheduler via la méthode ObserveOn, et, dans le cas du dispatcher, directement via la méthode ObserveOnDispatcher.
playerService.GetPlayers()
.ObserveOnDispatcher()
.Subscribe(player =>
{
Debug.WriteLine(Thread.CurrentThread.ManagedThreadId);
_players.Add(player);
}, error =>
{
});
Il y a un effet que j’aime bien sur WP7, c’est le fait de voir les éléments “popper” un par un dans une liste. Je me suis donc fait mon petit opérateur pour cet effet… La méthode zip renvoie le résullutat d’un selecteur que l’on applique à deux sources observables. L’observer sera alors notifié dès qu’un élément sera dispo dans les deux collections. Ainsi si je fais un zip entre mon observable de player et un timer de n ms, je recevrais bien un player tous les n ms.
public static IObservable<T> Cadence<T>(this IObservable<T> observable, TimeSpan cadence)
{
return observable.Zip(Observable.Timer(DateTime.Now, cadence), (r, t) => r);
}
Je peux appliquer mon nouvel opérateur.
playerService.GetPlayers()
.Cadence(TimeSpan.FromMilliseconds(100))
.ObserveOnDispatcher()
.Subscribe(player =>
{
_players.Add(player);
}, error =>
{
});
Il y a un point important qui peut encore poser problème : la création d’une webrequest peut lever une exception et nous n’avons pas géré ce cas. Par exemple avec …
var request = (HttpWebRequest)WebRequest.Create("htt://localhost/WcfService1/Service1.svc/Players");
Pour remédier à ce problème, nous allons créer une source observable comme vu dans le précédent article, et nous déporterons la création de la requête sur le threadpool. Ainsi la création ne sera plus bloquante et l’erreur pourra être gérée proprement via l’observer.
public static IObservable<WebResponse> GetResponseWithRetries(string url)
{
return Observable.Create<WebRequest>(observer =>
{
Scheduler.ThreadPool.Schedule(() =>
{
try
{
observer.OnNext((HttpWebRequest)WebRequest.Create(url));
observer.OnCompleted();
}
catch (Exception e)
{
observer.OnError(e);
}
});
return () => { };
})
.SelectMany(r => Observable.FromAsyncPattern<WebResponse>(r.BeginGetResponse, r.EndGetResponse)())
.Timeout(TimeSpan.FromSeconds(20))
.Retry(3);
}
La méthode de notre couche de service peut donc s’écrire de la manière suivante …
public IObservable<Player> GetPlayers()
{
return ObservableExtensions.GetResponseWithRetries("http://localhost/WcfService1/Service1.svc/Players")
.SelectMany(response =>
{
using (var responseStream = response.GetResponseStream())
{
var dataContractJsonSerializer = new DataContractJsonSerializer(typeof(List<Player>));
return dataContractJsonSerializer.ReadObject(responseStream) as List<Player>;
}
});
}
Ou dans le cas d’une autre méthode…
public IObservable<int> GetPlayerRankValue(int id)
{
return ObservableExtensions.GetResponseWithRetries("http://localhost/WcfService1/Service1.svc/PlayerRank/" + id)
.Select(response =>
{
using (var responseStream = response.GetResponseStream())
{
var dataContractJsonSerializer = new DataContractJsonSerializer(typeof(Rank));
return dataContractJsonSerializer.ReadObject(responseStream) as Rank;
}
})
.Select(rank => rank.Value);
}
Enfin, je peux même combiner très facilement le résultat de mes requêtes …
public IObservable<Player> GetPlayersWithRank()
{
return from player in GetPlayers()
select new Player
{
Id = player.Id,
Name = player.Name,
Rank = GetPlayerRankValue(player.Id).Single()
};
}
Voila, j’espère que cet article vous aura aidé à construire des applications plus réactives grâce à Rx !
Ce post vous a plu ? Ajoutez le dans vos favoris pour ne pas perdre de temps à le retrouver le jour où vous en aurez besoin :