I have been investigating the possibility of 'protecting' an observable source from a slow observer, by throwing away any new items that come in while the observer is working.
After looking at ObserveLatestOn
(and finding it a bit hard to understand...), I chanced on this answer, which suggested doing it in Subscribe, rather than inside the Observable monad.
Which gives something like this (versions with delegate overloads not shown):
public static IDisposable NonBlockingSubscribe<T>(this IObservable<T> observable,
IObserver<T> observer)
{
Task task = null;
return observable.Subscribe(value =>
{
if(task == null || task.IsCompleted)
task = Task.Run(() => observer.OnNext(value));
}, observer.OnError, observer.OnCompleted);
}
The challenge now is working out how to write a test for this using ReactiveTest
and virtual time.
I've seen the answer to this question, which involves using TestScheduler to generate a 'ticker' which can be used in turn to create an AsyncSelector, but I'm a bit stuck working out how to go from there to something which can 'wait for a given number of ticks'.
Aucun commentaire:
Enregistrer un commentaire