Wednesday, 17 June 2020

Testing delays in Rx

I'm trying to work out how to test the following function, which adds monitoring around the internal queue of Observable.ObserveOn.

public IObservable<T> MonitorBuffer<T>(IObservable<T> source, Action<int> monitor, TimeSpan interval, IScheduler scheduler)
{
    return Observable.Create<T>(ob =>
    {
        // Number of items currently waiting in the ObserveOn queue.
        int count = 0;

        return new CompositeDisposable(source
            .Do(_ => Interlocked.Increment(ref count))   // item enters the queue
            .ObserveOn(scheduler)
            .Do(_ => Interlocked.Decrement(ref count))   // item leaves the queue
            .Subscribe(ob),
            // Sample the queue length every interval and report it only when it changes.
            Observable.Interval(interval, scheduler).Select(_ => Volatile.Read(ref count)).DistinctUntilChanged().Subscribe(monitor)
        );
    });
}

I envisage something like this:

var ts = new TestScheduler();
var input = Enumerable.Range(1, 8).Select(i => OnNext(i * 10, i)).ToArray();
var hot = ts.CreateHotObservable(input);
var observer = ts.CreateObserver<int>();
var log = new Subject<int>();
var monitor = ts.CreateObserver<int>();
var ticks = TimeSpan.FromTicks(5);
var buffered = MonitorBuffer(hot, log.OnNext, ticks, ts);
log.Subscribe(monitor);

buffered.Do(x => { /*if(x == 3) Introduce delay here */}).Subscribe(observer);

ts.AdvanceTo(100);
observer.Messages.AssertEqual(...);
monitor.Messages.AssertEqual(...);

The question is: what can I put in the Do to get the desired effect of a temporary downstream delay?

I'm looking for results something like this:

//time:    0--------10--------20--------30--------40--------50--------60--------70--------
//source:  ---------1---------2---------3---------4---------5---------6---------7---------
//output:  ---------1---------2-----------------------------345-------6---------7---------
//log:     ----0-------------------------1---------2---------2----0-----------------------

(NB: I asked a similar question a while ago, but it wasn't very clear, and it's a bit late for a complete rewrite now).
