From 8ebac897230e1b2470cd69de0e5a7df1e86fc125 Mon Sep 17 00:00:00 2001 From: "Pan, Xinyang" Date: Mon, 26 Mar 2018 09:38:28 +0800 Subject: [PATCH 1/2] Practices all done --- .../rxjava/share/practices/Practice1.java | 16 +- .../rxjava/share/practices/Practice2.java | 51 +++--- .../rxjava/share/practices/Practice3.java | 20 ++- .../rxjava/share/practices/Practice4.java | 3 +- .../rxjava/share/practices/Practice5.java | 147 +++++++++--------- 5 files changed, 134 insertions(+), 103 deletions(-) diff --git a/src/main/java/cn/nextop/rxjava/share/practices/Practice1.java b/src/main/java/cn/nextop/rxjava/share/practices/Practice1.java index b432fea..1ca20b4 100644 --- a/src/main/java/cn/nextop/rxjava/share/practices/Practice1.java +++ b/src/main/java/cn/nextop/rxjava/share/practices/Practice1.java @@ -24,12 +24,12 @@ */ public class Practice1 { - /* - * 举例如下: - * 参数 Observable["a","b","c"] - * 返回值 Observable[(1, "a"), (2, "b"), (3, "c")] 注意index从1开始 - */ - public Observable> indexable(Observable observable) { - throw new UnsupportedOperationException("implementation"); - } + /* + * 举例如下: + * 参数 Observable["a","b","c"] + * 返回值 Observable[(1, "a"), (2, "b"), (3, "c")] 注意index从1开始 + */ + public Observable> indexable(Observable observable) { + return observable.map(s -> new Tuple2(1, s)).scan((t1, t2) -> new Tuple2(t1.getV1() + 1, t2.getV2())); + } } diff --git a/src/main/java/cn/nextop/rxjava/share/practices/Practice2.java b/src/main/java/cn/nextop/rxjava/share/practices/Practice2.java index 08c7dcd..8edb9e8 100644 --- a/src/main/java/cn/nextop/rxjava/share/practices/Practice2.java +++ b/src/main/java/cn/nextop/rxjava/share/practices/Practice2.java @@ -16,34 +16,45 @@ package cn.nextop.rxjava.share.practices; +import java.util.HashMap; +import java.util.Map; import cn.nextop.rxjava.share.util.type.Tuple2; import io.reactivex.Observable; import io.reactivex.Single; -import java.util.Map; - /** * @author Baoyi Chen */ public class Practice2 { - /* - * 举例: - * words = Observable["a", "a", "b", "c", "c"] - * 返回: Observable[("a", 2), ("b", 1), ("c", 2)] - */ - public Observable> wordCount1(Observable words) { - throw new UnsupportedOperationException("implementation"); - } - - /* - * 举例: - * words = Observable["a", "a", "b", "c", "c"] - * 返回: Single[Map{a=2, b=1, c=2}] - */ - public Single> wordCount2(Observable words) { - throw new UnsupportedOperationException("implementation"); - } - + /* + * 举例: + * words = Observable["a", "a", "b", "c", "c"] + * 返回: Observable[("a", 2), ("b", 1), ("c", 2)] + */ + public Observable> wordCount1(Observable words) { + return words.groupBy(v -> v).map(gr -> { + Observable> map = gr.count().map(c -> new Tuple2(gr.getKey(), c.intValue())).toObservable(); + return map; + }).flatMap(s -> s); + } + + /* + * 举例: + * words = Observable["a", "a", "b", "c", "c"] + * 返回: Single[Map{a=2, b=1, c=2}] + */ + public Single> wordCount2(Observable words) { + return words.reduce(new HashMap(), this::map); + } + + private Map map(Map map, String s) { + Integer integer = map.get(s); + if (integer == null) { + integer = 0; + } + map.put(s, ++integer); + return map; + } } diff --git a/src/main/java/cn/nextop/rxjava/share/practices/Practice3.java b/src/main/java/cn/nextop/rxjava/share/practices/Practice3.java index a43bd78..7fa9b23 100644 --- a/src/main/java/cn/nextop/rxjava/share/practices/Practice3.java +++ b/src/main/java/cn/nextop/rxjava/share/practices/Practice3.java @@ -16,6 +16,9 @@ package cn.nextop.rxjava.share.practices; +import java.util.ArrayList; +import java.util.List; + import io.reactivex.Maybe; import io.reactivex.Observable; @@ -28,7 +31,7 @@ public class Practice3 { * 根据iterate的结果求和 */ public Maybe sum(Observable observable) { - throw new UnsupportedOperationException("implementation"); + return this.iterate(observable).reduce((a,b) -> a + b); } /* @@ -42,9 +45,22 @@ public Maybe sum(Observable observable) { * return Observable[4, 3, 6, 7, 5] 顺序无关 */ public Observable iterate(Observable observable) { - throw new UnsupportedOperationException("implementation"); + return observable.flatMapIterable(n -> { + List l = new ArrayList<>(); + this.addLeafToList(l, n); + return l; + }).map(n -> n.value); } + private void addLeafToList(List l, Node n) { + if (n == null) { + return; + } + l.add(n); + this.addLeafToList(l, n.left); + this.addLeafToList(l, n.right); + } + public static class Node { public Node left; public Node right; diff --git a/src/main/java/cn/nextop/rxjava/share/practices/Practice4.java b/src/main/java/cn/nextop/rxjava/share/practices/Practice4.java index 33a5804..906a62f 100644 --- a/src/main/java/cn/nextop/rxjava/share/practices/Practice4.java +++ b/src/main/java/cn/nextop/rxjava/share/practices/Practice4.java @@ -18,6 +18,7 @@ import io.reactivex.Observable; +import io.reactivex.schedulers.Schedulers; /** @@ -44,7 +45,7 @@ public class Practice4 { * */ public Observable runInMultiThread(Observable observable) { - throw new UnsupportedOperationException("implementation"); + return Observable.concat(observable.map(s -> Observable.just(s).observeOn(Schedulers.io()))); } } diff --git a/src/main/java/cn/nextop/rxjava/share/practices/Practice5.java b/src/main/java/cn/nextop/rxjava/share/practices/Practice5.java index 1193642..c093012 100644 --- a/src/main/java/cn/nextop/rxjava/share/practices/Practice5.java +++ b/src/main/java/cn/nextop/rxjava/share/practices/Practice5.java @@ -29,85 +29,88 @@ */ public class Practice5 { - /* - * example: - * param: Observable["a","b","c"] - * return: Single[3] - */ - public Single count(Observable source) { - throw new UnsupportedOperationException("implementation"); - } + /* + * example: + * param: Observable["a","b","c"] + * return: Single[3] + */ + public Single count(Observable source) { + return source.count(); + } - /* - * example: - * param: Observable[["a", "b", "c"], ["b", "c", "d"]] - * return: Observable["a", "b", "c","b", "c", "d"] - */ - public Observable convert(Observable> source) { - throw new UnsupportedOperationException("implementation"); - } + /* + * example: + * param: Observable[["a", "b", "c"], ["b", "c", "d"]] + * return: Observable["a", "b", "c","b", "c", "d"] + */ + public Observable convert(Observable> source) { + return source.flatMapIterable(l -> l); + } - /* - * example: - * param: Observable["a", "a", "b", "b", "c"] - * return: Observable["a", "b", "c"] - */ - public Observable distinct(Observable source) { - throw new UnsupportedOperationException("implementation"); - } + /* + * example: + * param: Observable["a", "a", "b", "b", "c"] + * return: Observable["a", "b", "c"] + */ + public Observable distinct(Observable source) { + return source.distinct(); + } - /* - * example: - * param: Observable[1, 2, 3, 4, 5] , conditon = x > 2 and x < 5 - * return: Observable[3, 4] - */ - public Observable filter(Observable source, Predicate conditon) { - throw new UnsupportedOperationException("implementation"); - } + /* + * example: + * param: Observable[1, 2, 3, 4, 5] , conditon = x > 2 and x < 5 + * return: Observable[3, 4] + */ + public Observable filter(Observable source, Predicate conditon) { + return source.filter(conditon::test); + } - /* - * example: - * param: Observable[1, 2, 3, 4, 5] , index = 2 - * return: Maybe[3] - */ - public Maybe elementAt(Observable source, int index) { - throw new UnsupportedOperationException("implementation"); - } + /* + * example: + * param: Observable[1, 2, 3, 4, 5] , index = 2 + * return: Maybe[3] + */ + public Maybe elementAt(Observable source, int index) { + return source.elementAt(index); + } - /* - * example: - * param: Observable["a", "b"] , count = 2 - * return: Observable["a", "b", "a", "b"] - */ - public Observable repeat(Observable source, int count) { - throw new UnsupportedOperationException("implementation"); - } + /* + * example: + * param: Observable["a", "b"] , count = 2 + * return: Observable["a", "b", "a", "b"] + */ + public Observable repeat(Observable source, int count) { + return source.repeat(count); + } - /* - * example: - * param: Observable["a"], Observable["b"] - * return: Observable["a", "b"] - */ - public Observable concat(List> source) { - throw new UnsupportedOperationException("implementation"); - } + /* + * example: + * param: Observable["a"], Observable["b"] + * return: Observable["a", "b"] + */ + public Observable concat(List> source) { + return Observable.concat(source); + } - /* - * example: - * param: Observable["a"], Observable["b"] - * return: Observable["a", "b"] - */ - public Observable merge(List> source) { - throw new UnsupportedOperationException("implementation"); - } + /* + * example: + * param: Observable["a"], Observable["b"] + * return: Observable["a", "b"] + */ + public Observable merge(List> source) { + return Observable.merge(source); + } - /* - * example: - * param: Observable["a", "b", "c"], 1, SECONDS - * return: Observable["a", "b", "c"], 每个元素都延迟1秒 - */ - public Observable delayAll(Observable source, long delay, TimeUnit unit) { - throw new UnsupportedOperationException("implementation"); - } + /* + * example: + * param: Observable["a", "b", "c"], 1, SECONDS + * return: Observable["a", "b", "c"], 每个元素都延迟1秒 + */ + public Observable delayAll(Observable source, long delay, TimeUnit unit) { + return source.map(t -> { + unit.sleep(delay); + return t; + }); + } } From 048636aa25cfd86c3744a256f5d6d7db60fc0bdf Mon Sep 17 00:00:00 2001 From: "Pan, Xinyang" Date: Mon, 26 Mar 2018 13:00:21 +0800 Subject: [PATCH 2/2] 1 --- .../rxjava/share/practices/Practice5.java | 62 +++++++++++++++---- 1 file changed, 49 insertions(+), 13 deletions(-) diff --git a/src/main/java/cn/nextop/rxjava/share/practices/Practice5.java b/src/main/java/cn/nextop/rxjava/share/practices/Practice5.java index c093012..f158e2b 100644 --- a/src/main/java/cn/nextop/rxjava/share/practices/Practice5.java +++ b/src/main/java/cn/nextop/rxjava/share/practices/Practice5.java @@ -16,14 +16,15 @@ package cn.nextop.rxjava.share.practices; -import io.reactivex.Maybe; -import io.reactivex.Observable; -import io.reactivex.Single; - +import java.util.ArrayList; import java.util.List; import java.util.concurrent.TimeUnit; import java.util.function.Predicate; +import io.reactivex.Maybe; +import io.reactivex.Observable; +import io.reactivex.Single; + /** * @author Baoyi Chen */ @@ -35,7 +36,7 @@ public class Practice5 { * return: Single[3] */ public Single count(Observable source) { - return source.count(); + return source.reduce(0L, (x, y) -> x + 1L); } /* @@ -53,7 +54,7 @@ public Observable convert(Observable> source) { * return: Observable["a", "b", "c"] */ public Observable distinct(Observable source) { - return source.distinct(); + return source.groupBy(s -> s).map(g -> Observable.just(g.getKey())).flatMap(og -> og); } /* @@ -62,7 +63,21 @@ public Observable distinct(Observable source) { * return: Observable[3, 4] */ public Observable filter(Observable source, Predicate conditon) { - return source.filter(conditon::test); +// return source.map(s -> { +// if (conditon.test(s)) { +// return Observable.just(s); +// } else { +// return Observable.empty(); +// } +// }).flatMap(o -> o); + return Observable.concat(source.map(s -> { + if (conditon.test(s)) { + return Observable.just(s); + } else { + return Observable.empty(); + } + })); + } /* @@ -71,16 +86,37 @@ public Observable filter(Observable source, Predicate * return: Maybe[3] */ public Maybe elementAt(Observable source, int index) { - return source.elementAt(index); + return source.filter(new Index(index)).firstElement(); } - + + private class Index implements io.reactivex.functions.Predicate { + private int i = 0; + private final int index; + + private Index(int index) { + this.index = index; + } + + @Override + public boolean test(String t) throws Exception { + if (i++ == index) { + return true; + } + return false; + } + } + /* * example: * param: Observable["a", "b"] , count = 2 * return: Observable["a", "b", "a", "b"] */ public Observable repeat(Observable source, int count) { - return source.repeat(count); + List> list = new ArrayList<>(); + for (int i = 0; i < count; i++) { + list.add(source); + } + return Observable.concat(list); } /* @@ -89,16 +125,16 @@ public Observable repeat(Observable source, int count) { * return: Observable["a", "b"] */ public Observable concat(List> source) { - return Observable.concat(source); + return Observable.fromIterable(source).concatMapDelayError(s -> s); } - + /* * example: * param: Observable["a"], Observable["b"] * return: Observable["a", "b"] */ public Observable merge(List> source) { - return Observable.merge(source); + return Observable.fromIterable(source).flatMap(s -> s); } /*