From 972def8697bc11e37399beaee70eae69cef87b0c Mon Sep 17 00:00:00 2001 From: houfeng Date: Fri, 23 Mar 2018 16:06:45 +0800 Subject: [PATCH 1/2] first rxjava --- .../rxjava/share/practices/Practice1.java | 28 ++++++++++- .../rxjava/share/practices/Practice2.java | 13 ++++- .../rxjava/share/practices/Practice3.java | 45 ++++++++++++++++- .../rxjava/share/practices/Practice4.java | 19 ++++++- .../rxjava/share/practices/Practice5.java | 50 ++++++++++++++----- 5 files changed, 137 insertions(+), 18 deletions(-) diff --git a/src/main/java/cn/nextop/rxjava/share/practices/Practice1.java b/src/main/java/cn/nextop/rxjava/share/practices/Practice1.java index b432fea..dbbe0f9 100644 --- a/src/main/java/cn/nextop/rxjava/share/practices/Practice1.java +++ b/src/main/java/cn/nextop/rxjava/share/practices/Practice1.java @@ -18,6 +18,10 @@ import cn.nextop.rxjava.share.util.type.Tuple2; import io.reactivex.Observable; +import io.reactivex.ObservableSource; +import io.reactivex.ObservableTransformer; +import io.reactivex.annotations.NonNull; +import io.reactivex.subjects.PublishSubject; /** * @author Baoyi Chen @@ -30,6 +34,28 @@ public class Practice1 { * 返回值 Observable[(1, "a"), (2, "b"), (3, "c")] 注意index从1开始 */ public Observable> indexable(Observable observable) { - throw new UnsupportedOperationException("implementation"); + return this.method1(observable); } + + private Observable> method1(Observable source) { + return source.compose(new ObservableTransformer>() { + int index = 1; + @Override + public ObservableSource> apply(@NonNull Observable upstream) { + return Observable.create(emitter -> { + upstream.subscribe(sourceItem -> { + emitter.onNext(new Tuple2(index++, sourceItem)); + }, e -> emitter.onError(e), () -> emitter.onComplete()); + }); + } + }); + } + +// private Observable> method2(Observable source) { +// return source.>map(x -> new Tuple2(1, x)) +// .>scan((tuple2, s) -> { +// return new Tuple2(new Integer(1), ""); +// }); +// } + } diff --git a/src/main/java/cn/nextop/rxjava/share/practices/Practice2.java b/src/main/java/cn/nextop/rxjava/share/practices/Practice2.java index 08c7dcd..641018b 100644 --- a/src/main/java/cn/nextop/rxjava/share/practices/Practice2.java +++ b/src/main/java/cn/nextop/rxjava/share/practices/Practice2.java @@ -21,6 +21,7 @@ import io.reactivex.Observable; import io.reactivex.Single; +import java.util.HashMap; import java.util.Map; /** @@ -34,7 +35,11 @@ public class Practice2 { * 返回: Observable[("a", 2), ("b", 1), ("c", 2)] */ public Observable> wordCount1(Observable words) { - throw new UnsupportedOperationException("implementation"); + return words.groupBy(s -> s).flatMap(s -> { + return s.count().toObservable().map(count -> { + return new Tuple2(s.getKey(), count.intValue()); + }); + }); } /* @@ -43,7 +48,11 @@ public Observable> wordCount1(Observable words) * 返回: Single[Map{a=2, b=1, c=2}] */ public Single> wordCount2(Observable words) { - throw new UnsupportedOperationException("implementation"); + Map map = new HashMap<>(); + return this.wordCount1(words).>reduce(map,(countMap, item) -> { + countMap.put(item.getV1(), item.getV2()); + return countMap; + }); } } diff --git a/src/main/java/cn/nextop/rxjava/share/practices/Practice3.java b/src/main/java/cn/nextop/rxjava/share/practices/Practice3.java index a43bd78..a26648f 100644 --- a/src/main/java/cn/nextop/rxjava/share/practices/Practice3.java +++ b/src/main/java/cn/nextop/rxjava/share/practices/Practice3.java @@ -19,6 +19,11 @@ import io.reactivex.Maybe; import io.reactivex.Observable; +import java.util.Iterator; +import java.util.NoSuchElementException; +import java.util.Stack; + + /** * @author Baoyi Chen */ @@ -28,7 +33,7 @@ public class Practice3 { * 根据iterate的结果求和 */ public Maybe sum(Observable observable) { - throw new UnsupportedOperationException("implementation"); + return this.iterate(observable).reduce(0, (s, i) -> s += i).toMaybe(); } /* @@ -42,7 +47,43 @@ public Maybe sum(Observable observable) { * return Observable[4, 3, 6, 7, 5] 顺序无关 */ public Observable iterate(Observable observable) { - throw new UnsupportedOperationException("implementation"); + // flatMap no map + return observable.flatMap(s -> { + return Observable.fromIterable(iterator(s)); + }); + } + + private Iterable iterator(Node node) { + class Iter implements Iterator { + private Node root; + private Stack stack = new Stack<>(); + Iter(Node node) { + Node curr = node; + this.walk(node); + } + + private Node walk(Node node) { + this.stack.push(node); + if (node.left != null) { + this.walk(node.left); + } + if (node.right != null) { + walk(node.right); + } + return null; + } + @Override + public boolean hasNext() { + return this.stack.size() > 0; + } + + @Override + public Integer next() { + Node curr = this.stack.pop(); + return curr.value; + } + } + return node == null ? null : () -> new Iter(node); } public static class Node { diff --git a/src/main/java/cn/nextop/rxjava/share/practices/Practice4.java b/src/main/java/cn/nextop/rxjava/share/practices/Practice4.java index 33a5804..057e52c 100644 --- a/src/main/java/cn/nextop/rxjava/share/practices/Practice4.java +++ b/src/main/java/cn/nextop/rxjava/share/practices/Practice4.java @@ -18,6 +18,12 @@ import io.reactivex.Observable; +import io.reactivex.ObservableEmitter; +import io.reactivex.ObservableOnSubscribe; +import io.reactivex.ObservableSource; +import io.reactivex.annotations.NonNull; +import io.reactivex.functions.Function; +import io.reactivex.schedulers.Schedulers; /** @@ -44,7 +50,18 @@ public class Practice4 { * */ public Observable runInMultiThread(Observable observable) { - throw new UnsupportedOperationException("implementation"); + return observable.concatMap(new Function>() { + @Override + public ObservableSource apply(@NonNull String s) throws Exception { + return Observable.create(new ObservableOnSubscribe() { + @Override + public void subscribe(@NonNull ObservableEmitter emitter) throws Exception { + emitter.onNext(s); + emitter.onComplete(); + } + }).observeOn(Schedulers.newThread()); + } + }); } } diff --git a/src/main/java/cn/nextop/rxjava/share/practices/Practice5.java b/src/main/java/cn/nextop/rxjava/share/practices/Practice5.java index 1193642..2923eb4 100644 --- a/src/main/java/cn/nextop/rxjava/share/practices/Practice5.java +++ b/src/main/java/cn/nextop/rxjava/share/practices/Practice5.java @@ -16,12 +16,19 @@ package cn.nextop.rxjava.share.practices; -import io.reactivex.Maybe; -import io.reactivex.Observable; -import io.reactivex.Single; +import io.reactivex.*; +import io.reactivex.annotations.NonNull; +import io.reactivex.functions.Function; +import io.reactivex.observables.GroupedObservable; +import io.reactivex.subjects.PublishSubject; +import io.reactivex.subjects.Subject; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; import java.util.List; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Predicate; /** @@ -35,7 +42,7 @@ public class Practice5 { * return: Single[3] */ public Single count(Observable source) { - throw new UnsupportedOperationException("implementation"); + return source.reduce(0L, (i, s) -> i+1); } /* @@ -44,7 +51,7 @@ public Single count(Observable source) { * return: Observable["a", "b", "c","b", "c", "d"] */ public Observable convert(Observable> source) { - throw new UnsupportedOperationException("implementation"); + return source.flatMap(s -> Observable.fromIterable(s)); } /* @@ -53,7 +60,12 @@ public Observable convert(Observable> source) { * return: Observable["a", "b", "c"] */ public Observable distinct(Observable source) { - throw new UnsupportedOperationException("implementation"); + return source.groupBy(s -> s).flatMap(g -> { + return Observable.create(emitter -> { + emitter.onNext(g.getKey()); + emitter.onComplete(); + }); + }); } /* @@ -62,7 +74,13 @@ public Observable distinct(Observable source) { * return: Observable[3, 4] */ public Observable filter(Observable source, Predicate conditon) { - throw new UnsupportedOperationException("implementation"); + List list = new ArrayList(); + return source.reduce(list, (c, i) -> { + if (conditon.test(i)) c.add(i); + return c; + }).toObservable().flatMap(s -> { + return Observable.fromIterable(s); + }); } /* @@ -71,7 +89,13 @@ public Observable filter(Observable source, Predicate * return: Maybe[3] */ public Maybe elementAt(Observable source, int index) { - throw new UnsupportedOperationException("implementation"); + AtomicInteger cursor = new AtomicInteger(0); + return source.reduce((seed, s) -> { + if (cursor.incrementAndGet() == index) { + return s; + } + return seed; + }); } /* @@ -80,7 +104,7 @@ public Maybe elementAt(Observable source, int index) { * return: Observable["a", "b", "a", "b"] */ public Observable repeat(Observable source, int count) { - throw new UnsupportedOperationException("implementation"); + return Observable.range(0, count).flatMap(s -> source); } /* @@ -89,7 +113,7 @@ public Observable repeat(Observable source, int count) { * return: Observable["a", "b"] */ public Observable concat(List> source) { - throw new UnsupportedOperationException("implementation"); + return Observable.fromIterable(source).concatMap(s -> s); } /* @@ -98,7 +122,7 @@ public Observable concat(List> source) { * return: Observable["a", "b"] */ public Observable merge(List> source) { - throw new UnsupportedOperationException("implementation"); + return Observable.fromIterable(source).flatMap(s -> s); } /* @@ -107,7 +131,9 @@ public Observable merge(List> source) { * return: Observable["a", "b", "c"], 每个元素都延迟1秒 */ public Observable delayAll(Observable source, long delay, TimeUnit unit) { - throw new UnsupportedOperationException("implementation"); + return source.concatMap(s -> { + return Observable.just(s).delay(delay, unit); + }); } } From 52ab381eca35c38e07a74ad4a141c9f4202df10e Mon Sep 17 00:00:00 2001 From: houfeng Date: Fri, 23 Mar 2018 17:55:24 +0800 Subject: [PATCH 2/2] optimize --- .../rxjava/share/practices/Practice1.java | 21 ++++++++---- .../rxjava/share/practices/Practice2.java | 1 + .../rxjava/share/practices/Practice3.java | 2 -- .../rxjava/share/practices/Practice4.java | 13 +------- .../rxjava/share/practices/Practice5.java | 33 ++++++++----------- 5 files changed, 30 insertions(+), 40 deletions(-) diff --git a/src/main/java/cn/nextop/rxjava/share/practices/Practice1.java b/src/main/java/cn/nextop/rxjava/share/practices/Practice1.java index dbbe0f9..13923f1 100644 --- a/src/main/java/cn/nextop/rxjava/share/practices/Practice1.java +++ b/src/main/java/cn/nextop/rxjava/share/practices/Practice1.java @@ -34,7 +34,7 @@ public class Practice1 { * 返回值 Observable[(1, "a"), (2, "b"), (3, "c")] 注意index从1开始 */ public Observable> indexable(Observable observable) { - return this.method1(observable); + return this.method3(observable); } private Observable> method1(Observable source) { @@ -51,11 +51,18 @@ public ObservableSource> apply(@NonNull Observable> method2(Observable source) { -// return source.>map(x -> new Tuple2(1, x)) -// .>scan((tuple2, s) -> { -// return new Tuple2(new Integer(1), ""); -// }); -// } + private Observable> method2(Observable source) { + return source.map(x -> new Tuple2(1, x)) + .scan((t, s) -> new Tuple2(t.getV1().intValue() + 1, s.getV2())); + } + + private Observable> method3(Observable source) { +// Observable.zip + return source.zipWith(Observable.range(1, Integer.MAX_VALUE), (o1, o2) -> { + return new Tuple2(o2, o1); + }); + } + + } diff --git a/src/main/java/cn/nextop/rxjava/share/practices/Practice2.java b/src/main/java/cn/nextop/rxjava/share/practices/Practice2.java index 641018b..7e4ddc9 100644 --- a/src/main/java/cn/nextop/rxjava/share/practices/Practice2.java +++ b/src/main/java/cn/nextop/rxjava/share/practices/Practice2.java @@ -36,6 +36,7 @@ public class Practice2 { */ public Observable> wordCount1(Observable words) { return words.groupBy(s -> s).flatMap(s -> { +// s.reduce() return s.count().toObservable().map(count -> { return new Tuple2(s.getKey(), count.intValue()); }); diff --git a/src/main/java/cn/nextop/rxjava/share/practices/Practice3.java b/src/main/java/cn/nextop/rxjava/share/practices/Practice3.java index a26648f..5b25af9 100644 --- a/src/main/java/cn/nextop/rxjava/share/practices/Practice3.java +++ b/src/main/java/cn/nextop/rxjava/share/practices/Practice3.java @@ -55,10 +55,8 @@ public Observable iterate(Observable observable) { private Iterable iterator(Node node) { class Iter implements Iterator { - private Node root; private Stack stack = new Stack<>(); Iter(Node node) { - Node curr = node; this.walk(node); } diff --git a/src/main/java/cn/nextop/rxjava/share/practices/Practice4.java b/src/main/java/cn/nextop/rxjava/share/practices/Practice4.java index 057e52c..0734253 100644 --- a/src/main/java/cn/nextop/rxjava/share/practices/Practice4.java +++ b/src/main/java/cn/nextop/rxjava/share/practices/Practice4.java @@ -50,18 +50,7 @@ public class Practice4 { * */ public Observable runInMultiThread(Observable observable) { - return observable.concatMap(new Function>() { - @Override - public ObservableSource apply(@NonNull String s) throws Exception { - return Observable.create(new ObservableOnSubscribe() { - @Override - public void subscribe(@NonNull ObservableEmitter emitter) throws Exception { - emitter.onNext(s); - emitter.onComplete(); - } - }).observeOn(Schedulers.newThread()); - } - }); + return observable.concatMap(s -> Observable.just(s).observeOn(Schedulers.newThread())); } } diff --git a/src/main/java/cn/nextop/rxjava/share/practices/Practice5.java b/src/main/java/cn/nextop/rxjava/share/practices/Practice5.java index 2923eb4..1f871f7 100644 --- a/src/main/java/cn/nextop/rxjava/share/practices/Practice5.java +++ b/src/main/java/cn/nextop/rxjava/share/practices/Practice5.java @@ -60,12 +60,10 @@ public Observable convert(Observable> source) { * return: Observable["a", "b", "c"] */ public Observable distinct(Observable source) { - return source.groupBy(s -> s).flatMap(g -> { - return Observable.create(emitter -> { - emitter.onNext(g.getKey()); - emitter.onComplete(); - }); - }); +// return source.groupBy(s -> s).flatMap(g -> { +// return Observable.just(g.getKey()); +// }); + return source.groupBy(s -> s).map(s -> s.getKey()); } /* @@ -74,12 +72,9 @@ public Observable distinct(Observable source) { * return: Observable[3, 4] */ public Observable filter(Observable source, Predicate conditon) { - List list = new ArrayList(); - return source.reduce(list, (c, i) -> { - if (conditon.test(i)) c.add(i); - return c; - }).toObservable().flatMap(s -> { - return Observable.fromIterable(s); + return source.flatMap(s -> { + if (conditon.test(s)) return Observable.just(s); + return Observable.empty(); }); } @@ -89,13 +84,13 @@ public Observable filter(Observable source, Predicate * return: Maybe[3] */ public Maybe elementAt(Observable source, int index) { - AtomicInteger cursor = new AtomicInteger(0); - return source.reduce((seed, s) -> { - if (cursor.incrementAndGet() == index) { - return s; - } - return seed; - }); +// source.skip(index).take(1).firstElement(); + return source.take(index + 1).lastElement(); +// AtomicInteger cursor = new AtomicInteger(0); +// return source.reduce((seed, s) -> { +// if (cursor.incrementAndGet() == index) return s; +// return seed; +// }); } /*