I'm trying to build a simple application using the Java reactive extensions. I have 2 streams that emit temperature values continuously, and I want to detect and filter out spikes in the sensed temperature, which are errors. To do that I need to take the preceding value into account, so that I can look at the variation between readings. I am still unable to find the right operator in the documentation. Does anyone have an idea of how I can accomplish this task? Should I make a custom operator?
These are the streams:
    import java.util.Random;
    import rx.Observable;
    import rx.Subscriber;

    double min = 50, max = 75, spikeFreq = 0.01;

    Observable<Double> tempStream1 = Observable.create((Subscriber<? super Double> subscriber) -> {
        new TempStream(subscriber, min, max, spikeFreq).start();
    });
    Observable<Double> tempStream2 = Observable.create((Subscriber<? super Double> subscriber) -> {
        new TempStream(subscriber, min, max, spikeFreq).start();
    });

    // Emits one sensor reading roughly every one to two seconds until unsubscribed.
    public class TempStream extends Thread {
        private Subscriber<? super Double> subscriber;
        private TempSensor sensor; // TempSensor is my own class (not shown)

        public TempStream(Subscriber<? super Double> subscriber, double min, double max, double spikeFreq) {
            this.subscriber = subscriber;
            sensor = new TempSensor(min, max, spikeFreq);
        }

        @Override
        public void run() {
            Random gen = new Random(System.currentTimeMillis());
            while (!subscriber.isUnsubscribed()) {
                try {
                    subscriber.onNext(sensor.getCurrentValue());
                    Thread.sleep(1000 + gen.nextInt() % 1000);
                } catch (Exception ex) {
                    subscriber.onError(ex);
                    return; // onError is terminal, so do not call onCompleted afterwards
                }
            }
            subscriber.onCompleted();
        }
    }
Thanks for your help.
Perhaps the buffer operator (http://reactivex.io/documentation/operators/buffer.html) might help in this case. You would want to use buffer with count = 2 and skip = 1. That way you'll get a "lookahead" of 1 element on the stream.
E.g.:
    stream.buffer(2, 1).filter(buf -> buf.size() == 2 && Math.abs(buf.get(0) - buf.get(1)) < max);
Notice that the example checks whether 2 values were buffered, since it might happen that only 1 is emitted upon completion of the stream.
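To make the windowing concrete, here is a minimal plain-Java sketch of what buffer with count = 2 and skip = 1 produces and how the size check and threshold filter act on it. It does not use RxJava; `slidingPairs`, `dropSpikes`, and `maxDelta` are hypothetical names chosen for illustration. It compares the absolute difference (an assumption, so that upward and downward spikes are treated alike) and emits the second value of each accepted pair.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class SpikeFilterSketch {

    // Plain-Java stand-in for buffer(2, 1): overlapping windows of up to two
    // consecutive values. The last window holds a single value, mirroring the
    // partial buffer emitted when the stream completes.
    static List<List<Double>> slidingPairs(List<Double> values) {
        List<List<Double>> windows = new ArrayList<>();
        for (int i = 0; i < values.size(); i++) {
            int end = Math.min(i + 2, values.size());
            windows.add(new ArrayList<>(values.subList(i, end)));
        }
        return windows;
    }

    // Keeps the second value of each full window when it differs from the
    // first by less than maxDelta (a hypothetical threshold).
    static List<Double> dropSpikes(List<Double> values, double maxDelta) {
        List<Double> kept = new ArrayList<>();
        for (List<Double> buf : slidingPairs(values)) {
            if (buf.size() == 2 && Math.abs(buf.get(0) - buf.get(1)) < maxDelta) {
                kept.add(buf.get(1));
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        List<Double> readings = Arrays.asList(60.0, 61.0, 95.0, 62.0, 63.0);
        // prints [[60.0, 61.0], [61.0, 95.0], [95.0, 62.0], [62.0, 63.0], [63.0]]
        System.out.println(slidingPairs(readings));
        // prints [61.0, 63.0]
        System.out.println(dropSpikes(readings, 5.0));
    }
}
```

Note that 62.0 is dropped along with the spike, because it is compared against 95.0. A pairwise comparison cannot tell which of the two values is the outlier, so if that matters you may need to compare against the last accepted value instead.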