Filtering based on the last two values in Java Rx


I'm trying to build a simple application using Java Reactive Extensions (RxJava). I have two streams that emit temperature values continuously, and I want to detect and filter out spikes caused by sensing errors. To do that I need to take the preceding value into account, so that I can look at the variation between consecutive readings. So filtering on pairs is the idea, but I'm still unable to find the right operator in the documentation. Does anyone have an idea of how I can accomplish this task? Should I write a custom operator?

These are the streams:

    double min = 50, max = 75, spikeFreq = 0.01;

    Observable<Double> tempStream1 = Observable.create((
            Subscriber<? super Double> subscriber) -> {
        new TempStream(subscriber, min, max, spikeFreq).start();
    });

    Observable<Double> tempStream2 = Observable.create((
            Subscriber<? super Double> subscriber) -> {
        new TempStream(subscriber, min, max, spikeFreq).start();
    });

    public class TempStream extends Thread {

        private Subscriber<? super Double> subscriber;
        private TempSensor sensor;

        public TempStream(Subscriber<? super Double> subscriber, double min,
                double max, double spikeFreq) {
            this.subscriber = subscriber;
            sensor = new TempSensor(min, max, spikeFreq);
        }

        @Override
        public void run() {
            Random gen = new Random(System.currentTimeMillis());
            while (!subscriber.isUnsubscribed()) {
                try {
                    subscriber.onNext(sensor.getCurrentValue());
                    Thread.sleep(1000 + gen.nextInt() % 1000);
                } catch (Exception ex) {
                    subscriber.onError(ex);
                }
            }
            subscriber.onCompleted();
        }
    }

Thanks for your help.

Perhaps the buffer operator (http://reactivex.io/documentation/operators/buffer.html) might help in this case. You'd want to use buffer with count = 2 and skip = 1. That way you'll get a "lookahead" of one element on the stream.

For example:

stream.buffer(2, 1).filter(buf -> buf.size() == 2 && Math.abs(buf.get(0) - buf.get(1)) < max);

Note that the example checks that two values were buffered, since a final buffer holding only one value may be emitted when the stream completes.
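
To make the pairing concrete, here is a minimal sketch in plain Java (no RxJava dependency) that reproduces what buffer(2, 1) emits for a finite list of readings and then applies the same kind of pair filter. The class name SlidingPairs, the helper names, the sample readings, and the threshold maxDelta are all made up for illustration; the absolute difference is my assumption about what counts as a spike.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, for illustration only; not part of RxJava.
final class SlidingPairs {

    // Mimics buffer(2, 1) on a finite list: overlapping windows
    // [v0, v1], [v1, v2], ... plus a trailing one-element window [vLast],
    // like the one emitted when the stream completes.
    static List<List<Double>> buffer2Skip1(List<Double> values) {
        List<List<Double>> windows = new ArrayList<>();
        for (int i = 0; i < values.size(); i++) {
            int end = Math.min(i + 2, values.size());
            windows.add(new ArrayList<>(values.subList(i, end)));
        }
        return windows;
    }

    // Keeps only full pairs whose absolute difference is below maxDelta
    // (maxDelta is an assumed threshold, not a value from the question).
    static List<List<Double>> withoutSpikes(List<Double> values, double maxDelta) {
        List<List<Double>> kept = new ArrayList<>();
        for (List<Double> buf : buffer2Skip1(values)) {
            if (buf.size() == 2 && Math.abs(buf.get(0) - buf.get(1)) < maxDelta) {
                kept.add(buf);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        List<Double> readings = Arrays.asList(60.0, 61.0, 95.0, 62.0, 63.0);
        // prints [[60.0, 61.0], [61.0, 95.0], [95.0, 62.0], [62.0, 63.0], [63.0]]
        System.out.println(buffer2Skip1(readings));
        // prints [[60.0, 61.0], [62.0, 63.0]]
        System.out.println(withoutSpikes(readings, 10.0));
    }
}
```

Keep in mind that the filter passes pairs, not single readings, and that a spike removes both pairs it appears in. You would still have to map each surviving pair back to one value (for instance its second element), and which element to keep is a design choice.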

